From a862a889ba02e2121f5eac4ee3cb3f12479155bd Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Wed, 25 Feb 2026 19:41:42 +0100 Subject: [PATCH 1/3] refactor(encode): optimize traversal internals and key-path handling Improve encode hot paths, cycle-state helpers, phase dispatch, and key-path caching behavior. Includes iterative dot-encoding in KeyPathNode to avoid deep recursion and related internal clarity updates. --- src/qs_codec/encode.py | 414 ++++++++++++++++++--------- src/qs_codec/models/key_path_node.py | 100 +++++++ src/qs_codec/utils/utils.py | 21 +- 3 files changed, 397 insertions(+), 138 deletions(-) create mode 100644 src/qs_codec/models/key_path_node.py diff --git a/src/qs_codec/encode.py b/src/qs_codec/encode.py index 9ad276c..712017d 100644 --- a/src/qs_codec/encode.py +++ b/src/qs_codec/encode.py @@ -15,6 +15,7 @@ import sys import typing as t +from collections.abc import Mapping as ABCMapping from collections.abc import Sequence as ABCSequence from copy import deepcopy from dataclasses import dataclass, field @@ -27,6 +28,7 @@ from .enums.list_format import ListFormat from .enums.sentinel import Sentinel from .models.encode_options import EncodeOptions +from .models.key_path_node import KeyPathNode from .models.undefined import UNDEFINED, Undefined from .models.weak_wrapper import WeakWrapper from .utils.utils import Utils @@ -48,7 +50,8 @@ def encode(value: t.Any, options: EncodeOptions = EncodeOptions()) -> str: Notes: - Caller input is not mutated. When a mapping is provided it is shallow-copied (deep-copied only when a callable - filter is used); sequences are projected to a temporary mapping. + filter is used). Root sequences are projected to a temporary mapping and deep-copied first when a callable + filter is used. - If a callable `filter` is provided, it can transform the root object. - If an iterable filter is provided, it selects which *root* keys to emit. 
""" @@ -60,13 +63,14 @@ def encode(value: t.Any, options: EncodeOptions = EncodeOptions()) -> str: # Normalize the root into a mapping we can traverse deterministically: # - Mapping -> shallow copy (deep-copy only when a callable filter may mutate) - # - Sequence -> promote to {"0": v0, "1": v1, ...} + # - Sequence -> optionally deep-copy for callable filters, then promote to {"0": v0, "1": v1, ...} # - Other -> empty (encodes to "") obj: t.Mapping[str, t.Any] - if isinstance(value, t.Mapping): + if isinstance(value, ABCMapping): obj = deepcopy(value) if callable(filter_opt) else dict(value) elif isinstance(value, (list, tuple)): - obj = {str(i): item for i, item in enumerate(value)} + sequence = deepcopy(value) if callable(filter_opt) else value + obj = {str(i): item for i, item in enumerate(sequence)} else: obj = {} @@ -101,17 +105,22 @@ def encode(value: t.Any, options: EncodeOptions = EncodeOptions()) -> str: max_depth = _get_max_encode_depth(options.max_depth) # Encode each selected root key. + missing = _MISSING for _key in obj_keys: if not isinstance(_key, str): # Skip non-string keys; parity with ports that stringify key paths. continue + + obj_value = obj.get(_key, missing) + key_is_undefined = obj_value is missing + # Optionally drop explicit nulls at the root. - if _key in obj and obj.get(_key) is None and options.skip_nulls: + if options.skip_nulls and obj_value is None: continue _encoded: t.Union[t.List[t.Any], t.Tuple[t.Any, ...], t.Any] = _encode( - value=obj.get(_key), - is_undefined=_key not in obj, + value=None if key_is_undefined else obj_value, + is_undefined=key_is_undefined, side_channel=side_channel, prefix=_key, generate_array_prefix=options.list_format.generator, @@ -161,9 +170,14 @@ def encode(value: t.Any, options: EncodeOptions = EncodeOptions()) -> str: # Alias for the `encode` function. 
dumps = encode # public alias (parity with `json.dumps` / Node `qs.stringify`) -# Unique placeholder used as a key within the side-channel chain to pass context down recursion. +_MISSING = object() + +# Unique placeholder used as a key within the side-channel chain to pass context down traversal frames. _sentinel: WeakWrapper = WeakWrapper({}) MAX_ENCODING_DEPTH_EXCEEDED = "Maximum encoding depth exceeded" +_PHASE_START = 0 +_PHASE_ITERATE = 1 +_PHASE_AWAIT_CHILD = 2 def _get_max_encode_depth(max_depth: t.Optional[int]) -> int: @@ -172,8 +186,48 @@ def _get_max_encode_depth(max_depth: t.Optional[int]) -> int: return max_depth -@dataclass class _EncodeFrame: + """Mutable traversal frame for iterative encoding.""" + + __slots__ = ( + "add_query_prefix", + "adjusted_path", + "allow_dots", + "allow_empty_lists", + "charset", + "comma_compact_nulls", + "comma_round_trip", + "cycle_level", + "cycle_pushed", + "cycle_state", + "depth", + "encode_dot_in_keys", + "encode_values_only", + "encoder", + "filter_", + "format", + "formatter", + "generate_array_prefix", + "index", + "is_mapping", + "is_sequence", + "is_undefined", + "max_depth", + "obj", + "obj_id", + "obj_keys", + "path", + "phase", + "prefix", + "serialize_date", + "side_channel", + "skip_nulls", + "sort", + "step", + "strict_null_handling", + "value", + "values", + ) value: t.Any is_undefined: bool side_channel: WeakKeyDictionary @@ -197,29 +251,112 @@ class _EncodeFrame: add_query_prefix: bool depth: int max_depth: t.Optional[int] - phase: str = "start" - obj: t.Any = None - obj_wrapper: t.Optional[WeakWrapper] = None - step: int = 0 - obj_keys: t.List[t.Any] = field(default_factory=list) - values: t.List[t.Any] = field(default_factory=list) - index: int = 0 - adjusted_prefix: str = "" - cycle_state: t.Optional["_CycleState"] = None - cycle_level: t.Optional[int] = None - cycle_pushed: bool = False + path: t.Optional[KeyPathNode] + phase: int + obj: t.Any + obj_id: t.Optional[int] + is_mapping: bool + 
is_sequence: bool + step: int + obj_keys: t.List[t.Any] + values: t.List[t.Any] + index: int + adjusted_path: t.Optional[KeyPathNode] + cycle_state: t.Optional["_CycleState"] + cycle_level: t.Optional[int] + cycle_pushed: bool + + def __init__( + self, + value: t.Any, + is_undefined: bool, + side_channel: WeakKeyDictionary, + prefix: t.Optional[str], + comma_round_trip: t.Optional[bool], + comma_compact_nulls: bool, + encoder: t.Optional[t.Callable[[t.Any, t.Optional[Charset], t.Optional[Format]], str]], + serialize_date: t.Union[t.Callable[[datetime], t.Optional[str]], str], + sort: t.Optional[t.Callable[[t.Any, t.Any], int]], + filter_: t.Optional[t.Union[t.Callable, t.Sequence[t.Union[str, int]]]], + formatter: t.Optional[t.Callable[[str], str]], + format: Format, + generate_array_prefix: t.Callable[[str, t.Optional[str]], str], + allow_empty_lists: bool, + strict_null_handling: bool, + skip_nulls: bool, + encode_dot_in_keys: bool, + allow_dots: bool, + encode_values_only: bool, + charset: t.Optional[Charset], + add_query_prefix: bool, + depth: int, + max_depth: t.Optional[int], + path: t.Optional[KeyPathNode] = None, + cycle_state: t.Optional["_CycleState"] = None, + cycle_level: t.Optional[int] = None, + ) -> None: + self.value = value + self.is_undefined = is_undefined + self.side_channel = side_channel + self.prefix = prefix + self.comma_round_trip = comma_round_trip + self.comma_compact_nulls = comma_compact_nulls + self.encoder = encoder + self.serialize_date = serialize_date + self.sort = sort + self.filter_ = filter_ + self.formatter = formatter + self.format = format + self.generate_array_prefix = generate_array_prefix + self.allow_empty_lists = allow_empty_lists + self.strict_null_handling = strict_null_handling + self.skip_nulls = skip_nulls + self.encode_dot_in_keys = encode_dot_in_keys + self.allow_dots = allow_dots + self.encode_values_only = encode_values_only + self.charset = charset + self.add_query_prefix = add_query_prefix + self.depth = depth 
+ self.max_depth = max_depth + self.path = path + self.phase = _PHASE_START + self.obj = None + self.obj_id = None + self.is_mapping = False + self.is_sequence = False + self.step = 0 + self.obj_keys = [] + self.values = [] + self.index = 0 + self.adjusted_path = None + self.cycle_state = cycle_state + self.cycle_level = cycle_level + self.cycle_pushed = False @dataclass -class _CycleEntry: - level: int - pos: t.Any - is_top: bool +class _CycleState: + entries: t.Dict[int, t.List[t.Tuple[int, t.Any, bool]]] = field(default_factory=dict) -@dataclass -class _CycleState: - entries: t.Dict[WeakWrapper, t.List[_CycleEntry]] = field(default_factory=dict) +def _identity_key(value: t.Any) -> int: + """Return an identity-stable integer key for cycle bookkeeping. + + This helper accepts raw ``id(obj)`` integers and returns them unchanged. + For ``WeakWrapper`` values it returns ``id(value.value)``; if the wrapped + object is unavailable (``ReferenceError``), it falls back to ``id(value)``. + + Callers should pass object references or ``id(obj)`` values only; arbitrary + non-id integers are treated as precomputed identity keys. + """ + if isinstance(value, int): + return value + if isinstance(value, WeakWrapper): + try: + return id(value.value) + except ReferenceError: + return id(value) + return id(value) def _bootstrap_cycle_state_from_side_channel(side_channel: WeakKeyDictionary) -> t.Tuple[_CycleState, int]: @@ -231,8 +368,8 @@ def _bootstrap_cycle_state_from_side_channel(side_channel: WeakKeyDictionary) -> from the current frame to the top-most side-channel mapping. 
""" chain: t.List[WeakKeyDictionary] = [] - tmp_sc: t.Optional[WeakKeyDictionary] = side_channel.get(_sentinel) # type: ignore[assignment] - while tmp_sc is not None: + tmp_sc = side_channel.get(_sentinel) + while isinstance(tmp_sc, WeakKeyDictionary): chain.append(tmp_sc) tmp_sc = tmp_sc.get(_sentinel) # type: ignore[assignment] @@ -240,14 +377,14 @@ def _bootstrap_cycle_state_from_side_channel(side_channel: WeakKeyDictionary) -> for level, ancestor in enumerate(reversed(chain)): is_top = ancestor.get(_sentinel) is None for key, pos in ancestor.items(): - if key is _sentinel or not isinstance(key, WeakWrapper): + if key is _sentinel: continue - state.entries.setdefault(key, []).append(_CycleEntry(level=level, pos=pos, is_top=is_top)) + state.entries.setdefault(_identity_key(key), []).append((level, pos, is_top)) return state, len(chain) -def _compute_step_and_check_cycle(state: _CycleState, wrapper: WeakWrapper, current_level: int) -> int: +def _compute_step_and_check_cycle(state: _CycleState, node_key: t.Any, current_level: int) -> int: """ Compute the current cycle-detection "step" and raise on circular reference. 
@@ -256,30 +393,63 @@ def _compute_step_and_check_cycle(state: _CycleState, wrapper: WeakWrapper, curr * raise when ancestor_pos == distance * return 0 when no match or when nearest match is the top-most side-channel """ - entries = state.entries.get(wrapper) + key_id = node_key if isinstance(node_key, int) else _identity_key(node_key) + entries = state.entries.get(key_id) if not entries: return 0 - nearest = entries[-1] - distance = current_level - nearest.level - if nearest.pos == distance: + ancestor_level, ancestor_pos, is_top = entries[-1] + distance = current_level - ancestor_level + if ancestor_pos == distance: raise ValueError("Circular reference detected") # noqa: TRY003 - return 0 if nearest.is_top else distance + return 0 if is_top else distance -def _push_current_node(state: _CycleState, wrapper: WeakWrapper, current_level: int, pos: int, is_top: bool) -> None: - state.entries.setdefault(wrapper, []).append(_CycleEntry(level=current_level, pos=pos, is_top=is_top)) +def _push_current_node(state: _CycleState, node_key: t.Any, current_level: int, pos: int, is_top: bool) -> None: + key_id = node_key if isinstance(node_key, int) else _identity_key(node_key) + state.entries.setdefault(key_id, []).append((current_level, pos, is_top)) -def _pop_current_node(state: _CycleState, wrapper: WeakWrapper) -> None: - entries = state.entries.get(wrapper) +def _pop_current_node(state: _CycleState, node_key: t.Any) -> None: + key_id = node_key if isinstance(node_key, int) else _identity_key(node_key) + entries = state.entries.get(key_id) if not entries: return entries.pop() if not entries: - del state.entries[wrapper] + del state.entries[key_id] + + +_INDICES_GENERATOR = ListFormat.INDICES.generator +_BRACKETS_GENERATOR = ListFormat.BRACKETS.generator +_REPEAT_GENERATOR = ListFormat.REPEAT.generator +_COMMA_GENERATOR = ListFormat.COMMA.generator + + +def _next_path_for_sequence( + path: KeyPathNode, + generator: t.Callable[[str, t.Optional[str]], str], + encoded_key: str, 
+) -> KeyPathNode: + if generator is _INDICES_GENERATOR: + return path.append(f"[{encoded_key}]") + if generator is _BRACKETS_GENERATOR: + return path.append("[]") + if generator is _REPEAT_GENERATOR or generator is _COMMA_GENERATOR: + return path + + parent = path.materialize() + child = generator(parent, encoded_key) + if child.startswith(parent): + return path.append(child[len(parent) :]) + + # Deliberate fallback for custom generators: a non-prefixed child string is + # treated as a fully materialized root path, so ancestor linkage is dropped. + # Downstream materialize()/as_dot_encoded() calls then operate on this new + # root only. Custom generators should prefix with `parent` to preserve ancestry. + return KeyPathNode.from_materialized(child) def _encode( @@ -376,60 +546,56 @@ def _encode( while stack: frame = stack[-1] - if frame.phase == "start": + if frame.phase == _PHASE_START: if frame.max_depth is None: frame.max_depth = _get_max_encode_depth(None) if frame.depth > frame.max_depth: raise ValueError(MAX_ENCODING_DEPTH_EXCEEDED) - if frame.prefix is None: - frame.prefix = "?" if frame.add_query_prefix else "" + if frame.path is None: + if frame.prefix is None: + frame.prefix = "?" if frame.add_query_prefix else "" + frame.path = KeyPathNode.from_materialized(frame.prefix) + # Internal invariant: `frame.path` is initialized above when absent. + current_path = t.cast(KeyPathNode, frame.path) if frame.comma_round_trip is None: - frame.comma_round_trip = frame.generate_array_prefix is ListFormat.COMMA.generator + frame.comma_round_trip = frame.generate_array_prefix is _COMMA_GENERATOR if frame.formatter is None: frame.formatter = frame.format.formatter - # Work with the original; we never mutate in place (we build new lists/maps when normalizing). 
obj: t.Any = frame.value - - # --- Pre-processing: filter & datetime handling ------------------------------- filter_opt = frame.filter_ + if callable(filter_opt): - # Callable filter can transform the object for this prefix. - obj = filter_opt(frame.prefix, obj) + obj = filter_opt(current_path.materialize(), obj) else: - # Normalize datetimes both for scalars and (in COMMA mode) list elements. if isinstance(obj, datetime): obj = frame.serialize_date(obj) if callable(frame.serialize_date) else obj.isoformat() - elif frame.generate_array_prefix is ListFormat.COMMA.generator and isinstance(obj, (list, tuple)): + elif frame.generate_array_prefix is _COMMA_GENERATOR and isinstance(obj, (list, tuple)): if callable(frame.serialize_date): obj = [frame.serialize_date(x) if isinstance(x, datetime) else x for x in obj] else: obj = [x.isoformat() if isinstance(x, datetime) else x for x in obj] - # --- Null handling ------------------------------------------------------------ if not frame.is_undefined and obj is None: if frame.strict_null_handling: - # Bare key (no '=value') when strict handling is requested. - result_token = ( - frame.encoder(frame.prefix, frame.charset, frame.format) + key_text = current_path.materialize() + key_value = ( + frame.encoder(key_text, frame.charset, frame.format) if callable(frame.encoder) and not frame.encode_values_only - else frame.prefix + else key_text ) + result_token = frame.formatter(key_value) if frame.formatter is not None else key_value stack.pop() last_result = result_token continue - # Otherwise treat `None` as empty string. obj = "" - # --- Fast path for primitives/bytes ----------------------------------------- if Utils.is_non_nullish_primitive(obj, frame.skip_nulls) or isinstance(obj, bytes): - # When a custom encoder is provided, still coerce Python bools to lowercase JSON style. 
+ key_text = current_path.materialize() if callable(frame.encoder): key_value = ( - frame.prefix - if frame.encode_values_only - else frame.encoder(frame.prefix, frame.charset, frame.format) + key_text if frame.encode_values_only else frame.encoder(key_text, frame.charset, frame.format) ) if isinstance(obj, bool): value_part = "true" if obj else "false" @@ -437,12 +603,11 @@ def _encode( value_part = frame.encoder(obj, frame.charset, frame.format) result_tokens = [f"{frame.formatter(key_value)}={frame.formatter(value_part)}"] else: - # Default fallback (no custom encoder): ensure lowercase boolean literals. if isinstance(obj, bool): value_str = "true" if obj else "false" else: value_str = str(obj) - result_tokens = [f"{frame.formatter(frame.prefix)}={frame.formatter(value_str)}"] + result_tokens = [f"{frame.formatter(key_text)}={frame.formatter(value_str)}"] stack.pop() last_result = result_tokens @@ -450,35 +615,30 @@ def _encode( frame.obj = obj frame.values = [] + frame.is_mapping = isinstance(obj, ABCMapping) + frame.is_sequence = isinstance(obj, (list, tuple)) - # If the *key itself* was undefined (not present in the parent), there is nothing to emit. if frame.is_undefined: stack.pop() last_result = frame.values continue - # --- Cycle detection via ancestry lookup state -------------------------------- - # Only needed for traversable containers; primitive/bytes values return via fast path above. 
- obj_wrapper: WeakWrapper = WeakWrapper(obj) + obj_id = id(obj) if frame.cycle_state is None or frame.cycle_level is None: frame.cycle_state, frame.cycle_level = _bootstrap_cycle_state_from_side_channel(frame.side_channel) - step = _compute_step_and_check_cycle(frame.cycle_state, obj_wrapper, frame.cycle_level) + frame.step = _compute_step_and_check_cycle(frame.cycle_state, obj_id, frame.cycle_level) + frame.obj_id = obj_id - frame.obj_wrapper = obj_wrapper - frame.step = step - - # --- Determine which keys/indices to traverse ------------------------------- comma_effective_length: t.Optional[int] = None - if frame.generate_array_prefix is ListFormat.COMMA.generator and isinstance(obj, (list, tuple)): - # In COMMA mode we join the elements into a single token at this level. + if frame.generate_array_prefix is _COMMA_GENERATOR and frame.is_sequence: comma_items: t.List[t.Any] = list(obj) if frame.comma_compact_nulls: comma_items = [item for item in comma_items if item is not None] comma_effective_length = len(comma_items) if frame.encode_values_only and callable(frame.encoder): - encoded_items = Utils.apply(comma_items, frame.encoder) - obj_keys_value = ",".join(("" if e is None else str(e)) for e in encoded_items) + encoded_items = [frame.encoder(item, frame.charset, frame.format) for item in comma_items] + obj_keys_value = ",".join("" if e is None else str(e) for e in encoded_items) else: obj_keys_value = ",".join(Utils.normalize_comma_elem(e) for e in comma_items) @@ -491,76 +651,74 @@ def _encode( and isinstance(filter_opt, ABCSequence) and not isinstance(filter_opt, (str, bytes, bytearray)) ): - # Iterable filter restricts traversal to a fixed key/index set. frame.obj_keys = list(filter_opt) else: - # Default: enumerate keys/indices from mappings or sequences. 
- if isinstance(obj, t.Mapping): + if frame.is_mapping: keys = list(obj.keys()) - elif isinstance(obj, (list, tuple)): + elif frame.is_sequence: keys = list(range(len(obj))) else: keys = [] frame.obj_keys = sorted(keys, key=cmp_to_key(frame.sort)) if frame.sort is not None else keys - # Percent-encode literal dots in key names when requested. - encoded_prefix: str = frame.prefix.replace(".", "%2E") if frame.encode_dot_in_keys else frame.prefix + path_for_children = current_path.as_dot_encoded() if frame.encode_dot_in_keys else current_path - # In comma round-trip mode, ensure a single-element list appends `[]` to preserve type on decode. - single_item_for_round_trip: bool = False - if frame.comma_round_trip and isinstance(obj, (list, tuple)): - if frame.generate_array_prefix is ListFormat.COMMA.generator and comma_effective_length is not None: + single_item_for_round_trip = False + if frame.comma_round_trip and frame.is_sequence: + if frame.generate_array_prefix is _COMMA_GENERATOR and comma_effective_length is not None: single_item_for_round_trip = comma_effective_length == 1 else: single_item_for_round_trip = len(obj) == 1 - frame.adjusted_prefix = f"{encoded_prefix}[]" if single_item_for_round_trip else encoded_prefix + frame.adjusted_path = path_for_children.append("[]") if single_item_for_round_trip else path_for_children - # Optionally emit empty lists as `key[]=`. 
- if frame.allow_empty_lists and isinstance(obj, (list, tuple)) and not obj: + if frame.allow_empty_lists and frame.is_sequence and not obj: stack.pop() - last_result = [f"{frame.adjusted_prefix}[]"] + last_result = [frame.adjusted_path.append("[]").materialize()] continue frame.index = 0 - frame.phase = "iterate" + frame.phase = _PHASE_ITERATE continue - if frame.phase == "iterate": + elif frame.phase == _PHASE_ITERATE: if frame.index >= len(frame.obj_keys): - if frame.cycle_pushed and frame.obj_wrapper is not None and frame.cycle_state is not None: - _pop_current_node(frame.cycle_state, frame.obj_wrapper) + if frame.cycle_pushed and frame.obj_id is not None and frame.cycle_state is not None: + _pop_current_node(frame.cycle_state, frame.obj_id) frame.cycle_pushed = False stack.pop() last_result = frame.values continue - if not frame.cycle_pushed and frame.obj_wrapper is not None and frame.cycle_state is not None: + if not frame.cycle_pushed and frame.obj_id is not None and frame.cycle_state is not None: _push_current_node( frame.cycle_state, - frame.obj_wrapper, + frame.obj_id, frame.cycle_level if frame.cycle_level is not None else 0, frame.step, - (frame.cycle_level == 0), + frame.cycle_level == 0, ) - frame.side_channel[frame.obj_wrapper] = frame.step frame.cycle_pushed = True _key = frame.obj_keys[frame.index] frame.index += 1 - # Resolve the child value and whether it was "undefined" at this level. 
_value: t.Any _value_undefined: bool - if isinstance(_key, t.Mapping) and "value" in _key and not isinstance(_key.get("value"), Undefined): + if isinstance(_key, ABCMapping) and "value" in _key and not isinstance(_key.get("value"), Undefined): _value = _key.get("value") _value_undefined = False else: try: - if isinstance(frame.obj, t.Mapping): - _value = frame.obj.get(_key) - _value_undefined = _key not in frame.obj - elif isinstance(frame.obj, (list, tuple)): + if frame.is_mapping: + candidate = frame.obj.get(_key, _MISSING) + if candidate is _MISSING: + _value = None + _value_undefined = True + else: + _value = candidate + _value_undefined = False + elif frame.is_sequence: if isinstance(_key, int): _value = frame.obj[_key] _value_undefined = False @@ -571,40 +729,37 @@ def _encode( _value = frame.obj[_key] _value_undefined = False except Exception: # noqa: BLE001 # pylint: disable=W0718 - # User-provided __getitem__/mapping accessors may raise arbitrary exceptions. _value = None _value_undefined = True - # Optionally drop null children. if frame.skip_nulls and _value is None: continue - # When using dotted paths and also encoding dots in keys, percent-escape '.' inside key names. - encoded_key: str = ( - str(_key).replace(".", "%2E") if frame.allow_dots and frame.encode_dot_in_keys else str(_key) - ) + encoded_key = str(_key).replace(".", "%2E") if frame.allow_dots and frame.encode_dot_in_keys else str(_key) + if frame.path is None: # pragma: no cover - internal invariant + raise RuntimeError("path is not initialized") # noqa: TRY003 + adjusted_path = frame.adjusted_path if frame.adjusted_path is not None else frame.path - # Build the child key path depending on whether we're traversing a list or a mapping. 
- key_prefix: str = ( - frame.generate_array_prefix(frame.adjusted_prefix, encoded_key) - if isinstance(frame.obj, (list, tuple)) - else f"{frame.adjusted_prefix}{f'.{encoded_key}' if frame.allow_dots else f'[{encoded_key}]'}" - ) + if frame.is_sequence: + child_path = _next_path_for_sequence(adjusted_path, frame.generate_array_prefix, encoded_key) + else: + child_path = adjusted_path.append(f".{encoded_key}" if frame.allow_dots else f"[{encoded_key}]") - frame.phase = "await_child" + frame.phase = _PHASE_AWAIT_CHILD stack.append( _EncodeFrame( value=_value, is_undefined=_value_undefined, side_channel=frame.side_channel, - prefix=key_prefix, + prefix=None, + path=child_path, comma_round_trip=frame.comma_round_trip, comma_compact_nulls=frame.comma_compact_nulls, encoder=( None - if frame.generate_array_prefix is ListFormat.COMMA.generator + if frame.generate_array_prefix is _COMMA_GENERATOR and frame.encode_values_only - and isinstance(frame.obj, (list, tuple)) + and frame.is_sequence else frame.encoder ), serialize_date=frame.serialize_date, @@ -629,11 +784,14 @@ def _encode( ) continue - # frame.phase == "await_child" - if isinstance(last_result, (list, tuple)): - frame.values.extend(last_result) else: - frame.values.append(last_result) - frame.phase = "iterate" + if frame.phase != _PHASE_AWAIT_CHILD: # pragma: no cover - internal invariant + raise RuntimeError("Unexpected _encode frame phase") # noqa: TRY003 + + if isinstance(last_result, (list, tuple)): + frame.values.extend(last_result) + else: + frame.values.append(last_result) + frame.phase = _PHASE_ITERATE return [] if last_result is None else last_result diff --git a/src/qs_codec/models/key_path_node.py b/src/qs_codec/models/key_path_node.py new file mode 100644 index 0000000..c9cf723 --- /dev/null +++ b/src/qs_codec/models/key_path_node.py @@ -0,0 +1,100 @@ +"""Linked key-path nodes used by the encoder to reduce string churn.""" + +from __future__ import annotations + +import typing as t + + +class 
KeyPathNode: + """Key-path node with immutable path semantics and write-once lazy caches. + + The parent/segment chain defines the path and does not change after + construction. For performance, ``dot_encoded`` and ``materialized`` are + lazily populated cache slots; they are written once and do not alter the + path semantics. + """ + + __slots__ = ("depth", "dot_encoded", "materialized", "parent", "segment") + parent: t.Optional["KeyPathNode"] + segment: str + depth: int + dot_encoded: t.Optional["KeyPathNode"] + materialized: t.Optional[str] + + def __init__(self, parent: t.Optional["KeyPathNode"], segment: str) -> None: + """Create a path node linked to an optional parent.""" + self.parent = parent + self.segment = segment + self.depth = (parent.depth if parent is not None else 0) + 1 + self.dot_encoded: t.Optional["KeyPathNode"] = None + self.materialized: t.Optional[str] = None + + @classmethod + def from_materialized(cls, value: str) -> "KeyPathNode": + """Create a root node from a full materialized prefix.""" + return cls(None, value) + + def append(self, segment: str) -> "KeyPathNode": + """Append a segment and return a new node, or self for empty segments.""" + return self if not segment else KeyPathNode(self, segment) + + def as_dot_encoded(self) -> "KeyPathNode": + """Return a cached view where literal dots are encoded as ``%2E``.""" + cached = self.dot_encoded + if cached is not None: + return cached + + unresolved: t.List["KeyPathNode"] = [] + node: t.Optional["KeyPathNode"] = self + while node is not None and node.dot_encoded is None: + unresolved.append(node) + node = node.parent + + for current in reversed(unresolved): + encoded_segment = current.segment.replace(".", "%2E") if "." 
in current.segment else current.segment + parent = current.parent + if parent is None: + encoded = current if encoded_segment is current.segment else KeyPathNode(None, encoded_segment) + else: + encoded_parent = parent.dot_encoded + if encoded_parent is None: # pragma: no cover - internal invariant + raise RuntimeError("dot_encoded parent is not initialized") # noqa: TRY003 + if encoded_parent is parent and encoded_segment is current.segment: + encoded = current + else: + encoded = KeyPathNode(encoded_parent, encoded_segment) + current.dot_encoded = encoded + + return self.dot_encoded if self.dot_encoded is not None else self + + def materialize(self) -> str: + """Render and cache the full path text.""" + cached = self.materialized + if cached is not None: + return cached + + parent = self.parent + if parent is None: + self.materialized = self.segment + return self.segment + + if self.depth == 2: + parent_part = parent.materialized + if parent_part is None: + parent_part = parent.segment + parent.materialized = parent_part + value = parent_part + self.segment + self.materialized = value + return value + + parts = [""] * self.depth + node: t.Optional["KeyPathNode"] = self + index = self.depth - 1 + while node is not None: + parts[index] = node.segment + node = node.parent + index -= 1 + + value = "".join(parts) + self.materialized = value + return value diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 1bd6bc2..4808872 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -19,6 +19,7 @@ import typing as t from collections import deque +from collections.abc import Mapping as ABCMapping from dataclasses import dataclass, field from datetime import datetime, timedelta from decimal import Decimal @@ -133,7 +134,7 @@ def merge( last_result = current_target continue - if not isinstance(current_source, t.Mapping): + if not isinstance(current_source, ABCMapping): # Fast-path: merging a non-mapping (list/tuple/scalar) into 
target. if isinstance(current_target, (list, tuple)): # If the target sequence contains `Undefined`, we may need to promote it @@ -162,8 +163,8 @@ def merge( continue if isinstance(current_source, (list, tuple)): - if all(isinstance(el, (t.Mapping, Undefined)) for el in current_target) and all( - isinstance(el, (t.Mapping, Undefined)) for el in current_source + if all(isinstance(el, (ABCMapping, Undefined)) for el in current_target) and all( + isinstance(el, (ABCMapping, Undefined)) for el in current_source ): frame.list_target = dict(enumerate(current_target)) frame.list_source = dict(enumerate(current_source)) @@ -188,7 +189,7 @@ def merge( last_result = mutable_target continue - if isinstance(current_target, t.Mapping): + if isinstance(current_target, ABCMapping): if Utils.is_overflow(current_target): stack.pop() last_result = Utils.combine(current_target, current_source, frame.options) @@ -222,7 +223,7 @@ def merge( # Source is a mapping but target is not — coerce target to a mapping or # concatenate as a list, then proceed. - if current_target is None or not isinstance(current_target, t.Mapping): + if current_target is None or not isinstance(current_target, ABCMapping): if isinstance(current_target, (list, tuple)): stack.pop() last_result = { @@ -614,12 +615,12 @@ def is_non_nullish_primitive(val: t.Any, skip_nulls: bool = False) -> bool: if isinstance(val, Undefined): return False - if isinstance(val, object): - if isinstance(val, (list, tuple, t.Mapping)): - return False - return True + if isinstance(val, (list, tuple, ABCMapping)): + return False - return False + # Opaque custom types are treated as primitives; keep the explicit fallback + # check for compatibility with tests that monkeypatch `isinstance`. 
+ return isinstance(val, object) @staticmethod def normalize_comma_elem(e: t.Any) -> str: From 397ff7be3ccd01fb93912458b537c3d2dd23bf6e Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Wed, 25 Feb 2026 19:41:49 +0100 Subject: [PATCH 2/3] test(encode): add regression coverage for edge cases and internals Expand tests for key-path nodes, generator fallbacks, strict-null formatting, and callable-filter mutation safety. --- tests/unit/encode_internal_helpers_test.py | 40 ++++++++++++++++ tests/unit/encode_test.py | 28 ++++++++++++ tests/unit/key_path_node_test.py | 53 ++++++++++++++++++++++ tests/unit/list_format_test.py | 20 ++++++++ 4 files changed, 141 insertions(+) create mode 100644 tests/unit/encode_internal_helpers_test.py create mode 100644 tests/unit/key_path_node_test.py create mode 100644 tests/unit/list_format_test.py diff --git a/tests/unit/encode_internal_helpers_test.py b/tests/unit/encode_internal_helpers_test.py new file mode 100644 index 0000000..ce1c06f --- /dev/null +++ b/tests/unit/encode_internal_helpers_test.py @@ -0,0 +1,40 @@ +import typing as t + +from qs_codec.encode import _identity_key, _next_path_for_sequence +from qs_codec.models.key_path_node import KeyPathNode +from qs_codec.models.weak_wrapper import WeakWrapper + + +def test_identity_key_returns_int_unchanged() -> None: + assert _identity_key(42) == 42 + + +def test_identity_key_handles_collected_weak_wrapper() -> None: + wrapper = WeakWrapper({"a": "b"}) + object.__setattr__(wrapper, "_wref", lambda: None) + assert _identity_key(wrapper) == id(wrapper) + + +def test_identity_key_returns_object_id_for_non_wrapper_values() -> None: + value = {"a": "b"} + assert _identity_key(value) == id(value) + + +def test_next_path_for_sequence_uses_custom_suffix_when_child_starts_with_parent() -> None: + root = KeyPathNode.from_materialized("root") + + def custom_generator(prefix: str, key: t.Optional[str]) -> str: + return f"{prefix}<{key}>" + + next_path = _next_path_for_sequence(root, 
custom_generator, "item") + assert next_path.materialize() == "root<item>" + + +def test_next_path_for_sequence_rebuilds_when_child_is_not_prefixed() -> None: + root = KeyPathNode.from_materialized("root") + + def custom_generator(prefix: str, key: t.Optional[str]) -> str: # pylint: disable=W0613 # noqa: ARG001 + return f"other[{key}]" + + next_path = _next_path_for_sequence(root, custom_generator, "item") + assert next_path.materialize() == "other[item]" diff --git a/tests/unit/encode_test.py b/tests/unit/encode_test.py index b1115d0..92f84ed 100644 --- a/tests/unit/encode_test.py +++ b/tests/unit/encode_test.py @@ -650,6 +650,12 @@ def test_encodes_a_complicated_map(self) -> None: [ pytest.param({"a": ""}, None, "a=", id="empty-string"), pytest.param({"a": None}, EncodeOptions(strict_null_handling=True), "a", id="none-strict-null"), + pytest.param( + {"a b": None}, + EncodeOptions(strict_null_handling=True, format=Format.RFC1738), + "a+b", + id="none-strict-null-rfc1738-space", + ), pytest.param({"a": "", "b": ""}, None, "a=&b=", id="multiple-empty"), pytest.param( {"a": None, "b": ""}, @@ -967,6 +973,28 @@ def filter_func(prefix: str, value: t.Any) -> t.Any: assert encode(obj, options=EncodeOptions(filter=filter_func)) == "a=b&c=&e%5Bf%5D=1257894000" assert calls == 5 + def test_callable_filter_does_not_mutate_root_list_elements(self) -> None: + data: t.List[t.Dict[str, str]] = [{"a": "b"}] + + def filter_func(prefix: str, value: t.Any) -> t.Any: + if prefix == "": + value["0"]["a"] = "x" + return value + + assert encode(data, options=EncodeOptions(filter=filter_func)) == "0%5Ba%5D=x" + assert data == [{"a": "b"}] + + def test_callable_filter_does_not_mutate_root_tuple_elements(self) -> None: + data: t.Tuple[t.Dict[str, str], ...]
= ({"a": "b"},) + + def filter_func(prefix: str, value: t.Any) -> t.Any: + if prefix == "": + value["0"]["a"] = "x" + return value + + assert encode(data, options=EncodeOptions(filter=filter_func)) == "0%5Ba%5D=x" + assert data[0]["a"] == "b" + def test_encode_handles_mapping_get_exception(self) -> None: class ExplodingMapping(t.Mapping): def __iter__(self): diff --git a/tests/unit/key_path_node_test.py b/tests/unit/key_path_node_test.py new file mode 100644 index 0000000..134e23c --- /dev/null +++ b/tests/unit/key_path_node_test.py @@ -0,0 +1,53 @@ +import pytest + +from qs_codec.models.key_path_node import KeyPathNode + + +class TestKeyPathNode: + def test_append_empty_segment_returns_same_node(self) -> None: + root = KeyPathNode.from_materialized("root") + assert root.append("") is root + + def test_materialize_builds_nested_paths(self) -> None: + path = KeyPathNode.from_materialized("a").append("[b]").append("[c]") + assert path.materialize() == "a[b][c]" + + def test_materialize_uses_cached_value(self) -> None: + path = KeyPathNode.from_materialized("a").append("[b]").append("[c]") + first = path.materialize() + second = path.materialize() + assert first == second == "a[b][c]" + + def test_as_dot_encoded_replaces_literal_dots(self) -> None: + path = KeyPathNode.from_materialized("a.b").append("[c.d]") + encoded = path.as_dot_encoded() + assert encoded.materialize() == "a%2Eb[c%2Ed]" + + def test_as_dot_encoded_reuses_cached_node(self) -> None: + path = KeyPathNode.from_materialized("a.b").append("[c]") + first = path.as_dot_encoded() + second = path.as_dot_encoded() + assert first is second + + def test_as_dot_encoded_returns_self_when_no_segments_need_encoding(self) -> None: + path = KeyPathNode.from_materialized("a").append("[c]") + assert path.as_dot_encoded() is path + + def test_as_dot_encoded_handles_deep_paths_without_recursion_error(self) -> None: + path = KeyPathNode.from_materialized("root") + for i in range(12_000): + path = path.append(f".k{i}") + 
+ encoded = path.as_dot_encoded().materialize() + assert encoded.startswith("root%2Ek0%2Ek1") + + @pytest.mark.parametrize( + "path, expected", + [ + (KeyPathNode.from_materialized("a"), "a"), + (KeyPathNode.from_materialized("a").append(".b"), "a.b"), + (KeyPathNode.from_materialized("a").append("[0]").append("[b]"), "a[0][b]"), + ], + ) + def test_materialize_path_variants(self, path: KeyPathNode, expected: str) -> None: + assert path.materialize() == expected diff --git a/tests/unit/list_format_test.py b/tests/unit/list_format_test.py new file mode 100644 index 0000000..a475807 --- /dev/null +++ b/tests/unit/list_format_test.py @@ -0,0 +1,20 @@ +import typing as t + +import pytest + +from qs_codec.enums.list_format import ListFormatGenerator + + +@pytest.mark.parametrize( + "generator, prefix, key, expected", + [ + (ListFormatGenerator.brackets, "a", None, "a[]"), + (ListFormatGenerator.comma, "a", "0", "a"), + (ListFormatGenerator.indices, "a", "0", "a[0]"), + (ListFormatGenerator.repeat, "a", "0", "a"), + ], +) +def test_list_format_generators( + generator: t.Callable[[str, t.Optional[str]], str], prefix: str, key: t.Optional[str], expected: str +) -> None: + assert generator(prefix, key) == expected From 9dcc65d77ba1d5d58ef8f45f3f8b89615e211bb5 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Wed, 25 Feb 2026 19:41:57 +0100 Subject: [PATCH 3/3] docs(changelog): summarize 1.4.3-wip encode improvements --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1aa25a6..1185aa0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 1.4.3-wip + +* [CHORE] optimize `encode` traversal internals and hot paths with lower allocation overhead +* [CHORE] add internal `KeyPathNode` path caching for lazy materialization and dot-encoded path reuse +* [FIX] avoid `RecursionError` in deep key-path dot encoding by making `KeyPathNode.as_dot_encoded` iterative +* [FIX] ensure strict null handling applies RFC 
formatter behavior for bare-key output (e.g. RFC1738 space handling) +* [FIX] avoid mutating caller-owned root list/tuple elements when `EncodeOptions.filter` is callable +* [CHORE] remove unused `KeyPathNode.total_length` state to keep nodes lightweight +* [CHORE] expand encode regression and internal helper tests (path caching, list format generators, strict-null formatting, mutation safety) + ## 1.4.2 * [CHORE] optimize `decode` by skipping dot-in-keys normalization when "%2" is not present in key segments