Skip to content

RANGER-4676, RANGER-5615: Add OpenSearch Dispatcher to Ranger Audit Server#986

Open
paras200 wants to merge 3 commits into
apache:masterfrom
paras200:RANGER-5615
Open

RANGER-4676, RANGER-5615: Add OpenSearch Dispatcher to Ranger Audit Server#986
paras200 wants to merge 3 commits into
apache:masterfrom
paras200:RANGER-5615

Conversation

@paras200

@paras200 paras200 commented May 28, 2026

Copy link
Copy Markdown

Adds a new OpenSearch dispatcher module to the Ranger Audit Server that consumes audit events from Kafka and bulk-indexes them into OpenSearch, providing an alternative to the Solr-based audit store.

Core dispatcher module (audit-server/audit-dispatcher/dispatcher-opensearch):

  • OpenSearchDispatcherManager — lifecycle manager with retry-based initialization (exponential backoff, max 5 attempts) and graceful shutdown
  • AuditOpenSearchDispatcher — Kafka consumer that batches audit events and writes them to OpenSearch via the _bulk API using the low-level RestClient
  • Supports basic auth and Kerberos/SPNEGO authentication for OpenSearch connections
  • Document ID deduplication — uses audit.eventId as _id in bulk metadata, falls back to UUID when absent
  • Error handling with partition seek-back and retry sleep on batch failures

Shared mapping (audit-server/audit-dispatcher/dispatcher-common):

  • AuditEventDocMapper — canonical 27-field event-to-document mapper, reusable across dispatcher destinations

Configuration & packaging (distro):

  • Per-dispatcher logback support (logback-opensearch.xml) in start-audit-dispatcher.sh
  • Assembly descriptor updated to package the opensearch dispatcher module

Docker & E2E infrastructure (dev-support/ranger-docker):

  • docker-compose.ranger-audit-dispatcher-opensearch.yml for the dispatcher container
  • KDC healthcheck + ZK depends_on: service_healthy to fix keytab provisioning race condition
  • e2e-audit-opensearch.sh — single-command end-to-end test script (start → validate → teardown)
  • Helper scripts: create-ranger-audit-topic.sh, create-ranger-audit-index.sh

Bug fix:

  • Fix ElasticSearchMgr.connect() to return the client on first connection (missing me = client assignment)

How was this patch tested?

Unit tests:

  • TestAuditOpenSearchDispatcher (6 tests) — validates bulk request formatting, document field mapping, HTTP error handling, item-level error detection, UUID generation for missing event IDs
  • TestOpenSearchDispatcherManager (5 tests) — validates dispatcher type filtering, disabled destination handling, fail-fast when dispatcher class cannot be instantiated
  • TestAuditEventDocMapper (4 tests) — validates all 27 fields are correctly mapped from AuthzAuditEvent to document

End-to-end test (./scripts/audit/e2e-audit-opensearch.sh):

  • Full Docker stack: KDC → ZK → Kafka → Ranger Admin → Audit Ingestor → OpenSearch → OpenSearch Dispatcher
  • Posts a SPNEGO-authenticated audit event through the ingestor REST API
  • Verifies the document is indexed in OpenSearch with the correct _id (marker-based assertion)
  • Validates all service health endpoints and container states
  • Automated teardown on exit (or --no-teardown for debugging)

Pipeline validated: Plugin → Ingestor → Kafka → Dispatcher → OpenSearch

RestClientBuilder restClientBuilder = getRestClientBuilder(urls, protocol, user, password, port);

client = new RestHighLevelClient(restClientBuilder);
me = client;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please correct the description.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — updated the PR description. The actual change is a bug fix: connect() was not assigning the newly created RestHighLevelClient to the return variable me, so the method could return null on first invocation.

audit_elasticsearch_password=NONE
audit_elasticsearch_index=ranger_audits
audit_elasticsearch_bootstrap_enabled=true

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to differentiate whether it is open search or elasticsearch

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Ranger Admin's perspective, OpenSearch and Elasticsearch use the same REST API (wire-compatible) — there is no separate audit_store=opensearch value. The Admin connects using audit_store=elasticsearch and the audit_elasticsearch_* properties regardless of whether the backend is Elasticsearch or OpenSearch. The differentiation happens at the dispatcher level (separate dispatcher-opensearch module with its own class), not at the Admin level. This properties file is a Docker dev profile where the Admin queries OpenSearch directly for the audit UI — the connection is API-compatible.

<value>ranger_audits</value>
</property>

<property>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Description correctly states unauthenticated OpenSearch is allowed. Cross-link to production hardening (require user/password when xasecure.audit.destination.elasticsearch=true).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — updated the user/password property descriptions to explicitly note (dev only) for empty values and added Production: configure user/password or Kerberos keytab guidance.

private static final String TYPE_OPENSEARCH =
"opensearch";
/** Property controlling OpenSearch destination. */
private static final String ES_DEST_PROP =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Module is dispatcher-opensearch but config uses xasecure.audit.destination.elasticsearch.*. Intentional for backward compatibility — please document in README/site XML so operators do not search for opensearch.urls.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — added an XML comment block in the site XML explaining this:

<!--
    OPENSEARCH DESTINATION CONFIGURATION
    NOTE: OpenSearch is wire-compatible with Elasticsearch. Config uses the
    xasecure.audit.destination.elasticsearch.* namespace for interoperability
    with Ranger Admin audit queries (which read from the same index).
-->

This way operators understand why they won't find opensearch.urls and should use the elasticsearch.* keys instead.

if (clsName != null && clsName.contains(
"AuditOpenSearchDispatcher")) {
isEnabled = true;
props.setProperty(ES_DEST_PROP, "true");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When dispatcher class name contains AuditOpenSearchDispatcher, code sets props.setProperty(ES_DEST_PROP, "true"). Side effect on the shared config object may surprise other components reading the same Properties. Prefer local flag instead of mutating props.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — removed props.setProperty(ES_DEST_PROP, "true"). The enablement decision now stays in the local isEnabled boolean without mutating the shared Properties object.

"type": "long"
}
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AuditEventDocMapper indexes additionalInfo (line 93–94) but this schema has no additionalInfo property. Relying on dynamic mapping may cause type conflicts vs security-admin/contrib/elasticsearch_for_audit_setup/conf/ranger_es_schema.json. Align docker schema with contrib schema for Admin audit UI search.

@paras200 paras200 May 28, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — added additionalInfo as a text field to the Docker schema. Note that the contrib schema (security-admin/contrib/elasticsearch_for_audit_setup/conf/ranger_es_schema.json) has the same gap — both were relying on dynamic mapping for this field. I've aligned the Docker schema; the contrib schema fix can be addressed in a follow-up. I can raise a JIRA related to this.

* @param auditEvent the audit event to convert
* @return map of field names to values
*/
public static Map<String, Object> toDoc(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@paras200 - please avoid breaking statements into multiple lines. Such splits make it harder to read the code. Days of 80-character max width are long gone! Let's make it a little easier for folks to read the code in widescreens that are easily capable of 200+ characters :-).

Please review and update entire contents of this PR.

LOG.info("Skipping OpenSearchDispatcherManager"
    + " initialization since dispatcher"
    + " type is {}", dispatcherType);

vs

LOG.info("Skipping OpenSearchDispatcherManager initialization since dispatcher type is {}", dispatcherType);

@paras200 paras200 Jun 4, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — This was the result of running the checkstyle within the code base, now I have removed all multi-line string concatenation splits and unnecessary line breaks across OpenSearchDispatcherManager, AuditOpenSearchDispatcher, and AuditEventDocMapper (now renamed). Also stripped verbose Javadoc comments that didn't add value beyond what the identifiers already communicate. All statements now fit naturally on a single line.

/**
* Maps {@link AuthzAuditEvent} to an OpenSearch document.
*/
public final class AuditEventDocMapper {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AuditEventDocMapper class is referenced only in dispatcher-opensearch project. I suggest moving this class from dispatcher-common to dispatcher-opensearch module.

@paras200 paras200 Jun 4, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — moved from dispatcher-common to dispatcher-opensearch. Also renamed to AuditEventOpenSearchDocMapper to make its scope explicit now that it lives in the OpenSearch module. Test moved and renamed accordingly.

String pfx = propPrefix + ".";

this.openSearchIndex = MiscUtil.getStringProperty(
props, ES_DEST_PREFIX + ".index", DEFAULT_INDEX);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the hardcoded prefix used here, instead of the one received via method parameter propPrefix? Review other references in lines 257 to 265 as well and replace them. Also, remove the constant ES_DEST_PREFIX in line 79.

@paras200 paras200 Jun 4, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — removed ES_DEST_PREFIX constant. init() and createOpenSearchClient() now use the propPrefix parameter for all OpenSearch connection property lookups (.urls, .port, .protocol, .index, .user, .password). Updated both site XML configs to use the ranger.audit.dispatcher.* namespace for these properties.

List<String> auditBatch = new ArrayList<>();
List<ConsumerRecord<String, String>>
recordList = new ArrayList<>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using ConsumerRecords.partitions() to iterate and commit. recordList created here is not necessary with this approach.

for (TopicPartition tp : records.partitions()) {
  List<ConsumerRecord<String, String>> tpRecords = records.records(tp);

  if (tpRecords.isEmpty()) {
    continue;
  }

  try {
    List<String> auditBatch = tpRecords.streams().map(ConsumerRecord<String, String>::value).collect(Collectors.toList());

    processMessageBatch(auditBatch);

    ConsumerRecord<String, String> last = tpRecords.get(tpRecords.size() - 1);

    pendingOffsets.put(tp, new OffsetAndMetadata(last.offset() + 1));

    messagesProcessedSinceLastCommit.addAndGet(tpRecords.size());
  } catch (Exception ex) {
    ConsumerRecord<String, String> first = tpRecords.get(0);

    pendingOffsets.put(tp, new OffsetAndMetadata(first.offset()));

    try {
      workerDispatcher.seek(tp, first.offset());
    } catch (Exception e) {
      ...
    }
  }
}

@paras200 paras200 Jun 4, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — refactored processRecordBatch to iterate via records.partitions() with per-partition try/catch.

String clsStr = MiscUtil.getStringProperty(
props,
AuditServerConstants.PROP_DISPATCHER_CLASS,
"org.apache.ranger.audit.dispatcher"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest replacing "org.apache.ranger.audit.dispatcher.kafka.AuditOpenSearchDispatcher" with AuditOpenSearchDispatcher.class.getName()

@paras200 paras200 Jun 4, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — replaced the fully-qualified collection names with AuditOpenSearchDispatcher.class.getName().

}
}

private void init(final Properties props, final String propPrefix) throws Exception {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent with rest of the code base, please keep private methods after all public and protected methods. Please make sure to browse through a short list of coding guidelines at https://cwiki.apache.org/confluence/display/RANGER/Coding+guidelines.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — reviewed the coding guidelines and reordered. After making processMessageBatch private (per your other comment), the class now follows: public constructor → protected overrides (getDispatcherName, createDispatcherWorker, shutdownDestination) → private methods (init, processMessageBatch, createOpenSearchClient, isCredentialConfigured) → private inner class. No public methods exist after the protected block.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made changes as per this new guideline

LOG.info("<== AuditOpenSearchDispatcher.init()");
}

public final void processMessageBatch(final Collection<String> audits) throws Exception {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processMessageBatch() called only from this class; consider marking as private.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — changed to private. It's only called from the inner OpenSearchDispatcherWorker.processRecordBatch(). Updated the unit test to invoke via reflection.

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchDispatcherManager.class);
private static final String CONFIG_DISPATCHER_TYPE = AuditServerConstants.PROP_DISPATCHER_TYPE;
private static final String TYPE_OPENSEARCH = "opensearch";
private static final String ES_DEST_PROP = "xasecure.audit.destination.elasticsearch";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it necessary to use existing configuration for Elasticsearch - xasecure.audit.destination.elasticsearch? I suggest using xasecure.audit.destination.opensearch - unless there are any issues.

@paras200 paras200 Jun 9, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current approach reuses xasecure.audit.destination.elasticsearch as explained below:

The dispatcher uses the Elasticsearch low-level RestClient (not RestHighLevelClient) to talk to OpenSearch, which is wire-compatible with ES. The enablement flag xasecure.audit.destination.elasticsearch=true was chosen because the dispatcher doesn't have a corresponding OpenSearchAuditDestination class in agents-audit — unlike Solr, where dispatcher-solr delegates to the existing SolrAuditDestination class and shares the xasecure.audit.destination.solr.* namespace.

Renaming is one option, however, xasecure.audit.destination.opensearch creates a half-measure: the property name implies an OpenSearchAuditDestination class exists (following the pattern in AuditProviderFactory.getProviderFromConfig()), but it doesn't — there's no "opensearch" case mapped in that factory, and plugins couldn't write directly to OpenSearch.

Proposed approach (Maybe a follow up PR): To do this properly, we can introduce a full OpenSearchAuditDestination class in agents-audit (following the SolrAuditDestination pattern), register "opensearch" in AuditProviderFactory, and have the dispatcher delegate to it — exactly as dispatcher-solr delegates to SolrAuditDestination. This would:

  1. Enable xasecure.audit.destination.opensearch=true with proper semantics
  2. Allow plugins to write audits directly to OpenSearch (bypassing Kafka) if needed
  3. Share connection/write logic between the plugin path and the dispatcher
  4. Use the xasecure.audit.destination.opensearch.* namespace consistently for urls/port/index/user/password

As this PR is to implement OpenSearch Dispatcher for audit server, I've kept the elasticsearch namespace since the dispatcher's inlined REST logic is functionally equivalent to talking to an ES-compatible backend. The dedicated OpenSearchAuditDestination + namespace rename can be done in a separate JIRA and a followup PR.

Would you prefer we add OpenSearchAuditDestination in this PR itself, or is a follow-up acceptable?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding to my earlier note — introducing OpenSearchAuditDestination would also unblock upgrading to latest OpenSearch server versions (currently 3.6.0). Today we're pinned to OpenSearch 1.3.19 because we reuse the Elasticsearch RestClient from ranger-audit-dest-es (which pulls ES client 7.17.x). OpenSearch 2.0+ removed type-based APIs that the ES HLRC depends on, so a direct server upgrade isn't possible without a client migration.

The OpenSearchAuditDestination work would require introducing these new dependencies:

  • org.opensearch.client:opensearch-java:3.1.0 — the official OpenSearch Java client (Apache 2.0, supports OpenSearch 1.x–3.x)
  • org.opensearch.client:opensearch-rest-client:3.1.0 — low-level REST transport layer

This replaces the current reliance on org.elasticsearch.client:elasticsearch-rest-client:7.17.x and org.elasticsearch.client:elasticsearch-rest-high-level-client:7.17.x for the OpenSearch path.

This is a non-trivial effort — the opensearch-java client has a different API surface (builder-based, no type parameters), so it's not a drop-in replacement. It will need its own set of integration tests against OpenSearch 2.x/3.x. I'd suggest handling this as a separate JIRA/PR to keep the scope manageable.

@paras200 paras200 force-pushed the RANGER-5615 branch 3 times, most recently from 788d080 to dac9739 Compare June 9, 2026 16:36
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JUnit/Mockito were added to dispatcher-common but the mapper moved out and there are no tests there. Remove those dependencies unless tests are added.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — removed JUnit and Mockito test dependencies from dispatcher-common/pom.xml. No tests exist in that module after the mapper class moved to dispatcher-opensearch.

@ramackri

Copy link
Copy Markdown
Contributor

@paras200
The Maven build failed at the audit-dispatcher-opensearch module during the checkstyle-check goal with 16 Checkstyle violations.

@paras200 paras200 force-pushed the RANGER-5615 branch 2 times, most recently from 188138d to 328cd6b Compare June 11, 2026 08:59
@paras200

Copy link
Copy Markdown
Author

@ramackri The checkstyle violations are all LineLength > 80 and JavadocVariable (missing Javadoc on private fields). These are the same class of violations present across the existing dispatcher modules — for example, dispatcher-solr has 101 identical violations and passes CI without issue.

The 80-character line limit is explicitly deprecated per the Apache Ranger Java Style Guide which sets the column limit at 512 characters. @mneethiraj also confirmed in this PR review: "days of 80-character max width are long gone."

The JavadocVariable violations are on private constants/fields — adding Javadoc to these would contradict the coding guideline to avoid unnecessary comments when identifiers are self-explanatory.

The CI build-17 failure is unrelated to checkstyle — it's a timeout in TestGdsREST in security-admin (same issue affecting other PRs like RANGER-5637). The checkstyle plugin is not configured as a CI gate for this module.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants