Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ddprof-lib/src/main/cpp/arguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,21 @@ Error Arguments::parse(const char *args) {
_remote_symbolication = true;
}

CASE("wallprecheck")
if (value != NULL) {
_wall_precheck = strcmp(value, "false") != 0 && strcmp(value, "0") != 0;
}

CASE("wallpark")
if (value != NULL) {
_wall_park = strcmp(value, "false") != 0 && strcmp(value, "0") != 0;
}

CASE("wallparkmin")
if (value != NULL) {
_wall_park_min_ns = (u64)strtoull(value, NULL, 10);
}

CASE("wallsampler")
if (value != NULL) {
switch (value[0]) {
Expand Down
6 changes: 6 additions & 0 deletions ddprof-lib/src/main/cpp/arguments.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class Arguments {
long _cpu;
long _wall;
bool _wall_collapsing;
bool _wall_precheck;
bool _wall_park;
uint64_t _wall_park_min_ns;
int _wall_threads_per_tick;
WallclockSampler _wallclock_sampler;
long _memory;
Expand Down Expand Up @@ -201,6 +204,9 @@ class Arguments {
_cpu(-1),
_wall(-1),
_wall_collapsing(false),
_wall_precheck(true),
_wall_park(true),
_wall_park_min_ns(1000000ULL), // 1 ms default
_wall_threads_per_tick(DEFAULT_WALL_THREADS_PER_TICK),
_memory(-1),
_record_allocations(false),
Expand Down
5 changes: 5 additions & 0 deletions ddprof-lib/src/main/cpp/counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
X(AGCT_NATIVE_NO_JAVA_CONTEXT, "agct_native_no_java_context") \
X(AGCT_BLOCKED_IN_VM, "agct_blocked_in_vm") \
X(SKIPPED_WALLCLOCK_UNWINDS, "skipped_wallclock_unwinds") \
X(WC_SIGNAL_SKIPPED_SLEEPING, "wc_signals_skipped_sleeping") \
X(WC_SIGNAL_SKIPPED_MONITOR, "wc_signals_skipped_monitor_wait") \
X(WC_SIGNAL_SKIPPED_CONDVAR, "wc_signals_skipped_condvar_wait") \
X(WC_SIGNAL_SKIPPED_OBJECT_WAIT, "wc_signals_skipped_object_wait") \
X(WC_SIGNAL_SKIPPED_PARKED, "wc_signals_skipped_parked") \
X(UNWINDING_TIME_ASYNC, "unwinding_ticks_async") \
X(UNWINDING_TIME_JVMTI, "unwinding_ticks_jvmti") \
X(CALLTRACE_STORAGE_DROPPED, "calltrace_storage_dropped_traces") \
Expand Down
30 changes: 25 additions & 5 deletions ddprof-lib/src/main/cpp/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,13 @@ class WallClockEpochEvent {
u32 _num_failed_samples;
u32 _num_exited_threads;
u32 _num_permission_denied;
u32 _num_skipped_sleeping;

WallClockEpochEvent(u64 start_time)
: _dirty(false), _start_time(start_time), _duration_millis(0),
_num_samplable_threads(0), _num_successful_samples(0),
_num_failed_samples(0), _num_exited_threads(0),
_num_permission_denied(0) {}
_num_permission_denied(0), _num_skipped_sleeping(0) {}

bool hasChanged() { return _dirty; }

Expand Down Expand Up @@ -142,6 +143,10 @@ class WallClockEpochEvent {
}
}

void updateNumSkippedSleeping(u32 n) {
if (_num_skipped_sleeping != n) { _dirty = true; _num_skipped_sleeping = n; }
}

void endEpoch(u64 millis) { _duration_millis = millis; }

void clean() { _dirty = false; }
Expand All @@ -155,22 +160,37 @@ class WallClockEpochEvent {
class TraceRootEvent {
public:
u64 _local_root_span_id;
u64 _parent_span_id;
u64 _start_ticks;
u32 _label;
u32 _operation;

TraceRootEvent(u64 local_root_span_id, u32 label, u32 operation)
: _local_root_span_id(local_root_span_id), _label(label),
_operation(operation){};
TraceRootEvent(u64 local_root_span_id, u64 parent_span_id, u64 start_ticks,
u32 label, u32 operation)
: _local_root_span_id(local_root_span_id),
_parent_span_id(parent_span_id), _start_ticks(start_ticks),
_label(label), _operation(operation){};
};

typedef struct TaskBlockEvent {
u64 _start_ticks;
u64 _end_ticks;
u64 _span_id;
u64 _root_span_id;
uintptr_t _blocker;
u64 _unblocking_span_id;
} TaskBlockEvent;

typedef struct QueueTimeEvent {
u64 _start;
u64 _end;
u32 _task;
u32 _scheduler;
u32 _origin;
u32 _queueType;
u32 _queueLength;
u32 _queueLength;
u64 _submitting_span_id;
u64 _consuming_span_id; // 0: use current Context; else: override JFR spanId
} QueueTimeEvent;

#endif // _EVENT_H
114 changes: 111 additions & 3 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1531,13 +1531,63 @@ void Recording::recordTraceRoot(Buffer *buf, int tid, TraceRootEvent *event) {
flushIfNeeded(buf);
int start = buf->skip(1);
buf->putVar64(T_ENDPOINT);
buf->putVar64(TSC::ticks());
buf->put8(0);
buf->putVar64(event->_start_ticks);
buf->putVar64(TSC::ticks() - event->_start_ticks);
buf->putVar32(tid);
buf->put8(0);
buf->putVar32(event->_label);
buf->putVar32(event->_operation);
buf->putVar64(event->_local_root_span_id);
buf->putVar64(event->_parent_span_id);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}

void Recording::recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event) {
flushIfNeeded(buf);
int start = buf->skip(1);
buf->putVar64(T_TASK_BLOCK);
buf->putVar64(event->_start_ticks);
buf->putVar64(event->_end_ticks - event->_start_ticks);
buf->putVar32(tid);
buf->put8(0);
buf->putVar64(event->_span_id);
buf->putVar64(event->_root_span_id);
buf->putVar64((u64)event->_blocker);
buf->putVar64(event->_unblocking_span_id);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}

void Recording::recordSpanNode(Buffer *buf, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource) {
// Convert epoch nanoseconds to JFR ticks so that standard JFR tooling (JMC, Mission
// Control) can correlate SpanNode events with other events on the recording timeline.
// _start_time is in microseconds; multiply by 1000 to get the recording epoch in nanos.
u64 start_epoch_nanos = _start_time * 1000ULL;
// Use signed arithmetic: a span that started in a previous JFR chunk has
// startNanos < start_epoch_nanos. Unsigned subtraction would wrap around and
// produce a huge positive u64, making (u64)(negative_double) undefined behaviour.
// With signed delta the result is a negative tick offset, placing the event just
// before the chunk boundary, which is the correct behaviour for pre-chunk spans.
long long delta_nanos = (long long)startNanos - (long long)start_epoch_nanos;
long long delta_ticks = (long long)((double)delta_nanos * TSC::frequency() / NANOTIME_FREQ);
u64 startTicks = (u64)((long long)_start_ticks + delta_ticks);
u64 durationTicks = (u64)((double)durationNanos * TSC::frequency() / NANOTIME_FREQ);

flushIfNeeded(buf);
int start = buf->skip(1);
buf->putVar64(T_SPAN_NODE);
buf->putVar64(startTicks); // startTime (F_TIME_TICKS)
buf->putVar64(durationTicks); // duration (F_DURATION_TICKS)
buf->putVar32(tid); // eventThread (F_CPOOL)
buf->putVar64(spanId);
buf->putVar64(parentSpanId);
buf->putVar64(rootSpanId);
buf->putVar64(startNanos); // startNanos — epoch ns, used by backend extractor
buf->putVar64(durationNanos); // durationNanos — ns
buf->putVar32(encodedOperation);
buf->putVar32(encodedResource);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}
Expand All @@ -1553,7 +1603,33 @@ void Recording::recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event) {
buf->putVar64(event->_scheduler);
buf->putVar64(event->_queueType);
buf->putVar64(event->_queueLength);
writeContext(buf, Contexts::get());
// The schema declares fields in this order:
// spanId, localRootSpanId, submittingSpanId, contextAttr[0..n]
// writeContext() would emit spanId + localRootSpanId + contextAttrs[0..n] in one shot,
// leaving no room to insert submittingSpanId between localRootSpanId and contextAttrs.
// Inline the context fields manually so submittingSpanId lands at the correct position.
Context &ctx = Contexts::get();
u64 spanId = 0, rootSpanId = 0;
u64 stored = ctx.checksum;
if (stored != 0) {
spanId = ctx.spanId;
rootSpanId = ctx.rootSpanId;
if (stored != Contexts::checksum(spanId, rootSpanId)) {
spanId = 0;
rootSpanId = 0;
}
}
if (event->_consuming_span_id != 0) {
// Java requested an explicit consuming span (e.g. self-loop disambiguation); keep root
// from the active context when consistent with the current trace.
spanId = event->_consuming_span_id;
}
buf->putVar64(spanId); // schema pos: spanId
buf->putVar64(rootSpanId); // schema pos: localRootSpanId
buf->putVar64(event->_submitting_span_id); // schema pos: submittingSpanId (CORRECT position)
for (size_t i = 0; i < Profiler::instance()->numContextAttributes(); i++) {
buf->putVar32(ctx.get_tag(i).value); // schema pos: contextAttr[i]
}
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}
Expand Down Expand Up @@ -1745,6 +1821,7 @@ void FlightRecorder::recordTraceRoot(int lock_index, int tid,
if (rec != nullptr) {
Buffer *buf = rec->buffer(lock_index);
rec->recordTraceRoot(buf, tid, event);
rec->addThread(lock_index, tid);
}
}
}
Expand All @@ -1757,6 +1834,37 @@ void FlightRecorder::recordQueueTime(int lock_index, int tid,
if (rec != nullptr) {
Buffer *buf = rec->buffer(lock_index);
rec->recordQueueTime(buf, tid, event);
rec->addThread(lock_index, tid);
}
}
}

void FlightRecorder::recordTaskBlock(int lock_index, int tid,
TaskBlockEvent *event) {
OptionalSharedLockGuard locker(&_rec_lock);
if (locker.ownsLock()) {
Recording* rec = _rec;
if (rec != nullptr) {
Buffer *buf = rec->buffer(lock_index);
rec->recordTaskBlock(buf, tid, event);
rec->addThread(lock_index, tid);
}
}
}

void FlightRecorder::recordSpanNode(int lock_index, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource) {
OptionalSharedLockGuard locker(&_rec_lock);
if (locker.ownsLock()) {
Recording* rec = _rec;
if (rec != nullptr) {
Buffer *buf = rec->buffer(lock_index);
rec->recordSpanNode(buf, tid, spanId, parentSpanId, rootSpanId, startNanos, durationNanos, encodedOperation, encodedResource);
// Register the emitting thread in the JFR thread CPOOL so that JMC can resolve
// the eventThread reference. Without this, threads that emit SpanNode events but
// have no CPU/wall profiling samples in the current chunk are absent from the
// CPOOL, causing IMCThread to be null in the backend and threadId=0 ("unknown span").
rec->addThread(lock_index, tid);
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions ddprof-lib/src/main/cpp/flightRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ class Recording {
void recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event);
void recordTraceRoot(Buffer *buf, int tid, TraceRootEvent *event);
void recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event);
void recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event);
void recordSpanNode(Buffer *buf, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource);
void recordAllocation(RecordingBuffer *buf, int tid, u64 call_trace_id,
AllocEvent *event);
void recordHeapLiveObject(Buffer *buf, int tid, u64 call_trace_id,
Expand Down Expand Up @@ -344,6 +347,9 @@ class FlightRecorder {
void wallClockEpoch(int lock_index, WallClockEpochEvent *event);
void recordTraceRoot(int lock_index, int tid, TraceRootEvent *event);
void recordQueueTime(int lock_index, int tid, QueueTimeEvent *event);
void recordTaskBlock(int lock_index, int tid, TaskBlockEvent *event);
void recordSpanNode(int lock_index, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource);

bool active() const { return _rec != NULL; }

Expand Down
Loading
Loading