From 872368456bfd72cdb45dc8c82f1ab16392d6a1f8 Mon Sep 17 00:00:00 2001 From: Phillip Jones Date: Wed, 18 Feb 2026 14:38:39 -0800 Subject: [PATCH] Improve pipelines setup command with validation retry loops (#12401) --- .changeset/fancy-gifts-shave.md | 14 + .../src/__tests__/pipelines-setup.test.ts | 1444 +++++++++++++++++ .../wrangler/src/__tests__/pipelines.test.ts | 7 +- packages/wrangler/src/pipelines/cli/setup.ts | 921 ++++++++--- .../src/pipelines/cli/streams/utils.ts | 32 +- packages/wrangler/src/pipelines/index.ts | 56 +- 6 files changed, 2191 insertions(+), 283 deletions(-) create mode 100644 .changeset/fancy-gifts-shave.md create mode 100644 packages/wrangler/src/__tests__/pipelines-setup.test.ts diff --git a/.changeset/fancy-gifts-shave.md b/.changeset/fancy-gifts-shave.md new file mode 100644 index 000000000000..6b2973fbc866 --- /dev/null +++ b/.changeset/fancy-gifts-shave.md @@ -0,0 +1,14 @@ +--- +"wrangler": minor +--- + +Add validation retry loops to pipelines setup command + +The `wrangler pipelines setup` command now prompts users to retry when validation errors occur, instead of failing the entire setup process. This includes: + +- Validation retry prompts for pipeline names, bucket names, and field names +- A "simple" mode for sink configuration that uses sensible defaults +- Automatic bucket creation when buckets don't exist +- Automatic Data Catalog enablement when not already active + +This improves the setup experience by allowing users to correct mistakes without restarting the entire configuration flow. 
diff --git a/packages/wrangler/src/__tests__/pipelines-setup.test.ts b/packages/wrangler/src/__tests__/pipelines-setup.test.ts new file mode 100644 index 000000000000..7db19d85eb4a --- /dev/null +++ b/packages/wrangler/src/__tests__/pipelines-setup.test.ts @@ -0,0 +1,1444 @@ +import { writeFileSync } from "node:fs"; +import { http, HttpResponse } from "msw"; +/* eslint-disable workers-sdk/no-vitest-import-expect -- MSW handlers use expect at module scope */ +import { afterEach, beforeAll, describe, expect, it, vi } from "vitest"; +/* eslint-enable workers-sdk/no-vitest-import-expect */ +import { + __testSkipCredentialValidation, + __testSkipDelays, +} from "../pipelines/index"; +import { mockAccountId, mockApiToken } from "./helpers/mock-account-id"; +import { mockConsoleMethods } from "./helpers/mock-console"; +import { + clearDialogs, + mockConfirm, + mockPrompt, + mockSelect, +} from "./helpers/mock-dialogs"; +import { useMockIsTTY } from "./helpers/mock-istty"; +import { createFetchResult, msw } from "./helpers/msw"; +import { runInTempDir } from "./helpers/run-in-tmp"; +import { runWrangler } from "./helpers/run-wrangler"; +import type { Sink, Stream } from "../pipelines/types"; + +describe("wrangler pipelines setup", () => { + const std = mockConsoleMethods(); + mockAccountId(); + mockApiToken(); + runInTempDir(); + const { setIsTTY } = useMockIsTTY(); + + const accountId = "some-account-id"; + + beforeAll(() => { + __testSkipDelays(); + __testSkipCredentialValidation(); + }); + + function mockGetR2Bucket(bucketName: string, exists: boolean) { + msw.use( + http.get( + `*/accounts/${accountId}/r2/buckets/${bucketName}`, + () => { + if (exists) { + return HttpResponse.json( + createFetchResult({ name: bucketName, location: "WNAM" }) + ); + } + return HttpResponse.json( + createFetchResult(null, false, [ + { code: 10006, message: "bucket not found" }, + ]), + { status: 404 } + ); + }, + { once: true } + ) + ); + } + + function mockCreateStreamRequest( + 
expectedName: string, + options: { fail?: boolean } = {} + ) { + const requests = { count: 0 }; + msw.use( + http.post( + `*/accounts/${accountId}/pipelines/v1/streams`, + async ({ request }) => { + requests.count++; + const body = (await request.json()) as { name: string }; + expect(body.name).toBe(expectedName); + + if (options.fail) { + return HttpResponse.json( + { + success: false, + errors: [{ code: 1000, message: "Stream creation failed" }], + messages: [], + result: null, + }, + { status: 400 } + ); + } + + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: { + id: "stream_123", + name: body.name, + version: 1, + endpoint: `https://pipelines.cloudflare.com/${body.name}`, + format: { type: "json", unstructured: true }, + schema: null, + http: { enabled: true, authentication: false }, + worker_binding: { enabled: true }, + created_at: "2024-01-01T00:00:00Z", + modified_at: "2024-01-01T00:00:00Z", + } as Stream, + }); + }, + { once: true } + ) + ); + return requests; + } + + function mockCreateSinkRequest( + expectedName: string, + options: { fail?: boolean } = {} + ) { + const requests = { count: 0 }; + msw.use( + http.post( + `*/accounts/${accountId}/pipelines/v1/sinks`, + async ({ request }) => { + requests.count++; + const body = (await request.json()) as { name: string }; + expect(body.name).toBe(expectedName); + + if (options.fail) { + return HttpResponse.json( + { + success: false, + errors: [{ code: 1000, message: "Sink creation failed" }], + messages: [], + result: null, + }, + { status: 400 } + ); + } + + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: { + id: "sink_123", + name: body.name, + type: "r2", + format: { type: "json" }, + schema: null, + config: { bucket: "test-bucket" }, + created_at: "2024-01-01T00:00:00Z", + modified_at: "2024-01-01T00:00:00Z", + } as Sink, + }); + }, + { once: true } + ) + ); + return requests; + } + + function mockDeleteStream(streamId: string) { + 
const requests = { count: 0 }; + msw.use( + http.delete( + `*/accounts/${accountId}/pipelines/v1/streams/${streamId}`, + () => { + requests.count++; + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: null, + }); + }, + { once: true } + ) + ); + return requests; + } + + function mockValidateSql(sql: string, options: { fail?: boolean } = {}) { + const requests = { count: 0 }; + msw.use( + http.post( + `*/accounts/${accountId}/pipelines/v1/validate_sql`, + async ({ request }) => { + requests.count++; + const body = (await request.json()) as { sql: string }; + expect(body.sql).toBe(sql); + + if (options.fail) { + return HttpResponse.json( + { + success: false, + errors: [{ code: 1000, message: "SQL validation failed" }], + messages: [], + result: null, + }, + { status: 400 } + ); + } + + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: { + tables: { + test_pipeline_stream: { type: "stream" }, + test_pipeline_sink: { type: "sink" }, + }, + }, + }); + }, + { once: true } + ) + ); + return requests; + } + + function mockCreatePipeline( + expectedName: string, + options: { fail?: boolean } = {} + ) { + const requests = { count: 0 }; + msw.use( + http.post( + `*/accounts/${accountId}/pipelines/v1/pipelines`, + async ({ request }) => { + requests.count++; + const body = (await request.json()) as { name: string }; + expect(body.name).toBe(expectedName); + + if (options.fail) { + return HttpResponse.json( + { + success: false, + errors: [{ code: 1000, message: "Pipeline creation failed" }], + messages: [], + result: null, + }, + { status: 400 } + ); + } + + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: { + id: "pipeline_123", + name: body.name, + sql: "INSERT INTO test_pipeline_sink SELECT * FROM test_pipeline_stream;", + status: "active", + created_at: "2024-01-01T00:00:00Z", + modified_at: "2024-01-01T00:00:00Z", + }, + }); + }, + { once: true } + ) + ); + return 
requests; + } + + type DestinationType = "r2" | "r2_data_catalog"; + type SetupMode = "simple" | "advanced"; + + function mockSinkDialogs(destination: DestinationType, mode: SetupMode) { + mockConfirm({ + text: "Enable HTTP endpoint for sending data?", + result: true, + }); + mockConfirm({ + text: "Require authentication for HTTP endpoint?", + result: false, + }); + mockConfirm({ + text: "Configure custom CORS origins?", + result: false, + }); + mockSelect({ + text: "How would you like to define the schema for incoming events?", + result: "skip", + }); + mockSelect({ + text: "Destination type:", + result: destination, + }); + mockSelect({ + text: "Setup mode:", + result: mode, + }); + } + + function mockAdvancedR2SinkPrompts() { + mockPrompt({ + text: "The base prefix in your bucket where data will be written (optional):", + result: "", + }); + mockPrompt({ + text: "Time partition pattern (optional):", + result: "year=%Y/month=%m/day=%d", + }); + mockSelect({ + text: "Output format:", + result: "json", + }); + mockPrompt({ + text: "Roll file when size reaches (MB, minimum 5):", + result: "100", + }); + mockPrompt({ + text: "Roll file when time reaches (seconds, minimum 10):", + result: "300", + }); + mockConfirm({ + text: "Automatically generate credentials needed to write to your R2 bucket?", + result: false, + }); + mockPrompt({ + text: "R2 Access Key ID:", + result: "test-access-key", + }); + mockPrompt({ + text: "R2 Secret Access Key:", + result: "test-secret-key", + }); + } + + function mockCancelAtBucketName() { + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + } + + function mockBasicStreamConfig() { + mockConfirm({ + text: "Enable HTTP endpoint for sending data?", + result: false, + }); + mockSelect({ + text: "How would you like to define the schema for incoming events?", + result: "skip", + }); + mockSelect({ + text: 
"Destination type:", + result: "r2", + }); + mockSelect({ + text: "Setup mode:", + result: "simple", + }); + } + + function mockGetR2Catalog( + bucketName: string, + options: { exists?: boolean; active?: boolean } = {} + ) { + const { exists = true, active = true } = options; + msw.use( + http.get( + `*/accounts/${accountId}/r2-catalog/${bucketName}`, + () => { + if (!exists) { + return HttpResponse.json( + createFetchResult(null, false, [ + { code: 10006, message: "catalog not found" }, + ]), + { status: 404 } + ); + } + return HttpResponse.json( + createFetchResult({ + id: "catalog_123", + name: bucketName, + bucket: bucketName, + status: active ? "active" : "inactive", + }) + ); + }, + { once: true } + ) + ); + } + + function mockEnableR2Catalog(bucketName: string) { + msw.use( + http.post( + `*/accounts/${accountId}/r2-catalog/${bucketName}/enable`, + () => { + return HttpResponse.json( + createFetchResult({ + id: "catalog_123", + name: bucketName, + }) + ); + }, + { once: true } + ) + ); + } + + function mockUpsertR2CatalogCredential( + bucketName: string, + options: { fail?: boolean } = {} + ) { + msw.use( + http.post( + `*/accounts/${accountId}/r2-catalog/${bucketName}/credential`, + () => { + if (options.fail) { + return HttpResponse.json( + createFetchResult(null, false, [ + { code: 1000, message: "Invalid token" }, + ]), + { status: 400 } + ); + } + return HttpResponse.json(createFetchResult({ success: true })); + }, + { once: true } + ) + ); + } + + function mockCreateR2Bucket(bucketName: string) { + msw.use( + http.post( + `*/accounts/${accountId}/r2/buckets`, + async ({ request }) => { + const body = (await request.json()) as { name: string }; + expect(body.name).toBe(bucketName); + return HttpResponse.json(createFetchResult({})); + }, + { once: true } + ) + ); + } + + afterEach(() => { + clearDialogs(); + vi.restoreAllMocks(); + }); + + describe("pipeline name validation", () => { + it("validates pipeline name provided via --name flag - rejects 
hyphens", async () => { + setIsTTY(true); + + await expect( + runWrangler('pipelines setup --name "invalid-name!"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: pipeline name must contain only letters, numbers, and underscores]` + ); + }); + + it("accepts valid pipeline name with underscores and proceeds to stream config", async () => { + setIsTTY(true); + + mockBasicStreamConfig(); + mockCancelAtBucketName(); + + await expect( + runWrangler('pipelines setup --name "valid_pipeline_name"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: R2 bucket name - cancelled]` + ); + + expect(std.out).toContain("Cloudflare Pipelines Setup"); + expect(std.out).toContain("STREAM"); + }); + + it("falls back to interactive prompt when --name is empty string", async () => { + setIsTTY(true); + + mockPrompt({ + text: "What would you like to name your pipeline?", + result: "", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name ""') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Pipeline name - cancelled]` + ); + }); + }); + + describe("interactive validation retry", () => { + it("shows retry prompt when invalid pipeline name entered interactively", async () => { + setIsTTY(true); + + mockPrompt({ + text: "What would you like to name your pipeline?", + result: "invalid-name!", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler("pipelines setup") + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Pipeline name - cancelled]` + ); + }); + + it("allows retry and accepts valid name on second attempt", async () => { + setIsTTY(true); + + mockPrompt({ + text: "What would you like to name your pipeline?", + result: "invalid-name!", + }); + mockConfirm({ + text: "Would you like to try again?", + result: true, + }); + mockPrompt({ + text: "What would you like to name your pipeline?", + result: 
"valid_name", + }); + mockSinkDialogs("r2", "simple"); + mockCancelAtBucketName(); + + await expect( + runWrangler("pipelines setup") + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: R2 bucket name - cancelled]` + ); + + expect(std.out).toContain("STREAM"); + expect(std.out).toContain("SINK"); + }); + + it("allows multiple retries before succeeding", async () => { + setIsTTY(true); + + mockPrompt({ + text: "What would you like to name your pipeline?", + result: "bad-name", + }); + mockConfirm({ + text: "Would you like to try again?", + result: true, + }); + mockPrompt({ + text: "What would you like to name your pipeline?", + result: "still bad!", + }); + mockConfirm({ + text: "Would you like to try again?", + result: true, + }); + mockPrompt({ + text: "What would you like to name your pipeline?", + result: "finally_valid", + }); + mockBasicStreamConfig(); + mockCancelAtBucketName(); + + await expect( + runWrangler("pipelines setup") + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: R2 bucket name - cancelled]` + ); + + expect(std.err).toContain( + "pipeline name must contain only letters, numbers, and underscores" + ); + }); + }); + + describe("bucket name validation", () => { + it("rejects bucket names with underscores", async () => { + setIsTTY(true); + + mockSinkDialogs("r2", "simple"); + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "invalid_bucket_name", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: R2 bucket name - cancelled]` + ); + + expect(std.err).toContain('The bucket name "invalid_bucket_name"'); + }); + + it("rejects bucket names that are too short", async () => { + setIsTTY(true); + + mockBasicStreamConfig(); + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "ab", + }); + 
mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: R2 bucket name - cancelled]` + ); + + expect(std.err).toContain('The bucket name "ab"'); + }); + + it("allows retry with valid bucket name after invalid input", async () => { + setIsTTY(true); + + mockSinkDialogs("r2", "advanced"); + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "invalid_bucket", + }); + mockConfirm({ + text: "Would you like to try again?", + result: true, + }); + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "UPPERCASE", + }); + mockConfirm({ + text: "Would you like to try again?", + result: true, + }); + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "valid-bucket-name", + }); + mockGetR2Bucket("valid-bucket-name", true); + mockAdvancedR2SinkPrompts(); + + mockConfirm({ + text: "Create resources?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot(`[Error: Setup cancelled]`); + + expect(std.out).toMatchInlineSnapshot(` + " + ⛅️ wrangler x.x.x + ────────────────── + Cloudflare Pipelines Setup + + + STREAM + + + SINK + + Using existing bucket "valid-bucket-name" + + To create R2 API credentials: + Visit https://dash.cloudflare.com/some-account-id/r2/api-tokens/create?type=account + Create token with "Object Read & Write" permissions + + done + + SUMMARY + + Stream test_pipeline_stream + HTTP enabled, unstructured + + Sink test_pipeline_sink + R2 → valid-bucket-name, json + + " + `); + expect(std.err).toMatchInlineSnapshot(` + "X [ERROR] The bucket name "invalid_bucket" is invalid. 
Bucket names must begin and end with an alphanumeric character, only contain lowercase letters, numbers, and hyphens, and be between 3 and 63 characters long. + + + X [ERROR] The bucket name "UPPERCASE" is invalid. Bucket names must begin and end with an alphanumeric character, only contain lowercase letters, numbers, and hyphens, and be between 3 and 63 characters long. + + + X [ERROR] Setup cancelled + + " + `); + expect(std.err).toMatchInlineSnapshot(` + "X [ERROR] The bucket name "invalid_bucket" is invalid. Bucket names must begin and end with an alphanumeric character, only contain lowercase letters, numbers, and hyphens, and be between 3 and 63 characters long. + + + X [ERROR] The bucket name "UPPERCASE" is invalid. Bucket names must begin and end with an alphanumeric character, only contain lowercase letters, numbers, and hyphens, and be between 3 and 63 characters long. + + + X [ERROR] Setup cancelled + + " + `); + }); + }); + + describe("stream configuration", () => { + it("proceeds through schema selection options with HTTP auth enabled", async () => { + setIsTTY(true); + + mockConfirm({ + text: "Enable HTTP endpoint for sending data?", + result: true, + }); + mockConfirm({ + text: "Require authentication for HTTP endpoint?", + result: true, + }); + mockConfirm({ + text: "Configure custom CORS origins?", + result: false, + }); + mockSelect({ + text: "How would you like to define the schema for incoming events?", + result: "skip", + }); + mockSelect({ + text: "Destination type:", + result: "r2", + }); + mockSelect({ + text: "Setup mode:", + result: "simple", + }); + mockCancelAtBucketName(); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: R2 bucket name - cancelled]` + ); + + expect(std.out).toContain("STREAM"); + expect(std.out).toContain("SINK"); + }); + }); + + describe("schema loading from file", () => { + it("loads schema from JSON file", async () => { + 
setIsTTY(true); + + const schema = { + fields: [ + { name: "user_id", type: "string", required: true }, + { name: "event", type: "string", required: true }, + ], + }; + writeFileSync("schema.json", JSON.stringify(schema)); + + mockConfirm({ + text: "Enable HTTP endpoint for sending data?", + result: false, + }); + mockSelect({ + text: "How would you like to define the schema for incoming events?", + result: "file", + }); + mockPrompt({ + text: "Schema file path:", + result: "schema.json", + }); + mockSelect({ + text: "Destination type:", + result: "r2", + }); + mockSelect({ + text: "Setup mode:", + result: "simple", + }); + mockCancelAtBucketName(); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: R2 bucket name - cancelled]` + ); + + expect(std.out).toContain("SINK"); + }); + + it("retries when schema file not found", async () => { + setIsTTY(true); + + const schema = { + fields: [{ name: "id", type: "string", required: true }], + }; + writeFileSync("valid-schema.json", JSON.stringify(schema)); + + mockConfirm({ + text: "Enable HTTP endpoint for sending data?", + result: false, + }); + mockSelect({ + text: "How would you like to define the schema for incoming events?", + result: "file", + }); + mockPrompt({ + text: "Schema file path:", + result: "nonexistent.json", + }); + mockConfirm({ + text: "Would you like to try again?", + result: true, + }); + mockPrompt({ + text: "Schema file path:", + result: "valid-schema.json", + }); + mockSelect({ + text: "Destination type:", + result: "r2", + }); + mockSelect({ + text: "Setup mode:", + result: "simple", + }); + mockCancelAtBucketName(); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: R2 bucket name - cancelled]` + ); + + expect(std.err).toContain("Failed to read schema file"); + expect(std.out).toContain("SINK"); + }); + }); + + describe("sink creation 
retry and cleanup", () => { + it("cleans up stream when user cancels after sink failure", async () => { + setIsTTY(true); + + mockSinkDialogs("r2", "advanced"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + mockAdvancedR2SinkPrompts(); + + mockConfirm({ + text: "Create resources?", + result: true, + }); + + mockCreateStreamRequest("test_pipeline_stream"); + mockCreateSinkRequest("test_pipeline_sink", { fail: true }); + + mockConfirm({ + text: " Retry? (stream was created successfully)", + result: false, + }); + + const deleteReq = mockDeleteStream("stream_123"); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Sink creation cancelled]` + ); + + expect(deleteReq.count).toBe(1); + expect(std.out).toMatchInlineSnapshot(` + " + ⛅️ wrangler x.x.x + ────────────────── + Cloudflare Pipelines Setup + + + STREAM + + + SINK + + Using existing bucket "test-bucket" + + To create R2 API credentials: + Visit https://dash.cloudflare.com/some-account-id/r2/api-tokens/create?type=account + Create token with "Object Read & Write" permissions + + done + + SUMMARY + + Stream test_pipeline_stream + HTTP enabled, unstructured + + Sink test_pipeline_sink + R2 → test-bucket, json + + done + failed + Sink creation failed [code: 1000] + done + " + `); + expect(std.err).toMatchInlineSnapshot(` + "X [ERROR] Sink creation cancelled + + " + `); + }); + }); + + describe("pipeline creation failure", () => { + it("exits gracefully when user declines retry after pipeline failure", async () => { + setIsTTY(true); + + mockSinkDialogs("r2", "advanced"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + mockAdvancedR2SinkPrompts(); + + mockConfirm({ + text: "Create resources?", + result: true, + }); + + 
mockCreateStreamRequest("test_pipeline_stream"); + mockCreateSinkRequest("test_pipeline_sink"); + + mockSelect({ + text: "Query:", + result: "simple", + }); + + const sql = + "INSERT INTO test_pipeline_sink SELECT * FROM test_pipeline_stream;"; + mockValidateSql(sql); + + // Pipeline creation fails + mockCreatePipeline("test_pipeline", { fail: true }); + + // Decline retry - should exit gracefully without throwing + mockConfirm({ + text: " Try again with different SQL?", + result: false, + }); + + // Should complete without error, just showing guidance + await runWrangler('pipelines setup --name "test_pipeline"'); + + expect(std.out).toMatchInlineSnapshot(` + " + ⛅️ wrangler x.x.x + ────────────────── + Cloudflare Pipelines Setup + + + STREAM + + + SINK + + Using existing bucket "test-bucket" + + To create R2 API credentials: + Visit https://dash.cloudflare.com/some-account-id/r2/api-tokens/create?type=account + Create token with "Object Read & Write" permissions + + done + + SUMMARY + + Stream test_pipeline_stream + HTTP enabled, unstructured + + Sink test_pipeline_sink + R2 → test-bucket, json + + done + done + + SQL + + Available tables: + test_pipeline_stream (source) + test_pipeline_sink (sink) + + + INSERT INTO test_pipeline_sink SELECT * FROM test_pipeline_stream; + + done + failed + Pipeline creation failed [code: 1000] + + Stream and sink were created, but pipeline creation failed. + + You can create the pipeline later with: wrangler pipelines create + Your stream "test_pipeline_stream" and sink "test_pipeline_sink" are ready." 
+ `); + expect(std.err).toMatchInlineSnapshot(`""`); + }); + }); + + describe("rolling policy validation", () => { + it("validates file size minimum", async () => { + setIsTTY(true); + + mockSinkDialogs("r2", "advanced"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + + mockPrompt({ + text: "The base prefix in your bucket where data will be written (optional):", + result: "", + }); + mockPrompt({ + text: "Time partition pattern (optional):", + result: "year=%Y/month=%m/day=%d", + }); + mockSelect({ + text: "Output format:", + result: "json", + }); + mockPrompt({ + text: "Roll file when size reaches (MB, minimum 5):", + result: "2", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: File size - cancelled]` + ); + + expect(std.err).toContain("File size must be a number >= 5"); + }); + + it("validates interval minimum", async () => { + setIsTTY(true); + + mockSinkDialogs("r2", "advanced"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + + mockPrompt({ + text: "The base prefix in your bucket where data will be written (optional):", + result: "", + }); + mockPrompt({ + text: "Time partition pattern (optional):", + result: "year=%Y/month=%m/day=%d", + }); + mockSelect({ + text: "Output format:", + result: "json", + }); + mockPrompt({ + text: "Roll file when size reaches (MB, minimum 5):", + result: "100", + }); + mockPrompt({ + text: "Roll file when time reaches (seconds, minimum 10):", + result: "5", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + 
).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Interval - cancelled]` + ); + + expect(std.err).toContain("Interval must be a number >= 10"); + }); + }); + + describe("Data Catalog sink configuration", () => { + it("enables catalog when not already enabled", async () => { + setIsTTY(true); + + mockSinkDialogs("r2_data_catalog", "simple"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + mockGetR2Catalog("test-bucket", { exists: false }); + mockEnableR2Catalog("test-bucket"); + + mockPrompt({ + text: "Table name (e.g. events, user_activity):", + result: "", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Table name - cancelled]` + ); + + expect(std.out).toContain("done"); + }); + + it("shows already enabled message when catalog is active", async () => { + setIsTTY(true); + + mockSinkDialogs("r2_data_catalog", "simple"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + mockGetR2Catalog("test-bucket", { exists: true, active: true }); + + mockPrompt({ + text: "Table name (e.g. 
events, user_activity):", + result: "", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Table name - cancelled]` + ); + + expect(std.out).toContain("Data Catalog already enabled"); + }); + + it("creates bucket when it does not exist", async () => { + setIsTTY(true); + + mockSinkDialogs("r2_data_catalog", "simple"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "new-bucket", + }); + mockGetR2Bucket("new-bucket", false); + mockCreateR2Bucket("new-bucket"); + mockGetR2Catalog("new-bucket", { exists: false }); + mockEnableR2Catalog("new-bucket"); + + mockPrompt({ + text: "Table name (e.g. events, user_activity):", + result: "", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Table name - cancelled]` + ); + + expect(std.out).toContain("done"); + }); + + it("retries when catalog token validation fails", async () => { + setIsTTY(true); + + mockSinkDialogs("r2_data_catalog", "simple"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + mockGetR2Catalog("test-bucket", { exists: true, active: true }); + + mockPrompt({ + text: "Table name (e.g. 
events, user_activity):", + result: "events", + }); + mockPrompt({ + text: "Catalog API token:", + result: "bad-token", + }); + mockUpsertR2CatalogCredential("test-bucket", { fail: true }); + mockConfirm({ + text: "Would you like to try again?", + result: true, + }); + mockPrompt({ + text: "Catalog API token:", + result: "", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Catalog API token - cancelled]` + ); + + expect(std.out).toContain("failed"); + }); + }); + + describe("Advanced mode Data Catalog sink", () => { + it("prompts for namespace and table name", async () => { + setIsTTY(true); + + mockSinkDialogs("r2_data_catalog", "advanced"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + mockGetR2Catalog("test-bucket", { exists: true, active: true }); + + mockPrompt({ + text: "Namespace:", + result: "custom_namespace", + }); + mockPrompt({ + text: "Table name:", + result: "", + }); + mockConfirm({ + text: "Would you like to try again?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Table name - cancelled]` + ); + + expect(std.out).toContain("Data Catalog already enabled"); + }); + }); + + describe("Data Catalog full flow", () => { + it("completes full setup from bucket to pipeline creation", async () => { + setIsTTY(true); + vi.useFakeTimers({ now: new Date("2025-01-01T00:00:00Z") }); + + mockSinkDialogs("r2_data_catalog", "simple"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + mockGetR2Catalog("test-bucket", { exists: false }); + mockEnableR2Catalog("test-bucket"); + + 
mockPrompt({ + text: "Table name (e.g. events, user_activity):", + result: "events", + }); + mockPrompt({ + text: "Catalog API token:", + result: "test-catalog-token", + }); + mockUpsertR2CatalogCredential("test-bucket"); + + mockConfirm({ + text: "Create resources?", + result: true, + }); + + mockCreateStreamRequest("test_pipeline_stream"); + mockCreateSinkRequest("test_pipeline_sink"); + + mockSelect({ + text: "Query:", + result: "simple", + }); + + const sql = + "INSERT INTO test_pipeline_sink SELECT * FROM test_pipeline_stream;"; + mockValidateSql(sql); + mockCreatePipeline("test_pipeline"); + + await runWrangler('pipelines setup --name "test_pipeline"'); + + expect(std.out).toMatchInlineSnapshot(` + " + ⛅️ wrangler x.x.x + ────────────────── + Cloudflare Pipelines Setup + + + STREAM + + + SINK + + Using existing bucket "test-bucket" + done + + To create a Catalog API token: + Visit https://dash.cloudflare.com/some-account-id/r2/api-tokens/create?type=account + Create token with "Admin Read & Write" permissions + + done + + SUMMARY + + Stream test_pipeline_stream + HTTP enabled, unstructured + + Sink test_pipeline_sink + Data Catalog → default/events + + done + done + + SQL + + Available tables: + test_pipeline_stream (source) + test_pipeline_sink (sink) + + + INSERT INTO test_pipeline_sink SELECT * FROM test_pipeline_stream; + + done + done + + ✓ Setup complete + + To access your new Pipeline in your Worker, add the following snippet to your configuration file: + { + "pipelines": [ + { + "pipeline": "stream_123", + "binding": "TEST_PIPELINE_STREAM" + } + ] + } + + Then send events: + + await env.TEST_PIPELINE_STREAM.send([{"user_id":"sample_user_id","event_name":"sample_event_name","timestamp":1735689600000}]); + + Or via HTTP: + + curl -X POST https://pipelines.cloudflare.com/test_pipeline_stream / + -H "Content-Type: application/json" / + -d '[{"user_id":"sample_user_id","event_name":"sample_event_name","timestamp":1735689600000}]' + + Docs: 
https://developers.cloudflare.com/pipelines/ + " + `); + expect(std.err).toMatchInlineSnapshot(`""`); + }); + }); + + describe("SQL validation", () => { + it("shows error and exits when validation fails and user declines retry", async () => { + setIsTTY(true); + + mockSinkDialogs("r2", "advanced"); + + mockPrompt({ + text: "R2 bucket name (will be created if it doesn't exist):", + result: "test-bucket", + }); + mockGetR2Bucket("test-bucket", true); + mockAdvancedR2SinkPrompts(); + + mockConfirm({ + text: "Create resources?", + result: true, + }); + + mockCreateStreamRequest("test_pipeline_stream"); + mockCreateSinkRequest("test_pipeline_sink"); + + mockSelect({ + text: "Query:", + result: "simple", + }); + + const sql = + "INSERT INTO test_pipeline_sink SELECT * FROM test_pipeline_stream;"; + mockValidateSql(sql, { fail: true }); + + mockConfirm({ + text: " SQL validation failed [code: 1000]\n\n Retry with different SQL?", + result: false, + }); + + await expect( + runWrangler('pipelines setup --name "test_pipeline"') + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: SQL validation failed and setup cannot continue without valid pipeline SQL]` + ); + + expect(std.out).toContain("failed"); + }); + }); +}); diff --git a/packages/wrangler/src/__tests__/pipelines.test.ts b/packages/wrangler/src/__tests__/pipelines.test.ts index f515f6188f1a..5e3c82843bee 100644 --- a/packages/wrangler/src/__tests__/pipelines.test.ts +++ b/packages/wrangler/src/__tests__/pipelines.test.ts @@ -270,11 +270,8 @@ describe("wrangler pipelines", () => { expect(std.out).toContain( "✨ Successfully created pipeline 'my_pipeline' with id 'pipeline_123'." 
); - expect(std.out).toContain( - "Send your first event to stream 'test_stream':" - ); - expect(std.out).toContain("Worker Integration:"); - expect(std.out).toContain("HTTP Endpoint:"); + expect(std.out).toContain("Then send events:"); + expect(std.out).toContain("Or via HTTP:"); }); it("should create pipeline from SQL file", async () => { diff --git a/packages/wrangler/src/pipelines/cli/setup.ts b/packages/wrangler/src/pipelines/cli/setup.ts index 0b1d71392a01..142aa8aac57d 100644 --- a/packages/wrangler/src/pipelines/cli/setup.ts +++ b/packages/wrangler/src/pipelines/cli/setup.ts @@ -1,24 +1,31 @@ import { readFileSync } from "node:fs"; import { + APIError, bucketFormatMessage, isValidR2BucketName, parseJSON, UserError, } from "@cloudflare/workers-utils"; +import chalk from "chalk"; import { createCommand } from "../../core/create-command"; import { confirm, prompt, select } from "../../dialogs"; import { logger } from "../../logger"; +import { createR2Bucket, getR2Bucket } from "../../r2/helpers/bucket"; +import { + enableR2Catalog, + getR2Catalog, + upsertR2CatalogCredential, +} from "../../r2/helpers/catalog"; import { requireAuth } from "../../user"; import { createPipeline, createSink, createStream, - deleteSink, deleteStream, validateSql, } from "../client"; import { SINK_DEFAULTS } from "../defaults"; -import { authorizeR2Bucket } from "../index"; +import { authorizeR2Bucket, verifyR2Credentials } from "../index"; import { validateEntityName } from "../validate"; import { displayUsageExamples, @@ -36,6 +43,291 @@ import type { } from "../types"; import type { Config } from "@cloudflare/workers-utils"; +function getErrorMessage(error: unknown, fallback: string): string { + if (error instanceof APIError && error.notes.length > 0) { + return error.notes[0].text; + } + if (error instanceof Error) { + return error.message; + } + return fallback; +} + +async function promptWithRetry( + getMessage: () => string, + getValue: () => Promise, + validate: (value: T) 
=> void +): Promise { + while (true) { + const value = await getValue(); + try { + validate(value); + return value; + } catch (error) { + const message = + error instanceof Error ? error.message : "Validation failed"; + logger.error(message); + + const retry = await confirm("Would you like to try again?", { + defaultValue: true, + }); + if (!retry) { + throw new UserError(`${getMessage()} - cancelled`); + } + } + } +} + +async function ensureBucketExists( + config: Config, + accountId: string, + bucketName: string +): Promise { + try { + await getR2Bucket(config, accountId, bucketName); + logger.log(` Using existing bucket "${bucketName}"`); + return false; + } catch (err) { + if (err instanceof APIError && err.code === 10006) { + // Bucket doesn't exist, create it + } else { + throw err; + } + } + + process.stdout.write(` Creating bucket "${bucketName}"...`); + await createR2Bucket(config, accountId, bucketName); + logger.log(chalk.green(" done")); + return true; +} + +function validateBucketName(name: string): void { + if (!name) { + throw new UserError("Bucket name is required"); + } + if (!isValidR2BucketName(name)) { + throw new UserError( + `The bucket name "${name}" is invalid. 
${bucketFormatMessage}` + ); + } +} + +async function promptBucketName(): Promise { + return promptWithRetry( + () => "R2 bucket name", + () => prompt("R2 bucket name (will be created if it doesn't exist):"), + validateBucketName + ); +} + +async function ensureCatalogEnabled( + config: Config, + accountId: string, + bucketName: string +): Promise { + let catalogEnabled = false; + try { + const catalog = await getR2Catalog(config, accountId, bucketName); + catalogEnabled = catalog.status === "active"; + } catch (err) { + if (err instanceof APIError && err.code === 10006) { + // Catalog not enabled yet + } else { + throw err; + } + } + + if (catalogEnabled) { + logger.log(" Data Catalog already enabled"); + } else { + process.stdout.write(" Enabling Data Catalog..."); + await enableR2Catalog(config, accountId, bucketName); + logger.log(chalk.green(" done")); + } +} + +function displayCatalogTokenInstructions(accountId: string): void { + logger.log(chalk.dim("\n To create a Catalog API token:")); + logger.log( + chalk.dim( + ` Visit https://dash.cloudflare.com/${accountId}/r2/api-tokens/create?type=account` + ) + ); + logger.log( + chalk.dim(' Create token with "Admin Read & Write" permissions\n') + ); +} + +function displayR2CredentialsInstructions(accountId: string): void { + logger.log(chalk.dim("\n To create R2 API credentials:")); + logger.log( + chalk.dim( + ` Visit https://dash.cloudflare.com/${accountId}/r2/api-tokens/create?type=account` + ) + ); + logger.log( + chalk.dim(' Create token with "Object Read & Write" permissions\n') + ); +} + +const COMPRESSION_CHOICES = [ + { title: "uncompressed", value: "uncompressed" }, + { title: "snappy", value: "snappy" }, + { title: "gzip", value: "gzip" }, + { title: "zstd", value: "zstd" }, + { title: "lz4", value: "lz4" }, +] as const; + +async function promptCompression(): Promise { + return select("Compression:", { + choices: [...COMPRESSION_CHOICES], + defaultOption: 3, // zstd + fallbackOption: 3, + }); +} + 
+async function promptRollingPolicy(): Promise<{ + fileSizeBytes: number; + intervalSeconds: number; +}> { + const fileSizeMB = await promptWithRetry( + () => "File size", + () => + prompt("Roll file when size reaches (MB, minimum 5):", { + defaultValue: "100", + }), + (value) => { + const num = parseInt(value, 10); + if (isNaN(num) || num < 5) { + throw new UserError("File size must be a number >= 5"); + } + } + ); + + const intervalSeconds = await promptWithRetry( + () => "Interval", + () => + prompt("Roll file when time reaches (seconds, minimum 10):", { + defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds), + }), + (value) => { + const num = parseInt(value, 10); + if (isNaN(num) || num < 10) { + throw new UserError("Interval must be a number >= 10"); + } + } + ); + + return { + fileSizeBytes: parseInt(fileSizeMB) * 1024 * 1024, + intervalSeconds: parseInt(intervalSeconds), + }; +} + +async function promptCatalogToken( + config: Config, + accountId: string, + bucketName: string +): Promise { + while (true) { + const token = await prompt("Catalog API token:", { isSecret: true }); + + if (!token) { + logger.error("Catalog API token is required"); + const retry = await confirm("Would you like to try again?", { + defaultValue: true, + }); + if (!retry) { + throw new UserError("Catalog API token - cancelled"); + } + continue; + } + + process.stdout.write(" Validating token..."); + try { + await upsertR2CatalogCredential(config, accountId, bucketName, token); + logger.log(chalk.green(" done")); + return token; + } catch { + logger.log(chalk.red(" failed")); + logger.log( + chalk.dim( + ' Token invalid or missing permissions. Ensure it has "Admin Read & Write" access.' 
+ ) + ); + + const retry = await confirm("Would you like to try again?", { + defaultValue: true, + }); + if (!retry) { + throw new UserError("Catalog API token validation failed"); + } + } + } +} + +async function promptR2Credentials( + accountId: string, + bucketName: string +): Promise<{ access_key_id: string; secret_access_key: string }> { + while (true) { + const accessKeyId = await prompt("R2 Access Key ID:"); + if (!accessKeyId) { + logger.error("R2 Access Key ID is required"); + const retry = await confirm("Would you like to try again?", { + defaultValue: true, + }); + if (!retry) { + throw new UserError("R2 Access Key ID - cancelled"); + } + continue; + } + + const secretAccessKey = await prompt("R2 Secret Access Key:", { + isSecret: true, + }); + if (!secretAccessKey) { + logger.error("R2 Secret Access Key is required"); + const retry = await confirm("Would you like to try again?", { + defaultValue: true, + }); + if (!retry) { + throw new UserError("R2 Secret Access Key - cancelled"); + } + continue; + } + + process.stdout.write(" Validating credentials..."); + try { + await verifyR2Credentials( + accountId, + bucketName, + accessKeyId, + secretAccessKey + ); + logger.log(chalk.green(" done")); + return { + access_key_id: accessKeyId, + secret_access_key: secretAccessKey, + }; + } catch { + logger.log(chalk.red(" failed")); + logger.log( + chalk.dim( + ' Credentials invalid or missing permissions. Ensure token has "Object Read & Write" access.' 
+ ) + ); + + const retry = await confirm("Would you like to try again?", { + defaultValue: true, + }); + if (!retry) { + throw new UserError("R2 credentials validation failed"); + } + } + } +} + interface SetupConfig { pipelineName: string; streamName: string; @@ -60,10 +352,7 @@ export const pipelinesSetupCommand = createCommand({ async handler(args, { config }) { await requireAuth(config); - logger.log("🚀 Welcome to Cloudflare Pipelines Setup!"); - logger.log( - "This will guide you through creating a complete pipeline: stream → pipeline → sink\n" - ); + logger.log("Cloudflare Pipelines Setup\n"); try { const setupConfig = await setupPipelineNaming(args.name); @@ -71,7 +360,7 @@ export const pipelinesSetupCommand = createCommand({ await setupStreamConfiguration(setupConfig); await setupSinkConfiguration(config, setupConfig); const created = await reviewAndCreateStreamSink(config, setupConfig); - await setupSQLTransformationWithValidation(config, setupConfig, created); + await setupSQLTransformationWithValidation(config, setupConfig); await createPipelineIfNeeded(config, setupConfig, created, args); } catch (error) { if (error instanceof UserError) { @@ -87,16 +376,24 @@ export const pipelinesSetupCommand = createCommand({ async function setupPipelineNaming( providedName?: string ): Promise { - const pipelineName = - providedName || - (await prompt("What would you like to name your pipeline?")); + const pipelineName = providedName + ? 
providedName + : await promptWithRetry( + () => "Pipeline name", + () => prompt("What would you like to name your pipeline?"), + (name) => { + if (!name) { + throw new UserError("Pipeline name is required"); + } + validateEntityName("pipeline", name); + } + ); - if (!pipelineName) { - throw new UserError("Pipeline name is required"); + // If name was provided via args, still validate it (but no retry) + if (providedName) { + validateEntityName("pipeline", providedName); } - validateEntityName("pipeline", pipelineName); - const streamName = `${pipelineName}_stream`; const sinkName = `${pipelineName}_sink`; @@ -120,7 +417,7 @@ async function setupPipelineNaming( async function setupStreamConfiguration( setupConfig: SetupConfig ): Promise { - logger.log("\n▶ Let's configure your data source (stream):"); + logger.log("\nSTREAM\n"); const httpEnabled = await confirm("Enable HTTP endpoint for sending data?", { defaultValue: true, @@ -160,13 +457,11 @@ async function setupStreamConfiguration( worker_binding: { enabled: true }, ...(schema && { schema: { fields: schema } }), }; - - logger.log("✨ Stream configuration complete\n"); } async function setupSchemaConfiguration(): Promise { const schemaMethod = await select( - "How would you like to define the schema?", + "How would you like to define the schema for incoming events?", { choices: [ { title: "Build interactively", value: "interactive" }, @@ -192,24 +487,20 @@ async function setupSchemaConfiguration(): Promise { async function buildSchemaInteractively(): Promise { const fields: SchemaField[] = []; - let fieldNumber = 1; - logger.log("\n▶ Building schema interactively:"); + logger.log(""); - let continueAdding = true; - while (continueAdding) { - const field = await buildField(fieldNumber); - fields.push(field); + let fieldNumber = 1; + while (true) { + fields.push(await buildField(fieldNumber)); const addAnother = await confirm(`Add field #${fieldNumber + 1}?`, { defaultValue: fieldNumber < 3, }); - if (!addAnother) 
{ - continueAdding = false; - } else { - fieldNumber++; + break; } + fieldNumber++; } return fields; @@ -222,11 +513,15 @@ async function buildField( const indent = " ".repeat(depth); logger.log(`${indent}Field #${fieldNumber}:`); - const name = await prompt(`${indent} Name:`); - - if (!name) { - throw new UserError("Field name is required"); - } + const name = await promptWithRetry( + () => "Field name", + () => prompt(`${indent} Name:`), + (value) => { + if (!value) { + throw new UserError("Field name is required"); + } + } + ); const typeChoices = [ { title: "string", value: "string" }, @@ -240,7 +535,6 @@ async function buildField( { title: "bytes", value: "bytes" }, ]; - // Only show complex types if not nested too deep if (depth < 2) { typeChoices.push( { title: "struct (nested object)", value: "struct" }, @@ -264,7 +558,6 @@ async function buildField( required, }; - // Handle type-specific configuration if (type === "timestamp") { const unit = await select(`${indent} Unit:`, { choices: [ @@ -280,23 +573,19 @@ async function buildField( } else if (type === "struct" && depth < 2) { logger.log(`\nDefine nested fields for struct '${name}':`); field.fields = []; - let structFieldNumber = 1; - let continueAdding = true; - while (continueAdding) { - const structField = await buildField(structFieldNumber, depth + 1); - field.fields.push(structField); + let structFieldNumber = 1; + while (true) { + field.fields.push(await buildField(structFieldNumber, depth + 1)); const addAnother = await confirm( `${indent}Add another field to struct '${name}'?`, { defaultValue: false } ); - if (!addAnother) { - continueAdding = false; - } else { - structFieldNumber++; + break; } + structFieldNumber++; } } else if (type === "list" && depth < 2) { logger.log(`\nDefine item type for list '${name}':`); @@ -337,16 +626,66 @@ async function loadSchemaFromFile(): Promise { } } +async function loadSqlFromFile(): Promise { + const filePath = await prompt("SQL file path:"); + + try { + const 
sql = readFileSync(filePath, "utf-8").trim(); + + if (!sql) { + throw new UserError("SQL file is empty"); + } + + return sql; + } catch (error) { + logger.error( + `Failed to read SQL file: ${error instanceof Error ? error.message : String(error)}` + ); + + const retry = await confirm("Would you like to try again?", { + defaultValue: true, + }); + + if (retry) { + return loadSqlFromFile(); + } else { + throw new UserError("SQL file loading cancelled"); + } + } +} + async function setupSinkConfiguration( config: Config, setupConfig: SetupConfig ): Promise { - logger.log("▶ Let's configure your destination (sink):"); + logger.log("\nSINK\n"); - const sinkType = await select("Select destination type:", { + const sinkType = await select("Destination type:", { choices: [ { title: "R2 Bucket", value: "r2" }, - { title: "Data Catalog Table", value: "r2_data_catalog" }, + { title: "Data Catalog (Iceberg)", value: "r2_data_catalog" }, + ], + defaultOption: 0, + fallbackOption: 0, + }); + + const simpleDescription = + sinkType === "r2" + ? 
"parquet + zstd, 100MB file rolls, auto-generated credentials" + : "parquet + zstd, 100MB file rolls"; + + const setupMode = await select("Setup mode:", { + choices: [ + { + title: "Simple (recommended defaults)", + description: simpleDescription, + value: "simple", + }, + { + title: "Advanced (configure all options)", + description: "format, compression, rolling policy, credentials, etc.", + value: "advanced", + }, ], defaultOption: 0, fallbackOption: 0, @@ -354,31 +693,113 @@ async function setupSinkConfiguration( const accountId = await requireAuth(config); - if (sinkType === "r2") { - await setupR2Sink(config, accountId, setupConfig); + if (setupMode === "simple") { + if (sinkType === "r2") { + await setupSimpleR2Sink(config, accountId, setupConfig); + } else { + await setupSimpleDataCatalogSink(config, accountId, setupConfig); + } } else { - await setupDataCatalogSink(setupConfig); + if (sinkType === "r2") { + await setupR2Sink(config, accountId, setupConfig); + } else { + await setupDataCatalogSink(config, accountId, setupConfig); + } } +} + +async function setupSimpleR2Sink( + config: Config, + accountId: string, + setupConfig: SetupConfig +): Promise { + const bucket = await promptBucketName(); + await ensureBucketExists(config, accountId, bucket); + + process.stdout.write(" Generating credentials..."); + const cleanedSinkName = setupConfig.sinkName.replace(/_/g, "-"); + const auth = await authorizeR2Bucket( + config, + cleanedSinkName, + accountId, + bucket, + { quiet: true } + ); + logger.log(chalk.green(" done")); - logger.log("✨ Sink configuration complete\n"); + setupConfig.sinkConfig = { + name: setupConfig.sinkName, + type: "r2", + format: { + type: "parquet", + compression: "zstd", + }, + config: { + bucket, + partitioning: { + time_pattern: "year=%Y/month=%m/day=%d", + }, + credentials: { + access_key_id: auth.accessKeyId, + secret_access_key: auth.secretAccessKey, + }, + rolling_policy: { + file_size_bytes: 100 * 1024 * 1024, + interval_seconds: 
SINK_DEFAULTS.rolling_policy.interval_seconds, + }, + }, + }; } -async function setupR2Sink( +async function setupSimpleDataCatalogSink( config: Config, accountId: string, setupConfig: SetupConfig ): Promise { - const bucket = await prompt("R2 bucket name:"); + const bucket = await promptBucketName(); + await ensureBucketExists(config, accountId, bucket); + await ensureCatalogEnabled(config, accountId, bucket); + + const tableName = await promptWithRetry( + () => "Table name", + () => prompt("Table name (e.g. events, user_activity):"), + (value) => { + if (!value) { + throw new UserError("Table name is required"); + } + } + ); - if (!bucket) { - throw new UserError("Bucket name is required"); - } + displayCatalogTokenInstructions(accountId); + const token = await promptCatalogToken(config, accountId, bucket); - if (!isValidR2BucketName(bucket)) { - throw new UserError( - `The bucket name "${bucket}" is invalid. ${bucketFormatMessage}` - ); - } + setupConfig.sinkConfig = { + name: setupConfig.sinkName, + type: "r2_data_catalog", + format: { + type: "parquet", + compression: "zstd", + }, + config: { + bucket, + namespace: "default", + table_name: tableName, + token, + rolling_policy: { + file_size_bytes: 100 * 1024 * 1024, + interval_seconds: SINK_DEFAULTS.rolling_policy.interval_seconds, + }, + }, + }; +} + +async function setupR2Sink( + config: Config, + accountId: string, + setupConfig: SetupConfig +): Promise { + const bucket = await promptBucketName(); + await ensureBucketExists(config, accountId, bucket); const path = await prompt( "The base prefix in your bucket where data will be written (optional):", @@ -403,33 +824,9 @@ async function setupR2Sink( fallbackOption: 0, }); - let compression; - if (format === "parquet") { - compression = await select("Compression:", { - choices: [ - { title: "uncompressed", value: "uncompressed" }, - { title: "snappy", value: "snappy" }, - { title: "gzip", value: "gzip" }, - { title: "zstd", value: "zstd" }, - { title: "lz4", 
value: "lz4" }, - ], - defaultOption: 3, - fallbackOption: 3, - }); - } - - const fileSizeMB = await prompt( - "Roll file when size reaches (MB, minimum 5):", - { - defaultValue: "100", - } - ); - const intervalSeconds = await prompt( - "Roll file when time reaches (seconds, minimum 10):", - { - defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds), - } - ); + const compression = + format === "parquet" ? await promptCompression() : undefined; + const rollingPolicy = await promptRollingPolicy(); const useOAuth = await confirm( "Automatically generate credentials needed to write to your R2 bucket?", @@ -440,26 +837,23 @@ async function setupR2Sink( let credentials; if (useOAuth) { - logger.log("🔐 Generating R2 credentials..."); - // Clean up sink name for service token generation (remove underscores) + process.stdout.write(" Generating credentials..."); const cleanedSinkName = setupConfig.sinkName.replace(/_/g, "-"); const auth = await authorizeR2Bucket( config, cleanedSinkName, accountId, - bucket + bucket, + { quiet: true } ); + logger.log(chalk.green(" done")); credentials = { access_key_id: auth.accessKeyId, secret_access_key: auth.secretAccessKey, }; } else { - credentials = { - access_key_id: await prompt("R2 Access Key ID:"), - secret_access_key: await prompt("R2 Secret Access Key:", { - isSecret: true, - }), - }; + displayR2CredentialsInstructions(accountId); + credentials = await promptR2Credentials(accountId, bucket); } let formatConfig: SinkFormat; @@ -488,48 +882,48 @@ async function setupR2Sink( }), credentials, rolling_policy: { - file_size_bytes: parseInt(fileSizeMB) * 1024 * 1024, // Convert MB to bytes - interval_seconds: parseInt(intervalSeconds), + file_size_bytes: rollingPolicy.fileSizeBytes, + interval_seconds: rollingPolicy.intervalSeconds, }, }, }; } -async function setupDataCatalogSink(setupConfig: SetupConfig): Promise { - const bucket = await prompt("R2 bucket name (for catalog storage):"); - const namespace = await 
prompt("Namespace:", { defaultValue: "default" }); - const tableName = await prompt("Table name:"); - const token = await prompt("Catalog API token:", { isSecret: true }); - - if (!bucket || !namespace || !tableName || !token) { - throw new UserError("All Data Catalog fields are required"); - } - - const compression = await select("Compression:", { - choices: [ - { title: "uncompressed", value: "uncompressed" }, - { title: "snappy", value: "snappy" }, - { title: "gzip", value: "gzip" }, - { title: "zstd", value: "zstd" }, - { title: "lz4", value: "lz4" }, - ], - defaultOption: 3, - fallbackOption: 3, - }); - - const fileSizeMB = await prompt( - "Roll file when size reaches (MB, minimum 5):", - { - defaultValue: "100", +async function setupDataCatalogSink( + config: Config, + accountId: string, + setupConfig: SetupConfig +): Promise { + const bucket = await promptBucketName(); + await ensureBucketExists(config, accountId, bucket); + await ensureCatalogEnabled(config, accountId, bucket); + + const namespace = await promptWithRetry( + () => "Namespace", + () => prompt("Namespace:", { defaultValue: "default" }), + (value) => { + if (!value) { + throw new UserError("Namespace is required"); + } } ); - const intervalSeconds = await prompt( - "Roll file when time reaches (seconds, minimum 10):", - { - defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds), + + const tableName = await promptWithRetry( + () => "Table name", + () => prompt("Table name:"), + (value) => { + if (!value) { + throw new UserError("Table name is required"); + } } ); + displayCatalogTokenInstructions(accountId); + const token = await promptCatalogToken(config, accountId, bucket); + + const compression = await promptCompression(); + const rollingPolicy = await promptRollingPolicy(); + setupConfig.sinkConfig = { name: setupConfig.sinkName, type: "r2_data_catalog", @@ -543,8 +937,8 @@ async function setupDataCatalogSink(setupConfig: SetupConfig): Promise { table_name: tableName, token, 
rolling_policy: { - file_size_bytes: parseInt(fileSizeMB) * 1024 * 1024, - interval_seconds: parseInt(intervalSeconds), + file_size_bytes: rollingPolicy.fileSizeBytes, + interval_seconds: rollingPolicy.intervalSeconds, }, }, }; @@ -552,67 +946,57 @@ async function setupDataCatalogSink(setupConfig: SetupConfig): Promise { async function setupSQLTransformationWithValidation( config: Config, - setupConfig: SetupConfig, - created: { stream?: Stream; sink?: Sink } + setupConfig: SetupConfig ): Promise { - logger.log("\n▶ Pipeline SQL:"); + logger.log("\nSQL\n"); - logger.log("\nAvailable tables:"); - logger.log(` • ${setupConfig.streamName} (source stream)`); - logger.log(` • ${setupConfig.sinkName} (destination sink)`); + logger.log(" Available tables:"); + logger.log(` ${setupConfig.streamName} (source)`); + logger.log(` ${setupConfig.sinkName} (sink)\n`); if (setupConfig.streamConfig.schema?.fields) { - logger.log("\nStream input schema:"); + logger.log(" Schema:"); const schemaRows = formatSchemaFieldsForTable( setupConfig.streamConfig.schema.fields ); logger.table(schemaRows); } - const sqlMethod = await select( - "How would you like to provide SQL that will define how your pipeline will transform and route data?", - { - choices: [ - { - title: - "Use simple ingestion query (copy all data from stream to sink)", - value: "simple", - }, - { title: "Write interactively", value: "interactive" }, - { title: "Load from file", value: "file" }, - ], - defaultOption: 0, - fallbackOption: 0, - } - ); + const sqlMethod = await select("Query:", { + choices: [ + { + title: "Simple ingestion (SELECT * FROM stream)", + value: "simple", + }, + { title: "Write custom SQL", value: "interactive" }, + { title: "Load from file", value: "file" }, + ], + defaultOption: 0, + fallbackOption: 0, + }); let sql: string; if (sqlMethod === "simple") { sql = `INSERT INTO ${setupConfig.sinkName} SELECT * FROM ${setupConfig.streamName};`; - logger.log(`\nUsing query: ${sql}`); + 
logger.log(chalk.dim(`\n ${sql}\n`)); } else if (sqlMethod === "interactive") { logger.log( - `\n💡 Example: INSERT INTO ${setupConfig.sinkName} SELECT * FROM ${setupConfig.streamName};` + chalk.dim( + `\n Example: INSERT INTO ${setupConfig.sinkName} SELECT * FROM ${setupConfig.streamName};\n` + ) ); sql = await promptMultiline("Enter SQL query:", "SQL"); } else { - const filePath = await prompt("SQL file path:"); - try { - sql = readFileSync(filePath, "utf-8").trim(); - } catch (error) { - throw new UserError( - `Failed to read SQL file: ${error instanceof Error ? error.message : String(error)}` - ); - } + sql = await loadSqlFromFile(); } if (!sql) { throw new UserError("SQL query cannot be empty"); } - logger.log("🌀 Validating SQL..."); + process.stdout.write(" Validating..."); try { const validationResult = await validateSql(config, { sql }); @@ -620,14 +1004,12 @@ async function setupSQLTransformationWithValidation( !validationResult.tables || Object.keys(validationResult.tables).length === 0 ) { + logger.log(chalk.yellow(" warning")); logger.warn( - "SQL validation returned no tables - this might indicate an issue with the query" + " SQL validation returned no tables - this might indicate an issue with the query" ); } else { - const tableNames = Object.keys(validationResult.tables); - logger.log( - `✨ SQL validation passed. 
References tables: ${tableNames.join(", ")}` - ); + logger.log(chalk.green(" done")); } setupConfig.pipelineConfig = { @@ -635,6 +1017,8 @@ async function setupSQLTransformationWithValidation( sql, }; } catch (error) { + logger.log(chalk.red(" failed")); + let errorMessage = "SQL validation failed"; if (error && typeof error === "object") { @@ -653,20 +1037,18 @@ async function setupSQLTransformationWithValidation( } const retry = await confirm( - `SQL validation failed: ${errorMessage}\n\nRetry with different SQL?`, + ` ${errorMessage}\n\n Retry with different SQL?`, { defaultValue: true } ); if (retry) { - return setupSQLTransformationWithValidation(config, setupConfig, created); + return setupSQLTransformationWithValidation(config, setupConfig); } else { throw new UserError( "SQL validation failed and setup cannot continue without valid pipeline SQL" ); } } - - logger.log("✨ SQL configuration complete\n"); } async function promptMultiline( @@ -693,37 +1075,42 @@ async function reviewAndCreateStreamSink( config: Config, setupConfig: SetupConfig ): Promise<{ stream?: Stream; sink?: Sink }> { - // Display summary - logger.log("▶ Configuration Summary:"); - logger.log(`\nStream: ${setupConfig.streamName}`); - logger.log( - ` • HTTP: ${setupConfig.streamConfig.http.enabled ? "Enabled" : "Disabled"}` - ); - if (setupConfig.streamConfig.http.enabled) { - logger.log( - ` • Authentication: ${setupConfig.streamConfig.http.authentication ? "Required" : "None"}` - ); - } - logger.log( - ` • Schema: ${setupConfig.streamConfig.schema?.fields ? `${setupConfig.streamConfig.schema.fields.length} fields` : "Unstructured"}` - ); + logger.log("\nSUMMARY\n"); + + const httpStatus = setupConfig.streamConfig.http.enabled + ? setupConfig.streamConfig.http.authentication + ? "enabled (auth required)" + : "enabled" + : "disabled"; + const schemaStatus = setupConfig.streamConfig.schema?.fields + ? 
`${setupConfig.streamConfig.schema.fields.length} field${setupConfig.streamConfig.schema.fields.length === 1 ? "" : "s"}` + : "unstructured"; + + logger.log(` Stream ${setupConfig.streamName}`); + logger.log(chalk.dim(` HTTP ${httpStatus}, ${schemaStatus}\n`)); - logger.log(`\nSink: ${setupConfig.sinkName}`); - logger.log( - ` • Type: ${setupConfig.sinkConfig.type === "r2" ? "R2 Bucket" : "Data Catalog"}` - ); if (setupConfig.sinkConfig.type === "r2") { - logger.log(` • Bucket: ${setupConfig.sinkConfig.config.bucket}`); + const format = setupConfig.sinkConfig.format?.type || "parquet"; + const compression = + setupConfig.sinkConfig.format?.type === "parquet" + ? (setupConfig.sinkConfig.format as ParquetFormat).compression || "" + : ""; + logger.log(` Sink ${setupConfig.sinkName}`); logger.log( - ` • Format: ${setupConfig.sinkConfig.format?.type || "parquet"}` + chalk.dim( + ` R2 → ${setupConfig.sinkConfig.config.bucket}, ${format}${compression ? ` + ${compression}` : ""}\n` + ) ); } else { + logger.log(` Sink ${setupConfig.sinkName}`); logger.log( - ` • Table: ${setupConfig.sinkConfig.config.namespace}/${setupConfig.sinkConfig.config.table_name}` + chalk.dim( + ` Data Catalog → ${setupConfig.sinkConfig.config.namespace}/${setupConfig.sinkConfig.config.table_name}\n` + ) ); } - const proceed = await confirm("Create stream and sink?", { + const proceed = await confirm("Create resources?", { defaultValue: true, }); @@ -731,53 +1118,62 @@ async function reviewAndCreateStreamSink( throw new UserError("Setup cancelled"); } - // Create resources with rollback on failure const created: { stream?: Stream; sink?: Sink } = {}; - try { - // Create stream - logger.log("\n🌀 Creating stream..."); - created.stream = await createStream(config, setupConfig.streamConfig); - logger.log(`✨ Created stream: ${created.stream.name}`); - - // Create sink - logger.log("🌀 Creating sink..."); - created.sink = await createSink(config, setupConfig.sinkConfig); - logger.log(`✨ Created sink: 
${created.sink.name}`); - - logger.log("\n✨ Stream and sink created successfully!"); - return created; - } catch (error) { - logger.error( - `❌ Setup failed: ${error instanceof Error ? error.message : String(error)}` - ); - - logger.log("🌀 Rolling back created resources..."); + while (!created.stream) { + try { + process.stdout.write("\n Creating stream..."); + created.stream = await createStream(config, setupConfig.streamConfig); + logger.log(chalk.green(" done")); + } catch (error) { + logger.log(chalk.red(" failed")); + logger.log( + chalk.dim(` ${getErrorMessage(error, "Stream creation failed")}`) + ); - if (created.stream) { - try { - await deleteStream(config, created.stream.id); - logger.log(`✨ Cleaned up stream: ${created.stream.name}`); - } catch (cleanupError) { - logger.warn( - `Failed to cleanup stream: ${cleanupError instanceof Error ? cleanupError.message : String(cleanupError)}` - ); + const retry = await confirm(" Retry?", { + defaultValue: true, + }); + if (!retry) { + throw new UserError("Stream creation cancelled"); } } + } - if (created.sink) { - try { - await deleteSink(config, created.sink.id); - logger.log(`✨ Cleaned up sink: ${created.sink.name}`); - } catch (cleanupError) { - logger.warn( - `Failed to cleanup sink: ${cleanupError instanceof Error ? cleanupError.message : String(cleanupError)}` - ); + while (!created.sink) { + try { + process.stdout.write(" Creating sink..."); + created.sink = await createSink(config, setupConfig.sinkConfig); + logger.log(chalk.green(" done")); + } catch (error) { + logger.log(chalk.red(" failed")); + logger.log( + chalk.dim(` ${getErrorMessage(error, "Sink creation failed")}`) + ); + + const retry = await confirm( + " Retry? 
(stream was created successfully)", + { + defaultValue: true, + } + ); + if (!retry) { + process.stdout.write(" Cleaning up stream..."); + try { + await deleteStream(config, created.stream.id); + logger.log(chalk.green(" done")); + } catch (cleanupError) { + logger.log(chalk.red(" failed")); + logger.warn( + ` ${cleanupError instanceof Error ? cleanupError.message : String(cleanupError)}` + ); + } + throw new UserError("Sink creation cancelled"); } } - - throw error; } + + return created; } async function createPipelineIfNeeded( @@ -790,26 +1186,47 @@ async function createPipelineIfNeeded( throw new UserError("Pipeline configuration is missing"); } - try { - logger.log("🌀 Creating pipeline..."); - const pipeline = await createPipeline(config, setupConfig.pipelineConfig); - logger.log(`✨ Created pipeline: ${pipeline.name}`); + while (true) { + try { + process.stdout.write(" Creating pipeline..."); + await createPipeline(config, setupConfig.pipelineConfig); + logger.log(chalk.green(" done")); + + logger.log(chalk.green("\n✓ Setup complete\n")); + + if (created.stream) { + await displayUsageExamples(created.stream, config, args); + } + return; + } catch (error) { + logger.log(chalk.red(" failed")); + logger.log( + chalk.dim(` ${getErrorMessage(error, "Pipeline creation failed")}`) + ); + logger.log( + chalk.dim( + "\n Stream and sink were created, but pipeline creation failed." 
+ ) + ); - logger.log("\n✨ Setup complete!"); + const retry = await confirm(" Try again with different SQL?", { + defaultValue: true, + }); + if (!retry) { + logger.log( + chalk.dim( + "\n You can create the pipeline later with: wrangler pipelines create" + ) + ); + logger.log( + chalk.dim( + ` Your stream "${setupConfig.streamName}" and sink "${setupConfig.sinkName}" are ready.` + ) + ); + return; + } - if (created.stream) { - await displayUsageExamples(created.stream, config, args); + await setupSQLTransformationWithValidation(config, setupConfig); } - } catch (error) { - logger.error( - `❌ Pipeline creation failed: ${error instanceof Error ? error.message : String(error)}` - ); - logger.log( - "\n⚠️ Stream and sink were created successfully, but pipeline creation failed." - ); - logger.log( - "You can try creating the pipeline manually with: wrangler pipelines create" - ); - throw error; } } diff --git a/packages/wrangler/src/pipelines/cli/streams/utils.ts b/packages/wrangler/src/pipelines/cli/streams/utils.ts index 1b40badf66c2..359c48867c92 100644 --- a/packages/wrangler/src/pipelines/cli/streams/utils.ts +++ b/packages/wrangler/src/pipelines/cli/streams/utils.ts @@ -1,3 +1,4 @@ +import chalk from "chalk"; import { logger } from "../../../logger"; import { createdResourceConfig } from "../../../utils/add-created-resource-config"; import formatLabelledValues from "../../../utils/render-labelled-values"; @@ -212,11 +213,7 @@ export async function displayUsageExamples( ) { const bindingName = generateStreamBindingName(stream.name); const exampleData = generateExampleData(stream); - - logger.log(`\nSend your first event to stream '${stream.name}':`); - - // Worker binding example (always shown since worker_binding is always enabled) - logger.log("\nWorker Integration:"); + const compactData = JSON.stringify(JSON.parse(exampleData)); await createdResourceConfig( "pipelines", @@ -229,12 +226,11 @@ export async function displayUsageExamples( { updateConfig: false } 
); - logger.log("\nIn your Worker:"); - logger.log(`await env.${bindingName}.send([${exampleData}]);`); + logger.log("\nThen send events:\n"); + logger.log(` await env.${bindingName}.send([${compactData}]);\n`); - // HTTP endpoint example (only if HTTP is enabled) if (stream.http.enabled) { - logger.log("\nHTTP Endpoint:"); + logger.log("Or via HTTP:\n"); const curlCommand = [ `curl -X POST ${stream.endpoint}`, @@ -242,23 +238,19 @@ export async function displayUsageExamples( ? `-H "Authorization: Bearer YOUR_API_TOKEN"` : null, `-H "Content-Type: application/json"`, - `-d '[${exampleData}]'`, + `-d '[${compactData}]'`, ] .filter(Boolean) .join(" \\\n "); - logger.log(curlCommand); + logger.log(` ${curlCommand}\n`); if (stream.http.authentication) { - logger.log("(Replace YOUR_API_TOKEN with your Cloudflare API token)"); - } - - if ( - stream.http.cors && - stream.http.cors.origins && - !stream.http.cors.origins.includes("*") - ) { - logger.log(`CORS origins: ${stream.http.cors.origins.join(", ")}`); + logger.log( + chalk.dim(" (Replace YOUR_API_TOKEN with your Cloudflare API token)") + ); } } + + logger.log(chalk.dim("Docs: https://developers.cloudflare.com/pipelines/\n")); } diff --git a/packages/wrangler/src/pipelines/index.ts b/packages/wrangler/src/pipelines/index.ts index 7f445d11cead..75e9df4d2ef5 100644 --- a/packages/wrangler/src/pipelines/index.ts +++ b/packages/wrangler/src/pipelines/index.ts @@ -1,5 +1,9 @@ import { setTimeout } from "node:timers/promises"; -import { HeadBucketCommand, S3Client } from "@aws-sdk/client-s3"; +import { + HeadBucketCommand, + ListObjectsV2Command, + S3Client, +} from "@aws-sdk/client-s3"; import { APIError, FatalError, @@ -12,8 +16,9 @@ import type { ComplianceConfig } from "@cloudflare/workers-utils"; export const BYTES_PER_MB = 1000 * 1000; -// flag to skip delays for tests +// flags to skip delays/validation for tests let __testSkipDelaysFlag = false; +let __testSkipCredentialValidationFlag = false; /** * Verify the 
credentials used by the S3Client can access a R2 bucket by performing the @@ -47,12 +52,43 @@ async function verifyBucketAccess(r2: S3Client, bucketName: string) { } } +export interface AuthorizeR2BucketOptions { + /** Suppress log messages (for callers that handle their own output) */ + quiet?: boolean; +} + +export async function verifyR2Credentials( + accountId: string, + bucketName: string, + accessKeyId: string, + secretAccessKey: string +): Promise<void> { + if (__testSkipCredentialValidationFlag) { + return; + } + + const endpoint = getAccountR2Endpoint(accountId); + const r2 = new S3Client({ + region: "auto", + credentials: { + accessKeyId, + secretAccessKey, + }, + endpoint, + }); + + await r2.send(new ListObjectsV2Command({ Bucket: bucketName, MaxKeys: 1 })); +} + export async function authorizeR2Bucket( complianceConfig: ComplianceConfig, pipelineName: string, accountId: string, - bucketName: string + bucketName: string, + options: AuthorizeR2BucketOptions = {} ) { + const { quiet = false } = options; + try { await getR2Bucket(complianceConfig, accountId, bucketName); } catch (err) { @@ -64,7 +100,9 @@ export async function authorizeR2Bucket( throw err; } - logger.log(`🌀 Authorizing R2 bucket "${bucketName}"`); + if (!quiet) { + logger.log(`🌀 Authorizing R2 bucket "${bucketName}"`); + } const serviceToken = await generateR2ServiceToken( accountId, @@ -89,7 +127,9 @@ export async function authorizeR2Bucket( }); // Wait for token to settle/propagate, retry up to 10 times, with 2s waits in-between errors - logger.log(`🌀 Checking access to R2 bucket "${bucketName}"`); + if (!quiet) { + logger.log(`🌀 Checking access to R2 bucket "${bucketName}"`); + } await verifyBucketAccess(r2, bucketName); return serviceToken; @@ -126,7 +166,11 @@ export const pipelinesNamespace = createNamespace({ }, }); -// Test exception to remove delays +// Test helpers to skip delays/validation export function __testSkipDelays() { __testSkipDelaysFlag = true; } + +export function 
__testSkipCredentialValidation() { + __testSkipCredentialValidationFlag = true; +}