Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 210 additions & 93 deletions aredis_om/model/encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,114 +65,153 @@ def generate_encoders_by_class_tuples(
encoders_by_class_tuples = generate_encoders_by_class_tuples(ENCODERS_BY_TYPE)


def jsonable_encoder(
obj: Any,
include: Optional[Union[SetIntStr, DictIntStrAny]] = None,
exclude: Optional[Union[SetIntStr, DictIntStrAny]] = None,
by_alias: bool = True,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
custom_encoder: Dict[Any, Callable[[Any], Any]] = {},
sqlalchemy_safe: bool = True,
) -> Any:
def _normalize_include_exclude(
include: Optional[Union[SetIntStr, DictIntStrAny]],
exclude: Optional[Union[SetIntStr, DictIntStrAny]],
) -> Tuple[Optional[Union[SetIntStr, DictIntStrAny]], Optional[Union[SetIntStr, DictIntStrAny]]]:
if include is not None and not isinstance(include, (set, dict)):
include = set(include)
if exclude is not None and not isinstance(exclude, (set, dict)):
exclude = set(exclude)
return include, exclude

if isinstance(obj, BaseModel) and hasattr(obj, "__config__"):
encoder = getattr(obj.__config__, "json_encoders", {})
if custom_encoder:
encoder.update(custom_encoder)
obj_dict = obj.model_dump(
include=include, # type: ignore # in Pydantic
exclude=exclude, # type: ignore # in Pydantic
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_none=exclude_none,
exclude_defaults=exclude_defaults,
)
if "__root__" in obj_dict:
obj_dict = obj_dict["__root__"]
return jsonable_encoder(
obj_dict,
exclude_none=exclude_none,
exclude_defaults=exclude_defaults,
custom_encoder=encoder,
sqlalchemy_safe=sqlalchemy_safe,

def _encode_pydantic_model(
obj: BaseModel,
include: Optional[Union[SetIntStr, DictIntStrAny]],
exclude: Optional[Union[SetIntStr, DictIntStrAny]],
by_alias: bool,
exclude_unset: bool,
exclude_defaults: bool,
exclude_none: bool,
custom_encoder: Dict[Any, Callable[[Any], Any]],
sqlalchemy_safe: bool,
) -> Any:
encoder = dict(getattr(obj.__config__, "json_encoders", {}))
if custom_encoder:
encoder.update(custom_encoder)
obj_dict = obj.model_dump(
include=include, # type: ignore # in Pydantic
exclude=exclude, # type: ignore # in Pydantic
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_none=exclude_none,
exclude_defaults=exclude_defaults,
)
if "__root__" in obj_dict:
obj_dict = obj_dict["__root__"]
return jsonable_encoder(
obj_dict,
exclude_none=exclude_none,
exclude_defaults=exclude_defaults,
custom_encoder=encoder,
sqlalchemy_safe=sqlalchemy_safe,
)


def _encode_dict(
obj: Dict[Any, Any],
include: Optional[Union[SetIntStr, DictIntStrAny]],
exclude: Optional[Union[SetIntStr, DictIntStrAny]],
by_alias: bool,
exclude_unset: bool,
exclude_defaults: bool,
exclude_none: bool,
custom_encoder: Dict[Any, Callable[[Any], Any]],
sqlalchemy_safe: bool,
) -> Dict[Any, Any]:
encoded_dict = {}
for key, value in obj.items():
should_include = (
(
not sqlalchemy_safe
or (not isinstance(key, str))
or (not key.startswith("_sa"))
)
and value is not PydanticUndefined
and (value is not None or not exclude_none)
and ((include and key in include) or not exclude or key not in exclude)
)
if dataclasses.is_dataclass(obj):
return dataclasses.asdict(obj) # type: ignore
if isinstance(obj, Enum):
return obj.value
if isinstance(obj, PurePath):
return str(obj)
if isinstance(obj, (str, int, float, type(None))):
return obj
if isinstance(obj, dict):
encoded_dict = {}
for key, value in obj.items():
if (
(
not sqlalchemy_safe
or (not isinstance(key, str))
or (not key.startswith("_sa"))
)
and value is not PydanticUndefined
and (value is not None or not exclude_none)
and ((include and key in include) or not exclude or key not in exclude)
):
encoded_key = jsonable_encoder(
key,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
encoded_value = jsonable_encoder(
value,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
encoded_dict[encoded_key] = encoded_value
return encoded_dict
if isinstance(obj, (list, set, frozenset, GeneratorType, tuple)):
encoded_list = []
for item in obj:
encoded_list.append(
jsonable_encoder(
item,
include=include,
exclude=exclude,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
if should_include:
encoded_key = jsonable_encoder(
key,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
encoded_value = jsonable_encoder(
value,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
return encoded_list
encoded_dict[encoded_key] = encoded_value
return encoded_dict

if custom_encoder:
if type(obj) in custom_encoder:
return custom_encoder[type(obj)](obj)
else:
for encoder_type, encoder in custom_encoder.items():
if isinstance(obj, encoder_type):
return encoder(obj)

def _encode_iterable(
obj: Union[List[Any], Set[Any], frozenset, GeneratorType, Tuple[Any, ...]],
include: Optional[Union[SetIntStr, DictIntStrAny]],
exclude: Optional[Union[SetIntStr, DictIntStrAny]],
by_alias: bool,
exclude_unset: bool,
exclude_defaults: bool,
exclude_none: bool,
custom_encoder: Dict[Any, Callable[[Any], Any]],
sqlalchemy_safe: bool,
) -> List[Any]:
encoded_list = []
for item in obj:
encoded_list.append(
jsonable_encoder(
item,
include=include,
exclude=exclude,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
)
return encoded_list


def _apply_custom_encoder(
obj: Any,
custom_encoder: Dict[Any, Callable[[Any], Any]],
) -> Any:
if type(obj) in custom_encoder:
return custom_encoder[type(obj)](obj)
for encoder_type, encoder in custom_encoder.items():
if isinstance(obj, encoder_type):
return encoder(obj)
return None


def _apply_builtin_encoder(obj: Any) -> Any:
if type(obj) in ENCODERS_BY_TYPE:
return ENCODERS_BY_TYPE[type(obj)](obj)
for encoder, classes_tuple in encoders_by_class_tuples.items():
if isinstance(obj, classes_tuple):
return encoder(obj)
return None


def _encode_fallback_object(
obj: Any,
by_alias: bool,
exclude_unset: bool,
exclude_defaults: bool,
exclude_none: bool,
custom_encoder: Dict[Any, Callable[[Any], Any]],
sqlalchemy_safe: bool,
) -> Any:
errors: List[Exception] = []
try:
data = dict(obj)
Expand All @@ -192,3 +231,81 @@ def jsonable_encoder(
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)


def jsonable_encoder(
obj: Any,
include: Optional[Union[SetIntStr, DictIntStrAny]] = None,
exclude: Optional[Union[SetIntStr, DictIntStrAny]] = None,
by_alias: bool = True,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
custom_encoder: Dict[Any, Callable[[Any], Any]] = {},
sqlalchemy_safe: bool = True,
) -> Any:
include, exclude = _normalize_include_exclude(include, exclude)

if isinstance(obj, BaseModel) and hasattr(obj, "__config__"):
return _encode_pydantic_model(
obj,
include,
exclude,
by_alias,
exclude_unset,
exclude_defaults,
exclude_none,
custom_encoder,
sqlalchemy_safe,
)
if dataclasses.is_dataclass(obj):
return dataclasses.asdict(obj) # type: ignore
if isinstance(obj, Enum):
return obj.value
if isinstance(obj, PurePath):
return str(obj)
if isinstance(obj, (str, int, float, type(None))):
return obj
if isinstance(obj, dict):
return _encode_dict(
obj,
include,
exclude,
by_alias,
exclude_unset,
exclude_defaults,
exclude_none,
custom_encoder,
sqlalchemy_safe,
)
if isinstance(obj, (list, set, frozenset, GeneratorType, tuple)):
return _encode_iterable(
obj,
include,
exclude,
by_alias,
exclude_unset,
exclude_defaults,
exclude_none,
custom_encoder,
sqlalchemy_safe,
)

if custom_encoder:
encoded_obj = _apply_custom_encoder(obj, custom_encoder)
if encoded_obj is not None:
return encoded_obj

encoded_obj = _apply_builtin_encoder(obj)
if encoded_obj is not None:
return encoded_obj
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Encoders returning None are silently dropped

Medium Severity

_apply_custom_encoder and _apply_builtin_encoder both return None as a "not found" sentinel, but None is also a valid encoder return value. The if encoded_obj is not None checks in jsonable_encoder cause the code to incorrectly skip an encoder that legitimately returns None and fall through to _encode_fallback_object, which will likely raise a ValueError. The original code returned the encoder's result directly without any None check.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 3e2a085. Configure here.


return _encode_fallback_object(
obj,
by_alias,
exclude_unset,
exclude_defaults,
exclude_none,
custom_encoder,
sqlalchemy_safe,
)
Loading