item = items.pollFirstEntry();
+ bytes -= item.getValue().length;
+ notFull.signalAll();
+ return item;
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java
new file mode 100644
index 00000000..e795d533
--- /dev/null
+++ b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java
@@ -0,0 +1,191 @@
+/* Unless explicitly stated otherwise all files in this repository are
+ * licensed under the Apache 2.0 License.
+ *
+ * This product includes software developed at Datadog
+ * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc.
+ */
+
+package com.datadoghq.dogstatsd.http.forwarder;
+
+import static java.net.http.HttpRequest.BodyPublishers;
+import static java.net.http.HttpResponse.BodyHandlers;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Random;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An HTTP forwarder that delivers DogStatsD HTTP payloads to a remote endpoint.
+ *
+ * Payloads are enqueued via {@link #send(byte[])} and delivered asynchronously by a background
+ * thread. Failed requests are retried with exponential back-off up to {@code maxTries} attempts
+ * before being discarded.
+ */
+public class Forwarder extends Thread {
+ static final Logger logger = Logger.getLogger(Forwarder.class.getName());
+ final BoundedQueue queue;
+ final URI url;
+ final HttpClient client;
+ final Duration requestTimeout;
+ final Random rng = new Random();
+
+ String localData;
+ String externalData;
+
+ int responseOk, responseBadRequest, responseOther;
+
+ /**
+ * Creates a new forwarder targeting the given URL.
+ *
+ * @param url the remote HTTP endpoint to POST payloads to
+ * @param maxRequestsBytes maximum total size of buffered payloads, in bytes
+ * @param maxTries maximum number of delivery attempts per payload
+ * @param whenFull action to take when the queue is at capacity
+ * @param connectTimeout timeout for establishing the TCP connection
+ * @param requestTimeout timeout from sending the request until response headers are received;
+ * {@code null} disables the request timeout
+ */
+ public Forwarder(
+ URI url,
+ long maxRequestsBytes,
+ long maxTries,
+ WhenFull whenFull,
+ Duration connectTimeout,
+ Duration requestTimeout) {
+ this.url = url;
+ this.queue = new BoundedQueue(maxRequestsBytes, maxTries, whenFull);
+ this.requestTimeout = requestTimeout;
+ this.client =
+ HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .connectTimeout(connectTimeout)
+ .build();
+ }
+
+ /** Runs the forwarding loop, delivering queued payloads until the thread is interrupted. */
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ runOnce(queue.next());
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+
+ /**
+ * Enqueues a payload for delivery to the remote endpoint.
+ *
+ *
If the queue is full, behaviour is determined by the {@link WhenFull} policy supplied at
+ * construction time.
+ *
+ * @param payload the raw bytes to deliver
+ * @throws InterruptedException if the calling thread is interrupted while waiting for space
+ * ({@link WhenFull#BLOCK} mode only)
+ */
+ public void send(byte[] payload) throws InterruptedException {
+ queue.add(payload);
+ }
+
+ void runOnce(Map.Entry item) throws InterruptedException {
+ byte[] payload = item.getValue();
+ logger.log(Level.INFO, "sending {0} bytes", payload.length);
+
+ HttpRequest.Builder builder =
+ HttpRequest.newBuilder(url).POST(BodyPublishers.ofByteArray(payload));
+ if (requestTimeout != null) {
+ builder.timeout(requestTimeout);
+ }
+ if (localData != null) {
+ builder.setHeader("x-dsd-ld", localData);
+ }
+ if (externalData != null) {
+ builder.setHeader("x-dsd-ed", externalData);
+ }
+ HttpRequest req = builder.build();
+
+ try {
+ HttpResponse res = client.send(req, BodyHandlers.ofString());
+ res.body();
+
+ logger.log(
+ Level.INFO, "response {0}: {1}", new Object[] {res.statusCode(), res.body()});
+
+ switch (res.statusCode()) {
+ case 400:
+ responseBadRequest++;
+ onSuccess();
+ break;
+ case 200:
+ responseOk++;
+ onSuccess();
+ break;
+ default:
+ responseOther++;
+ onError();
+ queue.requeue(item);
+ }
+ } catch (IOException ex) {
+ logger.log(Level.WARNING, "error sending request: {0}", ex.toString());
+ responseOther++;
+ onError();
+ queue.requeue(item);
+ }
+
+ backoff();
+ }
+
+ int delay;
+
+ void onSuccess() {
+ delay >>= 4;
+ }
+
+ void onError() {
+ if (delay < 64) delay <<= 1;
+ if (delay == 0) delay = 1;
+ }
+
+ void backoff() throws InterruptedException {
+ if (delay > 0) {
+ int sleep = (int) (250.0 * delay * (0.5 + rng.nextDouble()));
+ logger.log(Level.INFO, "backoff={0}, sleeping {1}ms", new Object[] {delay, sleep});
+ Thread.sleep(sleep);
+ }
+ }
+
+ /**
+ * Sets the local-data value sent as the {@code x-dsd-ld} header with each request.
+ *
+ * Local data carries the container ID or cgroup node inode used by the Datadog Agent for
+ * origin detection (DogStatsD protocol v1.4).
+ *
+ * @param data the local-data string, or {@code null} to omit the header
+ */
+ public void setLocalData(String data) {
+ logger.log(Level.INFO, "using local data: {0}", data);
+ localData = data;
+ }
+
+ /**
+ * Sets the external-data value sent as the {@code x-dsd-ed} header with each request.
+ *
+ *
External data is supplied by the Datadog Agent Admission Controller and is used by the
+ * Agent to enrich metrics with container tags when a container ID is unavailable (DogStatsD
+ * protocol v1.5, Agent ≥ v7.57.0).
+ *
+ * @param data the external-data string, or {@code null} to omit the header
+ */
+ public void setExternalData(String data) {
+ logger.log(Level.INFO, "using external data: {0}", data);
+ externalData = data;
+ }
+}
diff --git a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/WhenFull.java b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/WhenFull.java
new file mode 100644
index 00000000..b2b84484
--- /dev/null
+++ b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/WhenFull.java
@@ -0,0 +1,16 @@
+/* Unless explicitly stated otherwise all files in this repository are
+ * licensed under the Apache 2.0 License.
+ *
+ * This product includes software developed at Datadog
+ * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc.
+ */
+
+package com.datadoghq.dogstatsd.http.forwarder;
+
+/** Controls the behavior of {@link Forwarder} when its internal queue is full. */
+public enum WhenFull {
+ /** Block the caller until space becomes available in the queue. */
+ BLOCK,
+ /** Drop a payload to make room, according to the queue's eviction policy. */
+ DROP;
+}
diff --git a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java
new file mode 100644
index 00000000..aca496d1
--- /dev/null
+++ b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java
@@ -0,0 +1,227 @@
+/* Unless explicitly stated otherwise all files in this repository are
+ * licensed under the Apache 2.0 License.
+ *
+ * This product includes software developed at Datadog
+ * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc.
+ */
+
+package com.datadoghq.dogstatsd.http.forwarder;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Map;
+import org.junit.Test;
+
+public class BoundedQueueTest {
+ private static byte[] bytes(int n) {
+ return bytes(n, (byte) 0);
+ }
+
+ private static byte[] bytes(int n, byte v) {
+ byte[] b = new byte[n];
+ Arrays.fill(b, v);
+ return b;
+ }
+
+ // --- Round-trip / bytes tracking ---
+
+ @Test
+ public void addThenNextReturnsItem() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(10, 1, WhenFull.DROP);
+ byte[] item = bytes(4);
+ q.add(item);
+ Map.Entry entry = q.next();
+ assertSame(item, entry.getValue());
+ assertEquals(0, q.bytes);
+ }
+
+ @Test
+ public void bytesDecrementedOnNext() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(30, 1, WhenFull.DROP);
+ q.add(bytes(3));
+ q.add(bytes(3));
+ q.add(bytes(3));
+ assertEquals(9, q.bytes);
+ q.next();
+ assertEquals(6, q.bytes);
+ q.next();
+ assertEquals(3, q.bytes);
+ q.next();
+ assertEquals(0, q.bytes);
+ }
+
+ @Test
+ public void newestItemDequeuesFirstWithinSameTries() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(30, 1, WhenFull.DROP);
+ byte[] a = {1};
+ byte[] b = {2};
+ byte[] c = {3};
+ q.add(a);
+ q.add(b);
+ q.add(c);
+ // Within tries=0, larger clock (added later) sorts first → LIFO
+ assertSame(c, q.next().getValue());
+ assertSame(b, q.next().getValue());
+ assertSame(a, q.next().getValue());
+ }
+
+ // --- WhenFull.DROP ---
+
+ @Test
+ public void dropWhenFullDropsOldestItem() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(10, 1, WhenFull.DROP);
+ byte[] a = bytes(5); // added first → smallest clock → last in TreeMap
+ byte[] b = bytes(4);
+ q.add(a); // queue: a(clock=MIN+1)
+ q.add(b); // queue full: a, b
+ byte[] c = bytes(3);
+ q.add(c); // a (oldest, last entry) evicted
+ assertEquals(1, q.droppedItems);
+ assertEquals(5, q.droppedBytes);
+ // Remaining: c (newest, clock=MIN+3) then b (clock=MIN+2)
+ assertSame(c, q.next().getValue());
+ assertSame(b, q.next().getValue());
+ }
+
+ @Test
+ public void dropCountersAccumulate() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(5, 1, WhenFull.DROP);
+ q.add(bytes(5)); // fills queue (X)
+ q.add(bytes(5)); // X dropped (Y in)
+ q.add(bytes(5)); // Y dropped (Z in)
+ assertEquals(2, q.droppedItems);
+ assertEquals(10, q.droppedBytes);
+ }
+
+ @Test(timeout = 3000)
+ public void dropDoesNotBlock() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(5, 1, WhenFull.DROP);
+ q.add(bytes(5)); // fill
+ q.add(bytes(5)); // should return immediately via DROP
+ }
+
+ // --- WhenFull.BLOCK ---
+
+ @Test(timeout = 5000)
+ public void blockUnblocksWhenSpaceFreed() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(5, 1, WhenFull.BLOCK);
+ q.add(bytes(5)); // queue full
+
+ Thread producer =
+ new Thread(
+ () -> {
+ try {
+ q.add(bytes(5));
+ } catch (InterruptedException e) {
+ return;
+ }
+ });
+ producer.start();
+
+ // Give producer time to reach await()
+ while (!(producer.getState() == Thread.State.WAITING
+ || producer.getState() == Thread.State.TIMED_WAITING)) {
+ Thread.sleep(50);
+ }
+
+ q.next(); // frees space, signals notFull
+ producer.join(2000);
+ assertFalse(producer.isAlive());
+ assertEquals(5, q.bytes);
+ assertEquals(0, q.droppedItems);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void addThrowsForOversizedItem() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(4, 1, WhenFull.DROP);
+ q.add(bytes(5));
+ }
+
+ @Test
+ public void requeueIncrementsTriesPreservesClock() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(20, 3, WhenFull.DROP);
+ q.add(bytes(4));
+ Map.Entry entry = q.next();
+ assertEquals(0, entry.getKey().tries);
+ long originalClock = entry.getKey().clock;
+
+ q.requeue(entry);
+ Map.Entry requeued = q.next();
+ assertEquals(1, requeued.getKey().tries);
+ assertEquals(originalClock, requeued.getKey().clock);
+ assertEquals(0, q.droppedItems);
+ }
+
+ @Test
+ public void requeuedItemDequeuesAfterFreshItems() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(20, 3, WhenFull.DROP);
+ byte[] a = {10};
+ byte[] b = {20};
+ q.add(a);
+ Map.Entry entryA = q.next();
+ q.requeue(entryA); // A now has tries=1
+
+ q.add(b); // B has tries=0 → higher priority
+
+ assertSame(b, q.next().getValue()); // B first (fewer tries)
+ assertSame(a, q.next().getValue()); // A second
+ }
+
+ @Test
+ public void requeueAtMaxTriesIsAccepted() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(20, 2, WhenFull.DROP);
+ q.add(bytes(3));
+ Map.Entry e = q.next();
+ q.requeue(e); // tries → 1
+ e = q.next();
+ q.requeue(e); // tries → 2 == maxTries, should be accepted
+ assertEquals(0, q.droppedItems);
+ assertFalse(q.items.isEmpty());
+ }
+
+ @Test
+ public void requeuePastMaxTriesDropsItem() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(20, 2, WhenFull.DROP);
+ byte[] item = bytes(7);
+ q.add(item);
+ Map.Entry e = q.next();
+ q.requeue(e); // tries → 1
+ e = q.next();
+ q.requeue(e); // tries → 2 == maxTries, accepted
+ e = q.next();
+ q.requeue(e); // tries → 3 > maxTries, dropped
+ assertEquals(1, q.droppedItems);
+ assertEquals(7, q.droppedBytes);
+ assertEquals(0, q.bytes);
+ assertTrue(q.items.isEmpty());
+ }
+
+ @Test(timeout = 5000)
+ public void nextBlocksUntilItemAdded() throws InterruptedException {
+ BoundedQueue q = new BoundedQueue(100, 1, WhenFull.DROP);
+ byte[] item = bytes(3);
+ Map.Entry[] result = new Map.Entry[1];
+
+ Thread consumer =
+ new Thread(
+ () -> {
+ try {
+ result[0] = q.next();
+ } catch (InterruptedException e) {
+ return;
+ }
+ });
+ consumer.start();
+
+ while (!(consumer.getState() == Thread.State.WAITING
+ || consumer.getState() == Thread.State.TIMED_WAITING)) {
+ Thread.sleep(50);
+ }
+
+ q.add(item);
+ consumer.join(2000);
+ assertFalse(consumer.isAlive());
+ assertSame(item, result[0].getValue());
+ }
+}
diff --git a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/KeyComparatorTest.java b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/KeyComparatorTest.java
new file mode 100644
index 00000000..2c9327e5
--- /dev/null
+++ b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/KeyComparatorTest.java
@@ -0,0 +1,123 @@
+/* Unless explicitly stated otherwise all files in this repository are
+ * licensed under the Apache 2.0 License.
+ *
+ * This product includes software developed at Datadog
+ * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc.
+ */
+
+package com.datadoghq.dogstatsd.http.forwarder;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class KeyComparatorTest {
+
+ private static BoundedQueue.Key key(long clock) {
+ return new BoundedQueue.Key(clock);
+ }
+
+ // --- Reflexivity ---
+
+ @Test
+ public void reflexive() {
+ BoundedQueue.Key k = key(5);
+ assertEquals(0, k.compareTo(k));
+ }
+
+ // --- Antisymmetry ---
+
+ @Test
+ public void antisymmetric_sameTries_differentClock() {
+ BoundedQueue.Key k1 = key(10);
+ BoundedQueue.Key k2 = key(20);
+ assertEquals(-Integer.signum(k1.compareTo(k2)), Integer.signum(k2.compareTo(k1)));
+ }
+
+ @Test
+ public void antisymmetric_differentTries() {
+ BoundedQueue.Key k0 = key(5);
+ BoundedQueue.Key k1 = k0.next();
+ assertEquals(-Integer.signum(k0.compareTo(k1)), Integer.signum(k1.compareTo(k0)));
+ }
+
+ // --- Transitivity ---
+
+ @Test
+ public void transitive_triesOrdering() {
+ BoundedQueue.Key k0 = key(1);
+ BoundedQueue.Key k1 = k0.next();
+ BoundedQueue.Key k2 = k1.next();
+ assertTrue(k0.compareTo(k1) < 0);
+ assertTrue(k1.compareTo(k2) < 0);
+ assertTrue(k0.compareTo(k2) < 0);
+ }
+
+ @Test
+ public void transitive_clockOrdering() {
+ // Within tries=0: larger clock → smaller compareTo → sorts first
+ BoundedQueue.Key k30 = key(30);
+ BoundedQueue.Key k20 = key(20);
+ BoundedQueue.Key k10 = key(10);
+ assertTrue(k30.compareTo(k20) < 0);
+ assertTrue(k20.compareTo(k10) < 0);
+ assertTrue(k30.compareTo(k10) < 0);
+ }
+
+ // --- Ordering rules ---
+
+ @Test
+ public void fewerTriesIsLess() {
+ BoundedQueue.Key k0 = key(1);
+ BoundedQueue.Key k1 = k0.next();
+ BoundedQueue.Key k2 = k1.next();
+ assertTrue(k0.compareTo(k1) < 0);
+ assertTrue(k1.compareTo(k2) < 0);
+ }
+
+ @Test
+ public void triesDominatesClockValue() {
+ BoundedQueue.Key lowTriesLowClock = key(1); // tries=0, clock=1
+ BoundedQueue.Key highTriesHighClock = key(9999).next(); // tries=1, clock=9999
+ assertTrue(lowTriesLowClock.compareTo(highTriesHighClock) < 0);
+ }
+
+ @Test
+ public void sameTries_largerClockSortsFirst() {
+ BoundedQueue.Key kBig = key(100); // tries=0, clock=100 → sorts before kSmall
+ BoundedQueue.Key kSmall = key(1); // tries=0, clock=1
+ assertTrue(kBig.compareTo(kSmall) < 0);
+ }
+
+ @Test
+ public void sameTries_equalClockReturnsZero() {
+ BoundedQueue.Key k1 = key(50);
+ BoundedQueue.Key k2 = key(50);
+ assertEquals(0, k1.compareTo(k2));
+ }
+
+ // --- next() behaviour ---
+
+ @Test
+ public void nextIncrementsTries() {
+ BoundedQueue.Key k = key(42);
+ assertEquals(0, k.tries);
+ assertEquals(42, k.clock);
+
+ BoundedQueue.Key k1 = k.next();
+ assertEquals(1, k1.tries);
+ assertEquals(42, k1.clock);
+
+ BoundedQueue.Key k2 = k1.next();
+ assertEquals(2, k2.tries);
+ assertEquals(42, k2.clock);
+ }
+
+ @Test
+ public void nextDoesNotMutateOriginal() {
+ BoundedQueue.Key k = key(7);
+ k.next();
+ assertEquals(0, k.tries);
+ assertEquals(7, k.clock);
+ }
+}