From 9cefb7b8b902afb1447957c71df0494dfff6e466 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 18 Mar 2026 00:40:58 +0530 Subject: [PATCH 1/5] fix: optimize OpenSearch query for merge suggestions Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/activities/memberMergeSuggestions.ts | 1 + .../src/activities/organizationMergeSuggestions.ts | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts index 4ddbdf3572..474ceb7c11 100644 --- a/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts @@ -207,6 +207,7 @@ export async function getMemberMergeSuggestions( query: value, prefix_length: 1, fuzziness: 'auto', + max_expansions: 3, }, }, }), diff --git a/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts index b054f752dd..e27190e310 100644 --- a/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts @@ -158,7 +158,7 @@ export async function getOrganizationMergeSuggestions( return cleaned } - // Process up to 100 identities + // Process up to 75 identities // This is a safety limit to prevent OpenSearch max clause errors for (let i = 0; i < Math.min(identities.length, 75); i++) { const { value: rawValue, platform } = identities[i] @@ -187,7 +187,7 @@ export async function getOrganizationMergeSuggestions( // Build OpenSearch query clauses const identitiesShould = [] - const CHUNK_SIZE = 20 // Split queries into chunks to avoid OpenSearch limits + const CHUNK_SIZE = 15 // Split queries into chunks to avoid OpenSearch limits const clauseBuilders: OpenSearchQueryClauseBuilder>[] = [ { @@ -196,7 +196,7 @@ export async function getOrganizationMergeSuggestions( builder: ({ value, platform }) => ({ bool: { must: [ - { match: { [`nested_identities.string_value`]: value } }, + { match_phrase: { [`nested_identities.string_value`]: value } }, { match: { [`nested_identities.string_platform`]: platform } }, { term: { [`nested_identities.bool_verified`]: false } }, ], @@ -212,6 +212,7 @@ export async function getOrganizationMergeSuggestions( query: value, prefix_length: 1, fuzziness: 'auto', + max_expansions: 3, }, }, }), @@ -224,6 +225,7 @@ export async function getOrganizationMergeSuggestions( prefix: { [`nested_identities.string_value`]: { value, + rewrite: 'top_terms_3', }, }, }), From c3f62c5f5fb2fb93d8f3e3d81f83409474ad6fa7 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 18 Mar 2026 00:45:00 +0530 Subject: [PATCH 2/5] chore: add test workflow and verify Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../merge_suggestions_worker/src/workflows.ts | 2 + .../testOrganizationMergeSuggestions.ts | 51 +++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts diff --git a/services/apps/merge_suggestions_worker/src/workflows.ts b/services/apps/merge_suggestions_worker/src/workflows.ts index 488e8d71a3..c23ace7611 100644 --- a/services/apps/merge_suggestions_worker/src/workflows.ts +++ b/services/apps/merge_suggestions_worker/src/workflows.ts @@ -5,6 +5,7 @@ import { mergeOrganizationsWithLLM } from './workflows/mergeOrganizationsWithLLM import { spawnMemberMergeSuggestionsForAllTenants } from './workflows/spawnMemberMergeSuggestionsForAllTenants' import { spawnOrganizationMergeSuggestionsForAllTenants } from './workflows/spawnOrganizationMergeSuggestionsForAllTenants' import { testMergingEntitiesWithLLM } from './workflows/testMergingEntitiesWithLLM' +import { testOrganizationMergeSuggestions } from './workflows/testOrganizationMergeSuggestions' export { generateMemberMergeSuggestions, @@ -14,4 +15,5 @@ export { testMergingEntitiesWithLLM, mergeOrganizationsWithLLM, mergeMembersWithLLM, + testOrganizationMergeSuggestions, } diff --git a/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts new file mode 100644 index 0000000000..5f427249e9 --- /dev/null +++ b/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts @@ -0,0 +1,51 @@ +import { proxyActivities } from '@temporalio/workflow' + +import { IOrganizationBaseForMergeSuggestions, IOrganizationMergeSuggestion } from '@crowd/types' + +import * as activities from '../activities/organizationMergeSuggestions' +import { svc } from '../main' +import { IProcessGenerateOrganizationMergeSuggestionsArgs } from '../types' +import { chunkArray } from '../utils' + +const activity = proxyActivities({ startToCloseTimeout: '1 minute' }) + +export async function testOrganizationMergeSuggestions( + args: IProcessGenerateOrganizationMergeSuggestionsArgs, +): Promise { + const PAGE_SIZE = 25 + const PARALLEL_SUGGESTION_PROCESSING = 50 + + const result: IOrganizationBaseForMergeSuggestions[] = await activity.getOrganizations( + args.tenantId, + PAGE_SIZE, + null, + null, + args.organizationIds, + ) + + if (result.length === 0) { + svc.log.info('No organizations found for merge suggestion test!') + return + } + + const allMergeSuggestions: IOrganizationMergeSuggestion[] = [] + + const promiseChunks = chunkArray(result, PARALLEL_SUGGESTION_PROCESSING) + + for (const chunk of promiseChunks) { + const mergeSuggestionsPromises: Promise[] = chunk.map( + (organization) => activity.getOrganizationMergeSuggestions(args.tenantId, organization), + ) + + const mergeSuggestionsResults: IOrganizationMergeSuggestion[][] = + await Promise.all(mergeSuggestionsPromises) + allMergeSuggestions.push(...mergeSuggestionsResults.flat()) + } + + // Add all merge suggestions to add to merge + if (allMergeSuggestions.length > 0) { + svc.log.info('Found merge suggestions!', allMergeSuggestions) + } else { + svc.log.info('No merge suggestions found for provided organizations!') + } +} From b071dd62a00b93a6bcf0122a328297662b644609 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 18 Mar 2026 00:51:30 +0530 Subject: [PATCH 3/5] fix: use console logs Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/workflows/testOrganizationMergeSuggestions.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts index 5f427249e9..ce2b8237b8 100644 --- a/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts @@ -3,7 +3,6 @@ import { proxyActivities } from '@temporalio/workflow' import { IOrganizationBaseForMergeSuggestions, IOrganizationMergeSuggestion } from '@crowd/types' import * as activities from '../activities/organizationMergeSuggestions' -import { svc } from '../main' import { IProcessGenerateOrganizationMergeSuggestionsArgs } from '../types' import { chunkArray } from '../utils' @@ -24,7 +23,7 @@ export async function testOrganizationMergeSuggestions( ) if (result.length === 0) { - svc.log.info('No organizations found for merge suggestion test!') + console.log('No organizations found for merge suggestion test!') return } @@ -44,8 +43,8 @@ export async function testOrganizationMergeSuggestions( // Add all merge suggestions to add to merge if (allMergeSuggestions.length > 0) { - svc.log.info('Found merge suggestions!', allMergeSuggestions) + console.log('Found merge suggestions!', allMergeSuggestions) } else { - svc.log.info('No merge suggestions found for provided organizations!') + console.log('No merge suggestions found for provided organizations!') } } From 6367b5fb97f197beda1590cde6bf008def3589e2 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 18 Mar 2026 01:02:10 +0530 Subject: [PATCH 4/5] chore: rm test workflow Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../merge_suggestions_worker/src/workflows.ts | 2 - .../testOrganizationMergeSuggestions.ts | 50 ------------------- 2 files changed, 52 deletions(-) delete mode 100644 services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts diff --git a/services/apps/merge_suggestions_worker/src/workflows.ts b/services/apps/merge_suggestions_worker/src/workflows.ts index c23ace7611..488e8d71a3 100644 --- a/services/apps/merge_suggestions_worker/src/workflows.ts +++ b/services/apps/merge_suggestions_worker/src/workflows.ts @@ -5,7 +5,6 @@ import { mergeOrganizationsWithLLM } from './workflows/mergeOrganizationsWithLLM import { spawnMemberMergeSuggestionsForAllTenants } from './workflows/spawnMemberMergeSuggestionsForAllTenants' import { spawnOrganizationMergeSuggestionsForAllTenants } from './workflows/spawnOrganizationMergeSuggestionsForAllTenants' import { testMergingEntitiesWithLLM } from './workflows/testMergingEntitiesWithLLM' -import { testOrganizationMergeSuggestions } from './workflows/testOrganizationMergeSuggestions' export { generateMemberMergeSuggestions, @@ -15,5 +14,4 @@ export { testMergingEntitiesWithLLM, mergeOrganizationsWithLLM, mergeMembersWithLLM, - testOrganizationMergeSuggestions, } diff --git a/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts deleted file mode 100644 index ce2b8237b8..0000000000 --- a/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { proxyActivities } from '@temporalio/workflow' - -import { IOrganizationBaseForMergeSuggestions, IOrganizationMergeSuggestion } from '@crowd/types' - -import * as activities from '../activities/organizationMergeSuggestions' -import { IProcessGenerateOrganizationMergeSuggestionsArgs } from '../types' -import { chunkArray } from '../utils' - -const activity = proxyActivities({ startToCloseTimeout: '1 minute' }) - -export async function testOrganizationMergeSuggestions( - args: IProcessGenerateOrganizationMergeSuggestionsArgs, -): Promise { - const PAGE_SIZE = 25 - const PARALLEL_SUGGESTION_PROCESSING = 50 - - const result: IOrganizationBaseForMergeSuggestions[] = await activity.getOrganizations( - args.tenantId, - PAGE_SIZE, - null, - null, - args.organizationIds, - ) - - if (result.length === 0) { - console.log('No organizations found for merge suggestion test!') - return - } - - const allMergeSuggestions: IOrganizationMergeSuggestion[] = [] - - const promiseChunks = chunkArray(result, PARALLEL_SUGGESTION_PROCESSING) - - for (const chunk of promiseChunks) { - const mergeSuggestionsPromises: Promise[] = chunk.map( - (organization) => activity.getOrganizationMergeSuggestions(args.tenantId, organization), - ) - - const mergeSuggestionsResults: IOrganizationMergeSuggestion[][] = - await Promise.all(mergeSuggestionsPromises) - allMergeSuggestions.push(...mergeSuggestionsResults.flat()) - } - - // Add all merge suggestions to add to merge - if (allMergeSuggestions.length > 0) { - console.log('Found merge suggestions!', allMergeSuggestions) - } else { - console.log('No merge suggestions found for provided organizations!') - } -} From ee8cfa9bec952ad3c49a9bf24bb6ed60aec5bea5 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 18 Mar 2026 01:23:08 +0530 Subject: [PATCH 5/5] fix: add test workflow and update organization merge suggestion processing Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../organizationMergeSuggestions.ts | 4 +- .../merge_suggestions_worker/src/workflows.ts | 2 + .../testOrganizationMergeSuggestions.ts | 50 +++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts diff --git a/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts index e27190e310..a01f426b09 100644 --- a/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts @@ -155,7 +155,7 @@ export async function getOrganizationMergeSuggestions( cleaned = cleaned.split(':').pop() || cleaned } - return cleaned + return cleaned.toLowerCase() } // Process up to 75 identities @@ -196,7 +196,7 @@ export async function getOrganizationMergeSuggestions( builder: ({ value, platform }) => ({ bool: { must: [ - { match_phrase: { [`nested_identities.string_value`]: value } }, + { match: { [`nested_identities.string_value`]: value } }, { match: { [`nested_identities.string_platform`]: platform } }, { term: { [`nested_identities.bool_verified`]: false } }, ], diff --git a/services/apps/merge_suggestions_worker/src/workflows.ts b/services/apps/merge_suggestions_worker/src/workflows.ts index 488e8d71a3..c23ace7611 100644 --- a/services/apps/merge_suggestions_worker/src/workflows.ts +++ b/services/apps/merge_suggestions_worker/src/workflows.ts @@ -5,6 +5,7 @@ import { mergeOrganizationsWithLLM } from './workflows/mergeOrganizationsWithLLM import { spawnMemberMergeSuggestionsForAllTenants } from './workflows/spawnMemberMergeSuggestionsForAllTenants' import { spawnOrganizationMergeSuggestionsForAllTenants } from './workflows/spawnOrganizationMergeSuggestionsForAllTenants' import { testMergingEntitiesWithLLM } from './workflows/testMergingEntitiesWithLLM' +import { testOrganizationMergeSuggestions } from './workflows/testOrganizationMergeSuggestions' export { generateMemberMergeSuggestions, @@ -14,4 +15,5 @@ export { testMergingEntitiesWithLLM, mergeOrganizationsWithLLM, mergeMembersWithLLM, + testOrganizationMergeSuggestions, } diff --git a/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts new file mode 100644 index 0000000000..ce2b8237b8 --- /dev/null +++ b/services/apps/merge_suggestions_worker/src/workflows/testOrganizationMergeSuggestions.ts @@ -0,0 +1,50 @@ +import { proxyActivities } from '@temporalio/workflow' + +import { IOrganizationBaseForMergeSuggestions, IOrganizationMergeSuggestion } from '@crowd/types' + +import * as activities from '../activities/organizationMergeSuggestions' +import { IProcessGenerateOrganizationMergeSuggestionsArgs } from '../types' +import { chunkArray } from '../utils' + +const activity = proxyActivities({ startToCloseTimeout: '1 minute' }) + +export async function testOrganizationMergeSuggestions( + args: IProcessGenerateOrganizationMergeSuggestionsArgs, +): Promise { + const PAGE_SIZE = 25 + const PARALLEL_SUGGESTION_PROCESSING = 50 + + const result: IOrganizationBaseForMergeSuggestions[] = await activity.getOrganizations( + args.tenantId, + PAGE_SIZE, + null, + null, + args.organizationIds, + ) + + if (result.length === 0) { + console.log('No organizations found for merge suggestion test!') + return + } + + const allMergeSuggestions: IOrganizationMergeSuggestion[] = [] + + const promiseChunks = chunkArray(result, PARALLEL_SUGGESTION_PROCESSING) + + for (const chunk of promiseChunks) { + const mergeSuggestionsPromises: Promise[] = chunk.map( + (organization) => activity.getOrganizationMergeSuggestions(args.tenantId, organization), + ) + + const mergeSuggestionsResults: IOrganizationMergeSuggestion[][] = + await Promise.all(mergeSuggestionsPromises) + allMergeSuggestions.push(...mergeSuggestionsResults.flat()) + } + + // Add all merge suggestions to add to merge + if (allMergeSuggestions.length > 0) { + console.log('Found merge suggestions!', allMergeSuggestions) + } else { + console.log('No merge suggestions found for provided organizations!') + } +}