Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions datadog_sync/model/restriction_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ class RestrictionPolicies(BaseResource):
# Additional RestrictionPolicies specific attributes
current_user_path: str = "/api/v2/current_user"
org_principal: Optional[str] = None
# UUID of the syncing service account, captured in pre_apply_hook from
# /api/v2/current_user. Used by the self-demote pre-filter to skip
# policies that would remove this user's existing editor binding —
# the Datadog API rejects such requests with 400 "users cannot
# decrease their own level of access", producing noisy ERROR events
# in prod telemetry with no actionable signal for operators.
current_user_uuid: Optional[str] = None

async def get_resources(self, client: CustomClient) -> List[Dict]:
policies = []
Expand Down Expand Up @@ -93,19 +100,168 @@ async def import_resource(self, _id: Optional[str] = None, resource: Optional[Di
return import_id, resource["data"]

async def pre_resource_action_hook(self, _id, resource: Dict) -> None:
# Pre-filter two deterministic failure modes BEFORE the API call so
# they don't surface as ERROR-level events in operator/customer
# telemetry (NATHAN-50). Both checks are best-effort — they bail
# out silently when the state needed to decide is unavailable.
self._skip_if_target_dashboard_is_read_only(_id, resource)
self._skip_if_would_self_demote(_id, resource)

# Existing behavior: rewrite the source-org "org:<uuid>" principal
# to the destination-org "org:<uuid>" so bindings reference the
# local org. Runs AFTER the skip filters so the skip checks see
# the as-imported source-shaped resource and aren't disturbed by
# the in-place principal rewrite.
if self.org_principal:
for binding in resource["attributes"]["bindings"]:
for i, key in enumerate(binding["principals"]):
if key.startswith("org:"):
binding["principals"][i] = self.org_principal
break

def _skip_if_target_dashboard_is_read_only(self, _id: str, resource: Dict) -> None:
"""Skip dashboard-targeted policies when the destination dashboard
is read-only (built-in / template / shared).

The Datadog restriction-policy API rejects attachment to read-only
dashboards with 403 "This dashboard is read-only". The check uses
the ``is_read_only`` field from the destination dashboard state,
which is populated by the dashboards resource's create/update
response (the API returns the full body including this field even
though it is in ``excluded_attributes`` — that list only affects
diffing, not state storage; see resource_utils.prep_resource).
"""
policy_id = resource.get("id") or _id
if not isinstance(policy_id, str) or not policy_id.startswith("dashboard:"):
# Only dashboards expose ``is_read_only``; other target types
# pass through this branch.
return

# Resolve the destination dashboard via the source dashboard id
# (the second half of the policy id), keyed in the destination
# state by source id. ``connect_id`` does the same lookup later
# for principal rewriting.
try:
_, src_dashboard_id = policy_id.split(":", 1)
except ValueError:
return

dashboards_state = self.config.state.destination.get("dashboards", {})
dashboard = dashboards_state.get(src_dashboard_id)
if not dashboard:
# Dashboard not in destination state — the existing
# connect_id pass will surface a missing-connections error
# if applicable. Don't short-circuit that path here.
return

if not dashboard.get("is_read_only"):
return

self.config.logger.info(
f"[restriction_policies - {policy_id}] skipping: "
f"target dashboard {src_dashboard_id} is read-only on destination"
)
raise SkipResource(
policy_id,
self.resource_type,
f"Target dashboard {src_dashboard_id} is read-only on destination " "(skipped to avoid API 403).",
)

def _skip_if_would_self_demote(self, _id: str, resource: Dict) -> None:
"""Skip policies that would remove the syncing service account's own
``editor`` binding.

The Datadog API rejects such requests with 400 "users cannot
decrease their own level of access (from editor to viewer)". When
the operator has explicitly set ``--allow-self-lockout``, the
request goes through with ``?allow_self_lockout=true`` and the
API permits it — we must NOT pre-filter in that case.

Scope: this check inspects only **direct** ``user:<uuid>`` bindings.
We fire the skip only when there is at least one ``user:<X>``
editor principal AND the syncing SA's UUID is not among them —
that is the unambiguous "removed from editor" case the API
rejects. When the editor bindings are entirely ``role:`` /
``team:`` / ``org:`` (no direct user bindings at all), we cannot
infer self-demote from the payload alone and let the request
proceed — the API will accept it if the SA retains effective
access via membership.
"""
if not self.current_user_uuid:
# pre_apply_hook didn't capture the uuid — no reliable way to
# decide. Let the request go through and surface the API error
# as it does today.
return
if self.config.allow_self_lockout:
# Operator opted in to bypass the safety check.
return

sa_user_principal = f"user:{self.current_user_uuid}"
bindings = resource.get("attributes", {}).get("bindings") or []
if not bindings:
# Empty bindings clears all restrictions — the syncing user
# keeps whatever org-default access they have. Not a
# self-demote case.
return

# Three signals from the bindings:
# - sa_is_in_editor: SA appears under an editor binding (no self-demote).
# - sa_is_in_viewer: SA appears under a viewer/non-editor binding
# (explicit downgrade — self-demote).
# - has_user_editor_principal: at least one user:<X> appears under an
# editor binding (lets us decide on absence: if user editors exist
# but SA isn't one, SA is being removed → self-demote).
sa_is_in_editor = False
sa_is_in_viewer = False
has_user_editor_principal = False
for binding in bindings:
relation = binding.get("relation")
for principal in binding.get("principals") or []:
if not isinstance(principal, str):
continue
if relation == "editor":
if principal.startswith("user:"):
has_user_editor_principal = True
if principal == sa_user_principal:
sa_is_in_editor = True
elif principal == sa_user_principal:
# SA listed under a non-editor relation (typically
# viewer) — explicit downgrade.
sa_is_in_viewer = True

if sa_is_in_editor:
# SA is explicitly retained as editor — no self-demote.
return
if not sa_is_in_viewer and not has_user_editor_principal:
# No SA-specific signal and editor bindings are only
# role:/team:/org: principals. SA's effective access may come
# from membership we can't analyze. Let the API decide.
return

policy_id = resource.get("id") or _id
self.config.logger.info(
f"[restriction_policies - {policy_id}] skipping: "
f"would self-demote syncing user {self.current_user_uuid}"
)
raise SkipResource(
policy_id,
self.resource_type,
f"Policy would self-demote the syncing user {self.current_user_uuid} "
"(skipped to avoid API 400). Pass --allow-self-lockout to override.",
)

async def pre_apply_hook(self) -> None:
destination_client = self.config.destination_client
try:
resp = await destination_client.get(self.current_user_path)
org_id = resp["data"]["relationships"]["org"]["data"]["id"]
self.org_principal = f"org:{org_id}"
# Capture the syncing user's UUID for the self-demote pre-filter.
# ``data.id`` is the user UUID per /api/v2/current_user schema;
# ``data.attributes.uuid`` carries the same value. Fall back to
# the attributes copy if the top-level id is absent for any
# reason (it's been stable in the API for years, but be defensive).
self.current_user_uuid = resp["data"].get("id") or resp["data"].get("attributes", {}).get("uuid")
except Exception as e:
self.config.logger.error(f"Failed to get org details: {e}")
raise
Expand Down
Loading
Loading