@@ -116,6 +116,7 @@ private static void run(final StatsDClientManager statsDClientManager, final Con
.refreshBeansPeriod(refreshBeansPeriod)
.globalTags(globalTags)
.reporter(reporter)
.jmxfetchTelemetry(config.isTelemetryJmxEnabled())
.connectionFactory(new AgentConnectionFactory());

if (config.isJmxFetchMultipleRuntimeServicesEnabled()) {
@@ -101,6 +101,7 @@ public final class GeneralConfig {
public static final String TELEMETRY_DEPENDENCY_RESOLUTION_QUEUE_SIZE =
"telemetry.dependency-resolution.queue.size";
public static final String TELEMETRY_DEBUG_REQUESTS_ENABLED = "telemetry.debug.requests.enabled";
public static final String TELEMETRY_JMX_ENABLED = "telemetry.jmx.enabled";
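Side note (not part of this diff): assuming the usual mapping from config keys to dd.* system properties and DD_* environment variables, the new key would be toggled as sketched below; the property and variable names are inferred from that convention rather than taken from this PR.

public class EnableJmxFetchTelemetrySketch {
  public static void main(String[] args) {
    // Hypothetical: equivalent to exporting DD_TELEMETRY_JMX_ENABLED=true before starting the JVM.
    System.setProperty("dd.telemetry.jmx.enabled", "true");
    // ... start or attach the tracer afterwards so the flag is read when Config is built.
  }
}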
public static final String AGENTLESS_LOG_SUBMISSION_ENABLED = "agentless.log.submission.enabled";
public static final String AGENTLESS_LOG_SUBMISSION_QUEUE_SIZE =
"agentless.log.submission.queue.size";
@@ -3,12 +3,9 @@
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.core.DDSpan;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.zip.ZipOutputStream;

public class TraceDumpJsonExporter implements Writer {

@@ -17,11 +14,9 @@ public class TraceDumpJsonExporter implements Writer {
.add(DDSpanJsonAdapter.buildFactory(false))
.build()
.adapter(Types.newParameterizedType(Collection.class, DDSpan.class));
private StringBuilder dumpText;
private ZipOutputStream zip;
private final StringBuilder dumpText;

public TraceDumpJsonExporter(ZipOutputStream zip) {
this.zip = zip;
public TraceDumpJsonExporter() {
dumpText = new StringBuilder();
}

@@ -32,7 +27,8 @@ public void write(final Collection<DDSpan> trace) {

@Override
public void write(List<DDSpan> trace) {
// Do nothing
Collection<DDSpan> collectionTrace = trace;
write(collectionTrace);
}

@Override
@@ -42,14 +38,14 @@ public void start() {

@Override
public boolean flush() {
try {
TracerFlare.addText(zip, "pending_traces.txt", dumpText.toString());
} catch (IOException e) {
// do nothing
}
// do nothing
return true;
}

public String getDumpJson() {
return dumpText.toString();
}
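Aside for reviewers: with the ZipOutputStream dependency removed, the exporter now only accumulates JSON text and exposes it through getDumpJson(); the caller decides where the text goes. A minimal sketch of the new usage, mirroring the call sites later in this diff (the spans argument is a placeholder):

import datadog.trace.common.writer.TraceDumpJsonExporter;
import datadog.trace.core.DDSpan;
import java.util.Collection;

class TraceDumpUsageSketch {
  // Buffer the JSON in memory, then hand the text to whoever needs it
  // (e.g. TracerFlare.addText in the new addReportToFlare implementations).
  static String dump(Collection<DDSpan> spans) {
    TraceDumpJsonExporter writer = new TraceDumpJsonExporter();
    writer.write(spans);         // serializes the spans into the internal StringBuilder
    return writer.getDumpJson(); // JSON Lines text; empty string when nothing was written
  }
}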

@Override
public void close() {
// do nothing
@@ -1,14 +1,29 @@
package datadog.trace.core;

import static java.util.Comparator.comparingLong;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.config.TracerConfig;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.common.writer.TraceDumpJsonExporter;
import datadog.trace.core.monitor.HealthMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LongRunningTracesTracker implements TracerFlare.Reporter {
private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTracesTracker.class);
private static final int MAX_DUMPED_TRACES = 50;
private static final Comparator<PendingTrace> TRACE_BY_START_TIME =
comparingLong(PendingTrace::getRunningTraceStartTime);

public class LongRunningTracesTracker {
private final DDAgentFeaturesDiscovery features;
private final HealthMetrics healthMetrics;
private long lastFlushMilli = 0;
@@ -21,6 +36,7 @@ public class LongRunningTracesTracker {
private int dropped = 0;
private int write = 0;
private int expired = 0;
private int droppedSampling = 0;

public static final int NOT_TRACKED = -1;
public static final int UNDEFINED = 0;
@@ -41,6 +57,18 @@ public LongRunningTracesTracker(
(int) TimeUnit.SECONDS.toMillis(config.getLongRunningTraceFlushInterval());
this.features = sharedCommunicationObjects.featuresDiscovery(config);
this.healthMetrics = healthMetrics;

if (!features.supportsLongRunning()) {
LOGGER.warn(
"Long running trace tracking is enabled via {}, however the Datadog Agent version {} does not support receiving long running traces. "
+ "Long running traces will be tracked locally in memory (up to {} traces) but will NOT be automatically reported to the agent. "
+ "Long running traces are included in tracer flares.",
"dd." + TracerConfig.TRACE_LONG_RUNNING_ENABLED,
features.getVersion() != null ? features.getVersion() : "unknown",
maxTrackedTraces);
}

TracerFlare.addReporter(this);
}

public boolean add(PendingTraceBuffer.Element element) {
@@ -56,7 +84,7 @@ public boolean add(PendingTraceBuffer.Element element) {
return true;
}

private void addTrace(PendingTrace trace) {
private synchronized void addTrace(PendingTrace trace) {
@manuel-alvarez-alvarez (Member) commented on Nov 21, 2025:

Is synchronization really needed? AFAIK all access to the tracker is done from a single thread, the PendingTraceBuffer#Worker.

My bad, it's synchronized as it's used as a reporter.
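To make the point in the thread above concrete: the PendingTraceBuffer worker thread mutates traceArray through add/flushAndCompact, while the tracer-flare reporter thread reads it through getTracesAsJson, so both paths need the same lock. A stripped-down, illustrative sketch (not the PR's code; the names are stand-ins):

import java.util.ArrayList;
import java.util.List;

class TrackerLockingSketch {
  private final List<String> traceArray = new ArrayList<>();

  // Worker-thread path (stand-in for addTrace/flushAndCompact).
  synchronized void addTrace(String trace) {
    traceArray.add(trace);
  }

  // Flare-reporter path (stand-in for getTracesAsJson): copy under the same lock.
  synchronized List<String> snapshot() {
    return new ArrayList<>(traceArray);
  }

  public static void main(String[] args) throws InterruptedException {
    TrackerLockingSketch tracker = new TrackerLockingSketch();
    Thread worker = new Thread(() -> {
      for (int i = 0; i < 10_000; i++) {
        tracker.addTrace("trace-" + i);
      }
    });
    worker.start();
    System.out.println(tracker.snapshot().size()); // safe to call while the worker keeps adding
    worker.join();
  }
}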

if (trace.empty()) {
return;
}
@@ -67,7 +95,7 @@ private void addTrace(PendingTrace trace) {
traceArray.add(trace);
}

public void flushAndCompact(long nowMilli) {
public synchronized void flushAndCompact(long nowMilli) {
if (nowMilli < lastFlushMilli + TimeUnit.SECONDS.toMillis(1)) {
return;
}
@@ -78,7 +106,7 @@ public void flushAndCompact(long nowMilli) {
cleanSlot(i);
continue;
}
if (trace.empty() || !features.supportsLongRunning()) {
if (trace.empty()) {
trace.compareAndSetLongRunningState(WRITE_RUNNING_SPANS, NOT_TRACKED);
cleanSlot(i);
continue;
@@ -92,12 +120,15 @@ public void flushAndCompact(long nowMilli) {
if (shouldFlush(nowMilli, trace)) {
if (negativeOrNullPriority(trace)) {
trace.compareAndSetLongRunningState(TRACKED, NOT_TRACKED);
droppedSampling++;
cleanSlot(i);
continue;
}
trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS);
write++;
trace.write();
if (features.supportsLongRunning()) {
trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS);
write++;
trace.write();
}
}
i++;
}
@@ -134,9 +165,28 @@ private boolean negativeOrNullPriority(PendingTrace trace) {
}

private void flushStats() {
healthMetrics.onLongRunningUpdate(dropped, write, expired);
healthMetrics.onLongRunningUpdate(dropped, write, expired, droppedSampling);
dropped = 0;
write = 0;
expired = 0;
droppedSampling = 0;
}

public synchronized String getTracesAsJson() {
try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) {
List<PendingTrace> traces = new ArrayList<>(traceArray);

Review comment: Can you just lock while doing the snapshot of the pending traces?

List<PendingTrace> traces;
synchronized (this) {
    traces = new ArrayList<>(traceArray);
}

traces.sort(TRACE_BY_START_TIME);

int limit = Math.min(traces.size(), MAX_DUMPED_TRACES);
for (int i = 0; i < limit; i++) {
writer.write(traces.get(i).getSpans());
}
return writer.getDumpJson();
Contributor comment:
nit: WDYT about this to match other getTracesAsJson() methods that return "[]" rather than an empty string when the json is empty?

Suggested change:
- return writer.getDumpJson();
+ String json = writer.getDumpJson();
+ return json.isEmpty() ? "[]" : json;

(along with a corresponding change in the "getTracesAsJson with no traces" test)

Reply from the PR author (Contributor):
Oh, good catch! I was meaning to change this, but actually going the other way. I was thinking it would be best to just return an empty string when there are no records for a few reasons:

  1. The existing implementation for pending traces serializes each pending trace as its own JSON record, with a newline between records (JSON Lines style). In this case, it's fine to have an empty string when there are no records.
  2. I think it's slightly more correct to have an empty string when there are no pending/long-running traces instead of using []. [] suggests a single pending/long-running trace with no pending spans (uncommon, but it can happen, particularly with pending traces once all the spans are finished but before it's processed in the queue).
  3. Doesn't change the existing functionality.

I think [] is a relic of my early days working on this before I understood the existing functionality--I had one heck of a time trying to actually see anything in the pending buffer.
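For reference, a tiny illustration of the JSON Lines shape described in point 1 (illustrative only; nothing below is part of the PR): each trace becomes one JSON record followed by a newline, so the empty case naturally serializes to an empty string rather than "[]".

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class JsonLinesShapeSketch {
  static String dump(List<String> serializedTraces) {
    StringBuilder text = new StringBuilder();
    for (String traceJson : serializedTraces) {
      text.append(traceJson).append('\n'); // one record per trace, newline-separated
    }
    return text.toString(); // "" when there are no traces, not "[]"
  }

  public static void main(String[] args) {
    System.out.print(dump(Arrays.asList("[{\"spanId\":1}]", "[{\"spanId\":2}]")));
    System.out.println(dump(Collections.emptyList()).isEmpty()); // prints: true
  }
}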

}
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
TracerFlare.addText(zip, "long_running_traces.txt", getTracesAsJson());
}
}
@@ -54,6 +54,7 @@ public interface Element {
}

private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayingPendingTraceBuffer.class);
private static final long FORCE_SEND_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
private static final long SEND_DELAY_NS = TimeUnit.MILLISECONDS.toNanos(500);
private static final long SLEEP_TIME_MS = 100;
@@ -64,13 +65,31 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private final MpscBlockingConsumerArrayQueue<Element> queue;
private final Thread worker;
private final TimeSource timeSource;
private final HealthMetrics healthMetrics;

private volatile boolean closed = false;
private final AtomicInteger flushCounter = new AtomicInteger(0);
private final AtomicInteger dumpCounter = new AtomicInteger(0);

private final LongRunningTracesTracker runningTracesTracker;

private void dumpTraces() {
if (worker.isAlive()) {
int count = dumpCounter.get();
int loop = 1;
boolean signaled = queue.offer(DUMP_ELEMENT);
while (!closed && !signaled) {
yieldOrSleep(loop++);
signaled = queue.offer(DUMP_ELEMENT);
}
int newCount = dumpCounter.get();
while (!closed && count >= newCount) {
yieldOrSleep(loop++);
newCount = dumpCounter.get();
}
}
}
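For reviewers skimming this moved code: dumpTraces() signals the worker by offering the DUMP_ELEMENT sentinel into its queue, then spins via yieldOrSleep until the worker bumps dumpCounter, which acknowledges that the drain has completed. A self-contained sketch of that handshake (illustrative; plain sleeps stand in for yieldOrSleep):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class DumpHandshakeSketch {
  private static final String DUMP_ELEMENT = "DUMP";
  private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
  private final AtomicInteger dumpCounter = new AtomicInteger();

  // Caller side: enqueue the sentinel, then wait until the worker has consumed it.
  void dumpTraces() throws InterruptedException {
    int before = dumpCounter.get();
    while (!queue.offer(DUMP_ELEMENT)) Thread.sleep(1);
    while (dumpCounter.get() <= before) Thread.sleep(1);
  }

  // Worker side: drain the queue; bump the counter when the sentinel is seen.
  void workerLoop() throws InterruptedException {
    while (true) {
      if (DUMP_ELEMENT.equals(queue.take())) {
        dumpCounter.incrementAndGet(); // acknowledge the dump request
      }
      // a real worker would write pending traces here
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DumpHandshakeSketch sketch = new DumpHandshakeSketch();
    Thread worker = new Thread(() -> {
      try {
        sketch.workerLoop();
      } catch (InterruptedException ignored) {
        // daemon thread; fine to exit
      }
    });
    worker.setDaemon(true);
    worker.start();
    sketch.dumpTraces(); // returns once the worker has acknowledged the sentinel
    System.out.println("dump acknowledged");
  }
}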

public boolean longRunningSpansEnabled() {
return runningTracesTracker != null;
}
@@ -85,6 +104,7 @@ public void enqueue(Element pendingTrace) {
if (!pendingTrace.writeOnBufferFull()) {
return;
}
healthMetrics.onPendingWriteAround();
pendingTrace.write();
}
}
@@ -136,6 +156,18 @@ public void flush() {
}
}

private String getDumpJson() {
try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) {
for (Element e : DumpDrain.DUMP_DRAIN.collectTraces()) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
writer.write(trace.getSpans());
}
}
return writer.getDumpJson();
}
}

private static final class WriteDrain implements MessagePassingQueue.Consumer<Element> {
private static final WriteDrain WRITE_DRAIN = new WriteDrain();

@@ -286,6 +318,17 @@ public void run() {
}
}

@Override
public String getTracesAsJson() throws IOException {
dumpTraces();
String json = getDumpJson();
if (json.isEmpty()) {
return "[]";
} else {
return json;
}
}

public DelayingPendingTraceBuffer(
int bufferSize,
TimeSource timeSource,
@@ -295,6 +338,7 @@ public DelayingPendingTraceBuffer(
this.queue = new MpscBlockingConsumerArrayQueue<>(bufferSize);
this.worker = newAgentThread(TRACE_MONITOR, new Worker());
this.timeSource = timeSource;
this.healthMetrics = healthMetrics;
boolean runningSpansEnabled = config.isLongRunningTraceEnabled();
this.runningTracesTracker =
runningSpansEnabled
@@ -321,6 +365,11 @@ public void enqueue(Element pendingTrace) {
log.debug(
"PendingTrace enqueued but won't be reported. Root span: {}", pendingTrace.getRootSpan());
}

@Override
public String getTracesAsJson() {
return "[]";
}
}

public static PendingTraceBuffer delaying(
@@ -345,6 +394,8 @@ public static PendingTraceBuffer discarding() {

public abstract void enqueue(Element pendingTrace);

public abstract String getTracesAsJson() throws IOException;

private static class TracerDump implements TracerFlare.Reporter {
private final DelayingPendingTraceBuffer buffer;

@@ -354,32 +405,12 @@ private TracerDump(DelayingPendingTraceBuffer buffer) {

@Override
public void prepareForFlare() {
if (buffer.worker.isAlive()) {
int count = buffer.dumpCounter.get();
int loop = 1;
boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
while (!buffer.closed && !signaled) {
buffer.yieldOrSleep(loop++);
signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
}
int newCount = buffer.dumpCounter.get();
while (!buffer.closed && count >= newCount) {
buffer.yieldOrSleep(loop++);
newCount = buffer.dumpCounter.get();
}
}
buffer.dumpTraces();
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
TraceDumpJsonExporter writer = new TraceDumpJsonExporter(zip);
for (Element e : DelayingPendingTraceBuffer.DumpDrain.DUMP_DRAIN.collectTraces()) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
writer.write(trace.getSpans());
}
}
writer.flush();
TracerFlare.addText(zip, "pending_traces.txt", buffer.getDumpJson());
}
}
}
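End-of-file note: the new abstract getTracesAsJson() gives callers a flare-independent way to read the pending-trace dump. A hypothetical caller, using only the factory and method visible in this diff (the class below is not part of the PR):

import datadog.trace.core.PendingTraceBuffer;
import java.io.IOException;

class PendingTraceDumpCallerSketch {
  public static void main(String[] args) throws IOException {
    // The discarding buffer is the simplest instance; per this diff it returns "[]".
    PendingTraceBuffer buffer = PendingTraceBuffer.discarding();
    System.out.println(buffer.getTracesAsJson());
  }
}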
@@ -71,7 +71,10 @@ public void onSend(
public void onFailedSend(
final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {}

public void onLongRunningUpdate(final int dropped, final int write, final int expired) {}
public void onPendingWriteAround() {}

public void onLongRunningUpdate(
final int dropped, final int write, final int expired, final int droppedSampling) {}

/**
* Report that a trace has been used to compute client stats.