Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
2a29c63
feat: implement async context manager support for connection, writers…
qzyu999 Apr 10, 2026
dec0921
feat: expand python context manager test coverage for scanners and co…
qzyu999 Apr 10, 2026
180ad11
refactor: adjust indentation and clean up Python example code structure
qzyu999 Apr 11, 2026
a1cadf2
fix: Add self.close()?; to __aexit__ in connection.rs
qzyu999 Apr 11, 2026
ff10371
fix: Cleanup docstring
qzyu999 Apr 11, 2026
a6a5bff
feat: implement graceful connection shutdown with async close and upd…
qzyu999 Apr 17, 2026
df9e8c7
feat: add optional timeout parameter to Python connection close method
qzyu999 Apr 17, 2026
713e5c2
Merge remote-tracking branch 'upstream/main' into feat/456-python-asy…
qzyu999 Apr 17, 2026
0fa4500
feat: revert python example indentation and add example for async con…
qzyu999 Apr 18, 2026
efbf4f7
fix: revert changes to original example.py
qzyu999 Apr 18, 2026
c1546fc
fix: revert changes to example.py (again)
qzyu999 Apr 18, 2026
5e0adc8
fix: always flush writers on context manager exit
qzyu999 Apr 20, 2026
8bec2d8
refactor: polish connection signature and clean up tests
qzyu999 Apr 20, 2026
195579f
fix: prevent flush errors from masking in-flight exceptions
qzyu999 Apr 28, 2026
54a0361
fix: update dictionary in append to include all 9 fields in the schema
qzyu999 Apr 29, 2026
f634652
refactor: update the __aexit__ to use is_exc_none for FlussConnection
qzyu999 Apr 29, 2026
ca0a4ad
fix: move the async context manager demo to avoid the dropped table i…
qzyu999 Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,26 @@ async def main():
except Exception as e:
print(f"Error during projection: {e}")


print("\n--- New: async context manager demo ---")
async with await fluss.FlussConnection.create(config) as demo_conn:
demo_table = await demo_conn.get_table(table_path)
async with demo_table.new_append().create_writer() as writer:
writer.append(
{
"id": 1,
"name": "demo",
"score": 1.0,
"age": 25,
"birth_date": date(2000, 1, 1),
"check_in_time": dt_time(12, 0, 0),
"created_at": datetime(2024, 1, 1, 12, 0, 0),
"updated_at": datetime(2024, 1, 1, 12, 0, 0),
"salary": Decimal("100.00"),
}
)
# auto-flushes on exit

# Demo: Drop tables
print("\n--- Testing drop_table() ---")
try:
Expand Down Expand Up @@ -933,8 +953,10 @@ async def main():
print(f"Error with partitioned KV table: {e}")
traceback.print_exc()



# Close connection
conn.close()
await conn.close()
print("\nConnection closed")


Expand Down
53 changes: 52 additions & 1 deletion bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,21 @@ class FlussConnection:
async def create(config: Config) -> FlussConnection: ...
def get_admin(self) -> FlussAdmin: ...
async def get_table(self, table_path: TablePath) -> FlussTable: ...
def close(self) -> None: ...
async def close(self) -> None: ...
def __enter__(self) -> FlussConnection: ...
def __exit__(
self,
exc_type: Optional[type],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool: ...
async def __aenter__(self) -> FlussConnection: ...
async def __aexit__(
self,
exc_type: Optional[type],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool: ...
def __repr__(self) -> str: ...

class ServerNode:
Expand Down Expand Up @@ -611,6 +618,27 @@ class AppendWriter:
def write_arrow_batch(self, batch: pa.RecordBatch) -> WriteResultHandle: ...
def write_pandas(self, df: pd.DataFrame) -> None: ...
async def flush(self) -> None: ...
async def __aenter__(self) -> AppendWriter:
"""
Enter the async context manager.

Returns:
The AppendWriter instance.
"""
...
async def __aexit__(
self,
exc_type: Optional[type],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool:
"""
Exit the async context manager.

On exit, the writer is automatically flushed to ensure
all pending records are sent and acknowledged.
"""
...
def __repr__(self) -> str: ...

class UpsertWriter:
Expand Down Expand Up @@ -644,6 +672,27 @@ class UpsertWriter:
async def flush(self) -> None:
"""Flush all pending upsert/delete operations to the server."""
...
async def __aenter__(self) -> UpsertWriter:
"""
Enter the async context manager.

Returns:
The UpsertWriter instance.
"""
...
async def __aexit__(
self,
exc_type: Optional[type],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool:
"""
Exit the async context manager.

On exit, the writer is automatically flushed to ensure
all pending records are sent and acknowledged.
"""
...
def __repr__(self) -> str: ...


Expand Down Expand Up @@ -807,6 +856,8 @@ class LogScanner:

You must call subscribe(), subscribe_buckets(), or subscribe_partition() first.
"""
...

def __repr__(self) -> str: ...
def __aiter__(self) -> AsyncIterator[Union[ScanRecord, RecordBatch]]: ...

Expand Down
48 changes: 44 additions & 4 deletions bindings/python/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::*;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
use std::time::Duration;

/// Connection to a Fluss cluster
#[pyclass]
Expand Down Expand Up @@ -82,9 +83,19 @@ impl FlussConnection {
})
}

// Close the connection
fn close(&mut self) -> PyResult<()> {
Ok(())
/// Close the connection (async).
///
/// Gracefully shuts down the connection by draining any pending write batches.
/// This method is awaitable.
fn close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();

future_into_py(py, async move {
inner
.close(Duration::MAX)
.await
.map_err(|e| FlussError::from_core_error(&e))
})
}

// Enter the runtime context (for 'with' statement)
Expand All @@ -100,10 +111,39 @@ impl FlussConnection {
_exc_value: Option<Bound<'_, PyAny>>,
_traceback: Option<Bound<'_, PyAny>>,
) -> PyResult<bool> {
self.close()?;
// Sync exit cannot await the graceful drain, so it's a no-op here.
// Users should use 'async with' for graceful shutdown.
Ok(false)
}

// Enter the async runtime context (for 'async with' statement)
fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let py_slf = slf.into_pyobject(py)?.unbind();
future_into_py(py, async move { Ok(py_slf) })
}

// Exit the async runtime context (for 'async with' statement)
#[pyo3(signature = (exc_type=None, _exc_value=None, _traceback=None))]
fn __aexit__<'py>(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: mirror the writers' is_exc_none guard

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fresh-borzoni, thanks for catching this, fixed in f634652.

&self,
py: Python<'py>,
exc_type: Option<Bound<'py, PyAny>>,
_exc_value: Option<Bound<'py, PyAny>>,
_traceback: Option<Bound<'py, PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
let is_exc_none = exc_type.as_ref().map_or(true, |e| e.is_none());
future_into_py(py, async move {
let res = inner.close(Duration::MAX).await;
if let Err(e) = res {
if is_exc_none {
return Err(FlussError::from_core_error(&e));
}
}
Ok(false)
})
}
Comment on lines +125 to +145
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__exit__ calls self.close()?, but the new async context manager __aexit__ does not. This means async with await FlussConnection.create(...) will not close the connection (even if close() is implemented later). Please mirror __exit__ by calling self.close()? in __aexit__ (it can be done before creating the future since close() is synchronous).

Copilot uses AI. Check for mistakes.

fn __repr__(&self) -> String {
"FlussConnection()".to_string()
}
Expand Down
29 changes: 29 additions & 0 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,35 @@ impl AppendWriter {
})
}

// Enter the async runtime context (for 'async with' statement)
fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let py_slf = slf.into_pyobject(py)?.unbind();
future_into_py(py, async move { Ok(py_slf) })
}

// Exit the async runtime context (for 'async with' statement)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this logic. The whole idea of context managers is guaranteed cleanup, and here we skip flush() just to return the error faster - that doesn't match.

Can we just always call flush() on exit

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fresh-borzoni, this has been addressed in 5e0adc8.

/// On exit, the writer is automatically flushed.
#[pyo3(signature = (exc_type=None, _exc_value=None, _traceback=None))]
fn __aexit__<'py>(
&self,
py: Python<'py>,
exc_type: Option<Bound<'py, PyAny>>,
_exc_value: Option<Bound<'py, PyAny>>,
_traceback: Option<Bound<'py, PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
let is_exc_none = exc_type.as_ref().map_or(true, |e| e.is_none());
future_into_py(py, async move {
let res = inner.flush().await;
if let Err(e) = res {
if is_exc_none {
return Err(FlussError::from_core_error(&e));
}
}
Ok(false)
})
}
Comment on lines +998 to +1019
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AppendWriter.__aexit__ only flushes on the success path and never closes/invalidates the writer. After leaving async with, the writer object remains fully usable and any underlying resources are not deterministically released, which doesn’t match the linked issue’s “flush then close / close on exception” contract. Consider adding an explicit close() (even if initially a no-op) and calling it from __aexit__, or otherwise marking the writer as closed so further writes fail fast.

Copilot uses AI. Check for mistakes.

fn __repr__(&self) -> String {
"AppendWriter()".to_string()
}
Expand Down
29 changes: 29 additions & 0 deletions bindings/python/src/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,35 @@ impl UpsertWriter {
})
}

// Enter the async runtime context (for 'async with' statement)
fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let py_slf = slf.into_pyobject(py)?.unbind();
future_into_py(py, async move { Ok(py_slf) })
}

// Exit the async runtime context (for 'async with' statement)
/// On exit, the writer is automatically flushed.
#[pyo3(signature = (exc_type=None, _exc_value=None, _traceback=None))]
fn __aexit__<'py>(
&self,
py: Python<'py>,
exc_type: Option<Bound<'py, PyAny>>,
_exc_value: Option<Bound<'py, PyAny>>,
_traceback: Option<Bound<'py, PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
let writer = self.writer.clone();
let is_exc_none = exc_type.as_ref().map_or(true, |e| e.is_none());
future_into_py(py, async move {
let res = writer.flush().await;
if let Err(e) = res {
if is_exc_none {
return Err(FlussError::from_core_error(&e));
}
}
Ok(false)
})
Comment on lines +117 to +137
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpsertWriter.__aexit__ only flushes on the success path and does not close/invalidate the writer in either path. This means leaving an async with block does not actually end the writer’s lifecycle, which diverges from the linked issue’s expected semantics. Consider adding/using an explicit close() and invoking it from __aexit__ (flush+close on success; close-only on exception).

Copilot uses AI. Check for mistakes.
}

fn __repr__(&self) -> String {
"UpsertWriter()".to_string()
}
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async def _connect(bootstrap_servers):
nodes = await admin.get_server_nodes()
if any(n.server_type == "TabletServer" for n in nodes):
return conn
conn.close()
await conn.close()
last_err = RuntimeError("No TabletServer available yet")
except Exception as e:
last_err = e
Expand Down
116 changes: 116 additions & 0 deletions bindings/python/test/test_context_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Licensed to the Apache Software Foundation (ASF) under one
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the tests, I think three are enough: one for each async with-enabled type (AppendWriter, UpsertWriter, FlussConnection), each verifying the one behavior the CM adds - that pending writes get flushed/drained on exit.
A single strong test_connection_drain_on_close (write N records without flushing, rely on async with conn: to drain on exit, verify all N arrive) is the one that actually proves the PR's core value.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fresh-borzoni, this has been addressed in 5e0adc8.

# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest
import pyarrow as pa
import time
import fluss

def _poll_records(scanner, expected_count, timeout_s=10):
"""Poll a record-based scanner until expected_count records are collected."""
collected = []
deadline = time.monotonic() + timeout_s
while len(collected) < expected_count and time.monotonic() < deadline:
records = scanner.poll(5000)
collected.extend(records)
return collected

@pytest.mark.asyncio
async def test_connection_context_manager(plaintext_bootstrap_servers):
config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers})
async with await fluss.FlussConnection.create(config) as conn:
admin = conn.get_admin()
nodes = await admin.get_server_nodes()
assert len(nodes) > 0


@pytest.mark.asyncio
async def test_append_writer_success_flush(connection, admin):
table_path = fluss.TablePath("fluss", "test_append_ctx_success")
await admin.drop_table(table_path, ignore_if_not_exists=True)

schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())]))
await admin.create_table(table_path, fluss.TableDescriptor(schema))

table = await connection.get_table(table_path)

async with table.new_append().create_writer() as writer:
writer.append({"a": 1})
writer.append({"a": 2})
# No explicit flush here

# After context exit, data should be flushed
scanner = await table.new_scan().create_log_scanner()
scanner.subscribe(0, fluss.EARLIEST_OFFSET)
records = _poll_records(scanner, expected_count=2)
assert len(records) == 2
assert sorted([r.row["a"] for r in records]) == [1, 2]

@pytest.mark.asyncio
async def test_connection_drain_on_close(plaintext_bootstrap_servers, admin):
table_path = fluss.TablePath("fluss", "test_conn_drain")
await admin.drop_table(table_path, ignore_if_not_exists=True)
schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())]))
await admin.create_table(table_path, fluss.TableDescriptor(schema))

config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers})
async with await fluss.FlussConnection.create(config) as conn:
table = await conn.get_table(table_path)
writer = table.new_append().create_writer()
writer.append({"a": 123})
# No explicit flush, no writer context exit.
# Rely on connection.__aexit__ -> close() to drain.

# Re-connect with a new connection to verify data arrived
async with await fluss.FlussConnection.create(config) as conn2:
table2 = await conn2.get_table(table_path)
scanner = await table2.new_scan().create_log_scanner()
scanner.subscribe(0, fluss.EARLIEST_OFFSET)
records = _poll_records(scanner, expected_count=1)
assert len(records) == 1
assert records[0].row["a"] == 123

@pytest.mark.asyncio
async def test_upsert_writer_context_manager(connection, admin):
table_path = fluss.TablePath("fluss", "test_upsert_ctx")
await admin.drop_table(table_path, ignore_if_not_exists=True)

schema = fluss.Schema(pa.schema([pa.field("id", pa.int32()), pa.field("v", pa.string())]), primary_keys=["id"])
await admin.create_table(table_path, fluss.TableDescriptor(schema))

table = await connection.get_table(table_path)

# Success path: verify it flushes
async with table.new_upsert().create_writer() as writer:
writer.upsert({"id": 1, "v": "a"})

lookuper = table.new_lookup().create_lookuper()
res = await lookuper.lookup({"id": 1})
assert res is not None
assert res["v"] == "a"

@pytest.mark.asyncio
async def test_connection_context_manager_exception(plaintext_bootstrap_servers):
config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers})
class TestException(Exception): pass

try:
async with await fluss.FlussConnection.create(config) as conn:
raise TestException("connection error")
except TestException:
pass
# If we reach here without hanging, the connection __aexit__ gracefully handled the error
Loading
Loading