-
Notifications
You must be signed in to change notification settings - Fork 340
Improve structured concurrency instrumentation #11759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
bde3da3
ce656ac
5cc0be5
fb70e06
829131e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| package datadog.trace.bootstrap.instrumentation.java.concurrent; | ||
|
|
||
| import static java.lang.invoke.MethodHandles.publicLookup; | ||
| import static java.lang.invoke.MethodType.methodType; | ||
|
|
||
| import java.lang.invoke.MethodHandle; | ||
|
|
||
| /** Helper for the java-concurrent-25.0 {@code StructuredTaskScopeForkInstrumentation}. */ | ||
| public final class StructuredTaskScopeHelper { | ||
| private static final MethodHandle IS_CANCELLED = resolveIsCancelled(); | ||
|
|
||
| private StructuredTaskScopeHelper() {} | ||
|
|
||
| private static MethodHandle resolveIsCancelled() { | ||
| try { | ||
| ClassLoader classLoader = StructuredTaskScopeHelper.class.getClassLoader(); | ||
| Class<?> scopeClass = | ||
| Class.forName("java.util.concurrent.StructuredTaskScope", false, classLoader); | ||
| return publicLookup().findVirtual(scopeClass, "isCancelled", methodType(boolean.class)); | ||
| } catch (Throwable ignored) { | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns whether the given {@code StructuredTaskScope} has been cancelled. | ||
| * | ||
| * @param scope a {@code java.util.concurrent.StructuredTaskScope} instance. | ||
| * @return {@code true} if the scope is cancelled; {@code false} otherwise, including when {@code | ||
| * isCancelled()} could not be resolved or invoked. | ||
| */ | ||
| public static boolean isCancelled(Object scope) { | ||
| if (IS_CANCELLED == null || scope == null) { | ||
| return false; | ||
| } | ||
| try { | ||
| return (boolean) IS_CANCELLED.invoke(scope); | ||
| } catch (Throwable ignored) { | ||
| return false; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,66 +1,60 @@ | ||
| package datadog.trace.instrumentation.java.concurrent.structuredconcurrency25; | ||
|
|
||
| import static datadog.environment.JavaVirtualMachine.isJavaVersionAtLeast; | ||
| import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
| import static datadog.trace.bootstrap.InstrumentationContext.get; | ||
| import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isConstructor; | ||
| import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.cancelTask; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.agent.tooling.InstrumenterModule; | ||
| import datadog.trace.bootstrap.ContextStore; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.State; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.StructuredTaskScopeHelper; | ||
| import java.util.concurrent.Callable; | ||
| import net.bytebuddy.asm.Advice.OnMethodExit; | ||
| import net.bytebuddy.asm.Advice.Return; | ||
| import net.bytebuddy.asm.Advice.This; | ||
|
|
||
| // WARNING: | ||
| // This instrumentation is tested using smoke tests as instrumented tests cannot run using Java 25. | ||
| // Instrumented tests rely on Spock / Groovy which cannot run using Java 25 due to byte-code | ||
| // compatibility. Check | ||
| // dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-25.0 for this | ||
| // instrumentation test suite. | ||
|
|
||
| /** | ||
| * This instrumentation captures the active span scope at StructuredTaskScope task creation | ||
| * (SubtaskImpl). The scope is then activate and close through the {@link Runnable} instrumentation | ||
| * (SubtaskImpl implementing {@link Runnable}). | ||
| * This instrumentation cancels the continuation captured by {@link | ||
| * StructuredTaskScope25TaskInstrumentation} for a subtask forked into an already-canceled scope. | ||
| * | ||
| * <p>In that case, the subtask's thread is never started, so {@code SubtaskImpl.run()} never runs, | ||
| * and the {@link Runnable} instrumentation never activates/closes the captured continuation. It | ||
| * leads to continuation leak. This instrumentation verifies the subtask was canceled and releases | ||
| * the related continuation. | ||
| */ | ||
| @SuppressWarnings("unused") | ||
| @AutoService(InstrumenterModule.class) | ||
| public class StructuredTaskScope25Instrumentation extends InstrumenterModule.ContextTracking | ||
| public class StructuredTaskScope25Instrumentation | ||
| implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { | ||
|
|
||
| public StructuredTaskScope25Instrumentation() { | ||
| super("java_concurrent", "structured-task-scope", "structured-task-scope-25"); | ||
| } | ||
|
|
||
| @Override | ||
| public String instrumentedType() { | ||
| return "java.util.concurrent.StructuredTaskScopeImpl$SubtaskImpl"; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isEnabled() { | ||
| return isJavaVersionAtLeast(25) && super.isEnabled(); | ||
| return "java.util.concurrent.StructuredTaskScopeImpl"; | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice(isConstructor(), getClass().getName() + "$ConstructorAdvice"); | ||
| transformer.applyAdvice( | ||
| named("fork").and(takesArgument(0, Callable.class)), getClass().getName() + "$ForkAdvice"); | ||
| } | ||
|
|
||
| public static final class ConstructorAdvice { | ||
| public static final class ForkAdvice { | ||
| /** | ||
| * Captures task scope to be restored at the start of VirtualThread.run() method by {@link | ||
| * Runnable} instrumentation. | ||
| * Cancels the task scope continuation captured at task creation when the subtask is forked into | ||
| * an already-canceled scope: its thread never starts, so the {@link Runnable} instrumentation | ||
| * never releases the continuation. | ||
| * | ||
| * @param subTaskImpl The StructuredTaskScopeImpl.SubtaskImpl object (the advice are compile | ||
| * against Java 8 so the type from JDK25 can't be referred, using {@link Object} instead | ||
| * @param scope The StructuredTaskScopeImpl object (the advice is compiled against Java 8 so the | ||
| * type from JDK25 can't be referred, using {@link Object} instead). | ||
| * @param subtask The StructuredTaskScopeImpl.SubtaskImpl object (the advice is compiled against | ||
| * Java 8 so the type from JDK25 can't be referred, using {@link Object} instead). | ||
| */ | ||
| @OnMethodExit(suppress = Throwable.class) | ||
| public static void captureScope(@This Object subTaskImpl) { | ||
| ContextStore<Runnable, State> contextStore = get(Runnable.class, State.class); | ||
| capture(contextStore, (Runnable) subTaskImpl); | ||
| public static void afterFork(@This Object scope, @Return Object subtask) { | ||
| if (subtask instanceof Runnable && StructuredTaskScopeHelper.isCancelled(scope)) { | ||
| ContextStore<Runnable, State> contextStore = get(Runnable.class, State.class); | ||
| cancelTask(contextStore, (Runnable) subtask); | ||
|
Comment on lines
+54
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a scope is canceled by another subtask or by a timeout after Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an issue, I will handle it next week. |
||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| package datadog.trace.instrumentation.java.concurrent.structuredconcurrency25; | ||
|
|
||
| import static datadog.environment.JavaVirtualMachine.isJavaVersionAtLeast; | ||
| import static java.util.Arrays.asList; | ||
| import static java.util.Collections.singletonMap; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.agent.tooling.InstrumenterModule; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.State; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| /** | ||
| * This module propagates context across {@code java.util.concurrent.StructuredTaskScope} forked | ||
| * subtasks (JDK 25+). | ||
| */ | ||
| @SuppressWarnings("unused") | ||
| @AutoService(InstrumenterModule.class) | ||
| public class StructuredTaskScope25Module extends InstrumenterModule.ContextTracking { | ||
| public StructuredTaskScope25Module() { | ||
| super("java_concurrent", "structured-task-scope", "structured-task-scope-25"); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isEnabled() { | ||
| return isJavaVersionAtLeast(25) && super.isEnabled(); | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, String> contextStore() { | ||
| return singletonMap(Runnable.class.getName(), State.class.getName()); | ||
| } | ||
|
|
||
| @Override | ||
| public List<Instrumenter> typeInstrumentations() { | ||
| return asList( | ||
| new StructuredTaskScope25TaskInstrumentation(), new StructuredTaskScope25Instrumentation()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| package datadog.trace.instrumentation.java.concurrent.structuredconcurrency25; | ||
|
|
||
| import static datadog.trace.bootstrap.InstrumentationContext.get; | ||
| import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isConstructor; | ||
|
|
||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.bootstrap.ContextStore; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.State; | ||
| import net.bytebuddy.asm.Advice.OnMethodExit; | ||
| import net.bytebuddy.asm.Advice.This; | ||
|
|
||
| /** | ||
| * This instrumentation captures the active span scope at StructuredTaskScope task creation | ||
| * (SubtaskImpl). The scope is then activate and close through the {@link Runnable} instrumentation | ||
| * (SubtaskImpl implementing {@link Runnable}). | ||
| */ | ||
| @SuppressWarnings("unused") | ||
| public class StructuredTaskScope25TaskInstrumentation | ||
| implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { | ||
|
|
||
| @Override | ||
| public String instrumentedType() { | ||
| return "java.util.concurrent.StructuredTaskScopeImpl$SubtaskImpl"; | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice(isConstructor(), getClass().getName() + "$ConstructorAdvice"); | ||
| } | ||
|
|
||
| public static final class ConstructorAdvice { | ||
| /** | ||
| * Captures task scope to be restored at the start of VirtualThread.run() method by {@link | ||
| * Runnable} instrumentation. | ||
| * | ||
| * @param subTaskImpl The StructuredTaskScopeImpl.SubtaskImpl object (the advice is compiled | ||
| * against Java 8 so the type from JDK25 can't be referred, using {@link Object} instead). | ||
| */ | ||
| @OnMethodExit(suppress = Throwable.class) | ||
| public static void captureScope(@This Object subTaskImpl) { | ||
| ContextStore<Runnable, State> contextStore = get(Runnable.class, State.class); | ||
| capture(contextStore, (Runnable) subTaskImpl); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use also our
datadog.trace.util.MethodHandlesclass that already handle class loading, method finding and exception suppressionThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting! I will have a look if I still need it 👍