Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

package com.datadoghq.dogstatsd.http.serializer;

import com.datadoghq.dogstatsd.Sketch;
import java.nio.BufferOverflowException;

/** Builder for sketch timeseries. */
public class SketchMetric extends Metric<SketchMetric> {

Expand All @@ -20,44 +23,44 @@ protected SketchMetric self() {
}

/**
* Add a new timeseries point.
* Add a new timeseries point sourced from a {@link Sketch}.
*
* @param timestamp Timestamp of the point in seconds since Unix epoch.
* @param sum Total sum of all observed values.
* @param min Minimum observed value.
* @param max Maximum observed value.
* @param cnt Number of observed values.
* @param binKeys Array of keys for each bin in the sketch.
* @param binCnts Array of number of observations for each bin.
* @param sketch Sketch supplying the summary statistics and bin distribution.
* @return This.
*/
public SketchMetric addPoint(
long timestamp,
double sum,
double min,
double max,
long cnt,
int[] binKeys,
int[] binCnts) {
public SketchMetric addPoint(long timestamp, Sketch sketch) {
final long maxBinCount = (1L << 32) - 1;
final long maxBinBytes = ProtoUtil.varintLen(maxBinCount);

if (binKeys.length != binCnts.length) {
throw new IllegalArgumentException("binKeys and binCnts must have the same length");
// Skip doing the work if just the bin data would exceed payload size limit.
if (sketch.count() / maxBinCount * maxBinBytes >= pb.maxPayloadSize) {
throw new BufferOverflowException();
}

pb.timestamps.put(timestamp);
pb.values.put(sum);
pb.values.put(min);
pb.values.put(max);
pb.counts.put(cnt);
pb.values.put(sketch.sum());
pb.values.put(sketch.min());
pb.values.put(sketch.max());
pb.counts.put(sketch.count());

ColumnarBuffer r = pb.currentRecord();
DeltaEncoder dk = new DeltaEncoder();
final ColumnarBuffer r = pb.currentRecord();
final DeltaEncoder dk = new DeltaEncoder();

r.putUint64(Column.sketchNumBins, binKeys.length);
for (int i = 0; i < binKeys.length; i++) {
r.putSint64(Column.sketchBinKeys, dk.encode(binKeys[i]));
r.putUint64(Column.sketchBinCnts, binCnts[i]);
}
r.putUint64(Column.sketchNumBins, sketch.size());
sketch.bins(
new Sketch.BinConsumer() {
@Override
public void consumeBin(short key, long count) {
while (count > maxBinCount) {
r.putSint64(Column.sketchBinKeys, dk.encode(key));
r.putUint64(Column.sketchBinCnts, maxBinCount);
count -= maxBinCount;
}
r.putSint64(Column.sketchBinKeys, dk.encode(key));
r.putUint64(Column.sketchBinCnts, count);
}
});

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.datadoghq.dogstatsd.Sketch;
import java.util.ArrayList;
import java.util.Arrays;
import org.junit.Test;
Expand Down Expand Up @@ -42,13 +43,20 @@ public void handle(byte[] p) {

b.gauge("defgh").addPoint(100, 0).close();

Sketch sketch1 = new Sketch();
sketch1.build(new long[] {1, 2, 2}, 1.0);
Sketch sketch2 = new Sketch();
sketch2.build(new long[] {2, 2, 3, 3, 3}, 5e-10);

b.sketch("ijk")
.setTags(Arrays.asList(new String[] {"foo", "baz"}))
.addPoint(100, 4.75, 1.25, 1.75, 3, new int[] {1351, 1373}, new int[] {1, 2})
.addPoint(110, 6.5, 2.25, 2.75, 5, new int[] {1389, 1402}, new int[] {2, 3})
.addPoint(100, sketch1)
.addPoint(110, sketch2)
.close();

b.rate("lm").setInterval(10).addPoint(100, 3.14).close();
b.rate("no").addPoint(100, 1).addPoint(110, 1.5).close();
b.rate("pq").addPoint(100, 1L << 25).addPoint(110, 1.5).close();

b.close();

Expand All @@ -60,11 +68,11 @@ public void handle(byte[] p) {
new int[] {
// MetricData
(3 << 3) | 2,
188,
243,
1,
// dictNameStr
(1 << 3) | 2,
17,
23,
3,
97,
98,
Expand All @@ -82,6 +90,12 @@ public void handle(byte[] p) {
2,
108,
109, // lm
2,
110,
111, // no
2,
112,
113, // pq
// dictTagsStr
(2 << 3) | 2,
12,
Expand Down Expand Up @@ -135,98 +149,112 @@ public void handle(byte[] p) {
0,
// types
(10 << 3) | 2,
4,
6,
0x11,
0x03,
0x24,
0x14,
0x32,
0x22,
0x32,
// names
(11 << 3) | 2,
4,
6,
2,
2,
2,
2,
2,
2,
// tags
(12 << 3) | 2,
4,
6,
2,
1,
4,
3,
0,
0,
// resources
(13 << 3) | 2,
4,
6,
2,
1,
0,
0,
0,
0,
// intervals
(14 << 3) | 2,
4,
6,
0,
0,
0,
10,
0,
0,
// numPoints
(15 << 3) | 2,
4,
6,
2,
1,
2,
1,
2,
2,
// timestamps
(16 << 3) | 2,
1,
7,
11,
200,
1,
20,
19,
0,
20,
19,
0,
20,
19,
20,
// valsSint64
(17 << 3) | 2,
1,
19,
2,
4,
10,
2,
4,
6,
10,
// valsFloat32,
// list(pack('<ffffff', 4.75, 1.25, 1.75, 6.5, 2.25, 2.75))
128,
144,
196,
219,
193,
1,
4,
6,
246,
143,
223,
192,
74,
// valsFloat32, list(pack('<ff', 1, 1.5))
(18 << 3) | 2,
1,
24,
0,
0,
152,
64,
8,
0,
0,
160,
128,
63,
0,
0,
224,
192,
63,
0,
0,
208,
64,
0,
0,
16,
64,
0,
0,
48,
64,
// valsFloat64, list(pack('<d', 3.14))
// valsFloat64, list(pack('<ddd', 3.14, 1<<25, 1.5))
(19 << 3) | 2,
1,
8,
24,
31,
133,
235,
Expand All @@ -235,6 +263,22 @@ public void handle(byte[] p) {
30,
9,
64,
0,
0,
0,
0,
0,
0,
128,
65,
0,
0,
0,
0,
0,
0,
248,
63,
// sketchNumBins
(20 << 3) | 2,
1,
Expand All @@ -244,37 +288,55 @@ public void handle(byte[] p) {
// sketchBinKeys
(21 << 3) | 2,
1,
6,
142,
21,
44,
218,
7,
244,
20,
90,
206,
21,
26,
52,
0,
// sketchBinCnts
(22 << 3) | 2,
1,
4,
17,
1,
2,
2,
3,
254,
207,
172,
243,
14,
255,
255,
255,
255,
15,
254,
247,
130,
173,
6,
// sourceTypeName
(23 << 3) | 2,
1,
4,
6,
0,
0,
0,
0,
0,
0,
// origins
(24 << 3) | 2,
1,
4,
6,
2,
0,
0,
0,
0,
0,
});
}

Expand Down
Loading