Skip to content

Commit 83c695a

Browse files
committed
feat: add error handler callbacks for batch span processor
Add optional error handler callbacks to BatchSpanProcessor to provide programmatic access to background errors while maintaining internal SDK logging for observability. Changes: - Add error_handler callback to BatchSpanProcessor for background errors (span drops, export failures during timer-based flushes) - Return errors from shutdown_with_timeout when spans were dropped - Maintain all internal otel_debug/otel_warn/otel_error logging - Update BatchSpanProcessor builder with with_error_handler() method - Change dropped_spans_count to Arc<AtomicUsize> for shared ownership - Both log AND invoke error handler when errors occur This gives users explicit control over error handling via opt-in callbacks while preserving default observability through internal SDK logging. The error handler is a supplement to logging, not a replacement. Addresses error handling requirements from the OpenTelemetry specification which states that SDKs MAY expose callbacks for self-diagnostics AND that errors should be logged when they cannot be returned to the caller.
1 parent 3b2f751 commit 83c695a

File tree

2 files changed

+270
-63
lines changed

2 files changed

+270
-63
lines changed

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 109 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@ impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
151151
reason = format!("{:?}", err)
152152
);
153153
}
154+
155+
// TODO: on_end() currently has a void return type, so errors are silently discarded.
156+
// This should be changed to return Result so users can be notified of export failures.
157+
// The OpenTelemetry spec requires on_end to be fast/non-blocking, but doesn't mandate
158+
// a void return - returning an error doesn't violate that requirement.
159+
// Until the trait is changed, we cannot return this error to the caller.
154160
}
155161

156162
fn force_flush(&self) -> OTelSdkResult {
@@ -281,7 +287,6 @@ enum BatchMessage {
281287
/// cause deadlock. Instead, call `shutdown()` from a separate thread or use
282288
/// tokio's `spawn_blocking`.
283289
///
284-
#[derive(Debug)]
285290
pub struct BatchSpanProcessor {
286291
span_sender: SyncSender<SpanData>, // Data channel to store spans
287292
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
@@ -290,18 +295,37 @@ pub struct BatchSpanProcessor {
290295
export_span_message_sent: Arc<AtomicBool>,
291296
current_batch_size: Arc<AtomicUsize>,
292297
max_export_batch_size: usize,
293-
dropped_spans_count: AtomicUsize,
298+
dropped_spans_count: Arc<AtomicUsize>,
294299
max_queue_size: usize,
300+
error_handler: Option<Arc<dyn Fn(OTelSdkError) + Send + Sync + 'static>>,
301+
}
302+
303+
impl std::fmt::Debug for BatchSpanProcessor {
304+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305+
f.debug_struct("BatchSpanProcessor")
306+
.field("span_sender", &self.span_sender)
307+
.field("message_sender", &self.message_sender)
308+
.field("handle", &self.handle)
309+
.field("forceflush_timeout", &self.forceflush_timeout)
310+
.field("export_span_message_sent", &self.export_span_message_sent)
311+
.field("current_batch_size", &self.current_batch_size)
312+
.field("max_export_batch_size", &self.max_export_batch_size)
313+
.field("dropped_spans_count", &self.dropped_spans_count)
314+
.field("max_queue_size", &self.max_queue_size)
315+
.field(
316+
"error_handler",
317+
&self.error_handler.as_ref().map(|_| "<function>"),
318+
)
319+
.finish()
320+
}
295321
}
296322

297323
impl BatchSpanProcessor {
298324
/// Creates a new instance of `BatchSpanProcessor`.
299325
pub fn new<E>(
300326
mut exporter: E,
301327
config: BatchConfig,
302-
//max_queue_size: usize,
303-
//scheduled_delay: Duration,
304-
//shutdown_timeout: Duration,
328+
error_handler: Option<Arc<dyn Fn(OTelSdkError) + Send + Sync + 'static>>,
305329
) -> Self
306330
where
307331
E: SpanExporter + Send + 'static,
@@ -425,11 +449,12 @@ impl BatchSpanProcessor {
425449
message_sender,
426450
handle: Mutex::new(Some(handle)),
427451
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
428-
dropped_spans_count: AtomicUsize::new(0),
452+
dropped_spans_count: Arc::new(AtomicUsize::new(0)),
429453
max_queue_size,
430454
export_span_message_sent: Arc::new(AtomicBool::new(false)),
431455
current_batch_size,
432456
max_export_batch_size,
457+
error_handler,
433458
}
434459
}
435460

@@ -441,6 +466,7 @@ impl BatchSpanProcessor {
441466
BatchSpanProcessorBuilder {
442467
exporter,
443468
config: BatchConfig::default(),
469+
error_handler: None,
444470
}
445471
}
446472

@@ -576,18 +602,38 @@ impl SpanProcessor for BatchSpanProcessor {
576602
Err(std::sync::mpsc::TrySendError::Full(_)) => {
577603
// Increment dropped spans count. The first time we have to drop
578604
// a span, emit a warning.
579-
if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
605+
let previous_count = self.dropped_spans_count.fetch_add(1, Ordering::Relaxed);
606+
if previous_count == 0 {
580607
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
581608
message = "BatchSpanProcessor dropped a Span due to queue full. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped.");
582609
}
610+
611+
// Also invoke error handler if registered
612+
if let Some(ref handler) = self.error_handler {
613+
handler(OTelSdkError::InternalFailure(format!(
614+
"Span dropped due to full queue (total dropped: {})",
615+
previous_count + 1
616+
)));
617+
}
583618
}
584619
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
585620
// Given background thread is the only receiver, and it's
586621
// disconnected, it indicates the thread is shutdown
587-
otel_warn!(
588-
name: "BatchSpanProcessor.OnEnd.AfterShutdown",
589-
message = "Spans are being emitted even after Shutdown. This indicates incorrect lifecycle management of TracerProvider in application. Spans will not be exported."
590-
);
622+
let previous_count = self.dropped_spans_count.fetch_add(1, Ordering::Relaxed);
623+
if previous_count == 0 {
624+
otel_warn!(
625+
name: "BatchSpanProcessor.OnEnd.AfterShutdown",
626+
message = "Spans are being emitted even after Shutdown. This indicates incorrect lifecycle management of TracerProvider in application. Spans will not be exported."
627+
);
628+
}
629+
630+
// Also invoke error handler if registered
631+
if let Some(ref handler) = self.error_handler {
632+
handler(OTelSdkError::InternalFailure(format!(
633+
"Span dropped due to processor already shut down (total dropped: {}). This indicates incorrect lifecycle management of TracerProvider.",
634+
previous_count + 1
635+
)));
636+
}
591637
}
592638
}
593639
}
@@ -609,7 +655,7 @@ impl SpanProcessor for BatchSpanProcessor {
609655
}
610656
})?,
611657
Err(std::sync::mpsc::TrySendError::Full(_)) => {
612-
// If the control message could not be sent, emit a warning.
658+
// If the control message could not be sent, emit a debug log.
613659
otel_debug!(
614660
name: "BatchSpanProcessor.ForceFlush.ControlChannelFull",
615661
message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush/shutdown without finishing the previous call."
@@ -621,7 +667,7 @@ impl SpanProcessor for BatchSpanProcessor {
621667
// disconnected, it indicates the thread is shutdown
622668
otel_debug!(
623669
name: "BatchSpanProcessor.ForceFlush.AlreadyShutdown",
624-
message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates a incorrect lifecycle management in Application."
670+
message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates incorrect lifecycle management in Application."
625671
);
626672

627673
Err(OTelSdkError::AlreadyShutdown)
@@ -632,14 +678,21 @@ impl SpanProcessor for BatchSpanProcessor {
632678
/// Shuts down the processor.
633679
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
634680
let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
635-
let max_queue_size = self.max_queue_size;
636681
if dropped_spans > 0 {
682+
// Log warning for observability (always happens)
637683
otel_warn!(
638-
name: "BatchSpanProcessor.SpansDropped",
639-
dropped_span_count = dropped_spans,
640-
max_queue_size = max_queue_size,
641-
message = "Spans were dropped due to a queue being full. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
684+
name: "BatchSpanProcessor.Shutdown",
685+
dropped_spans = dropped_spans,
686+
max_queue_size = self.max_queue_size,
687+
message = "Spans were dropped due to a full queue. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
642688
);
689+
690+
// Also return error so user code can handle it programmatically
691+
return Err(OTelSdkError::InternalFailure(format!(
692+
"BatchSpanProcessor dropped {} spans during its lifetime due to full queue (max queue size: {}). Consider increasing queue size or decreasing delay between intervals.",
693+
dropped_spans,
694+
self.max_queue_size
695+
)));
643696
}
644697

645698
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
@@ -687,7 +740,6 @@ impl SpanProcessor for BatchSpanProcessor {
687740
name: "BatchSpanProcessor.Shutdown.AlreadyShutdown",
688741
message = "Shutdown is being invoked more than once. This is noop, but indicates a potential issue in the application's lifecycle management."
689742
);
690-
691743
Err(OTelSdkError::AlreadyShutdown)
692744
}
693745
}
@@ -703,13 +755,29 @@ impl SpanProcessor for BatchSpanProcessor {
703755
}
704756

705757
/// Builder for `BatchSpanProcessorDedicatedThread`.
706-
#[derive(Debug, Default)]
707758
pub struct BatchSpanProcessorBuilder<E>
708759
where
709760
E: SpanExporter + Send + 'static,
710761
{
711762
exporter: E,
712763
config: BatchConfig,
764+
error_handler: Option<Arc<dyn Fn(OTelSdkError) + Send + Sync + 'static>>,
765+
}
766+
767+
impl<E: std::fmt::Debug> std::fmt::Debug for BatchSpanProcessorBuilder<E>
768+
where
769+
E: SpanExporter + Send + 'static,
770+
{
771+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
772+
f.debug_struct("BatchSpanProcessorBuilder")
773+
.field("exporter", &self.exporter)
774+
.field("config", &self.config)
775+
.field(
776+
"error_handler",
777+
&self.error_handler.as_ref().map(|_| "<function>"),
778+
)
779+
.finish()
780+
}
713781
}
714782

715783
impl<E> BatchSpanProcessorBuilder<E>
@@ -721,9 +789,20 @@ where
721789
BatchSpanProcessorBuilder { config, ..self }
722790
}
723791

792+
/// Set the error handler for background export failures
793+
pub fn with_error_handler<F>(self, handler: F) -> Self
794+
where
795+
F: Fn(OTelSdkError) + Send + Sync + 'static,
796+
{
797+
BatchSpanProcessorBuilder {
798+
error_handler: Some(Arc::new(handler)),
799+
..self
800+
}
801+
}
802+
724803
/// Build a new instance of `BatchSpanProcessor`.
725804
pub fn build(self) -> BatchSpanProcessor {
726-
BatchSpanProcessor::new(self.exporter, self.config)
805+
BatchSpanProcessor::new(self.exporter, self.config, self.error_handler)
727806
}
728807
}
729808

@@ -1189,7 +1268,7 @@ mod tests {
11891268
.with_max_export_batch_size(10)
11901269
.with_scheduled_delay(Duration::from_secs(5))
11911270
.build();
1192-
let processor = BatchSpanProcessor::new(exporter, config);
1271+
let processor = BatchSpanProcessor::new(exporter, config, None);
11931272

11941273
let test_span = create_test_span("test_span");
11951274
processor.on_end(test_span.clone());
@@ -1211,7 +1290,7 @@ mod tests {
12111290
.with_max_export_batch_size(10)
12121291
.with_scheduled_delay(Duration::from_secs(5))
12131292
.build();
1214-
let processor = BatchSpanProcessor::new(exporter, config);
1293+
let processor = BatchSpanProcessor::new(exporter, config, None);
12151294

12161295
// Create a test span and send it to the processor
12171296
let test_span = create_test_span("force_flush_span");
@@ -1237,7 +1316,7 @@ mod tests {
12371316
let exporter = InMemorySpanExporterBuilder::new()
12381317
.keep_records_on_shutdown()
12391318
.build();
1240-
let processor = BatchSpanProcessor::new(exporter.clone(), BatchConfig::default());
1319+
let processor = BatchSpanProcessor::new(exporter.clone(), BatchConfig::default(), None);
12411320

12421321
let record = create_test_span("test_span");
12431322

@@ -1261,7 +1340,7 @@ mod tests {
12611340
.with_max_export_batch_size(512) // Explicitly set to avoid env var override
12621341
.with_scheduled_delay(Duration::from_secs(5))
12631342
.build();
1264-
let processor = BatchSpanProcessor::new(exporter, config);
1343+
let processor = BatchSpanProcessor::new(exporter, config, None);
12651344

12661345
// Create test spans and send them to the processor
12671346
let span1 = create_test_span("span1");
@@ -1306,7 +1385,7 @@ mod tests {
13061385
let exporter = MockSpanExporter::new();
13071386
let exporter_shared = exporter.exported_spans.clone();
13081387
let config = BatchConfigBuilder::default().build();
1309-
let processor = BatchSpanProcessor::new(exporter, config);
1388+
let processor = BatchSpanProcessor::new(exporter, config, None);
13101389

13111390
// Create a span with attributes
13121391
let mut span_data = create_test_span("attribute_validation");
@@ -1337,7 +1416,7 @@ mod tests {
13371416
let exporter_shared = exporter.exported_spans.clone();
13381417
let resource_shared = exporter.exported_resource.clone();
13391418
let config = BatchConfigBuilder::default().build();
1340-
let mut processor = BatchSpanProcessor::new(exporter, config);
1419+
let mut processor = BatchSpanProcessor::new(exporter, config, None);
13411420

13421421
// Set a resource for the processor
13431422
let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
@@ -1376,7 +1455,7 @@ mod tests {
13761455
.with_max_export_batch_size(3)
13771456
.build();
13781457

1379-
let processor = BatchSpanProcessor::new(exporter, config);
1458+
let processor = BatchSpanProcessor::new(exporter, config, None);
13801459

13811460
for _ in 0..4 {
13821461
let span = new_test_export_span_data();
@@ -1399,7 +1478,7 @@ mod tests {
13991478
.with_max_export_batch_size(3)
14001479
.build();
14011480

1402-
let processor = BatchSpanProcessor::new(exporter, config);
1481+
let processor = BatchSpanProcessor::new(exporter, config, None);
14031482

14041483
for _ in 0..4 {
14051484
let span = new_test_export_span_data();
@@ -1423,7 +1502,7 @@ mod tests {
14231502
.build();
14241503

14251504
// Create the processor with the thread-safe exporter
1426-
let processor = Arc::new(BatchSpanProcessor::new(exporter, config));
1505+
let processor = Arc::new(BatchSpanProcessor::new(exporter, config, None));
14271506

14281507
let mut handles = vec![];
14291508
for _ in 0..10 {

0 commit comments

Comments
 (0)