Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ff73e50
feat: native socket I/O tracking via PLT hooks
jbachorik Apr 20, 2026
00005c2
ci: retrigger CI
jbachorik Apr 20, 2026
f6820d4
ci: retrigger after ready-for-review
jbachorik Apr 20, 2026
fdd82ac
ci: trigger after rebase to jb/native_allocs
jbachorik Apr 20, 2026
013dd0b
fix: restore 'bytesTransferred' JFR field name
jbachorik Apr 20, 2026
a336f2f
debug: TEST_LOG diagnostics for native socket PLT hooks
jbachorik Apr 20, 2026
f40db52
fix: use writeCurrentContext in recordNativeSocketSample
jbachorik Apr 20, 2026
b80da0b
fix: unblock NativeSocket tests on J9 and HotSpot
jbachorik Apr 21, 2026
41db008
feat: natsock=INTERVAL to override initial sampling period
jbachorik Apr 21, 2026
a31c76c
fix: stabilize NativeSocketRemoteAddressTest with natsock=100us
jbachorik Apr 21, 2026
c347315
test: raise sampling rate to 100us across natsock tests
jbachorik Apr 21, 2026
7d612ca
debug: TEST_LOG around memleak HeapUsage write path
jbachorik Apr 21, 2026
79d6ddb
debug: TEST_LOG each branch of LivenessTracker::initialize
jbachorik Apr 21, 2026
cecb1bd
fix: apply _record_heap_usage on every LivenessTracker init
jbachorik Apr 21, 2026
f0c86f0
fix: restore async Java stack walk for malloc/socket with cstack=dwar…
jbachorik Apr 28, 2026
4fcf385
fix: nativesocket review fixes + extend to musl
jbachorik Apr 29, 2026
2bed058
fix: ASSERT_TRUE(e) → static_cast<bool> for non-const Error::operator…
jbachorik Apr 29, 2026
5378130
fix: fall back to RTLD_DEFAULT when RTLD_NEXT returns NULL on musl
jbachorik Apr 29, 2026
ae46f28
fix: encapsulate _orig_* fns; AF_UNIX, var64 tid, more tests
jbachorik Apr 30, 2026
1a3d314
fix: muse review fixes — overflow, null guards, op labels, eviction
jbachorik Apr 30, 2026
80d35d8
fix: split chained assignment between incompatible fn-pointer types
jbachorik Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ddprof-lib/src/main/cpp/arguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,18 @@ Error Arguments::parse(const char *args) {
msg = "nativemem must be >= 0";
}

CASE("natsock")
if (value != NULL) {
_nativesocket_interval = parseUnits(value, NANOS);
if (_nativesocket_interval < 0) {
msg = "natsock interval must be >= 0";
} else {
_nativesocket = true;
}
} else {
_nativesocket = true;
}

DEFAULT()
if (_unknown_arg == NULL)
_unknown_arg = arg;
Expand Down
11 changes: 8 additions & 3 deletions ddprof-lib/src/main/cpp/arguments.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ enum EventMask {
EM_LOCK = 4,
EM_WALL = 8,
EM_NATIVEMEM = 16,
EM_METHOD_TRACE = 32
EM_METHOD_TRACE = 32,
EM_NATIVESOCKET = 64
};
constexpr int EVENT_MASK_SIZE = 6;
constexpr int EVENT_MASK_SIZE = 7;

struct StackWalkFeatures {
// Deprecated stack recovery techniques used to workaround AsyncGetCallTrace flaws
Expand Down Expand Up @@ -192,6 +193,8 @@ class Arguments {
bool _lightweight;
bool _enable_method_cleanup;
bool _remote_symbolication; // Enable remote symbolication for native frames
bool _nativesocket;
long _nativesocket_interval; // initial sampling period in nanoseconds; 0 = engine default

Arguments(bool persistent = false)
: _buf(NULL),
Expand Down Expand Up @@ -227,7 +230,9 @@ class Arguments {
_context_attributes({}),
_lightweight(false),
_enable_method_cleanup(true),
_remote_symbolication(false) {}
_remote_symbolication(false),
_nativesocket(false),
_nativesocket_interval(0) {}

~Arguments();

Expand Down
13 changes: 12 additions & 1 deletion ddprof-lib/src/main/cpp/codeCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,24 @@ void CodeCache::addImport(void **entry, const char *name) {
case 'r':
if (strcmp(name, "realloc") == 0) {
saveImport(im_realloc, entry);
} else if (strcmp(name, "recv") == 0) {
saveImport(im_recv, entry);
} else if (strcmp(name, "read") == 0) {
saveImport(im_read, entry);
}
break;
case 's':
if (strcmp(name, "sigaction") == 0) {
if (strcmp(name, "send") == 0) {
saveImport(im_send, entry);
} else if (strcmp(name, "sigaction") == 0) {
saveImport(im_sigaction, entry);
}
break;
case 'w':
if (strcmp(name, "write") == 0) {
saveImport(im_write, entry);
}
break;
}
}

Expand Down
4 changes: 4 additions & 0 deletions ddprof-lib/src/main/cpp/codeCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ enum ImportId {
im_posix_memalign,
im_aligned_alloc,
im_sigaction,
im_send,
im_recv,
im_write,
im_read,
NUM_IMPORTS
};

Expand Down
13 changes: 13 additions & 0 deletions ddprof-lib/src/main/cpp/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ class MallocEvent : public Event {
MallocEvent() : Event(), _start_time(0), _address(0), _size(0), _weight(1.0f) {}
};

class NativeSocketEvent : public Event {
public:
u64 _start_time; // TSC ticks at call entry
u64 _end_time; // TSC ticks at call return
u8 _operation; // 0 = SEND, 1 = RECV, 2 = WRITE, 3 = READ
char _remote_addr[64]; // "ip:port" null-terminated string
u64 _bytes; // bytes transferred (return value of send/recv/write/read)
float _weight; // inverse-transform sample weight

NativeSocketEvent() : Event(), _start_time(0), _end_time(0), _operation(0),
_bytes(0), _weight(1.0f) { _remote_addr[0] = '\0'; }
};

class WallClockEpochEvent {
public:
bool _dirty;
Expand Down
21 changes: 21 additions & 0 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,24 @@ void Recording::recordMallocSample(Buffer *buf, int tid, u64 call_trace_id,
flushIfNeeded(buf);
}

void Recording::recordNativeSocketSample(Buffer *buf, int tid, u64 call_trace_id,
NativeSocketEvent *event) {
int start = buf->skip(1);
buf->putVar64(T_NATIVE_SOCKET);
buf->putVar64(event->_start_time);
buf->putVar64(tid);
buf->putVar64(call_trace_id);
buf->putVar64(event->_end_time - event->_start_time);
static const char* const kOpNames[] = {"SEND", "RECV", "WRITE", "READ"};
buf->putUtf8(event->_operation < 4 ? kOpNames[event->_operation] : "UNKNOWN");
buf->putUtf8(event->_remote_addr);
buf->putVar64(event->_bytes);
buf->putFloat(event->_weight);
writeCurrentContext(buf);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}

void Recording::recordHeapLiveObject(Buffer *buf, int tid, u64 call_trace_id,
ObjectLivenessEvent *event) {
int start = buf->skip(1);
Expand Down Expand Up @@ -1845,6 +1863,9 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u64 call_trace_id,
case BCI_NATIVE_MALLOC:
rec->recordMallocSample(buf, tid, call_trace_id, (MallocEvent *)event);
break;
case BCI_NATIVE_SOCKET:
rec->recordNativeSocketSample(buf, tid, call_trace_id, (NativeSocketEvent *)event);
break;
}
rec->flushIfNeeded(buf);
rec->addThread(lock_index, tid);
Expand Down
2 changes: 2 additions & 0 deletions ddprof-lib/src/main/cpp/flightRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ class Recording {
AllocEvent *event);
void recordMallocSample(Buffer *buf, int tid, u64 call_trace_id,
MallocEvent *event);
void recordNativeSocketSample(Buffer *buf, int tid, u64 call_trace_id,
NativeSocketEvent *event);
void recordHeapLiveObject(Buffer *buf, int tid, u64 call_trace_id,
ObjectLivenessEvent *event);
void recordMonitorBlocked(Buffer *buf, int tid, u64 call_trace_id,
Expand Down
26 changes: 25 additions & 1 deletion ddprof-lib/src/main/cpp/hotspot/hotspotSupport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,31 @@ int HotspotSupport::walkJavaStack(StackWalkRequest& request) {
int java_frames = 0;
if (features.mixed) {
java_frames = walkVM(ucontext, frames, max_depth, features, eventTypeFromBCI(request.event_type), lock_index, truncated);
} else if (request.event_type == BCI_CPU || request.event_type == BCI_WALL || request.event_type == BCI_NATIVE_MALLOC) {
} else if (request.event_type == BCI_NATIVE_MALLOC || request.event_type == BCI_NATIVE_SOCKET) {
if (cstack >= CSTACK_VM) {
java_frames = walkVM(ucontext, frames, max_depth, features, eventTypeFromBCI(request.event_type), lock_index, truncated);
} else {
AsyncSampleMutex mutex(ProfiledThread::currentSignalSafe());
if (mutex.acquired()) {
java_frames = getJavaTraceAsync(ucontext, frames, max_depth, java_ctx, truncated);
if (java_frames > 0 && java_ctx->pc != NULL && VMStructs::hasMethodStructs()) {
VMNMethod* nmethod = CodeHeap::findNMethod(java_ctx->pc);
if (nmethod != NULL) {
fillFrameTypes(frames, java_frames, nmethod);
}
}
}
if (java_frames > 0 && VM::hotspot_version() >= 21 && java_frames < max_depth) {
VMThread* carrier = VMThread::current();
if (carrier != nullptr && carrier->isCarryingVirtualThread()) {
frames[java_frames].bci = BCI_NATIVE_FRAME;
frames[java_frames].method_id = (jmethodID) "JVM Continuation";
LP64_ONLY(frames[java_frames].padding = 0;)
java_frames++;
}
}
}
} else if (request.event_type == BCI_CPU || request.event_type == BCI_WALL) {
if (cstack >= CSTACK_VM) {
java_frames = walkVM(ucontext, frames, max_depth, features, eventTypeFromBCI(request.event_type), lock_index, truncated);
} else {
Expand Down
14 changes: 14 additions & 0 deletions ddprof-lib/src/main/cpp/jfrMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,20 @@ void JfrMetadata::initialize(
<< field("localRootSpanId", T_LONG, "Local Root Span ID") ||
contextAttributes)

<< (type("datadog.NativeSocketEvent", T_NATIVE_SOCKET, "Native Socket I/O")
<< category("Datadog", "Profiling")
<< field("startTime", T_LONG, "Start Time", F_TIME_TICKS)
<< field("eventThread", T_THREAD, "Event Thread", F_CPOOL)
<< field("stackTrace", T_STACK_TRACE, "Stack Trace", F_CPOOL)
<< field("duration", T_LONG, "Duration", F_DURATION_TICKS)
<< field("operation", T_STRING, "Operation")
<< field("remoteAddress", T_STRING, "Remote Address")
<< field("bytesTransferred", T_LONG, "Bytes Transferred", F_BYTES)
<< field("weight", T_FLOAT, "Sample weight")
<< field("spanId", T_LONG, "Span ID")
<< field("localRootSpanId", T_LONG, "Local Root Span ID") ||
contextAttributes)

<< (type("jdk.OSInformation", T_OS_INFORMATION, "OS Information")
<< category("Operating System")
<< field("startTime", T_LONG, "Start Time", F_TIME_TICKS)
Expand Down
1 change: 1 addition & 0 deletions ddprof-lib/src/main/cpp/jfrMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ enum JfrType {
T_DATADOG_COUNTER = 125,
T_UNWIND_FAILURE = 126,
T_MALLOC = 127,
T_NATIVE_SOCKET = 128,
T_ANNOTATION = 200,
T_LABEL = 201,
T_CATEGORY = 202,
Expand Down
3 changes: 2 additions & 1 deletion ddprof-lib/src/main/cpp/jvmSupport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ int JVMSupport::walkJavaStack(StackWalkRequest& request) {
} else if (VM::isOpenJ9() || VM::isZing()) {
assert(request.event_type == BCI_CPU ||
request.event_type == BCI_WALL ||
request.event_type == BCI_NATIVE_MALLOC);
request.event_type == BCI_NATIVE_MALLOC ||
request.event_type == BCI_NATIVE_SOCKET);
return asyncGetCallTrace(request.frames, request.max_depth, request.ucontext);
}
assert(false && "Unsupported JVM");
Expand Down
27 changes: 26 additions & 1 deletion ddprof-lib/src/main/cpp/libraryPatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "codeCache.h"
#include "spinLock.h"
#include <atomic>

#ifdef __linux__

Expand All @@ -28,15 +29,36 @@ class LibraryPatcher {
static PatchEntry _sigaction_entries[MAX_NATIVE_LIBS];
static int _sigaction_size;

// Separate tracking for socket (send/recv/write/read) patches.
// Each library can contribute up to 4 GOT slots (send/recv/write/read).
static PatchEntry _socket_entries[4 * MAX_NATIVE_LIBS];
static int _socket_size;

static void patch_library_unlocked(CodeCache* lib);
static void patch_pthread_create();
static void patch_pthread_setspecific();
static void patch_sigaction_in_library(CodeCache* lib);
public:
// True while socket hooks are installed; read by Profiler::dlopen_hook
// to decide whether to re-patch after a new library is loaded.
// Set to true after the first batch of libraries is patched in patch_socket_functions().
// Libraries loaded after profiler start are picked up on the next dlopen_hook call,
// which calls install_socket_hooks() to patch them if _socket_active is true.
// Low-probability race: stop() is called only on JVM exit; atomic<bool> is zero-cost insurance.
static std::atomic<bool> _socket_active;
static void initialize();
static void patch_libraries();
static void unpatch_libraries();
static void patch_sigaction();
static bool patch_socket_functions();
static void unpatch_socket_functions();
// Called from Profiler::dlopen_hook after a new library is loaded.
// No-op when socket hooks are not active.
static inline void install_socket_hooks() {
if (_socket_active.load(std::memory_order_acquire)) {
patch_socket_functions();
}
}
};

#else
Expand All @@ -47,8 +69,11 @@ class LibraryPatcher {
static void patch_libraries() { }
static void unpatch_libraries() { }
static void patch_sigaction() { }
static bool patch_socket_functions() { return false; }
static void unpatch_socket_functions() { }
static void install_socket_hooks() { }
};

#endif

#endif // _LIBRARYPATCHER_H
#endif // _LIBRARYPATCHER_H
Loading
Loading