fleetforge_telemetry/
telemetry_core.rs

1use anyhow::{anyhow, Result};
2use opentelemetry::trace::{SpanId, TraceFlags, TraceId};
3use opentelemetry::KeyValue;
4use opentelemetry_otlp::WithExportConfig;
5use opentelemetry_sdk::{runtime::Tokio, trace, Resource};
6use std::env;
7use tonic09::metadata::{MetadataKey, MetadataMap, MetadataValue};
8use tracing::Subscriber;
9use tracing_opentelemetry::OpenTelemetryLayer;
10use tracing_subscriber::registry::LookupSpan;
11
12pub mod propagation {
13    use super::*;
14
15    pub fn traceparent(trace_id: TraceId, span_id: SpanId, trace_flags: TraceFlags) -> String {
16        format!(
17            "00-{}-{}-{:02x}",
18            trace_id.to_string(),
19            span_id.to_string(),
20            trace_flags.to_u8()
21        )
22    }
23
24    pub fn baggage_header(baggage: &[(String, String)]) -> Option<String> {
25        if baggage.is_empty() {
26            return None;
27        }
28
29        let formatted = baggage
30            .iter()
31            .map(|(k, v)| format!("{}={}", encode_baggage_token(k), encode_baggage_token(v)))
32            .collect::<Vec<_>>()
33            .join(",");
34
35        if formatted.is_empty() {
36            None
37        } else {
38            Some(formatted)
39        }
40    }
41
42    pub fn w3c_headers(
43        trace_id: TraceId,
44        span_id: SpanId,
45        trace_flags: TraceFlags,
46        tracestate: Option<&str>,
47        baggage: &[(String, String)],
48    ) -> Vec<(String, String)> {
49        let mut headers = vec![(
50            "traceparent".to_string(),
51            traceparent(trace_id, span_id, trace_flags),
52        )];
53
54        if let Some(state) = tracestate {
55            if !state.is_empty() {
56                headers.push(("tracestate".to_string(), state.to_string()));
57            }
58        }
59
60        if let Some(value) = baggage_header(baggage) {
61            headers.push(("baggage".to_string(), value));
62        }
63
64        headers
65    }
66
67    fn encode_baggage_token(token: &str) -> String {
68        url::form_urlencoded::byte_serialize(token.as_bytes()).collect()
69    }
70}
71
72pub mod otlp {
73    use super::*;
74    use opentelemetry::global;
75    use opentelemetry_sdk::trace::Tracer;
76
77    pub fn build_layers<S>(service_name: &str) -> Result<OpenTelemetryLayer<S, Tracer>>
78    where
79        S: Subscriber + for<'span> LookupSpan<'span>,
80    {
81        global::set_text_map_propagator(
82            opentelemetry_sdk::propagation::TraceContextPropagator::new(),
83        );
84
85        let resource = Resource::new(vec![
86            KeyValue::new("service.name", service_name.to_string()),
87            KeyValue::new("service.namespace", "fleetforge"),
88            KeyValue::new(
89                "service.version",
90                env::var("FLEETFORGE_SERVICE_VERSION")
91                    .unwrap_or_else(|_| env!("CARGO_PKG_VERSION").to_string()),
92            ),
93            KeyValue::new(
94                "deployment.environment",
95                env::var("RUST_ENV").unwrap_or_else(|_| "development".into()),
96            ),
97        ]);
98
99        let endpoint =
100            env::var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or_else(|_| default_otlp_endpoint());
101        let header_pairs = parse_otlp_headers(
102            &env::var("OTEL_EXPORTER_OTLP_HEADERS").unwrap_or_else(|_| default_otlp_headers()),
103        );
104
105        let mut exporter = opentelemetry_otlp::new_exporter()
106            .tonic()
107            .with_endpoint(endpoint);
108        if let Some(metadata) = build_otlp_metadata(header_pairs)? {
109            exporter = exporter.with_metadata(metadata);
110        }
111
112        let tracer = opentelemetry_otlp::new_pipeline()
113            .tracing()
114            .with_exporter(exporter)
115            .with_trace_config(trace::Config::default().with_resource(resource))
116            .install_batch(Tokio)?;
117
118        Ok(tracing_opentelemetry::layer().with_tracer(tracer))
119    }
120
121    pub fn parse_otlp_headers(raw: &str) -> Vec<(String, String)> {
122        raw.split(',')
123            .filter_map(|pair| {
124                let mut parts = pair.splitn(2, '=');
125                let key = parts.next()?.trim();
126                let value = parts.next()?.trim();
127                if key.is_empty() || value.is_empty() {
128                    return None;
129                }
130                Some((key.to_string(), value.to_string()))
131            })
132            .collect()
133    }
134
135    pub fn build_otlp_metadata(pairs: Vec<(String, String)>) -> Result<Option<MetadataMap>> {
136        if pairs.is_empty() {
137            return Ok(None);
138        }
139
140        let mut metadata = MetadataMap::new();
141
142        for (key, value) in pairs {
143            let metadata_key = MetadataKey::from_bytes(key.as_bytes())
144                .map_err(|err| anyhow!("invalid OTLP header key '{}': {err}", key))?;
145            let metadata_value = MetadataValue::try_from(value.as_str())
146                .map_err(|err| anyhow!("invalid OTLP header value for '{}': {err}", key))?;
147            metadata.insert(metadata_key, metadata_value);
148        }
149
150        Ok(Some(metadata))
151    }
152
153    fn default_otlp_endpoint() -> String {
154        if env::var("GOOGLE_CLOUD_PROJECT").is_ok() || env::var("GCP_PROJECT").is_ok() {
155            "https://otlp.googleapis.com:443".to_string()
156        } else {
157            "http://localhost:4317".to_string()
158        }
159    }
160
161    fn default_otlp_headers() -> String {
162        if let Ok(project) = env::var("GOOGLE_CLOUD_PROJECT")
163            .or_else(|_| env::var("GCP_PROJECT"))
164            .or_else(|_| env::var("PROJECT_ID"))
165        {
166            format!("x-goog-user-project={project}")
167        } else {
168            String::new()
169        }
170    }
171
172    #[cfg(test)]
173    mod tests {
174        use super::*;
175
176        #[test]
177        fn parse_headers_skips_empty_entries() {
178            let headers = parse_otlp_headers("auth = token , , key=value");
179            assert_eq!(headers.len(), 2);
180            assert_eq!(headers[0], ("auth".into(), "token".into()));
181            assert_eq!(headers[1], ("key".into(), "value".into()));
182        }
183
184        #[test]
185        fn build_metadata_rejects_invalid_key() {
186            let err = build_otlp_metadata(vec![("spa ce".into(), "value".into())]).unwrap_err();
187            assert!(err.to_string().contains("invalid OTLP header key"));
188        }
189
190        #[test]
191        fn build_metadata_rejects_invalid_value() {
192            let err = build_otlp_metadata(vec![("x-test".into(), "\u{7f}".into())]).unwrap_err();
193            assert!(err.to_string().contains("invalid OTLP header value"));
194        }
195
196        #[test]
197        fn build_metadata_accepts_valid_pairs() {
198            let meta = build_otlp_metadata(vec![
199                ("authorization".into(), "Bearer token".into()),
200                ("x-test".into(), "value".into()),
201            ])
202            .unwrap()
203            .unwrap();
204
205            assert!(meta.contains_key("authorization"));
206            assert!(meta.contains_key("x-test"));
207        }
208    }
209}