veecle_telemetry/collector/
mod.rs

1//! Telemetry data collection and export infrastructure.
2//!
3//! This module provides the core infrastructure for collecting telemetry data and exporting it
4//! to various backends.
5//! It includes the global collector singleton, export trait, and various
6//! built-in exporters.
7//!
8//! # Global Collector
9//!
10//! The collector uses a global singleton pattern to ensure telemetry data is collected
11//! consistently across the entire application.
12//! The collector must be initialized once
13//! using [`set_exporter`] before any telemetry data can be collected.
14//!
15//! # Export Trait
16//!
17//! The [`Export`] trait defines the interface for exporting telemetry data.
18//! Custom exporters can be implemented by providing an implementation of this trait.
19//!
20//! # Built-in Exporters
21//!
22//! - [`ConsoleJsonExporter`] - Exports telemetry data as JSON to stdout
23//! - [`TestExporter`] - Collects telemetry data in memory for testing purposes
24
25#[cfg(feature = "std")]
26mod json_exporter;
27#[cfg(feature = "std")]
28mod test_exporter;
29
30use core::fmt::Debug;
31#[cfg(feature = "enable")]
32use core::sync::atomic::{AtomicUsize, Ordering};
33use core::{error, fmt};
34
35#[cfg(feature = "std")]
36pub use json_exporter::ConsoleJsonExporter;
37#[cfg(feature = "std")]
38#[doc(hidden)]
39pub use test_exporter::TestExporter;
40
41use crate::protocol::InstanceMessage;
42#[cfg(feature = "enable")]
43pub use crate::protocol::ProcessId;
44#[cfg(feature = "enable")]
45use crate::protocol::{
46    LogMessage, SpanAddEventMessage, SpanAddLinkMessage, SpanCloseMessage, SpanCreateMessage,
47    SpanEnterMessage, SpanExitMessage, SpanSetAttributeMessage, TelemetryMessage, ThreadId,
48    TracingMessage,
49};
50
51/// Trait for exporting telemetry data to external systems.
52///
53/// Implementors of this trait define how telemetry data should be exported,
54/// whether to files, network endpoints, or other destinations.
55///
56/// # Examples
57///
58/// ```rust
59/// use veecle_telemetry::collector::Export;
60/// use veecle_telemetry::protocol::InstanceMessage;
61///
62/// #[derive(Debug)]
63/// struct CustomExporter;
64///
65/// impl Export for CustomExporter {
66///     fn export(&self, message: InstanceMessage<'_>) {
67///         // Custom export logic here
68///         println!("Exporting: {:?}", message);
69///     }
70/// }
71/// ```
72pub trait Export: Debug {
73    /// Exports a telemetry message.
74    ///
75    /// This method is called for each telemetry message that needs to be exported.
76    /// The implementation should handle the message appropriately based on its type.
77    fn export(&self, message: InstanceMessage<'_>);
78}
79
80/// The global telemetry collector.
81///
82/// This structure manages the collection and export of telemetry data.
83/// It maintains a unique execution ID, handles trace ID generation, and coordinates with the
84/// configured exporter.
85///
86/// The collector is typically accessed through the [`get_collector`] function rather
87/// than being constructed directly.
88#[derive(Debug)]
89pub struct Collector {
90    #[cfg(feature = "enable")]
91    inner: CollectorInner,
92}
93
94#[cfg(feature = "enable")]
95#[derive(Debug)]
96struct CollectorInner {
97    process_id: ProcessId,
98
99    exporter: &'static (dyn Export + Sync),
100}
101
102#[cfg(feature = "enable")]
103#[derive(Debug)]
104struct NopExporter;
105
106#[cfg(feature = "enable")]
107impl Export for NopExporter {
108    fn export(&self, _: InstanceMessage) {}
109}
110
111// The GLOBAL_COLLECTOR static holds a pointer to the global exporter. It is protected by
112// the GLOBAL_INIT static which determines whether GLOBAL_EXPORTER has been initialized.
113#[cfg(feature = "enable")]
114static mut GLOBAL_COLLECTOR: Collector = Collector {
115    inner: CollectorInner {
116        process_id: ProcessId::from_raw(0),
117        exporter: &NO_EXPORTER,
118    },
119};
120static NO_COLLECTOR: Collector = Collector {
121    #[cfg(feature = "enable")]
122    inner: CollectorInner {
123        process_id: ProcessId::from_raw(0),
124        exporter: &NO_EXPORTER,
125    },
126};
127#[cfg(feature = "enable")]
128static NO_EXPORTER: NopExporter = NopExporter;
129
130#[cfg(feature = "enable")]
131static GLOBAL_INIT: AtomicUsize = AtomicUsize::new(0);
132
133// There are three different states that we care about:
134// - the collector is uninitialized
135// - the collector is initializing (set_exporter has been called but GLOBAL_COLLECTOR hasn't been set yet)
136// - the collector is active
137#[cfg(feature = "enable")]
138const UNINITIALIZED: usize = 0;
139#[cfg(feature = "enable")]
140const INITIALIZING: usize = 1;
141#[cfg(feature = "enable")]
142const INITIALIZED: usize = 2;
143
144/// Initializes the collector with the given Exporter and [`ProcessId`].
145///
146/// A [`ProcessId`] should never be re-used as it's used to collect metadata about the execution and to generate
147/// [`SpanContext`]s which need to be globally unique.
148///
149/// [`SpanContext`]: crate::SpanContext
150#[cfg(feature = "enable")]
151pub fn set_exporter(
152    process_id: ProcessId,
153    exporter: &'static (dyn Export + Sync),
154) -> Result<(), SetExporterError> {
155    if GLOBAL_INIT
156        .compare_exchange(
157            UNINITIALIZED,
158            INITIALIZING,
159            Ordering::Acquire,
160            Ordering::Relaxed,
161        )
162        .is_ok()
163    {
164        // SAFETY: this is guarded by the atomic
165        unsafe { GLOBAL_COLLECTOR = Collector::new(process_id, exporter) }
166        GLOBAL_INIT.store(INITIALIZED, Ordering::Release);
167
168        Ok(())
169    } else {
170        Err(SetExporterError(()))
171    }
172}
173
174/// Returns a reference to the collector.
175///
176/// If an exporter has not been set, a no-op implementation is returned.
177pub fn get_collector() -> &'static Collector {
178    #[cfg(not(feature = "enable"))]
179    {
180        &NO_COLLECTOR
181    }
182
183    // Acquire memory ordering guarantees that current thread would see any
184    // memory writes that happened before store of the value
185    // into `GLOBAL_INIT` with memory ordering `Release` or stronger.
186    //
187    // Since the value `INITIALIZED` is written only after `GLOBAL_COLLECTOR` was
188    // initialized, observing it after `Acquire` load here makes both
189    // write to the `GLOBAL_COLLECTOR` static and initialization of the exporter
190    // internal state synchronized with current thread.
191    #[cfg(feature = "enable")]
192    if GLOBAL_INIT.load(Ordering::Acquire) != INITIALIZED {
193        &NO_COLLECTOR
194    } else {
195        // SAFETY: this is guarded by the atomic
196        unsafe {
197            #[expect(clippy::deref_addrof, reason = "false positive")]
198            &*&raw const GLOBAL_COLLECTOR
199        }
200    }
201}
202
203/// The type returned by [`set_exporter`] if [`set_exporter`] has already been called.
204///
205/// [`set_exporter`]: fn.set_exporter.html
206#[derive(Debug)]
207pub struct SetExporterError(());
208
209impl SetExporterError {
210    const MESSAGE: &'static str = "a global exporter has already been set";
211}
212
213impl fmt::Display for SetExporterError {
214    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215        fmt.write_str(Self::MESSAGE)
216    }
217}
218
219impl error::Error for SetExporterError {}
220
221#[cfg(feature = "enable")]
222impl Collector {
223    fn new(process_id: ProcessId, exporter: &'static (dyn Export + Sync)) -> Self {
224        Self {
225            inner: CollectorInner {
226                process_id,
227                exporter,
228            },
229        }
230    }
231
232    #[inline]
233    pub(crate) fn process_id(&self) -> ProcessId {
234        self.inner.process_id
235    }
236
237    /// Collects and exports an external telemetry message.
238    ///
239    /// This method allows external systems to inject telemetry messages into the
240    /// collector pipeline.
241    /// The message will be exported using the configured exporter.
242    ///
243    /// # Examples
244    ///
245    /// ```rust
246    /// use core::num::NonZeroU64;
247    /// use veecle_telemetry::collector::get_collector;
248    /// use veecle_telemetry::protocol::{
249    ///     ThreadId,
250    ///     ProcessId,
251    ///     InstanceMessage,
252    ///     TelemetryMessage,
253    ///     TimeSyncMessage,
254    /// };
255    ///
256    /// let collector = get_collector();
257    /// let message = InstanceMessage {
258    ///     thread: ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
259    ///     message: TelemetryMessage::TimeSync(TimeSyncMessage {
260    ///         local_timestamp: 0,
261    ///         since_epoch: 0,
262    ///     }),
263    /// };
264    /// collector.collect_external(message);
265    /// ```
266    #[inline]
267    pub fn collect_external(&self, message: InstanceMessage<'_>) {
268        self.inner.exporter.export(message);
269    }
270
271    #[inline]
272    pub(crate) fn new_span(&self, span: SpanCreateMessage<'_>) {
273        self.tracing_message(TracingMessage::CreateSpan(span));
274    }
275
276    #[inline]
277    pub(crate) fn enter_span(&self, enter: SpanEnterMessage) {
278        self.tracing_message(TracingMessage::EnterSpan(enter));
279    }
280
281    #[inline]
282    pub(crate) fn exit_span(&self, exit: SpanExitMessage) {
283        self.tracing_message(TracingMessage::ExitSpan(exit));
284    }
285
286    #[inline]
287    pub(crate) fn close_span(&self, span: SpanCloseMessage) {
288        self.tracing_message(TracingMessage::CloseSpan(span));
289    }
290
291    #[inline]
292    pub(crate) fn span_event(&self, event: SpanAddEventMessage<'_>) {
293        self.tracing_message(TracingMessage::AddEvent(event));
294    }
295
296    #[inline]
297    pub(crate) fn span_link(&self, link: SpanAddLinkMessage) {
298        self.tracing_message(TracingMessage::AddLink(link));
299    }
300
301    #[inline]
302    pub(crate) fn span_attribute(&self, attribute: SpanSetAttributeMessage<'_>) {
303        self.tracing_message(TracingMessage::SetAttribute(attribute));
304    }
305
306    #[inline]
307    pub(crate) fn log_message(&self, log: LogMessage<'_>) {
308        self.inner.exporter.export(InstanceMessage {
309            thread: ThreadId::current(self.inner.process_id),
310            message: TelemetryMessage::Log(log),
311        });
312    }
313
314    #[inline]
315    fn tracing_message(&self, message: TracingMessage<'_>) {
316        self.inner.exporter.export(InstanceMessage {
317            thread: ThreadId::current(self.inner.process_id),
318            message: TelemetryMessage::Tracing(message),
319        });
320    }
321}