diff --git a/src/message-stream.ts b/src/message-stream.ts index 60c404bcb..cdfad5903 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -139,6 +139,7 @@ export class ChannelError extends Error implements grpc.ServiceError { interface StreamTracked { stream?: PullStream; receivedStatus?: boolean; + pingTimeout?: NodeJS.Timeout; } /** @@ -227,6 +228,9 @@ export class MessageStream extends PassThrough { for (let i = 0; i < this._streams.length; i++) { const tracker = this._streams[i]; + if (tracker.pingTimeout) { + clearTimeout(tracker.pingTimeout); + } if (tracker.stream) { this._removeStream(i, 'overall message stream destroyed', 'n/a'); } @@ -254,6 +258,8 @@ export class MessageStream extends PassThrough { tracker.stream = stream; tracker.receivedStatus = false; + this._resetPingTimer(index); + stream .on('error', err => this._onError(index, err)) .once('status', status => this._onStatus(index, status)) @@ -264,10 +270,30 @@ export class MessageStream extends PassThrough { // Mark this stream as alive again. (reset backoff) const tracker = this._streams[index]; this._retrier.reset(tracker); + this._resetPingTimer(index); this.emit('data', data); } + private _resetPingTimer(index: number): void { + const tracker = this._streams[index]; + if (tracker.pingTimeout) { + clearTimeout(tracker.pingTimeout); + } + // We expect a packet from the server at least once every 30 seconds. + // Give it a 1-second grace period. + tracker.pingTimeout = setTimeout(() => { + this._removeStream( + index, + 'stream inactive for longer than 30 seconds', + 'will be retried', + ); + this._retrier.retryLater(tracker, () => + this._fillOne(index, undefined, 'retry'), + ); + }, 31000); + } + /** * Attempts to create and cache the desired number of StreamingPull requests. * gRPC does not supply a way to confirm that a stream is connected, so our @@ -347,6 +373,8 @@ export class MessageStream extends PassThrough { maxOutstandingBytes: this._subscriber.useLegacyFlowControl ? 0 : this._subscriber.maxBytes, + clientId: 'node-pubsub', + protocolVersion: 1, // Set protocol version to fulfill keepalive capabilities }; const otherArgs = { headers: { @@ -511,6 +539,10 @@ export class MessageStream extends PassThrough { whatNext?: string, ): void { const tracker = this._streams[index]; + if (tracker.pingTimeout) { + clearTimeout(tracker.pingTimeout); + tracker.pingTimeout = undefined; + } if (tracker.stream) { logs.subscriberStreams.info( 'closing stream %i; why: %s; next: %s',