Skip to content

HDDS-14005. EventNotification: EventNotification: add the plugin framework#10095

Open
gardenia wants to merge 2 commits intoapache:HDDS-13513_Event_Notification_FeatureBranchfrom
gardenia:HDDS-14005
Open

HDDS-14005. EventNotification: EventNotification: add the plugin framework#10095
gardenia wants to merge 2 commits intoapache:HDDS-13513_Event_Notification_FeatureBranchfrom
gardenia:HDDS-14005

Conversation

@gardenia
Copy link
Copy Markdown

Please describe your PR in detail:

  • Add the plugin framework for dynamically configurable EventListener plugin
  • an event listener plugin will implement the interface OMEventListener
  • plugins can be loaded/configured dynamically similarly to ranger plugins, e.g.:
ozone.om.plugin.destination.kafka=true
ozone.om.plugin.destination.kafka.classname=org.apache.hadoop.ozone.om.eventlistener.OMEventListenerKafkaPublisher
ozone.notify.kafka.topic=test123
ozone.notify.kafka.bootstrap.servers=kafka-3:29092,kafka-1:29092,kafka-2:29092

NOTE: this change does not provide any concrete plugin implementation

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-14005

How was this patch tested?

unit tests

@gardenia
Copy link
Copy Markdown
Author

@ChenSammi : as per #9366 I was unable to reopen that PR and so the only option was to open a new one.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a basic plugin framework for OM Event Listener integrations, enabling dynamically configurable OMEventListener implementations to be discovered via OzoneConfiguration and lifecycle-managed by OM.

Changes:

  • Introduces the OMEventListener and OMEventListenerPluginContext APIs in hadoop-ozone/common.
  • Adds OMEventListenerPluginManager (load/start/shutdown) and a minimal context implementation in ozone-manager.
  • Adds unit tests and dummy listener implementations to validate plugin loading behavior.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java Implements config-based discovery, instantiation, and lifecycle management for OMEventListener plugins.
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java Provides an OM-specific context object passed to plugins.
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java Package documentation for the OM-side eventlistener implementation.
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java Defines the plugin interface (initialize/start/shutdown).
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java Defines the (currently empty) plugin context interface.
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java Package documentation for the common eventlistener API.
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java Adds unit tests for plugin discovery/loading error handling.
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java Dummy test plugin implementing OMEventListener.
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java Dummy test plugin implementing OMEventListener.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +57 to +60
@Test
public void testLoadSinglePlugin() throws InterruptedException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set("ozone.om.plugin.destination.foo", "enabled");
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests declare throws InterruptedException, but nothing in the test body blocks or interrupts. Removing the unused checked exception would simplify the test signatures.

Copilot uses AI. Check for mistakes.
Comment on lines +68 to +72
static List<OMEventListener> loadAll(OzoneManager ozoneManager, OzoneConfiguration conf) {
List<OMEventListener> plugins = new ArrayList<>();

Map<String, String> props = conf.getPropsMatchPrefixAndTrimPrefix(PLUGIN_DEST_BASE);
List<String> destNameList = new ArrayList<>();
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getPropsMatchPrefixAndTrimPrefix is called with PLUGIN_DEST_BASE without a trailing dot, which means the trimmed keys will start with a leading . (eg .foo) and the match can also include unintended keys like ozone.om.plugin.destinationX.... This makes later concatenation (PLUGIN_DEST_BASE + destName + ".classname") fragile. Consider using a prefix with the delimiter (eg PLUGIN_DEST_BASE + ".") and keeping destName normalized (no leading dot).

Copilot uses AI. Check for mistakes.
Comment on lines +91 to +95
OMEventListener impl = cls.newInstance();
impl.initialize(conf, pluginContext);

plugins.add(impl);
} catch (Exception ex) {
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class#newInstance() is deprecated and can fail for classes without a public no-arg constructor; it also provides less useful exception context. Prefer cls.getDeclaredConstructor().newInstance() and catch ReflectiveOperationException (or the specific exceptions) so failures are logged with clear cause.

Suggested change
OMEventListener impl = cls.newInstance();
impl.initialize(conf, pluginContext);
plugins.add(impl);
} catch (Exception ex) {
OMEventListener impl = cls.getDeclaredConstructor().newInstance();
impl.initialize(conf, pluginContext);
plugins.add(impl);
} catch (ReflectiveOperationException ex) {

Copilot uses AI. Check for mistakes.

public void startAll() {
for (OMEventListener plugin : plugins) {
plugin.start();
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any plugin throws from start(), this will abort starting subsequent plugins. Consider wrapping each plugin.start() in its own try/catch so one bad plugin doesn't prevent others from starting, and log the plugin class/name on failure.

Suggested change
plugin.start();
try {
plugin.start();
} catch (Exception ex) {
LOG.error("Failed to start event listener plugin {}",
plugin.getClass().getName(), ex);
}

Copilot uses AI. Check for mistakes.

public void shutdownAll() {
for (OMEventListener plugin : plugins) {
plugin.shutdown();
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any plugin throws from shutdown(), this will abort shutting down subsequent plugins. Consider handling exceptions per plugin and continuing shutdown so a single failure doesn't block cleanup of the rest.

Suggested change
plugin.shutdown();
try {
plugin.shutdown();
} catch (Exception ex) {
LOG.error("Failed to shut down event listener plugin {}", plugin, ex);
}

Copilot uses AI. Check for mistakes.
private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf,
String destName) {
String classnameProp = PLUGIN_DEST_BASE + destName + ".classname";
LOG.info("Gettting classname for {} with propety {}", destName, classnameProp);
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in the log message: "Gettting" / "propety". Fixing this will make logs easier to search/understand.

Suggested change
LOG.info("Gettting classname for {} with propety {}", destName, classnameProp);
LOG.info("Getting classname for {} with property {}", destName, classnameProp);

Copilot uses AI. Check for mistakes.
public void testPluginClassDoesNotImplementInterface() throws InterruptedException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set("ozone.om.plugin.destination.foo", "enabled");
conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.BrokenFooPlugin");
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testPluginClassDoesNotImplementInterface configures classname org.apache.hadoop.ozone.om.eventlistener.BrokenFooPlugin, but BrokenFooPlugin is declared as a private static nested class in this test, so its runtime name is TestOMEventListenerPluginManager$BrokenFooPlugin. As written, this test is effectively exercising the "class does not exist" path rather than the "does not implement interface" path. Consider using BrokenFooPlugin.class.getName() or moving BrokenFooPlugin to its own top-level test class in the same package.

Suggested change
conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.BrokenFooPlugin");
conf.set("ozone.om.plugin.destination.foo.classname", BrokenFooPlugin.class.getName());

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants