diff --git a/.circleci/config.yml b/.circleci/config.yml index fbe8249b..36130d1a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -14,41 +14,54 @@ commands: name: Grab java version and dump to file command: java -version > << parameters.filename >> -default_steps: &default_steps +default_steps_7-11: &default_steps_7-11 steps: - checkout + - run: mvn clean install + - run: cd dogstatsd-http-core && mvn clean install - - run: | - mvn clean install +default_steps_11-21: &default_steps_11-21 + steps: + - checkout + - run: mvn clean install - run: cd dogstatsd-http-core && mvn clean install + - run: cd dogstatsd-http-forwarder && mvn clean install + +default_steps_21-24: &default_steps_21-24 + # Java 7 source code is not compatible with OpenJDK 21 and later. + steps: + - checkout + - run: cd dogstatsd-http-forwarder && mvn clean install jobs: openjdk7: docker: - image: jfullaondo/openjdk:7 - <<: *default_steps + <<: *default_steps_7-11 openjdk8: docker: &jdk8 - image: cimg/openjdk:8.0 - <<: *default_steps + <<: *default_steps_7-11 openjdk11: docker: - image: cimg/openjdk:11.0 - <<: *default_steps + <<: *default_steps_11-21 openjdk13: docker: - image: cimg/openjdk:13.0 - <<: *default_steps + <<: *default_steps_11-21 openjdk17: docker: - image: cimg/openjdk:17.0 - <<: *default_steps - -## Fails with "Source option 7 is no longer supported. Use 8 or later." -# openjdk21: -# docker: -# - image: cimg/openjdk:21.0 -# <<: *default_steps + <<: *default_steps_11-21 + openjdk21: + docker: + - image: cimg/openjdk:21.0 + <<: *default_steps_21-24 + openjdk24: + docker: + - image: cimg/openjdk:24.0 + <<: *default_steps_21-24 windows-openjdk12: executor: @@ -85,7 +98,8 @@ workflows: - openjdk11 - openjdk13 - openjdk17 -# - openjdk21 + - openjdk21 + - openjdk24 - windows-openjdk12 - openjdk8-jnr-exclude - openjdk8-jnr-latest diff --git a/dogstatsd-http-forwarder/.gitignore b/dogstatsd-http-forwarder/.gitignore new file mode 100644 index 00000000..b83d2226 --- /dev/null +++ b/dogstatsd-http-forwarder/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/dogstatsd-http-forwarder/pom.xml b/dogstatsd-http-forwarder/pom.xml new file mode 100644 index 00000000..840e86f7 --- /dev/null +++ b/dogstatsd-http-forwarder/pom.xml @@ -0,0 +1,100 @@ + + 4.0.0 + + com.datadoghq + dogstatsd-http-forwarder + jar + dogstatsd-http-forwarder + 1.0.0-SNAPSHOT + HTTP forwarder for DogStatsD metrics. + https://github.com/DataDog/java-dogstatsd-client + + + UTF-8 + + + + + The MIT License (MIT) + http://opensource.org/licenses/MIT + repo + + + + + https://github.com/DataDog/java-dogstatsd-client + scm:git:git@github.com:DataDog/java-dogstatsd-client.git + scm:git:git@github.com:Datadog/java-dogstatsd-client.git + + + + + datadog + Datadog developers + dev@datadoghq.com + + + + + + junit + junit + 4.13.1 + test + + + + + + spotless + + [17.0,) + + + + + com.diffplug.spotless + spotless-maven-plugin + 2.45.0 + + + + 1.28.0 + + + + + + + + check + + + + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + 11 + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19 + + + + diff --git a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java new file mode 100644 index 00000000..41d44e42 --- /dev/null +++ b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java @@ -0,0 +1,136 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.forwarder; + +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +class BoundedQueue { + // Key represents a tuple of integers (tries, clock). + static class Key implements Comparable { + final long tries; + final long clock; + + Key(long clock) { + this.tries = 0; + this.clock = clock; + } + + private Key(long tries, long clock) { + this.tries = tries; + this.clock = clock; + } + + Key next() { + return new Key(tries + 1, clock); + } + + @Override + public int compareTo(Key o) { + // Keys are ordered such first we try items with fewer + // attempts, and then with a newer (larger) clock value. + if (tries == o.tries) { + return Long.compare(o.clock, clock); + } + return Long.compare(tries, o.tries); + } + } + + long clock = Long.MIN_VALUE; + long bytes; + final long maxBytes; + final long maxTries; + final WhenFull whenFull; + + final TreeMap items = new TreeMap<>(); + + long droppedItems; + long droppedBytes; + + Lock lock = new ReentrantLock(); + Condition notEmpty = lock.newCondition(); + Condition notFull = lock.newCondition(); + + BoundedQueue(long maxBytes, long maxTries, WhenFull whenFull) { + this.maxBytes = maxBytes; + this.maxTries = maxTries; + this.whenFull = whenFull; + } + + void add(byte[] item) throws InterruptedException { + put(null, item, whenFull); + } + + void requeue(Map.Entry item) throws InterruptedException { + Key nextKey = item.getKey().next(); + if (nextKey.tries > maxTries) { + droppedItems++; + droppedBytes += item.getValue().length; + return; + } + put(nextKey, item.getValue(), WhenFull.DROP); + } + + // Must be called when lock is held. + private Key newKey() { + clock++; + return new Key(clock); + } + + private void put(Key key, byte[] item, WhenFull whenFull) throws InterruptedException { + lock.lock(); + try { + if (key == null) { + key = newKey(); + } + ensureSpace(item.length, whenFull); + items.put(key, item); + bytes += item.length; + notEmpty.signal(); + } finally { + lock.unlock(); + } + } + + private void ensureSpace(int length, WhenFull whenFull) throws InterruptedException { + if (length > maxBytes) { + throw new IllegalArgumentException("item length is larger than maxBytes"); + } + while (bytes + length > maxBytes) { + switch (whenFull) { + case DROP: + Map.Entry last = items.pollLastEntry(); + droppedItems++; + droppedBytes += last.getValue().length; + bytes -= last.getValue().length; + break; + case BLOCK: + notFull.await(); + break; + } + } + } + + Map.Entry next() throws InterruptedException { + lock.lock(); + try { + while (items.size() == 0) { + notEmpty.await(); + } + Map.Entry item = items.pollFirstEntry(); + bytes -= item.getValue().length; + notFull.signalAll(); + return item; + } finally { + lock.unlock(); + } + } +} diff --git a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java new file mode 100644 index 00000000..e795d533 --- /dev/null +++ b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java @@ -0,0 +1,191 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.forwarder; + +import static java.net.http.HttpRequest.BodyPublishers; +import static java.net.http.HttpResponse.BodyHandlers; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Map; +import java.util.Random; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An HTTP forwarder that delivers DogStatsD HTTP payloads to a remote endpoint. + * + *

Payloads are enqueued via {@link #send(byte[])} and delivered asynchronously by a background + * thread. Failed requests are retried with exponential back-off up to {@code maxTries} attempts + * before being discarded. + */ +public class Forwarder extends Thread { + static final Logger logger = Logger.getLogger(Forwarder.class.getName()); + final BoundedQueue queue; + final URI url; + final HttpClient client; + final Duration requestTimeout; + final Random rng = new Random(); + + String localData; + String externalData; + + int responseOk, responseBadRequest, responseOther; + + /** + * Creates a new forwarder targeting the given URL. + * + * @param url the remote HTTP endpoint to POST payloads to + * @param maxRequestsBytes maximum total size of buffered payloads, in bytes + * @param maxTries maximum number of delivery attempts per payload + * @param whenFull action to take when the queue is at capacity + * @param connectTimeout timeout for establishing the TCP connection + * @param requestTimeout timeout from sending the request until response headers are received; + * {@code null} disables the request timeout + */ + public Forwarder( + URI url, + long maxRequestsBytes, + long maxTries, + WhenFull whenFull, + Duration connectTimeout, + Duration requestTimeout) { + this.url = url; + this.queue = new BoundedQueue(maxRequestsBytes, maxTries, whenFull); + this.requestTimeout = requestTimeout; + this.client = + HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .connectTimeout(connectTimeout) + .build(); + } + + /** Runs the forwarding loop, delivering queued payloads until the thread is interrupted. */ + @Override + public void run() { + try { + while (true) { + runOnce(queue.next()); + } + } catch (InterruptedException e) { + return; + } + } + + /** + * Enqueues a payload for delivery to the remote endpoint. + * + *

If the queue is full, behaviour is determined by the {@link WhenFull} policy supplied at + * construction time. + * + * @param payload the raw bytes to deliver + * @throws InterruptedException if the calling thread is interrupted while waiting for space + * ({@link WhenFull#BLOCK} mode only) + */ + public void send(byte[] payload) throws InterruptedException { + queue.add(payload); + } + + void runOnce(Map.Entry item) throws InterruptedException { + byte[] payload = item.getValue(); + logger.log(Level.INFO, "sending {0} bytes", payload.length); + + HttpRequest.Builder builder = + HttpRequest.newBuilder(url).POST(BodyPublishers.ofByteArray(payload)); + if (requestTimeout != null) { + builder.timeout(requestTimeout); + } + if (localData != null) { + builder.setHeader("x-dsd-ld", localData); + } + if (externalData != null) { + builder.setHeader("x-dsd-ed", externalData); + } + HttpRequest req = builder.build(); + + try { + HttpResponse res = client.send(req, BodyHandlers.ofString()); + res.body(); + + logger.log( + Level.INFO, "response {0}: {1}", new Object[] {res.statusCode(), res.body()}); + + switch (res.statusCode()) { + case 400: + responseBadRequest++; + onSuccess(); + break; + case 200: + responseOk++; + onSuccess(); + break; + default: + responseOther++; + onError(); + queue.requeue(item); + } + } catch (IOException ex) { + logger.log(Level.WARNING, "error sending request: {0}", ex.toString()); + responseOther++; + onError(); + queue.requeue(item); + } + + backoff(); + } + + int delay; + + void onSuccess() { + delay >>= 4; + } + + void onError() { + if (delay < 64) delay <<= 1; + if (delay == 0) delay = 1; + } + + void backoff() throws InterruptedException { + if (delay > 0) { + int sleep = (int) (250.0 * delay * (0.5 + rng.nextDouble())); + logger.log(Level.INFO, "backoff={0}, sleeping {1}ms", new Object[] {delay, sleep}); + Thread.sleep(sleep); + } + } + + /** + * Sets the local-data value sent as the {@code x-dsd-ld} header with each request. + * + *

Local data carries the container ID or cgroup node inode used by the Datadog Agent for + * origin detection (DogStatsD protocol v1.4). + * + * @param data the local-data string, or {@code null} to omit the header + */ + public void setLocalData(String data) { + logger.log(Level.INFO, "using local data: {0}", data); + localData = data; + } + + /** + * Sets the external-data value sent as the {@code x-dsd-ed} header with each request. + * + *

External data is supplied by the Datadog Agent Admission Controller and is used by the + * Agent to enrich metrics with container tags when a container ID is unavailable (DogStatsD + * protocol v1.5, Agent ≥ v7.57.0). + * + * @param data the external-data string, or {@code null} to omit the header + */ + public void setExternalData(String data) { + logger.log(Level.INFO, "using external data: {0}", data); + externalData = data; + } +} diff --git a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/WhenFull.java b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/WhenFull.java new file mode 100644 index 00000000..b2b84484 --- /dev/null +++ b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/WhenFull.java @@ -0,0 +1,16 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.forwarder; + +/** Controls the behavior of {@link Forwarder} when its internal queue is full. */ +public enum WhenFull { + /** Block the caller until space becomes available in the queue. */ + BLOCK, + /** Drop a payload to make room, according to the queue's eviction policy. */ + DROP; +} diff --git a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java new file mode 100644 index 00000000..aca496d1 --- /dev/null +++ b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java @@ -0,0 +1,227 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.forwarder; + +import static org.junit.Assert.*; + +import java.util.Arrays; +import java.util.Map; +import org.junit.Test; + +public class BoundedQueueTest { + private static byte[] bytes(int n) { + return bytes(n, (byte) 0); + } + + private static byte[] bytes(int n, byte v) { + byte[] b = new byte[n]; + Arrays.fill(b, v); + return b; + } + + // --- Round-trip / bytes tracking --- + + @Test + public void addThenNextReturnsItem() throws InterruptedException { + BoundedQueue q = new BoundedQueue(10, 1, WhenFull.DROP); + byte[] item = bytes(4); + q.add(item); + Map.Entry entry = q.next(); + assertSame(item, entry.getValue()); + assertEquals(0, q.bytes); + } + + @Test + public void bytesDecrementedOnNext() throws InterruptedException { + BoundedQueue q = new BoundedQueue(30, 1, WhenFull.DROP); + q.add(bytes(3)); + q.add(bytes(3)); + q.add(bytes(3)); + assertEquals(9, q.bytes); + q.next(); + assertEquals(6, q.bytes); + q.next(); + assertEquals(3, q.bytes); + q.next(); + assertEquals(0, q.bytes); + } + + @Test + public void newestItemDequeuesFirstWithinSameTries() throws InterruptedException { + BoundedQueue q = new BoundedQueue(30, 1, WhenFull.DROP); + byte[] a = {1}; + byte[] b = {2}; + byte[] c = {3}; + q.add(a); + q.add(b); + q.add(c); + // Within tries=0, larger clock (added later) sorts first → LIFO + assertSame(c, q.next().getValue()); + assertSame(b, q.next().getValue()); + assertSame(a, q.next().getValue()); + } + + // --- WhenFull.DROP --- + + @Test + public void dropWhenFullDropsOldestItem() throws InterruptedException { + BoundedQueue q = new BoundedQueue(10, 1, WhenFull.DROP); + byte[] a = bytes(5); // added first → smallest clock → last in TreeMap + byte[] b = bytes(4); + q.add(a); // queue: a(clock=MIN+1) + q.add(b); // queue full: a, b + byte[] c = bytes(3); + q.add(c); // a (oldest, last entry) evicted + assertEquals(1, q.droppedItems); + assertEquals(5, q.droppedBytes); + // Remaining: c (newest, clock=MIN+3) then b (clock=MIN+2) + assertSame(c, q.next().getValue()); + assertSame(b, q.next().getValue()); + } + + @Test + public void dropCountersAccumulate() throws InterruptedException { + BoundedQueue q = new BoundedQueue(5, 1, WhenFull.DROP); + q.add(bytes(5)); // fills queue (X) + q.add(bytes(5)); // X dropped (Y in) + q.add(bytes(5)); // Y dropped (Z in) + assertEquals(2, q.droppedItems); + assertEquals(10, q.droppedBytes); + } + + @Test(timeout = 3000) + public void dropDoesNotBlock() throws InterruptedException { + BoundedQueue q = new BoundedQueue(5, 1, WhenFull.DROP); + q.add(bytes(5)); // fill + q.add(bytes(5)); // should return immediately via DROP + } + + // --- WhenFull.BLOCK --- + + @Test(timeout = 5000) + public void blockUnblocksWhenSpaceFreed() throws InterruptedException { + BoundedQueue q = new BoundedQueue(5, 1, WhenFull.BLOCK); + q.add(bytes(5)); // queue full + + Thread producer = + new Thread( + () -> { + try { + q.add(bytes(5)); + } catch (InterruptedException e) { + return; + } + }); + producer.start(); + + // Give producer time to reach await() + while (!(producer.getState() == Thread.State.WAITING + || producer.getState() == Thread.State.TIMED_WAITING)) { + Thread.sleep(50); + } + + q.next(); // frees space, signals notFull + producer.join(2000); + assertFalse(producer.isAlive()); + assertEquals(5, q.bytes); + assertEquals(0, q.droppedItems); + } + + @Test(expected = IllegalArgumentException.class) + public void addThrowsForOversizedItem() throws InterruptedException { + BoundedQueue q = new BoundedQueue(4, 1, WhenFull.DROP); + q.add(bytes(5)); + } + + @Test + public void requeueIncrementsTriesPreservesClock() throws InterruptedException { + BoundedQueue q = new BoundedQueue(20, 3, WhenFull.DROP); + q.add(bytes(4)); + Map.Entry entry = q.next(); + assertEquals(0, entry.getKey().tries); + long originalClock = entry.getKey().clock; + + q.requeue(entry); + Map.Entry requeued = q.next(); + assertEquals(1, requeued.getKey().tries); + assertEquals(originalClock, requeued.getKey().clock); + assertEquals(0, q.droppedItems); + } + + @Test + public void requeuedItemDequeuesAfterFreshItems() throws InterruptedException { + BoundedQueue q = new BoundedQueue(20, 3, WhenFull.DROP); + byte[] a = {10}; + byte[] b = {20}; + q.add(a); + Map.Entry entryA = q.next(); + q.requeue(entryA); // A now has tries=1 + + q.add(b); // B has tries=0 → higher priority + + assertSame(b, q.next().getValue()); // B first (fewer tries) + assertSame(a, q.next().getValue()); // A second + } + + @Test + public void requeueAtMaxTriesIsAccepted() throws InterruptedException { + BoundedQueue q = new BoundedQueue(20, 2, WhenFull.DROP); + q.add(bytes(3)); + Map.Entry e = q.next(); + q.requeue(e); // tries → 1 + e = q.next(); + q.requeue(e); // tries → 2 == maxTries, should be accepted + assertEquals(0, q.droppedItems); + assertFalse(q.items.isEmpty()); + } + + @Test + public void requeuePastMaxTriesDropsItem() throws InterruptedException { + BoundedQueue q = new BoundedQueue(20, 2, WhenFull.DROP); + byte[] item = bytes(7); + q.add(item); + Map.Entry e = q.next(); + q.requeue(e); // tries → 1 + e = q.next(); + q.requeue(e); // tries → 2 == maxTries, accepted + e = q.next(); + q.requeue(e); // tries → 3 > maxTries, dropped + assertEquals(1, q.droppedItems); + assertEquals(7, q.droppedBytes); + assertEquals(0, q.bytes); + assertTrue(q.items.isEmpty()); + } + + @Test(timeout = 5000) + public void nextBlocksUntilItemAdded() throws InterruptedException { + BoundedQueue q = new BoundedQueue(100, 1, WhenFull.DROP); + byte[] item = bytes(3); + Map.Entry[] result = new Map.Entry[1]; + + Thread consumer = + new Thread( + () -> { + try { + result[0] = q.next(); + } catch (InterruptedException e) { + return; + } + }); + consumer.start(); + + while (!(consumer.getState() == Thread.State.WAITING + || consumer.getState() == Thread.State.TIMED_WAITING)) { + Thread.sleep(50); + } + + q.add(item); + consumer.join(2000); + assertFalse(consumer.isAlive()); + assertSame(item, result[0].getValue()); + } +} diff --git a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/KeyComparatorTest.java b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/KeyComparatorTest.java new file mode 100644 index 00000000..2c9327e5 --- /dev/null +++ b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/KeyComparatorTest.java @@ -0,0 +1,123 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.forwarder; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class KeyComparatorTest { + + private static BoundedQueue.Key key(long clock) { + return new BoundedQueue.Key(clock); + } + + // --- Reflexivity --- + + @Test + public void reflexive() { + BoundedQueue.Key k = key(5); + assertEquals(0, k.compareTo(k)); + } + + // --- Antisymmetry --- + + @Test + public void antisymmetric_sameTries_differentClock() { + BoundedQueue.Key k1 = key(10); + BoundedQueue.Key k2 = key(20); + assertEquals(-Integer.signum(k1.compareTo(k2)), Integer.signum(k2.compareTo(k1))); + } + + @Test + public void antisymmetric_differentTries() { + BoundedQueue.Key k0 = key(5); + BoundedQueue.Key k1 = k0.next(); + assertEquals(-Integer.signum(k0.compareTo(k1)), Integer.signum(k1.compareTo(k0))); + } + + // --- Transitivity --- + + @Test + public void transitive_triesOrdering() { + BoundedQueue.Key k0 = key(1); + BoundedQueue.Key k1 = k0.next(); + BoundedQueue.Key k2 = k1.next(); + assertTrue(k0.compareTo(k1) < 0); + assertTrue(k1.compareTo(k2) < 0); + assertTrue(k0.compareTo(k2) < 0); + } + + @Test + public void transitive_clockOrdering() { + // Within tries=0: larger clock → smaller compareTo → sorts first + BoundedQueue.Key k30 = key(30); + BoundedQueue.Key k20 = key(20); + BoundedQueue.Key k10 = key(10); + assertTrue(k30.compareTo(k20) < 0); + assertTrue(k20.compareTo(k10) < 0); + assertTrue(k30.compareTo(k10) < 0); + } + + // --- Ordering rules --- + + @Test + public void fewerTriesIsLess() { + BoundedQueue.Key k0 = key(1); + BoundedQueue.Key k1 = k0.next(); + BoundedQueue.Key k2 = k1.next(); + assertTrue(k0.compareTo(k1) < 0); + assertTrue(k1.compareTo(k2) < 0); + } + + @Test + public void triesDominatesClockValue() { + BoundedQueue.Key lowTriesLowClock = key(1); // tries=0, clock=1 + BoundedQueue.Key highTriesHighClock = key(9999).next(); // tries=1, clock=9999 + assertTrue(lowTriesLowClock.compareTo(highTriesHighClock) < 0); + } + + @Test + public void sameTries_largerClockSortsFirst() { + BoundedQueue.Key kBig = key(100); // tries=0, clock=100 → sorts before kSmall + BoundedQueue.Key kSmall = key(1); // tries=0, clock=1 + assertTrue(kBig.compareTo(kSmall) < 0); + } + + @Test + public void sameTries_equalClockReturnsZero() { + BoundedQueue.Key k1 = key(50); + BoundedQueue.Key k2 = key(50); + assertEquals(0, k1.compareTo(k2)); + } + + // --- next() behaviour --- + + @Test + public void nextIncrementsTries() { + BoundedQueue.Key k = key(42); + assertEquals(0, k.tries); + assertEquals(42, k.clock); + + BoundedQueue.Key k1 = k.next(); + assertEquals(1, k1.tries); + assertEquals(42, k1.clock); + + BoundedQueue.Key k2 = k1.next(); + assertEquals(2, k2.tries); + assertEquals(42, k2.clock); + } + + @Test + public void nextDoesNotMutateOriginal() { + BoundedQueue.Key k = key(7); + k.next(); + assertEquals(0, k.tries); + assertEquals(7, k.clock); + } +}