diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 65f91c9619..7838925162 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -272,10 +272,15 @@ catalog.purge_table("docs_example.bids")
 
 ## Write to a table
 
-Reading and writing is being done using [Apache Arrow](https://arrow.apache.org/). Arrow is an in-memory columnar format for fast data interchange and in-memory analytics. Let's consider the following Arrow Table:
+PyIceberg supports several write modes: [append](#append), [overwrite](#overwrite), [delete](#delete), [dynamic partition overwrite](#dynamic-partition-overwrite), and [upsert](#upsert). All writes use [Apache Arrow](https://arrow.apache.org/) as the in-memory format. Writes can be issued directly on the `Table` object or grouped together using the [Transaction API](#transaction-api).
+
+To set up a table for the examples below:
 
 ```python
 import pyarrow as pa
+from pyiceberg.catalog import load_catalog
+
+catalog = load_catalog("default")
 
 df = pa.Table.from_pylist(
     [
@@ -285,19 +290,11 @@ df = pa.Table.from_pylist(
         {"city": "Paris", "lat": 48.864716, "long": 2.349014},
     ],
 )
-```
-
-Next, create a table using the Arrow schema:
-
-```python
-from pyiceberg.catalog import load_catalog
-
-catalog = load_catalog("default")
 
 tbl = catalog.create_table("default.cities", schema=df.schema)
 ```
 
-Next, write the data to the table. Both `append` and `overwrite` produce the same result, since the table is empty on creation:
+### Append
@@ -306,15 +303,13 @@ Next, write the data to the table. Both `append` and `overwrite` produce the sam
+Use `append` to add new rows to an existing table without modifying any existing data:
+
 ```python
 tbl.append(df)
-
-# or
-
-tbl.overwrite(df)
 ```
 
-Now, the data is written to the table, and the table can be read using `tbl.scan().to_arrow()`:
+After the first append, `tbl.scan().to_arrow()` returns:
 
 ```python
 pyarrow.Table
@@ -327,7 +322,7 @@ lat: [[52.371807,37.773972,53.11254,48.864716]]
 long: [[4.896029,-122.431297,6.0989,2.349014]]
 ```
 
-If we want to add more data, we can use `.append()` again:
+Each call to `append` produces a new [Parquet file](https://parquet.apache.org/). Calling `append` a second time adds another batch of rows:
 
 ```python
 tbl.append(pa.Table.from_pylist(
@@ -335,7 +330,7 @@ tbl.append(pa.Table.from_pylist(
 ))
 ```
 
-When reading the table `tbl.scan().to_arrow()` you can see that `Groningen` is now also part of the table:
+The nested lists in `tbl.scan().to_arrow()` reflect the separate Arrow buffers from each write:
 
 ```python
 pyarrow.Table
@@ -348,98 +343,113 @@ lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
 long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
 ```
 
-The nested lists indicate the different Arrow buffers. Each of the writes produce a [Parquet file](https://parquet.apache.org/) where each [row group](https://parquet.apache.org/docs/concepts/) translates into an Arrow buffer. In the case where the table is large, PyIceberg also allows the option to stream the buffers using the Arrow [RecordBatchReader](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html), avoiding pulling everything into memory right away:
+To avoid type inconsistencies, convert the Iceberg table schema to Arrow before writing:
 
 ```python
-for buf in tbl.scan().to_arrow_batch_reader():
-    print(f"Buffer contains {len(buf)} rows")
+df = pa.Table.from_pylist(
+    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}], schema=tbl.schema().as_arrow()
+)
+
+tbl.append(df)
 ```
 
-To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
+Optionally, you can attach custom properties to the snapshot created by `append`, or target a specific branch:
 
 ```python
-df = pa.Table.from_pylist(
-    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}], schema=table.schema().as_arrow()
-)
+tbl.append(df, snapshot_properties={"owner": "etl-job", "run_id": "abc123"})
 
-tbl.append(df)
+# Write to a branch instead of main
+tbl.append(df, branch="staging")
 ```
 
-You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`. This will use the Iceberg metadata to only open up the Parquet files that contain relevant information.
+### Overwrite
+
+`overwrite` replaces data in the table with new data. When called without an `overwrite_filter`, it behaves like a full table replacement: existing data is deleted and the new data is written. When the table is empty, `overwrite` and `append` produce the same result.
 
 ```python
-tbl.delete(delete_filter="city == 'Paris'")
+tbl.overwrite(df)
 ```
 
-In the above example, any records where the city field value equals to `Paris` will be deleted. Running `tbl.scan().to_arrow()` will now yield:
+#### Partial overwrite with `overwrite_filter`
+
+Pass an `overwrite_filter` to delete only the rows that match the predicate before appending the new data. This is useful for replacing a specific subset of rows.
+
+For example, to replace the record for `Paris` with a record for `New York`:
+
+```python
+from pyiceberg.expressions import EqualTo
+
+df_new = pa.Table.from_pylist(
+    [{"city": "New York", "lat": 40.7128, "long": -74.0060}]
+)
+tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"))
+```
+
+After the overwrite, `tbl.scan().to_arrow()` yields:
 
 ```python
 pyarrow.Table
-city: string
+city: large_string
 lat: double
 long: double
 ----
-city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
-lat: [[52.371807,37.773972,53.11254],[53.21917]]
-long: [[4.896029,-122.431297,6.0989],[6.56667]]
+city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
+lat: [[40.7128],[52.371807,37.773972,53.11254]]
+long: [[-74.006],[4.896029,-122.431297,6.0989]]
 ```
 
-In the case of `tbl.delete(delete_filter="city == 'Groningen'")`, the whole Parquet file will be dropped without checking it contents, since from the Iceberg metadata PyIceberg can derive that all the content in the file matches the predicate.
+The `overwrite_filter` accepts both expression objects (e.g., `EqualTo`, `GreaterThan`) and SQL-style string predicates (e.g., `"city == 'Paris'"`). Matching is case-sensitive by default; pass `case_sensitive=False` to change this.
 
-### Partial overwrites
-
-When using the `overwrite` API, you can use an `overwrite_filter` to delete data that matches the filter before appending new data into the table. For example, consider the following Iceberg table:
+Optionally, you can also set snapshot properties or target a branch:
 
 ```python
-import pyarrow as pa
-df = pa.Table.from_pylist(
-    [
-        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
-        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
-        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
-        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
-    ],
-)
+tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"), snapshot_properties={"owner": "etl-job"})
 
-from pyiceberg.catalog import load_catalog
-catalog = load_catalog("default")
+# Write to a branch instead of main
+tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"), branch="staging")
+```
 
-tbl = catalog.create_table("default.cities", schema=df.schema)
+### Delete
 
-tbl.append(df)
-```
+Use `delete` to remove rows matching a predicate without writing new data. PyIceberg uses Iceberg metadata to prune which Parquet files need to be opened, so only relevant files are read. The filter is case-sensitive by default; pass `case_sensitive=False` to change this.
 
-You can overwrite the record of `Paris` with a record of `New York`:
+!!! warning "Merge-on-read not yet supported"
+    If the table property `write.delete.mode` is set to `merge-on-read`, PyIceberg will fall back to copy-on-write and emit a warning. All deletes currently rewrite Parquet files.
 
 ```python
-from pyiceberg.expressions import EqualTo
-df = pa.Table.from_pylist(
-    [
-        {"city": "New York", "lat": 40.7128, "long": 74.0060},
-    ]
-)
-tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))
+tbl.delete(delete_filter="city == 'Paris'")
 ```
 
-This produces the following result with `tbl.scan().to_arrow()`:
+Any rows where `city` equals `Paris` are removed. Running `tbl.scan().to_arrow()` afterwards yields:
 
 ```python
 pyarrow.Table
-city: large_string
+city: string
 lat: double
 long: double
 ----
-city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
-lat: [[40.7128],[52.371807,37.773972,53.11254]]
-long: [[74.006],[4.896029,-122.431297,6.0989]]
+city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
+lat: [[52.371807,37.773972,53.11254],[53.21917]]
+long: [[4.896029,-122.431297,6.0989],[6.56667]]
 ```
 
-If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the existing partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically from the provided arrow table.
-For example, with an iceberg table with a partition specified on `"city"` field:
+When the predicate matches all rows in a Parquet file (e.g., `tbl.delete(delete_filter="city == 'Groningen'")`), PyIceberg drops the entire file without scanning its contents.
+
+### Dynamic Partition Overwrite
+
+For partitioned tables, `dynamic_partition_overwrite` replaces only the partitions present in the provided Arrow table. The partitions to overwrite are detected automatically; you do not need to specify them explicitly.
+
+First, create a partitioned table:
 
 ```python
 from pyiceberg.schema import Schema
 from pyiceberg.types import DoubleType, NestedField, StringType
+from pyiceberg.partitioning import PartitionSpec, PartitionField
+from pyiceberg.transforms import IdentityTransform
 
 schema = Schema(
     NestedField(1, "city", StringType(), required=False),
@@ -454,23 +464,21 @@ tbl = catalog.create_table(
 )
 ```
 
-And we want to overwrite the data for the partition of `"Paris"`:
+Write some initial data:
 
 ```python
-import pyarrow as pa
-
 df = pa.Table.from_pylist(
     [
         {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
         {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
         {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
-        {"city": "Paris", "lat": -48.864716, "long": -2.349014},
+        {"city": "Paris", "lat": -48.864716, "long": -2.349014},  # incorrect coordinates
     ],
 )
 tbl.append(df)
 ```
 
-Then we can call `dynamic_partition_overwrite` with this arrow table:
+To correct only the `Paris` partition:
 
 ```python
 df_corrected = pa.Table.from_pylist([
@@ -479,7 +487,7 @@ df_corrected = pa.Table.from_pylist([
 tbl.dynamic_partition_overwrite(df_corrected)
 ```
 
-This produces the following result with `tbl.scan().to_arrow()`:
+Only the `Paris` partition is replaced. All other partitions remain unchanged. `tbl.scan().to_arrow()` now yields:
 
 ```python
 pyarrow.Table
@@ -492,6 +500,35 @@ lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
 long: [[2.349014],[4.896029],[6.0989],[-122.431297]]
 ```
 
+### Transaction API
+
+All write operations can also be issued as part of a transaction, which lets you combine multiple mutations, including schema changes, property updates, and data writes, into a single atomic commit.
+
+```python
+with tbl.transaction() as txn:
+    txn.append(df)
+```
+
+You can combine multiple write operations in one transaction:
+
+```python
+with tbl.transaction() as txn:
+    txn.delete(delete_filter="city == 'Paris'")
+    txn.append(pa.Table.from_pylist([{"city": "New York", "lat": 40.7128, "long": -74.0060}]))
+```
+
+You can also mix data writes with metadata changes in the same transaction:
+
+```python
+from pyiceberg.types import LongType
+
+with tbl.transaction() as txn:
+    txn.append(df)
+    with txn.update_schema() as update_schema:
+        update_schema.add_column("population", LongType())
+    txn.set_properties(owner="data-team")
+```
+
+If an exception is raised inside the `with` block, no snapshot is committed to the catalog. Note that data files already written to object storage are not automatically cleaned up in that case.
+
 ### Upsert
 
 PyIceberg supports upsert operations, meaning that it is able to merge an Arrow table into an Iceberg table. Rows are considered the same based on the [identifier field](https://iceberg.apache.org/spec/?column-projection#identifier-field-ids). If a row is already in the table, it will update that row. If a row cannot be found, it will insert that new row.
@@ -553,6 +590,7 @@ upd = tbl.upsert(df)
 
 assert upd.rows_updated == 1
 assert upd.rows_inserted == 1
+# Paris was already up-to-date; PyIceberg skips it silently
 ```
 
 PyIceberg will automatically detect which rows need to be updated, inserted or can simply be ignored.
@@ -1370,20 +1408,6 @@ table = table.transaction().remove_properties("abc").commit_transaction()
 
 assert table.properties == {}
 ```
 
-## Snapshot properties
-
-Optionally, Snapshot properties can be set while writing to a table using `append` or `overwrite` API:
-
-```python
-tbl.append(df, snapshot_properties={"abc": "def"})
-
-# or
-
-tbl.overwrite(df, snapshot_properties={"abc": "def"})
-
-assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
-```
-
 ## Snapshot Management
 
 Manage snapshots with operations through the `Table` API: