New features in the Python deltalake 0.12.0 release

The Python deltalake 0.12.0 release is a game-changer, introducing a suite of powerful Delta operations such as MERGE (upserts), DELETE, and UPDATE. In addition, this release brings substantial performance enhancements, boosting read performance by up to 10 times. Furthermore, it significantly improves interoperability with large Arrow data types during writing.

MERGE operation: Effortless Data Integration

Merging data is one of the most versatile operations that Delta Lake brings to the table. It can handle updates, deletions, and inserts simultaneously, making it a pivotal tool for data engineers and data scientists. Some of the use cases are handling Slowly Changing Dimensions (SCD), processing only the changed data (CDC), handling deduplication during writing, and various other cases. Let's explore the capabilities of MERGE with a real-world example.

Upserts Made Easy

Imagine you have a source table or DataFrame, and you want to upsert its data into a target table using .merge() together with when_matched_update_all() and when_not_matched_insert_all(). Let's consider a scenario where we have our initial load of the SalesOrder table.

from deltalake import DeltaTable, write_deltalake
from datetime import datetime
import polars as pl

df = pl.DataFrame(
    {
        "sales_order_id": ["1000", "1001", "1002", "1003"],
        "product": ["bike", "scooter", "car", "motorcycle"],
        "order_date": [
            datetime(2023, 1, 1),
            datetime(2023, 1, 5),
            datetime(2023, 1, 10),
            datetime(2023, 2, 1),
        ],
        "sales_price": [120.25, 2400.0, 32000.0, 9000.0],
        "paid_by_customer": [True, False, False, True],
    }
)
print(df)

df.write_delta("sales_orders", mode="append")
┌────────────────┬────────────┬─────────────────────┬─────────────┬──────────────────┐
│ sales_order_id ┆ product    ┆ order_date          ┆ sales_price ┆ paid_by_customer │
│ ---            ┆ ---        ┆ ---                 ┆ ---         ┆ ---              │
│ str            ┆ str        ┆ datetime[μs]        ┆ f64         ┆ bool             │
╞════════════════╪════════════╪═════════════════════╪═════════════╪══════════════════╡
│ 1000           ┆ bike       ┆ 2023-01-01 00:00:00 ┆ 120.25      ┆ true             │
│ 1001           ┆ scooter    ┆ 2023-01-05 00:00:00 ┆ 2400.0      ┆ false            │
│ 1002           ┆ car        ┆ 2023-01-10 00:00:00 ┆ 32000.0     ┆ false            │
│ 1003           ┆ motorcycle ┆ 2023-02-01 00:00:00 ┆ 9000.0      ┆ true             │
└────────────────┴────────────┴─────────────────────┴─────────────┴──────────────────┘

The above code demonstrates how to write the initial data into a Delta table. Now, let's say we receive a new batch of changed data: some customers have paid for their orders, and we've adjusted prices for certain products. We've also made a new car sale.

new_data = pl.DataFrame(
    {
        "sales_order_id": ["1002", "1004"],
        "product": ["car", "car"],
        "order_date": [datetime(2023, 1, 10), datetime(2023, 2, 5)],
        "sales_price": [30000.0, 40000.0],
        "paid_by_customer": [True, True],
    }
)

To perform the upsert, we load the sales order table as a DeltaTable object and use the .merge() operation to specify the source data and the predicate for matching records. The when_matched_update_all() and when_not_matched_insert_all() methods then specify how updates and inserts should be handled for all columns. Finally, the operation is executed with execute().

You can optionally provide the source_alias, target_alias, and error_on_type_mismatch parameters. By default, the merge operation errors out if there are type mismatches.

from polars.io.delta import _convert_pa_schema_to_delta

dt = DeltaTable("sales_orders")
source = new_data.to_arrow()
delta_schema = _convert_pa_schema_to_delta(source.schema)
source = source.cast(delta_schema)

(
    dt.merge(
        source=source,
        predicate="s.sales_order_id = t.sales_order_id",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
{'num_source_rows': 2,
 'num_target_rows_inserted': 1,
 'num_target_rows_updated': 1,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 3,
 'num_output_rows': 5,
 'num_target_files_added': 1,
 'num_target_files_removed': 1,
 'execution_time_ms': 51,
 'scan_time_ms': 0,
 'rewrite_time_ms': 45}

The metrics show that one target row was inserted and one was updated. If we read the Delta table with Polars, we can see that order_id 1002 got an updated sales_price and paid_by_customer field, and that a new sale with order_id 1004 was added.

print(pl.read_delta("sales_orders"))
┌────────────────┬────────────┬─────────────────────┬─────────────┬──────────────────┐
│ sales_order_id ┆ product    ┆ order_date          ┆ sales_price ┆ paid_by_customer │
│ ---            ┆ ---        ┆ ---                 ┆ ---         ┆ ---              │
│ str            ┆ str        ┆ datetime[μs]        ┆ f64         ┆ bool             │
╞════════════════╪════════════╪═════════════════════╪═════════════╪══════════════════╡
│ 1002           ┆ car        ┆ 2023-01-10 00:00:00 ┆ 30000.0     ┆ true             │
│ 1000           ┆ bike       ┆ 2023-01-01 00:00:00 ┆ 120.25      ┆ true             │
│ 1001           ┆ scooter    ┆ 2023-01-05 00:00:00 ┆ 2400.0      ┆ false            │
│ 1003           ┆ motorcycle ┆ 2023-02-01 00:00:00 ┆ 9000.0      ┆ true             │
│ 1004           ┆ car        ┆ 2023-02-05 00:00:00 ┆ 40000.0     ┆ true             │
└────────────────┴────────────┴─────────────────────┴─────────────┴──────────────────┘

This real-world example illustrates how the MERGE operation in Delta-RS simplifies data integration, enabling you to handle changes, updates, and inserts seamlessly. For more examples of what Merge allows you to do, take a look at the Delta Lake Merge blog post.

UPDATE operation: Fine-Tuning data

Sometimes, you just need to make specific updates in your data based on a predicate. In this case, the .update() operation comes to the rescue. Let's consider a simple example where we need to correct a typo in the "product" column.

df = pl.DataFrame({"id": [1, 2, 3], "product": ["appl", "apple", "kiwi"]})
df.write_delta("update_test", mode="overwrite")

With the data in place, we load the Delta table and use the .update() operation to fix the typo in the "product" column where the predicate is met. Soon, you'll be able to pass Python objects as input to the .update() operation, making it even more flexible.

dt = DeltaTable("update_test")

dt.update(updates={"product": "'apple'"}, predicate="product = 'appl'")
print(pl.read_delta("update_test"))
┌─────┬─────────┐
│ id  ┆ product │
│ --- ┆ ---     │
│ i64 ┆ str     │
╞═════╪═════════╡
│ 1   ┆ apple   │
│ 2   ┆ apple   │
│ 3   ┆ kiwi    │
└─────┴─────────┘

The result is a corrected "product" column in our Delta table, showcasing how the UPDATE operation allows you to fine-tune your data effortlessly.

DELETE operation: Managing Data Cleanup

The DELETE operation is a powerful tool for data cleanup and management. For instance, you can use it to remove soft-deleted records from your table. Here's an example of how it works.

df = pl.DataFrame({"id": [1, 2, 3], "deleted": [False, True, True]})
df.write_delta("delete_test", mode="overwrite")

After loading the Delta table, we use the .delete() operation with the predicate "deleted = True" to remove the flagged records. If no predicate is provided, all records are deleted.

dt = DeltaTable("delete_test")

dt.delete(predicate="deleted = True")
print(pl.read_delta("delete_test"))
┌─────┬─────────┐
│ id  ┆ deleted │
│ --- ┆ ---     │
│ i64 ┆ bool    │
╞═════╪═════════╡
│ 1   ┆ false   │
└─────┴─────────┘

The result is a cleaned-up Delta table containing only the non-deleted records, illustrating how the DELETE operation is a valuable tool for data management.

Enhanced Interoperability with Large Arrow Data

In this release, Delta-RS introduces enhanced interoperability with large Arrow data types. You can now perform consecutive writes of Arrow tables that contain large Arrow types directly with write_deltalake, simply by enabling large_dtypes=True.

df = pl.DataFrame({"id": [1, 2, 3], "deleted": [False, True, False]})
source = df.to_arrow()
print(f"schema: {source.schema}\n")
write_deltalake("sample_table", source, large_dtypes=True, mode="overwrite")

dt = DeltaTable("sample_table")
print(f"history: {dt.history()[0]}")
schema: id: int64
deleted: bool

history: {'timestamp': 1697974132175, 'operation': 'WRITE', 'operationParameters': {'mode': 'Overwrite', 'partitionBy': '[]'}, 'clientVersion': 'delta-rs.0.17.0', 'version': 7}

The improved interoperability enables smoother integration with libraries like Polars, where you can work with large data types without conversion.

Conclusion: Bridging the Gap in Data Workloads

The 0.12.0 Python release of Delta-RS brings many powerful operations, such as MERGE, DELETE, and UPDATE, directly to Python users. This empowers data scientists and data engineers working in non-JVM workloads to build and enhance lakehouse solutions. It bridges the gap between small-to-medium data workloads, commonly handled with tools like Polars, DuckDB, or Pandas, and big data workloads managed with Spark.

Big shout-out to the amazing contributors of Delta-RS; a lot of effort has gone into building the Rust API!

What's on the Horizon for Delta-RS?

  • Add support for table features (reader V3, writer V7) and improve overall protocol support
  • Integrate the Rust Delta Kernel into Delta-RS
  • Use logical plans in MERGE for improved performance
  • Support multiple "when" clauses in Python MERGE
  • Expose the FSCK (repair) operation to Python
  • Record VACUUM operations in the transaction log