diff --git a/handwritten/firestore/dev/src/index.ts b/handwritten/firestore/dev/src/index.ts index 5218efaf939..75e06bec3ec 100644 --- a/handwritten/firestore/dev/src/index.ts +++ b/handwritten/firestore/dev/src/index.ts @@ -1891,73 +1891,78 @@ export class Firestore implements firestore.Firestore { return this._retry(methodName, requestTag, () => { const result = new Deferred(); - void this._clientPool.run(requestTag, bidrectional, async gapicClient => { - logger( - 'Firestore.requestStream', - requestTag, - 'Sending request: %j', - request, - ); - - this._traceUtil - .currentSpan() - .addEvent(`Firestore.${methodName}: Start`); - - try { - const stream = bidirectional - ? gapicClient[methodName](callOptions) - : gapicClient[methodName](request, callOptions); - const logStream = new Transform({ - objectMode: true, - transform: (chunk, encoding, callback) => { - logger( - 'Firestore.requestStream', - requestTag, - 'Received response: %j', - chunk, - ); - numResponses++; - if (numResponses === 1) { - this._traceUtil - .currentSpan() - .addEvent(`Firestore.${methodName}: First response received`); - } else if (numResponses % NUM_RESPONSES_PER_TRACE_EVENT === 0) { - this._traceUtil - .currentSpan() - .addEvent( - `Firestore.${methodName}: Received ${numResponses} responses`, - ); - } - callback(); - }, - }); - stream.pipe(logStream); - - const lifetime = new Deferred(); - const resultStream = await this._initializeStream( - stream, - lifetime, + void this._clientPool.run( + requestTag, + bidrectional, + async gapicClient => { + logger( + 'Firestore.requestStream', requestTag, - bidirectional ? request : undefined, + 'Sending request: %j', + request, ); - resultStream.on('end', () => { - stream.end(); - this._traceUtil - .currentSpan() - .addEvent(`Firestore.${methodName}: Completed`, { - [ATTRIBUTE_KEY_NUM_RESPONSES]: numResponses, - }); - }); - result.resolve(resultStream); - - // While we return the stream to the callee early, we don't want to - // release the GAPIC client until the callee has finished processing the - // stream. - return lifetime.promise; - } catch (e) { - result.reject(e); - } - }); + + this._traceUtil + .currentSpan() + .addEvent(`Firestore.${methodName}: Start`); + + try { + const stream = bidirectional + ? gapicClient[methodName](callOptions) + : gapicClient[methodName](request, callOptions); + const logStream = new Transform({ + objectMode: true, + transform: (chunk, encoding, callback) => { + logger( + 'Firestore.requestStream', + requestTag, + 'Received response: %j', + chunk, + ); + numResponses++; + if (numResponses === 1) { + this._traceUtil.currentSpan().addEvent( + `Firestore.${methodName}: First response received`, + ); + } else if (numResponses % NUM_RESPONSES_PER_TRACE_EVENT === 0) { + this._traceUtil + .currentSpan() + .addEvent( + `Firestore.${methodName}: Received ${numResponses} responses`, + ); + } + callback(); + }, + }); + stream.pipe(logStream); + + const lifetime = new Deferred(); + const resultStream = await this._initializeStream( + stream, + lifetime, + requestTag, + bidirectional ? request : undefined, + ); + resultStream.on('end', () => { + stream.end(); + this._traceUtil + .currentSpan() + .addEvent(`Firestore.${methodName}: Completed`, { + [ATTRIBUTE_KEY_NUM_RESPONSES]: numResponses, + }); + }); + result.resolve(resultStream); + + // While we return the stream to the callee early, we don't want to + // release the GAPIC client until the callee has finished processing the + // stream. + return lifetime.promise; + } catch (e) { + result.reject(e); + } + }, + /* preferIdleClients= */ true, + ); return result.promise; }); diff --git a/handwritten/firestore/dev/src/pool.ts b/handwritten/firestore/dev/src/pool.ts index 5bb4550171b..9369b4d75cd 100644 --- a/handwritten/firestore/dev/src/pool.ts +++ b/handwritten/firestore/dev/src/pool.ts @@ -114,7 +114,11 @@ export class ClientPool { * @private * @internal */ - private acquire(requestTag: string, requiresGrpc: boolean): T { + private acquire( + requestTag: string, + requiresGrpc: boolean, + preferIdleClients = false, + ): T { let selectedClient: T | null = null; let selectedClientRequestCount = -1; @@ -125,15 +129,27 @@ export class ClientPool { requiresGrpc = requiresGrpc || this.grpcEnabled; for (const [client, metadata] of this.activeClients) { + if ( + this.failedClients.has(client) || + metadata.activeRequestCount >= this.concurrentOperationLimit || + (!metadata.grpcEnabled && requiresGrpc) + ) { + continue; + } + + if (preferIdleClients) { + if (metadata.activeRequestCount === 0) { + selectedClient = client; + selectedClientRequestCount = metadata.activeRequestCount; + break; + } + continue; + } + // Use the "most-full" client that can still accommodate the request // in order to maximize the number of idle clients as operations start to // complete. - if ( - !this.failedClients.has(client) && - metadata.activeRequestCount > selectedClientRequestCount && - metadata.activeRequestCount < this.concurrentOperationLimit && - (metadata.grpcEnabled || !requiresGrpc) - ) { + if (metadata.activeRequestCount > selectedClientRequestCount) { selectedClient = client; selectedClientRequestCount = metadata.activeRequestCount; } @@ -345,11 +361,12 @@ export class ClientPool { requestTag: string, requiresGrpc: boolean, op: (client: T) => Promise, + preferIdleClients = false, ): Promise { if (this.terminated) { return Promise.reject(new Error(CLIENT_TERMINATED_ERROR_MSG)); } - const client = this.acquire(requestTag, requiresGrpc); + const client = this.acquire(requestTag, requiresGrpc, preferIdleClients); return op(client) .catch(async (err: GoogleError) => { diff --git a/handwritten/firestore/dev/test/pool.ts b/handwritten/firestore/dev/test/pool.ts index 3a7b188a560..7d58b79b83a 100644 --- a/handwritten/firestore/dev/test/pool.ts +++ b/handwritten/firestore/dev/test/pool.ts @@ -166,6 +166,66 @@ describe('Client pool', () => { expect(instanceCount).to.equal(1); }); + it('re-uses idle instances when preferIdleClients is enabled', async () => { + let instanceCount = 0; + const clientPool = new ClientPool<{}>(10, 1, () => { + ++instanceCount; + return {}; + }); + + const operationPromises = deferredPromises(2); + + let completionPromise = clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[0].promise, + /* preferIdleClients= */ true, + ); + expect(clientPool.size).to.equal(1); + operationPromises[0].resolve(); + await completionPromise; + + completionPromise = clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[1].promise, + /* preferIdleClients= */ true, + ); + expect(clientPool.size).to.equal(1); + operationPromises[1].resolve(); + await completionPromise; + + expect(instanceCount).to.equal(1); + }); + + it('creates a new client when preferIdleClients is enabled and all clients are busy', () => { + const clientPool = new ClientPool<{}>(10, 1, () => { + return {}; + }); + + const operationPromises = deferredPromises(2); + + void clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[0].promise, + /* preferIdleClients= */ true, + ); + expect(clientPool.size).to.equal(1); + + void clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[1].promise, + /* preferIdleClients= */ true, + ); + + expect(clientPool.size).to.equal(2); + + operationPromises[0].resolve(); + operationPromises[1].resolve(); + }); + it('does not re-use rest instance for grpc call', async () => { const clientPool = new ClientPool<{}>(10, 1, () => { return {};