From ef15e69cfd5a346f89bd092656cfb162b26668b0 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 19 Feb 2026 13:27:21 +0100 Subject: [PATCH] fix(core): stabilize condition failure handling and skip archival --- ...100_function__cascade_force_skip_steps.sql | 17 +- ...00_function_cascade_resolve_conditions.sql | 6 +- pkgs/core/schemas/0100_function_fail_task.sql | 29 +--- .../20260214181656_pgflow_step_conditions.sql | 52 ++---- pkgs/core/supabase/migrations/atlas.sum | 4 +- ...preexisting_skipped_step_messages.test.sql | 69 ++++++++ ...met_fail_archives_active_messages.test.sql | 13 +- ...ot_root_step_pattern_matches_fail.test.sql | 4 +- .../root_step_condition_unmet_fail.test.sql | 24 ++- ...t_unmet_fail_emits_failure_events.test.sql | 15 +- .../runtime/condition-options.test.ts | 36 ++++ pkgs/dsl/__tests__/runtime/utils.test.ts | 28 +-- .../types/condition-pattern.test-d.ts | 21 ++- .../types/getStepDefinition.test-d.ts | 60 +++++++ pkgs/dsl/src/compile-flow.ts | 29 +++- pkgs/dsl/src/dsl.ts | 160 ++++++++++++++++-- pkgs/dsl/src/utils.ts | 33 ++-- 17 files changed, 460 insertions(+), 140 deletions(-) create mode 100644 pkgs/core/supabase/tests/_cascade_force_skip_steps/does_not_archive_preexisting_skipped_step_messages.test.sql diff --git a/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql b/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql index 4ca27be34..7f41905da 100644 --- a/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql +++ b/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql @@ -108,19 +108,10 @@ BEGIN WHERE r.run_id = _cascade_force_skip_steps.run_id AND skipped_count.count > 0 ) - SELECT COUNT(*) INTO v_total_skipped FROM skipped; - - -- Archive queued/started task messages for all steps that were just skipped - -- (query step_states since CTE state is no longer accessible) - PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) - FROM pgflow.step_tasks st - JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug - WHERE st.run_id = _cascade_force_skip_steps.run_id - AND st.status IN ('queued', 'started') - AND st.message_id IS NOT NULL - AND ss.status = 'skipped' - AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped - HAVING COUNT(st.message_id) > 0; + SELECT skipped_count.count + INTO v_total_skipped + FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count + LEFT JOIN archived_messages ON true; RETURN v_total_skipped; END; diff --git a/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql index c7f2ba1f5..cf410028b 100644 --- a/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql +++ b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql @@ -93,7 +93,11 @@ BEGIN FROM steps_with_conditions swc LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug ) - SELECT flow_slug, step_slug, required_input_pattern, forbidden_input_pattern + SELECT + flow_slug, + step_slug, + required_input_pattern, + forbidden_input_pattern INTO v_first_fail FROM condition_evaluations WHERE NOT condition_met AND when_unmet = 'fail' diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql index 029b87d14..d1e0c7bb3 100644 --- a/pkgs/core/schemas/0100_function_fail_task.sql +++ b/pkgs/core/schemas/0100_function_fail_task.sql @@ -49,19 +49,17 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id RETURN; END IF; --- Late callback guard: if step is not 'started', don't mutate step/run state --- Capture previous status BEFORE any CTE updates (for transition-based decrement) -SELECT ss.status INTO v_prev_step_status -FROM pgflow.step_states ss +-- Late callback guard: lock run + step rows and use current step status +-- under lock so concurrent fail_task calls cannot read stale status. +SELECT ss.status, r.flow_slug INTO v_prev_step_status, v_flow_slug +FROM pgflow.runs r +JOIN pgflow.step_states ss ON ss.run_id = r.run_id WHERE ss.run_id = fail_task.run_id - AND ss.step_slug = fail_task.step_slug; + AND ss.step_slug = fail_task.step_slug +FOR UPDATE OF r, ss; IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN -- Archive the task message if present - SELECT r.flow_slug INTO v_flow_slug - FROM pgflow.runs r - WHERE r.run_id = fail_task.run_id; - PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) FROM pgflow.step_tasks st WHERE st.run_id = fail_task.run_id @@ -77,18 +75,7 @@ IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN RETURN; END IF; -WITH run_lock AS ( - SELECT * FROM pgflow.runs - WHERE pgflow.runs.run_id = fail_task.run_id - FOR UPDATE -), -step_lock AS ( - SELECT * FROM pgflow.step_states - WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug - FOR UPDATE -), -flow_info AS ( +WITH flow_info AS ( SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = fail_task.run_id diff --git a/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql b/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql index 4334ea2ab..d147bc8a7 100644 --- a/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql +++ b/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql @@ -430,19 +430,10 @@ BEGIN WHERE r.run_id = _cascade_force_skip_steps.run_id AND skipped_count.count > 0 ) - SELECT COUNT(*) INTO v_total_skipped FROM skipped; - - -- Archive queued/started task messages for all steps that were just skipped - -- (query step_states since CTE state is no longer accessible) - PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) - FROM pgflow.step_tasks st - JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug - WHERE st.run_id = _cascade_force_skip_steps.run_id - AND st.status IN ('queued', 'started') - AND st.message_id IS NOT NULL - AND ss.status = 'skipped' - AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped - HAVING COUNT(st.message_id) > 0; + SELECT skipped_count.count + INTO v_total_skipped + FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count + LEFT JOIN archived_messages ON true; RETURN v_total_skipped; END; @@ -532,7 +523,11 @@ BEGIN FROM steps_with_conditions swc LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug ) - SELECT flow_slug, step_slug, required_input_pattern, forbidden_input_pattern + SELECT + flow_slug, + step_slug, + required_input_pattern, + forbidden_input_pattern INTO v_first_fail FROM condition_evaluations WHERE NOT condition_met AND when_unmet = 'fail' @@ -1327,19 +1322,17 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id RETURN; END IF; --- Late callback guard: if step is not 'started', don't mutate step/run state --- Capture previous status BEFORE any CTE updates (for transition-based decrement) -SELECT ss.status INTO v_prev_step_status -FROM pgflow.step_states ss +-- Late callback guard: lock run + step rows and use current step status +-- under lock so concurrent fail_task calls cannot read stale status. +SELECT ss.status, r.flow_slug INTO v_prev_step_status, v_flow_slug +FROM pgflow.runs r +JOIN pgflow.step_states ss ON ss.run_id = r.run_id WHERE ss.run_id = fail_task.run_id - AND ss.step_slug = fail_task.step_slug; + AND ss.step_slug = fail_task.step_slug +FOR UPDATE OF r, ss; IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN -- Archive the task message if present - SELECT r.flow_slug INTO v_flow_slug - FROM pgflow.runs r - WHERE r.run_id = fail_task.run_id; - PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) FROM pgflow.step_tasks st WHERE st.run_id = fail_task.run_id @@ -1355,18 +1348,7 @@ IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN RETURN; END IF; -WITH run_lock AS ( - SELECT * FROM pgflow.runs - WHERE pgflow.runs.run_id = fail_task.run_id - FOR UPDATE -), -step_lock AS ( - SELECT * FROM pgflow.step_states - WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug - FOR UPDATE -), -flow_info AS ( +WITH flow_info AS ( SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = fail_task.run_id diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index e9707016b..60c45edf9 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:ZZJEI67KUViUzd0rVHGMZPpbUXU2MFSXdTIe/yyJqyE= +h1:jDc+2bvTL4ZYqATAMfBXbTYtMlx8RPvDUvRJjrP537w= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -18,4 +18,4 @@ h1:ZZJEI67KUViUzd0rVHGMZPpbUXU2MFSXdTIe/yyJqyE= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= 20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E= 20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc= -20260214181656_pgflow_step_conditions.sql h1:nG52qhydTJMeLTd4AoI4buATJNHdEN2C1ZJdKp+i7wE= +20260214181656_pgflow_step_conditions.sql h1:rHQnXCeZ/QGxPlChdTMxumtsTtYHr1ej183Dd+auw34= diff --git a/pkgs/core/supabase/tests/_cascade_force_skip_steps/does_not_archive_preexisting_skipped_step_messages.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/does_not_archive_preexisting_skipped_step_messages.test.sql new file mode 100644 index 000000000..cc69acb50 --- /dev/null +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/does_not_archive_preexisting_skipped_step_messages.test.sql @@ -0,0 +1,69 @@ +\set ON_ERROR_STOP on +\set QUIET on + +begin; +select plan(4); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('cascade_skip_preexisting'); +select pgflow.add_step('cascade_skip_preexisting', 'target', '{}', step_type => 'map'); +select pgflow.add_step('cascade_skip_preexisting', 'already_skipped', '{}', step_type => 'map'); + +select pgflow.start_flow('cascade_skip_preexisting', '[1, 2]'::jsonb); + +select ok( + ( + select count(*) > 0 + from pgmq.q_cascade_skip_preexisting q + join pgflow.step_tasks st on st.message_id = q.msg_id + where st.flow_slug = 'cascade_skip_preexisting' + and st.step_slug = 'already_skipped' + ), + 'Setup: already_skipped has queued messages before cascade call' +); + +update pgflow.step_states +set status = 'skipped', + skip_reason = 'preexisting_skip', + skipped_at = now(), + remaining_tasks = null +where flow_slug = 'cascade_skip_preexisting' + and step_slug = 'already_skipped'; + +select pgflow._cascade_force_skip_steps( + (select run_id from pgflow.runs where flow_slug = 'cascade_skip_preexisting'), + 'target', + 'condition_unmet' +); + +select is_empty( + $$ + select 1 + from pgmq.q_cascade_skip_preexisting q + join pgflow.step_tasks st on st.message_id = q.msg_id + where st.flow_slug = 'cascade_skip_preexisting' + and st.step_slug = 'target' + $$, + 'Target step messages should be archived' +); + +select isnt_empty( + $$ + select 1 + from pgmq.q_cascade_skip_preexisting q + join pgflow.step_tasks st on st.message_id = q.msg_id + where st.flow_slug = 'cascade_skip_preexisting' + and st.step_slug = 'already_skipped' + $$, + 'Preexisting skipped step messages should remain queued' +); + +select is( + (select status from pgflow.step_states where flow_slug = 'cascade_skip_preexisting' and step_slug = 'target'), + 'skipped'::text, + 'Target step should be marked skipped' +); + +select * from finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql index afeed5c52..034672cd1 100644 --- a/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql +++ b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql @@ -1,5 +1,5 @@ begin; -select plan(5); +select plan(6); select pgflow_tests.reset_db(); @@ -92,6 +92,17 @@ select is( 'previously active messages should be in archive' ); +select is( + ( + select error_message + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'checker' + ), + 'Condition not met', + 'checker failure should use stable condition error message' +); + drop table if exists run_ids; drop table if exists pre_failure_msgs; diff --git a/pkgs/core/supabase/tests/condition_evaluation/ifnot_root_step_pattern_matches_fail.test.sql b/pkgs/core/supabase/tests/condition_evaluation/ifnot_root_step_pattern_matches_fail.test.sql index fe7a7111b..41ef7c3e7 100644 --- a/pkgs/core/supabase/tests/condition_evaluation/ifnot_root_step_pattern_matches_fail.test.sql +++ b/pkgs/core/supabase/tests/condition_evaluation/ifnot_root_step_pattern_matches_fail.test.sql @@ -30,12 +30,12 @@ select is( 'Step with matched ifNot pattern and whenUnmet=fail should be failed' ); --- Test 2: Error message should indicate condition not met +-- Test 2: Error message should remain stable and minimal select is( (select error_message from pgflow.step_states where run_id = (select run_id from run_ids) and step_slug = 'no_admin_step'), 'Condition not met', - 'Error message should indicate condition not met' + 'Error message should use stable condition error message' ); -- Test 3: No task should be created for failed step diff --git a/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_fail.test.sql b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_fail.test.sql index 97ae0039a..5ace52e36 100644 --- a/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_fail.test.sql +++ b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_fail.test.sql @@ -2,7 +2,7 @@ -- Verifies that a root step with unmet condition and whenUnmet='fail' -- causes the run to fail immediately begin; -select plan(4); +select plan(5); -- Reset database select pgflow_tests.reset_db(); @@ -30,11 +30,12 @@ select is( 'Step with unmet condition and whenUnmet=fail should be failed' ); --- Test 2: error_message should indicate condition unmet -select ok( +-- Test 2: error_message should remain stable and minimal +select is( (select error_message from pgflow.step_states - where run_id = (select run_id from run_ids) and step_slug = 'checked_step') ILIKE '%condition%', - 'Failed step should have error message about condition' + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 'Condition not met', + 'Failed step should use stable condition error message' ); -- Test 3: No task should be created @@ -52,6 +53,19 @@ select is( 'Run should fail when step condition fails with fail mode' ); +-- Test 5: Run-level error event should use same stable message +select is( + ( + select payload->>'error_message' + from pgflow_tests.get_realtime_message( + event_type => 'run:failed', + run_id => (select run_id from run_ids) + ) + ), + 'Condition not met', + 'Run failed event should use stable condition error message' +); + -- Clean up drop table if exists run_ids; diff --git a/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql b/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql index cde264bbf..741cd4edd 100644 --- a/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql +++ b/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql @@ -1,5 +1,5 @@ begin; -select plan(6); +select plan(7); select pgflow_tests.reset_db(); @@ -82,6 +82,19 @@ select is( 'run:failed payload should include failed status' ); +select is( + ( + select payload->>'error_message' + from pgflow_tests.get_realtime_message( + event_type => 'step:failed', + run_id => (select run_id from run_ids), + step_slug => 'guarded' + ) + ), + 'Condition not met', + 'step:failed payload should use stable condition error message' +); + drop table if exists run_ids; select finish(); diff --git a/pkgs/dsl/__tests__/runtime/condition-options.test.ts b/pkgs/dsl/__tests__/runtime/condition-options.test.ts index 1dcd89ea7..59e79e1bb 100644 --- a/pkgs/dsl/__tests__/runtime/condition-options.test.ts +++ b/pkgs/dsl/__tests__/runtime/condition-options.test.ts @@ -149,6 +149,24 @@ describe('Condition Options', () => { ); expect(statements[2]).toContain("when_unmet => 'skip'"); }); + + it('should escape apostrophes in required_input_pattern', () => { + const flow = new Flow({ slug: 'test_flow' }).step( + { + slug: 'step1', + if: { note: "O'Reilly" }, + whenUnmet: 'skip', + }, + () => 'result' + ); + + const statements = compileFlow(flow); + + expect(statements).toHaveLength(2); + expect(statements[1]).toContain( + 'required_input_pattern => \'{"note":"O\'\'Reilly"}\'' + ); + }); }); describe('whenUnmet validation', () => { @@ -284,5 +302,23 @@ describe('Condition Options', () => { ); expect(statements[2]).toContain("when_unmet => 'skip'"); }); + + it('should escape apostrophes in forbidden_input_pattern', () => { + const flow = new Flow({ slug: 'test_flow' }).step( + { + slug: 'step1', + ifNot: { note: "Don't run" }, + whenUnmet: 'skip', + }, + () => 'result' + ); + + const statements = compileFlow(flow); + + expect(statements).toHaveLength(2); + expect(statements[1]).toContain( + 'forbidden_input_pattern => \'{"note":"Don\'\'t run"}\'' + ); + }); }); }); diff --git a/pkgs/dsl/__tests__/runtime/utils.test.ts b/pkgs/dsl/__tests__/runtime/utils.test.ts index e0d6d0bff..9bcd9aa03 100644 --- a/pkgs/dsl/__tests__/runtime/utils.test.ts +++ b/pkgs/dsl/__tests__/runtime/utils.test.ts @@ -6,6 +6,7 @@ describe('validateSlug', () => { expect(() => validateSlug('valid_slug')).not.toThrowError(); expect(() => validateSlug('valid_slug_123')).not.toThrowError(); expect(() => validateSlug('validSlug123')).not.toThrowError(); + expect(() => validateSlug('_valid_slug')).not.toThrowError(); }); it('rejects slugs that start with numbers', () => { @@ -14,33 +15,34 @@ describe('validateSlug', () => { ); }); - it('rejects slugs that start with underscores', () => { - expect(() => validateSlug('_invalid')).toThrowError( - `Slug '_invalid' cannot start with an underscore` - ); + it('rejects empty slugs', () => { + expect(() => validateSlug('')).toThrowError(`Slug cannot be empty`); }); - it('rejects slugs containing spaces', () => { - expect(() => validateSlug('invalid slug')).toThrowError( - `Slug 'invalid slug' cannot contain spaces` + it('rejects reserved words', () => { + expect(() => validateSlug('run')).toThrowError( + `Slug 'run' is reserved and cannot be used` ); }); - it('rejects slugs containing special characters', () => { + it('rejects slugs containing invalid characters', () => { + expect(() => validateSlug('invalid slug')).toThrowError( + `Slug 'invalid slug' can only contain letters, numbers, and underscores` + ); expect(() => validateSlug('invalid/slug')).toThrowError( - `Slug 'invalid/slug' cannot contain special characters like /, :, ?, #, -` + `Slug 'invalid/slug' can only contain letters, numbers, and underscores` ); expect(() => validateSlug('invalid:slug')).toThrowError( - `Slug 'invalid:slug' cannot contain special characters like /, :, ?, #, -` + `Slug 'invalid:slug' can only contain letters, numbers, and underscores` ); expect(() => validateSlug('invalid?slug')).toThrowError( - `Slug 'invalid?slug' cannot contain special characters like /, :, ?, #, -` + `Slug 'invalid?slug' can only contain letters, numbers, and underscores` ); expect(() => validateSlug('invalid#slug')).toThrowError( - `Slug 'invalid#slug' cannot contain special characters like /, :, ?, #, -` + `Slug 'invalid#slug' can only contain letters, numbers, and underscores` ); expect(() => validateSlug('invalid-slug')).toThrowError( - `Slug 'invalid-slug' cannot contain special characters like /, :, ?, #, -` + `Slug 'invalid-slug' can only contain letters, numbers, and underscores` ); }); diff --git a/pkgs/dsl/__tests__/types/condition-pattern.test-d.ts b/pkgs/dsl/__tests__/types/condition-pattern.test-d.ts index a19f0bfea..7db3d244d 100644 --- a/pkgs/dsl/__tests__/types/condition-pattern.test-d.ts +++ b/pkgs/dsl/__tests__/types/condition-pattern.test-d.ts @@ -152,8 +152,8 @@ describe('if option typing in step methods', () => { it('should reject invalid keys in if', () => { type FlowInput = { userId: string; role: string }; - // @ts-expect-error - 'invalidKey' does not exist on FlowInput new Flow({ slug: 'test_flow' }).step( + // @ts-expect-error - 'invalidKey' does not exist on FlowInput { slug: 'check', if: { invalidKey: 'value' } }, // eslint-disable-next-line @typescript-eslint/no-explicit-any (input: any) => input.userId @@ -163,8 +163,8 @@ describe('if option typing in step methods', () => { it('should reject wrong value types in if', () => { type FlowInput = { userId: string; role: string }; - // @ts-expect-error - role should be string, not number new Flow({ slug: 'test_flow' }).step( + // @ts-expect-error - role should be string, not number { slug: 'check', if: { role: 123 } }, // eslint-disable-next-line @typescript-eslint/no-explicit-any (input: any) => input.userId @@ -337,8 +337,8 @@ describe('ifNot option typing in step methods', () => { it('should reject invalid keys in ifNot', () => { type FlowInput = { userId: string; role: string }; - // @ts-expect-error - 'invalidKey' does not exist on FlowInput new Flow({ slug: 'test_flow' }).step( + // @ts-expect-error - 'invalidKey' does not exist on FlowInput { slug: 'check', ifNot: { invalidKey: 'value' } }, // eslint-disable-next-line @typescript-eslint/no-explicit-any (input: any) => input.userId @@ -348,8 +348,8 @@ describe('ifNot option typing in step methods', () => { it('should reject wrong value types in ifNot', () => { type FlowInput = { userId: string; role: string }; - // @ts-expect-error - role should be string, not number new Flow({ slug: 'test_flow' }).step( + // @ts-expect-error - role should be string, not number { slug: 'check', ifNot: { role: 123 } }, // eslint-disable-next-line @typescript-eslint/no-explicit-any (input: any) => input.userId @@ -520,6 +520,17 @@ describe('whenUnmet requires if or ifNot', () => { expectTypeOf(flow).toBeObject(); }); + + it('should infer omitted whenUnmet as skip when condition is present', () => { + const flow = new Flow<{ active: boolean }>({ slug: 'test_flow' }) + .step({ slug: 'conditioned', if: { active: true } }, () => 'ok') + .step({ slug: 'consumer', dependsOn: ['conditioned'] }, (deps) => { + expectTypeOf(deps).toEqualTypeOf<{ conditioned?: string }>(); + return deps.conditioned ?? 'fallback'; + }); + + expectTypeOf(flow).toBeObject(); + }); }); describe('array method', () => { @@ -563,8 +574,8 @@ describe('whenUnmet requires if or ifNot', () => { () => ({ done: true }) ); - // @ts-expect-error - whenUnmet requires if or ifNot flow.step( + // @ts-expect-error - whenUnmet requires if or ifNot { slug: 'second', dependsOn: ['first'], whenUnmet: 'skip' }, // Handler typed as any to suppress cascading error from failed overload (deps: any) => deps.first.done diff --git a/pkgs/dsl/__tests__/types/getStepDefinition.test-d.ts b/pkgs/dsl/__tests__/types/getStepDefinition.test-d.ts index 33e186ef3..375e8d451 100644 --- a/pkgs/dsl/__tests__/types/getStepDefinition.test-d.ts +++ b/pkgs/dsl/__tests__/types/getStepDefinition.test-d.ts @@ -63,3 +63,63 @@ it('should correctly type step handlers when using getStepDefinition', () => { expect(root_b.dependencies).toEqual([]); expect(merge.dependencies).toEqual(['root_a', 'root_b']); }); + +it('should make only skip dependencies optional in getStepDefinition handler input', () => { + const flow = new Flow<{ flag: boolean }>({ slug: 'dep_typing' }) + .step( + { slug: 'skip_dep', if: { flag: true }, whenUnmet: 'skip' }, + () => 'skip' + ) + .step( + { slug: 'cascade_dep', if: { flag: true }, whenUnmet: 'skip-cascade' }, + () => 'cascade' + ) + .step({ slug: 'required_dep' }, () => 42) + .step( + { slug: 'uses_skip', dependsOn: ['skip_dep', 'required_dep'] }, + (deps) => { + expectTypeOf(deps).toEqualTypeOf<{ + skip_dep?: string; + required_dep: number; + }>(); + return deps.required_dep; + } + ) + .step( + { slug: 'uses_cascade', dependsOn: ['cascade_dep', 'required_dep'] }, + (deps) => { + expectTypeOf(deps).toEqualTypeOf<{ + cascade_dep: string; + required_dep: number; + }>(); + return deps.required_dep; + } + ); + + const usesSkip = flow.getStepDefinition('uses_skip'); + type UsesSkipInput = Parameters[0]; + expectTypeOf().toEqualTypeOf<{ + skip_dep?: string; + required_dep: number; + }>(); + + const usesCascade = flow.getStepDefinition('uses_cascade'); + type UsesCascadeInput = Parameters[0]; + expectTypeOf().toEqualTypeOf<{ + cascade_dep: string; + required_dep: number; + }>(); +}); + +it('should treat omitted whenUnmet with conditions as skip in getStepDefinition types', () => { + const flow = new Flow<{ active: boolean }>({ slug: 'default_when_unmet' }) + .step({ slug: 'conditioned', if: { active: true } }, () => 'ok') + .step({ slug: 'consumer', dependsOn: ['conditioned'] }, (deps) => { + expectTypeOf(deps).toEqualTypeOf<{ conditioned?: string }>(); + return deps.conditioned ?? 'fallback'; + }); + + const consumer = flow.getStepDefinition('consumer'); + type ConsumerInput = Parameters[0]; + expectTypeOf().toEqualTypeOf<{ conditioned?: string }>(); +}); diff --git a/pkgs/dsl/src/compile-flow.ts b/pkgs/dsl/src/compile-flow.ts index f3fdc976f..f108a3c44 100644 --- a/pkgs/dsl/src/compile-flow.ts +++ b/pkgs/dsl/src/compile-flow.ts @@ -9,10 +9,13 @@ import { AnyFlow, RuntimeOptions, StepRuntimeOptions } from './dsl.js'; */ export function compileFlow(flow: AnyFlow): string[] { const statements: string[] = []; + const escapedFlowSlug = escapeSqlLiteral(flow.slug); // Create the flow const flowOptions = formatRuntimeOptions(flow.options); - statements.push(`SELECT pgflow.create_flow('${flow.slug}'${flowOptions});`); + statements.push( + `SELECT pgflow.create_flow('${escapedFlowSlug}'${flowOptions});` + ); // Add steps in the order they were defined for (const stepSlug of flow.stepOrder) { @@ -22,7 +25,9 @@ export function compileFlow(flow: AnyFlow): string[] { // Format dependencies array if it exists let depsClause = ''; if (step.dependencies.length > 0) { - const depsArray = step.dependencies.map((dep) => `'${dep}'`).join(', '); + const depsArray = step.dependencies + .map((dep) => `'${escapeSqlLiteral(dep)}'`) + .join(', '); depsClause = `, ARRAY[${depsArray}]`; } @@ -33,7 +38,9 @@ export function compileFlow(flow: AnyFlow): string[] { } statements.push( - `SELECT pgflow.add_step('${flow.slug}', '${step.slug}'${depsClause}${stepOptions}${stepTypeClause});` + `SELECT pgflow.add_step('${escapedFlowSlug}', '${escapeSqlLiteral( + step.slug + )}'${depsClause}${stepOptions}${stepTypeClause});` ); } @@ -65,24 +72,28 @@ function formatRuntimeOptions( } if ('if' in options && options.if !== undefined) { - // Serialize JSON pattern and escape for SQL const jsonStr = JSON.stringify(options.if); - parts.push(`required_input_pattern => '${jsonStr}'`); + parts.push(`required_input_pattern => '${escapeSqlLiteral(jsonStr)}'`); } if ('ifNot' in options && options.ifNot !== undefined) { - // Serialize JSON pattern and escape for SQL const jsonStr = JSON.stringify(options.ifNot); - parts.push(`forbidden_input_pattern => '${jsonStr}'`); + parts.push(`forbidden_input_pattern => '${escapeSqlLiteral(jsonStr)}'`); } if ('whenUnmet' in options && options.whenUnmet !== undefined) { - parts.push(`when_unmet => '${options.whenUnmet}'`); + parts.push(`when_unmet => '${escapeSqlLiteral(options.whenUnmet)}'`); } if ('whenExhausted' in options && options.whenExhausted !== undefined) { - parts.push(`when_exhausted => '${options.whenExhausted}'`); + parts.push( + `when_exhausted => '${escapeSqlLiteral(options.whenExhausted)}'` + ); } return parts.length > 0 ? `, ${parts.join(', ')}` : ''; } + +function escapeSqlLiteral(value: string): string { + return value.replace(/'/g, "''"); +} diff --git a/pkgs/dsl/src/dsl.ts b/pkgs/dsl/src/dsl.ts index a34e3c4de..8344bc9bf 100644 --- a/pkgs/dsl/src/dsl.ts +++ b/pkgs/dsl/src/dsl.ts @@ -688,13 +688,13 @@ export class Flow< : Simplify< { [K in StepDependencies[SlugType][number] as K extends keyof Steps - ? Steps[K]['skippable'] extends true + ? Steps[K]['skippable'] extends 'skip' ? never : K : never]: K extends keyof Steps ? Steps[K]['output'] : never; } & { [K in StepDependencies[SlugType][number] as K extends keyof Steps - ? Steps[K]['skippable'] extends true + ? Steps[K]['skippable'] extends 'skip' ? K : never : never]?: K extends keyof Steps ? Steps[K]['output'] : never; @@ -713,13 +713,41 @@ export class Flow< return this.stepDefinitions[slug as string]; } - // Overload 1: Root step (no dependsOn) - receives flowInput directly - // if is typed as ContainmentPattern - // whenUnmet is only allowed when if or ifNot is provided (enforced by ConditionOpts union) + // Overload 1: Root step without conditions + step< + Slug extends string, + TOutput, + TRetries extends WhenExhaustedMode | undefined = undefined + >( + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + dependsOn?: never; + whenExhausted?: TRetries; + } & WithoutCondition & + Omit + >, + handler: ( + flowInput: TFlowInput, + context: FlowContext & TContext + ) => TOutput | Promise + ): Flow< + TFlowInput, + TContext, + Steps & { + [K in Slug]: StepMeta< + Awaited, + TRetries extends 'skip' | 'skip-cascade' ? TRetries : false + >; + }, + StepDependencies & { [K in Slug]: [] }, + TEnv + >; + + // Overload 2: Root step with condition and omitted whenUnmet defaults to 'skip' step< Slug extends string, TOutput, - TWhenUnmet extends WhenUnmetMode | undefined = undefined, TRetries extends WhenExhaustedMode | undefined = undefined >( opts: Simplify< @@ -728,9 +756,40 @@ export class Flow< dependsOn?: never; whenExhausted?: TRetries; } & ( - | (WithIfCondition & { whenUnmet?: TWhenUnmet }) - | (WithIfNotCondition & { whenUnmet?: TWhenUnmet }) - | WithoutCondition + | (WithIfCondition & { whenUnmet?: undefined }) + | (WithIfNotCondition & { whenUnmet?: undefined }) + ) & + Omit + >, + handler: ( + flowInput: TFlowInput, + context: FlowContext & TContext + ) => TOutput | Promise + ): Flow< + TFlowInput, + TContext, + Steps & { + [K in Slug]: StepMeta, 'skip'>; + }, + StepDependencies & { [K in Slug]: [] }, + TEnv + >; + + // Overload 3: Root step with explicit whenUnmet + step< + Slug extends string, + TOutput, + TWhenUnmet extends WhenUnmetMode, + TRetries extends WhenExhaustedMode | undefined = undefined + >( + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + dependsOn?: never; + whenExhausted?: TRetries; + } & ( + | (WithIfCondition & { whenUnmet: TWhenUnmet }) + | (WithIfNotCondition & { whenUnmet: TWhenUnmet }) ) & Omit >, @@ -755,16 +814,43 @@ export class Flow< TEnv >; - // Overload 2: Dependent step (with dependsOn) - receives deps, flowInput via context - // if is typed as ContainmentPattern - // Note: [Deps, ...Deps[]] requires at least one dependency - empty arrays are rejected at compile time - // Handler receives deps with correct optionality based on upstream steps' skippability - // whenUnmet is only allowed when if or ifNot is provided (enforced by ConditionOpts union) + // Overload 4: Dependent step without conditions + step< + Slug extends string, + Deps extends Extract, + TOutput, + TRetries extends WhenExhaustedMode | undefined = undefined + >( + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + dependsOn: [Deps, ...Deps[]]; + whenExhausted?: TRetries; + } & WithoutCondition & + Omit + >, + handler: ( + deps: Simplify>, + context: FlowContext & TContext + ) => TOutput | Promise + ): Flow< + TFlowInput, + TContext, + Steps & { + [K in Slug]: StepMeta< + Awaited, + TRetries extends 'skip' | 'skip-cascade' ? TRetries : false + >; + }, + StepDependencies & { [K in Slug]: Deps[] }, + TEnv + >; + + // Overload 5: Dependent step with condition and omitted whenUnmet defaults to 'skip' step< Slug extends string, Deps extends Extract, TOutput, - TWhenUnmet extends WhenUnmetMode | undefined = undefined, TRetries extends WhenExhaustedMode | undefined = undefined >( opts: Simplify< @@ -774,12 +860,50 @@ export class Flow< whenExhausted?: TRetries; } & ( | (WithIfCondition>> & { - whenUnmet?: TWhenUnmet; + whenUnmet?: undefined; }) | (WithIfNotCondition< Simplify> - > & { whenUnmet?: TWhenUnmet }) - | WithoutCondition + > & { + whenUnmet?: undefined; + }) + ) & + Omit + >, + handler: ( + deps: Simplify>, + context: FlowContext & TContext + ) => TOutput | Promise + ): Flow< + TFlowInput, + TContext, + Steps & { + [K in Slug]: StepMeta, 'skip'>; + }, + StepDependencies & { [K in Slug]: Deps[] }, + TEnv + >; + + // Overload 6: Dependent step with explicit whenUnmet + step< + Slug extends string, + Deps extends Extract, + TOutput, + TWhenUnmet extends WhenUnmetMode, + TRetries extends WhenExhaustedMode | undefined = undefined + >( + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + dependsOn: [Deps, ...Deps[]]; + whenExhausted?: TRetries; + } & ( + | (WithIfCondition>> & { + whenUnmet: TWhenUnmet; + }) + | (WithIfNotCondition< + Simplify> + > & { whenUnmet: TWhenUnmet }) ) & Omit >, diff --git a/pkgs/dsl/src/utils.ts b/pkgs/dsl/src/utils.ts index 14156dfb4..70844f533 100644 --- a/pkgs/dsl/src/utils.ts +++ b/pkgs/dsl/src/utils.ts @@ -1,34 +1,34 @@ /** * Validates a slug string according to the following rules: * - Cannot start with a number - * - Cannot start with an underscore - * - Cannot contain spaces - * - Cannot contain special characters like /, :, ?, # + * - Cannot be empty + * - Cannot use reserved words + * - Must contain only letters, numbers, and underscores * - Cannot be longer than 128 characters * * @param slug The slug string to validate * @throws Error if the slug is invalid */ export function validateSlug(slug: string): void { - if (slug.length > 128) { - throw new Error(`Slug '${slug}' cannot be longer than 128 characters`); + if (slug.length === 0) { + throw new Error('Slug cannot be empty'); } - if (/^\d/.test(slug)) { - throw new Error(`Slug '${slug}' cannot start with a number`); + if (slug.length > 128) { + throw new Error(`Slug '${slug}' cannot be longer than 128 characters`); } - if (/^_/.test(slug)) { - throw new Error(`Slug '${slug}' cannot start with an underscore`); + if (slug === 'run') { + throw new Error(`Slug 'run' is reserved and cannot be used`); } - if (/\s/.test(slug)) { - throw new Error(`Slug '${slug}' cannot contain spaces`); + if (/^\d/.test(slug)) { + throw new Error(`Slug '${slug}' cannot start with a number`); } - if (/[/:#\-?]/.test(slug)) { + if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(slug)) { throw new Error( - `Slug '${slug}' cannot contain special characters like /, :, ?, #, -` + `Slug '${slug}' can only contain letters, numbers, and underscores` ); } } @@ -53,7 +53,12 @@ export interface ValidateRuntimeOptionsOpts { * @throws Error if any runtime option is invalid */ export function validateRuntimeOptions( - options: { maxAttempts?: number; baseDelay?: number; timeout?: number; startDelay?: number }, + options: { + maxAttempts?: number; + baseDelay?: number; + timeout?: number; + startDelay?: number; + }, opts: ValidateRuntimeOptionsOpts = { optional: false } ): void { const { maxAttempts, baseDelay, timeout, startDelay } = options;