feat: snowflake export worker [CM-975] #3844

Open
mbani01 wants to merge 37 commits into main from feat/snowflake_export_worker

@mbani01 mbani01 commented Feb 16, 2026

This pull request introduces a new "snowflake connectors" service for exporting data to Snowflake and managing export jobs. It adds the necessary database schema, service configuration, Docker setup, and initial implementation for export activities and scheduling. The main themes are the creation of the Snowflake export infrastructure, integration with the job scheduler, and updates to the monorepo's dependency management.

Snowflake Export Infrastructure

  • Added a new database table integration."snowflakeExportJobs" to track Snowflake export jobs, including fields for platform, S3 path, timestamps, and error handling, along with relevant indexes.
  • Added a unique constraint on the s3_path column to prevent duplicate export entries.
  • Modified the export jobs table to add an exportStartedAt timestamp and a metrics JSONB column, while removing the totalRows and totalBytes columns to support more flexible export metrics.
  • Implemented the core export activity logic in exportActivity.ts, which performs the Snowflake export and writes job metadata.
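
For orientation, here is a rough sketch of the shape such a job row might take on the TypeScript side. Only the platform, the S3 path, exportStartedAt, metrics and an error field are mentioned in the description; the exact property names, the `ExportJobStore` class and the in-memory `Map` are assumptions that merely imitate the unique constraint on the S3 path, not the PR's implementation.

```typescript
// Hypothetical row shape for integration."snowflakeExportJobs".
// Property names are assumptions based on the fields listed in the PR description.
interface SnowflakeExportJob {
  platform: string
  s3Path: string
  exportStartedAt: Date
  metrics: Record<string, number> // JSONB column replacing totalRows/totalBytes
  error: string | null
}

// In-memory stand-in for the table, keyed by s3Path to mimic the unique constraint.
class ExportJobStore {
  private readonly jobs = new Map<string, SnowflakeExportJob>()

  // Returns false instead of inserting when the s3Path is already recorded.
  insert(job: SnowflakeExportJob): boolean {
    if (this.jobs.has(job.s3Path)) return false
    this.jobs.set(job.s3Path, job)
    return true
  }

  count(): number {
    return this.jobs.size
  }
}
```

Keeping row and byte counts inside `metrics` means new counters can be added without another migration, which appears to be the motivation for dropping the dedicated columns.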

Service and Job Scheduling

  • Added a new service app @crowd/snowflake-connectors with its own dependencies, scripts, and Docker configuration.
  • Integrated a scheduled cron job in the cron service to trigger the Snowflake export workflow daily, using Temporal for orchestration.
  • Centralized Temporal configuration export in the new service.

Monorepo Dependency and Lockfile Updates

  • Updated pnpm-lock.yaml to include the new snowflake_connectors app and its dependencies, and adjusted AWS SDK dependency resolutions for compatibility.

Other Platform/Activity Updates

  • Added new Cvent-related activity types to the activityTypes table for event registration and attendance.

@joanagmaia, could you help review the export SQL and the transformation?


Note

Medium Risk
Introduces a new Temporal worker + polling consumer that creates integrations and processes exported data into activities, plus a new DB table and scheduled workflow; failures or misconfiguration could lead to missed/duplicated processing and new writes to core tables.

Overview
Adds a new snowflake_connectors service that schedules a daily Temporal workflow to export Snowflake data to S3 in Parquet batches, recording each batch in a new integration."snowflakeExportJobs" table (with locking/indices, metrics, and error tracking) for downstream processing.

The service also introduces a long-running consumer that claims pending export jobs, downloads/reads Parquet from S3, transforms rows into activities (initially for Cvent), resolves/creates matching integrations for segments (with Redis caching + distributed lock), and emits activity results for ingestion.
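
As a rough illustration of the "claim a pending job" step, the sketch below shows only the contract: a job moves from pending to processing exactly once. The status names, the `ClaimableJob` type and `claimNextPendingJob` are hypothetical. In Postgres the same effect is commonly achieved with `SELECT ... FOR UPDATE SKIP LOCKED`, but whether this PR does so is not stated here.

```typescript
// Hypothetical job states; the summary only says the consumer "claims pending export jobs".
type JobStatus = 'pending' | 'processing' | 'done'

interface ClaimableJob {
  id: number
  s3Path: string
  status: JobStatus
}

// Claims the first pending job in the list by flipping it to 'processing'.
// This in-memory version shows the contract only; it does no real locking.
function claimNextPendingJob(jobs: ClaimableJob[]): ClaimableJob | null {
  const next = jobs.find((j) => j.status === 'pending')
  if (!next) return null
  next.status = 'processing'
  return next
}
```

Two consecutive calls return different jobs, and a call with nothing pending returns null, which is the point at which a polling consumer would sleep.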

Separately, adds Cvent activity type definitions and scoring, introduces OrganizationSource/OrganizationAttributeSource = cvent and raises its attribute source priority, and adds Docker/compose + lockfile entries to build/run the new worker.

Written by Cursor Bugbot for commit 94adbd1. This will update automatically on new commits.

@mbani01 mbani01 self-assigned this Feb 18, 2026
@mbani01 mbani01 marked this pull request as ready for review February 20, 2026 17:08
@github-actions

⚠️ Jira Issue Key Missing

Your PR title doesn't contain a Jira issue key. Consider adding it for better traceability.

Example:

  • feat: add user authentication (CM-123)
  • feat: add user authentication (IN-123)

Projects:

  • CM: Community Data Platform
  • IN: Insights

Please add a Jira issue key to your PR title.

@mbani01 mbani01 changed the title feat: snowflake export worker feat: snowflake export worker [CM-975] Feb 20, 2026
Comment on lines 2 to 3
('registered-event', 'cvent', false, false, 'Registered for a Cvent event', 'Registered for an event'),
('attended-event', 'cvent', false, false, 'Attended a Cvent event', 'Attended an event');
Contributor

Let's make these descriptions more aligned with Nirav's triggered description:

  • registered-event: User registers to an event.
  • attended-event: User attends an event.

Comment on lines +65 to +74
const orgAccounts = `
  org_accounts AS (
    SELECT account_id, website, domain_aliases
    FROM analytics.bronze_fivetran_salesforce.accounts
    WHERE website IS NOT NULL
    UNION ALL
    SELECT account_id, website, domain_aliases
    FROM analytics.bronze_fivetran_salesforce_b2b.accounts
    WHERE website IS NOT NULL
  )`
Contributor

I noticed that there is a domain column in the event_registration source. Can we check whether the domain we get there is the same as the one we get from the Salesforce tables? If so, we could simply use that domain and avoid this additional join.

Contributor Author

The domain is extracted from the email, so it is only accurate when a work email is used, which is not the case for most of the records I checked.
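
To make the concern concrete, here is a minimal sketch of why an email-derived domain is unreliable. The helper names and the short provider list are illustrative assumptions, not code from this PR.

```typescript
// Illustrative list only; a real check would need a much larger set of free-mail providers.
const FREE_EMAIL_DOMAINS = new Set(['gmail.com', 'yahoo.com', 'outlook.com', 'hotmail.com'])

// Returns the lower-cased domain part of an email, or null if there is no usable domain.
function emailDomain(email: string): string | null {
  const trimmed = email.trim()
  const at = trimmed.lastIndexOf('@')
  if (at < 1 || at === trimmed.length - 1) return null
  return trimmed.slice(at + 1).toLowerCase()
}

// True only when the domain looks like an organization's domain rather than a personal mailbox.
function isLikelyWorkDomain(email: string): boolean {
  const domain = emailDomain(email)
  return domain !== null && !FREE_EMAIL_DOMAINS.has(domain)
}
```

A registration made with a personal mailbox yields a domain that says nothing about the registrant's organization, which is why the Salesforce account join is kept.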

readonly platform = PlatformType.CVENT

transformRow(row: Record<string, unknown>): TransformedActivity | null {
  const userName = (row.USERNAME as string | null)?.trim() || null
Contributor

I believe it should be USER_NAME instead?

Contributor Author

Good catch!
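
A mistyped column key fails silently in this pattern: a missing key on a `Record<string, unknown>` is `undefined`, and the optional chain followed by `|| null` turns that into `null` without any error. The sketch below uses a hypothetical `readString` helper and assumes the exported column is named USER_NAME, as suggested above.

```typescript
// Reads a column as a trimmed string; missing, empty or non-string values become null.
function readString(row: Record<string, unknown>, column: string): string | null {
  const value = row[column]
  if (typeof value !== 'string') return null
  const trimmed = value.trim()
  return trimmed.length > 0 ? trimmed : null
}

// Example row, assuming the export names the column USER_NAME.
const exampleRow: Record<string, unknown> = { USER_NAME: '  jdoe ' }
const correctKey = readString(exampleRow, 'USER_NAME') // the trimmed value
const wrongKey = readString(exampleRow, 'USERNAME') // null, with no error raised
```

Because the wrong key produces a plausible null rather than a failure, a test against a real exported row is the most reliable way to catch this kind of mismatch.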

Comment on lines 85 to 89
member: {
  displayName,
  identities,
  organizations: this.buildOrganizations(row),
},
Contributor

Add the missing attributes to both member and organizations (see the document).

Comment on lines 128 to 133
identities.push({
  platform: PlatformType.CVENT,
  value: accountName,
  type: OrganizationIdentityType.USERNAME,
  verified: false,
})
Contributor

At the moment we only support a fixed set of platforms for organization "usernames": LinkedIn, Crunchbase, Facebook and Twitter.
Let's not add this identity from Cvent.
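
One way to enforce this, sketched under assumptions: the `OrgIdentity` shape and the lowercase platform strings below are simplified stand-ins for the monorepo's real types, and `filterOrgIdentities` is a hypothetical helper.

```typescript
// Platforms named in this review as supporting organization "username" identities.
const ORG_USERNAME_PLATFORMS = new Set(['linkedin', 'crunchbase', 'facebook', 'twitter'])

// Hypothetical, simplified identity shape; the real types live in the monorepo.
interface OrgIdentity {
  platform: string
  value: string
  type: 'username' | 'primary-domain'
  verified: boolean
}

// Drops username identities from platforms outside the supported set;
// identities of any other type pass through unchanged.
function filterOrgIdentities(identities: OrgIdentity[]): OrgIdentity[] {
  return identities.filter(
    (i) => i.type !== 'username' || ORG_USERNAME_PLATFORMS.has(i.platform),
  )
}
```

For this PR the simpler fix is to remove the `identities.push` call; a guard like this would only matter if more sources start emitting organization usernames.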

@themarolt
Contributor

@mbani01 please check and resolve also cursor bot comments.

@joanagmaia joanagmaia left a comment

LGTM

mbani01 commented Feb 24, 2026

> @mbani01 please check and resolve also cursor bot comments.

@themarolt I’ve addressed the relevant ones

const displayName =
  fullName ||
  (firstName && lastName ? `${firstName} ${lastName}` : firstName || lastName) ||
  userName

Member displayName can be null despite available LFID

Medium Severity

The displayName fallback chain uses fullName, firstName/lastName, and userName, but never lfUsername. The SQL query guarantees LFID IS NOT NULL (via the WHERE clause on the COALESCE), and lfUsername is extracted from the row, yet it is omitted from the displayName computation. When LFID originates from the joined mu.user_name or u.lf_username columns (rather than er.user_name), all event-registration name fields can be null, resulting in a null displayName for the member despite a valid username being available.
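
A minimal sketch of the suggested fix, assuming the name fields have already been extracted from the row as trimmed strings or null. The `NameFields` type and `resolveDisplayName` function are hypothetical wrappers around the expression quoted above.

```typescript
// Hypothetical holder for the name fields already extracted from the row.
interface NameFields {
  fullName: string | null
  firstName: string | null
  lastName: string | null
  userName: string | null
  lfUsername: string | null
}

// Same fallback chain as in the quoted code, with lfUsername appended as the last resort.
function resolveDisplayName(n: NameFields): string | null {
  return (
    n.fullName ||
    (n.firstName && n.lastName ? `${n.firstName} ${n.lastName}` : n.firstName || n.lastName) ||
    n.userName ||
    n.lfUsername ||
    null
  )
}
```

With lfUsername at the end of the chain, a row whose registration name fields are all null still yields a display name whenever the LFID is present.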

  await exporter
    .destroy()
    .catch((err) => log.warn({ err }, 'Failed to close Snowflake connection'))
}

Export activity leaks Postgres connections

Medium Severity

executeExport creates a DB connection via getDbConnection(WRITE_DB_CONFIG()) but never closes it. Repeated exports can accumulate open connections/pools in the worker process and eventually exhaust Postgres connections.
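
One possible shape for the fix is to release every resource in the same `finally` block that already destroys the Snowflake exporter. The sketch below is generic: the `Closable` interface and `withResources` helper are hypothetical, and it does not assume what the object returned by getDbConnection actually exposes for shutdown. If that connection is meant to be a shared, long-lived pool, creating it once per worker rather than once per activity would be the alternative.

```typescript
// Minimal stand-in for anything that holds a connection and must be released.
interface Closable {
  close(): Promise<void>
}

// Runs `work`, then closes every resource even if `work` throws.
// A failure while closing is passed to `onCloseError` so it cannot mask the original error.
async function withResources<T>(
  resources: Closable[],
  work: () => Promise<T>,
  onCloseError: (err: unknown) => void = () => {},
): Promise<T> {
  try {
    return await work()
  } finally {
    for (const r of resources) {
      await r.close().catch(onCloseError)
    }
  }
}
```

Routing close failures to a callback mirrors the existing `log.warn` on `exporter.destroy()`: cleanup problems are reported, but the export's own result or error is what the caller sees.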

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.
