22Helpers for working with the `act` claim on verified access token claims.
33"""
44
5- from __future__ import annotations
6-
75from collections .abc import Mapping
86from typing import Any
97
108from .errors import VerifyAccessTokenError
11- from .types import ActClaim
9+
10+ INVALID_ACT_CLAIM_MESSAGE = "Invalid act claim"
1211
1312
1413def get_current_actor (claims : Mapping [str , Any ]) -> str | None :
@@ -18,12 +17,21 @@ def get_current_actor(claims: Mapping[str, Any]) -> str | None:
1817 Only the outermost `act.sub` should be used for authorization decisions.
1918 Nested `act` values represent prior actors and are informational.
2019 """
20+ if not isinstance (claims , Mapping ):
21+ raise VerifyAccessTokenError (INVALID_ACT_CLAIM_MESSAGE )
2122
22- act_claim = _get_validated_act_claim ( claims )
23+ act_claim = claims . get ( "act" )
2324 if act_claim is None :
2425 return None
2526
26- return act_claim ["sub" ]
27+ if not isinstance (act_claim , Mapping ):
28+ raise VerifyAccessTokenError (INVALID_ACT_CLAIM_MESSAGE )
29+
30+ sub = act_claim .get ("sub" )
31+ if not isinstance (sub , str ) or not sub .strip ():
32+ raise VerifyAccessTokenError (INVALID_ACT_CLAIM_MESSAGE )
33+
34+ return sub
2735
2836
2937def get_delegation_chain (claims : Mapping [str , Any ]) -> list [str ]:
@@ -34,54 +42,23 @@ def get_delegation_chain(claims: Mapping[str, Any]) -> list[str]:
3442 prior actors from nested `act` values and are typically most useful for audit
3543 and attribution.
3644 """
45+ if not isinstance (claims , Mapping ):
46+ raise VerifyAccessTokenError (INVALID_ACT_CLAIM_MESSAGE )
3747
38- act_claim = _get_validated_act_claim ( claims )
39- if act_claim is None :
48+ current = claims . get ( "act" )
49+ if current is None :
4050 return []
4151
4252 chain : list [str ] = []
43- current : ActClaim | None = act_claim
4453 while current is not None :
45- chain .append (current ["sub" ])
46- current = current .get ("act" )
47-
48- return chain
49-
54+ if not isinstance (current , Mapping ):
55+ raise VerifyAccessTokenError (INVALID_ACT_CLAIM_MESSAGE )
5056
51- def _get_validated_act_claim (claims : Mapping [str , Any ]) -> ActClaim | None :
52- if not isinstance (claims , Mapping ):
53- raise VerifyAccessTokenError ("Verified access token claims must be an object" )
54-
55- act_claim = claims .get ("act" )
56- if act_claim is None :
57- return None
58-
59- return _parse_act_claim (act_claim , path = "act" , seen = set ())
60-
61-
62- def _parse_act_claim (value : Any , * , path : str , seen : set [int ]) -> ActClaim :
63- if not isinstance (value , Mapping ):
64- raise VerifyAccessTokenError (f"{ path } must be an object" )
57+ sub = current .get ("sub" )
58+ if not isinstance (sub , str ) or not sub .strip ():
59+ raise VerifyAccessTokenError (INVALID_ACT_CLAIM_MESSAGE )
6560
66- object_id = id (value )
67- if object_id in seen :
68- raise VerifyAccessTokenError (f"{ path } contains a circular reference" )
61+ chain .append (sub )
62+ current = current .get ("act" )
6963
70- seen .add (object_id )
71- try :
72- sub = value .get ("sub" )
73- if not isinstance (sub , str ) or not sub .strip ():
74- raise VerifyAccessTokenError (f"{ path } .sub must be a non-empty string" )
75-
76- parsed : ActClaim = {"sub" : sub }
77- nested_act = value .get ("act" )
78- if nested_act is not None :
79- parsed ["act" ] = _parse_act_claim (
80- nested_act ,
81- path = f"{ path } .act" ,
82- seen = seen ,
83- )
84-
85- return parsed
86- finally :
87- seen .remove (object_id )
64+ return chain
0 commit comments