diff --git a/README.md b/README.md index b0bfffe2..9b101d11 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ yarn # Install yarn dependencies - [Subscription](./docs/components/transport-types/subscription-transport.md) - [Streaming](./docs/components/transport-types/streaming-transport.md) - [Custom](./docs/components/transport-types/custom-transport.md) + - [Composite](./docs/components/transport-types/composite-transport.md) - Guides - [Porting a v2 EA to v3](./docs/guides/porting-a-v2-ea-to-v3.md) - [Creating a new v3 EA](./docs/guides/creating-a-new-v3-ea.md) diff --git a/docs/components/transport-types/composite-transport.md b/docs/components/transport-types/composite-transport.md new file mode 100644 index 00000000..4031acec --- /dev/null +++ b/docs/components/transport-types/composite-transport.md @@ -0,0 +1,50 @@ +# Composite transport + +Composite transport is a **framework feature** for multi-route endpoints: every registered child transport runs in parallel for the same endpoint, and successful cache writes are merged so only the “freshest” value wins. You enable it on the endpoint; you **do not** import or `new CompositeTransport(...)` in adapter code—that class is constructed internally when the conditions below are met. + +Typical uses: + +- Pair a low-latency stream (for example WebSocket) with a REST fallback so the cache still updates if the stream lags or drops. +- Run two data paths for the same feed and keep whichever provider reports a newer `providerIndicatedTimeUnixMs`. + +## How to use it + +1. Define the endpoint with **`transportRoutes`** (not a single `transport` field). Register **at least two** named child transports on a [`TransportRoutes`](../../../src/transports/index.ts) instance. Transport names must be lowercase letters only (see `TransportRoutes.register`). +2. Set **`enableCompositeTransport: true`** on the same [`AdapterEndpoint`](../../../src/adapter/endpoint.ts) params. +3. Turn the behavior on at runtime by setting adapter setting **`COMPOSITE_TRANSPORT`** to `true` (for example env `COMPOSITE_TRANSPORT=true`, or your adapter’s settings prefix). + +If `enableCompositeTransport` is `true` but there are fewer than two routes, construction throws. If `enableCompositeTransport` is `true` but **`COMPOSITE_TRANSPORT`** is `false` (the default), the endpoint keeps **normal multi-transport routing** (`customRouter`, request `transport`, or `defaultTransport`) so operators can flip composite mode without redeploying. + +When **both** flags are true, [`AdapterEndpoint.initialize`](../../../src/adapter/endpoint.ts) replaces the route map with a single internal route whose transport is a `CompositeTransport` built from your previous route entries. From then on the framework treats the endpoint as having one logical transport that fans out to all children. + +## Example + +```typescript +import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter' +import { TransportRoutes } from '@chainlink/external-adapter-framework/transports' + +// wsTransport and restTransport are normal transports you already defined +export const endpoint = new AdapterEndpoint({ + name: 'example', + inputParameters, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes() + .register('ws', wsTransport) + .register('rest', restTransport), +}) +``` + +Deploy or configure with **`COMPOSITE_TRANSPORT=true`** when you want parallel execution and merged caching for that endpoint. + +## How it works (internals) + +The framework’s `CompositeTransport` (see [`composite.ts`](../../../src/transports/composite.ts)) wires each child with a [`CompareResponseCache`](../../../src/cache/response-cache/compare.ts) instead of the raw endpoint cache: reads go through to the real cache, while writes are accepted only when the pending payload is newer than both the last value seen for that key on that child path and the value already stored, using **`timestamps.providerIndicatedTimeUnixMs`** (missing timestamps are treated as `0`). **`registerRequest`** and **`backgroundExecute`** are invoked on **every** child in parallel. There is no `foregroundExecute` on the composite itself; behavior comes entirely from the children. + +Child names are the keys you passed to `register`; each child’s `initialize` receives that string as its `transportName`. + +## Notes + +- **Timestamps** — Children should populate `providerIndicatedTimeUnixMs` when they have a meaningful provider clock; otherwise merge order may not match business intent. +- **Concurrency** — Delivery order across children is not guaranteed; the merge rule is strictly “larger `providerIndicatedTimeUnixMs` wins.” +- **TTL** — TTL behavior flows through the compare cache with the composite’s transport name; see `CompareResponseCache.writeTTL` if you depend on per-transport TTL semantics. +- **Errors** — Children still own parsing and errors; the composite only arbitrates successful cache updates between children. diff --git a/docs/components/transports.md b/docs/components/transports.md index d236d8a2..d8599068 100644 --- a/docs/components/transports.md +++ b/docs/components/transports.md @@ -13,6 +13,7 @@ The v3 framework provides transports to fetch data from a Provider using the com - [HTTP Transport](./transport-types/http-transport.md) - [Websocket Transport](./transport-types/websocket-transport.md) - [SSE Transport](./transport-types/sse-transport.md) +- [Composite Transport](./transport-types/composite-transport.md) - [Custom Transport](./transport-types/custom-transport.md) ### Abstract Transports diff --git a/docs/reference-tables/ea-settings.md b/docs/reference-tables/ea-settings.md index 93ae62bc..562c86ee 100644 --- a/docs/reference-tables/ea-settings.md +++ b/docs/reference-tables/ea-settings.md @@ -27,6 +27,7 @@ | CACHE_REDIS_URL | string | undefined | The URL of the Redis server. Format: [redis[s]:]//[[user][:password@]][host][:port][/db-number]?db=db-number[&password=bar[&option=value]]] | - Value must be a valid URL | | | CACHE_TYPE | enum | local | The type of cache to use throughout the EA | | | | CENSOR_SENSITIVE_LOGS | boolean | false | Controls whether the logging of sensitive information is enabled or disabled | | | +| COMPOSITE_TRANSPORT | boolean | false | Whether to use enableCompositeTransport parameter in AdapterEndpoint | | | | CORRELATION_ID_ENABLED | boolean | true | Flag to enable correlation IDs for sent requests in logging | | | | DEBUG | boolean | false | Toggles debug mode | | | | DEBUG_ENDPOINTS | boolean | false | Whether to enable debug enpoints (/debug/\*) for this adapter. Enabling them might consume more resources. | | | diff --git a/src/adapter/endpoint.ts b/src/adapter/endpoint.ts index e64ca34a..25669ecd 100644 --- a/src/adapter/endpoint.ts +++ b/src/adapter/endpoint.ts @@ -1,6 +1,6 @@ -import { ResponseCache } from '../cache/response' +import { SimpleResponseCache } from '../cache/response' import { AdapterSettings } from '../config' -import { TransportRoutes } from '../transports' +import { CompositeTransport, TransportRoutes } from '../transports' import { AdapterRequest, AdapterRequestData, @@ -46,6 +46,7 @@ export class AdapterEndpoint implements AdapterEndpo settings: T['Settings'], ) => string defaultTransport?: string + enableCompositeTransport?: boolean constructor(params: AdapterEndpointParams) { this.name = params.name @@ -55,6 +56,13 @@ export class AdapterEndpoint implements AdapterEndpo this.transportRoutes = params.transportRoutes this.customRouter = params.customRouter this.defaultTransport = params.defaultTransport + this.enableCompositeTransport = params.enableCompositeTransport + if (params.enableCompositeTransport && this.transportRoutes.routeNames().length < 2) { + throw new AdapterError({ + statusCode: 400, + message: `Composite transport requires at least 2 transports`, + }) + } } else { this.transportRoutes = new TransportRoutes().register( DEFAULT_TRANSPORT_NAME, @@ -83,7 +91,7 @@ export class AdapterEndpoint implements AdapterEndpo adapterSettings: T['Settings'], ): Promise { this.adapterName = adapterName - const responseCache = new ResponseCache({ + const responseCache = new SimpleResponseCache({ dependencies, adapterSettings: adapterSettings as AdapterSettings, adapterName, @@ -96,6 +104,14 @@ export class AdapterEndpoint implements AdapterEndpo responseCache, } + if (this.enableCompositeTransport && adapterSettings.COMPOSITE_TRANSPORT) { + logger.debug(`Enabling composite transport for endpoint "${this.name}"...`) + this.transportRoutes = new TransportRoutes().register( + DEFAULT_TRANSPORT_NAME, + new CompositeTransport(Object.fromEntries(this.transportRoutes.entries())), + ) + } + logger.debug(`Initializing transports for endpoint "${this.name}"...`) for (const [transportName, transport] of this.transportRoutes.entries()) { await transport.initialize(transportDependencies, adapterSettings, this.name, transportName) diff --git a/src/adapter/types.ts b/src/adapter/types.ts index 8d52246c..c33e6222 100644 --- a/src/adapter/types.ts +++ b/src/adapter/types.ts @@ -183,6 +183,9 @@ type MultiTransportAdapterEndpointParams = { /** If no value is returned from the custom router or the default (transport param), which transport to use */ defaultTransport?: string + + /** If true, roll all transportRoutes under a new CompositeTransport */ + enableCompositeTransport?: boolean } /** diff --git a/src/cache/response-cache/base.ts b/src/cache/response-cache/base.ts new file mode 100644 index 00000000..68d903aa --- /dev/null +++ b/src/cache/response-cache/base.ts @@ -0,0 +1,170 @@ +import { AdapterDependencies } from '../../adapter' +import { AdapterSettings } from '../../config' +import { + AdapterResponse, + makeLogger, + ResponseGenerics, + TimestampedAdapterResponse, + TimestampedProviderResult, + censor, + censorLogs, + TimestampedProviderErrorResponse, +} from '../../util' +import { + InputParameters, + InputParametersDefinition, + TypeFromDefinition, +} from '../../validation/input-params' +import { Cache, calculateAdapterName, calculateCacheKey, calculateFeedId } from '../' +import CensorList from '../../util/censor/censor-list' +import { validator } from '../../validation/utils' + +const logger = makeLogger('ResponseCache') + +export abstract class ResponseCache< + T extends { Parameters: InputParametersDefinition; Response: ResponseGenerics }, +> { + cache: Cache> + inputParameters: InputParameters + adapterName: string + endpointName: string + adapterSettings: AdapterSettings + dependencies: AdapterDependencies + + constructor({ + inputParameters, + adapterName, + endpointName, + adapterSettings, + dependencies, + }: { + dependencies: AdapterDependencies + adapterSettings: AdapterSettings + adapterName: string + endpointName: string + inputParameters: InputParameters + }) { + this.dependencies = dependencies + this.cache = dependencies.cache as Cache> + this.inputParameters = inputParameters + this.adapterName = adapterName + this.endpointName = endpointName + this.adapterSettings = adapterSettings + } + + /** + * Sets responses in the adapter cache (adding necessary metadata and defaults) + * + * @param transportName - transport name + * @param results - the entries to write to the cache + */ + abstract write(transportName: string, results: TimestampedProviderResult[]): Promise + + /** + * Sets responses with metadata in the adapter cache + * + * @param entries - the entries to write to the cache + */ + abstract writeEntries( + entries: { + key: string + value: AdapterResponse + }[], + ): Promise + + /** + * Sets a new TTL value for already cached responses in the adapter cache + * + * @param transportName - transport name + * @param params - set of parameters that uniquely relate to the response + * @param ttl - a new time in milliseconds until the response expires + */ + async writeTTL( + transportName: string, + params: TypeFromDefinition[], + ttl: number, + ): Promise { + for (const param of params) { + const key = this.getCacheKey(transportName, param) + this.cache.setTTL(key, ttl) + } + } + + async get(key: string) { + return this.cache.get(key) + } + + protected generateCacheEntry( + transportNameForMeta: string, + transportNameForCache: string, + r: TimestampedProviderResult, + ) { + const censorList = CensorList.getAll() + const { data, result, errorMessage } = r.response + if (!errorMessage && data === undefined) { + logger.warn('The "data" property of the response is undefined.') + } else if (!errorMessage && result === undefined) { + logger.warn('The "result" property of the response is undefined.') + } + let censoredResponse + if (!censorList.length) { + censoredResponse = r.response + } else { + try { + censoredResponse = censor(r.response, censorList, true) as TimestampedAdapterResponse< + T['Response'] + > + } catch (error) { + censorLogs(() => logger.error(`Error censoring response: ${error}`)) + censoredResponse = { + statusCode: 502, + errorMessage: 'Response could not be censored due to an error', + timestamps: r.response.timestamps, + } + } + } + + const response: AdapterResponse = { + ...censoredResponse, + statusCode: (censoredResponse as TimestampedProviderErrorResponse).statusCode || 200, + } + + if (this.adapterSettings.METRICS_ENABLED && this.adapterSettings.EXPERIMENTAL_METRICS_ENABLED) { + response.meta = { + adapterName: calculateAdapterName(this.adapterName, r.params), + transportName: transportNameForMeta, + metrics: { + feedId: calculateFeedId( + { + adapterSettings: this.adapterSettings, + }, + r.params, + ), + }, + } + } + + if (response.timestamps?.providerIndicatedTimeUnixMs !== undefined) { + const timestampValidator = validator.responseTimestamp() + const error = timestampValidator.fn(response.timestamps?.providerIndicatedTimeUnixMs) + if (error) { + censorLogs(() => logger.warn(`Provider indicated time is invalid: ${error}`)) + } + } + + return { + key: this.getCacheKey(transportNameForCache, r.params), + value: response, + } as const + } + + getCacheKey(transportName: string, params: TypeFromDefinition) { + return calculateCacheKey({ + transportName, + data: params, + adapterName: this.adapterName, + endpointName: this.endpointName, + adapterSettings: this.adapterSettings, + }) + } +} diff --git a/src/cache/response-cache/compare.ts b/src/cache/response-cache/compare.ts new file mode 100644 index 00000000..e329ebaf --- /dev/null +++ b/src/cache/response-cache/compare.ts @@ -0,0 +1,94 @@ +import { ResponseCache } from './base' +import { AdapterResponse, ResponseGenerics, TimestampedProviderResult } from '../../util' +import { InputParametersDefinition, TypeFromDefinition } from '../../validation/input-params' + +/** + * Compares with existing cache entries before deciding to write or not + */ +export class CompareResponseCache< + T extends { + Parameters: InputParametersDefinition + Response: ResponseGenerics + }, +> extends ResponseCache { + readonly transportName: string + // The actual cache where responses are written to + responseCache: ResponseCache + // A local map to keep track of the most recent entries written to the responseCache + // We compare with this first before comparing with value in cache + // so that we can reduce cache reads + localCache: Map> + // True if next should replace current in cache + shouldUpdate: ( + next: AdapterResponse, + current?: AdapterResponse, + ) => boolean + + constructor( + transportName: string, + responseCache: ResponseCache, + shouldUpdate: ( + next: AdapterResponse, + current?: AdapterResponse, + ) => boolean, + ) { + super({ + inputParameters: responseCache.inputParameters, + adapterName: responseCache.adapterName, + endpointName: responseCache.endpointName, + adapterSettings: responseCache.adapterSettings, + dependencies: responseCache.dependencies, + }) + this.transportName = transportName + this.responseCache = responseCache + this.localCache = new Map() + this.shouldUpdate = shouldUpdate + } + + async write(transportName: string, results: TimestampedProviderResult[]): Promise { + await this.writeEntries( + results.map((result) => this.generateCacheEntry(transportName, this.transportName, result)), + ) + } + + async writeEntries( + entries: { + key: string + value: AdapterResponse + }[], + ) { + const filteredEntries: { + key: string + value: AdapterResponse + }[] = [] + + for (const { key, value } of entries) { + if (!this.shouldUpdate(value, this.localCache.get(key))) { + continue + } + const entryInCache = await this.get(key) + if (!this.shouldUpdate(value, entryInCache)) { + continue + } + filteredEntries.push({ key, value }) + } + + await this.responseCache.writeEntries(filteredEntries) + + filteredEntries.forEach(({ key, value }) => { + this.localCache.set(key, value) + }) + } + + override async writeTTL( + _: string, + params: TypeFromDefinition[], + ttl: number, + ): Promise { + await this.responseCache.writeTTL(this.transportName, params, ttl) + } + + override async get(key: string) { + return this.responseCache.get(key) + } +} diff --git a/src/cache/response-cache/simple.ts b/src/cache/response-cache/simple.ts new file mode 100644 index 00000000..64e3249e --- /dev/null +++ b/src/cache/response-cache/simple.ts @@ -0,0 +1,46 @@ +import { ResponseCache } from './base' +import { AdapterResponse, ResponseGenerics, TimestampedProviderResult } from '../../util' +import { InputParametersDefinition } from '../../validation/input-params' +import * as cacheMetrics from '../metrics' + +/** + * Special type of cache to store responses for this adapter. + */ +export class SimpleResponseCache< + T extends { + Parameters: InputParametersDefinition + Response: ResponseGenerics + }, +> extends ResponseCache { + async writeEntries( + entries: { + key: string + value: AdapterResponse + }[], + ): Promise { + const ttl = this.adapterSettings.CACHE_MAX_AGE + await this.cache.setMany(entries, ttl) + + const now = Date.now() + for (const { key, value } of entries) { + // Only record metrics if feed Id is present, otherwise assuming value is not adapter response to record + const response = value as unknown as AdapterResponse + const feedId = response.meta?.metrics?.feedId + if (feedId) { + const providerTime = response.timestamps?.providerIndicatedTimeUnixMs + const timeDelta = providerTime ? now - providerTime : undefined + + // Record cache set count, max age, and staleness (set to 0 for cache set) + const label = cacheMetrics.cacheMetricsLabel(key, feedId, this.cache.type) + cacheMetrics.cacheSet(label, ttl, timeDelta) + } + } + + return + } + + async write(transportName: string, results: TimestampedProviderResult[]): Promise { + const entries = results.map((r) => this.generateCacheEntry(transportName, transportName, r)) + await this.writeEntries(entries) + } +} diff --git a/src/cache/response.ts b/src/cache/response.ts index 2fe961e4..e7294569 100644 --- a/src/cache/response.ts +++ b/src/cache/response.ts @@ -1,179 +1,3 @@ -import { AdapterDependencies } from '../adapter' -import { AdapterSettings } from '../config' -import { - AdapterResponse, - ResponseGenerics, - TimestampedAdapterResponse, - TimestampedProviderErrorResponse, - TimestampedProviderResult, - censor, - censorLogs, - makeLogger, -} from '../util' -import CensorList from '../util/censor/censor-list' -import { - InputParameters, - InputParametersDefinition, - TypeFromDefinition, -} from '../validation/input-params' -import { validator } from '../validation/utils' -import { Cache, calculateCacheKey, calculateFeedId, calculateAdapterName } from './' -import * as cacheMetrics from './metrics' - -const logger = makeLogger('ResponseCache') - -/** - * Special type of cache to store responses for this adapter. - */ -export class ResponseCache< - T extends { - Parameters: InputParametersDefinition - Response: ResponseGenerics - }, -> { - cache: Cache> - inputParameters: InputParameters - adapterName: string - endpointName: string - adapterSettings: AdapterSettings - - constructor({ - inputParameters, - adapterName, - endpointName, - adapterSettings, - dependencies, - }: { - dependencies: AdapterDependencies - adapterSettings: AdapterSettings - adapterName: string - endpointName: string - inputParameters: InputParameters - }) { - this.cache = dependencies.cache as Cache> - this.inputParameters = inputParameters - this.adapterName = adapterName - this.endpointName = endpointName - this.adapterSettings = adapterSettings - } - - /** - * Sets responses in the adapter cache (adding necessary metadata and defaults) - * - * @param results - the entries to write to the cache - */ - async write(transportName: string, results: TimestampedProviderResult[]): Promise { - const censorList = CensorList.getAll() - const entries = results.map((r) => { - const { data, result, errorMessage } = r.response - if (!errorMessage && data === undefined) { - logger.warn('The "data" property of the response is undefined.') - } else if (!errorMessage && result === undefined) { - logger.warn('The "result" property of the response is undefined.') - } - let censoredResponse - if (!censorList.length) { - censoredResponse = r.response - } else { - try { - censoredResponse = censor(r.response, censorList, true) as TimestampedAdapterResponse< - T['Response'] - > - } catch (error) { - censorLogs(() => logger.error(`Error censoring response: ${error}`)) - censoredResponse = { - statusCode: 502, - errorMessage: 'Response could not be censored due to an error', - timestamps: r.response.timestamps, - } - } - } - - const response: AdapterResponse = { - ...censoredResponse, - statusCode: (censoredResponse as TimestampedProviderErrorResponse).statusCode || 200, - } - - if ( - this.adapterSettings.METRICS_ENABLED && - this.adapterSettings.EXPERIMENTAL_METRICS_ENABLED - ) { - response.meta = { - adapterName: calculateAdapterName(this.adapterName, r.params), - metrics: { - feedId: calculateFeedId( - { - adapterSettings: this.adapterSettings, - }, - r.params, - ), - }, - } - } - - if (response.timestamps?.providerIndicatedTimeUnixMs !== undefined) { - const timestampValidator = validator.responseTimestamp() - const error = timestampValidator.fn(response.timestamps?.providerIndicatedTimeUnixMs) - if (error) { - censorLogs(() => logger.warn(`Provider indicated time is invalid: ${error}`)) - } - } - - return { - key: calculateCacheKey({ - transportName, - data: r.params, - adapterName: this.adapterName, - endpointName: this.endpointName, - adapterSettings: this.adapterSettings, - }), - value: response, - } as const - }) - - const ttl = this.adapterSettings.CACHE_MAX_AGE - await this.cache.setMany(entries, ttl) - - const now = Date.now() - for (const { key, value } of entries) { - // Only record metrics if feed Id is present, otherwise assuming value is not adapter response to record - const response = value as unknown as AdapterResponse - const feedId = response.meta?.metrics?.feedId - if (feedId) { - const providerTime = response.timestamps?.providerIndicatedTimeUnixMs - const timeDelta = providerTime ? now - providerTime : undefined - - // Record cache set count, max age, and staleness (set to 0 for cache set) - const label = cacheMetrics.cacheMetricsLabel(key, feedId, this.cache.type) - cacheMetrics.cacheSet(label, ttl, timeDelta) - } - } - - return - } - - /** - * Sets a new TTL value for already cached responses in the adapter cache - * - * @param transportName - transport name - * @param params - set of parameters that uniquely relate to the response - * @param ttl - a new time in milliseconds until the response expires - */ - async writeTTL( - transportName: string, - params: TypeFromDefinition[], - ttl: number, - ): Promise { - for (const param of params) { - const key = calculateCacheKey({ - transportName: transportName, - data: param, - adapterName: this.adapterName, - endpointName: this.endpointName, - adapterSettings: this.adapterSettings, - }) - - this.cache.setTTL(key, ttl) - } - } -} +export { ResponseCache } from './response-cache/base' +export { SimpleResponseCache } from './response-cache/simple' +export { CompareResponseCache } from './response-cache/compare' diff --git a/src/config/index.ts b/src/config/index.ts index 71006470..a97236f5 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -389,6 +389,11 @@ export const BaseSettingsDefinition = { 'Whether to enable debug enpoints (/debug/*) for this adapter. Enabling them might consume more resources.', default: false, }, + COMPOSITE_TRANSPORT: { + type: 'boolean', + description: 'Whether to use enableCompositeTransport parameter in AdapterEndpoint', + default: false, + }, } as const satisfies SettingsDefinitionMap export const buildAdapterSettings = < diff --git a/src/transports/abstract/subscription.ts b/src/transports/abstract/subscription.ts index ec563bbe..31385adf 100644 --- a/src/transports/abstract/subscription.ts +++ b/src/transports/abstract/subscription.ts @@ -19,6 +19,7 @@ export abstract class SubscriptionTransport impleme subscriptionSet!: SubscriptionSet> subscriptionTtl!: number name!: string + initialized = false async initialize( dependencies: TransportDependencies, @@ -26,6 +27,10 @@ export abstract class SubscriptionTransport impleme endpointName: string, name: string, ): Promise { + if (this.initialized) { + throw new Error(`Transport ${name} has already been initialized`) + } + this.initialized = true this.responseCache = dependencies.responseCache this.subscriptionSet = dependencies.subscriptionSetFactory.buildSet(endpointName, name) this.subscriptionTtl = this.getSubscriptionTtlFromConfig(adapterSettings) // Will be implemented by subclasses diff --git a/src/transports/composite.ts b/src/transports/composite.ts new file mode 100644 index 00000000..32d50348 --- /dev/null +++ b/src/transports/composite.ts @@ -0,0 +1,72 @@ +import { EndpointContext } from '../adapter' +import { CompareResponseCache } from '../cache/response-cache/compare' +import { ResponseCache } from '../cache/response' +import { makeLogger } from '../util' +import { AdapterRequest } from '../util/types' +import { TypeFromDefinition } from '../validation/input-params' +import type { Transport, TransportDependencies, TransportGenerics } from '.' + +const logger = makeLogger('CompositeTransport') + +// Send requests to multiple transports and merge responses into a single cache according to bigger providerIndicatedTimeUnixMs +export class CompositeTransport implements Transport { + name!: string + responseCache!: ResponseCache + + constructor(private readonly transports: Record>) {} + + async initialize( + dependencies: TransportDependencies, + adapterSettings: T['Settings'], + endpointName: string, + transportName: string, + ): Promise { + this.name = transportName + this.responseCache = dependencies.responseCache + + const compareCache = new CompareResponseCache( + transportName, + this.responseCache, + (next, current) => + (next.timestamps?.providerIndicatedTimeUnixMs ?? 0) > + (current?.timestamps?.providerIndicatedTimeUnixMs ?? 0), + ) + + await Promise.all( + Object.entries(this.transports).map(([name, transport]) => + transport.initialize( + { ...dependencies, responseCache: compareCache }, + adapterSettings, + endpointName, + name, + ), + ), + ) + } + + async registerRequest( + req: AdapterRequest>, + adapterSettings: T['Settings'], + ): Promise { + const results = await Promise.allSettled( + Object.values(this.transports).map((t) => t.registerRequest?.(req, adapterSettings)), + ) + results + .filter((r) => r.status === 'rejected') + .forEach((r) => { + logger.error(`Transport registerRequest failed: ${r.reason}`) + }) + } + + async backgroundExecute(context: EndpointContext): Promise { + const results = await Promise.allSettled( + Object.values(this.transports).map((t) => t.backgroundExecute?.(context)), + ) + + results + .filter((r) => r.status === 'rejected') + .forEach((r) => { + logger.error(`Transport backgroundExecute failed: ${r.reason}`) + }) + } +} diff --git a/src/transports/index.ts b/src/transports/index.ts index ffb8a6e4..4ef80e99 100644 --- a/src/transports/index.ts +++ b/src/transports/index.ts @@ -7,6 +7,7 @@ import { InputParametersDefinition, TypeFromDefinition } from '../validation/inp export * from './http' export * from './sse' export * from './websocket' +export * from './composite' /** * Helper struct type that will be used to pass types to the generic parameters of a Transport. diff --git a/src/util/types.ts b/src/util/types.ts index 4caf9e49..c0d5085e 100644 --- a/src/util/types.ts +++ b/src/util/types.ts @@ -89,6 +89,8 @@ export interface AdapterRequestMeta { export interface AdapterResponseMeta extends AdapterRequestMeta { /** Name of the adapter */ adapterName: string + /** Name of the transport */ + transportName: string } /** diff --git a/test/cache/response-cache/compare.test.ts b/test/cache/response-cache/compare.test.ts new file mode 100644 index 00000000..161ab252 --- /dev/null +++ b/test/cache/response-cache/compare.test.ts @@ -0,0 +1,104 @@ +import test from 'ava' +import { LocalCache } from '../../../src/cache/local' +import { CompareResponseCache } from '../../../src/cache/response-cache/compare' +import { SimpleResponseCache } from '../../../src/cache/response-cache/simple' +import { AdapterConfig } from '../../../src/config' +import { LoggerFactoryProvider } from '../../../src/util/logger' +import { InputParameters } from '../../../src/validation' +import { cacheTestInputParameters, CacheTestTransportTypes } from '../helper' +import { AdapterDependencies } from '../../../src/adapter' +import { metrics } from '../../../src/metrics' + +test.before(() => { + LoggerFactoryProvider.set() + metrics.initialize() +}) + +const buildSimpleCache = () => { + const config = new AdapterConfig({}) + config.initialize() + config.settings.METRICS_ENABLED = true + config.settings.EXPERIMENTAL_METRICS_ENABLED = true + config.validate() + + return new SimpleResponseCache({ + dependencies: { cache: new LocalCache(100) } as unknown as AdapterDependencies, + adapterSettings: config.settings, + adapterName: 'TEST', + endpointName: 'test', + inputParameters: new InputParameters(cacheTestInputParameters.definition), + }) +} + +const providerResult = (params: { base: string; factor: number }, result: number) => ({ + params, + response: { + data: null, + result, + timestamps: { + providerDataRequestedUnixMs: 0, + providerDataReceivedUnixMs: 0, + providerIndicatedTimeUnixMs: undefined, + }, + }, +}) + +test('writes under CompareResponseCache transportName', async (t) => { + const compareCache = new CompareResponseCache('merged', buildSimpleCache(), () => true) + + const params = { base: 'ETH', factor: 1 } + + await compareCache.write('ws', [providerResult(params, 42)]) + + t.is(await compareCache.get(compareCache.getCacheKey('ws', params)), undefined) + + const entry = await compareCache.get(compareCache.getCacheKey('merged', params)) + t.is(entry?.result, 42) + t.is(entry?.meta?.transportName, 'ws') +}) + +test('second write override first write', async (t) => { + const compareCache = new CompareResponseCache('merged', buildSimpleCache(), () => true) + + const params = { base: 'ETH', factor: 1 } + + await compareCache.write('ws', [providerResult(params, 1), providerResult(params, 2)]) + + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 2) +}) + +test('shouldUpdate can block write when new value is not fresher than cache', async (t) => { + const compareCache = new CompareResponseCache( + 'merged', + buildSimpleCache(), + (next, current) => (next?.result || 0) > (current?.result || 0), + ) + + const params = { base: 'ETH', factor: 1 } + + await compareCache.write('merged', [providerResult(params, 50)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 50) + + await compareCache.write('merged', [providerResult(params, 25)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 50) + t.is(compareCache.localCache.size, 1) +}) + +test('shouldUpdate can block write without old value in localCache', async (t) => { + const simpleCache = buildSimpleCache() + + const compareCache = new CompareResponseCache( + 'merged', + simpleCache, + (next, current) => (next?.result || 0) > (current?.result || 0), + ) + + const params = { base: 'ETH', factor: 1 } + + await simpleCache.write('merged', [providerResult(params, 100)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 100) + + await compareCache.write('merged', [providerResult(params, 25)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 100) + t.is(compareCache.localCache.size, 0) +}) diff --git a/test/cache/response-cache.test.ts b/test/cache/response-cache/simple.test.ts similarity index 95% rename from test/cache/response-cache.test.ts rename to test/cache/response-cache/simple.test.ts index b01ae581..9fda037e 100644 --- a/test/cache/response-cache.test.ts +++ b/test/cache/response-cache/simple.test.ts @@ -1,18 +1,18 @@ import { Clock as InstalledClock } from '@sinonjs/fake-timers' -import { installTimers } from '../helper' +import { installTimers } from '../../helper' import untypedTest, { TestFn } from 'ava' import { FastifyInstance } from 'fastify' -import { Adapter, AdapterEndpoint } from '../../src/adapter' -import { AdapterConfig, SettingsDefinitionFromConfig } from '../../src/config' -import { AdapterRequest } from '../../src/util' -import { TypeFromDefinition } from '../../src/validation/input-params' +import { Adapter, AdapterEndpoint } from '../../../src/adapter' +import { AdapterConfig, SettingsDefinitionFromConfig } from '../../../src/config' +import { AdapterRequest } from '../../../src/util' +import { TypeFromDefinition } from '../../../src/validation/input-params' import { NopTransport, TestAdapter, assertEqualResponses, runAllUntilTime, -} from '../../src/util/testing-utils' -import { cacheTestInputParameters, CacheTestTransportTypes } from './helper' +} from '../../../src/util/testing-utils' +import { cacheTestInputParameters, CacheTestTransportTypes } from '../helper' const test = untypedTest as TestFn<{ clock: InstalledClock diff --git a/test/metrics/metrics.test.ts b/test/metrics/metrics.test.ts index ef3d7399..6b9730ca 100644 --- a/test/metrics/metrics.test.ts +++ b/test/metrics/metrics.test.ts @@ -332,6 +332,7 @@ test.serial('validate response.meta has the correct properties', async (t) => { t.deepEqual(response.meta, { adapterName: 'TEST', metrics: { feedId: '{"from":"eth","to":"usd"}' }, + transportName: 'default_single_transport', }) }) diff --git a/test/transports/composite.test.ts b/test/transports/composite.test.ts new file mode 100644 index 00000000..de97258f --- /dev/null +++ b/test/transports/composite.test.ts @@ -0,0 +1,291 @@ +import { installTimers } from '../helper' +import untypedTest, { TestFn } from 'ava' +import axios, { AxiosResponse } from 'axios' +import MockAdapter from 'axios-mock-adapter' +import { FastifyInstance } from 'fastify' +import { Adapter, AdapterEndpoint, EndpointContext } from '../../src/adapter' +import { AdapterError } from '../../src/validation/error' +import { AdapterConfig } from '../../src/config' +import { + HttpTransport, + Transport, + TransportDependencies, + TransportGenerics, + TransportRoutes, +} from '../../src/transports' +import { ResponseCache } from '../../src/cache/response' +import { AdapterRequest } from '../../src/util/types' +import { TestAdapter } from '../../src/util/testing-utils' +import { TypeFromDefinition } from '../../src/validation/input-params' +import { cacheTestInputParameters, CacheTestTransportTypes } from '../cache/helper' + +const test = untypedTest as TestFn<{ + clock: ReturnType + testAdapter: TestAdapter + api: FastifyInstance | undefined + ws: CountingCacheHttpTransport + rest: CountingCacheHttpTransport +}> + +process.env['CACHE_POLLING_MAX_RETRIES'] = '20' +process.env['CACHE_POLLING_SLEEP_MS'] = '10' +process.env['RETRY'] = '0' +process.env['BACKGROUND_EXECUTE_MS_HTTP'] = '1' +process.env['API_TIMEOUT'] = '0' + +const WS_PROVIDER = 'http://ea-composite-ws.test' +const REST_PROVIDER = 'http://ea-composite-rest.test' + +const axiosMock = new MockAdapter(axios) + +type CacheTestHttpTypes = CacheTestTransportTypes & { + Provider: { + RequestBody: unknown + ResponseBody: { result: number; ts?: number } + } +} + +class CountingCacheHttpTransport extends HttpTransport { + registerRequestCalls = 0 + + constructor(logicalName: string, baseURL: string) { + super({ + prepareRequests: (params) => + params.map((p) => ({ + params: [p], + request: { + baseURL, + url: '/price', + method: 'GET', + params: { base: p.base, factor: p.factor }, + }, + })), + parseResponse: (params, res: AxiosResponse<{ result: number; ts?: number }>) => + params.map((p) => ({ + params: p, + response: { + data: null, + result: res.data.result, + timestamps: { + providerDataRequestedUnixMs: 0, + providerDataReceivedUnixMs: 0, + providerIndicatedTimeUnixMs: res.data.ts ?? 1, + }, + }, + })), + }) + this.name = logicalName + } + + override async registerRequest( + req: AdapterRequest>, + settings: CacheTestHttpTypes['Settings'], + ): Promise { + this.registerRequestCalls++ + return super.registerRequest(req, settings) + } +} + +test.before(async (t) => { + t.context.clock = installTimers() + + const ws = new CountingCacheHttpTransport('ws', WS_PROVIDER) + const rest = new CountingCacheHttpTransport('rest', REST_PROVIDER) + t.context.ws = ws + t.context.rest = rest + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + endpoints: [ + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes() + .register('ws', ws) + .register('rest', rest), + }), + ], + config: new AdapterConfig({}, { envDefaultOverrides: { COMPOSITE_TRANSPORT: true } }), + }) + + await TestAdapter.start(adapter, t.context) +}) + +test.after(async (t) => { + await t.context.testAdapter?.api.close() +}) + +test.afterEach((t) => { + t.context.ws.registerRequestCalls = 0 + t.context.rest.registerRequestCalls = 0 +}) + +test.serial( + 'composite transport returns value from working transport when one transport fails to produce a value', + async (t) => { + axiosMock.onGet(`${WS_PROVIDER}/price`, { params: { base: 'ETH', factor: 5 } }).reply(500) + axiosMock + .onGet(`${REST_PROVIDER}/price`, { params: { base: 'ETH', factor: 5 } }) + .reply(200, { result: 42, ts: 100 }) + + const res = await t.context.testAdapter.request({ base: 'ETH', factor: 5 }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 42) + t.is(t.context.ws.registerRequestCalls, 1) + t.is(t.context.rest.registerRequestCalls, 1) + }, +) + +test.serial( + 'composite transport merges child writes by providerIndicatedTimeUnixMs when run under an adapter', + async (t) => { + axiosMock + .onGet(`${WS_PROVIDER}/price`, { params: { base: 'BTC', factor: 3 } }) + .reply(200, { result: 10, ts: 1000 }) + axiosMock + .onGet(`${REST_PROVIDER}/price`, { params: { base: 'BTC', factor: 3 } }) + .reply(200, { result: 100, ts: 2000 }) + + t.is(t.context.ws.name, 'ws') + t.is(t.context.rest.name, 'rest') + + const res = await t.context.testAdapter.request({ base: 'BTC', factor: 3, transport: 'rest' }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 100) + t.is(t.context.ws.registerRequestCalls, 1) + t.is(t.context.rest.registerRequestCalls, 1) + }, +) + +class ThrowingTransport implements Transport { + name!: string + responseCache!: ResponseCache + + async initialize( + dependencies: TransportDependencies, + _adapterSettings: T['Settings'], + _endpointName: string, + transportName: string, + ): Promise { + this.name = transportName + this.responseCache = dependencies.responseCache + } + + async registerRequest( + _req: AdapterRequest>, + _adapterSettings: T['Settings'], + ): Promise { + throw new Error('ThrowingTransport.registerRequest intentional error') + } + + async backgroundExecute(_context: EndpointContext): Promise { + throw new Error('ThrowingTransport.backgroundExecute intentional error') + } +} + +test.serial( + 'composite transport returns value from working transport when the other transport throws in registerRequest and backgroundExecute', + async (t) => { + const workingTransport = new CountingCacheHttpTransport('working', WS_PROVIDER) + const throwingTransport = new ThrowingTransport() + + const adapter = new Adapter({ + name: 'TEST_THROWING', + defaultEndpoint: 'test', + endpoints: [ + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes() + .register('working', workingTransport) + .register('throwing', throwingTransport), + }), + ], + config: new AdapterConfig({}, { envDefaultOverrides: { COMPOSITE_TRANSPORT: true } }), + }) + + const localContext = { clock: t.context.clock } as typeof t.context + const localAdapter = await TestAdapter.start(adapter, localContext) + + axiosMock + .onGet(`${WS_PROVIDER}/price`, { params: { base: 'LINK', factor: 2 } }) + .reply(200, { result: 77, ts: 100 }) + + const res = await localAdapter.request({ base: 'LINK', factor: 2 }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 77) + t.is(workingTransport.registerRequestCalls, 1) + + await localAdapter.api.close() + }, +) + +test.serial( + 'enableCompositeTransport does not use composite routing when COMPOSITE_TRANSPORT is false', + async (t) => { + const ws = new CountingCacheHttpTransport('ws', WS_PROVIDER) + const rest = new CountingCacheHttpTransport('rest', REST_PROVIDER) + + const adapter = new Adapter({ + name: 'TEST_COMPOSITE_OFF', + defaultEndpoint: 'test', + endpoints: [ + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes() + .register('ws', ws) + .register('rest', rest), + }), + ], + config: new AdapterConfig({}, { envDefaultOverrides: { COMPOSITE_TRANSPORT: false } }), + }) + + const localContext = { clock: t.context.clock } as typeof t.context + const localAdapter = await TestAdapter.start(adapter, localContext) + + axiosMock + .onGet(`${REST_PROVIDER}/price`, { params: { base: 'SOL', factor: 7 } }) + .reply(200, { result: 99, ts: 50 }) + + const res = await localAdapter.request({ base: 'SOL', factor: 7, transport: 'rest' }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 99) + t.is(ws.registerRequestCalls, 0) + t.is(rest.registerRequestCalls, 1) + + await localAdapter.api.close() + }, +) + +test.serial( + 'AdapterEndpoint throws when enableCompositeTransport is true with only one transport', + (t) => { + const onlyTransport = new CountingCacheHttpTransport('only', WS_PROVIDER) + + const error = t.throws( + () => + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes().register( + 'only', + onlyTransport, + ), + }), + { instanceOf: AdapterError }, + ) + + t.is(error?.message, 'Composite transport requires at least 2 transports') + t.is(error?.statusCode, 400) + }, +) diff --git a/test/transports/routing.test.ts b/test/transports/routing.test.ts index 99f10478..31428910 100644 --- a/test/transports/routing.test.ts +++ b/test/transports/routing.test.ts @@ -269,12 +269,17 @@ class MockSseTransport extends SseTransport { } } -const transports = new TransportRoutes() - .register('websocket', new MockWebSocketTransport()) - .register('batch', new MockHttpTransport()) - .register('sse', new MockSseTransport()) +function createTransportRoutes(): TransportRoutes { + return new TransportRoutes() + .register('websocket', new MockWebSocketTransport()) + .register('batch', new MockHttpTransport()) + .register('sse', new MockSseTransport()) +} + +let transports: TransportRoutes test.beforeEach(async (t) => { + transports = createTransportRoutes() const sampleEndpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price @@ -401,11 +406,67 @@ test.serial('endpoint routing can route to SSE transport', async (t) => { t.assert(internalTransport.registerRequestCalls > 0) }) +test.serial( + 'single endpoint cannot register the same transport instance under two route names', + async (t) => { + const sharedWs = new MockWebSocketTransport() + + const adapter = new Adapter({ + name: 'SHARED', + defaultEndpoint: 'price', + config: new AdapterConfig(settings, {}), + endpoints: [ + new AdapterEndpoint({ + inputParameters, + name: 'price', + transportRoutes: new TransportRoutes() + .register('primary', sharedWs) + .register('mirror', sharedWs), + }), + ], + }) + + await t.throwsAsync(async () => adapter.initialize(), { + message: 'Transport mirror has already been initialized', + }) + }, +) + +test.serial( + 'two endpoints on the same adapter cannot share the same transport instances', + async (t) => { + const sharedRoutes = createTransportRoutes() + + const adapter = new Adapter({ + name: 'SHAREDEP', + defaultEndpoint: 'price', + config: new AdapterConfig(settings), + endpoints: [ + new AdapterEndpoint({ + inputParameters, + name: 'price', + transportRoutes: sharedRoutes, + }), + new AdapterEndpoint({ + inputParameters, + name: 'quote', + transportRoutes: sharedRoutes, + }), + ], + }) + + await t.throwsAsync(async () => adapter.initialize(), { + message: 'Transport websocket has already been initialized', + }) + }, +) + test.serial('custom router is applied to get valid transport to route to', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, customRouter: () => 'batch', }) @@ -459,15 +520,16 @@ test.serial('custom router is applied to get valid transport to route to', async }) t.is(error.statusCode, 504) - const internalTransport = transports.get('batch') as unknown as MockHttpTransport + const internalTransport = testTransports.get('batch') as unknown as MockHttpTransport t.assert(internalTransport.registerRequestCalls > 0) }) test.serial('custom router returns invalid transport and request fails', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, customRouter: () => 'qweqwe', }) @@ -527,10 +589,11 @@ test.serial('custom router returns invalid transport and request fails', async ( }) test.serial('missing transport in input params with no default fails request', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, }) const customConfig = new AdapterConfig(settings, { @@ -588,10 +651,11 @@ test.serial('missing transport in input params with no default fails request', a }) test.serial('missing transport in input params with default succeeds', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, defaultTransport: 'batch', }) @@ -644,7 +708,7 @@ test.serial('missing transport in input params with default succeeds', async (t) }) t.is(error.statusCode, 504) - const internalTransport = transports.get('batch') as unknown as MockHttpTransport + const internalTransport = testTransports.get('batch') as unknown as MockHttpTransport t.assert(internalTransport.registerRequestCalls > 0) }) @@ -789,10 +853,11 @@ test.serial('invalid transport override is skipped', async (t) => { test.serial( 'transport and transport override are ignored when custom router returns a value', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, customRouter: () => 'batch', }) @@ -851,7 +916,7 @@ test.serial( }) t.is(error.statusCode, 504) - const internalTransport = transports.get('batch') as unknown as MockHttpTransport + const internalTransport = testTransports.get('batch') as unknown as MockHttpTransport t.assert(internalTransport.registerRequestCalls > 0) }, )