diff --git a/dogstatsd-http-core/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java b/dogstatsd-http-core/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java index d7358a5c..970feca3 100644 --- a/dogstatsd-http-core/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java +++ b/dogstatsd-http-core/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java @@ -7,6 +7,9 @@ package com.datadoghq.dogstatsd.http.serializer; +import com.datadoghq.dogstatsd.Sketch; +import java.nio.BufferOverflowException; + /** Builder for sketch timeseries. */ public class SketchMetric extends Metric { @@ -20,44 +23,44 @@ protected SketchMetric self() { } /** - * Add a new timeseries point. + * Add a new timeseries point sourced from a {@link Sketch}. * * @param timestamp Timestamp of the point in seconds since Unix epoch. - * @param sum Total sum of all observed values. - * @param min Minimum observed value. - * @param max Maximum observed value. - * @param cnt Number of observed values. - * @param binKeys Array of keys for each bin in the sketch. - * @param binCnts Array of number of observations for each bin. + * @param sketch Sketch supplying the summary statistics and bin distribution. * @return This. */ - public SketchMetric addPoint( - long timestamp, - double sum, - double min, - double max, - long cnt, - int[] binKeys, - int[] binCnts) { + public SketchMetric addPoint(long timestamp, Sketch sketch) { + final long maxBinCount = (1L << 32) - 1; + final long maxBinBytes = ProtoUtil.varintLen(maxBinCount); - if (binKeys.length != binCnts.length) { - throw new IllegalArgumentException("binKeys and binCnts must have the same length"); + // Skip doing the work if just the bin data would exceed payload size limit. + if (sketch.count() / maxBinCount * maxBinBytes >= pb.maxPayloadSize) { + throw new BufferOverflowException(); } pb.timestamps.put(timestamp); - pb.values.put(sum); - pb.values.put(min); - pb.values.put(max); - pb.counts.put(cnt); + pb.values.put(sketch.sum()); + pb.values.put(sketch.min()); + pb.values.put(sketch.max()); + pb.counts.put(sketch.count()); - ColumnarBuffer r = pb.currentRecord(); - DeltaEncoder dk = new DeltaEncoder(); + final ColumnarBuffer r = pb.currentRecord(); + final DeltaEncoder dk = new DeltaEncoder(); - r.putUint64(Column.sketchNumBins, binKeys.length); - for (int i = 0; i < binKeys.length; i++) { - r.putSint64(Column.sketchBinKeys, dk.encode(binKeys[i])); - r.putUint64(Column.sketchBinCnts, binCnts[i]); - } + r.putUint64(Column.sketchNumBins, sketch.size()); + sketch.bins( + new Sketch.BinConsumer() { + @Override + public void consumeBin(short key, long count) { + while (count > maxBinCount) { + r.putSint64(Column.sketchBinKeys, dk.encode(key)); + r.putUint64(Column.sketchBinCnts, maxBinCount); + count -= maxBinCount; + } + r.putSint64(Column.sketchBinKeys, dk.encode(key)); + r.putUint64(Column.sketchBinCnts, count); + } + }); return this; } diff --git a/dogstatsd-http-core/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java b/dogstatsd-http-core/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java index 887a0c5c..6fa11660 100644 --- a/dogstatsd-http-core/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java +++ b/dogstatsd-http-core/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java @@ -10,6 +10,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import com.datadoghq.dogstatsd.Sketch; import java.util.ArrayList; import java.util.Arrays; import org.junit.Test; @@ -42,13 +43,20 @@ public void handle(byte[] p) { b.gauge("defgh").addPoint(100, 0).close(); + Sketch sketch1 = new Sketch(); + sketch1.build(new long[] {1, 2, 2}, 1.0); + Sketch sketch2 = new Sketch(); + sketch2.build(new long[] {2, 2, 3, 3, 3}, 5e-10); + b.sketch("ijk") .setTags(Arrays.asList(new String[] {"foo", "baz"})) - .addPoint(100, 4.75, 1.25, 1.75, 3, new int[] {1351, 1373}, new int[] {1, 2}) - .addPoint(110, 6.5, 2.25, 2.75, 5, new int[] {1389, 1402}, new int[] {2, 3}) + .addPoint(100, sketch1) + .addPoint(110, sketch2) .close(); b.rate("lm").setInterval(10).addPoint(100, 3.14).close(); + b.rate("no").addPoint(100, 1).addPoint(110, 1.5).close(); + b.rate("pq").addPoint(100, 1L << 25).addPoint(110, 1.5).close(); b.close(); @@ -60,11 +68,11 @@ public void handle(byte[] p) { new int[] { // MetricData (3 << 3) | 2, - 188, + 243, 1, // dictNameStr (1 << 3) | 2, - 17, + 23, 3, 97, 98, @@ -82,6 +90,12 @@ public void handle(byte[] p) { 2, 108, 109, // lm + 2, + 110, + 111, // no + 2, + 112, + 113, // pq // dictTagsStr (2 << 3) | 2, 12, @@ -135,50 +149,62 @@ public void handle(byte[] p) { 0, // types (10 << 3) | 2, - 4, + 6, 0x11, 0x03, - 0x24, + 0x14, + 0x32, + 0x22, 0x32, // names (11 << 3) | 2, - 4, + 6, + 2, + 2, 2, 2, 2, 2, // tags (12 << 3) | 2, - 4, + 6, 2, 1, 4, 3, + 0, + 0, // resources (13 << 3) | 2, - 4, + 6, 2, 1, 0, 0, + 0, + 0, // intervals (14 << 3) | 2, - 4, + 6, 0, 0, 0, 10, + 0, + 0, // numPoints (15 << 3) | 2, - 4, + 6, 2, 1, 2, 1, + 2, + 2, // timestamps (16 << 3) | 2, 1, - 7, + 11, 200, 1, 20, @@ -186,47 +212,49 @@ public void handle(byte[] p) { 0, 20, 19, + 0, + 20, + 19, + 20, // valsSint64 (17 << 3) | 2, 1, + 19, + 2, 4, + 10, 2, 4, 6, - 10, - // valsFloat32, - // list(pack('