Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@
.vscode
**/.DS_Store
dist/*
.qoder
28 changes: 0 additions & 28 deletions bindings/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,6 @@

This project builds the Rust-powered core for [PyPaimon](https://paimon.apache.org/docs/master/pypaimon/overview/) while also providing DataFusion integration for querying Paimon tables.

## Usage

The recommended way to query Paimon tables is through `SQLContext`, which supports
multi-catalog registration, DDL, DML, and all Paimon-specific SQL extensions:

```python
from pypaimon_rust.datafusion import SQLContext

ctx = SQLContext()
ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})

batches = ctx.sql("SELECT * FROM paimon.default.my_table")
```

Alternatively, you can register a `PaimonCatalog` into DataFusion's native `SessionContext`:

```python
from datafusion import SessionContext
from pypaimon_rust.datafusion import PaimonCatalog

catalog = PaimonCatalog({"warehouse": "/path/to/warehouse"})
ctx = SessionContext()
ctx.register_catalog_provider("paimon", catalog)

df = ctx.sql("SELECT * FROM paimon.default.my_table")
df.show()
```

## Setup

Install [uv](https://docs.astral.sh/uv/getting-started/installation/):
Expand Down
19 changes: 19 additions & 0 deletions bindings/python/project-description.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,25 @@ ctx.sql("INSERT INTO paimon.my_db.users VALUES (1, 'alice'), (2, 'bob')")
batches = ctx.sql("SELECT * FROM paimon.my_db.users")
```

### Temporary Tables

You can register temporary in-memory tables programmatically. Table names follow the same resolution rules as SQL: a bare name resolves against the current catalog and current database, a partially qualified name resolves against the current catalog, and a fully qualified name spells out `catalog.database.table` explicitly.

Register a single PyArrow RecordBatch as a temporary table:

```python
import pyarrow as pa

batch = pa.record_batch([[1, 2], ["alice", "bob"]], names=["id", "name"])

ctx.register_batch("paimon.default.my_temp", batch)

batches = ctx.sql("SELECT * FROM paimon.default.my_temp")

# Drop it via SQL when no longer needed
ctx.sql("DROP TEMPORARY TABLE paimon.default.my_temp")
```

Alternatively, if you want to use the native Python DataFusion `SessionContext`,
install `datafusion` and register a `PaimonCatalog`:

Expand Down
1 change: 1 addition & 0 deletions bindings/python/python/pypaimon_rust/datafusion.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ class SQLContext:
) -> None: ...
def set_current_catalog(self, catalog_name: str) -> None: ...
def set_current_database(self, database_name: str) -> None: ...
def register_batch(self, name: str, batch: pyarrow.RecordBatch) -> None: ...
def sql(self, sql: str) -> List[pyarrow.RecordBatch]: ...
12 changes: 11 additions & 1 deletion bindings/python/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow::pyarrow::ToPyArrow;
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use datafusion::catalog::CatalogProvider;
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
Expand Down Expand Up @@ -138,6 +138,16 @@ impl PySQLContext {
})
}

fn register_batch(&self, name: String, batch: Bound<'_, PyAny>) -> PyResult<()> {
    // Convert the incoming pyarrow object into a native Arrow RecordBatch.
    let record_batch =
        datafusion::arrow::record_batch::RecordBatch::from_pyarrow_bound(&batch)?;
    // Wrap the single batch (with its schema) in an in-memory DataFusion table.
    let table = datafusion::datasource::MemTable::try_new(
        record_batch.schema(),
        vec![vec![record_batch]],
    )
    .map_err(df_to_py_err)?;
    // Register it on the inner context as a temporary table under `name`;
    // name resolution (bare / partial / fully qualified) is handled there.
    self.inner
        .register_temp_table(&name, Arc::new(table))
        .map_err(df_to_py_err)
}

fn sql(&self, py: Python<'_>, sql: String) -> PyResult<Vec<Py<PyAny>>> {
let rt = runtime();
let batches = rt.block_on(async {
Expand Down
99 changes: 99 additions & 0 deletions bindings/python/tests/test_datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,102 @@ def test_sql_context_ddl_dml():

ctx.sql("DROP TABLE paimon.test_db.users")
ctx.sql("DROP SCHEMA paimon.test_db")


def test_register_batch_fully_qualified():
    """A batch registered under catalog.database.table is queryable via SQL."""
    with tempfile.TemporaryDirectory() as warehouse:
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": warehouse})

        data = pa.record_batch([[1, 2], ["alice", "bob"]], names=["id", "name"])
        ctx.register_batch("paimon.default.my_temp", data)

        tbl = pa.Table.from_batches(
            ctx.sql("SELECT id, name FROM paimon.default.my_temp")
        )
        pairs = sorted(zip(tbl["id"].to_pylist(), tbl["name"].to_pylist()))
        assert pairs == [(1, "alice"), (2, "bob")]

        ctx.sql("DROP TEMPORARY TABLE paimon.default.my_temp")


def test_register_batch_bare_name():
    """A bare name resolves against the current catalog and current database."""
    with tempfile.TemporaryDirectory() as warehouse:
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": warehouse})

        data = pa.record_batch([[1, 2], ["alice", "bob"]], names=["id", "name"])
        # Registering with a bare name should land in the current
        # catalog/database, i.e. paimon.default.
        ctx.register_batch("my_temp", data)

        tbl = pa.Table.from_batches(
            ctx.sql("SELECT id, name FROM paimon.default.my_temp")
        )
        pairs = sorted(zip(tbl["id"].to_pylist(), tbl["name"].to_pylist()))
        assert pairs == [(1, "alice"), (2, "bob")]

        ctx.sql("DROP TEMPORARY TABLE paimon.default.my_temp")


def test_temp_table_shadows_paimon_table():
    """A temp table registered under an existing table name hides the real
    table until it is dropped, after which the Paimon table is visible again."""
    with tempfile.TemporaryDirectory() as warehouse:
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": warehouse})

        ctx.sql("CREATE SCHEMA paimon.test_db")
        ctx.sql("CREATE TABLE paimon.test_db.users (id INT, name STRING)")
        ctx.sql("INSERT INTO paimon.test_db.users VALUES (1, 'real')")

        shadow = pa.record_batch([[2], ["temp"]], names=["id", "name"])
        ctx.register_batch("paimon.test_db.users", shadow)

        def fetch_rows():
            # Run the query and return sorted (id, name) tuples.
            tbl = pa.Table.from_batches(
                ctx.sql("SELECT id, name FROM paimon.test_db.users")
            )
            return sorted(zip(tbl["id"].to_pylist(), tbl["name"].to_pylist()))

        # While registered, the temp table shadows the real Paimon table.
        assert fetch_rows() == [(2, "temp")]

        ctx.sql("DROP TEMPORARY TABLE paimon.test_db.users")

        # Dropping the temp table restores visibility of the real table.
        assert fetch_rows() == [(1, "real")]

        ctx.sql("DROP TABLE paimon.test_db.users")
        ctx.sql("DROP SCHEMA paimon.test_db")


def test_drop_temp_table_if_exists():
    """DROP TEMPORARY TABLE IF EXISTS is idempotent for temp tables."""
    with tempfile.TemporaryDirectory() as warehouse:
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": warehouse})

        ctx.register_batch(
            "paimon.default.my_temp", pa.record_batch([[1]], names=["id"])
        )

        # First drop removes the table; the second must be a no-op, not an error.
        for _ in range(2):
            ctx.sql("DROP TEMPORARY TABLE IF EXISTS paimon.default.my_temp")


def test_multi_catalog_temp_table():
    """Temp tables registered in different catalogs stay independent."""
    with tempfile.TemporaryDirectory() as wh1, tempfile.TemporaryDirectory() as wh2:
        ctx = SQLContext()
        ctx.register_catalog("cat1", {"warehouse": wh1})
        ctx.register_catalog("cat2", {"warehouse": wh2})

        ctx.register_batch("cat1.default.t1", pa.record_batch([[1]], names=["id"]))
        ctx.register_batch("cat2.default.t2", pa.record_batch([[2]], names=["id"]))

        # Each catalog sees only its own temp table with its own contents.
        for table_name, expected in (
            ("cat1.default.t1", [1]),
            ("cat2.default.t2", [2]),
        ):
            rows = ctx.sql(f"SELECT id FROM {table_name}")
            assert pa.Table.from_batches(rows)["id"].to_pylist() == expected

        ctx.sql("DROP TEMPORARY TABLE cat1.default.t1")
        ctx.sql("DROP TEMPORARY TABLE cat2.default.t2")
1 change: 1 addition & 0 deletions crates/integrations/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { workspace = true, features = ["rt", "time", "fs"] }
uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
arrow-array = { workspace = true }
Expand Down
Loading
Loading