From 323eed7a9347e89aaa5bd952e507c490677e9faa Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Fri, 24 Apr 2026 10:24:56 +0000 Subject: [PATCH 01/14] feat(thread-filter): add VMThread* slot to ThreadFilter for OS-state reads --- ddprof-lib/src/main/cpp/threadFilter.cpp | 34 ++++++++++++++++++++++++ ddprof-lib/src/main/cpp/threadFilter.h | 17 +++++++++--- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 77e6dfb53..77aff3c78 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -21,6 +21,7 @@ #include "threadFilter.h" #include "arch.h" +#include "hotspot/vmStructs.h" #include "os.h" #include #include @@ -284,6 +285,39 @@ void ThreadFilter::collect(std::vector& tids) const { } } +void ThreadFilter::setVMThread(SlotID slot_id, VMThread* vm_thread) { + if (slot_id < 0) return; + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (chunk != nullptr) { + chunk->slots[slot_idx].vm_thread.store(vm_thread, std::memory_order_release); + } +} + +void ThreadFilter::collectWithState(std::vector& entries) const { + entries.clear(); + entries.reserve(512); + + int num_chunks = _num_chunks.load(std::memory_order_relaxed); + for (int chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) { + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (chunk == nullptr) continue; + + for (const auto& slot : chunk->slots) { + int slot_tid = slot.value.load(std::memory_order_acquire); + if (slot_tid != -1) { + VMThread* vm = slot.vm_thread.load(std::memory_order_acquire); + entries.push_back({slot_tid, vm}); + } + } + } + + if (entries.capacity() > entries.size() * 2) { + entries.shrink_to_fit(); + } +} + void ThreadFilter::init(const char* filter) { // Simple logic: any filter value (including "0") enables filtering // Only explicitly registered threads via addThread() will be sampled diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 2440d4351..dd5b878cb 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -24,6 +24,13 @@ #include "arch.h" +class VMThread; + +struct ThreadEntry { + int tid; + VMThread* vm_thread; +}; + class ThreadFilter { public: using SlotID = int; @@ -48,15 +55,19 @@ class ThreadFilter { void add(int tid, SlotID slot_id); void remove(SlotID slot_id); void collect(std::vector& tids) const; + void setVMThread(SlotID slot_id, VMThread* vm_thread); + void collectWithState(std::vector& entries) const; SlotID registerThread(); void unregisterThread(SlotID slot_id); private: - // Optimized slot structure with padding to avoid false sharing + // Optimized slot structure with padding to avoid false sharing. + // Pointers are placed before the int to avoid implicit alignment padding between them. struct alignas(DEFAULT_CACHE_LINE_SIZE) Slot { - std::atomic value{-1}; - char padding[DEFAULT_CACHE_LINE_SIZE - sizeof(value)]; // Pad to cache line size + std::atomic vm_thread{nullptr}; // 8 bytes + std::atomic value{-1}; // 4 bytes + char padding[DEFAULT_CACHE_LINE_SIZE - sizeof(vm_thread) - sizeof(value)]; }; static_assert(sizeof(Slot) == DEFAULT_CACHE_LINE_SIZE, "Slot must be exactly one cache line"); From f80802bb45fc8333e4cbd5f09587f5d5941781b7 Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Fri, 24 Apr 2026 10:26:15 +0000 Subject: [PATCH 02/14] feat(wall): register VMThread* in ThreadFilter at thread start and end --- ddprof-lib/src/main/cpp/arguments.cpp | 5 ++++ ddprof-lib/src/main/cpp/arguments.h | 2 ++ ddprof-lib/src/main/cpp/counters.h | 1 + ddprof-lib/src/main/cpp/event.h | 7 ++++- ddprof-lib/src/main/cpp/javaApi.cpp | 11 +++++--- ddprof-lib/src/main/cpp/profiler.cpp | 4 +++ ddprof-lib/src/main/cpp/wallClock.cpp | 37 ++++++++++++++++----------- ddprof-lib/src/main/cpp/wallClock.h | 3 ++- 8 files changed, 49 insertions(+), 21 deletions(-) diff --git a/ddprof-lib/src/main/cpp/arguments.cpp b/ddprof-lib/src/main/cpp/arguments.cpp index 4436e80a8..37c5bac30 100644 --- a/ddprof-lib/src/main/cpp/arguments.cpp +++ b/ddprof-lib/src/main/cpp/arguments.cpp @@ -364,6 +364,11 @@ Error Arguments::parse(const char *args) { _remote_symbolication = true; } + CASE("wallprecheck") + if (value != NULL) { + _wall_precheck = strcmp(value, "false") != 0 && strcmp(value, "0") != 0; + } + CASE("wallsampler") if (value != NULL) { switch (value[0]) { diff --git a/ddprof-lib/src/main/cpp/arguments.h b/ddprof-lib/src/main/cpp/arguments.h index 2d400f213..2b413b93b 100644 --- a/ddprof-lib/src/main/cpp/arguments.h +++ b/ddprof-lib/src/main/cpp/arguments.h @@ -168,6 +168,7 @@ class Arguments { long _cpu; long _wall; bool _wall_collapsing; + bool _wall_precheck; int _wall_threads_per_tick; WallclockSampler _wallclock_sampler; long _memory; @@ -204,6 +205,7 @@ class Arguments { _cpu(-1), _wall(-1), _wall_collapsing(false), + _wall_precheck(true), _wall_threads_per_tick(DEFAULT_WALL_THREADS_PER_TICK), _wallclock_sampler(ASGCT), _memory(-1), diff --git a/ddprof-lib/src/main/cpp/counters.h b/ddprof-lib/src/main/cpp/counters.h index 7ae44f9bc..355f810bb 100644 --- a/ddprof-lib/src/main/cpp/counters.h +++ b/ddprof-lib/src/main/cpp/counters.h @@ -60,6 +60,7 @@ X(AGCT_NATIVE_NO_JAVA_CONTEXT, "agct_native_no_java_context") \ X(AGCT_BLOCKED_IN_VM, "agct_blocked_in_vm") \ X(SKIPPED_WALLCLOCK_UNWINDS, "skipped_wallclock_unwinds") \ + X(WC_SIGNAL_SKIPPED_SLEEPING, "wc_signals_skipped_sleeping") \ X(UNWINDING_TIME_ASYNC, "unwinding_ticks_async") \ X(UNWINDING_TIME_JVMTI, "unwinding_ticks_jvmti") \ X(CALLTRACE_STORAGE_DROPPED, "calltrace_storage_dropped_traces") \ diff --git a/ddprof-lib/src/main/cpp/event.h b/ddprof-lib/src/main/cpp/event.h index 752db842d..ff6c56ecc 100644 --- a/ddprof-lib/src/main/cpp/event.h +++ b/ddprof-lib/src/main/cpp/event.h @@ -109,12 +109,13 @@ class WallClockEpochEvent { u32 _num_failed_samples; u32 _num_exited_threads; u32 _num_permission_denied; + u32 _num_skipped_sleeping; WallClockEpochEvent(u64 start_time) : _dirty(false), _start_time(start_time), _duration_millis(0), _num_samplable_threads(0), _num_successful_samples(0), _num_failed_samples(0), _num_exited_threads(0), - _num_permission_denied(0) {} + _num_permission_denied(0), _num_skipped_sleeping(0) {} bool hasChanged() { return _dirty; } @@ -153,6 +154,10 @@ class WallClockEpochEvent { } } + void updateNumSkippedSleeping(u32 n) { + if (_num_skipped_sleeping != n) { _dirty = true; _num_skipped_sleeping = n; } + } + void endEpoch(u64 millis) { _duration_millis = millis; } void clean() { _dirty = false; } diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 76cdb104e..0687323c5 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -24,7 +24,7 @@ #include "counters.h" #include "common.h" #include "engine.h" -#include "hotspot/vmStructs.h" +#include "hotspot/vmStructs.inline.h" #include "incbin.h" #include "jvmThread.h" #include "os.h" @@ -150,15 +150,18 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() { int slot_id = current->filterSlotId(); if (unlikely(slot_id == -1)) { - // Thread doesn't have a slot ID yet (e.g., main thread), so register it - // Happens when we are not enabled before thread start + // Thread doesn't have a slot ID yet (e.g., main thread or profiler started + // after thread creation). Register now. slot_id = thread_filter->registerThread(); current->setFilterSlotId(slot_id); } - + if (unlikely(slot_id == -1)) { return; // Failed to register thread } + // Refresh HotSpot VMThread* for wall thread-filter precheck (vmStructs OS state). + // HotSpot only: VMThread::current() asserts VM::isHotspot(). OpenJ9/Zing: leave null. + thread_filter->setVMThread(slot_id, VM::isHotspot() ? VMThread::current() : nullptr); thread_filter->add(tid, slot_id); } diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index c70b5df17..0e3a81302 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -75,6 +75,8 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { if (_thread_filter.enabled()) { int slot_id = _thread_filter.registerThread(); current->setFilterSlotId(slot_id); + // VMThread / vmStructs are HotSpot-only; VMThread::current() asserts VM::isHotspot(). + _thread_filter.setVMThread(slot_id, VM::isHotspot() ? VMThread::current() : nullptr); _thread_filter.remove(slot_id); // Remove from filtering initially } if (thread != NULL) { @@ -95,6 +97,7 @@ void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { tid = current->tid(); if (_thread_filter.enabled()) { + _thread_filter.setVMThread(slot_id, nullptr); _thread_filter.unregisterThread(slot_id); current->setFilterSlotId(-1); } @@ -1136,6 +1139,7 @@ Error Profiler::start(Arguments &args, bool reset) { assert(current != nullptr); int slot_id = _thread_filter.registerThread(); current->setFilterSlotId(slot_id); + _thread_filter.setVMThread(slot_id, VM::isHotspot() ? VMThread::current() : nullptr); _thread_filter.remove(slot_id); // Remove from filtering initially (matches onThreadStart behavior) } diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index a95aef42d..e3e40c5cb 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -165,6 +165,7 @@ bool BaseWallClock::isEnabled() const { void WallClockASGCT::initialize(Arguments& args) { _collapsing = args._wall_collapsing; + _precheck = args._wall_precheck; // J9WallClock uses JVMTI GetAllStackTracesExtended polling, not SIGVTALRM // signals — it has no sharedSignalHandler and needs no signal-origin gate. // Engines are started sequentially; this call is idempotent (no-op after first). @@ -173,41 +174,47 @@ void WallClockASGCT::initialize(Arguments& args) { } void WallClockASGCT::timerLoop() { - // todo: re-allocating the vector every time is not efficient - auto collectThreads = [&](std::vector& tids) { - // Get thread IDs from the filter if it's enabled - // Otherwise list all threads in the system + auto collectThreads = [&](std::vector& entries) { if (Profiler::instance()->threadFilter()->enabled()) { - Profiler::instance()->threadFilter()->collect(tids); + Profiler::instance()->threadFilter()->collectWithState(entries); } else { ThreadList *thread_list = OS::listThreads(); while (thread_list->hasNext()) { int tid = thread_list->next(); - // Don't include the current thread if (tid != OS::threadId()) { - tids.push_back(tid); + entries.push_back({tid, nullptr}); } } delete thread_list; } }; - auto sampleThreads = [&](int tid, int& num_failures, int& threads_already_exited, int& permission_denied) { - // Deliver SIGVTALRM with a cookie so our handler can distinguish this - // signal from any other in-process sender (see signalCookie.h / - // wallClock.cpp sharedSignalHandler). - if (!OS::sendSignalWithCookie(tid, SIGVTALRM, SignalCookie::wallclock())) { + auto sampleThreads = [&](ThreadEntry entry, int& num_failures, int& threads_already_exited, int& permission_denied) { + if (_precheck && entry.vm_thread != nullptr) { + OSThreadState state = entry.vm_thread->osThreadState(); + // SLEEPING: Thread.sleep() on JDK < 21. + // CONDVAR_WAIT: Thread.sleep() on JDK 21+ (switched from OSThreadSleepState + // to OSThreadWaitState(false) in JVM_Sleep). Both states represent pure + // time-based sleeping with no useful profiling signal. + if (state == OSThreadState::SLEEPING || state == OSThreadState::CONDVAR_WAIT) { + Counters::increment(WC_SIGNAL_SKIPPED_SLEEPING); + return false; + } + } + + if (!OS::sendSignalToThread(entry.tid, SIGVTALRM)) { num_failures++; if (errno != 0) { if (errno == ESRCH) { - threads_already_exited++; + threads_already_exited++; } else if (errno == EPERM) { permission_denied++; } else if (errno == EAGAIN) { // Signal queue limit (RLIMIT_SIGPENDING) reached; signal was not // delivered — count as missed sample. + permission_denied++; } else { - Log::debug("unexpected error %s", strerror(errno)); + Log::debug("unexpected error %s", strerror(errno)); } } return false; @@ -218,5 +225,5 @@ void WallClockASGCT::timerLoop() { auto doNothing = []() { }; - timerLoopCommon(collectThreads, sampleThreads, doNothing, _reservoir_size, _interval); + timerLoopCommon(collectThreads, sampleThreads, doNothing, _reservoir_size, _interval); } diff --git a/ddprof-lib/src/main/cpp/wallClock.h b/ddprof-lib/src/main/cpp/wallClock.h index 89338fbaf..ffe3523e8 100644 --- a/ddprof-lib/src/main/cpp/wallClock.h +++ b/ddprof-lib/src/main/cpp/wallClock.h @@ -141,6 +141,7 @@ class BaseWallClock : public Engine { class WallClockASGCT : public BaseWallClock { private: bool _collapsing; + bool _precheck; static bool inSyscall(void* ucontext); @@ -151,7 +152,7 @@ class WallClockASGCT : public BaseWallClock { void timerLoop() override; public: - WallClockASGCT() : BaseWallClock(), _collapsing(false) {} + WallClockASGCT() : BaseWallClock(), _collapsing(false), _precheck(true) {} const char* name() override { return "WallClock (ASGCT)"; } From 10129d2d66f177445eadf58ec0671e9a06f40bdb Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Fri, 24 Apr 2026 10:28:36 +0000 Subject: [PATCH 03/14] test(wall): add PrecheckTest and PrecheckEfficiencyTest; disable precheck for Thread.sleep tests --- ddprof-lib/src/main/cpp/wallClock.cpp | 2 +- .../profiler/context/TagContextTest.java | 4 +- .../wallclock/CollapsingSleepTest.java | 2 +- .../wallclock/ContextWallClockTest.java | 2 +- .../wallclock/PrecheckEfficiencyTest.java | 346 ++++++++++++++++++ .../profiler/wallclock/PrecheckTest.java | 56 +++ .../profiler/wallclock/SleepTest.java | 2 +- .../wallclock/WallClockThreadFilterTest.java | 5 +- 8 files changed, 413 insertions(+), 6 deletions(-) create mode 100644 ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckEfficiencyTest.java create mode 100644 ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckTest.java diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index e3e40c5cb..ac027faa6 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -202,7 +202,7 @@ void WallClockASGCT::timerLoop() { } } - if (!OS::sendSignalToThread(entry.tid, SIGVTALRM)) { + if (!OS::sendSignalWithCookie(entry.tid, SIGVTALRM, SignalCookie::wallclock())) { num_failures++; if (errno != 0) { if (errno == ESRCH) { diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/context/TagContextTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/context/TagContextTest.java index 9b8fe4404..8d2ba2db9 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/context/TagContextTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/context/TagContextTest.java @@ -262,6 +262,8 @@ private void checkTagValues(ContextSetter contextSetter, String contextAttribute @Override protected String getProfilerCommand() { - return "wall=1ms,filter=0,attributes=tag1;tag2;tag3"; + // wallprecheck=false: work() calls Thread.sleep() while a context tag is set; + // the precheck would suppress signals during sleep and lose the tagged samples. + return "wall=1ms,filter=0,attributes=tag1;tag2;tag3,wallprecheck=false"; } } diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/CollapsingSleepTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/CollapsingSleepTest.java index ff362085f..c81c67425 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/CollapsingSleepTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/CollapsingSleepTest.java @@ -33,6 +33,6 @@ public void testSleep() { @Override protected String getProfilerCommand() { - return "wall=~1ms"; + return "wall=~1ms,wallprecheck=false"; } } diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ContextWallClockTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ContextWallClockTest.java index 41a1ac606..c84a1716b 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ContextWallClockTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ContextWallClockTest.java @@ -38,6 +38,6 @@ public void test(@CStack String cstack) throws ExecutionException, InterruptedEx @Override protected String getProfilerCommand() { - return "wall=1ms,filter=0,loglevel=warn"; + return "wall=1ms,filter=0,loglevel=warn,wallprecheck=false"; } } diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckEfficiencyTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckEfficiencyTest.java new file mode 100644 index 000000000..1c5ff442b --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckEfficiencyTest.java @@ -0,0 +1,346 @@ +package com.datadoghq.profiler.wallclock; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.flightrecorder.jdk.JdkAttributes; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Measures how many signals each precheck strategy would suppress for a workload + * containing threads in each blocked state. Runs with wallprecheck=false so all + * samples are collected; the distribution reveals the theoretical suppression rate. + * + * Current precheck (SLEEPING only): suppresses only Thread.sleep samples. + * Aggressive precheck (all blocked states): would also suppress LockSupport.park and Object.wait. + * + *

Sample classification prefers Java thread name ({@code EVENT_THREAD_NAME}) for the fixed worker + * threads when present, then JFR thread state, then stack strings — some JVMs (e.g. Graal on aarch64) + * omit SLEEPING / {@code Thread.sleep} in state or stacks while samples still hit the worker. + */ +public class PrecheckEfficiencyTest extends AbstractProfilerTest { + + private static final String EFFICIENCY_SLEEPING = "efficiency-sleeping"; + private static final String EFFICIENCY_PARKED = "efficiency-parked"; + private static final String EFFICIENCY_WAITING = "efficiency-waiting"; + private static final String EFFICIENCY_WORKING = "efficiency-working"; + + @Test + public void compareSuppressionRates() throws Exception { + Assumptions.assumeTrue(!Platform.isJ9()); + + CountDownLatch ready = new CountDownLatch(4); + AtomicBoolean stop = new AtomicBoolean(false); + Object monitor = new Object(); + + // Thread in SLEEPING state (Thread.sleep) — suppressed by both precheck variants + Thread sleeping = new Thread(() -> { + registerCurrentThreadForWallClockProfiling(); + ready.countDown(); + try { Thread.sleep(10_000); } catch (InterruptedException ignored) {} + }, EFFICIENCY_SLEEPING); + + // Thread in CONDVAR_WAIT state (LockSupport.parkNanos) — suppressed by old precheck only + Thread parked = new Thread(() -> { + registerCurrentThreadForWallClockProfiling(); + ready.countDown(); + LockSupport.parkNanos(10_000_000_000L); + }, EFFICIENCY_PARKED); + + // Thread in OBJECT_WAIT state (Object.wait) — suppressed by old precheck only + Thread waiting = new Thread(() -> { + registerCurrentThreadForWallClockProfiling(); + ready.countDown(); + synchronized (monitor) { + try { monitor.wait(10_000); } catch (InterruptedException ignored) {} + } + }, EFFICIENCY_WAITING); + + // Thread in RUNNABLE state (CPU spin) — not suppressed by either precheck + Thread working = new Thread(() -> { + registerCurrentThreadForWallClockProfiling(); + ready.countDown(); + long x = 0; + while (!stop.get()) { x++; } + }, EFFICIENCY_WORKING); + + sleeping.setDaemon(true); + parked.setDaemon(true); + waiting.setDaemon(true); + working.setDaemon(true); + + sleeping.start(); + parked.start(); + waiting.start(); + working.start(); + + ready.await(); + Thread.sleep(500); + + stop.set(true); + sleeping.interrupt(); + LockSupport.unpark(parked); + synchronized (monitor) { monitor.notifyAll(); } + + sleeping.join(1000); + parked.join(1000); + waiting.join(1000); + working.join(1000); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.MethodSample", false); + + long sleepSamples = 0, parkSamples = 0, objectWaitSamples = 0, runnableSamples = 0; + + for (IItemIterable batch : events) { + IMemberAccessor stackAccessor = JdkAttributes.STACK_TRACE_STRING.getAccessor(batch.getType()); + IMemberAccessor stateAccessor = THREAD_STATE.getAccessor(batch.getType()); + IMemberAccessor threadNameAccessor = + JdkAttributes.EVENT_THREAD_NAME.getAccessor(batch.getType()); + if (stackAccessor == null && stateAccessor == null && threadNameAccessor == null) { + continue; + } + for (IItem item : batch) { + if (threadNameAccessor != null) { + String threadName = threadNameAccessor.getMember(item); + if (EFFICIENCY_SLEEPING.equals(threadName)) { + sleepSamples++; + continue; + } + if (EFFICIENCY_PARKED.equals(threadName)) { + parkSamples++; + continue; + } + if (EFFICIENCY_WAITING.equals(threadName)) { + objectWaitSamples++; + continue; + } + if (EFFICIENCY_WORKING.equals(threadName)) { + runnableSamples++; + continue; + } + } + String state = stateAccessor != null ? stateAccessor.getMember(item) : null; + // Native OSThreadState is written as jdk.types.ThreadState; CONDVAR_WAIT → "PARKED" + // in JFR metadata (flightRecorder.cpp writeThreadStates). Prefer state over stacks: + // stacks often omit LockSupport/Unsafe frames after inlining. + if (state != null && !state.isEmpty()) { + switch (state) { + case "SLEEPING": + sleepSamples++; + continue; + case "PARKED": + parkSamples++; + continue; + case "WAITING": + objectWaitSamples++; + continue; + default: + break; + } + } + String stack = stackAccessor != null ? stackAccessor.getMember(item) : null; + if (stack != null && (stack.contains("Thread.sleep") || stack.contains("sleep0"))) { + sleepSamples++; + } else if (stack != null && (stack.contains("LockSupport.park") || stack.contains("Unsafe.park") + || stack.contains("parkNanos"))) { + parkSamples++; + } else if (stack != null && (stack.contains("Object.wait") || stack.contains("wait0"))) { + objectWaitSamples++; + } else { + runnableSamples++; + } + } + } + + long total = sleepSamples + parkSamples + objectWaitSamples + runnableSamples; + if (total == 0) { + System.out.println("No samples collected — skipping efficiency report"); + return; + } + + double pctSleep = 100.0 * sleepSamples / total; + double pctPark = 100.0 * parkSamples / total; + double pctObjectWait = 100.0 * objectWaitSamples / total; + double pctRunnable = 100.0 * runnableSamples / total; + + double newPrecheckSuppression = pctSleep; + double oldPrecheckSuppression = pctSleep + pctPark + pctObjectWait; + + System.out.printf("%nPrecheck efficiency report (wallprecheck=false baseline, %d total samples):%n", total); + System.out.printf(" SLEEPING (Thread.sleep): %4d samples (%5.1f%%)%n", sleepSamples, pctSleep); + System.out.printf(" CONDVAR_WAIT (LockSupport.park): %4d samples (%5.1f%%)%n", parkSamples, pctPark); + System.out.printf(" OBJECT_WAIT (Object.wait): %4d samples (%5.1f%%)%n", objectWaitSamples, pctObjectWait); + System.out.printf(" RUNNABLE / other: %4d samples (%5.1f%%)%n", runnableSamples, pctRunnable); + System.out.printf("Current precheck (SLEEPING only): %.1f%% of signals suppressed%n", newPrecheckSuppression); + System.out.printf("Aggressive precheck (all blocked states): %.1f%% of signals suppressed%n", oldPrecheckSuppression); + + // Sanity: each controlled thread type should produce at least a few samples. + // JDK 8 can collapse park/wait into WAITING-only classification depending on runtime/JFR details. + assertTrue(sleepSamples > 0, "Expected samples from sleeping thread"); + if (Platform.isJavaVersion(8)) { + assertTrue(parkSamples + objectWaitSamples > 0, + "Expected WAITING/PARKED samples from parked or object-waiting threads on JDK 8"); + } else { + assertTrue(parkSamples > 0, "Expected samples from parked thread"); + assertTrue(objectWaitSamples > 0, "Expected samples from object-waiting thread"); + } + assertTrue(runnableSamples > 0, "Expected RUNNABLE samples (working thread or unidentified)"); + } + + /** + * Simulates a typical Java service: a fixed thread pool that is mostly idle (threads parked + * in {@code LinkedBlockingQueue.take()}), plus a scheduler thread doing periodic + * {@code Thread.sleep} wakeups, plus a continuously-busy computation thread. + * + * This workload is representative of real applications where {@code LockSupport.park} + * dominates the thread state distribution. The output shows how aggressively each + * precheck variant would reduce signals, and what fraction of interesting blocking + * visibility each strategy sacrifices. + */ + @Test + public void realisticServiceWorkload() throws Exception { + Assumptions.assumeTrue(!Platform.isJ9()); + + final int POOL_SIZE = 8; + final int TASK_DURATION_MS = 20; // each submitted task takes ~20 ms + final int SCHEDULE_INTERVAL_MS = 50; // scheduler fires every 50 ms + + AtomicBoolean stop = new AtomicBoolean(false); + AtomicInteger threadIndex = new AtomicInteger(0); + + // Thread pool whose threads register themselves with the wall-clock filter. + // When idle, pool threads sit in LinkedBlockingQueue.take() → LockSupport.park (CONDVAR_WAIT). + ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE, r -> { + Thread t = new Thread(() -> { + registerCurrentThreadForWallClockProfiling(); + r.run(); + }); + t.setName("realistic-pool-" + threadIndex.incrementAndGet()); + t.setDaemon(true); + return t; + }); + + // Pre-warm: submit N tasks so the executor creates all POOL_SIZE threads before measurement. + CountDownLatch primed = new CountDownLatch(POOL_SIZE); + for (int i = 0; i < POOL_SIZE; i++) { + pool.submit(primed::countDown); + } + primed.await(); + Thread.sleep(50); // let all pool threads return to idle (parked) state + + // Scheduler: sleeps between submissions, simulating a periodic task trigger. + // Thread.sleep → SLEEPING, which is what the new precheck targets. + Thread scheduler = new Thread(() -> { + registerCurrentThreadForWallClockProfiling(); + while (!stop.get()) { + try { + Thread.sleep(SCHEDULE_INTERVAL_MS); + } catch (InterruptedException e) { + break; + } + // Submit a short CPU task to one pool thread + pool.submit(() -> { + long x = 0; + long deadline = System.nanoTime() + TASK_DURATION_MS * 1_000_000L; + while (System.nanoTime() < deadline) { x++; } + return x; + }); + } + }, "realistic-scheduler"); + scheduler.setDaemon(true); + scheduler.start(); + + // Always-busy thread: simulates a background aggregation/analytics loop. + Thread hotThread = new Thread(() -> { + registerCurrentThreadForWallClockProfiling(); + long x = 0; + while (!stop.get()) { x++; } + }, "realistic-hot"); + hotThread.setDaemon(true); + hotThread.start(); + + // Measurement window + Thread.sleep(500); + + stop.set(true); + scheduler.interrupt(); + pool.shutdownNow(); + pool.awaitTermination(2, TimeUnit.SECONDS); + hotThread.join(1000); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.MethodSample", false); + + long sleepSamples = 0, parkSamples = 0, otherSamples = 0; + + for (IItemIterable batch : events) { + IMemberAccessor stackAccessor = JdkAttributes.STACK_TRACE_STRING.getAccessor(batch.getType()); + if (stackAccessor == null) continue; + for (IItem item : batch) { + String stack = stackAccessor.getMember(item); + if (stack == null) { + otherSamples++; + } else if (stack.contains("Thread.sleep") || stack.contains("sleep0")) { + // SLEEPING — suppressed by new precheck + sleepSamples++; + } else if (stack.contains("LockSupport.park") || stack.contains("Unsafe.park")) { + // CONDVAR_WAIT — suppressed by old precheck only + parkSamples++; + } else { + // RUNNABLE or other — not suppressed by either precheck + otherSamples++; + } + } + } + + long total = sleepSamples + parkSamples + otherSamples; + if (total == 0) { + System.out.println("No samples collected — skipping realistic workload report"); + return; + } + + double pctSleep = 100.0 * sleepSamples / total; + double pctPark = 100.0 * parkSamples / total; + double pctOther = 100.0 * otherSamples / total; + + double newPrecheckSuppression = pctSleep; + double oldPrecheckSuppression = pctSleep + pctPark; + + System.out.printf("%nRealistic service workload report (%d pool threads, 1 scheduler, 1 hot thread, 500ms):%n", POOL_SIZE); + System.out.printf(" SLEEPING (Thread.sleep — scheduler): %4d samples (%5.1f%%) — new precheck suppresses these%n", sleepSamples, pctSleep); + System.out.printf(" CONDVAR_WAIT (LockSupport.park — idle pool): %4d samples (%5.1f%%) — old precheck also suppressed these%n", parkSamples, pctPark); + System.out.printf(" RUNNABLE / other (active threads): %4d samples (%5.1f%%) — never suppressed%n", otherSamples, pctOther); + System.out.printf("Current precheck (SLEEPING only): %.1f%% of signals suppressed%n", newPrecheckSuppression); + System.out.printf("Aggressive precheck (all blocked states): %.1f%% of signals suppressed%n", oldPrecheckSuppression); + + // The idle pool threads should dominate: most samples should be parked + assertTrue(parkSamples > otherSamples, + String.format("Expected idle pool threads (park=%d) to dominate active threads (other=%d)", parkSamples, otherSamples)); + // The scheduler must appear (it sleeps 50ms at a time over 500ms → ~10 cycles) + assertTrue(sleepSamples > 0, "Expected samples from scheduler's Thread.sleep"); + } + + @Override + protected String getProfilerCommand() { + // Run with no suppression so all states are sampled; we infer the suppression rates + // from the sample distribution. + return "wall=1ms,wallprecheck=false"; + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckTest.java new file mode 100644 index 000000000..43be5b50d --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/PrecheckTest.java @@ -0,0 +1,56 @@ +package com.datadoghq.profiler.wallclock; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.openjdk.jmc.common.item.Aggregators; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Verifies the wallprecheck feature (wallprecheck=true, the default): the timer loop reads + * osThreadState() before sending SIGVTALRM and skips threads in SLEEPING state (Thread.sleep). + * A thread sleeping for 300 ms at a 1 ms interval should receive nearly zero signals. + * + *

Runs only on JDK 11+: JDK 8 HotSpot often does not expose a consistent SLEEPING OSThread + * state for Thread.sleep in vmStructs, so precheck cannot suppress signals reliably on JDK 8 CI. + */ +public class PrecheckTest extends AbstractProfilerTest { + + @Test + public void testSleepingThreadIsNotSampled() throws InterruptedException { + Assumptions.assumeTrue(!Platform.isJ9()); + // Wall precheck uses VMThread::osThreadState() -> SLEEPING (wallClock.cpp). JDK 8 + // frequently misreports vs JDK 11+ across vendors/libcs (Oracle, musl, glibc), so + // nearly all wall ticks still signal — CI sees hundreds of MethodSamples. + Assumptions.assumeTrue(Platform.isJavaVersionAtLeast(11)); + registerCurrentThreadForWallClockProfiling(); + + // Sleep for 300 ms — enough for ~300 timer ticks at 1 ms interval. + Thread.sleep(300); + + stopProfiler(); + + // The timer thread reads osThreadState() and skips SIGVTALRM for SLEEPING threads, + // so sample count should be near zero (a handful may slip through at sleep entry/exit). + long sampleCount = verifyEvents("datadog.MethodSample", false) + .getAggregate(Aggregators.count()).longValue(); + assertTrue(sampleCount < 10, + "Expected nearly no MethodSample events for a sleeping thread with wallprecheck=true, got: " + sampleCount); + + // Confirm the suppression counter incremented (only available in COUNTERS-enabled builds). + Map counters = profiler.getDebugCounters(); + if (counters.containsKey("wc_signals_skipped_sleeping")) { + assertTrue(counters.get("wc_signals_skipped_sleeping") > 0, + "wc_signals_skipped_sleeping should be > 0 for a 300 ms Thread.sleep()"); + } + } + + @Override + protected String getProfilerCommand() { + return "wall=1ms"; // wallprecheck=true by default + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/SleepTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/SleepTest.java index 5cc63689e..673268e2c 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/SleepTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/SleepTest.java @@ -26,6 +26,6 @@ public void testSleep() { @Override protected String getProfilerCommand() { - return "wall=1ms"; + return "wall=1ms,wallprecheck=false"; } } diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/WallClockThreadFilterTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/WallClockThreadFilterTest.java index 3a7ad7208..a6952fb11 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/WallClockThreadFilterTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/WallClockThreadFilterTest.java @@ -38,6 +38,9 @@ public void test() throws InterruptedException { @Override protected String getProfilerCommand() { - return "wall=~1ms,filter=0"; + // wallprecheck=false: this test exercises thread filter + JFR thread identity, not sleep + // suppression (see PrecheckTest). With wallprecheck=true and VMThread set on the filter + // slot, a thread sleeping the whole window receives no wall signals. + return "wall=~1ms,filter=0,wallprecheck=false"; } } From 28856f43bd5ad5d2dce4938b8a41f2257ac89cc5 Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 11:17:59 +0200 Subject: [PATCH 04/14] fix(liveness): update the _record_heap_usage flag if profiler args change --- ddprof-lib/src/main/cpp/livenessTracker.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ddprof-lib/src/main/cpp/livenessTracker.cpp b/ddprof-lib/src/main/cpp/livenessTracker.cpp index afb2c1f74..d5ab05f9d 100644 --- a/ddprof-lib/src/main/cpp/livenessTracker.cpp +++ b/ddprof-lib/src/main/cpp/livenessTracker.cpp @@ -217,6 +217,10 @@ Error LivenessTracker::initialize(Arguments &args) { } if (_initialized) { + // Tracker settings are sticky across recordings. Preserve the historical + // table/config behavior, but allow HeapUsage recording to be enabled later + // (e.g. if an earlier test initialized liveness without ':L'). + _record_heap_usage = _record_heap_usage || args._record_heap_usage; // if the tracker was previously initialized return the stored result for // consistency this hack also means that if the profiler is started with // different arguments for liveness tracking those will be ignored it is From ff115140753079dc5cfbe5b5d89ba73f9082fe7f Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 16:22:31 +0200 Subject: [PATCH 05/14] feat(jfr): add TaskBlockEvent and datadog.TaskBlock metadata --- ddprof-lib/src/main/cpp/event.h | 9 +++++++++ ddprof-lib/src/main/cpp/jfrMetadata.cpp | 11 +++++++++++ ddprof-lib/src/main/cpp/jfrMetadata.h | 1 + 3 files changed, 21 insertions(+) diff --git a/ddprof-lib/src/main/cpp/event.h b/ddprof-lib/src/main/cpp/event.h index ff6c56ecc..bfb0e2293 100644 --- a/ddprof-lib/src/main/cpp/event.h +++ b/ddprof-lib/src/main/cpp/event.h @@ -189,4 +189,13 @@ typedef struct QueueTimeEvent { u32 _queueLength; } QueueTimeEvent; +typedef struct TaskBlockEvent { + u64 _start; + u64 _end; + u64 _blocker; + u64 _unblockingSpanId; + /** Span IDs and tag encodings for JFR (park exit uses snapshot from park enter). */ + Context _ctx; +} TaskBlockEvent; + #endif // _EVENT_H diff --git a/ddprof-lib/src/main/cpp/jfrMetadata.cpp b/ddprof-lib/src/main/cpp/jfrMetadata.cpp index 54e0f6a15..b5c0fffa7 100644 --- a/ddprof-lib/src/main/cpp/jfrMetadata.cpp +++ b/ddprof-lib/src/main/cpp/jfrMetadata.cpp @@ -205,6 +205,17 @@ void JfrMetadata::initialize( << field("localRootSpanId", T_LONG, "Local Root Span ID") || contextAttributes) + << (type("datadog.TaskBlock", T_TASK_BLOCK, "Task Block") + << category("Datadog") + << field("startTime", T_LONG, "Start Time", F_TIME_TICKS) + << field("duration", T_LONG, "Duration", F_DURATION_TICKS) + << field("eventThread", T_THREAD, "Event Thread", F_CPOOL) + << field("spanId", T_LONG, "Span ID") + << field("localRootSpanId", T_LONG, "Local Root Span ID") + << field("blocker", T_LONG, "Blocker Identity Hash") + << field("unblockingSpanId", T_LONG, "Unblocking Span ID") || + contextAttributes) + << (type("datadog.HeapUsage", T_HEAP_USAGE, "JVM Heap Usage") << category("Datadog") << field("startTime", T_LONG, "Start Time", F_TIME_TICKS) diff --git a/ddprof-lib/src/main/cpp/jfrMetadata.h b/ddprof-lib/src/main/cpp/jfrMetadata.h index 52c2e0ae8..bd021d1a8 100644 --- a/ddprof-lib/src/main/cpp/jfrMetadata.h +++ b/ddprof-lib/src/main/cpp/jfrMetadata.h @@ -80,6 +80,7 @@ enum JfrType { T_DATADOG_COUNTER = 125, T_UNWIND_FAILURE = 126, T_MALLOC = 127, + T_TASK_BLOCK = 128, T_ANNOTATION = 200, T_LABEL = 201, T_CATEGORY = 202, From 0fd00b59c33184c1a9b95182131d4c015de1fd1a Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 16:50:09 +0200 Subject: [PATCH 06/14] feat(jfr): record datadog.TaskBlock in FlightRecorder --- ddprof-lib/src/main/cpp/flightRecorder.cpp | 25 ++++++++++++++++++++++ ddprof-lib/src/main/cpp/flightRecorder.h | 2 ++ ddprof-lib/src/main/cpp/profiler.cpp | 11 ++++++++++ ddprof-lib/src/main/cpp/profiler.h | 1 + 4 files changed, 39 insertions(+) diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index cefbc476b..228f4ac32 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -1570,6 +1570,19 @@ void Recording::recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event) { flushIfNeeded(buf); } +void Recording::recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event) { + int start = buf->skip(1); + buf->putVar64(T_TASK_BLOCK); + buf->putVar64(event->_start); + buf->putVar64(event->_end - event->_start); + buf->putVar64(tid); + writeContextSnapshot(buf, event->_ctx); + buf->putVar64(event->_blocker); + buf->putVar64(event->_unblockingSpanId); + writeEventSizePrefix(buf, start); + flushIfNeeded(buf); +} + void Recording::recordAllocation(RecordingBuffer *buf, int tid, u64 call_trace_id, AllocEvent *event) { int start = buf->skip(1); @@ -1789,6 +1802,18 @@ void FlightRecorder::recordQueueTime(int lock_index, int tid, } } +void FlightRecorder::recordTaskBlock(int lock_index, int tid, + TaskBlockEvent *event) { + OptionalSharedLockGuard locker(&_rec_lock); + if (locker.ownsLock()) { + Recording* rec = _rec; + if (rec != nullptr) { + Buffer *buf = rec->buffer(lock_index); + rec->recordTaskBlock(buf, tid, event); + } + } +} + void FlightRecorder::recordDatadogSetting(int lock_index, int length, const char *name, const char *value, const char *unit) { diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index e9aa3cde1..cf2005df1 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -279,6 +279,7 @@ class Recording { void recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event); void recordTraceRoot(Buffer *buf, int tid, TraceRootEvent *event); void recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event); + void recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event); void recordAllocation(RecordingBuffer *buf, int tid, u64 call_trace_id, AllocEvent *event); void recordMallocSample(Buffer *buf, int tid, u64 call_trace_id, @@ -347,6 +348,7 @@ class FlightRecorder { void wallClockEpoch(int lock_index, WallClockEpochEvent *event); void recordTraceRoot(int lock_index, int tid, TraceRootEvent *event); void recordQueueTime(int lock_index, int tid, QueueTimeEvent *event); + void recordTaskBlock(int lock_index, int tid, TaskBlockEvent *event); bool active() const { return _rec != NULL; } diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index 0e3a81302..bbd1fc56d 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -632,6 +632,17 @@ void Profiler::recordQueueTime(int tid, QueueTimeEvent *event) { _locks[lock_index].unlock(); } +void Profiler::recordTaskBlock(int tid, TaskBlockEvent *event) { + u32 lock_index = getLockIndex(tid); + if (!_locks[lock_index].tryLock() && + !_locks[lock_index = (lock_index + 1) % CONCURRENCY_LEVEL].tryLock() && + !_locks[lock_index = (lock_index + 2) % CONCURRENCY_LEVEL].tryLock()) { + return; + } + _jfr.recordTaskBlock(lock_index, tid, event); + _locks[lock_index].unlock(); +} + void Profiler::recordExternalSample(u64 weight, int tid, int num_frames, ASGCT_CallFrame *frames, bool truncated, jint event_type, Event *event) { diff --git a/ddprof-lib/src/main/cpp/profiler.h b/ddprof-lib/src/main/cpp/profiler.h index d9dc28ac7..d35bdbf1e 100644 --- a/ddprof-lib/src/main/cpp/profiler.h +++ b/ddprof-lib/src/main/cpp/profiler.h @@ -337,6 +337,7 @@ class alignas(alignof(SpinLock)) Profiler { void recordWallClockEpoch(int tid, WallClockEpochEvent *event); void recordTraceRoot(int tid, TraceRootEvent *event); void recordQueueTime(int tid, QueueTimeEvent *event); + void recordTaskBlock(int tid, TaskBlockEvent *event); void writeLog(LogLevel level, const char *message); void writeLog(LogLevel level, const char *message, size_t len); void writeDatadogProfilerSetting(int tid, int length, const char *name, From 51cd4e3bf41f1ef36a0f92486285d3077c1e80de Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 16:51:10 +0200 Subject: [PATCH 07/14] feat(thread): add park enter/exit state for wall-clock gating --- ddprof-lib/src/main/cpp/thread.cpp | 3 +++ ddprof-lib/src/main/cpp/thread.h | 32 +++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/ddprof-lib/src/main/cpp/thread.cpp b/ddprof-lib/src/main/cpp/thread.cpp index bb89f2f00..6279b27b7 100644 --- a/ddprof-lib/src/main/cpp/thread.cpp +++ b/ddprof-lib/src/main/cpp/thread.cpp @@ -98,6 +98,9 @@ void ProfiledThread::releaseFromBuffer() { _wall_epoch = 0; _call_trace_id = 0; _recording_epoch = 0; + __atomic_fetch_and(&_misc_flags, ~FLAG_PARKED, __ATOMIC_RELEASE); + _park_start_ticks = 0; + memset(&_park_context, 0, sizeof(_park_context)); _filter_slot_id = -1; _init_window = 0; _unwind_failures.clear(); diff --git a/ddprof-lib/src/main/cpp/thread.h b/ddprof-lib/src/main/cpp/thread.h index b3c721bfb..36d16361c 100644 --- a/ddprof-lib/src/main/cpp/thread.h +++ b/ddprof-lib/src/main/cpp/thread.h @@ -28,6 +28,8 @@ class ProfiledThread : public ThreadLocalData { TYPE_MASK = TYPE_JAVA_THREAD | TYPE_NOT_JAVA_THREAD }; + static constexpr u32 FLAG_PARKED = 0x4u; + private: // We are allowing several levels of nesting because we can be // eg. in a crash handler when wallclock signal kicks in, @@ -68,6 +70,8 @@ class ProfiledThread : public ThreadLocalData { u64 _call_trace_id; u32 _recording_epoch; u32 _misc_flags; + u64 _park_start_ticks; + Context _park_context; int _filter_slot_id; // Slot ID for thread filtering uint8_t _init_window; // Countdown for JVM thread init race window (PROF-13072) UnwindFailures _unwind_failures; @@ -87,7 +91,9 @@ class ProfiledThread : public ThreadLocalData { ProfiledThread(int buffer_pos, int tid) : ThreadLocalData(), _pc(0), _sp(0), _span_id(0), _crash_depth(0), _buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0), - _wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _misc_flags(0), _filter_slot_id(-1), _init_window(0), + _wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _misc_flags(0), + _park_start_ticks(0), _park_context{}, + _filter_slot_id(-1), _init_window(0), _otel_ctx_initialized(false), _crash_protection_active(false), _otel_ctx_record{}, _otel_tag_encodings{}, _otel_local_root_span_id(0) {}; @@ -250,6 +256,30 @@ class ProfiledThread : public ThreadLocalData { _otel_local_root_span_id = 0; } + inline void parkEnter(u64 span_id, u64 root_span_id, u64 start_ticks) { + _park_context.spanId = span_id; + _park_context.rootSpanId = root_span_id; + for (size_t i = 0; i < DD_TAGS_CAPACITY; i++) { + _park_context.tags[i].value = _otel_tag_encodings[i]; + } + _park_start_ticks = start_ticks; + __atomic_fetch_or(&_misc_flags, FLAG_PARKED, __ATOMIC_RELEASE); + } + + inline bool parkExit(u64 &start_ticks, Context &park_context) { + u32 prev = __atomic_fetch_and(&_misc_flags, ~FLAG_PARKED, __ATOMIC_ACQ_REL); + if ((prev & FLAG_PARKED) == 0) { + return false; + } + start_ticks = _park_start_ticks; + park_context = _park_context; + return true; + } + + inline bool isParkedForWallclock() const { + return (__atomic_load_n(&_misc_flags, __ATOMIC_ACQUIRE) & FLAG_PARKED) != 0; + } + Context snapshotContext(size_t numAttrs); private: From 13fbd2014718b8f729452f1b739a18ef11c953fe Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 16:53:31 +0200 Subject: [PATCH 08/14] feat(wall): store ProfiledThread in thread filter for wall sampling --- ddprof-lib/src/main/cpp/javaApi.cpp | 3 +++ ddprof-lib/src/main/cpp/profiler.cpp | 3 +++ ddprof-lib/src/main/cpp/threadFilter.cpp | 18 +++++++++++++++++- ddprof-lib/src/main/cpp/threadFilter.h | 6 +++++- 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 0687323c5..896592ac0 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -162,6 +162,9 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() { // Refresh HotSpot VMThread* for wall thread-filter precheck (vmStructs OS state). // HotSpot only: VMThread::current() asserts VM::isHotspot(). OpenJ9/Zing: leave null. thread_filter->setVMThread(slot_id, VM::isHotspot() ? VMThread::current() : nullptr); + // Refresh ProfiledThread* so wall-clock mitigations can observe per-thread parked state. + // Publish pointer fields before publishing tid via add() to preserve visibility ordering. + thread_filter->setProfiledThread(slot_id, current); thread_filter->add(tid, slot_id); } diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index bbd1fc56d..cb54351b5 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -77,6 +77,7 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { current->setFilterSlotId(slot_id); // VMThread / vmStructs are HotSpot-only; VMThread::current() asserts VM::isHotspot(). _thread_filter.setVMThread(slot_id, VM::isHotspot() ? VMThread::current() : nullptr); + _thread_filter.setProfiledThread(slot_id, current); _thread_filter.remove(slot_id); // Remove from filtering initially } if (thread != NULL) { @@ -98,6 +99,7 @@ void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { if (_thread_filter.enabled()) { _thread_filter.setVMThread(slot_id, nullptr); + _thread_filter.setProfiledThread(slot_id, nullptr); _thread_filter.unregisterThread(slot_id); current->setFilterSlotId(-1); } @@ -1151,6 +1153,7 @@ Error Profiler::start(Arguments &args, bool reset) { int slot_id = _thread_filter.registerThread(); current->setFilterSlotId(slot_id); _thread_filter.setVMThread(slot_id, VM::isHotspot() ? VMThread::current() : nullptr); + _thread_filter.setProfiledThread(slot_id, current); _thread_filter.remove(slot_id); // Remove from filtering initially (matches onThreadStart behavior) } diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 77aff3c78..b0457ffc3 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -23,6 +23,7 @@ #include "arch.h" #include "hotspot/vmStructs.h" #include "os.h" +#include "thread.h" #include #include #include @@ -79,6 +80,8 @@ void ThreadFilter::initializeChunk(int chunk_idx) { ChunkStorage* new_chunk = new ChunkStorage(); for (auto& slot : new_chunk->slots) { slot.value.store(-1, std::memory_order_relaxed); + slot.vm_thread.store(nullptr, std::memory_order_relaxed); + slot.profiled_thread.store(nullptr, std::memory_order_relaxed); } // Try to install it atomically @@ -198,6 +201,8 @@ void ThreadFilter::remove(SlotID slot_id) { } chunk->slots[slot_idx].value.store(-1, std::memory_order_release); + chunk->slots[slot_idx].vm_thread.store(nullptr, std::memory_order_release); + chunk->slots[slot_idx].profiled_thread.store(nullptr, std::memory_order_release); } void ThreadFilter::unregisterThread(SlotID slot_id) { @@ -295,6 +300,16 @@ void ThreadFilter::setVMThread(SlotID slot_id, VMThread* vm_thread) { } } +void ThreadFilter::setProfiledThread(SlotID slot_id, ProfiledThread* profiled_thread) { + if (slot_id < 0) return; + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (chunk != nullptr) { + chunk->slots[slot_idx].profiled_thread.store(profiled_thread, std::memory_order_release); + } +} + void ThreadFilter::collectWithState(std::vector& entries) const { entries.clear(); entries.reserve(512); @@ -308,7 +323,8 @@ void ThreadFilter::collectWithState(std::vector& entries) const { int slot_tid = slot.value.load(std::memory_order_acquire); if (slot_tid != -1) { VMThread* vm = slot.vm_thread.load(std::memory_order_acquire); - entries.push_back({slot_tid, vm}); + ProfiledThread* pt = slot.profiled_thread.load(std::memory_order_acquire); + entries.push_back({slot_tid, vm, pt}); } } } diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index dd5b878cb..70911862f 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -25,10 +25,12 @@ #include "arch.h" class VMThread; +class ProfiledThread; struct ThreadEntry { int tid; VMThread* vm_thread; + ProfiledThread* profiled_thread; }; class ThreadFilter { @@ -56,6 +58,7 @@ class ThreadFilter { void remove(SlotID slot_id); void collect(std::vector& tids) const; void setVMThread(SlotID slot_id, VMThread* vm_thread); + void setProfiledThread(SlotID slot_id, ProfiledThread* profiled_thread); void collectWithState(std::vector& entries) const; SlotID registerThread(); @@ -66,8 +69,9 @@ class ThreadFilter { // Pointers are placed before the int to avoid implicit alignment padding between them. struct alignas(DEFAULT_CACHE_LINE_SIZE) Slot { std::atomic vm_thread{nullptr}; // 8 bytes + std::atomic profiled_thread{nullptr}; // 8 bytes std::atomic value{-1}; // 4 bytes - char padding[DEFAULT_CACHE_LINE_SIZE - sizeof(vm_thread) - sizeof(value)]; + char padding[DEFAULT_CACHE_LINE_SIZE - sizeof(vm_thread) - sizeof(profiled_thread) - sizeof(value)]; }; static_assert(sizeof(Slot) == DEFAULT_CACHE_LINE_SIZE, "Slot must be exactly one cache line"); From 05d9ac4cc12fc822e086045f004e57a8dde31796 Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 16:54:11 +0200 Subject: [PATCH 09/14] feat(wall): skip wall SIGVTALRM while thread is parked --- ddprof-lib/src/main/cpp/counters.h | 1 + ddprof-lib/src/main/cpp/wallClock.cpp | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/ddprof-lib/src/main/cpp/counters.h b/ddprof-lib/src/main/cpp/counters.h index 355f810bb..63c18bd35 100644 --- a/ddprof-lib/src/main/cpp/counters.h +++ b/ddprof-lib/src/main/cpp/counters.h @@ -61,6 +61,7 @@ X(AGCT_BLOCKED_IN_VM, "agct_blocked_in_vm") \ X(SKIPPED_WALLCLOCK_UNWINDS, "skipped_wallclock_unwinds") \ X(WC_SIGNAL_SKIPPED_SLEEPING, "wc_signals_skipped_sleeping") \ + X(WC_SIGNAL_SKIPPED_PARKED, "wc_signals_skipped_parked") \ X(UNWINDING_TIME_ASYNC, "unwinding_ticks_async") \ X(UNWINDING_TIME_JVMTI, "unwinding_ticks_jvmti") \ X(CALLTRACE_STORAGE_DROPPED, "calltrace_storage_dropped_traces") \ diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index ac027faa6..ef4bba493 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -87,6 +87,13 @@ void WallClockASGCT::signalHandler(int signo, siginfo_t *siginfo, void *ucontext current->tickInitWindow(); return; } + // Parked suppression is evaluated on the sampled thread itself (TLS-backed + // ProfiledThread::currentSignalSafe()) to avoid dereferencing cross-thread + // ProfiledThread pointers that may race with thread teardown. + if (current != nullptr && current->isParkedForWallclock()) { + Counters::increment(WC_SIGNAL_SKIPPED_PARKED); + return; + } int tid = current != NULL ? current->tid() : OS::threadId(); Shims::instance().setSighandlerTid(tid); u64 call_trace_id = 0; @@ -182,7 +189,7 @@ void WallClockASGCT::timerLoop() { while (thread_list->hasNext()) { int tid = thread_list->next(); if (tid != OS::threadId()) { - entries.push_back({tid, nullptr}); + entries.push_back({tid, nullptr, nullptr}); } } delete thread_list; From dcc0810b6f679dda7439250180a3851cbef8fd09 Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 16:54:29 +0200 Subject: [PATCH 10/14] feat(api): add recordTaskBlock and park JNI entry points --- ddprof-lib/src/main/cpp/javaApi.cpp | 64 +++++++++++++++++++ .../com/datadoghq/profiler/JavaProfiler.java | 39 +++++++++++ 2 files changed, 103 insertions(+) diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 896592ac0..358ffa26a 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -319,6 +319,70 @@ Java_com_datadoghq_profiler_JavaProfiler_recordQueueEnd0( Profiler::instance()->recordQueueTime(tid, &event); } +static inline bool exceedsMinTaskBlockDuration(u64 start_ticks, u64 end_ticks) { + static const u64 kMinTaskBlockNanos = 1000000; // 1 ms + u64 min_ticks = (TSC::frequency() * kMinTaskBlockNanos) / 1000000000ULL; + return end_ticks > start_ticks && (end_ticks - start_ticks) >= min_ticks; +} + +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_recordTaskBlock0( + JNIEnv *env, jclass unused, jlong startTicks, jlong endTicks, jlong spanId, + jlong rootSpanId, jlong blocker, jlong unblockingSpanId) { + int tid = ProfiledThread::currentTid(); + if (tid < 0) { + return; + } + if (!exceedsMinTaskBlockDuration(startTicks, endTicks) || spanId == 0) { + return; + } + TaskBlockEvent event{}; + event._start = startTicks; + event._end = endTicks; + event._blocker = blocker; + event._unblockingSpanId = unblockingSpanId; + event._ctx = ContextApi::snapshot(); + event._ctx.spanId = (u64)spanId; + event._ctx.rootSpanId = (u64)rootSpanId; + Profiler::instance()->recordTaskBlock(tid, &event); +} + +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_parkEnter0( + JNIEnv *env, jclass unused, jlong spanId, jlong rootSpanId) { + ProfiledThread *current = ProfiledThread::current(); + if (current == nullptr) { + return; + } + current->parkEnter(spanId, rootSpanId, TSC::ticks()); +} + +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_parkExit0( + JNIEnv *env, jclass unused, jlong blocker, jlong unblockingSpanId) { + ProfiledThread *current = ProfiledThread::current(); + if (current == nullptr) { + return; + } + u64 start_ticks = 0; + Context park_context = {}; + if (!current->parkExit(start_ticks, park_context)) { + return; + } + u64 end_ticks = TSC::ticks(); + if (!exceedsMinTaskBlockDuration(start_ticks, end_ticks) || + park_context.spanId == 0) { + return; + } + TaskBlockEvent event{}; + event._start = start_ticks; + event._end = end_ticks; + event._blocker = blocker; + event._unblockingSpanId = unblockingSpanId; + event._ctx = park_context; + Profiler::instance()->recordTaskBlock(current->tid(), &event); +} + extern "C" DLLEXPORT jlong JNICALL Java_com_datadoghq_profiler_JavaProfiler_currentTicks0(JNIEnv *env, jclass unused) { diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index 33e0bdc13..1d19cd78b 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -277,6 +277,39 @@ public void recordQueueTime(long startTicks, recordQueueEnd0(startTicks, endTicks, task.getName(), scheduler.getName(), origin, queueType.getName(), queueLength); } + /** + * Records a {@code datadog.TaskBlock} interval for the current thread (direct API). + * + * @param startTicks TSC tick at block start + * @param endTicks TSC tick at block end + * @param spanId active span id when blocking began + * @param rootSpanId active local root span id when blocking began + * @param blocker blocker identity (e.g. monitor hash), or 0 + * @param unblockingSpanId span id of unblocking thread, or 0 + */ + public void recordTaskBlock(long startTicks, + long endTicks, + long spanId, + long rootSpanId, + long blocker, + long unblockingSpanId) { + recordTaskBlock0(startTicks, endTicks, spanId, rootSpanId, blocker, unblockingSpanId); + } + + /** + * Called before {@code LockSupport.park}; native wall-clock sampling may skip SIGVTALRM for this interval. + */ + public void parkEnter(long spanId, long rootSpanId) { + parkEnter0(spanId, rootSpanId); + } + + /** + * Called after {@code LockSupport.park}; clears parked state and may emit {@code datadog.TaskBlock}. + */ + public void parkExit(long blocker, long unblockingSpanId) { + parkExit0(blocker, unblockingSpanId); + } + /** * Get the ticks for the current thread. * @return ticks @@ -332,6 +365,12 @@ private static ThreadContext initializeThreadContext() { private static native void recordQueueEnd0(long startTicks, long endTicks, String task, String scheduler, Thread origin, String queueType, int queueLength); + private static native void recordTaskBlock0(long startTicks, long endTicks, long spanId, long rootSpanId, long blocker, long unblockingSpanId); + + private static native void parkEnter0(long spanId, long rootSpanId); + + private static native void parkExit0(long blocker, long unblockingSpanId); + private static native long currentTicks0(); private static native long tscFrequency0(); From cb97b77bc38ff4ea9b5f933be7a5509a4df4b10c Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 16:58:26 +0200 Subject: [PATCH 11/14] fix(hotspot): guard VMThread access for non-HotSpot JVMs --- ddprof-lib/src/main/cpp/hotspot/vmStructs.inline.h | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ddprof-lib/src/main/cpp/hotspot/vmStructs.inline.h b/ddprof-lib/src/main/cpp/hotspot/vmStructs.inline.h index 03a81fea6..825d6a566 100644 --- a/ddprof-lib/src/main/cpp/hotspot/vmStructs.inline.h +++ b/ddprof-lib/src/main/cpp/hotspot/vmStructs.inline.h @@ -9,12 +9,21 @@ #include "hotspot/vmStructs.h" #include "jvmThread.h" +#include "vmEntry.h" VMThread* VMThread::current() { + // JVMThread::current() is the native thread self pointer. On OpenJ9/Zing it + // is not a HotSpot JavaThread*; only HotSpot may reinterpret it as VMThread*. + if (!VM::isHotspot() || JVMThread::current() == nullptr) { + return nullptr; + } return VMThread::cast(JVMThread::current()); } VMThread* VMThread::fromJavaThread(JNIEnv* env, jthread thread) { + if (!VM::isHotspot()) { + return nullptr; + } assert(_eetop != nullptr); if (_eetop != nullptr) { return VMThread::cast((void*)env->GetLongField(thread, _eetop)); From 430e97993966de9098e16f38a8b99bb75a2dc371 Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 16:59:04 +0200 Subject: [PATCH 12/14] test(thread): add gtests for ProfiledThread park state --- ddprof-lib/src/test/cpp/park_state_ut.cpp | 63 +++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 ddprof-lib/src/test/cpp/park_state_ut.cpp diff --git a/ddprof-lib/src/test/cpp/park_state_ut.cpp b/ddprof-lib/src/test/cpp/park_state_ut.cpp new file mode 100644 index 000000000..e65e1a173 --- /dev/null +++ b/ddprof-lib/src/test/cpp/park_state_ut.cpp @@ -0,0 +1,63 @@ +/* + * Copyright 2026 Datadog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "thread.h" + +TEST(ProfiledThreadParkStateTest, ParkFlagLifecycle) { + ProfiledThread* thread = ProfiledThread::forTid(12345); + + EXPECT_FALSE(thread->isParkedForWallclock()); + + thread->parkEnter(101, 202, 777); + EXPECT_TRUE(thread->isParkedForWallclock()); + + u64 start_ticks = 0; + Context park_context = {}; + EXPECT_TRUE(thread->parkExit(start_ticks, park_context)); + EXPECT_EQ(777ULL, start_ticks); + EXPECT_EQ(101ULL, park_context.spanId); + EXPECT_EQ(202ULL, park_context.rootSpanId); + EXPECT_FALSE(thread->isParkedForWallclock()); + + // Second exit is a no-op once the parked bit is cleared. + EXPECT_FALSE(thread->parkExit(start_ticks, park_context)); +} + +TEST(ProfiledThreadParkStateTest, ParkEnterSnapshotsTagEncodings) { + ProfiledThread* thread = ProfiledThread::forTid(12346); + u32* tags = thread->getOtelTagEncodingsPtr(); + tags[0] = 11; + tags[1] = 22; + tags[2] = 33; + + thread->parkEnter(303, 404, 888); + + // Mutate live encodings after enter: park context must keep the enter snapshot. + tags[0] = 111; + tags[1] = 222; + tags[2] = 333; + + u64 start_ticks = 0; + Context park_context = {}; + ASSERT_TRUE(thread->parkExit(start_ticks, park_context)); + EXPECT_EQ(888ULL, start_ticks); + EXPECT_EQ(303ULL, park_context.spanId); + EXPECT_EQ(404ULL, park_context.rootSpanId); + EXPECT_EQ(11U, park_context.tags[0].value); + EXPECT_EQ(22U, park_context.tags[1].value); + EXPECT_EQ(33U, park_context.tags[2].value); +} From 81c7712d2564b06df8bf8bc5e106eaa6266b47d2 Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Wed, 29 Apr 2026 17:24:18 +0200 Subject: [PATCH 13/14] test(wall): add TaskBlock and combined precheck/park coverage --- .../profiler/wallclock/ParkTaskBlockTest.java | 68 ++++++++++ .../WallclockMitigationsCombinedTest.java | 122 ++++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ParkTaskBlockTest.java create mode 100644 ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/WallclockMitigationsCombinedTest.java diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ParkTaskBlockTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ParkTaskBlockTest.java new file mode 100644 index 000000000..1e794a39b --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/ParkTaskBlockTest.java @@ -0,0 +1,68 @@ +package com.datadoghq.profiler.wallclock; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.openjdk.jmc.common.item.Aggregators; +import org.openjdk.jmc.common.item.IItemCollection; + +import java.util.Map; + +/** + * Approach B integration test: parkEnter/parkExit must emit TaskBlock and suppress wall-clock + * signals while parked (via ProfiledThread flag when thread filter lists the thread). + */ +public class ParkTaskBlockTest extends AbstractProfilerTest { + + /** Verifies TaskBlock emission and parked signal suppression on the same thread. */ + @Test + public void parkIntervalEmitsTaskBlockAndSuppressesSignals() { + Assumptions.assumeTrue(!Platform.isJ9()); + registerCurrentThreadForWallClockProfiling(); + + long spanId = 0x1234L; + long rootSpanId = 0x5678L; + profiler.setContext(rootSpanId, spanId, 0, 0); + + long parkUntil = System.nanoTime() + 250_000_000L; // 250 ms + profiler.parkEnter(spanId, rootSpanId); + while (System.nanoTime() < parkUntil) { + // Deliberately stay runnable while "parked": suppression must come from park flag, + // not from sleeping-state precheck. + } + profiler.parkExit(System.identityHashCode(this), 0L); + profiler.clearContext(); + + // Keep profiler active after park interval so regular wall samples still occur. + long activeUntil = System.nanoTime() + 120_000_000L; + while (System.nanoTime() < activeUntil) { + // busy + } + + stopProfiler(); + + IItemCollection taskBlocks = verifyEvents("datadog.TaskBlock"); + long taskBlockCount = taskBlocks.getAggregate(Aggregators.count()).longValue(); + assertTrue(taskBlockCount > 0, "Expected datadog.TaskBlock events after parkEnter/parkExit"); + + IItemCollection methodSamples = verifyEvents("datadog.MethodSample"); + long methodSampleCount = methodSamples.getAggregate(Aggregators.count()).longValue(); + assertTrue(methodSampleCount > 0, "Expected MethodSample events outside the parked interval"); + + Map counters = profiler.getDebugCounters(); + if (counters.containsKey("wc_signals_skipped_parked")) { + assertTrue( + counters.get("wc_signals_skipped_parked") > 0, + "Expected wc_signals_skipped_parked > 0"); + } + } + + /** Configures wall-clock profiling with precheck disabled to isolate parked-flag behavior. */ + @Override + protected String getProfilerCommand() { + return "wall=1ms,filter=0,wallprecheck=false"; + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/WallclockMitigationsCombinedTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/WallclockMitigationsCombinedTest.java new file mode 100644 index 000000000..2f3a8aefc --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/wallclock/WallclockMitigationsCombinedTest.java @@ -0,0 +1,122 @@ +package com.datadoghq.profiler.wallclock; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.openjdk.jmc.common.item.Aggregators; +import org.openjdk.jmc.common.item.IItemCollection; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Combined Approach A + B integration test: + * - Approach A (precheck): sleeping thread should be skipped. + * - Approach B (park flag): parked runnable thread should be skipped and produce TaskBlock. + * - Correctness guard: runnable thread still produces MethodSample events. + */ +public class WallclockMitigationsCombinedTest extends AbstractProfilerTest { + + /** Verifies Approach A and B counters/events can be observed concurrently. */ + @Test + public void precheckAndParkSuppressionWorkTogether() throws Exception { + Assumptions.assumeTrue(!Platform.isJ9()); + Assumptions.assumeTrue( + Platform.isJavaVersionAtLeast(11), + "Sleeping-state precheck assertions are stable on JDK 11+"); + + CountDownLatch ready = new CountDownLatch(3); + AtomicBoolean stop = new AtomicBoolean(false); + + Thread sleeping = + new Thread( + () -> { + registerCurrentThreadForWallClockProfiling(); + ready.countDown(); + try { + Thread.sleep(280); + } catch (InterruptedException ignored) { + } + }, + "combined-sleeping"); + + Thread parkedBusy = + new Thread( + () -> { + registerCurrentThreadForWallClockProfiling(); + long spanId = 0x1111L; + long rootSpanId = 0x2222L; + profiler.setContext(rootSpanId, spanId, 0, 0); + ready.countDown(); + profiler.parkEnter(spanId, rootSpanId); + long parkedUntil = System.nanoTime() + 280_000_000L; + while (System.nanoTime() < parkedUntil) { + // Runnable while flagged parked. + } + profiler.parkExit(System.identityHashCode(this), 0L); + profiler.clearContext(); + }, + "combined-parked"); + + Thread runnable = + new Thread( + () -> { + registerCurrentThreadForWallClockProfiling(); + ready.countDown(); + while (!stop.get()) { + // keep runnable + } + }, + "combined-runnable"); + + sleeping.setDaemon(true); + parkedBusy.setDaemon(true); + runnable.setDaemon(true); + sleeping.start(); + parkedBusy.start(); + runnable.start(); + + ready.await(); + Thread.sleep(350); + stop.set(true); + + sleeping.interrupt(); + sleeping.join(1000); + parkedBusy.join(1000); + runnable.join(1000); + + stopProfiler(); + + IItemCollection taskBlocks = verifyEvents("datadog.TaskBlock"); + assertTrue( + taskBlocks.getAggregate(Aggregators.count()).longValue() > 0, + "Expected TaskBlock events from parked interval"); + + IItemCollection methodSamples = verifyEvents("datadog.MethodSample"); + assertTrue( + methodSamples.getAggregate(Aggregators.count()).longValue() > 0, + "Expected runnable MethodSample events while mitigations are enabled"); + + Map counters = profiler.getDebugCounters(); + if (counters.containsKey("wc_signals_skipped_sleeping")) { + assertTrue( + counters.get("wc_signals_skipped_sleeping") > 0, + "Expected sleeping precheck counter to increase"); + } + if (counters.containsKey("wc_signals_skipped_parked")) { + assertTrue( + counters.get("wc_signals_skipped_parked") > 0, + "Expected parked suppression counter to increase"); + } + } + + /** Enables wall-clock profiling with default precheck behavior for combined assertions. */ + @Override + protected String getProfilerCommand() { + return "wall=1ms,filter=0"; + } +} From c45d4d00b92b35a98e3f0d3cbe1eb16d629fc5f5 Mon Sep 17 00:00:00 2001 From: Paul Fournillon Date: Thu, 30 Apr 2026 15:02:36 +0200 Subject: [PATCH 14/14] fix --- ddprof-lib/src/main/cpp/counters.h | 1 + ddprof-lib/src/main/cpp/flightRecorder.cpp | 1 + ddprof-lib/src/main/cpp/jfrMetadata.cpp | 4 +++- ddprof-lib/src/main/cpp/thread.h | 3 ++- ddprof-lib/src/main/cpp/threadFilter.cpp | 4 ---- ddprof-lib/src/main/cpp/wallClock.cpp | 11 ++++++----- ddprof-lib/src/main/cpp/wallClock.h | 10 +++++++--- ddprof-lib/src/test/cpp/park_state_ut.cpp | 6 +++--- 8 files changed, 23 insertions(+), 17 deletions(-) diff --git a/ddprof-lib/src/main/cpp/counters.h b/ddprof-lib/src/main/cpp/counters.h index 63c18bd35..721d82b5a 100644 --- a/ddprof-lib/src/main/cpp/counters.h +++ b/ddprof-lib/src/main/cpp/counters.h @@ -62,6 +62,7 @@ X(SKIPPED_WALLCLOCK_UNWINDS, "skipped_wallclock_unwinds") \ X(WC_SIGNAL_SKIPPED_SLEEPING, "wc_signals_skipped_sleeping") \ X(WC_SIGNAL_SKIPPED_PARKED, "wc_signals_skipped_parked") \ + X(WC_SIGNAL_QUEUE_FULL, "wc_signals_queue_full") \ X(UNWINDING_TIME_ASYNC, "unwinding_ticks_async") \ X(UNWINDING_TIME_JVMTI, "unwinding_ticks_jvmti") \ X(CALLTRACE_STORAGE_DROPPED, "calltrace_storage_dropped_traces") \ diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 228f4ac32..33b59e5c3 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -1535,6 +1535,7 @@ void Recording::recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event) { buf->putVar64(event->_num_failed_samples); buf->putVar64(event->_num_exited_threads); buf->putVar64(event->_num_permission_denied); + buf->putVar64(event->_num_skipped_sleeping); writeEventSizePrefix(buf, start); flushIfNeeded(buf); } diff --git a/ddprof-lib/src/main/cpp/jfrMetadata.cpp b/ddprof-lib/src/main/cpp/jfrMetadata.cpp index b5c0fffa7..229bccd27 100644 --- a/ddprof-lib/src/main/cpp/jfrMetadata.cpp +++ b/ddprof-lib/src/main/cpp/jfrMetadata.cpp @@ -153,7 +153,9 @@ void JfrMetadata::initialize( << field("numExitedThreads", T_INT, "Number of Exited Threads Before Handling Signal") << field("numPermissionDenied", T_INT, - "Number of Permission Denied Errors")) + "Number of Permission Denied Errors") + << field("numSkippedSleepingPrecheck", T_INT, + "Signals Skipped Due to Sleeping Precheck")) << (type("datadog.ObjectSample", T_ALLOC, "Allocation sample") << category("Datadog", "Profiling") diff --git a/ddprof-lib/src/main/cpp/thread.h b/ddprof-lib/src/main/cpp/thread.h index 36d16361c..a5e56fc2b 100644 --- a/ddprof-lib/src/main/cpp/thread.h +++ b/ddprof-lib/src/main/cpp/thread.h @@ -97,10 +97,11 @@ class ProfiledThread : public ThreadLocalData { _otel_ctx_initialized(false), _crash_protection_active(false), _otel_ctx_record{}, _otel_tag_encodings{}, _otel_local_root_span_id(0) {}; - virtual ~ProfiledThread() { } + virtual ~ProfiledThread() {} void releaseFromBuffer(); public: static ProfiledThread *forTid(int tid) { return new ProfiledThread(-1, tid); } + static ProfiledThread *inBuffer(int buffer_pos) { return new ProfiledThread(buffer_pos, 0); } diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index b0457ffc3..e1141733c 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -328,10 +328,6 @@ void ThreadFilter::collectWithState(std::vector& entries) const { } } } - - if (entries.capacity() > entries.size() * 2) { - entries.shrink_to_fit(); - } } void ThreadFilter::init(const char* filter) { diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index ef4bba493..cbf1d4dc3 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -196,7 +196,8 @@ void WallClockASGCT::timerLoop() { } }; - auto sampleThreads = [&](ThreadEntry entry, int& num_failures, int& threads_already_exited, int& permission_denied) { + auto sampleThreads = [&](ThreadEntry entry, int& num_failures, int& threads_already_exited, + int& permission_denied, u32& num_skipped_sleeping) { if (_precheck && entry.vm_thread != nullptr) { OSThreadState state = entry.vm_thread->osThreadState(); // SLEEPING: Thread.sleep() on JDK < 21. @@ -205,6 +206,7 @@ void WallClockASGCT::timerLoop() { // time-based sleeping with no useful profiling signal. if (state == OSThreadState::SLEEPING || state == OSThreadState::CONDVAR_WAIT) { Counters::increment(WC_SIGNAL_SKIPPED_SLEEPING); + num_skipped_sleeping++; return false; } } @@ -215,11 +217,10 @@ void WallClockASGCT::timerLoop() { if (errno == ESRCH) { threads_already_exited++; } else if (errno == EPERM) { - permission_denied++; - } else if (errno == EAGAIN) { - // Signal queue limit (RLIMIT_SIGPENDING) reached; signal was not - // delivered — count as missed sample. permission_denied++; + } else if (errno == EAGAIN) { + // Signal queue limit (RLIMIT_SIGPENDING) reached; not a permission error. + Counters::increment(WC_SIGNAL_QUEUE_FULL); } else { Log::debug("unexpected error %s", strerror(errno)); } diff --git a/ddprof-lib/src/main/cpp/wallClock.h b/ddprof-lib/src/main/cpp/wallClock.h index ffe3523e8..a03f7cbd2 100644 --- a/ddprof-lib/src/main/cpp/wallClock.h +++ b/ddprof-lib/src/main/cpp/wallClock.h @@ -81,16 +81,20 @@ class BaseWallClock : public Engine { int num_failures = 0; int threads_already_exited = 0; int permission_denied = 0; + u32 num_skipped_sleeping = 0; + u32 num_successful_samples = 0; std::vector sample = reservoir.sample(threads); for (ThreadType thread : sample) { - if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) { - continue; + if (sampleThreads(thread, num_failures, threads_already_exited, permission_denied, + num_skipped_sleeping)) { + num_successful_samples++; } } epoch.updateNumSamplableThreads(threads.size()); epoch.updateNumFailedSamples(num_failures); - epoch.updateNumSuccessfulSamples(sample.size() - num_failures); + epoch.updateNumSuccessfulSamples(num_successful_samples); + epoch.updateNumSkippedSleeping(num_skipped_sleeping); epoch.updateNumExitedThreads(threads_already_exited); epoch.updateNumPermissionDenied(permission_denied); u64 endTime = TSC::ticks(); diff --git a/ddprof-lib/src/test/cpp/park_state_ut.cpp b/ddprof-lib/src/test/cpp/park_state_ut.cpp index e65e1a173..4fc472607 100644 --- a/ddprof-lib/src/test/cpp/park_state_ut.cpp +++ b/ddprof-lib/src/test/cpp/park_state_ut.cpp @@ -18,7 +18,7 @@ #include "thread.h" TEST(ProfiledThreadParkStateTest, ParkFlagLifecycle) { - ProfiledThread* thread = ProfiledThread::forTid(12345); + ProfiledThread *thread = ProfiledThread::forTid(12345); EXPECT_FALSE(thread->isParkedForWallclock()); @@ -38,8 +38,8 @@ TEST(ProfiledThreadParkStateTest, ParkFlagLifecycle) { } TEST(ProfiledThreadParkStateTest, ParkEnterSnapshotsTagEncodings) { - ProfiledThread* thread = ProfiledThread::forTid(12346); - u32* tags = thread->getOtelTagEncodingsPtr(); + ProfiledThread *thread = ProfiledThread::forTid(12346); + u32 *tags = thread->getOtelTagEncodingsPtr(); tags[0] = 11; tags[1] = 22; tags[2] = 33;