-
Notifications
You must be signed in to change notification settings - Fork 318
Add long running traces to flare report, allow flare files to be downloaded with JMX #9874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
fb54a79
a26bc61
0ee2791
e075ca1
b46f180
e17ca75
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -1,14 +1,29 @@ | ||||||||
| package datadog.trace.core; | ||||||||
|
|
||||||||
| import static java.util.Comparator.comparingLong; | ||||||||
|
|
||||||||
| import datadog.communication.ddagent.DDAgentFeaturesDiscovery; | ||||||||
| import datadog.communication.ddagent.SharedCommunicationObjects; | ||||||||
| import datadog.trace.api.Config; | ||||||||
| import datadog.trace.api.config.TracerConfig; | ||||||||
| import datadog.trace.api.flare.TracerFlare; | ||||||||
| import datadog.trace.common.writer.TraceDumpJsonExporter; | ||||||||
| import datadog.trace.core.monitor.HealthMetrics; | ||||||||
| import java.io.IOException; | ||||||||
| import java.util.ArrayList; | ||||||||
| import java.util.Comparator; | ||||||||
| import java.util.List; | ||||||||
| import java.util.concurrent.TimeUnit; | ||||||||
| import java.util.zip.ZipOutputStream; | ||||||||
| import org.slf4j.Logger; | ||||||||
| import org.slf4j.LoggerFactory; | ||||||||
|
|
||||||||
| public class LongRunningTracesTracker implements TracerFlare.Reporter { | ||||||||
| private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTracesTracker.class); | ||||||||
| private static final int MAX_DUMPED_TRACES = 50; | ||||||||
| private static final Comparator<PendingTrace> TRACE_BY_START_TIME = | ||||||||
| comparingLong(PendingTrace::getRunningTraceStartTime); | ||||||||
|
|
||||||||
| public class LongRunningTracesTracker { | ||||||||
| private final DDAgentFeaturesDiscovery features; | ||||||||
| private final HealthMetrics healthMetrics; | ||||||||
| private long lastFlushMilli = 0; | ||||||||
|
|
@@ -21,6 +36,7 @@ public class LongRunningTracesTracker { | |||||||
| private int dropped = 0; | ||||||||
| private int write = 0; | ||||||||
| private int expired = 0; | ||||||||
| private int droppedSampling = 0; | ||||||||
|
|
||||||||
| public static final int NOT_TRACKED = -1; | ||||||||
| public static final int UNDEFINED = 0; | ||||||||
|
|
@@ -41,6 +57,18 @@ public LongRunningTracesTracker( | |||||||
| (int) TimeUnit.SECONDS.toMillis(config.getLongRunningTraceFlushInterval()); | ||||||||
| this.features = sharedCommunicationObjects.featuresDiscovery(config); | ||||||||
| this.healthMetrics = healthMetrics; | ||||||||
|
|
||||||||
| if (!features.supportsLongRunning()) { | ||||||||
| LOGGER.warn( | ||||||||
| "Long running trace tracking is enabled via {}, however the Datadog Agent version {} does not support receiving long running traces. " | ||||||||
| + "Long running traces will be tracked locally in memory (up to {} traces) but will NOT be automatically reported to the agent. " | ||||||||
| + "Long running traces are included in tracer flares.", | ||||||||
| "dd." + TracerConfig.TRACE_LONG_RUNNING_ENABLED, | ||||||||
| features.getVersion() != null ? features.getVersion() : "unknown", | ||||||||
| maxTrackedTraces); | ||||||||
| } | ||||||||
|
|
||||||||
| TracerFlare.addReporter(this); | ||||||||
| } | ||||||||
|
|
||||||||
| public boolean add(PendingTraceBuffer.Element element) { | ||||||||
|
|
@@ -56,7 +84,7 @@ public boolean add(PendingTraceBuffer.Element element) { | |||||||
| return true; | ||||||||
| } | ||||||||
|
|
||||||||
| private void addTrace(PendingTrace trace) { | ||||||||
| private synchronized void addTrace(PendingTrace trace) { | ||||||||
| if (trace.empty()) { | ||||||||
| return; | ||||||||
| } | ||||||||
|
|
@@ -67,7 +95,7 @@ private void addTrace(PendingTrace trace) { | |||||||
| traceArray.add(trace); | ||||||||
| } | ||||||||
|
|
||||||||
| public void flushAndCompact(long nowMilli) { | ||||||||
| public synchronized void flushAndCompact(long nowMilli) { | ||||||||
| if (nowMilli < lastFlushMilli + TimeUnit.SECONDS.toMillis(1)) { | ||||||||
| return; | ||||||||
| } | ||||||||
|
|
@@ -78,7 +106,7 @@ public void flushAndCompact(long nowMilli) { | |||||||
| cleanSlot(i); | ||||||||
| continue; | ||||||||
| } | ||||||||
| if (trace.empty() || !features.supportsLongRunning()) { | ||||||||
| if (trace.empty()) { | ||||||||
| trace.compareAndSetLongRunningState(WRITE_RUNNING_SPANS, NOT_TRACKED); | ||||||||
| cleanSlot(i); | ||||||||
| continue; | ||||||||
|
|
@@ -92,12 +120,15 @@ public void flushAndCompact(long nowMilli) { | |||||||
| if (shouldFlush(nowMilli, trace)) { | ||||||||
| if (negativeOrNullPriority(trace)) { | ||||||||
| trace.compareAndSetLongRunningState(TRACKED, NOT_TRACKED); | ||||||||
| droppedSampling++; | ||||||||
| cleanSlot(i); | ||||||||
| continue; | ||||||||
| } | ||||||||
| trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS); | ||||||||
| write++; | ||||||||
| trace.write(); | ||||||||
| if (features.supportsLongRunning()) { | ||||||||
| trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS); | ||||||||
| write++; | ||||||||
| trace.write(); | ||||||||
| } | ||||||||
| } | ||||||||
| i++; | ||||||||
| } | ||||||||
|
|
@@ -134,9 +165,28 @@ private boolean negativeOrNullPriority(PendingTrace trace) { | |||||||
| } | ||||||||
|
|
||||||||
| private void flushStats() { | ||||||||
| healthMetrics.onLongRunningUpdate(dropped, write, expired); | ||||||||
| healthMetrics.onLongRunningUpdate(dropped, write, expired, droppedSampling); | ||||||||
| dropped = 0; | ||||||||
| write = 0; | ||||||||
| expired = 0; | ||||||||
| droppedSampling = 0; | ||||||||
| } | ||||||||
|
|
||||||||
| public synchronized String getTracesAsJson() { | ||||||||
| try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) { | ||||||||
| List<PendingTrace> traces = new ArrayList<>(traceArray); | ||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you just lock while taking the snapshot of the pending traces?
List<PendingTrace> traces;
synchronized (this) {
traces = new ArrayList<>(traceArray);
} |
||||||||
| traces.sort(TRACE_BY_START_TIME); | ||||||||
|
|
||||||||
| int limit = Math.min(traces.size(), MAX_DUMPED_TRACES); | ||||||||
| for (int i = 0; i < limit; i++) { | ||||||||
| writer.write(traces.get(i).getSpans()); | ||||||||
| } | ||||||||
| return writer.getDumpJson(); | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: WDYT about this to match other
Suggested change
(along with a corresponding change in the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh, good catch! I was meaning to change this, but actually going the other way. I was thinking it would be best to just return an empty string when there are no records for a few reasons:
I think |
||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| @Override | ||||||||
| public void addReportToFlare(ZipOutputStream zip) throws IOException { | ||||||||
| TracerFlare.addText(zip, "long_running_traces.txt", getTracesAsJson()); | ||||||||
| } | ||||||||
| } | ||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is synchronization really needed? AFAIK all accesses to the tracker are done from the single thread at PendingTraceBuffer#Worker.
My bad, it's synchronized as it's used as a reporter.