From f5ad75bbed0af85995cae14dbbd76c8be23ba1c7 Mon Sep 17 00:00:00 2001 From: Arne Luenser Date: Mon, 15 Jun 2026 15:21:00 +0200 Subject: [PATCH 1/3] feat: pause/resume event streams --- cmd/cloudx/eventstreams/flags.go | 37 ++++++++++++ cmd/cloudx/eventstreams/flags_test.go | 84 +++++++++++++++++++++++++++ cmd/cloudx/eventstreams/output.go | 3 +- cmd/cloudx/eventstreams/status.go | 59 +++++++++++++++++++ cmd/cloudx/eventstreams/update.go | 6 +- cmd/cloudx/pause.go | 29 +++++++++ cmd/cloudx/resume.go | 29 +++++++++ cmd/root.go | 2 + go.mod | 2 +- go.sum | 4 +- 10 files changed, 249 insertions(+), 6 deletions(-) create mode 100644 cmd/cloudx/eventstreams/flags_test.go create mode 100644 cmd/cloudx/eventstreams/status.go create mode 100644 cmd/cloudx/pause.go create mode 100644 cmd/cloudx/resume.go diff --git a/cmd/cloudx/eventstreams/flags.go b/cmd/cloudx/eventstreams/flags.go index ca53c52e..fcd0cf2a 100644 --- a/cmd/cloudx/eventstreams/flags.go +++ b/cmd/cloudx/eventstreams/flags.go @@ -12,9 +12,28 @@ import ( "github.com/ory/client-go" ) +// Event stream statuses. A paused stream does not forward any events until it +// is set back to active. +const ( + StatusActive = "active" + StatusPaused = "paused" +) + type streamConfig client.CreateEventStreamBody func (c *streamConfig) Validate() error { + // The status flag is optional. An empty value is normalized to nil so the + // server keeps the current status (on update) or applies its default (on create). + if c.Status != nil { + switch *c.Status { + case "": + c.Status = nil + case StatusActive, StatusPaused: + default: + return fmt.Errorf(`flag --status must be one of %q or %q`, StatusActive, StatusPaused) + } + } + switch c.Type { case "": return fmt.Errorf("flag --type must be set") @@ -51,9 +70,27 @@ func (c *streamConfig) Validate() error { return nil } +// toSetBody maps the shared stream config onto the update (set) request body. +// The two bodies are no longer convertible by type assertion: SetEventStreamBody.Type +// is a pointer (optional on update) whereas CreateEventStreamBody.Type is required. +func (c streamConfig) toSetBody() client.SetEventStreamBody { + body := client.SetEventStreamBody{ + HttpsEndpoint: c.HttpsEndpoint, + RoleArn: c.RoleArn, + Status: c.Status, + TopicArn: c.TopicArn, + } + if c.Type != "" { + t := c.Type + body.Type = &t + } + return body +} + func registerStreamConfigFlags(f *pflag.FlagSet, c *streamConfig) { f.StringVar(&c.Type, "type", "", `The type of the event stream destination. Supported values are "sns" for AWS SNS topics and "https" for generic HTTPS endpoints.`) c.RoleArn = f.String("aws-iam-role-arn", "", "The ARN of the AWS IAM role to assume when publishing messages to the SNS topic.") c.TopicArn = f.String("aws-sns-topic-arn", "", "The ARN of the AWS SNS topic.") c.HttpsEndpoint = f.String("https-endpoint", "", "The URL of the HTTPS endpoint.") + c.Status = f.String("status", "", fmt.Sprintf("The status of the event stream. Supported values are %q and %q. Defaults to %q.", StatusActive, StatusPaused, StatusActive)) } diff --git a/cmd/cloudx/eventstreams/flags_test.go b/cmd/cloudx/eventstreams/flags_test.go new file mode 100644 index 00000000..d19e2a5e --- /dev/null +++ b/cmd/cloudx/eventstreams/flags_test.go @@ -0,0 +1,84 @@ +// Copyright © 2024 Ory Corp +// SPDX-License-Identifier: Apache-2.0 + +package eventstreams + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func ptr(s string) *string { return &s } + +func TestStreamConfigValidate(t *testing.T) { + t.Parallel() + + base := func() streamConfig { + return streamConfig{ + Type: "https", + HttpsEndpoint: ptr("https://example.com/webhook"), + } + } + + t.Run("accepts a valid active status", func(t *testing.T) { + c := base() + c.Status = ptr(StatusActive) + require.NoError(t, c.Validate()) + assert.Equal(t, StatusActive, *c.Status) + }) + + t.Run("accepts a valid paused status", func(t *testing.T) { + c := base() + c.Status = ptr(StatusPaused) + require.NoError(t, c.Validate()) + assert.Equal(t, StatusPaused, *c.Status) + }) + + t.Run("normalizes an empty status to nil so the server default applies", func(t *testing.T) { + c := base() + c.Status = ptr("") + require.NoError(t, c.Validate()) + assert.Nil(t, c.Status) + }) + + t.Run("rejects an unknown status", func(t *testing.T) { + c := base() + c.Status = ptr("frozen") + assert.ErrorContains(t, c.Validate(), "--status") + }) + + t.Run("status is optional when unset", func(t *testing.T) { + c := base() + require.NoError(t, c.Validate()) + assert.Nil(t, c.Status) + }) +} + +func TestStreamConfigToSetBody(t *testing.T) { + t.Parallel() + + t.Run("maps all fields including the required type as a pointer", func(t *testing.T) { + c := streamConfig{ + Type: "https", + HttpsEndpoint: ptr("https://example.com/webhook"), + Status: ptr(StatusPaused), + } + body := c.toSetBody() + require.NotNil(t, body.Type) + assert.Equal(t, "https", *body.Type) + require.NotNil(t, body.HttpsEndpoint) + assert.Equal(t, "https://example.com/webhook", *body.HttpsEndpoint) + require.NotNil(t, body.Status) + assert.Equal(t, StatusPaused, *body.Status) + }) + + t.Run("leaves type nil when unset so the current type is kept", func(t *testing.T) { + c := streamConfig{Status: ptr(StatusActive)} + body := c.toSetBody() + assert.Nil(t, body.Type) + require.NotNil(t, body.Status) + assert.Equal(t, StatusActive, *body.Status) + }) +} diff --git a/cmd/cloudx/eventstreams/output.go b/cmd/cloudx/eventstreams/output.go index ab76052e..dea3f385 100644 --- a/cmd/cloudx/eventstreams/output.go +++ b/cmd/cloudx/eventstreams/output.go @@ -15,13 +15,14 @@ type ( ) func (output) Header() []string { - return []string{"ID", "TYPE", "IAM_ROLE_ARN", "SNS_TOPIC_ARN", "HTTPS_ENDPOINT"} + return []string{"ID", "TYPE", "STATUS", "IAM_ROLE_ARN", "SNS_TOPIC_ARN", "HTTPS_ENDPOINT"} } func (o output) Columns() []string { return []string{ coalesce(o.Id), coalesce(o.Type), + coalesce(o.Status), coalesce(o.RoleArn), coalesce(o.TopicArn), coalesce(o.HttpsEndpoint.Get()), diff --git a/cmd/cloudx/eventstreams/status.go b/cmd/cloudx/eventstreams/status.go new file mode 100644 index 00000000..76b77e2d --- /dev/null +++ b/cmd/cloudx/eventstreams/status.go @@ -0,0 +1,59 @@ +// Copyright © 2024 Ory Corp +// SPDX-License-Identifier: Apache-2.0 + +package eventstreams + +import ( + "fmt" + + "github.com/spf13/cobra" + + "github.com/ory/cli/cmd/cloudx/client" + cloud "github.com/ory/client-go" + "github.com/ory/x/cmdx" +) + +func NewPauseEventStreamCmd() *cobra.Command { + return newSetStatusCmd("pause", StatusPaused, "Pause the event stream with the given ID", "A paused event stream does not forward any events until it is resumed.") +} + +func NewResumeEventStreamCmd() *cobra.Command { + return newSetStatusCmd("resume", StatusActive, "Resume the event stream with the given ID", "Resuming a paused event stream makes it forward events again.") +} + +func newSetStatusCmd(verb, status, short, long string) *cobra.Command { + cmd := &cobra.Command{ + Use: "event-stream [--project=PROJECT_ID]", + Args: cobra.ExactArgs(1), + Short: short, + Long: short + "\n\n" + long, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + h, err := client.NewCobraCommandHelper(cmd) + if err != nil { + return err + } + + projectID, err := h.ProjectID() + if err != nil { + return cmdx.PrintOpenAPIError(cmd, err) + } + streamID := args[0] + + stream, err := h.UpdateEventStream(ctx, projectID, streamID, cloud.SetEventStreamBody{Status: &status}) + if err != nil { + return cmdx.PrintOpenAPIError(cmd, err) + } + + _, _ = fmt.Fprintf(h.VerboseErrWriter, "Event stream %sd successfully!\n", verb) + cmdx.PrintRow(cmd, output(*stream)) + return nil + }, + } + + client.RegisterProjectFlag(cmd.Flags()) + client.RegisterWorkspaceFlag(cmd.Flags()) + cmdx.RegisterFormatFlags(cmd.Flags()) + return cmd +} diff --git a/cmd/cloudx/eventstreams/update.go b/cmd/cloudx/eventstreams/update.go index 6f511adc..1a17f02b 100644 --- a/cmd/cloudx/eventstreams/update.go +++ b/cmd/cloudx/eventstreams/update.go @@ -9,7 +9,6 @@ import ( "github.com/spf13/cobra" "github.com/ory/cli/cmd/cloudx/client" - cloud "github.com/ory/client-go" "github.com/ory/x/cmdx" ) @@ -37,7 +36,7 @@ func NewUpdateEventStreamCmd() *cobra.Command { if err := c.Validate(); err != nil { return err } - stream, err := h.UpdateEventStream(ctx, projectID, streamID, cloud.SetEventStreamBody(c)) + stream, err := h.UpdateEventStream(ctx, projectID, streamID, c.toSetBody()) if err != nil { return cmdx.PrintOpenAPIError(cmd, err) } @@ -49,7 +48,10 @@ func NewUpdateEventStreamCmd() *cobra.Command { } client.RegisterProjectFlag(cmd.Flags()) + client.RegisterWorkspaceFlag(cmd.Flags()) cmdx.RegisterFormatFlags(cmd.Flags()) + registerStreamConfigFlags(cmd.Flags(), &c) + return cmd } diff --git a/cmd/cloudx/pause.go b/cmd/cloudx/pause.go new file mode 100644 index 00000000..90b90b63 --- /dev/null +++ b/cmd/cloudx/pause.go @@ -0,0 +1,29 @@ +// Copyright © 2024 Ory Corp +// SPDX-License-Identifier: Apache-2.0 + +package cloudx + +import ( + "github.com/spf13/cobra" + + "github.com/ory/cli/cmd/cloudx/client" + "github.com/ory/cli/cmd/cloudx/eventstreams" + "github.com/ory/x/cmdx" +) + +func NewPauseCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "pause", + Short: "Pause Ory Network resources", + } + + cmd.AddCommand( + eventstreams.NewPauseEventStreamCmd(), + ) + + client.RegisterConfigFlag(cmd.PersistentFlags()) + client.RegisterYesFlag(cmd.PersistentFlags()) + cmdx.RegisterNoiseFlags(cmd.PersistentFlags()) + cmdx.RegisterJSONFormatFlags(cmd.PersistentFlags()) + return cmd +} diff --git a/cmd/cloudx/resume.go b/cmd/cloudx/resume.go new file mode 100644 index 00000000..e4e4513b --- /dev/null +++ b/cmd/cloudx/resume.go @@ -0,0 +1,29 @@ +// Copyright © 2024 Ory Corp +// SPDX-License-Identifier: Apache-2.0 + +package cloudx + +import ( + "github.com/spf13/cobra" + + "github.com/ory/cli/cmd/cloudx/client" + "github.com/ory/cli/cmd/cloudx/eventstreams" + "github.com/ory/x/cmdx" +) + +func NewResumeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "resume", + Short: "Resume Ory Network resources", + } + + cmd.AddCommand( + eventstreams.NewResumeEventStreamCmd(), + ) + + client.RegisterConfigFlag(cmd.PersistentFlags()) + client.RegisterYesFlag(cmd.PersistentFlags()) + cmdx.RegisterNoiseFlags(cmd.PersistentFlags()) + cmdx.RegisterJSONFormatFlags(cmd.PersistentFlags()) + return cmd +} diff --git a/cmd/root.go b/cmd/root.go index d62302a5..336b44df 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -38,9 +38,11 @@ func NewRootCmd() *cobra.Command { cloudx.NewOpenCmd(), cloudx.NewPatchCmd(), cloudx.NewParseCmd(), + cloudx.NewPauseCmd(), cloudx.NewPerformCmd(), proxy.NewProxyCommand(), proxy.NewTunnelCommand(), + cloudx.NewResumeCmd(), cloudx.NewUpdateCmd(), cloudx.NewValidateCmd(), cloudx.NewRevokeCmd(), diff --git a/go.mod b/go.mod index 735c68aa..d3b21ff6 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/gofrs/uuid v4.4.0+incompatible github.com/gomarkdown/markdown v0.0.0-20260417124207-7d523f7318df github.com/hashicorp/go-retryablehttp v0.7.8 - github.com/ory/client-go v1.22.41 + github.com/ory/client-go v1.22.51 github.com/ory/gochimp3 v0.0.0-20200417124117-ccd242db3655 github.com/ory/graceful v0.2.0 github.com/ory/herodot v0.10.9-0.20260330111132-da75ef0fbc22 diff --git a/go.sum b/go.sum index 4e3cad8a..57b0256f 100644 --- a/go.sum +++ b/go.sum @@ -554,8 +554,8 @@ github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LD github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0= github.com/ory/analytics-go/v5 v5.0.1 h1:LX8T5B9FN8KZXOtxgN+R3I4THRRVB6+28IKgKBpXmAM= github.com/ory/analytics-go/v5 v5.0.1/go.mod h1:lWCiCjAaJkKfgR/BN5DCLMol8BjKS1x+4jxBxff/FF0= -github.com/ory/client-go v1.22.41 h1:AywohwpZUMDVwnPaAoeZWseapSEF58GgPn1RPUvTaqQ= -github.com/ory/client-go v1.22.41/go.mod h1:G1f+5+m/PJVvl40bsRn0QuyVIcXe7EHiWeM7iWpIDjw= +github.com/ory/client-go v1.22.51 h1:T5tmhDvomkPTZeHQgfDcqjRnSvV1wkkG2xYQ/r6TQdk= +github.com/ory/client-go v1.22.51/go.mod h1:G1f+5+m/PJVvl40bsRn0QuyVIcXe7EHiWeM7iWpIDjw= github.com/ory/dockertest/v4 v4.0.0 h1:i19aFsO/VXE0VrMk4ifnKW4G/KIJ93PCjLOslxXoPME= github.com/ory/dockertest/v4 v4.0.0/go.mod h1:b5Ofu8VIxWNhXFvQcLu17pRNQdoUBKtXBW74G4Ygzx8= github.com/ory/go-acc v0.2.9-0.20230103102148-6b1c9a70dbbe h1:rvu4obdvqR0fkSIJ8IfgzKOWwZ5kOT2UNfLq81Qk7rc= From 96fa216d284980283c354da1390c8147f4846b08 Mon Sep 17 00:00:00 2001 From: Arne Luenser Date: Mon, 15 Jun 2026 15:41:07 +0200 Subject: [PATCH 2/3] fix: return ProjectID error directly in pause/resume commands ProjectID() returns a plain ErrProjectNotSet, not an OpenAPI error, so wrapping it with cmdx.PrintOpenAPIError was a no-op and semantically misleading. Match the pattern used in create.go and update.go. Co-Authored-By: Claude Opus 4.8 (1M context) --- cmd/cloudx/eventstreams/status.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/cloudx/eventstreams/status.go b/cmd/cloudx/eventstreams/status.go index 76b77e2d..65509e6e 100644 --- a/cmd/cloudx/eventstreams/status.go +++ b/cmd/cloudx/eventstreams/status.go @@ -37,7 +37,7 @@ func newSetStatusCmd(verb, status, short, long string) *cobra.Command { projectID, err := h.ProjectID() if err != nil { - return cmdx.PrintOpenAPIError(cmd, err) + return err } streamID := args[0] From 3d39795a84ce45e04e71fa8cb1824cb0ac6785ef Mon Sep 17 00:00:00 2001 From: Arne Luenser Date: Mon, 15 Jun 2026 17:40:56 +0200 Subject: [PATCH 3/3] docs: clarify --status flag is a partial update on event-stream update The --status help text said it defaults to "active", which is accurate for create but misleading for update: omitting --status does NOT reset a paused stream to active. The behavior is already a partial update (an unset status is normalized to nil and omitted from the request), so only the wording needed fixing. Reword to state that an unset status keeps an existing stream's current status. Co-Authored-By: Claude Opus 4.8 (1M context) --- cmd/cloudx/eventstreams/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/cloudx/eventstreams/flags.go b/cmd/cloudx/eventstreams/flags.go index fcd0cf2a..54bc8ddb 100644 --- a/cmd/cloudx/eventstreams/flags.go +++ b/cmd/cloudx/eventstreams/flags.go @@ -92,5 +92,5 @@ func registerStreamConfigFlags(f *pflag.FlagSet, c *streamConfig) { c.RoleArn = f.String("aws-iam-role-arn", "", "The ARN of the AWS IAM role to assume when publishing messages to the SNS topic.") c.TopicArn = f.String("aws-sns-topic-arn", "", "The ARN of the AWS SNS topic.") c.HttpsEndpoint = f.String("https-endpoint", "", "The URL of the HTTPS endpoint.") - c.Status = f.String("status", "", fmt.Sprintf("The status of the event stream. Supported values are %q and %q. Defaults to %q.", StatusActive, StatusPaused, StatusActive)) + c.Status = f.String("status", "", fmt.Sprintf("The status of the event stream, either %q or %q. When unset, a new stream defaults to %q and an existing stream keeps its current status.", StatusActive, StatusPaused, StatusActive)) }