diff --git a/src/components/Indexer/index.ts b/src/components/Indexer/index.ts index e261bb453..3c4f2a514 100644 --- a/src/components/Indexer/index.ts +++ b/src/components/Indexer/index.ts @@ -35,9 +35,10 @@ import { BlockchainRegistry } from '../BlockchainRegistry/index.js' import { CommandStatus, JobStatus } from '../../@types/commands.js' import { buildJobIdentifier, getDeployedContractBlock } from './utils.js' import { create256Hash } from '../../utils/crypt.js' -import { isReachableConnection } from '../../utils/database.js' +import { getDatabase, isReachableConnection } from '../../utils/database.js' import { sleep } from '../../utils/util.js' import { isReindexingNeeded } from './version.js' +import { DB_EVENTS, ES_CONNECTION_EVENTS } from '../database/ElasticsearchConfigHelper.js' /** * Event emitter for DDO (Data Descriptor Object) events @@ -82,6 +83,8 @@ export class OceanIndexer { private supportedChains: string[] private indexers: Map = new Map() private MIN_REQUIRED_VERSION = '0.2.2' + private isDbConnected: boolean = true + private reconnectTimer: NodeJS.Timeout | null = null constructor( db: Database, @@ -93,9 +96,64 @@ export class OceanIndexer { this.blockchainRegistry = blockchainRegistry this.supportedChains = Object.keys(supportedNetworks) INDEXING_QUEUE = [] + this.setupDbConnectionListeners() this.startAllChainIndexers() } + /** + * Listen for Elasticsearch connection events. + * + * CONNECTION_LOST → cancel any pending restart, stop all indexers once. + * CONNECTION_RESTORED → debounce restart by 5 s so rapid LOST/RESTORED cycles are a single restart. + */ + private setupDbConnectionListeners(): void { + ES_CONNECTION_EVENTS.on(DB_EVENTS.CONNECTION_LOST, async () => { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer) + this.reconnectTimer = null + } + + if (!this.isDbConnected) { + return + } + + this.isDbConnected = false + INDEXER_LOGGER.error( + 'Database connection lost - stopping all chain indexers until DB is back' + ) + await this.stopAllChainIndexers() + }) + + ES_CONNECTION_EVENTS.on(DB_EVENTS.CONNECTION_RESTORED, () => { + if (this.isDbConnected) { + return + } + + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer) + } + + this.reconnectTimer = setTimeout(async () => { + this.reconnectTimer = null + if (this.isDbConnected) { + return + } + + this.isDbConnected = true + numCrawlAttempts = 0 + INDEXER_LOGGER.info( + 'Database connection stable - reinitialising DB and restarting all chain indexers' + ) + const freshDb = await getDatabase(true) + if (freshDb) { + this.db = freshDb + } + + await this.startAllChainIndexers() + }, 5000) + }) + } + public getSupportedNetworks(): RPCS { return this.networks } diff --git a/src/components/database/ElasticsearchConfigHelper.ts b/src/components/database/ElasticsearchConfigHelper.ts index 9838e4ff1..072ccfc2a 100644 --- a/src/components/database/ElasticsearchConfigHelper.ts +++ b/src/components/database/ElasticsearchConfigHelper.ts @@ -1,9 +1,16 @@ +import EventEmitter from 'node:events' import { Client } from '@elastic/elasticsearch' import { OceanNodeDBConfig } from '../../@types' import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js' import { DB_TYPES } from '../../utils/constants.js' +export const DB_EVENTS = { + CONNECTION_LOST: 'db:connection:lost', + CONNECTION_RESTORED: 'db:connection:restored' +} as const +export const ES_CONNECTION_EVENTS = new EventEmitter() + export interface ElasticsearchRetryConfig { requestTimeout?: number pingTimeout?: number @@ -16,20 +23,20 @@ export interface ElasticsearchRetryConfig { } export const DEFAULT_ELASTICSEARCH_CONFIG: Required = { - requestTimeout: parseInt(process.env.ELASTICSEARCH_REQUEST_TIMEOUT || '60000'), - pingTimeout: parseInt(process.env.ELASTICSEARCH_PING_TIMEOUT || '5000'), + requestTimeout: parseInt(process.env.ELASTICSEARCH_REQUEST_TIMEOUT || '30000'), + pingTimeout: parseInt(process.env.ELASTICSEARCH_PING_TIMEOUT || '3000'), resurrectStrategy: (process.env.ELASTICSEARCH_RESURRECT_STRATEGY as 'ping' | 'optimistic' | 'none') || 'ping', - maxRetries: parseInt(process.env.ELASTICSEARCH_MAX_RETRIES || '5'), + maxRetries: parseInt(process.env.ELASTICSEARCH_MAX_RETRIES || '3'), sniffOnStart: process.env.ELASTICSEARCH_SNIFF_ON_START !== 'false', sniffInterval: process.env.ELASTICSEARCH_SNIFF_INTERVAL === 'false' ? false - : parseInt(process.env.ELASTICSEARCH_SNIFF_INTERVAL || '30000'), + : parseInt(process.env.ELASTICSEARCH_SNIFF_INTERVAL || '30000', 10) || 30000, sniffOnConnectionFault: process.env.ELASTICSEARCH_SNIFF_ON_CONNECTION_FAULT !== 'false', healthCheckInterval: parseInt( - process.env.ELASTICSEARCH_HEALTH_CHECK_INTERVAL || '60000' + process.env.ELASTICSEARCH_HEALTH_CHECK_INTERVAL || '15000' ) } @@ -42,6 +49,7 @@ class ElasticsearchClientSingleton { private isRetrying: boolean = false private healthCheckTimer: NodeJS.Timeout | null = null private isMonitoring: boolean = false + private connectionLostEmitted: boolean = false private constructor() {} @@ -73,21 +81,27 @@ class ElasticsearchClientSingleton { } if (this.client && this.config) { + // Skip the extra ping here: 5 DB-class constructors all call getClient() + // during reconnect reinit, and concurrent pings cause false errors that trigger another LOST/RESTORED cycle. + if (this.isMonitoring) { + return this.client + } + const isHealthy = await this.checkConnectionHealth() if (isHealthy) { this.startHealthMonitoring(config, customConfig) return this.client } else { DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch connection interrupted or failed to ${this.maskUrl( + `Elasticsearch connection unhealthy at ${this.maskUrl( this.config.url - )} - starting retry phase`, + )} - health monitoring will handle reconnection`, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_WARN ) - this.closeConnectionSync() - return this.startRetryConnection(config, customConfig) + this.startHealthMonitoring(config, customConfig) + throw new Error('Elasticsearch connection is not healthy') } } @@ -109,33 +123,69 @@ class ElasticsearchClientSingleton { this.isMonitoring = true DATABASE_LOGGER.logMessageWithEmoji( - `Starting Elasticsearch connection monitoring (health check every ${finalConfig.healthCheckInterval}ms)`, + `Starting Elasticsearch health monitoring (interval: ${finalConfig.healthCheckInterval}ms)`, true, GENERIC_EMOJIS.EMOJI_OCEAN_WAVE, LOG_LEVELS_STR.LEVEL_DEBUG ) this.healthCheckTimer = setInterval(async () => { - if (this.client && !this.isRetrying) { - const isHealthy = await this.checkConnectionHealth() - if (!isHealthy) { - DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch connection lost during monitoring - triggering automatic reconnection`, - true, - GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_WARN - ) - this.closeConnectionSync() + if (this.isRetrying) { + return + } + + const isHealthy = await this.checkConnectionHealth() + if (!isHealthy) { + if (this.client) { try { - await this.startRetryConnection(config, customConfig) - } catch (error) { + this.client.close() + } catch (err) { DATABASE_LOGGER.logMessageWithEmoji( - `Automatic reconnection failed: ${error.message}`, + `Error closing Elasticsearch client during health check: ${err instanceof Error ? err.message : String(err)}`, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_ERROR + LOG_LEVELS_STR.LEVEL_DEBUG ) } + this.client = null + this.config = null + } + + // Emit CONNECTION_LOST + if (!this.connectionLostEmitted) { + this.connectionLostEmitted = true + DATABASE_LOGGER.logMessageWithEmoji( + `Elasticsearch connection lost to ${this.maskUrl( + config.url + )} - starting reconnection attempts every ${finalConfig.healthCheckInterval}ms`, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_WARN + ) + ES_CONNECTION_EVENTS.emit(DB_EVENTS.CONNECTION_LOST) + } + + // Single reconnection attempt + this.isRetrying = true + try { + DATABASE_LOGGER.logMessageWithEmoji( + `Attempting Elasticsearch reconnection to ${this.maskUrl(config.url)}`, + true, + GENERIC_EMOJIS.EMOJI_OCEAN_WAVE, + LOG_LEVELS_STR.LEVEL_INFO + ) + await this.createNewConnection(config, customConfig) + this.isRetrying = false + this.connectionLostEmitted = false + DATABASE_LOGGER.logMessageWithEmoji( + `Elasticsearch connection restored to ${this.maskUrl(config.url)}`, + true, + GENERIC_EMOJIS.EMOJI_CHECK_MARK, + LOG_LEVELS_STR.LEVEL_INFO + ) + ES_CONNECTION_EVENTS.emit(DB_EVENTS.CONNECTION_RESTORED) + } catch { + this.isRetrying = false } } }, finalConfig.healthCheckInterval) @@ -155,71 +205,6 @@ class ElasticsearchClientSingleton { } } - private async startRetryConnection( - config: OceanNodeDBConfig, - customConfig: Partial = {} - ): Promise { - if (!this.isElasticsearchDatabase(config)) { - throw new Error(`Database type '${config.dbType}' is not Elasticsearch`) - } - - this.isRetrying = true - const finalConfig = { - ...DEFAULT_ELASTICSEARCH_CONFIG, - ...customConfig - } - - DATABASE_LOGGER.logMessageWithEmoji( - `Starting Elasticsearch retry connection phase to ${this.maskUrl( - config.url - )} (max retries: ${finalConfig.maxRetries})`, - true, - GENERIC_EMOJIS.EMOJI_OCEAN_WAVE, - LOG_LEVELS_STR.LEVEL_INFO - ) - - for (let attempt = 1; attempt <= finalConfig.maxRetries; attempt++) { - try { - DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch reconnection attempt ${attempt}/${ - finalConfig.maxRetries - } to ${this.maskUrl(config.url)}`, - true, - GENERIC_EMOJIS.EMOJI_OCEAN_WAVE, - LOG_LEVELS_STR.LEVEL_INFO - ) - - const client = await this.createNewConnection(config, customConfig) - this.isRetrying = false - return client - } catch (error) { - if (attempt === finalConfig.maxRetries) { - this.isRetrying = false - DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch retry connection failed after ${ - finalConfig.maxRetries - } attempts to ${this.maskUrl(config.url)}: ${error.message}`, - true, - GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_ERROR - ) - throw error - } - - const delay = Math.min(1000 * Math.pow(2, attempt - 1), 30000) - DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch retry attempt ${attempt}/${finalConfig.maxRetries} failed, waiting ${delay}ms before next attempt: ${error.message}`, - true, - GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_WARN - ) - await new Promise((resolve) => setTimeout(resolve, delay)) - } - } - - throw new Error('Maximum retry attempts reached') - } - private async checkConnectionHealth(): Promise { if (!this.client) return false @@ -228,7 +213,7 @@ class ElasticsearchClientSingleton { return true } catch (error) { DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch connection health check failed: ${error.message}`, + `Elasticsearch health check failed: ${error.message}`, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_DEBUG @@ -277,9 +262,7 @@ class ElasticsearchClientSingleton { DATABASE_LOGGER.logMessageWithEmoji( `Elasticsearch connection established successfully to ${this.maskUrl( config.url - )} (attempt ${this.connectionAttempts}/${ - finalConfig.maxRetries - }) last successful connection ${this.lastConnectionTime}`, + )} (attempt ${this.connectionAttempts}) last successful connection ${this.lastConnectionTime}`, true, GENERIC_EMOJIS.EMOJI_CHECK_MARK, LOG_LEVELS_STR.LEVEL_INFO @@ -290,9 +273,7 @@ class ElasticsearchClientSingleton { DATABASE_LOGGER.logMessageWithEmoji( `Failed to connect to Elasticsearch at ${this.maskUrl(config.url)} (attempt ${ this.connectionAttempts - }/${finalConfig.maxRetries}) last successful connection ${ - this.lastConnectionTime - }: ${error.message}`, + }) last successful connection ${this.lastConnectionTime}: ${error.message}`, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR @@ -323,6 +304,7 @@ class ElasticsearchClientSingleton { this.client = null this.config = null } + this.connectionLostEmitted = false } public getConnectionStats(): {