From 0e6b1eaaf65feffb9c8fc06f4eaf6f0cac5a1b5e Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 30 Mar 2026 09:15:54 +0300 Subject: [PATCH 1/4] adding raw callback (data records as bytes, requires dbn decode_raw). --- databento/common/types.py | 40 ++++++++++++++++++++++++++++++ databento/live/client.py | 39 +++++++++++++++++++++++++++++ databento/live/protocol.py | 50 ++++++++++++++++++++++---------------- databento/live/session.py | 45 ++++++++++++++++++++++++++++++++++ 4 files changed, 153 insertions(+), 21 deletions(-) diff --git a/databento/common/types.py b/databento/common/types.py index 39c55c2..45eecfe 100644 --- a/databento/common/types.py +++ b/databento/common/types.py @@ -72,6 +72,7 @@ class MappingIntervalDict(TypedDict): RecordCallback = Callable[[databento_dbn.DBNRecord], None] +RawRecordCallback = Callable[[bytes], None] ExceptionCallback = Callable[[Exception], None] ReconnectCallback = Callable[[pd.Timestamp, pd.Timestamp], None] @@ -262,3 +263,42 @@ def _warn(self, msg: str) -> None: BentoWarning, stacklevel=3, ) + + +class ClientRawRecordCallback: + def __init__( + self, + fn: RawRecordCallback, + exc_fn: ExceptionCallback | None = None, + max_warnings: int = 10, + ) -> None: + if not callable(fn): + raise ValueError(f"{fn} is not callable") + if exc_fn is not None and not callable(exc_fn): + raise ValueError(f"{exc_fn} is not callable") + + self._fn = fn + self._exc_fn = exc_fn + self._max_warnings = max(0, max_warnings) + self._warning_count = 0 + + @property + def callback_name(self) -> str: + return getattr(self._fn, "__name__", str(self._fn)) + + def call(self, raw: bytes) -> None: + try: + self._fn(raw) + except Exception as exc: + if self._exc_fn is None: + logger.warning( + "raw callback '%s' encountered an exception without an exception callback: %r", + self.callback_name, + exc, + ) + else: + try: + self._exc_fn(exc) + except Exception as inner_exc: + raise inner_exc from exc + raise exc diff --git a/databento/live/client.py b/databento/live/client.py index 2a047f2..50df7f3 100644 --- a/databento/live/client.py +++ b/databento/live/client.py @@ -26,9 +26,11 @@ from databento.common.error import BentoError from databento.common.parsing import optional_datetime_to_unix_nanoseconds from databento.common.publishers import Dataset +from databento.common.types import ClientRawRecordCallback from databento.common.types import ClientRecordCallback from databento.common.types import ClientStream from databento.common.types import ExceptionCallback +from databento.common.types import RawRecordCallback from databento.common.types import ReconnectCallback from databento.common.types import RecordCallback from databento.common.validation import validate_enum @@ -354,6 +356,43 @@ def add_callback( logger.info("adding user callback %s", client_callback.callback_name) self._session._user_callbacks.append(client_callback) + def add_raw_callback( + self, + record_callback: RawRecordCallback, + exception_callback: ExceptionCallback | None = None, + ) -> None: + """ + Add a callback for handling records as raw bytes. + + Unlike `add_callback`, this receives each record as a raw `bytes` object + without boxing into a Python record type. This avoids CPython memory arena + accumulation at high message rates and is the preferred path for consumers + that immediately re-serialize the data (e.g. pass to a native encoder). + + Parameters + ---------- + record_callback : Callable[[bytes], None] + A callback to register for handling live records as raw bytes. + exception_callback : Callable[[Exception], None], optional + An error handling callback for exceptions raised in `record_callback`. + + Raises + ------ + ValueError + If `record_callback` is not callable. + + See Also + -------- + Live.add_callback + + """ + client_callback = ClientRawRecordCallback( + fn=record_callback, + exc_fn=exception_callback, + ) + logger.info("adding raw callback %s", client_callback.callback_name) + self._session._raw_callbacks.append(client_callback) + def add_stream( self, stream: IO[bytes] | PathLike[str] | str, diff --git a/databento/live/protocol.py b/databento/live/protocol.py index 4d06dcc..ec5ff33 100644 --- a/databento/live/protocol.py +++ b/databento/live/protocol.py @@ -387,29 +387,37 @@ def _process_dbn(self, data: bytes) -> None: if isinstance(record, databento_dbn.Metadata): self.received_metadata(record) continue - if isinstance(record, databento_dbn.ErrorMsg): - logger.error( - "gateway error code=%s err='%s'", - record.code, - record.err, - ) - self._error_msgs.append(record.err) - elif isinstance(record, databento_dbn.SystemMsg): - if record.is_heartbeat(): - logger.debug("gateway heartbeat") - else: - if record.code == SystemCode.END_OF_INTERVAL: - system_msg_level = logging.DEBUG - else: - system_msg_level = logging.INFO - logger.log( - system_msg_level, - "system message code=%s msg='%s'", - record.code, - record.msg, - ) + self._handle_control_record(record) self.received_record(record) + def _handle_control_record(self, record: DBNRecord) -> None: + """ + Process control record side effects: logging and error tracking. + + Called for ErrorMsg and SystemMsg before received_record(). + """ + if isinstance(record, databento_dbn.ErrorMsg): + logger.error( + "gateway error code=%s err='%s'", + record.code, + record.err, + ) + self._error_msgs.append(record.err) + elif isinstance(record, databento_dbn.SystemMsg): + if record.is_heartbeat(): + logger.debug("gateway heartbeat") + else: + if record.code == SystemCode.END_OF_INTERVAL: + system_msg_level = logging.DEBUG + else: + system_msg_level = logging.INFO + logger.log( + system_msg_level, + "system message code=%s msg='%s'", + record.code, + record.msg, + ) + def _process_gateway(self, data: bytes) -> None: try: self._gateway_decoder.write(data) diff --git a/databento/live/session.py b/databento/live/session.py index d5cb9a9..5170a32 100644 --- a/databento/live/session.py +++ b/databento/live/session.py @@ -24,9 +24,11 @@ from databento.common.enums import SlowReaderBehavior from databento.common.error import BentoError from databento.common.publishers import Dataset +from databento.common.types import ClientRawRecordCallback from databento.common.types import ClientRecordCallback from databento.common.types import ClientStream from databento.common.types import ExceptionCallback +from databento.common.types import RawRecordCallback from databento.common.types import ReconnectCallback from databento.live.gateway import SubscriptionRequest from databento.live.protocol import DatabentoLiveProtocol @@ -209,6 +211,7 @@ def __init__( heartbeat_interval_s: int | None = None, slow_reader_behavior: SlowReaderBehavior | str | None = None, compression: Compression = Compression.NONE, + raw_callbacks: list[ClientRawRecordCallback] | None = None, ): super().__init__( api_key, @@ -224,6 +227,7 @@ def __init__( self._metadata: SessionMetadata = metadata self._user_callbacks = user_callbacks self._user_streams = user_streams + self._raw_callbacks: list[ClientRawRecordCallback] = raw_callbacks if raw_callbacks is not None else [] self._last_ts_event: int | None = None self._last_msg_loop_time: float = math.inf @@ -253,6 +257,45 @@ def received_record(self, record: DBNRecord) -> None: return super().received_record(record) + def _process_dbn(self, data: bytes) -> None: + if not self._raw_callbacks: + return super()._process_dbn(data) + + try: + self._dbn_decoder.write(bytes(data)) + records = self._dbn_decoder.decode_raw() + except Exception: + logger.exception("error decoding DBN record") + self.transport.close() + raise + + for record in records: + if isinstance(record, databento_dbn.Metadata): + self.received_metadata(record) + elif isinstance(record, bytes): + # Data record as raw bytes, no Python object creation. + logger.debug("dispatching raw data record") + self._dispatch_raw_callbacks(record) + # ts_event lives at RecordHeader offset 8 (u64 LE). + self._last_ts_event = struct.unpack_from(" None: + for callback in self._raw_callbacks: + try: + callback.call(raw) + except Exception as exc: + logger.error( + "error dispatching raw record to `%s` callback", + callback.callback_name, + exc_info=exc, + ) + def _dispatch_callbacks(self, record: DBNRecord) -> None: for callback in self._user_callbacks: try: @@ -336,6 +379,7 @@ def __init__( self._user_gateway: str | None = user_gateway self._user_streams: list[ClientStream] = [] self._user_callbacks: list[ClientRecordCallback] = [] + self._raw_callbacks: list[ClientRawRecordCallback] = [] self._user_reconnect_callbacks: list[tuple[ReconnectCallback, ExceptionCallback | None]] = ( [] ) @@ -598,6 +642,7 @@ def _create_protocol(self, dataset: Dataset | str) -> _SessionProtocol: heartbeat_interval_s=self.heartbeat_interval_s, slow_reader_behavior=self._slow_reader_behavior, compression=self._compression, + raw_callbacks=self._raw_callbacks, ) def _connect( From b7e742f8a59c077ad1b41c72deaad064f27ecb67 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 30 Mar 2026 09:16:08 +0300 Subject: [PATCH 2/4] adding test for raw callback --- tests/test_live_client.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/test_live_client.py b/tests/test_live_client.py index 75aa7bb..ce1233f 100644 --- a/tests/test_live_client.py +++ b/tests/test_live_client.py @@ -1301,6 +1301,42 @@ def callback(record: DBNRecord) -> None: assert isinstance(records[3], databento_dbn.MBOMsg) +async def test_live_raw_callback( + live_client: client.Live, +) -> None: + """ + Test raw callback dispatch of DBN records as bytes. + + Mirrors test_live_callback but uses add_raw_callback. Data records + should arrive as raw bytes; control records still go to add_callback. + """ + # Arrange + live_client.subscribe( + dataset=Dataset.GLBX_MDP3, + schema=Schema.MBO, + stype_in=SType.RAW_SYMBOL, + symbols="TEST", + ) + raw_records: list[bytes] = [] + + def raw_callback(raw: bytes) -> None: + raw_records.append(raw) + + # Act + live_client.add_raw_callback(raw_callback) + + live_client.start() + + await live_client.wait_for_close() + + # Assert — same 4 MBO records, but as raw bytes + assert len(raw_records) == 4 + mbo_size = len(bytes(databento_dbn.MBOMsg(0x01, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0))) + for raw in raw_records: + assert isinstance(raw, bytes) + assert len(raw) == mbo_size + + @pytest.mark.parametrize( "dataset", [ From aa4f6451186f6eaa96a09cc048550667a5d839d7 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 30 Mar 2026 11:23:58 +0300 Subject: [PATCH 3/4] minors --- databento/live/client.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/databento/live/client.py b/databento/live/client.py index 50df7f3..be443f3 100644 --- a/databento/live/client.py +++ b/databento/live/client.py @@ -364,10 +364,8 @@ def add_raw_callback( """ Add a callback for handling records as raw bytes. - Unlike `add_callback`, this receives each record as a raw `bytes` object - without boxing into a Python record type. This avoids CPython memory arena - accumulation at high message rates and is the preferred path for consumers - that immediately re-serialize the data (e.g. pass to a native encoder). + Unlike `add_callback`, this receives each record as raw `bytes`. + No Python objects are created, avoiding overhead and memory issues. Parameters ---------- From ec63348e48b311a2aa64bba2fdc2c72fc803aedf Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 30 Mar 2026 12:00:29 +0300 Subject: [PATCH 4/4] simplifying raw callback (inheritence, code reuse) --- databento/common/types.py | 37 +++++++------------------------------ databento/live/protocol.py | 18 ++++++++++++------ databento/live/session.py | 9 ++------- 3 files changed, 21 insertions(+), 43 deletions(-) diff --git a/databento/common/types.py b/databento/common/types.py index 45eecfe..01b2530 100644 --- a/databento/common/types.py +++ b/databento/common/types.py @@ -265,40 +265,17 @@ def _warn(self, msg: str) -> None: ) -class ClientRawRecordCallback: +class ClientRawRecordCallback(ClientRecordCallback): def __init__( self, fn: RawRecordCallback, exc_fn: ExceptionCallback | None = None, max_warnings: int = 10, ) -> None: - if not callable(fn): - raise ValueError(f"{fn} is not callable") - if exc_fn is not None and not callable(exc_fn): - raise ValueError(f"{exc_fn} is not callable") - - self._fn = fn - self._exc_fn = exc_fn - self._max_warnings = max(0, max_warnings) - self._warning_count = 0 - - @property - def callback_name(self) -> str: - return getattr(self._fn, "__name__", str(self._fn)) + super().__init__(fn=fn, exc_fn=exc_fn, max_warnings=max_warnings) # type: ignore [arg-type] - def call(self, raw: bytes) -> None: - try: - self._fn(raw) - except Exception as exc: - if self._exc_fn is None: - logger.warning( - "raw callback '%s' encountered an exception without an exception callback: %r", - self.callback_name, - exc, - ) - else: - try: - self._exc_fn(exc) - except Exception as inner_exc: - raise inner_exc from exc - raise exc + def call(self, raw: bytes) -> None: # type: ignore [override] + """ + Execute the callback, passing raw bytes. + """ + super().call(raw) # type: ignore [arg-type] diff --git a/databento/live/protocol.py b/databento/live/protocol.py index ec5ff33..67b0d14 100644 --- a/databento/live/protocol.py +++ b/databento/live/protocol.py @@ -383,12 +383,18 @@ def _process_dbn(self, data: bytes) -> None: raise else: for record in records: - logger.debug("dispatching %s", type(record).__name__) - if isinstance(record, databento_dbn.Metadata): - self.received_metadata(record) - continue - self._handle_control_record(record) - self.received_record(record) + self._dispatch_decoded_record(record) + + def _dispatch_decoded_record(self, record: DBNRecord | Metadata) -> None: + """ + Route a single decoded record to the appropriate handler. + """ + logger.debug("dispatching %s", type(record).__name__) + if isinstance(record, databento_dbn.Metadata): + self.received_metadata(record) + else: + self._handle_control_record(record) + self.received_record(record) def _handle_control_record(self, record: DBNRecord) -> None: """ diff --git a/databento/live/session.py b/databento/live/session.py index 5170a32..d1bf049 100644 --- a/databento/live/session.py +++ b/databento/live/session.py @@ -270,9 +270,7 @@ def _process_dbn(self, data: bytes) -> None: raise for record in records: - if isinstance(record, databento_dbn.Metadata): - self.received_metadata(record) - elif isinstance(record, bytes): + if isinstance(record, bytes): # Data record as raw bytes, no Python object creation. logger.debug("dispatching raw data record") self._dispatch_raw_callbacks(record) @@ -280,10 +278,7 @@ def _process_dbn(self, data: bytes) -> None: self._last_ts_event = struct.unpack_from(" None: for callback in self._raw_callbacks: