
Commit 81ec538

Extract SparkPlan product and append to trace (#9783)
* Extract Spark Plan product with keys for Scala 2.13
* Extract Spark Plan product values for Spark 2.12
* Update tests for meta field in Spark SQL plans
* Remove unused logic to parse children, enrich product parsing to support more types and use JSON arrays
* Update tests to assert on meta values
* Use Abstract class for common functions
* Use Jackson JSON parser instead of rolling own parsing
* Refactor AbstractSparkPlanUtils to only require key generation on impl
* Default to returning null if class not recognized, limit recursion depth and array length
* Improve testing scheme for Spark32 on Scala 212 with unknown keys
* Improve method & class naming, reuse ObjectMapper from listener
* Gate Spark Plan parsing with flag
* Match classes by string comparison, add negative cache
* Add unit tests for AbstractSparkPlanSerializer
* Make ObjectMapper protected on AbstractDatadogSparkListener instead of public
* Specify correct helper class names
* Add dd.data.jobs.experimental_features.enabled FF
* Remove knownMatchingTypes override from version-specific impls
* Catch NullPointerException for getDeclaredMethod calls
* Adjust more gates to match classes using string comparison
* Revert "Catch NullPointerException for getDeclaredMethod calls" (reverts commit 5527ad0)
* Explicit cast to String on simpleString calls
* Use toMap to convert mutable to immutable map in Scala 2.12
* Improvements from comments: use singleton, string concat instead of format, add FF to supported-configurations.json properly
* Avoid merge conflict, reorder flags
* Exit loop early, store reflected methods as class fields
1 parent 29ed9f5 commit 81ec538
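
The commit bullets describe the serializer's contract, but the shared AbstractSparkPlanSerializer source is not among the files shown in this view. As a rough sketch of the rules the bullets name (version-specific key generation, Jackson for JSON, a recursion-depth cap, bounded array length, null for unrecognized classes) — the class name, limits, and method shapes below are hypothetical, not the actual implementation:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the rules described in the commit bullets; the real
// AbstractSparkPlanSerializer is not among the files shown in this commit view.
public abstract class PlanSerializerSketch {
  private static final int MAX_DEPTH = 5;          // assumed recursion cap
  private static final int MAX_ARRAY_LENGTH = 100; // assumed array-length cap

  protected final ObjectMapper objectMapper = new ObjectMapper();

  // Version-specific subclasses supply only the key of product element idx.
  protected abstract String getKey(int idx, Object node);

  // Recognized simple values pass through; lists become bounded JSON arrays;
  // anything unrecognized (or nested too deeply) collapses to null.
  protected String serializeValue(Object value, int depth) throws Exception {
    if (value == null || depth > MAX_DEPTH) {
      return null;
    }
    if (value instanceof String || value instanceof Number || value instanceof Boolean) {
      return objectMapper.writeValueAsString(value);
    }
    if (value instanceof List) {
      List<String> out = new ArrayList<>();
      List<?> list = (List<?>) value;
      for (int i = 0; i < Math.min(list.size(), MAX_ARRAY_LENGTH); i++) {
        out.add(serializeValue(list.get(i), depth + 1));
      }
      return objectMapper.writeValueAsString(out); // JSON array
    }
    return null; // default for unrecognized classes
  }
}
```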

File tree

16 files changed: +978 -45 lines changed


dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 34 additions & 0 deletions
@@ -8,15 +8,20 @@
 import datadog.trace.api.Config;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Predef;
+import scala.collection.JavaConverters;
 
 @AutoService(InstrumenterModule.class)
 public class Spark212Instrumentation extends AbstractSparkInstrumentation {
   @Override
   public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
+      packageName + ".AbstractSparkPlanSerializer",
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark212Listener",
@@ -27,6 +32,7 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
+      packageName + ".Spark212PlanSerializer"
     };
   }
 
@@ -40,6 +46,13 @@ public void methodAdvice(MethodTransformer transformer) {
             .and(isDeclaredBy(named("org.apache.spark.SparkContext")))
             .and(takesNoArguments()),
         Spark212Instrumentation.class.getName() + "$InjectListener");
+
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("fromSparkPlan"))
+            .and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
+            .and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
+        Spark212Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
   }
 
   public static class InjectListener {
@@ -78,4 +91,25 @@ public static void enter(@Advice.This SparkContext sparkContext) {
       sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
     }
   }
+
+  public static class SparkPlanInfoAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    public static void exit(
+        @Advice.Return(readOnly = false) SparkPlanInfo planInfo,
+        @Advice.Argument(0) SparkPlan plan) {
+      if (planInfo.metadata().size() == 0
+          && (Config.get().isDataJobsParseSparkPlanEnabled()
+              || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
+        Spark212PlanSerializer planUtils = new Spark212PlanSerializer();
+        planInfo =
+            new SparkPlanInfo(
+                planInfo.nodeName(),
+                planInfo.simpleString(),
+                planInfo.children(),
+                JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan))
+                    .toMap(Predef.$conforms()),
+                planInfo.metrics());
+      }
+    }
+  }
 }
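
How the hook swaps the result: in ByteBuddy exit advice, assigning to a @Advice.Return parameter declared readOnly = false replaces the value the instrumented method returns to its caller — that is what lets this advice substitute a SparkPlanInfo with populated metadata. A minimal standalone illustration of that mechanic (UpperCaseAdvice is a made-up example, not from this commit):

```java
import net.bytebuddy.asm.Advice;

// Minimal illustration of replacing a return value from exit advice.
class ReturnSwapExample {
  static class UpperCaseAdvice {
    @Advice.OnMethodExit
    static void exit(@Advice.Return(readOnly = false) String result) {
      result = result.toUpperCase(); // the caller now sees the upper-cased value
    }
  }
}
```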
dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanSerializer.java

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+package datadog.trace.instrumentation.spark;
+
+import org.apache.spark.sql.catalyst.trees.TreeNode;
+
+public class Spark212PlanSerializer extends AbstractSparkPlanSerializer {
+  @Override
+  public String getKey(int idx, TreeNode node) {
+    return "_dd.unknown_key." + idx;
+  }
+}

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 33 additions & 0 deletions
@@ -8,15 +8,20 @@
 import datadog.trace.api.Config;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.HashMap;
 
 @AutoService(InstrumenterModule.class)
 public class Spark213Instrumentation extends AbstractSparkInstrumentation {
   @Override
   public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
+      packageName + ".AbstractSparkPlanSerializer",
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark213Listener",
@@ -27,6 +32,7 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
+      packageName + ".Spark213PlanSerializer"
     };
   }
 
@@ -40,6 +46,13 @@ public void methodAdvice(MethodTransformer transformer) {
             .and(isDeclaredBy(named("org.apache.spark.SparkContext")))
             .and(takesNoArguments()),
         Spark213Instrumentation.class.getName() + "$InjectListener");
+
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("fromSparkPlan"))
+            .and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
+            .and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
+        Spark213Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
   }
 
   public static class InjectListener {
@@ -79,4 +92,24 @@ public static void enter(@Advice.This SparkContext sparkContext) {
       sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
     }
   }
+
+  public static class SparkPlanInfoAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    public static void exit(
+        @Advice.Return(readOnly = false) SparkPlanInfo planInfo,
+        @Advice.Argument(0) SparkPlan plan) {
+      if (planInfo.metadata().size() == 0
+          && (Config.get().isDataJobsParseSparkPlanEnabled()
+              || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
+        Spark213PlanSerializer planUtils = new Spark213PlanSerializer();
+        planInfo =
+            new SparkPlanInfo(
+                planInfo.nodeName(),
+                planInfo.simpleString(),
+                planInfo.children(),
+                HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan))),
+                planInfo.metrics());
+      }
+    }
+  }
 }
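
The two advice bodies are identical except for the last step: turning the serializer's java.util.Map<String, String> into the scala.collection.immutable.Map that the SparkPlanInfo constructor expects. A side-by-side sketch of the two idioms (javaMap stands in for the serializer output; each method compiles only against its respective Scala version):

```java
import java.util.Map;
import scala.Predef;
import scala.collection.JavaConverters;
import scala.collection.immutable.HashMap;

class MetadataConversion {
  // Scala 2.12: mapAsScalaMap yields a mutable wrapper, and toMap needs the
  // implicit <:< evidence that Predef.$conforms() supplies when called from Java.
  static scala.collection.immutable.Map<String, String> convert212(Map<String, String> javaMap) {
    return JavaConverters.mapAsScalaMap(javaMap).toMap(Predef.$conforms());
  }

  // Scala 2.13: immutable.HashMap gained a from(...) factory, so the wrapped
  // map can be copied directly with no implicit evidence.
  static scala.collection.immutable.Map<String, String> convert213(Map<String, String> javaMap) {
    return HashMap.from(JavaConverters.asScala(javaMap));
  }
}
```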
dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanSerializer.java

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+package datadog.trace.instrumentation.spark;
+
+import org.apache.spark.sql.catalyst.trees.TreeNode;
+
+public class Spark213PlanSerializer extends AbstractSparkPlanSerializer {
+  @Override
+  public String getKey(int idx, TreeNode node) {
+    return node.productElementName(idx);
+  }
+}
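
The divergence between the two getKey implementations comes from the standard library: productElementName(int) was added to scala.Product in Scala 2.13, so only there can a TreeNode report the real case-class field name of each element; on 2.12 the serializer has to synthesize positional placeholders. A minimal Java illustration of the 2.12 fallback:

```java
class PlanKeySketch {
  // Scala 2.13: TreeNode inherits Product.productElementName(int), so getKey
  // can return the real field name of product element idx.
  // Scala 2.12: Product has no such method, so a positional key is emitted.
  static String fallbackKey(int idx) {
    return "_dd.unknown_key." + idx; // e.g. "_dd.unknown_key.0"
  }
}
```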

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@
  */
 public abstract class AbstractDatadogSparkListener extends SparkListener {
   private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
-  private static final ObjectMapper objectMapper = new ObjectMapper();
+  protected static final ObjectMapper objectMapper = new ObjectMapper();
   public static volatile AbstractDatadogSparkListener listener = null;
 
   public static volatile boolean finishTraceOnApplicationEnd = true;
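
Widening the mapper from private to protected is what lets the new serializer reuse the listener's Jackson instance instead of allocating its own: in Java, protected also grants access to any class in the declaring package, and both classes live in datadog.trace.instrumentation.spark. A hypothetical same-package illustration (MapperReuse is not a class from this commit):

```java
package datadog.trace.instrumentation.spark;

// Hypothetical illustration: any class in this package can read the protected
// static mapper directly, no subclass relationship required.
class MapperReuse {
  static String toJson(Object value) throws Exception {
    return AbstractDatadogSparkListener.objectMapper.writeValueAsString(value);
  }
}
```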

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 2 additions & 1 deletion
@@ -36,7 +36,8 @@ public String[] knownMatchingTypes() {
       "org.apache.spark.deploy.yarn.ApplicationMaster",
       "org.apache.spark.util.Utils",
       "org.apache.spark.util.SparkClassUtils",
-      "org.apache.spark.scheduler.LiveListenerBus"
+      "org.apache.spark.scheduler.LiveListenerBus",
+      "org.apache.spark.sql.execution.SparkPlanInfo$"
     };
   }
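
The trailing $ in the new matcher entry is deliberate: Scala compiles a companion object to a JVM class whose name ends in $, with the singleton instance held in a static MODULE$ field, and fromSparkPlan is an instance method declared on that class. That is why this matcher and the isDeclaredBy clauses in both advice registrations target SparkPlanInfo$ rather than SparkPlanInfo. A small reflective check of the convention (requires Spark SQL on the classpath):

```java
class CompanionLookup {
  public static void main(String[] args) throws Exception {
    // The companion object's singleton lives in the static MODULE$ field of
    // the generated '$' class.
    Class<?> module = Class.forName("org.apache.spark.sql.execution.SparkPlanInfo$");
    Object companion = module.getField("MODULE$").get(null);
    System.out.println(companion); // the SparkPlanInfo companion instance
  }
}
```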
