fleetforge_telemetry/
telemetry_core.rs1use anyhow::{anyhow, Result};
2use opentelemetry::trace::{SpanId, TraceFlags, TraceId};
3use opentelemetry::KeyValue;
4use opentelemetry_otlp::WithExportConfig;
5use opentelemetry_sdk::{runtime::Tokio, trace, Resource};
6use std::env;
7use tonic09::metadata::{MetadataKey, MetadataMap, MetadataValue};
8use tracing::Subscriber;
9use tracing_opentelemetry::OpenTelemetryLayer;
10use tracing_subscriber::registry::LookupSpan;
11
12pub mod propagation {
13 use super::*;
14
15 pub fn traceparent(trace_id: TraceId, span_id: SpanId, trace_flags: TraceFlags) -> String {
16 format!(
17 "00-{}-{}-{:02x}",
18 trace_id.to_string(),
19 span_id.to_string(),
20 trace_flags.to_u8()
21 )
22 }
23
24 pub fn baggage_header(baggage: &[(String, String)]) -> Option<String> {
25 if baggage.is_empty() {
26 return None;
27 }
28
29 let formatted = baggage
30 .iter()
31 .map(|(k, v)| format!("{}={}", encode_baggage_token(k), encode_baggage_token(v)))
32 .collect::<Vec<_>>()
33 .join(",");
34
35 if formatted.is_empty() {
36 None
37 } else {
38 Some(formatted)
39 }
40 }
41
42 pub fn w3c_headers(
43 trace_id: TraceId,
44 span_id: SpanId,
45 trace_flags: TraceFlags,
46 tracestate: Option<&str>,
47 baggage: &[(String, String)],
48 ) -> Vec<(String, String)> {
49 let mut headers = vec![(
50 "traceparent".to_string(),
51 traceparent(trace_id, span_id, trace_flags),
52 )];
53
54 if let Some(state) = tracestate {
55 if !state.is_empty() {
56 headers.push(("tracestate".to_string(), state.to_string()));
57 }
58 }
59
60 if let Some(value) = baggage_header(baggage) {
61 headers.push(("baggage".to_string(), value));
62 }
63
64 headers
65 }
66
/// Percent-encodes a baggage key or value for use in a W3C `baggage` header.
///
/// ASCII letters, digits and `*`, `-`, `.`, `_` are copied through unchanged. Every other byte
/// of the UTF-8 input is written as `%XX` with uppercase hex digits.
///
/// This is deliberately *not* `application/x-www-form-urlencoded` serialization: that format
/// writes a space as `+`, but in a baggage header `+` is an ordinary literal character, so a
/// receiver would decode `"a b"` as `"a+b"`. Here a space becomes `%20`.
fn encode_baggage_token(token: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(token.len());
    for &byte in token.as_bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'*' | b'-' | b'.' | b'_') {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
70}
71
72pub mod otlp {
73 use super::*;
74 use opentelemetry::global;
75 use opentelemetry_sdk::trace::Tracer;
76
77 pub fn build_layers<S>(service_name: &str) -> Result<OpenTelemetryLayer<S, Tracer>>
78 where
79 S: Subscriber + for<'span> LookupSpan<'span>,
80 {
81 global::set_text_map_propagator(
82 opentelemetry_sdk::propagation::TraceContextPropagator::new(),
83 );
84
85 let resource = Resource::new(vec![
86 KeyValue::new("service.name", service_name.to_string()),
87 KeyValue::new("service.namespace", "fleetforge"),
88 KeyValue::new(
89 "service.version",
90 env::var("FLEETFORGE_SERVICE_VERSION")
91 .unwrap_or_else(|_| env!("CARGO_PKG_VERSION").to_string()),
92 ),
93 KeyValue::new(
94 "deployment.environment",
95 env::var("RUST_ENV").unwrap_or_else(|_| "development".into()),
96 ),
97 ]);
98
99 let endpoint =
100 env::var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or_else(|_| default_otlp_endpoint());
101 let header_pairs = parse_otlp_headers(
102 &env::var("OTEL_EXPORTER_OTLP_HEADERS").unwrap_or_else(|_| default_otlp_headers()),
103 );
104
105 let mut exporter = opentelemetry_otlp::new_exporter()
106 .tonic()
107 .with_endpoint(endpoint);
108 if let Some(metadata) = build_otlp_metadata(header_pairs)? {
109 exporter = exporter.with_metadata(metadata);
110 }
111
112 let tracer = opentelemetry_otlp::new_pipeline()
113 .tracing()
114 .with_exporter(exporter)
115 .with_trace_config(trace::Config::default().with_resource(resource))
116 .install_batch(Tokio)?;
117
118 Ok(tracing_opentelemetry::layer().with_tracer(tracer))
119 }
120
/// Parses a comma-separated `key=value` list into header pairs.
///
/// Each entry is split at its first `=`, so a value may itself contain `=`. Keys and values
/// are trimmed of surrounding whitespace. Entries without `=`, or whose key or value is empty
/// after trimming, are dropped. Input order is preserved.
pub fn parse_otlp_headers(raw: &str) -> Vec<(String, String)> {
    let mut headers = Vec::new();

    for entry in raw.split(',') {
        if let Some((name, content)) = entry.split_once('=') {
            let name = name.trim();
            let content = content.trim();
            if !name.is_empty() && !content.is_empty() {
                headers.push((name.to_owned(), content.to_owned()));
            }
        }
    }

    headers
}
134
135 pub fn build_otlp_metadata(pairs: Vec<(String, String)>) -> Result<Option<MetadataMap>> {
136 if pairs.is_empty() {
137 return Ok(None);
138 }
139
140 let mut metadata = MetadataMap::new();
141
142 for (key, value) in pairs {
143 let metadata_key = MetadataKey::from_bytes(key.as_bytes())
144 .map_err(|err| anyhow!("invalid OTLP header key '{}': {err}", key))?;
145 let metadata_value = MetadataValue::try_from(value.as_str())
146 .map_err(|err| anyhow!("invalid OTLP header value for '{}': {err}", key))?;
147 metadata.insert(metadata_key, metadata_value);
148 }
149
150 Ok(Some(metadata))
151 }
152
/// Chooses the OTLP endpoint used when none is configured explicitly.
///
/// Returns `https://otlp.googleapis.com:443` when `GOOGLE_CLOUD_PROJECT` or `GCP_PROJECT` can
/// be read from the environment, and `http://localhost:4317` otherwise.
///
/// NOTE(review): `default_otlp_headers` also consults `PROJECT_ID`; this function does not.
/// Confirm whether that difference is intentional.
fn default_otlp_endpoint() -> String {
    let project_var_set = ["GOOGLE_CLOUD_PROJECT", "GCP_PROJECT"]
        .iter()
        .any(|&name| env::var(name).is_ok());

    let endpoint = if project_var_set {
        "https://otlp.googleapis.com:443"
    } else {
        "http://localhost:4317"
    };
    endpoint.to_string()
}
160
/// Produces the raw OTLP header string used when none is configured explicitly.
///
/// Looks up `GOOGLE_CLOUD_PROJECT`, then `GCP_PROJECT`, then `PROJECT_ID`, and uses the first
/// one that can be read to build `x-goog-user-project=<value>`. Returns an empty string when
/// none of them is available.
fn default_otlp_headers() -> String {
    ["GOOGLE_CLOUD_PROJECT", "GCP_PROJECT", "PROJECT_ID"]
        .iter()
        .find_map(|&name| env::var(name).ok())
        .map(|project| format!("x-goog-user-project={project}"))
        .unwrap_or_default()
}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an owned `(key, value)` header pair.
    fn pair(key: &str, value: &str) -> (String, String) {
        (key.to_owned(), value.to_owned())
    }

    /// Blank entries between commas are dropped and surrounding whitespace is trimmed.
    #[test]
    fn parse_headers_skips_empty_entries() {
        let parsed = parse_otlp_headers("auth = token , , key=value");
        assert_eq!(parsed, vec![pair("auth", "token"), pair("key", "value")]);
    }

    /// A key containing a space is reported as an invalid header key.
    #[test]
    fn build_metadata_rejects_invalid_key() {
        let message = build_otlp_metadata(vec![pair("spa ce", "value")])
            .unwrap_err()
            .to_string();
        assert!(message.contains("invalid OTLP header key"));
    }

    /// A value containing the DEL control character is reported as an invalid header value.
    #[test]
    fn build_metadata_rejects_invalid_value() {
        let message = build_otlp_metadata(vec![pair("x-test", "\u{7f}")])
            .unwrap_err()
            .to_string();
        assert!(message.contains("invalid OTLP header value"));
    }

    /// Well-formed pairs all end up in the resulting metadata map.
    #[test]
    fn build_metadata_accepts_valid_pairs() {
        let pairs = vec![pair("authorization", "Bearer token"), pair("x-test", "value")];
        let metadata = build_otlp_metadata(pairs).unwrap().unwrap();

        assert!(metadata.contains_key("authorization"));
        assert!(metadata.contains_key("x-test"));
    }
}
209}