Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 70 additions & 65 deletions handwritten/firestore/dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1891,73 +1891,78 @@ export class Firestore implements firestore.Firestore {
return this._retry(methodName, requestTag, () => {
const result = new Deferred<Duplex>();

void this._clientPool.run(requestTag, bidrectional, async gapicClient => {
logger(
'Firestore.requestStream',
requestTag,
'Sending request: %j',
request,
);

this._traceUtil
.currentSpan()
.addEvent(`Firestore.${methodName}: Start`);

try {
const stream = bidirectional
? gapicClient[methodName](callOptions)
: gapicClient[methodName](request, callOptions);
const logStream = new Transform({
objectMode: true,
transform: (chunk, encoding, callback) => {
logger(
'Firestore.requestStream',
requestTag,
'Received response: %j',
chunk,
);
numResponses++;
if (numResponses === 1) {
this._traceUtil
.currentSpan()
.addEvent(`Firestore.${methodName}: First response received`);
} else if (numResponses % NUM_RESPONSES_PER_TRACE_EVENT === 0) {
this._traceUtil
.currentSpan()
.addEvent(
`Firestore.${methodName}: Received ${numResponses} responses`,
);
}
callback();
},
});
stream.pipe(logStream);

const lifetime = new Deferred<void>();
const resultStream = await this._initializeStream(
stream,
lifetime,
void this._clientPool.run(
requestTag,
bidrectional,
async gapicClient => {
logger(
'Firestore.requestStream',
requestTag,
bidirectional ? request : undefined,
'Sending request: %j',
request,
);
resultStream.on('end', () => {
stream.end();
this._traceUtil
.currentSpan()
.addEvent(`Firestore.${methodName}: Completed`, {
[ATTRIBUTE_KEY_NUM_RESPONSES]: numResponses,
});
});
result.resolve(resultStream);

// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
return lifetime.promise;
} catch (e) {
result.reject(e);
}
});

this._traceUtil
.currentSpan()
.addEvent(`Firestore.${methodName}: Start`);

try {
const stream = bidirectional
? gapicClient[methodName](callOptions)
: gapicClient[methodName](request, callOptions);
const logStream = new Transform({
objectMode: true,
transform: (chunk, encoding, callback) => {
logger(
'Firestore.requestStream',
requestTag,
'Received response: %j',
chunk,
);
numResponses++;
if (numResponses === 1) {
this._traceUtil.currentSpan().addEvent(
`Firestore.${methodName}: First response received`,
);
} else if (numResponses % NUM_RESPONSES_PER_TRACE_EVENT === 0) {
this._traceUtil
.currentSpan()
.addEvent(
`Firestore.${methodName}: Received ${numResponses} responses`,
);
}
callback();
},
});
stream.pipe(logStream);

const lifetime = new Deferred<void>();
const resultStream = await this._initializeStream(
stream,
lifetime,
requestTag,
bidirectional ? request : undefined,
);
resultStream.on('end', () => {
stream.end();
this._traceUtil
.currentSpan()
.addEvent(`Firestore.${methodName}: Completed`, {
[ATTRIBUTE_KEY_NUM_RESPONSES]: numResponses,
});
});
result.resolve(resultStream);

// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
return lifetime.promise;
} catch (e) {
result.reject(e);
}
},
/* preferIdleClients= */ true,
);

return result.promise;
});
Expand Down
33 changes: 25 additions & 8 deletions handwritten/firestore/dev/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ export class ClientPool<T extends object> {
* @private
* @internal
*/
private acquire(requestTag: string, requiresGrpc: boolean): T {
private acquire(
requestTag: string,
requiresGrpc: boolean,
preferIdleClients = false,
): T {
let selectedClient: T | null = null;
let selectedClientRequestCount = -1;

Expand All @@ -125,15 +129,27 @@ export class ClientPool<T extends object> {
requiresGrpc = requiresGrpc || this.grpcEnabled;

for (const [client, metadata] of this.activeClients) {
if (
this.failedClients.has(client) ||
metadata.activeRequestCount >= this.concurrentOperationLimit ||
(!metadata.grpcEnabled && requiresGrpc)
) {
continue;
}

if (preferIdleClients) {
if (metadata.activeRequestCount === 0) {
selectedClient = client;
selectedClientRequestCount = metadata.activeRequestCount;
break;
}
continue;
}

// Use the "most-full" client that can still accommodate the request
// in order to maximize the number of idle clients as operations start to
// complete.
if (
!this.failedClients.has(client) &&
metadata.activeRequestCount > selectedClientRequestCount &&
metadata.activeRequestCount < this.concurrentOperationLimit &&
(metadata.grpcEnabled || !requiresGrpc)
) {
if (metadata.activeRequestCount > selectedClientRequestCount) {
selectedClient = client;
selectedClientRequestCount = metadata.activeRequestCount;
}
Expand Down Expand Up @@ -345,11 +361,12 @@ export class ClientPool<T extends object> {
requestTag: string,
requiresGrpc: boolean,
op: (client: T) => Promise<V>,
preferIdleClients = false,
): Promise<V> {
if (this.terminated) {
return Promise.reject(new Error(CLIENT_TERMINATED_ERROR_MSG));
}
const client = this.acquire(requestTag, requiresGrpc);
const client = this.acquire(requestTag, requiresGrpc, preferIdleClients);

return op(client)
.catch(async (err: GoogleError) => {
Expand Down
60 changes: 60 additions & 0 deletions handwritten/firestore/dev/test/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,66 @@ describe('Client pool', () => {
expect(instanceCount).to.equal(1);
});

it('re-uses idle instances when preferIdleClients is enabled', async () => {
let instanceCount = 0;
const clientPool = new ClientPool<{}>(10, 1, () => {
++instanceCount;
return {};
});

const operationPromises = deferredPromises(2);

let completionPromise = clientPool.run(
REQUEST_TAG,
USE_REST,
() => operationPromises[0].promise,
/* preferIdleClients= */ true,
);
expect(clientPool.size).to.equal(1);
operationPromises[0].resolve();
await completionPromise;

completionPromise = clientPool.run(
REQUEST_TAG,
USE_REST,
() => operationPromises[1].promise,
/* preferIdleClients= */ true,
);
expect(clientPool.size).to.equal(1);
operationPromises[1].resolve();
await completionPromise;

expect(instanceCount).to.equal(1);
});

it('creates a new client when preferIdleClients is enabled and all clients are busy', () => {
const clientPool = new ClientPool<{}>(10, 1, () => {
return {};
});

const operationPromises = deferredPromises(2);

void clientPool.run(
REQUEST_TAG,
USE_REST,
() => operationPromises[0].promise,
/* preferIdleClients= */ true,
);
expect(clientPool.size).to.equal(1);

void clientPool.run(
REQUEST_TAG,
USE_REST,
() => operationPromises[1].promise,
/* preferIdleClients= */ true,
);

expect(clientPool.size).to.equal(2);

operationPromises[0].resolve();
operationPromises[1].resolve();
});

it('does not re-use rest instance for grpc call', async () => {
const clientPool = new ClientPool<{}>(10, 1, () => {
return {};
Expand Down
Loading