Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ddprof-lib/src/main/cpp/arguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ Error Arguments::parse(const char *args) {
_remote_symbolication = true;
}

CASE("wallprecheck")
if (value != NULL) {
_wall_precheck = strcmp(value, "false") != 0 && strcmp(value, "0") != 0;
}

CASE("wallsampler")
if (value != NULL) {
switch (value[0]) {
Expand Down
2 changes: 2 additions & 0 deletions ddprof-lib/src/main/cpp/arguments.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class Arguments {
long _cpu;
long _wall;
bool _wall_collapsing;
bool _wall_precheck;
int _wall_threads_per_tick;
WallclockSampler _wallclock_sampler;
long _memory;
Expand Down Expand Up @@ -204,6 +205,7 @@ class Arguments {
_cpu(-1),
_wall(-1),
_wall_collapsing(false),
_wall_precheck(true),
_wall_threads_per_tick(DEFAULT_WALL_THREADS_PER_TICK),
_wallclock_sampler(ASGCT),
_memory(-1),
Expand Down
3 changes: 3 additions & 0 deletions ddprof-lib/src/main/cpp/counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
X(AGCT_NATIVE_NO_JAVA_CONTEXT, "agct_native_no_java_context") \
X(AGCT_BLOCKED_IN_VM, "agct_blocked_in_vm") \
X(SKIPPED_WALLCLOCK_UNWINDS, "skipped_wallclock_unwinds") \
X(WC_SIGNAL_SKIPPED_SLEEPING, "wc_signals_skipped_sleeping") \
X(WC_SIGNAL_SKIPPED_PARKED, "wc_signals_skipped_parked") \
X(WC_SIGNAL_QUEUE_FULL, "wc_signals_queue_full") \
X(UNWINDING_TIME_ASYNC, "unwinding_ticks_async") \
X(UNWINDING_TIME_JVMTI, "unwinding_ticks_jvmti") \
X(CALLTRACE_STORAGE_DROPPED, "calltrace_storage_dropped_traces") \
Expand Down
16 changes: 15 additions & 1 deletion ddprof-lib/src/main/cpp/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ class WallClockEpochEvent {
u32 _num_failed_samples;
u32 _num_exited_threads;
u32 _num_permission_denied;
u32 _num_skipped_sleeping;

WallClockEpochEvent(u64 start_time)
: _dirty(false), _start_time(start_time), _duration_millis(0),
_num_samplable_threads(0), _num_successful_samples(0),
_num_failed_samples(0), _num_exited_threads(0),
_num_permission_denied(0) {}
_num_permission_denied(0), _num_skipped_sleeping(0) {}

bool hasChanged() { return _dirty; }

Expand Down Expand Up @@ -153,6 +154,10 @@ class WallClockEpochEvent {
}
}

void updateNumSkippedSleeping(u32 n) {
if (_num_skipped_sleeping != n) { _dirty = true; _num_skipped_sleeping = n; }
}

void endEpoch(u64 millis) { _duration_millis = millis; }

void clean() { _dirty = false; }
Expand Down Expand Up @@ -184,4 +189,13 @@ typedef struct QueueTimeEvent {
u32 _queueLength;
} QueueTimeEvent;

typedef struct TaskBlockEvent {
u64 _start;
u64 _end;
u64 _blocker;
u64 _unblockingSpanId;
/** Span IDs and tag encodings for JFR (park exit uses snapshot from park enter). */
Context _ctx;
} TaskBlockEvent;

#endif // _EVENT_H
26 changes: 26 additions & 0 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,7 @@ void Recording::recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event) {
buf->putVar64(event->_num_failed_samples);
buf->putVar64(event->_num_exited_threads);
buf->putVar64(event->_num_permission_denied);
buf->putVar64(event->_num_skipped_sleeping);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}
Expand Down Expand Up @@ -1570,6 +1571,19 @@ void Recording::recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event) {
flushIfNeeded(buf);
}

void Recording::recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event) {
int start = buf->skip(1);
buf->putVar64(T_TASK_BLOCK);
buf->putVar64(event->_start);
buf->putVar64(event->_end - event->_start);
buf->putVar64(tid);
writeContextSnapshot(buf, event->_ctx);
buf->putVar64(event->_blocker);
buf->putVar64(event->_unblockingSpanId);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}

void Recording::recordAllocation(RecordingBuffer *buf, int tid,
u64 call_trace_id, AllocEvent *event) {
int start = buf->skip(1);
Expand Down Expand Up @@ -1789,6 +1803,18 @@ void FlightRecorder::recordQueueTime(int lock_index, int tid,
}
}

void FlightRecorder::recordTaskBlock(int lock_index, int tid,
TaskBlockEvent *event) {
OptionalSharedLockGuard locker(&_rec_lock);
if (locker.ownsLock()) {
Recording* rec = _rec;
if (rec != nullptr) {
Buffer *buf = rec->buffer(lock_index);
rec->recordTaskBlock(buf, tid, event);
}
}
}

void FlightRecorder::recordDatadogSetting(int lock_index, int length,
const char *name, const char *value,
const char *unit) {
Expand Down
2 changes: 2 additions & 0 deletions ddprof-lib/src/main/cpp/flightRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ class Recording {
void recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event);
void recordTraceRoot(Buffer *buf, int tid, TraceRootEvent *event);
void recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event);
void recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event);
void recordAllocation(RecordingBuffer *buf, int tid, u64 call_trace_id,
AllocEvent *event);
void recordMallocSample(Buffer *buf, int tid, u64 call_trace_id,
Expand Down Expand Up @@ -347,6 +348,7 @@ class FlightRecorder {
void wallClockEpoch(int lock_index, WallClockEpochEvent *event);
void recordTraceRoot(int lock_index, int tid, TraceRootEvent *event);
void recordQueueTime(int lock_index, int tid, QueueTimeEvent *event);
void recordTaskBlock(int lock_index, int tid, TaskBlockEvent *event);

bool active() const { return _rec != NULL; }

Expand Down
9 changes: 9 additions & 0 deletions ddprof-lib/src/main/cpp/hotspot/vmStructs.inline.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@

#include "hotspot/vmStructs.h"
#include "jvmThread.h"
#include "vmEntry.h"

VMThread* VMThread::current() {
// JVMThread::current() is the native thread self pointer. On OpenJ9/Zing it
// is not a HotSpot JavaThread*; only HotSpot may reinterpret it as VMThread*.
if (!VM::isHotspot() || JVMThread::current() == nullptr) {
return nullptr;
}
return VMThread::cast(JVMThread::current());
}

VMThread* VMThread::fromJavaThread(JNIEnv* env, jthread thread) {
if (!VM::isHotspot()) {
return nullptr;
}
assert(_eetop != nullptr);
if (_eetop != nullptr) {
return VMThread::cast((void*)env->GetLongField(thread, _eetop));
Expand Down
78 changes: 74 additions & 4 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "counters.h"
#include "common.h"
#include "engine.h"
#include "hotspot/vmStructs.h"
#include "hotspot/vmStructs.inline.h"
#include "incbin.h"
#include "jvmThread.h"
#include "os.h"
Expand Down Expand Up @@ -150,15 +150,21 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() {

int slot_id = current->filterSlotId();
if (unlikely(slot_id == -1)) {
// Thread doesn't have a slot ID yet (e.g., main thread), so register it
// Happens when we are not enabled before thread start
// Thread doesn't have a slot ID yet (e.g., main thread or profiler started
// after thread creation). Register now.
slot_id = thread_filter->registerThread();
current->setFilterSlotId(slot_id);
}

if (unlikely(slot_id == -1)) {
return; // Failed to register thread
}
// Refresh HotSpot VMThread* for wall thread-filter precheck (vmStructs OS state).
// HotSpot only: VMThread::current() asserts VM::isHotspot(). OpenJ9/Zing: leave null.
thread_filter->setVMThread(slot_id, VM::isHotspot() ? VMThread::current() : nullptr);
// Refresh ProfiledThread* so wall-clock mitigations can observe per-thread parked state.
// Publish pointer fields before publishing tid via add() to preserve visibility ordering.
thread_filter->setProfiledThread(slot_id, current);
thread_filter->add(tid, slot_id);
}

Expand Down Expand Up @@ -313,6 +319,70 @@ Java_com_datadoghq_profiler_JavaProfiler_recordQueueEnd0(
Profiler::instance()->recordQueueTime(tid, &event);
}

static inline bool exceedsMinTaskBlockDuration(u64 start_ticks, u64 end_ticks) {
static const u64 kMinTaskBlockNanos = 1000000; // 1 ms
u64 min_ticks = (TSC::frequency() * kMinTaskBlockNanos) / 1000000000ULL;
return end_ticks > start_ticks && (end_ticks - start_ticks) >= min_ticks;
}

extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_recordTaskBlock0(
JNIEnv *env, jclass unused, jlong startTicks, jlong endTicks, jlong spanId,
jlong rootSpanId, jlong blocker, jlong unblockingSpanId) {
int tid = ProfiledThread::currentTid();
if (tid < 0) {
return;
}
if (!exceedsMinTaskBlockDuration(startTicks, endTicks) || spanId == 0) {
return;
}
TaskBlockEvent event{};
event._start = startTicks;
event._end = endTicks;
event._blocker = blocker;
event._unblockingSpanId = unblockingSpanId;
event._ctx = ContextApi::snapshot();
event._ctx.spanId = (u64)spanId;
event._ctx.rootSpanId = (u64)rootSpanId;
Profiler::instance()->recordTaskBlock(tid, &event);
}

extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_parkEnter0(
JNIEnv *env, jclass unused, jlong spanId, jlong rootSpanId) {
ProfiledThread *current = ProfiledThread::current();
if (current == nullptr) {
return;
}
current->parkEnter(spanId, rootSpanId, TSC::ticks());
}

extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_parkExit0(
JNIEnv *env, jclass unused, jlong blocker, jlong unblockingSpanId) {
ProfiledThread *current = ProfiledThread::current();
if (current == nullptr) {
return;
}
u64 start_ticks = 0;
Context park_context = {};
if (!current->parkExit(start_ticks, park_context)) {
return;
}
u64 end_ticks = TSC::ticks();
if (!exceedsMinTaskBlockDuration(start_ticks, end_ticks) ||
park_context.spanId == 0) {
return;
}
TaskBlockEvent event{};
event._start = start_ticks;
event._end = end_ticks;
event._blocker = blocker;
event._unblockingSpanId = unblockingSpanId;
event._ctx = park_context;
Profiler::instance()->recordTaskBlock(current->tid(), &event);
}

extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_JavaProfiler_currentTicks0(JNIEnv *env,
jclass unused) {
Expand Down
15 changes: 14 additions & 1 deletion ddprof-lib/src/main/cpp/jfrMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ void JfrMetadata::initialize(
<< field("numExitedThreads", T_INT,
"Number of Exited Threads Before Handling Signal")
<< field("numPermissionDenied", T_INT,
"Number of Permission Denied Errors"))
"Number of Permission Denied Errors")
<< field("numSkippedSleepingPrecheck", T_INT,
"Signals Skipped Due to Sleeping Precheck"))

<< (type("datadog.ObjectSample", T_ALLOC, "Allocation sample")
<< category("Datadog", "Profiling")
Expand Down Expand Up @@ -205,6 +207,17 @@ void JfrMetadata::initialize(
<< field("localRootSpanId", T_LONG, "Local Root Span ID") ||
contextAttributes)

<< (type("datadog.TaskBlock", T_TASK_BLOCK, "Task Block")
<< category("Datadog")
<< field("startTime", T_LONG, "Start Time", F_TIME_TICKS)
<< field("duration", T_LONG, "Duration", F_DURATION_TICKS)
<< field("eventThread", T_THREAD, "Event Thread", F_CPOOL)
<< field("spanId", T_LONG, "Span ID")
<< field("localRootSpanId", T_LONG, "Local Root Span ID")
<< field("blocker", T_LONG, "Blocker Identity Hash")
<< field("unblockingSpanId", T_LONG, "Unblocking Span ID") ||
contextAttributes)

<< (type("datadog.HeapUsage", T_HEAP_USAGE, "JVM Heap Usage")
<< category("Datadog")
<< field("startTime", T_LONG, "Start Time", F_TIME_TICKS)
Expand Down
1 change: 1 addition & 0 deletions ddprof-lib/src/main/cpp/jfrMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ enum JfrType {
T_DATADOG_COUNTER = 125,
T_UNWIND_FAILURE = 126,
T_MALLOC = 127,
T_TASK_BLOCK = 128,
T_ANNOTATION = 200,
T_LABEL = 201,
T_CATEGORY = 202,
Expand Down
4 changes: 4 additions & 0 deletions ddprof-lib/src/main/cpp/livenessTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ Error LivenessTracker::initialize(Arguments &args) {
}

if (_initialized) {
// Tracker settings are sticky across recordings. Preserve the historical
// table/config behavior, but allow HeapUsage recording to be enabled later
// (e.g. if an earlier test initialized liveness without ':L').
_record_heap_usage = _record_heap_usage || args._record_heap_usage;
// if the tracker was previously initialized return the stored result for
// consistency this hack also means that if the profiler is started with
// different arguments for liveness tracking those will be ignored it is
Expand Down
18 changes: 18 additions & 0 deletions ddprof-lib/src/main/cpp/profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
if (_thread_filter.enabled()) {
int slot_id = _thread_filter.registerThread();
current->setFilterSlotId(slot_id);
// VMThread / vmStructs are HotSpot-only; VMThread::current() asserts VM::isHotspot().
_thread_filter.setVMThread(slot_id, VM::isHotspot() ? VMThread::current() : nullptr);
_thread_filter.setProfiledThread(slot_id, current);
_thread_filter.remove(slot_id); // Remove from filtering initially
}
if (thread != NULL) {
Expand All @@ -95,6 +98,8 @@ void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
tid = current->tid();

if (_thread_filter.enabled()) {
_thread_filter.setVMThread(slot_id, nullptr);
_thread_filter.setProfiledThread(slot_id, nullptr);
_thread_filter.unregisterThread(slot_id);
current->setFilterSlotId(-1);
}
Expand Down Expand Up @@ -629,6 +634,17 @@ void Profiler::recordQueueTime(int tid, QueueTimeEvent *event) {
_locks[lock_index].unlock();
}

void Profiler::recordTaskBlock(int tid, TaskBlockEvent *event) {
u32 lock_index = getLockIndex(tid);
if (!_locks[lock_index].tryLock() &&
!_locks[lock_index = (lock_index + 1) % CONCURRENCY_LEVEL].tryLock() &&
!_locks[lock_index = (lock_index + 2) % CONCURRENCY_LEVEL].tryLock()) {
return;
}
_jfr.recordTaskBlock(lock_index, tid, event);
_locks[lock_index].unlock();
}

void Profiler::recordExternalSample(u64 weight, int tid, int num_frames,
ASGCT_CallFrame *frames, bool truncated,
jint event_type, Event *event) {
Expand Down Expand Up @@ -1136,6 +1152,8 @@ Error Profiler::start(Arguments &args, bool reset) {
assert(current != nullptr);
int slot_id = _thread_filter.registerThread();
current->setFilterSlotId(slot_id);
_thread_filter.setVMThread(slot_id, VM::isHotspot() ? VMThread::current() : nullptr);
_thread_filter.setProfiledThread(slot_id, current);
_thread_filter.remove(slot_id); // Remove from filtering initially (matches onThreadStart behavior)
}

Expand Down
1 change: 1 addition & 0 deletions ddprof-lib/src/main/cpp/profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ class alignas(alignof(SpinLock)) Profiler {
void recordWallClockEpoch(int tid, WallClockEpochEvent *event);
void recordTraceRoot(int tid, TraceRootEvent *event);
void recordQueueTime(int tid, QueueTimeEvent *event);
void recordTaskBlock(int tid, TaskBlockEvent *event);
void writeLog(LogLevel level, const char *message);
void writeLog(LogLevel level, const char *message, size_t len);
void writeDatadogProfilerSetting(int tid, int length, const char *name,
Expand Down
3 changes: 3 additions & 0 deletions ddprof-lib/src/main/cpp/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ void ProfiledThread::releaseFromBuffer() {
_wall_epoch = 0;
_call_trace_id = 0;
_recording_epoch = 0;
__atomic_fetch_and(&_misc_flags, ~FLAG_PARKED, __ATOMIC_RELEASE);
_park_start_ticks = 0;
memset(&_park_context, 0, sizeof(_park_context));
_filter_slot_id = -1;
_init_window = 0;
_unwind_failures.clear();
Expand Down
Loading
Loading