diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs
index 8060c99841..13eb3eb9e2 100644
--- a/examples/tracing-http-propagator/src/server.rs
+++ b/examples/tracing-http-propagator/src/server.rs
@@ -118,7 +118,7 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
         });
     }
 
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
         Ok(())
     }
 
@@ -132,7 +132,7 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
 #[derive(Debug)]
 struct EnrichWithBaggageSpanProcessor;
 impl SpanProcessor for EnrichWithBaggageSpanProcessor {
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
         Ok(())
     }
 
diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs
index d5f2435ab2..42a96f26d7 100644
--- a/opentelemetry-appender-tracing/src/layer.rs
+++ b/opentelemetry-appender-tracing/src/layer.rs
@@ -284,6 +284,7 @@ const fn severity_of_level(level: &Level) -> Severity {
 
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
     use crate::layer;
     use opentelemetry::logs::Severity;
     use opentelemetry::trace::TracerProvider;
@@ -931,7 +932,7 @@ mod tests {
            true
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs
index c99f36fa0b..086f6c5b3f 100644
--- a/opentelemetry-proto/src/transform/logs.rs
+++ b/opentelemetry-proto/src/transform/logs.rs
@@ -221,6 +221,7 @@ pub mod tonic {
 
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
     use crate::transform::common::tonic::ResourceAttributesWithSchema;
     use opentelemetry::logs::LogRecord as _;
     use opentelemetry::logs::Logger;
@@ -238,7 +239,7 @@ mod tests {
     impl LogProcessor for MockProcessor {
         fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {}
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
             Ok(())
         }
 
diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs
index 9aae314f29..0627cbf097 100644
--- a/opentelemetry-sdk/src/logs/batch_log_processor.rs
+++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs
@@ -129,7 +129,6 @@ pub struct BatchLogProcessor {
     logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
     message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
     handle: Mutex<Option<thread::JoinHandle<()>>>,
-    forceflush_timeout: Duration,
     export_log_message_sent: Arc<AtomicBool>,
     current_batch_size: Arc<AtomicUsize>,
     max_export_batch_size: usize,
@@ -221,21 +220,19 @@ impl LogProcessor for BatchLogProcessor {
         }
     }
 
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
         let (sender, receiver) = mpsc::sync_channel(1);
         match self
             .message_sender
             .try_send(BatchMessage::ForceFlush(sender))
         {
-            Ok(_) => receiver
-                .recv_timeout(self.forceflush_timeout)
-                .map_err(|err| {
-                    if err == RecvTimeoutError::Timeout {
-                        OTelSdkError::Timeout(self.forceflush_timeout)
-                    } else {
-                        OTelSdkError::InternalFailure(format!("{err}"))
-                    }
-                })?,
+            Ok(_) => receiver.recv_timeout(timeout).map_err(|err| {
+                if err == RecvTimeoutError::Timeout {
+                    OTelSdkError::Timeout(timeout)
+                } else {
+                    OTelSdkError::InternalFailure(format!("{err}"))
+                }
+            })?,
             Err(mpsc::TrySendError::Full(_)) => {
                 // If the control message could not be sent, emit a warning.
                 otel_debug!(
@@ -489,7 +486,6 @@ impl BatchLogProcessor {
             logs_sender,
             message_sender,
             handle: Mutex::new(Some(handle)),
-            forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
             dropped_logs_count: AtomicUsize::new(0),
             max_queue_size,
             export_log_message_sent: Arc::new(AtomicBool::new(false)),
diff --git a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs
index 052e3d9796..414f7a7bf1 100644
--- a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs
+++ b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs
@@ -37,7 +37,7 @@ impl LogProcessor for SimpleConcurrentLogProcessor {
         }
     }
 
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
         // TODO: invoke flush on exporter
         // once https://github.com/open-telemetry/opentelemetry-rust/issues/2261
         // is resolved
diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs
index de882e5f6f..35de04a87a 100644
--- a/opentelemetry-sdk/src/logs/log_processor.rs
+++ b/opentelemetry-sdk/src/logs/log_processor.rs
@@ -53,7 +53,11 @@ pub trait LogProcessor: Send + Sync + Debug {
     /// - `instrumentation`: The instrumentation scope associated with the log record.
     fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope);
     /// Force the logs lying in the cache to be exported.
-    fn force_flush(&self) -> OTelSdkResult;
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult;
+    /// Force the logs lying in the cache to be exported with default timeout.
+    fn force_flush(&self) -> OTelSdkResult {
+        self.force_flush_with_timeout(Duration::from_secs(5))
+    }
     /// Shuts down the processor.
     /// After shutdown returns the log processor should stop processing any logs.
     /// It's up to the implementation on when to drop the LogProcessor.
@@ -103,6 +107,7 @@ pub(crate) mod tests {
     use opentelemetry::logs::{Logger, LoggerProvider};
     use opentelemetry::{InstrumentationScope, Key};
     use std::sync::{Arc, Mutex};
+    use std::time::Duration;
 
     #[derive(Debug, Clone)]
     pub(crate) struct MockLogExporter {
@@ -152,7 +157,7 @@ pub(crate) mod tests {
                .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
@@ -182,7 +187,7 @@ pub(crate) mod tests {
                .push((record.clone(), instrumentation.clone()));
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs
index bd725bec9a..46d3f84628 100644
--- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs
+++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs
@@ -76,7 +76,7 @@ impl LogProcessor for BatchLogProcessor {
         }
     }
 
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
         let (res_sender, res_receiver) = oneshot::channel();
         self.message_sender
             .try_send(BatchMessage::Flush(Some(res_sender)))
@@ -625,7 +625,7 @@ mod tests {
                .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
@@ -655,7 +655,7 @@ mod tests {
                .push((record.clone(), instrumentation.clone()));
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
diff --git a/opentelemetry-sdk/src/logs/logger_provider.rs b/opentelemetry-sdk/src/logs/logger_provider.rs
index 0097c3b6ca..00f6956ecc 100644
--- a/opentelemetry-sdk/src/logs/logger_provider.rs
+++ b/opentelemetry-sdk/src/logs/logger_provider.rs
@@ -83,11 +83,11 @@ impl SdkLoggerProvider {
     }
     /// Force flush all remaining logs in log processors and return results.
-    pub fn force_flush(&self) -> OTelSdkResult {
+    pub fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
         let result: Vec<_> = self
             .log_processors()
             .iter()
-            .map(|processor| processor.force_flush())
+            .map(|processor| processor.force_flush_with_timeout(timeout))
             .collect();
         if result.iter().all(|r| r.is_ok()) {
             Ok(())
         } else {
@@ -96,6 +96,11 @@ impl SdkLoggerProvider {
         }
     }
 
+    /// Force flush all remaining logs with default timeout.
+    pub fn force_flush(&self) -> OTelSdkResult {
+        self.force_flush_with_timeout(Duration::from_secs(5))
+    }
+
     /// Shuts down this `LoggerProvider`
     pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
         otel_debug!(
@@ -340,7 +345,7 @@ mod tests {
                .expect("lock poisoned");
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
@@ -393,7 +398,7 @@ mod tests {
            // nothing to do.
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
@@ -913,7 +918,7 @@ mod tests {
            // nothing to do.
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            *self.flush_called.lock().unwrap() = true;
            Ok(())
        }
@@ -944,7 +949,7 @@ mod tests {
            // nothing to do
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            *self.flush_called.lock().unwrap() = true;
            Ok(())
        }
diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs
index c54f001f97..dbc913dda1 100644
--- a/opentelemetry-sdk/src/logs/mod.rs
+++ b/opentelemetry-sdk/src/logs/mod.rs
@@ -46,6 +46,7 @@ mod tests {
     use std::borrow::Borrow;
     use std::collections::HashMap;
     use std::sync::{Arc, Mutex};
+    use std::time::Duration;
 
     #[test]
     fn logging_sdk_test() {
@@ -167,7 +168,7 @@ mod tests {
            });
        }
 
-        fn force_flush(&self) -> crate::error::OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
@@ -273,7 +274,7 @@ mod tests {
            }
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs
index 5c8642221a..d388d0728c 100644
--- a/opentelemetry-sdk/src/logs/simple_log_processor.rs
+++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs
@@ -113,7 +113,7 @@ impl LogProcessor for SimpleLogProcessor {
         }
     }
 
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
         Ok(())
     }
 
diff --git a/opentelemetry-sdk/src/trace/export.rs b/opentelemetry-sdk/src/trace/export.rs
index f09e5d5707..7b3a48996a 100644
--- a/opentelemetry-sdk/src/trace/export.rs
+++ b/opentelemetry-sdk/src/trace/export.rs
@@ -66,10 +66,15 @@ pub trait SpanExporter: Send + Sync + Debug {
     /// implemented as a blocking API or an asynchronous API which notifies the caller via
     /// a callback or an event. OpenTelemetry client authors can decide if they want to
     /// make the flush timeout configurable.
-    fn force_flush(&mut self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
         Ok(())
     }
 
+    /// Force flush the exporter with default timeout.
+    fn force_flush(&mut self) -> OTelSdkResult {
+        self.force_flush_with_timeout(Duration::from_secs(5))
+    }
+
     /// Set the resource for the exporter.
     fn set_resource(&mut self, _resource: &Resource) {}
 
diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs
index 585f65f27d..7f793b5c45 100644
--- a/opentelemetry-sdk/src/trace/mod.rs
+++ b/opentelemetry-sdk/src/trace/mod.rs
@@ -143,7 +143,7 @@ mod tests {
            // let _c = Context::current();
        }
 
-        fn force_flush(&self) -> crate::error::OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs
index 2b05f89aea..70f3654a2d 100644
--- a/opentelemetry-sdk/src/trace/provider.rs
+++ b/opentelemetry-sdk/src/trace/provider.rs
@@ -194,7 +194,7 @@ impl SdkTracerProvider {
         self.inner.is_shutdown.load(Ordering::Relaxed)
     }
 
-    /// Force flush all remaining spans in span processors and return results.
+    /// Force flush all remaining spans in span processors with a default timeout and return results.
     ///
     /// # Examples
     ///
@@ -228,10 +228,15 @@ impl SdkTracerProvider {
     /// }
     /// ```
     pub fn force_flush(&self) -> OTelSdkResult {
+        self.force_flush_with_timeout(Duration::from_secs(5))
+    }
+
+    /// force flush processors with a specified timeout
+    pub fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
         let result: Vec<_> = self
             .span_processors()
             .iter()
-            .map(|processor| processor.force_flush())
+            .map(|processor| processor.force_flush_with_timeout(timeout))
             .collect();
         if result.iter().all(|r| r.is_ok()) {
             Ok(())
         } else {
@@ -530,7 +535,7 @@ mod tests {
            // ignore
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            if self.success {
                Ok(())
            } else {
@@ -793,7 +798,7 @@ mod tests {
            // No operation needed for this processor
        }
 
-        fn force_flush(&self) -> OTelSdkResult {
+        fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
 
diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs
index 0ebe80f885..c6a9ca7a4a 100644
--- a/opentelemetry-sdk/src/trace/span_processor.rs
+++ b/opentelemetry-sdk/src/trace/span_processor.rs
@@ -85,7 +85,11 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
     /// TODO - This method should take reference to `SpanData`
     fn on_end(&self, span: SpanData);
     /// Force the spans lying in the cache to be exported.
-    fn force_flush(&self) -> OTelSdkResult;
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult;
+    /// Force flush the spans with a default timeout.
+    fn force_flush(&self) -> OTelSdkResult {
+        self.force_flush_with_timeout(Duration::from_secs(5))
+    }
     /// Shuts down the processor. Called when SDK is shut down. This is an
     /// opportunity for processors to do any cleanup required.
     ///
@@ -153,7 +157,7 @@ impl SpanProcessor for SimpleSpanProcessor {
         }
     }
 
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
         // Nothing to flush for simple span processor.
         Ok(())
     }
@@ -286,7 +290,6 @@ pub struct BatchSpanProcessor {
     span_sender: SyncSender<SpanData>, // Data channel to store spans
     message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
     handle: Mutex<Option<thread::JoinHandle<()>>>,
-    forceflush_timeout: Duration,
     export_span_message_sent: Arc<AtomicBool>,
     current_batch_size: Arc<AtomicUsize>,
     max_export_batch_size: usize,
@@ -424,7 +427,6 @@ impl BatchSpanProcessor {
             span_sender,
             message_sender,
             handle: Mutex::new(Some(handle)),
-            forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
             dropped_spans_count: AtomicUsize::new(0),
             max_queue_size,
             export_span_message_sent: Arc::new(AtomicBool::new(false)),
@@ -593,21 +595,19 @@ impl SpanProcessor for BatchSpanProcessor {
     }
 
     /// Flushes all pending spans.
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
         let (sender, receiver) = std::sync::mpsc::sync_channel(1);
         match self
             .message_sender
             .try_send(BatchMessage::ForceFlush(sender))
         {
-            Ok(_) => receiver
-                .recv_timeout(self.forceflush_timeout)
-                .map_err(|err| {
-                    if err == std::sync::mpsc::RecvTimeoutError::Timeout {
-                        OTelSdkError::Timeout(self.forceflush_timeout)
-                    } else {
-                        OTelSdkError::InternalFailure(format!("{err}"))
-                    }
-                })?,
+            Ok(_) => receiver.recv_timeout(timeout).map_err(|err| {
+                if err == std::sync::mpsc::RecvTimeoutError::Timeout {
+                    OTelSdkError::Timeout(timeout)
+                } else {
+                    OTelSdkError::InternalFailure(format!("{err}"))
+                }
+            })?,
             Err(std::sync::mpsc::TrySendError::Full(_)) => {
                 // If the control message could not be sent, emit a warning.
                 otel_debug!(
diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
index b294f74043..f349377e04 100644
--- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
+++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
@@ -123,7 +123,7 @@ impl SpanProcessor for BatchSpanProcessor {
         }
     }
 
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
         let (res_sender, res_receiver) = oneshot::channel();
         self.message_sender
             .try_send(BatchMessage::Flush(Some(res_sender)))
diff --git a/stress/src/traces.rs b/stress/src/traces.rs
index e0f15099e5..bccb59bd51 100644
--- a/stress/src/traces.rs
+++ b/stress/src/traces.rs
@@ -41,7 +41,7 @@ impl SpanProcessor for NoOpSpanProcessor {
         // No-op
     }
 
-    fn force_flush(&self) -> OTelSdkResult {
+    fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
         Ok(())
     }
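
For reference, a minimal caller-side sketch of the API surface this diff introduces (not part of the change itself): the `force_flush_with_timeout` methods and the 5-second `force_flush` default come from the hunks above, while the builder calls and the concrete timeout values below are illustrative assumptions.

use std::time::Duration;

use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::trace::SdkTracerProvider;

fn main() {
    // Providers without exporters attached; enough to show the new call sites.
    let logger_provider = SdkLoggerProvider::builder().build();
    let tracer_provider = SdkTracerProvider::builder().build();

    // Unchanged call: force_flush() now delegates to force_flush_with_timeout()
    // using the 5-second default shown in the diff.
    let _ = logger_provider.force_flush();

    // New call: pick a per-call flush deadline instead of the default.
    let _ = logger_provider.force_flush_with_timeout(Duration::from_secs(2));
    let _ = tracer_provider.force_flush_with_timeout(Duration::from_millis(500));
}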