diff --git a/ddprof-lib/src/main/cpp/arguments.cpp b/ddprof-lib/src/main/cpp/arguments.cpp index 72b8aec22..18226b322 100644 --- a/ddprof-lib/src/main/cpp/arguments.cpp +++ b/ddprof-lib/src/main/cpp/arguments.cpp @@ -362,6 +362,21 @@ Error Arguments::parse(const char *args) { _remote_symbolication = true; } + CASE("wallprecheck") + if (value != NULL) { + _wall_precheck = strcmp(value, "false") != 0 && strcmp(value, "0") != 0; + } + + CASE("wallpark") + if (value != NULL) { + _wall_park = strcmp(value, "false") != 0 && strcmp(value, "0") != 0; + } + + CASE("wallparkmin") + if (value != NULL) { + _wall_park_min_ns = (u64)strtoull(value, NULL, 10); + } + CASE("wallsampler") if (value != NULL) { switch (value[0]) { diff --git a/ddprof-lib/src/main/cpp/arguments.h b/ddprof-lib/src/main/cpp/arguments.h index 3f2542705..d6484f949 100644 --- a/ddprof-lib/src/main/cpp/arguments.h +++ b/ddprof-lib/src/main/cpp/arguments.h @@ -166,6 +166,9 @@ class Arguments { long _cpu; long _wall; bool _wall_collapsing; + bool _wall_precheck; + bool _wall_park; + uint64_t _wall_park_min_ns; int _wall_threads_per_tick; WallclockSampler _wallclock_sampler; long _memory; @@ -201,6 +204,9 @@ class Arguments { _cpu(-1), _wall(-1), _wall_collapsing(false), + _wall_precheck(true), + _wall_park(true), + _wall_park_min_ns(1000000ULL), // 1 ms default _wall_threads_per_tick(DEFAULT_WALL_THREADS_PER_TICK), _memory(-1), _record_allocations(false), diff --git a/ddprof-lib/src/main/cpp/counters.h b/ddprof-lib/src/main/cpp/counters.h index 275149cac..93b10cbac 100644 --- a/ddprof-lib/src/main/cpp/counters.h +++ b/ddprof-lib/src/main/cpp/counters.h @@ -61,6 +61,11 @@ X(AGCT_NATIVE_NO_JAVA_CONTEXT, "agct_native_no_java_context") \ X(AGCT_BLOCKED_IN_VM, "agct_blocked_in_vm") \ X(SKIPPED_WALLCLOCK_UNWINDS, "skipped_wallclock_unwinds") \ + X(WC_SIGNAL_SKIPPED_SLEEPING, "wc_signals_skipped_sleeping") \ + X(WC_SIGNAL_SKIPPED_MONITOR, "wc_signals_skipped_monitor_wait") \ + X(WC_SIGNAL_SKIPPED_CONDVAR, 
"wc_signals_skipped_condvar_wait") \ + X(WC_SIGNAL_SKIPPED_OBJECT_WAIT, "wc_signals_skipped_object_wait") \ + X(WC_SIGNAL_SKIPPED_PARKED, "wc_signals_skipped_parked") \ X(UNWINDING_TIME_ASYNC, "unwinding_ticks_async") \ X(UNWINDING_TIME_JVMTI, "unwinding_ticks_jvmti") \ X(CALLTRACE_STORAGE_DROPPED, "calltrace_storage_dropped_traces") \ diff --git a/ddprof-lib/src/main/cpp/event.h b/ddprof-lib/src/main/cpp/event.h index e9363165f..ddde81b2c 100644 --- a/ddprof-lib/src/main/cpp/event.h +++ b/ddprof-lib/src/main/cpp/event.h @@ -98,12 +98,13 @@ class WallClockEpochEvent { u32 _num_failed_samples; u32 _num_exited_threads; u32 _num_permission_denied; + u32 _num_skipped_sleeping; WallClockEpochEvent(u64 start_time) : _dirty(false), _start_time(start_time), _duration_millis(0), _num_samplable_threads(0), _num_successful_samples(0), _num_failed_samples(0), _num_exited_threads(0), - _num_permission_denied(0) {} + _num_permission_denied(0), _num_skipped_sleeping(0) {} bool hasChanged() { return _dirty; } @@ -142,6 +143,10 @@ class WallClockEpochEvent { } } + void updateNumSkippedSleeping(u32 n) { + if (_num_skipped_sleeping != n) { _dirty = true; _num_skipped_sleeping = n; } + } + void endEpoch(u64 millis) { _duration_millis = millis; } void clean() { _dirty = false; } @@ -155,14 +160,27 @@ class WallClockEpochEvent { class TraceRootEvent { public: u64 _local_root_span_id; + u64 _parent_span_id; + u64 _start_ticks; u32 _label; u32 _operation; - TraceRootEvent(u64 local_root_span_id, u32 label, u32 operation) - : _local_root_span_id(local_root_span_id), _label(label), - _operation(operation){}; + TraceRootEvent(u64 local_root_span_id, u64 parent_span_id, u64 start_ticks, + u32 label, u32 operation) + : _local_root_span_id(local_root_span_id), + _parent_span_id(parent_span_id), _start_ticks(start_ticks), + _label(label), _operation(operation){}; }; +typedef struct TaskBlockEvent { + u64 _start_ticks; + u64 _end_ticks; + u64 _span_id; + u64 _root_span_id; + uintptr_t _blocker; + 
u64 _unblocking_span_id; +} TaskBlockEvent; + typedef struct QueueTimeEvent { u64 _start; u64 _end; @@ -170,7 +188,9 @@ typedef struct QueueTimeEvent { u32 _scheduler; u32 _origin; u32 _queueType; - u32 _queueLength; + u32 _queueLength; + u64 _submitting_span_id; + u64 _consuming_span_id; // 0: use current Context; else: override JFR spanId } QueueTimeEvent; #endif // _EVENT_H diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 4adc5f727..8aa0b74ed 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -1531,13 +1531,63 @@ void Recording::recordTraceRoot(Buffer *buf, int tid, TraceRootEvent *event) { flushIfNeeded(buf); int start = buf->skip(1); buf->putVar64(T_ENDPOINT); - buf->putVar64(TSC::ticks()); - buf->put8(0); + buf->putVar64(event->_start_ticks); + buf->putVar64(TSC::ticks() - event->_start_ticks); buf->putVar32(tid); buf->put8(0); buf->putVar32(event->_label); buf->putVar32(event->_operation); buf->putVar64(event->_local_root_span_id); + buf->putVar64(event->_parent_span_id); + writeEventSizePrefix(buf, start); + flushIfNeeded(buf); +} + +void Recording::recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event) { + flushIfNeeded(buf); + int start = buf->skip(1); + buf->putVar64(T_TASK_BLOCK); + buf->putVar64(event->_start_ticks); + buf->putVar64(event->_end_ticks - event->_start_ticks); + buf->putVar32(tid); + buf->put8(0); + buf->putVar64(event->_span_id); + buf->putVar64(event->_root_span_id); + buf->putVar64((u64)event->_blocker); + buf->putVar64(event->_unblocking_span_id); + writeEventSizePrefix(buf, start); + flushIfNeeded(buf); +} + +void Recording::recordSpanNode(Buffer *buf, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId, + u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource) { + // Convert epoch nanoseconds to JFR ticks so that standard JFR tooling (JMC, Mission + // Control) can correlate SpanNode events 
with other events on the recording timeline. + // _start_time is in microseconds; multiply by 1000 to get the recording epoch in nanos. + u64 start_epoch_nanos = _start_time * 1000ULL; + // Use signed arithmetic: a span that started in a previous JFR chunk has + // startNanos < start_epoch_nanos. Unsigned subtraction would wrap around and + // produce a huge positive u64, making (u64)(negative_double) undefined behaviour. + // With signed delta the result is a negative tick offset, placing the event just + // before the chunk boundary, which is the correct behaviour for pre-chunk spans. + long long delta_nanos = (long long)startNanos - (long long)start_epoch_nanos; + long long delta_ticks = (long long)((double)delta_nanos * TSC::frequency() / NANOTIME_FREQ); + u64 startTicks = (u64)((long long)_start_ticks + delta_ticks); + u64 durationTicks = (u64)((double)durationNanos * TSC::frequency() / NANOTIME_FREQ); + + flushIfNeeded(buf); + int start = buf->skip(1); + buf->putVar64(T_SPAN_NODE); + buf->putVar64(startTicks); // startTime (F_TIME_TICKS) + buf->putVar64(durationTicks); // duration (F_DURATION_TICKS) + buf->putVar32(tid); // eventThread (F_CPOOL) + buf->putVar64(spanId); + buf->putVar64(parentSpanId); + buf->putVar64(rootSpanId); + buf->putVar64(startNanos); // startNanos — epoch ns, used by backend extractor + buf->putVar64(durationNanos); // durationNanos — ns + buf->putVar32(encodedOperation); + buf->putVar32(encodedResource); writeEventSizePrefix(buf, start); flushIfNeeded(buf); } @@ -1553,7 +1603,33 @@ void Recording::recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event) { buf->putVar64(event->_scheduler); buf->putVar64(event->_queueType); buf->putVar64(event->_queueLength); - writeContext(buf, Contexts::get()); + // The schema declares fields in this order: + // spanId, localRootSpanId, submittingSpanId, contextAttr[0..n] + // writeContext() would emit spanId + localRootSpanId + contextAttrs[0..n] in one shot, + // leaving no room to insert 
submittingSpanId between localRootSpanId and contextAttrs. + // Inline the context fields manually so submittingSpanId lands at the correct position. + Context &ctx = Contexts::get(); + u64 spanId = 0, rootSpanId = 0; + u64 stored = ctx.checksum; + if (stored != 0) { + spanId = ctx.spanId; + rootSpanId = ctx.rootSpanId; + if (stored != Contexts::checksum(spanId, rootSpanId)) { + spanId = 0; + rootSpanId = 0; + } + } + if (event->_consuming_span_id != 0) { + // Java requested an explicit consuming span (e.g. self-loop disambiguation); keep root + // from the active context when consistent with the current trace. + spanId = event->_consuming_span_id; + } + buf->putVar64(spanId); // schema pos: spanId + buf->putVar64(rootSpanId); // schema pos: localRootSpanId + buf->putVar64(event->_submitting_span_id); // schema pos: submittingSpanId (CORRECT position) + for (size_t i = 0; i < Profiler::instance()->numContextAttributes(); i++) { + buf->putVar32(ctx.get_tag(i).value); // schema pos: contextAttr[i] + } writeEventSizePrefix(buf, start); flushIfNeeded(buf); } @@ -1745,6 +1821,7 @@ void FlightRecorder::recordTraceRoot(int lock_index, int tid, if (rec != nullptr) { Buffer *buf = rec->buffer(lock_index); rec->recordTraceRoot(buf, tid, event); + rec->addThread(lock_index, tid); } } } @@ -1757,6 +1834,37 @@ void FlightRecorder::recordQueueTime(int lock_index, int tid, if (rec != nullptr) { Buffer *buf = rec->buffer(lock_index); rec->recordQueueTime(buf, tid, event); + rec->addThread(lock_index, tid); + } + } +} + +void FlightRecorder::recordTaskBlock(int lock_index, int tid, + TaskBlockEvent *event) { + OptionalSharedLockGuard locker(&_rec_lock); + if (locker.ownsLock()) { + Recording* rec = _rec; + if (rec != nullptr) { + Buffer *buf = rec->buffer(lock_index); + rec->recordTaskBlock(buf, tid, event); + rec->addThread(lock_index, tid); + } + } +} + +void FlightRecorder::recordSpanNode(int lock_index, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId, + u64 startNanos, 
u64 durationNanos, u32 encodedOperation, u32 encodedResource) { + OptionalSharedLockGuard locker(&_rec_lock); + if (locker.ownsLock()) { + Recording* rec = _rec; + if (rec != nullptr) { + Buffer *buf = rec->buffer(lock_index); + rec->recordSpanNode(buf, tid, spanId, parentSpanId, rootSpanId, startNanos, durationNanos, encodedOperation, encodedResource); + // Register the emitting thread in the JFR thread CPOOL so that JMC can resolve + // the eventThread reference. Without this, threads that emit SpanNode events but + // have no CPU/wall profiling samples in the current chunk are absent from the + // CPOOL, causing IMCThread to be null in the backend and threadId=0 ("unknown span"). + rec->addThread(lock_index, tid); } } } diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index c1ab88262..3fa4fdf13 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -278,6 +278,9 @@ class Recording { void recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event); void recordTraceRoot(Buffer *buf, int tid, TraceRootEvent *event); void recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event); + void recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event); + void recordSpanNode(Buffer *buf, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId, + u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource); void recordAllocation(RecordingBuffer *buf, int tid, u64 call_trace_id, AllocEvent *event); void recordHeapLiveObject(Buffer *buf, int tid, u64 call_trace_id, @@ -344,6 +347,9 @@ class FlightRecorder { void wallClockEpoch(int lock_index, WallClockEpochEvent *event); void recordTraceRoot(int lock_index, int tid, TraceRootEvent *event); void recordQueueTime(int lock_index, int tid, QueueTimeEvent *event); + void recordTaskBlock(int lock_index, int tid, TaskBlockEvent *event); + void recordSpanNode(int lock_index, int tid, u64 spanId, u64 parentSpanId, u64 
rootSpanId, + u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource); bool active() const { return _rec != NULL; } diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index a354274c6..67acf0460 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -153,6 +153,8 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() { // Happens when we are not enabled before thread start slot_id = thread_filter->registerThread(); current->setFilterSlotId(slot_id); + thread_filter->setVMThread(slot_id, VMThread::current()); + thread_filter->setProfiledThread(slot_id, current); } if (unlikely(slot_id == -1)) { @@ -199,8 +201,8 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env, extern "C" DLLEXPORT jboolean JNICALL Java_com_datadoghq_profiler_JavaProfiler_recordTrace0( - JNIEnv *env, jclass unused, jlong rootSpanId, jstring endpoint, - jstring operation, jint sizeLimit) { + JNIEnv *env, jclass unused, jlong rootSpanId, jlong parentSpanId, + jlong startTicks, jstring endpoint, jstring operation, jint sizeLimit) { JniString endpoint_str(env, endpoint); u32 endpointLabel = Profiler::instance()->stringLabelMap()->bounded_lookup( endpoint_str.c_str(), endpoint_str.length(), sizeLimit); @@ -212,13 +214,72 @@ Java_com_datadoghq_profiler_JavaProfiler_recordTrace0( operationLabel = Profiler::instance()->contextValueMap()->bounded_lookup( operation_str.c_str(), operation_str.length(), 1 << 16); } - TraceRootEvent event(rootSpanId, endpointLabel, operationLabel); + TraceRootEvent event(rootSpanId, (u64)parentSpanId, (u64)startTicks, + endpointLabel, operationLabel); int tid = ProfiledThread::currentTid(); Profiler::instance()->recordTraceRoot(tid, &event); } return acceptValue; } +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_recordTaskBlock0( + JNIEnv *env, jclass unused, jlong startTicks, jlong endTicks, + jlong spanId, jlong 
rootSpanId, jlong blocker, jlong unblockingSpanId) { + TaskBlockEvent event; + event._start_ticks = (u64)startTicks; + event._end_ticks = (u64)endTicks; + event._span_id = (u64)spanId; + event._root_span_id = (u64)rootSpanId; + event._blocker = (uintptr_t)blocker; + event._unblocking_span_id = (u64)unblockingSpanId; + int tid = ProfiledThread::currentTid(); + Profiler::instance()->recordTaskBlock(tid, &event); +} + +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_parkEnter0( + JNIEnv *env, jclass unused, jlong spanId, jlong rootSpanId) { + ProfiledThread *current = ProfiledThread::current(); + if (current != nullptr) { + current->enterPark(TSC::ticks(), (u64)spanId, (u64)rootSpanId); + } +} + +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_parkExit0( + JNIEnv *env, jclass unused, jlong blocker, jlong unblockingSpanId) { + ProfiledThread *current = ProfiledThread::current(); + if (current == nullptr) return; + u64 endTicks = TSC::ticks(); + u64 startTicks = current->exitPark(); + if (startTicks == 0) return; + u64 durationNs = TSC::ticks_to_nanos(endTicks - startTicks); + if (durationNs < Profiler::instance()->parkMinDurationNs()) return; + if (current->parkState().span_id == 0) return; // no trace context — skip event + TaskBlockEvent event; + event._start_ticks = startTicks; + event._end_ticks = endTicks; + event._span_id = current->parkState().span_id; + event._root_span_id = current->parkState().root_span_id; + event._blocker = (uintptr_t)blocker; + event._unblocking_span_id = (u64)unblockingSpanId; + int tid = ProfiledThread::currentTid(); + Profiler::instance()->recordTaskBlock(tid, &event); +} + +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_recordSpanNode0( + JNIEnv *env, jclass unused, + jlong spanId, jlong parentSpanId, jlong rootSpanId, + jlong startNanos, jlong durationNanos, + jint encodedOperation, jint encodedResource) { + int tid = ProfiledThread::currentTid(); + 
Profiler::instance()->recordSpanNode(tid, (u64)spanId, (u64)parentSpanId, (u64)rootSpanId, + (u64)startNanos, (u64)durationNanos, + (u32)encodedOperation, (u32)encodedResource); +} + extern "C" DLLEXPORT jint JNICALL Java_com_datadoghq_profiler_JavaProfiler_registerConstant0(JNIEnv *env, jclass unused, @@ -291,7 +352,8 @@ static int dictionarizeClassName(JNIEnv* env, jstring className) { extern "C" DLLEXPORT void JNICALL Java_com_datadoghq_profiler_JavaProfiler_recordQueueEnd0( JNIEnv *env, jclass unused, jlong startTime, jlong endTime, jstring task, - jstring scheduler, jthread origin, jstring queueType, jint queueLength) { + jstring scheduler, jthread origin, jstring queueType, jint queueLength, + jlong submittingSpanId, jlong consumingSpanIdOverride) { int tid = ProfiledThread::currentTid(); if (tid < 0) { return; @@ -321,6 +383,8 @@ Java_com_datadoghq_profiler_JavaProfiler_recordQueueEnd0( event._origin = origin_tid; event._queueType = queue_type_offset; event._queueLength = queueLength; + event._submitting_span_id = (u64)submittingSpanId; + event._consuming_span_id = (u64)consumingSpanIdOverride; Profiler::instance()->recordQueueTime(tid, &event); } diff --git a/ddprof-lib/src/main/cpp/jfrMetadata.cpp b/ddprof-lib/src/main/cpp/jfrMetadata.cpp index 6991a8a12..15d185a89 100644 --- a/ddprof-lib/src/main/cpp/jfrMetadata.cpp +++ b/ddprof-lib/src/main/cpp/jfrMetadata.cpp @@ -176,7 +176,32 @@ void JfrMetadata::initialize( << field("stackTrace", T_STACK_TRACE, "Stack Trace", F_CPOOL) << field("endpoint", T_STRING, "Endpoint", F_CPOOL) << field("operation", T_ATTRIBUTE_VALUE, "Operation", F_CPOOL) - << field("localRootSpanId", T_LONG, "Local Root Span ID")) + << field("localRootSpanId", T_LONG, "Local Root Span ID") + << field("parentSpanId", T_LONG, "Parent Span ID")) + + << (type("datadog.TaskBlock", T_TASK_BLOCK, "Task Block") + << category("Datadog") + << field("startTime", T_LONG, "Start Time", F_TIME_TICKS) + << field("duration", T_LONG, "Duration", 
F_DURATION_TICKS) + << field("eventThread", T_THREAD, "Event Thread", F_CPOOL) + << field("stackTrace", T_STACK_TRACE, "Stack Trace", F_CPOOL) + << field("spanId", T_LONG, "Span ID") + << field("localRootSpanId", T_LONG, "Local Root Span ID") + << field("blocker", T_LONG, "Blocker Object Hash", F_UNSIGNED) + << field("unblockingSpanId", T_LONG, "Unblocking Span ID")) + + << (type("datadog.SpanNode", T_SPAN_NODE, "Span Node") + << category("Datadog") + << field("startTime", T_LONG, "Start Time", F_TIME_TICKS) + << field("duration", T_LONG, "Duration", F_DURATION_TICKS) + << field("eventThread", T_THREAD, "Event Thread", F_CPOOL) + << field("spanId", T_LONG, "Span ID") + << field("parentSpanId", T_LONG, "Parent Span ID") + << field("localRootSpanId", T_LONG, "Local Root Span ID") + << field("startNanos", T_LONG, "Start Time (epoch ns)") + << field("durationNanos", T_LONG, "Duration (ns)") + << field("encodedOperation", T_INT, "Encoded Operation Name") + << field("encodedResource", T_INT, "Encoded Resource Name")) << (type("datadog.QueueTime", T_QUEUE_TIME, "Queue Time") << category("Datadog") @@ -189,7 +214,8 @@ void JfrMetadata::initialize( << field("queueType", T_CLASS, "Queue Type", F_CPOOL) << field("queueLength", T_INT, "Queue Length on Entry") << field("spanId", T_LONG, "Span ID") - << field("localRootSpanId", T_LONG, "Local Root Span ID") || + << field("localRootSpanId", T_LONG, "Local Root Span ID") + << field("submittingSpanId", T_LONG, "Submitting Span ID") || contextAttributes) << (type("datadog.HeapUsage", T_HEAP_USAGE, "JVM Heap Usage") diff --git a/ddprof-lib/src/main/cpp/jfrMetadata.h b/ddprof-lib/src/main/cpp/jfrMetadata.h index 77da96d3f..bc2997e2e 100644 --- a/ddprof-lib/src/main/cpp/jfrMetadata.h +++ b/ddprof-lib/src/main/cpp/jfrMetadata.h @@ -78,6 +78,8 @@ enum JfrType { T_DATADOG_CLASSREF_CACHE = 124, T_DATADOG_COUNTER = 125, T_UNWIND_FAILURE = 126, + T_TASK_BLOCK = 127, + T_SPAN_NODE = 128, T_ANNOTATION = 200, T_LABEL = 201, T_CATEGORY = 202, 
diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index cbcfb7284..70ff734a6 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -114,6 +114,8 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { if (_thread_filter.enabled()) { int slot_id = _thread_filter.registerThread(); current->setFilterSlotId(slot_id); + _thread_filter.setVMThread(slot_id, VMThread::current()); + _thread_filter.setProfiledThread(slot_id, current); _thread_filter.remove(slot_id); // Remove from filtering initially } if (thread != NULL) { @@ -134,6 +136,8 @@ void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { tid = current->tid(); if (_thread_filter.enabled()) { + _thread_filter.setVMThread(slot_id, nullptr); + _thread_filter.setProfiledThread(slot_id, nullptr); _thread_filter.unregisterThread(slot_id); current->setFilterSlotId(-1); } @@ -940,6 +944,110 @@ void Profiler::recordQueueTime(int tid, QueueTimeEvent *event) { _locks[lock_index].unlock(); } +void JNICALL Profiler::MonitorContendedEnterCallback(jvmtiEnv *jvmti, JNIEnv *jni, + jthread thread, jobject object) { + ProfiledThread* thrd = ProfiledThread::current(); + if (thrd == nullptr) return; + Context& ctx = Contexts::get(); + if (ctx.spanId == 0) return; + thrd->_monitor_block.start_ticks = TSC::ticks(); + thrd->_monitor_block.span_id = ctx.spanId; + thrd->_monitor_block.root_span_id = ctx.rootSpanId; + thrd->_monitor_block.obj_addr = (uintptr_t)(void*)object; + thrd->_monitor_block.unblocking_span_id = VMThread::monitorOwnerSpanId((const void*)object); + // If this contention is the re-acquisition of a monitor after Object.wait(), capture + // the current owner as the notifier. Works when the notifier still holds the lock; + // falls back to 0 when the notifier released before this thread contended. 
+ if (thrd->_monitor_wait.obj_addr == (uintptr_t)(void*)object) { + thrd->_monitor_wait.unblocking_span_id = VMThread::monitorOwnerSpanId((const void*)object); + } +} + +void JNICALL Profiler::MonitorContendedEnteredCallback(jvmtiEnv *jvmti, JNIEnv *jni, + jthread thread, jobject object) { + ProfiledThread* thrd = ProfiledThread::current(); + if (thrd == nullptr || thrd->_monitor_block.obj_addr == 0) return; + + TaskBlockEvent event; + event._start_ticks = thrd->_monitor_block.start_ticks; + event._end_ticks = TSC::ticks(); + event._span_id = thrd->_monitor_block.span_id; + event._root_span_id = thrd->_monitor_block.root_span_id; + event._blocker = thrd->_monitor_block.obj_addr; + event._unblocking_span_id = thrd->_monitor_block.unblocking_span_id; + + thrd->_monitor_block.obj_addr = 0; + + instance()->recordTaskBlock(ProfiledThread::currentTid(), &event); +} + +// Object.wait() coverage for JDK 8-20: wait(long) is native on those versions, so BCI cannot +// instrument it. MonitorWait fires at wait() entry; MonitorWaited fires at wait() exit (both on +// the waiting thread). Span context is read from Contexts::get() — the C++ thread_local that is +// kept up to date by the Java side via setContext() on every span activation. +// On JDK 21+, these callbacks are NOT registered (ObjectWaitProfilingInstrumentation handles it). 
+void JNICALL Profiler::MonitorWaitCallback(jvmtiEnv *jvmti, JNIEnv *jni, + jthread thread, jobject object, jlong timeout) { + ProfiledThread* thrd = ProfiledThread::current(); + if (thrd == nullptr) return; + Context& ctx = Contexts::get(); + if (ctx.spanId == 0) return; + thrd->_monitor_wait.start_ticks = TSC::ticks(); + thrd->_monitor_wait.span_id = ctx.spanId; + thrd->_monitor_wait.root_span_id = ctx.rootSpanId; + thrd->_monitor_wait.obj_addr = (uintptr_t)(void*)object; +} + +void JNICALL Profiler::MonitorWaitedCallback(jvmtiEnv *jvmti, JNIEnv *jni, + jthread thread, jobject object, jboolean timed_out) { + ProfiledThread* thrd = ProfiledThread::current(); + if (thrd == nullptr || thrd->_monitor_wait.obj_addr == 0) return; + + u64 end_ticks = TSC::ticks(); + // Duration filter: skip waits shorter than 1 ms (matches BCI and LockSupport thresholds). + if (TSC::ticks_to_nanos(end_ticks - thrd->_monitor_wait.start_ticks) < 1'000'000ULL) { + thrd->_monitor_wait.obj_addr = 0; + return; + } + + TaskBlockEvent event; + event._start_ticks = thrd->_monitor_wait.start_ticks; + event._end_ticks = end_ticks; + event._span_id = thrd->_monitor_wait.span_id; + event._root_span_id = thrd->_monitor_wait.root_span_id; + event._blocker = thrd->_monitor_wait.obj_addr; + // unblocking_span_id is captured in MonitorContendedEnterCallback when the waiting thread + // contends for the lock and the notifier still holds it. Zero if the notifier released first. 
+ event._unblocking_span_id = thrd->_monitor_wait.unblocking_span_id; + + thrd->_monitor_wait.obj_addr = 0; + thrd->_monitor_wait.unblocking_span_id = 0; + instance()->recordTaskBlock(ProfiledThread::currentTid(), &event); +} + +void Profiler::recordTaskBlock(int tid, TaskBlockEvent *event) { + u32 lock_index = getLockIndex(tid); + if (!_locks[lock_index].tryLock() && + !_locks[lock_index = (lock_index + 1) % CONCURRENCY_LEVEL].tryLock() && + !_locks[lock_index = (lock_index + 2) % CONCURRENCY_LEVEL].tryLock()) { + return; + } + _jfr.recordTaskBlock(lock_index, tid, event); + _locks[lock_index].unlock(); +} + +void Profiler::recordSpanNode(int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId, + u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource) { + u32 lock_index = getLockIndex(tid); + if (!_locks[lock_index].tryLock() && + !_locks[lock_index = (lock_index + 1) % CONCURRENCY_LEVEL].tryLock() && + !_locks[lock_index = (lock_index + 2) % CONCURRENCY_LEVEL].tryLock()) { + return; + } + _jfr.recordSpanNode(lock_index, tid, spanId, parentSpanId, rootSpanId, startNanos, durationNanos, encodedOperation, encodedResource); + _locks[lock_index].unlock(); +} + void Profiler::recordExternalSample(u64 weight, int tid, int num_frames, ASGCT_CallFrame *frames, bool truncated, jint event_type, Event *event) { @@ -1389,6 +1497,7 @@ Error Profiler::start(Arguments &args, bool reset) { if (!VMStructs::hasCompilerStructs()) { _features.comp_task = 0; } + _wall_park_min_ns = args._wall_park_min_ns; _safe_mode = args._safe_mode; if (VM::hotspot_version() < 8 || VM::isZing()) { _safe_mode |= GC_TRACES | LAST_JAVA_PC; @@ -1403,6 +1512,8 @@ Error Profiler::start(Arguments &args, bool reset) { if (current != nullptr) { int slot_id = _thread_filter.registerThread(); current->setFilterSlotId(slot_id); + _thread_filter.setVMThread(slot_id, VMThread::current()); + _thread_filter.setProfiledThread(slot_id, current); _thread_filter.remove(slot_id); // Remove from 
filtering initially (matches onThreadStart behavior) } } diff --git a/ddprof-lib/src/main/cpp/profiler.h b/ddprof-lib/src/main/cpp/profiler.h index 285c64805..ccb8a8a53 100644 --- a/ddprof-lib/src/main/cpp/profiler.h +++ b/ddprof-lib/src/main/cpp/profiler.h @@ -121,6 +121,7 @@ class alignas(alignof(SpinLock)) Profiler { Dictionary _string_label_map; Dictionary _context_value_map; ThreadFilter _thread_filter; + u64 _wall_park_min_ns{1000000ULL}; CallTraceStorage _call_trace_storage; FlightRecorder _jfr; Engine *_cpu_engine; @@ -247,6 +248,7 @@ class alignas(alignof(SpinLock)) Profiler { Dictionary *contextValueMap() { return &_context_value_map; } u32 numContextAttributes() { return _num_context_attributes; } ThreadFilter *threadFilter() { return &_thread_filter; } + u64 parkMinDurationNs() const { return _wall_park_min_ns; } int lookupClass(const char *key, size_t length); void processCallTraces(std::function&)> processor) { @@ -375,6 +377,9 @@ class alignas(alignof(SpinLock)) Profiler { void recordWallClockEpoch(int tid, WallClockEpochEvent *event); void recordTraceRoot(int tid, TraceRootEvent *event); void recordQueueTime(int tid, QueueTimeEvent *event); + void recordTaskBlock(int tid, TaskBlockEvent *event); + void recordSpanNode(int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId, + u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource); void writeLog(LogLevel level, const char *message); void writeLog(LogLevel level, const char *message, size_t len); void writeDatadogProfilerSetting(int tid, int length, const char *name, @@ -420,6 +425,18 @@ class alignas(alignof(SpinLock)) Profiler { instance()->onThreadEnd(jvmti, jni, thread); } + static void JNICALL MonitorContendedEnterCallback(jvmtiEnv *jvmti, JNIEnv *jni, + jthread thread, jobject object); + static void JNICALL MonitorContendedEnteredCallback(jvmtiEnv *jvmti, JNIEnv *jni, + jthread thread, jobject object); + + // Object.wait() coverage for JDK 8-20 (wait(long) is native there; 
BCI cannot instrument it). + // On JDK 21+, ObjectWaitProfilingInstrumentation (BCI) handles this instead. + static void JNICALL MonitorWaitCallback(jvmtiEnv *jvmti, JNIEnv *jni, + jthread thread, jobject object, jlong timeout); + static void JNICALL MonitorWaitedCallback(jvmtiEnv *jvmti, JNIEnv *jni, + jthread thread, jobject object, jboolean timed_out); + // Keep backward compatibility with the upstream async-profiler inline CodeCache* findLibraryByAddress(const void *address) { #ifdef DEBUG diff --git a/ddprof-lib/src/main/cpp/thread.cpp b/ddprof-lib/src/main/cpp/thread.cpp index 5457f3fab..b73a0dea4 100644 --- a/ddprof-lib/src/main/cpp/thread.cpp +++ b/ddprof-lib/src/main/cpp/thread.cpp @@ -133,6 +133,15 @@ ProfiledThread *ProfiledThread::currentSignalSafe() { return __atomic_load_n(&_tls_key_initialized, __ATOMIC_ACQUIRE) ? (ProfiledThread *)pthread_getspecific(_tls_key) : nullptr; } +ProfiledThread* ProfiledThread::findByTid(int tid) { + int size = __atomic_load_n(&_running_buffer_pos, __ATOMIC_ACQUIRE); + for (int i = 0; i < size; i++) { + ProfiledThread* t = _buffer[i]; + if (t != nullptr && t->_tid == tid) return t; + } + return nullptr; +} + int ProfiledThread::popFreeSlot() { int current_top; int new_top; diff --git a/ddprof-lib/src/main/cpp/thread.h b/ddprof-lib/src/main/cpp/thread.h index 4cb12d0ca..0a5f6158f 100644 --- a/ddprof-lib/src/main/cpp/thread.h +++ b/ddprof-lib/src/main/cpp/thread.h @@ -74,12 +74,46 @@ class ProfiledThread : public ThreadLocalData { ProfiledThread(int buffer_pos, int tid) : ThreadLocalData(), _pc(0), _sp(0), _span_id(0), _root_span_id(0), _crash_depth(0), _buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0), - _wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _misc_flags(0), _filter_slot_id(-1), _ctx_tls_initialized(false), _crash_protection_active(false), _ctx_tls_ptr(nullptr) {}; + _wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _misc_flags(0), _filter_slot_id(-1), _ctx_tls_initialized(false), 
_crash_protection_active(false), _ctx_tls_ptr(nullptr), _monitor_block{} {}; void releaseFromBuffer(); public: + // In-flight state for monitor contention tracking (MonitorContendedEnter → + // MonitorContendedEntered). Keyed on obj_addr == 0 meaning "no contention in + // progress". Object addresses are never 0 in the JVM. + struct MonitorBlockState { + u64 start_ticks; + u64 span_id; + u64 root_span_id; + uintptr_t obj_addr; + u64 unblocking_span_id; + }; + MonitorBlockState _monitor_block{}; + + // Park state for Approach 2 signal suppression and off-CPU interval recording. + // start_ticks is the "is parked" flag: 0 = not parked, non-zero = parked since that TSC tick. + // Written by the parking thread; read by the timer thread for signal-suppression check. + struct ParkState { + std::atomic start_ticks{0}; + u64 span_id{0}; + u64 root_span_id{0}; + }; + ParkState _park_state{}; + + // In-flight state for Object.wait() tracking on JDK 8-20 (MonitorWait → MonitorWaited). + // Keyed on obj_addr == 0 meaning "not waiting". Mirrors MonitorBlockState layout. 
+ struct MonitorWaitState { + u64 start_ticks{0}; + u64 span_id{0}; + u64 root_span_id{0}; + uintptr_t obj_addr{0}; + u64 unblocking_span_id{0}; // captured via MonitorContendedEnter, 0 if not available + }; + MonitorWaitState _monitor_wait{}; + static ProfiledThread *forTid(int tid) { return new ProfiledThread(-1, tid); } + static ProfiledThread* findByTid(int tid); static ProfiledThread *inBuffer(int buffer_pos) { return new ProfiledThread(buffer_pos, 0); } @@ -163,6 +197,22 @@ class ProfiledThread : public ThreadLocalData { int filterSlotId() { return _filter_slot_id; } void setFilterSlotId(int slotId) { _filter_slot_id = slotId; } + + void enterPark(u64 ticks, u64 span_id, u64 root_span_id) { + _park_state.span_id = span_id; + _park_state.root_span_id = root_span_id; + _park_state.start_ticks.store(ticks, std::memory_order_release); + } + + u64 exitPark() { + return _park_state.start_ticks.exchange(0, std::memory_order_acq_rel); + } + + bool isParked() const { + return _park_state.start_ticks.load(std::memory_order_acquire) != 0; + } + + const ParkState& parkState() const { return _park_state; } // Signal handler reentrancy protection bool tryEnterCriticalSection() { diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 77e6dfb53..644aea601 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -22,6 +22,8 @@ #include "threadFilter.h" #include "arch.h" #include "os.h" +#include "thread.h" +#include "vmStructs.h" #include #include #include @@ -284,6 +286,50 @@ void ThreadFilter::collect(std::vector& tids) const { } } +void ThreadFilter::setVMThread(SlotID slot_id, VMThread* vm_thread) { + if (slot_id < 0) return; + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (chunk != nullptr) { + chunk->slots[slot_idx].vm_thread.store(vm_thread, 
std::memory_order_release); + } +} + +void ThreadFilter::setProfiledThread(SlotID slot_id, ProfiledThread* profiled_thread) { + if (slot_id < 0) return; + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (chunk != nullptr) { + chunk->slots[slot_idx].profiled_thread.store(profiled_thread, std::memory_order_release); + } +} + +void ThreadFilter::collectWithState(std::vector& entries) const { + entries.clear(); + entries.reserve(512); + + int num_chunks = _num_chunks.load(std::memory_order_relaxed); + for (int chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) { + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (chunk == nullptr) continue; + + for (const auto& slot : chunk->slots) { + int slot_tid = slot.value.load(std::memory_order_acquire); + if (slot_tid != -1) { + VMThread* vm = slot.vm_thread.load(std::memory_order_acquire); + ProfiledThread* pt = slot.profiled_thread.load(std::memory_order_acquire); + entries.push_back({slot_tid, vm, pt}); + } + } + } + + if (entries.capacity() > entries.size() * 2) { + entries.shrink_to_fit(); + } +} + void ThreadFilter::init(const char* filter) { // Simple logic: any filter value (including "0") enables filtering // Only explicitly registered threads via addThread() will be sampled diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 2440d4351..a57b71b66 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -24,6 +24,15 @@ #include "arch.h" +class VMThread; +class ProfiledThread; + +struct ThreadEntry { + int tid; + VMThread* vm_thread; + ProfiledThread* profiled_thread; +}; + class ThreadFilter { public: using SlotID = int; @@ -48,15 +57,21 @@ class ThreadFilter { void add(int tid, SlotID slot_id); void remove(SlotID slot_id); void collect(std::vector& tids) const; + void setVMThread(SlotID 
slot_id, VMThread* vm_thread); + void setProfiledThread(SlotID slot_id, ProfiledThread* profiled_thread); + void collectWithState(std::vector& entries) const; SlotID registerThread(); void unregisterThread(SlotID slot_id); private: - // Optimized slot structure with padding to avoid false sharing + // Optimized slot structure with padding to avoid false sharing. + // Pointers are placed before the int to avoid implicit alignment padding between them. struct alignas(DEFAULT_CACHE_LINE_SIZE) Slot { - std::atomic value{-1}; - char padding[DEFAULT_CACHE_LINE_SIZE - sizeof(value)]; // Pad to cache line size + std::atomic vm_thread{nullptr}; // 8 bytes + std::atomic profiled_thread{nullptr}; // 8 bytes + std::atomic value{-1}; // 4 bytes + char padding[DEFAULT_CACHE_LINE_SIZE - sizeof(vm_thread) - sizeof(profiled_thread) - sizeof(value)]; }; static_assert(sizeof(Slot) == DEFAULT_CACHE_LINE_SIZE, "Slot must be exactly one cache line"); diff --git a/ddprof-lib/src/main/cpp/tsc.h b/ddprof-lib/src/main/cpp/tsc.h index 70e7e2a28..7aa679412 100644 --- a/ddprof-lib/src/main/cpp/tsc.h +++ b/ddprof-lib/src/main/cpp/tsc.h @@ -106,6 +106,13 @@ class TSC { static u64 ticks_to_millis(u64 ticks) { return TSC_SUPPORTED ? 
1000 * ticks / frequency() : ticks / 1000 / 1000; } + + // Convert ticks to nanoseconds (overflow-safe: divide first, then scale remainder) + static u64 ticks_to_nanos(u64 ticks) { + if (!TSC_SUPPORTED) return ticks; + u64 freq = frequency(); + return (ticks / freq) * 1000000000ULL + (ticks % freq) * 1000000000ULL / freq; + } }; #endif // _TSC_H diff --git a/ddprof-lib/src/main/cpp/vmEntry.cpp b/ddprof-lib/src/main/cpp/vmEntry.cpp index 272c6e053..5b8bd28d7 100644 --- a/ddprof-lib/src/main/cpp/vmEntry.cpp +++ b/ddprof-lib/src/main/cpp/vmEntry.cpp @@ -435,6 +435,14 @@ bool VM::initProfilerBridge(JavaVM *vm, bool attach) { callbacks.SampledObjectAlloc = ObjectSampler::SampledObjectAlloc; callbacks.GarbageCollectionFinish = LivenessTracker::GarbageCollectionFinish; callbacks.NativeMethodBind = VMStructs::NativeMethodBind; + callbacks.MonitorContendedEnter = Profiler::MonitorContendedEnterCallback; + callbacks.MonitorContendedEntered = Profiler::MonitorContendedEnteredCallback; + // Object.wait() coverage for JDK 8-20 only: on JDK 21+, wait(long) is pure Java and + // ObjectWaitProfilingInstrumentation (BCI in dd-trace-java) handles it instead. 
+ if (VM::java_version() < 21) { + callbacks.MonitorWait = Profiler::MonitorWaitCallback; + callbacks.MonitorWaited = Profiler::MonitorWaitedCallback; + } _jvmti->SetEventCallbacks(&callbacks, sizeof(callbacks)); _jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_VM_DEATH, NULL); @@ -445,6 +453,12 @@ bool VM::initProfilerBridge(JavaVM *vm, bool attach) { JVMTI_EVENT_DYNAMIC_CODE_GENERATED, NULL); _jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_NATIVE_METHOD_BIND, NULL); + _jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_MONITOR_CONTENDED_ENTER, NULL); + _jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_MONITOR_CONTENDED_ENTERED, NULL); + if (VM::java_version() < 21) { + _jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_MONITOR_WAIT, NULL); + _jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_MONITOR_WAITED, NULL); + } if (hotspot_version() == 0 || !CodeHeap::available()) { // Workaround for JDK-8173361: avoid CompiledMethodLoad events when possible diff --git a/ddprof-lib/src/main/cpp/vmStructs.cpp b/ddprof-lib/src/main/cpp/vmStructs.cpp index 00ed4f991..3d8700683 100644 --- a/ddprof-lib/src/main/cpp/vmStructs.cpp +++ b/ddprof-lib/src/main/cpp/vmStructs.cpp @@ -9,6 +9,8 @@ #include #include "vmStructs.h" #include "vmEntry.h" +#include "context.h" +#include "thread.h" #include "j9Ext.h" #include "jniHelper.h" #include "jvmHeap.h" @@ -675,6 +677,38 @@ int VMThread::osThreadId() { return -1; } +u64 VMThread::monitorOwnerSpanId(const void* object) { + if (_monitor_owner_offset == 0) return 0; + + // Read the mark word from the object header. + uintptr_t mark = (uintptr_t)SafeAccess::load((void**)object, nullptr); + + // At MonitorContendedEnter time the monitor must be inflated (MONITOR_BIT set + // in bits [1:0]). Guard defensively for any narrow races. + if ((mark & 0x3) != MONITOR_BIT) return 0; + + // Extract ObjectMonitor* (pointer stored in bits 63:2, 4-byte aligned). 
+ const char* monitor = (const char*)(mark & ~(uintptr_t)0x3); + + // Read ObjectMonitor::_owner (JavaThread*). + const char* owner_thread = (const char*)SafeAccess::load( + (void**)(monitor + _monitor_owner_offset), nullptr); + if (owner_thread == nullptr) return 0; + + // JavaThread* -> OS thread ID via existing VMThread infrastructure. + int owner_tid = VMThread::cast(owner_thread)->osThreadId(); + if (owner_tid <= 0) return 0; + + // OS thread ID -> ProfiledThread* -> span ID. + ProfiledThread* owner = ProfiledThread::findByTid(owner_tid); + if (owner == nullptr || !owner->isContextTlsInitialized()) return 0; + + Context* ctx = owner->getContextTlsPtr(); + if (ctx == nullptr) return 0; + + return ctx->spanId; // volatile u64, safe to read cross-thread on x86/aarch64 +} + JNIEnv* VMThread::jni() { if (_env_offset < 0) { return VM::jni(); // fallback for non-HotSpot JVM diff --git a/ddprof-lib/src/main/cpp/vmStructs.h b/ddprof-lib/src/main/cpp/vmStructs.h index 9a51fdc08..6a47edd03 100644 --- a/ddprof-lib/src/main/cpp/vmStructs.h +++ b/ddprof-lib/src/main/cpp/vmStructs.h @@ -196,6 +196,9 @@ typedef void* address; field(_osthread_id_offset, offset, MATCH_SYMBOLS("_thread_id")) \ field_with_version(_osthread_state_offset, offset, 10, MAX_VERSION, MATCH_SYMBOLS("_state")) \ type_end() \ + type_begin(VMObjectMonitor, MATCH_SYMBOLS("ObjectMonitor")) \ + field(_monitor_owner_offset, offset, MATCH_SYMBOLS("_owner")) \ + type_end() \ type_begin(VMThreadShadow, MATCH_SYMBOLS("ThreadShadow")) \ field(_thread_exception_offset, offset, MATCH_SYMBOLS("_exception_file")) \ type_end() \ @@ -714,6 +717,11 @@ DECLARE(VMThread) int osThreadId(); + // Returns the span ID of the JavaThread currently owning the given monitor + // object, reading ObjectMonitor::_owner directly without a safepoint. + // Returns 0 if the owner cannot be determined. 
+ static u64 monitorOwnerSpanId(const void* object); + JNIEnv* jni(); const void** vtable() { diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index 7bd0c6a9d..61d227bf6 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -150,24 +150,22 @@ bool BaseWallClock::isEnabled() const { } void WallClockASGCT::initialize(Arguments& args) { - _collapsing = args._wall_collapsing; + _collapsing = args._wall_collapsing; + _precheck = args._wall_precheck; + _park_check = args._wall_park; OS::installSignalHandler(SIGVTALRM, sharedSignalHandler); } void WallClockASGCT::timerLoop() { - // todo: re-allocating the vector every time is not efficient - auto collectThreads = [&](std::vector& tids) { - // Get thread IDs from the filter if it's enabled - // Otherwise list all threads in the system + auto collectThreads = [&](std::vector& entries) { if (Profiler::instance()->threadFilter()->enabled()) { - Profiler::instance()->threadFilter()->collect(tids); + Profiler::instance()->threadFilter()->collectWithState(entries); } else { ThreadList *thread_list = OS::listThreads(); while (thread_list->hasNext()) { int tid = thread_list->next(); - // Don't include the current thread if (tid != OS::threadId()) { - tids.push_back(tid); + entries.push_back({tid, nullptr, nullptr}); } tid = thread_list->next(); } @@ -175,16 +173,33 @@ void WallClockASGCT::timerLoop() { } }; - auto sampleThreads = [&](int tid, int& num_failures, int& threads_already_exited, int& permission_denied) { - if (!OS::sendSignalToThread(tid, SIGVTALRM)) { + auto sampleThreads = [&](ThreadEntry entry, int& num_failures, int& threads_already_exited, int& permission_denied) { + if (_precheck && entry.vm_thread != nullptr) { + OSThreadState state = entry.vm_thread->osThreadState(); + switch (state) { + case OSThreadState::SLEEPING: + Counters::increment(WC_SIGNAL_SKIPPED_SLEEPING); return false; + case OSThreadState::MONITOR_WAIT: + 
Counters::increment(WC_SIGNAL_SKIPPED_MONITOR); return false; + case OSThreadState::CONDVAR_WAIT: + Counters::increment(WC_SIGNAL_SKIPPED_CONDVAR); return false; + case OSThreadState::OBJECT_WAIT: + Counters::increment(WC_SIGNAL_SKIPPED_OBJECT_WAIT); return false; + default: break; + } + } + if (_park_check && entry.profiled_thread != nullptr && entry.profiled_thread->isParked()) { + Counters::increment(WC_SIGNAL_SKIPPED_PARKED); return false; + } + if (!OS::sendSignalToThread(entry.tid, SIGVTALRM)) { num_failures++; if (errno != 0) { if (errno == ESRCH) { - threads_already_exited++; + threads_already_exited++; } else if (errno == EPERM) { - permission_denied++; + permission_denied++; } else { - Log::debug("unexpected error %s", strerror(errno)); + Log::debug("unexpected error %s", strerror(errno)); } } return false; @@ -195,5 +210,5 @@ void WallClockASGCT::timerLoop() { auto doNothing = []() { }; - timerLoopCommon(collectThreads, sampleThreads, doNothing, _reservoir_size, _interval); + timerLoopCommon(collectThreads, sampleThreads, doNothing, _reservoir_size, _interval); } diff --git a/ddprof-lib/src/main/cpp/wallClock.h b/ddprof-lib/src/main/cpp/wallClock.h index 66201cde6..638a22311 100644 --- a/ddprof-lib/src/main/cpp/wallClock.h +++ b/ddprof-lib/src/main/cpp/wallClock.h @@ -139,6 +139,8 @@ class BaseWallClock : public Engine { class WallClockASGCT : public BaseWallClock { private: bool _collapsing; + bool _precheck; + bool _park_check; static bool inSyscall(void* ucontext); @@ -149,7 +151,7 @@ class WallClockASGCT : public BaseWallClock { void timerLoop() override; public: - WallClockASGCT() : BaseWallClock(), _collapsing(false) {} + WallClockASGCT() : BaseWallClock(), _collapsing(false), _precheck(true), _park_check(true) {} const char* name() override { return "WallClock (ASGCT)"; } diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index c49b4479e..6f58b408e 
100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -161,11 +161,26 @@ public String execute(String command) throws IllegalArgumentException, IllegalSt return execute0(command); } + /** + * Records the completion of the trace root with causal DAG metadata. + * @param rootSpanId the local root span ID + * @param parentSpanId the parent span ID (0 if none) + * @param startTicks TSC tick captured at span start via {@link #getCurrentTicks()} + * @param endpoint the endpoint/resource name + * @param operation the operation name + * @param sizeLimit max number of distinct endpoints to track + */ + public boolean recordTraceRoot(long rootSpanId, long parentSpanId, long startTicks, + String endpoint, String operation, int sizeLimit) { + return recordTrace0(rootSpanId, parentSpanId, startTicks, endpoint, operation, sizeLimit); + } + /** * Records the completion of the trace root */ + @Deprecated public boolean recordTraceRoot(long rootSpanId, String endpoint, String operation, int sizeLimit) { - return recordTrace0(rootSpanId, endpoint, operation, sizeLimit); + return recordTrace0(rootSpanId, 0L, 0L, endpoint, operation, sizeLimit); } /** @@ -173,7 +188,35 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, String operatio */ @Deprecated public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit) { - return recordTrace0(rootSpanId, endpoint, null, sizeLimit); + return recordTrace0(rootSpanId, 0L, 0L, endpoint, null, sizeLimit); + } + + /** + * Records a blocking interval for the current span. 
+ * @param startTicks TSC tick at block entry + * @param endTicks TSC tick at block exit + * @param spanId the span that was blocked + * @param rootSpanId the local root span ID + * @param blocker identity hash code of the blocking object + */ + public void recordTaskBlock(long startTicks, long endTicks, + long spanId, long rootSpanId, long blocker, long unblockingSpanId) { + recordTaskBlock0(startTicks, endTicks, spanId, rootSpanId, blocker, unblockingSpanId); + } + + public void parkEnter(long spanId, long rootSpanId) { + parkEnter0(spanId, rootSpanId); + } + + public void parkExit(long blocker, long unblockingSpanId) { + parkExit0(blocker, unblockingSpanId); + } + + public void recordSpanNode(long spanId, long parentSpanId, long rootSpanId, + long startNanos, long durationNanos, + int encodedOperation, int encodedResource) { + recordSpanNode0(spanId, parentSpanId, rootSpanId, startNanos, durationNanos, + encodedOperation, encodedResource); } /** @@ -279,8 +322,35 @@ public void recordQueueTime(long startTicks, Class scheduler, Class queueType, int queueLength, - Thread origin) { - recordQueueEnd0(startTicks, endTicks, task.getName(), scheduler.getName(), origin, queueType.getName(), queueLength); + Thread origin, + long submittingSpanId) { + recordQueueTime( + startTicks, endTicks, task, scheduler, queueType, queueLength, origin, submittingSpanId, 0L); + } + + /** + * @param consumingSpanIdOverride 0: use current {@code Contexts} span id; else JFR "spanId" + * (consuming) for the event + */ + public void recordQueueTime(long startTicks, + long endTicks, + Class task, + Class scheduler, + Class queueType, + int queueLength, + Thread origin, + long submittingSpanId, + long consumingSpanIdOverride) { + recordQueueEnd0( + startTicks, + endTicks, + task.getName(), + scheduler.getName(), + origin, + queueType.getName(), + queueLength, + submittingSpanId, + consumingSpanIdOverride); } /** @@ -323,7 +393,15 @@ private static ThreadContext initializeThreadContext() { 
private static native int getTid0(); - private static native boolean recordTrace0(long rootSpanId, String endpoint, String operation, int sizeLimit); + private static native boolean recordTrace0(long rootSpanId, long parentSpanId, long startTicks, String endpoint, String operation, int sizeLimit); + + private static native void recordTaskBlock0(long startTicks, long endTicks, long spanId, long rootSpanId, long blocker, long unblockingSpanId); + + private static native void parkEnter0(long spanId, long rootSpanId); + + private static native void parkExit0(long blocker, long unblockingSpanId); + + private static native void recordSpanNode0(long spanId, long parentSpanId, long rootSpanId, long startNanos, long durationNanos, int encodedOperation, int encodedResource); private static native int registerConstant0(String value); @@ -335,7 +413,16 @@ private static ThreadContext initializeThreadContext() { private static native void recordSettingEvent0(String name, String value, String unit); - private static native void recordQueueEnd0(long startTicks, long endTicks, String task, String scheduler, Thread origin, String queueType, int queueLength); + private static native void recordQueueEnd0( + long startTicks, + long endTicks, + String task, + String scheduler, + Thread origin, + String queueType, + int queueLength, + long submittingSpanId, + long consumingSpanIdOverride); private static native long currentTicks0(); diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/SignalSuppressionBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/SignalSuppressionBenchmark.java new file mode 100644 index 000000000..6f580fe9e --- /dev/null +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/SignalSuppressionBenchmark.java @@ -0,0 +1,136 @@ +/* + * Copyright 2025, Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datadoghq.profiler.stresstest.scenarios.throughput; + +import com.datadoghq.profiler.JavaProfiler; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Measures the impact of wall-clock signal suppression (Approach 1 + 2) on active-thread + * throughput in the presence of a large background population of parked threads. + * + *

Setup: 200 background threads each call profiler.addThread() then park indefinitely via + * LockSupport.park(), mimicking the idle-thread mix found in typical Java services. JMH worker + * threads do CPU-bound work in the foreground. + * + *

+ * <p>Two {@code command} params are compared:
+ *
+ * <ul>
+ *   <li>{@code wall=1ms} — suppression on (wallprecheck=true, wallpark=true by default)</li>
+ *   <li>{@code wall=1ms,wallprecheck=false,wallpark=false} — suppression off (baseline)</li>
+ * </ul>
+ *

+ * <p>Key secondary metrics (reported by {@link com.datadoghq.profiler.stresstest.WhiteboxProfiler}):
+ *
+ * <ul>
+ *   <li>{@code wc_signals_skipped_sleeping} — increments when Approach 1 skips a sleeping thread</li>
+ *   <li>{@code wc_signals_skipped_parked} — increments when Approach 2 skips a parked thread</li>
+ *   <li>{@code jfr_filesize_bytes} — proxy for total samples recorded</li>
+ * </ul>
+ * With suppression on, skipped-signal counters should be roughly + * {@code BACKGROUND_THREADS * measurement_seconds / wall_interval_ms} while throughput (ops/s) + * should be noticeably higher due to fewer context-switches hitting the worker threads. + */ +@State(Scope.Benchmark) +public class SignalSuppressionBenchmark { + + private static final int BACKGROUND_THREADS = 200; + + /** + * Compare suppression-on vs suppression-off. + * The first param is the treatment; the second is the baseline. + */ + @Param({"wall=1ms", "wall=1ms,wallprecheck=false,wallpark=false"}) + public String command; + + @Param({"false"}) + public String skipResults; + + private final List backgroundThreads = new ArrayList<>(BACKGROUND_THREADS); + private volatile boolean running; + private CountDownLatch backgroundReady; + + @Setup(Level.Trial) + public void setup() throws InterruptedException, IOException { + running = true; + backgroundReady = new CountDownLatch(BACKGROUND_THREADS); + JavaProfiler profiler = JavaProfiler.getInstance(); + + for (int i = 0; i < BACKGROUND_THREADS; i++) { + Thread t = new Thread(() -> { + // Register with the wall-clock thread filter so the timer loop considers this thread. + profiler.addThread(); + backgroundReady.countDown(); + // Park until the trial ends — this is what we want the profiler to skip. + while (running) { + LockSupport.park(); + } + }); + t.setDaemon(true); + t.setName("suppression-sleeper-" + i); + t.start(); + backgroundThreads.add(t); + } + // Wait until all background threads have registered and parked. 
+ backgroundReady.await(10, TimeUnit.SECONDS); + } + + @TearDown(Level.Trial) + public void teardown() { + running = false; + for (Thread t : backgroundThreads) { + t.interrupt(); + } + backgroundThreads.clear(); + } + + private long doWork(long seed) { + long result = seed; + for (int i = 0; i < 100_000; i++) { + result = (result * 1103515245L + 12345L) & 0x7fffffffL; + } + return result; + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + @Fork(value = 1, warmups = 0) + @Warmup(iterations = 2, time = 2) + @Measurement(iterations = 5, time = 5) + @Threads(8) + @OutputTimeUnit(TimeUnit.SECONDS) + public void activeThreadsWithSleepingBackground(Blackhole bh) { + bh.consume(doWork(System.nanoTime())); + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/queue/QueueTimeTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/queue/QueueTimeTest.java index f16912930..44d073065 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/queue/QueueTimeTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/queue/QueueTimeTest.java @@ -45,12 +45,105 @@ public void run() { profiler.setContext(1, 2); long now = profiler.getCurrentTicks(); if (profiler.isThresholdExceeded(9, start, now)) { - profiler.recordQueueTime(start, now, getClass(), QueueTimeTest.class, ArrayBlockingQueue.class, 10, origin); + profiler.recordQueueTime(start, now, getClass(), QueueTimeTest.class, ArrayBlockingQueue.class, 10, origin, 0L); } profiler.clearContext(); } } + /** + * Regression test for the QueueTime field serialization order bug: submittingSpanId was + * written before writeContext(), placing it at field position 9 (where the schema expects + * the consuming spanId). This caused the backend to read completely wrong span IDs. + *

+ * Verifies that all three span-ID fields (spanId, localRootSpanId, submittingSpanId) round-trip + * correctly when they are all distinct and non-zero. + */ + @Test + public void testQueueTimeFieldOrder() throws Exception { + IAttribute submittingSpanIdAttr = attr("submittingSpanId", "", "", NUMBER); + + Thread origin = Thread.currentThread(); + origin.setName("origin-field-order"); + // Use distinct, non-zero values so any field-order swap is detectable: + // consuming spanId=7, rootSpanId=42 (set via setContext), submittingSpanId=99. + long start = profiler.getCurrentTicks(); + Runnable worker = () -> { + profiler.setContext(7, 42); + long now = profiler.getCurrentTicks(); + profiler.recordQueueTime(start, now, QueueTimeTest.class, QueueTimeTest.class, + ArrayBlockingQueue.class, 1, origin, 99L); + profiler.clearContext(); + }; + Thread thread = new Thread(worker, "destination-field-order"); + Thread.sleep(10); + thread.start(); + thread.join(); + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.QueueTime"); + boolean found = false; + for (IItemIterable it : events) { + IMemberAccessor spanIdAccessor = SPAN_ID.getAccessor(it.getType()); + IMemberAccessor rootSpanIdAccessor = LOCAL_ROOT_SPAN_ID.getAccessor(it.getType()); + IMemberAccessor submittingAccessor = submittingSpanIdAttr.getAccessor(it.getType()); + for (IItem item : it) { + if (spanIdAccessor.getMember(item).longValue() == 7) { + found = true; + assertEquals(7, spanIdAccessor.getMember(item).longValue(), "spanId must be the consuming span (not the submitting span)"); + assertEquals(42, rootSpanIdAccessor.getMember(item).longValue(), "localRootSpanId must be the root"); + assertEquals(99, submittingAccessor.getMember(item).longValue(), "submittingSpanId must be the submitting span (not the root)"); + } + } + } + assertTrue(found, "Expected at least one QueueTime event with spanId=7"); + } + + /** + * When {@code consumingSpanIdOverride} is non-zero, native code must use it for JFR + * 
{@code spanId} instead of the active {@code Contexts} span. + */ + @Test + public void testQueueTimeConsumingSpanIdOverride() throws Exception { + Thread origin = Thread.currentThread(); + origin.setName("origin-override"); + long start = profiler.getCurrentTicks(); + Runnable worker = () -> { + profiler.setContext(1, 2); + long now = profiler.getCurrentTicks(); + long override = 88L; + profiler.recordQueueTime( + start, + now, + QueueTimeTest.class, + QueueTimeTest.class, + ArrayBlockingQueue.class, + 1, + origin, + 99L, + override); + profiler.clearContext(); + }; + Thread thread = new Thread(worker, "destination-override"); + Thread.sleep(10); + thread.start(); + thread.join(); + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.QueueTime"); + boolean found = false; + for (IItemIterable it : events) { + IMemberAccessor acc = SPAN_ID.getAccessor(it.getType()); + for (IItem item : it) { + if (acc.getMember(item).longValue() == 88) { + found = true; + break; + } + } + } + assertTrue(found, "Expected QueueTime with spanId=88 from override"); + } + @Test public void testRecordQueueTime() throws Exception { Thread origin = Thread.currentThread(); diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/CollapsingSleepTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/CollapsingSleepTest.java index aa5d3d9fd..91614cb65 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/CollapsingSleepTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/CollapsingSleepTest.java @@ -33,6 +33,7 @@ public void testSleep() { @Override protected String getProfilerCommand() { - return "wall=~1ms"; + // wallprecheck=false,wallpark=false: verify sampling still works when suppression is off + return "wall=~1ms,wallprecheck=false,wallpark=false"; } } diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/MonitorWaitTest.java 
b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/MonitorWaitTest.java new file mode 100644 index 000000000..daa87e28f --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/MonitorWaitTest.java @@ -0,0 +1,264 @@ +package com.datadoghq.profiler.wallclock; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import com.datadoghq.profiler.context.Tracing; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.openjdk.jmc.common.item.Attribute; +import org.openjdk.jmc.common.item.IAttribute; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.common.unit.IQuantity; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.openjdk.jmc.common.unit.UnitLookup.NUMBER; + +/** + * Verifies the JVMTI MonitorWait/MonitorWaited callback implementation for JDK 8-20. + * + * These callbacks emit datadog.TaskBlock JFR events when a thread with an active span + * calls Object.wait() for longer than 1 ms. On JDK 21+, the BCI instrumentation handles + * this instead, so the JVMTI path is skipped on those versions. + * + * Addendum C: when a notifier thread still holds the monitor when the waiting thread + * re-acquires it (MonitorContendedEnter fires), the notifier's span ID is captured + * as unblockingSpanId in the TaskBlock event. 
+ */ +public class MonitorWaitTest extends AbstractProfilerTest { + + private static final IAttribute UNBLOCKING_SPAN_ID = + Attribute.attr("unblockingSpanId", "unblockingSpanId", "Unblocking Span ID", NUMBER); + + private static final long WAIT_DURATION_MS = 100; + + // ------------------------------------------------------------------------- + // Test 1: basic emission — waiting thread with active span emits TaskBlock + // ------------------------------------------------------------------------- + + @Test + public void testWaitWithSpanEmitsTaskBlock() throws InterruptedException { + Assumptions.assumeTrue(!Platform.isJ9()); + + final Object monitor = new Object(); + final AtomicLong capturedSpanId = new AtomicLong(); + final AtomicLong capturedRootSpanId = new AtomicLong(); + final CountDownLatch waitStarted = new CountDownLatch(1); + final CountDownLatch notified = new CountDownLatch(1); + + // Thread A: waits on the monitor with an active span. + Thread waiter = new Thread(() -> { + try (Tracing.Context ctx = Tracing.newContext(() -> 0xAAAABBBBL, profiler)) { + capturedSpanId.set(ctx.getSpanId()); + capturedRootSpanId.set(ctx.getRootSpanId()); + synchronized (monitor) { + waitStarted.countDown(); + monitor.wait(WAIT_DURATION_MS * 3); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // Thread B: notifies after waiting thread is parked. 
+ Thread notifier = new Thread(() -> { + try { + waitStarted.await(); + Thread.sleep(WAIT_DURATION_MS); + synchronized (monitor) { + monitor.notifyAll(); + } + notified.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + waiter.start(); + notifier.start(); + waiter.join(5_000); + notifier.join(5_000); + stopProfiler(); + + long spanId = capturedSpanId.get(); + long rootSpanId = capturedRootSpanId.get(); + + IItemCollection blocks = verifyEvents("datadog.TaskBlock"); + assertTrue(blocks.hasItems(), "Expected at least one datadog.TaskBlock event"); + + boolean foundMatchingBlock = false; + for (IItemIterable items : blocks) { + IMemberAccessor spanIdAccessor = SPAN_ID.getAccessor(items.getType()); + IMemberAccessor rootSpanIdAccessor = LOCAL_ROOT_SPAN_ID.getAccessor(items.getType()); + if (spanIdAccessor == null) continue; + for (IItem item : items) { + if (spanIdAccessor.getMember(item).longValue() == spanId + && rootSpanIdAccessor.getMember(item).longValue() == rootSpanId) { + foundMatchingBlock = true; + } + } + } + assertTrue(foundMatchingBlock, + "No TaskBlock event matched spanId=" + spanId + " rootSpanId=" + rootSpanId); + } + + // ------------------------------------------------------------------------- + // Test 2: duration filter — wait below 1 ms threshold emits no TaskBlock + // ------------------------------------------------------------------------- + + @Test + public void testWaitBelowThresholdEmitsNoTaskBlock() throws InterruptedException { + Assumptions.assumeTrue(!Platform.isJ9()); + + final Object monitor = new Object(); + + try (Tracing.Context ctx = Tracing.newContext(() -> 0xCCCCDDDDL, profiler)) { + synchronized (monitor) { + // wait(1) — 1 ms timeout, well below the 1 ms minimum duration filter + // (the actual wait will be even shorter than 1 ms in practice). 
+ monitor.wait(1); + } + } + + stopProfiler(); + + IItemCollection blocks = verifyEvents("datadog.TaskBlock", false); + assertFalse(blocks.hasItems(), + "Expected no TaskBlock event for a sub-threshold Object.wait()"); + } + + // ------------------------------------------------------------------------- + // Test 3: no span — wait without active span emits no TaskBlock + // ------------------------------------------------------------------------- + + @Test + public void testWaitWithoutSpanEmitsNoTaskBlock() throws InterruptedException { + Assumptions.assumeTrue(!Platform.isJ9()); + + final Object monitor = new Object(); + final CountDownLatch waitStarted = new CountDownLatch(1); + + Thread waiter = new Thread(() -> { + // No Tracing.Context — spanId is 0; callback must be a no-op. + synchronized (monitor) { + try { + waitStarted.countDown(); + monitor.wait(WAIT_DURATION_MS * 2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + Thread notifier = new Thread(() -> { + try { + waitStarted.await(); + Thread.sleep(WAIT_DURATION_MS); + synchronized (monitor) { + monitor.notifyAll(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + waiter.start(); + notifier.start(); + waiter.join(5_000); + notifier.join(5_000); + stopProfiler(); + + IItemCollection blocks = verifyEvents("datadog.TaskBlock", false); + assertFalse(blocks.hasItems(), + "Expected no TaskBlock event when Object.wait() is called without an active span"); + } + + // ------------------------------------------------------------------------- + // Test 4 (Addendum C): notifier holds lock after notify — unblockingSpanId captured + // ------------------------------------------------------------------------- + + @Test + public void testUnblockingSpanIdCapturedWhenNotifierHoldsLock() throws InterruptedException { + // Addendum C uses MonitorContendedEnter, which is only wired on JDK < 21 (JVMTI path). 
+ // On JDK 21+ the BCI path always emits unblockingSpanId = 0 (notify is native). + Assumptions.assumeTrue(!Platform.isJ9() && !Platform.isJavaVersionAtLeast(21)); + + final Object monitor = new Object(); + final AtomicLong waiterSpanId = new AtomicLong(); + final AtomicLong notifierSpanId = new AtomicLong(); + final CountDownLatch waitStarted = new CountDownLatch(1); + + Thread waiter = new Thread(() -> { + try (Tracing.Context ctx = Tracing.newContext(() -> 0x1111_2222L, profiler)) { + waiterSpanId.set(ctx.getSpanId()); + synchronized (monitor) { + waitStarted.countDown(); + monitor.wait(WAIT_DURATION_MS * 5); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + Thread notifier = new Thread(() -> { + try { + waitStarted.await(); + Thread.sleep(WAIT_DURATION_MS); + // Notifier holds the lock for 50 ms after notify() so that when the waiter + // tries to re-acquire (MonitorContendedEnter fires), the notifier is still + // the owner — Addendum C can then capture notifierSpanId as unblockingSpanId. 
+ try (Tracing.Context ctx = Tracing.newContext(() -> 0x3333_4444L, profiler)) { + notifierSpanId.set(ctx.getSpanId()); + synchronized (monitor) { + monitor.notifyAll(); + Thread.sleep(50); // keep holding the lock after notify + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + waiter.start(); + notifier.start(); + waiter.join(5_000); + notifier.join(5_000); + stopProfiler(); + + long spanId = waiterSpanId.get(); + long expectedUnblocking = notifierSpanId.get(); + + IItemCollection blocks = verifyEvents("datadog.TaskBlock"); + assertTrue(blocks.hasItems(), "Expected at least one datadog.TaskBlock event"); + + boolean foundWithUnblocking = false; + for (IItemIterable items : blocks) { + IMemberAccessor spanIdAccessor = SPAN_ID.getAccessor(items.getType()); + IMemberAccessor unblockingAccessor = UNBLOCKING_SPAN_ID.getAccessor(items.getType()); + if (spanIdAccessor == null || unblockingAccessor == null) continue; + for (IItem item : items) { + if (spanIdAccessor.getMember(item).longValue() == spanId) { + long unblocking = unblockingAccessor.getMember(item).longValue(); + if (unblocking == expectedUnblocking) { + foundWithUnblocking = true; + } + } + } + } + assertTrue(foundWithUnblocking, + "Expected a TaskBlock event for waiter span " + spanId + + " with unblockingSpanId=" + expectedUnblocking + + " (notifier held lock after notify — Addendum C capture)"); + } + + @Override + protected String getProfilerCommand() { + return "wall=1ms"; + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ParkFlagTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ParkFlagTest.java new file mode 100644 index 000000000..ac45d27fb --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ParkFlagTest.java @@ -0,0 +1,144 @@ +package com.datadoghq.profiler.wallclock; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import 
com.datadoghq.profiler.context.Tracing; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.common.unit.IQuantity; +import org.openjdk.jmc.common.unit.UnitLookup; + +import java.util.Map; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Verifies Approach 2: profiler.parkEnter/parkExit set an atomic flag in ProfiledThread that + * suppresses SIGVTALRM signals during the parked interval, and emit a datadog.TaskBlock JFR + * event with accurate timing when a trace span is active. + */ +public class ParkFlagTest extends AbstractProfilerTest { + + private static final IQuantity NANOSECOND = UnitLookup.NANOSECOND.quantity(1); + private static final long PARK_DURATION_NS = 50_000_000L; // 50 ms + private static final long MIN_DURATION_NS = 1_000_000L; // wallparkmin default: 1 ms + + @Test + public void testParkFlagSuppressesSignals() { + Assumptions.assumeTrue(!Platform.isJ9()); + registerCurrentThreadForWallClockProfiling(); + + // Call parkEnter/parkExit directly (BCI is in dd-trace-java; here we test the native layer). + profiler.parkEnter(0L, 0L); + LockSupport.parkNanos(300_000_000L); // 300 ms park + profiler.parkExit(0L, 0L); + + stopProfiler(); + + // With the park flag set, the timer thread skips SIGVTALRM for the parked interval. 
+ long sampleCount = verifyEvents("datadog.MethodSample", false) + .getAggregate(org.openjdk.jmc.common.item.Aggregators.count()).longValue(); + assertTrue(sampleCount < 10, + "Expected nearly no MethodSample events while park flag is set, got: " + sampleCount); + + Map counters = profiler.getDebugCounters(); + if (counters.containsKey("wc_signals_skipped_parked")) { + assertTrue(counters.get("wc_signals_skipped_parked") > 0, + "wc_signals_skipped_parked should be > 0 during a 300 ms park"); + } + } + + @Test + public void testParkWithSpanEmitsTaskBlockEvent() { + Assumptions.assumeTrue(!Platform.isJ9()); + registerCurrentThreadForWallClockProfiling(); + + long spanId; + long rootSpanId; + long beforeParkNs = System.nanoTime(); + + try (Tracing.Context ctx = Tracing.newContext(() -> 0xCAFEBABEL, profiler)) { + spanId = ctx.getSpanId(); + rootSpanId = ctx.getRootSpanId(); + + profiler.parkEnter(spanId, rootSpanId); + LockSupport.parkNanos(PARK_DURATION_NS); + profiler.parkExit(0L, 0L); + } + + long afterParkNs = System.nanoTime(); + stopProfiler(); + + // Verify a datadog.TaskBlock event was emitted with correct span IDs and duration >= 50 ms. 
+ IItemCollection blocks = verifyEvents("datadog.TaskBlock"); + assertTrue(blocks.hasItems(), "Expected at least one datadog.TaskBlock event"); + + boolean foundMatchingBlock = false; + for (IItemIterable items : blocks) { + IMemberAccessor spanIdAccessor = SPAN_ID.getAccessor(items.getType()); + IMemberAccessor rootSpanIdAccessor = LOCAL_ROOT_SPAN_ID.getAccessor(items.getType()); + for (IItem item : items) { + long eventSpanId = spanIdAccessor.getMember(item).longValue(); + long eventRootSpanId = rootSpanIdAccessor.getMember(item).longValue(); + if (eventSpanId == spanId && eventRootSpanId == rootSpanId) { + foundMatchingBlock = true; + } + } + } + assertTrue(foundMatchingBlock, + "No TaskBlock event matched spanId=" + spanId + " rootSpanId=" + rootSpanId); + } + + @Test + public void testParkBelowThresholdEmitsNoTaskBlock() { + Assumptions.assumeTrue(!Platform.isJ9()); + registerCurrentThreadForWallClockProfiling(); + + try (Tracing.Context ctx = Tracing.newContext(() -> 0xDEADBEEFL, profiler)) { + long spanId = ctx.getSpanId(); + long rootSpanId = ctx.getRootSpanId(); + + // Park for 0.1 ms — below the default 1 ms threshold (wallparkmin=1000000). + profiler.parkEnter(spanId, rootSpanId); + LockSupport.parkNanos(100_000L); + profiler.parkExit(0L, 0L); + } + + stopProfiler(); + + // Duration is below threshold, so no TaskBlock should be emitted. + IItemCollection blocks = verifyEvents("datadog.TaskBlock", false); + assertFalse(blocks.hasItems(), + "Expected no TaskBlock event for a sub-threshold park (0.1 ms < 1 ms threshold)"); + } + + @Test + public void testParkWithoutSpanEmitsNoTaskBlock() { + Assumptions.assumeTrue(!Platform.isJ9()); + registerCurrentThreadForWallClockProfiling(); + + // spanId = 0 means no active trace — signal suppression still works but no JFR event. 
+ profiler.parkEnter(0L, 0L); + LockSupport.parkNanos(PARK_DURATION_NS); + profiler.parkExit(0L, 0L); + + stopProfiler(); + + IItemCollection blocks = verifyEvents("datadog.TaskBlock", false); + assertFalse(blocks.hasItems(), + "Expected no TaskBlock event when parked without a trace span (spanId=0)"); + } + + @Override + protected String getProfilerCommand() { + // wallprecheck=false so only the park flag (Approach 2) is exercised in isolation. + // wallpark=true (default) enables Approach 2. + return "wall=1ms,wallprecheck=false"; + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckTest.java new file mode 100644 index 000000000..120637707 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckTest.java @@ -0,0 +1,51 @@ +package com.datadoghq.profiler.wallclock; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.openjdk.jmc.common.item.Aggregators; + +import java.util.Map; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Verifies Approach 1: the timer loop reads osThreadState() before sending SIGVTALRM and + * skips threads in SLEEPING/MONITOR_WAIT/CONDVAR_WAIT/OBJECT_WAIT states (wallprecheck=true, + * the default). A thread parked for 300 ms at a 1 ms interval should receive nearly zero + * signals, while SleepTest (wallprecheck=false) confirms sampling still works when disabled. + */ +public class PrecheckTest extends AbstractProfilerTest { + + @Test + public void testSleepingThreadIsNotSampled() { + Assumptions.assumeTrue(!Platform.isJ9()); + registerCurrentThreadForWallClockProfiling(); + + // Park for 300 ms — enough for ~300 timer ticks at 1 ms interval. 
+ LockSupport.parkNanos(300_000_000L); + + stopProfiler(); + + // The timer thread reads osThreadState() and skips the SIGVTALRM for sleeping threads, + // so sample count should be near zero (a handful may slip through at park entry/exit). + long sampleCount = verifyEvents("datadog.MethodSample", false) + .getAggregate(Aggregators.count()).longValue(); + assertTrue(sampleCount < 10, + "Expected nearly no MethodSample events for a sleeping thread with wallprecheck=true, got: " + sampleCount); + + // Confirm the suppression counter incremented (only available in COUNTERS-enabled builds). + Map counters = profiler.getDebugCounters(); + if (counters.containsKey("wc_signals_skipped_sleeping")) { + assertTrue(counters.get("wc_signals_skipped_sleeping") > 0, + "wc_signals_skipped_sleeping should be > 0 for a 300 ms park"); + } + } + + @Override + protected String getProfilerCommand() { + return "wall=1ms"; // wallprecheck=true by default + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/SleepTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/SleepTest.java index 5cc63689e..016d8c7cb 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/SleepTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/SleepTest.java @@ -26,6 +26,7 @@ public void testSleep() { @Override protected String getProfilerCommand() { - return "wall=1ms"; + // wallprecheck=false,wallpark=false: verify sampling still works when suppression is off + return "wall=1ms,wallprecheck=false,wallpark=false"; } }