fleetforge_telemetry/exporters/
mod.rs1pub mod otlp;
2
3use std::{collections::HashSet, env, fs, path::Path, sync::Arc};
4
5use anyhow::{anyhow, Context, Result};
6use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
7use base64::Engine;
8use chrono::SecondsFormat;
9use fleetforge_bus::OutboxExporter;
10use fleetforge_storage::models::OutboxEvent;
11use reqwest::{Client, Url};
12use serde_json::{json, Value};
13use tracing::{info, warn};
14
/// Environment variable holding the comma-separated list of trace exporters
/// to enable.
const EXPORTERS_ENV: &str = "FLEETFORGE_TRACE_EXPORTERS";
/// Environment variable that overrides the directory scanned for exporter
/// plugin manifests.
const EXPORTER_PLUGIN_DIR_ENV: &str = "FLEETFORGE_EXPORTER_PLUGIN_DIR";
/// Manifest directory used when `FLEETFORGE_EXPORTER_PLUGIN_DIR` is not set.
const DEFAULT_EXPORTER_PLUGIN_DIR: &str = "plugins/exporters";
18
19pub fn from_env() -> Result<Vec<Arc<dyn OutboxExporter>>> {
21 let raw = match env::var(EXPORTERS_ENV) {
22 Ok(value) => value,
23 Err(_) => return Ok(Vec::new()),
24 };
25
26 let mut exporters: Vec<Arc<dyn OutboxExporter>> = Vec::new();
27 let plugin_allowlist = load_exporter_plugins();
28
29 for token in raw.split(',') {
30 let trimmed = token.trim();
31 if trimmed.is_empty() {
32 continue;
33 }
34
35 let normalized = trimmed.to_ascii_lowercase();
36 if !plugin_allowlist.contains(normalized.as_str()) {
37 warn!(
38 exporter = trimmed,
39 "exporter requested but no plugin manifest found; skipping (see docs/reference/adapters.md)"
40 );
41 continue;
42 }
43
44 match normalized.as_str() {
45 "langsmith" => match LangSmithExporter::from_env() {
46 Ok(exporter) => exporters.push(Arc::new(exporter)),
47 Err(err) => {
48 warn!(error = ?err, "failed to initialise LangSmith exporter; skipping")
49 }
50 },
51 "langfuse" => match LangfuseExporter::from_env() {
52 Ok(exporter) => exporters.push(Arc::new(exporter)),
53 Err(err) => warn!(error = ?err, "failed to initialise Langfuse exporter; skipping"),
54 },
55 "phoenix" => match PhoenixExporter::from_env() {
56 Ok(exporter) => exporters.push(Arc::new(exporter)),
57 Err(err) => warn!(error = ?err, "failed to initialise Phoenix exporter; skipping"),
58 },
59 other => {
60 warn!(
61 exporter = other,
62 "unknown trace exporter requested; ignoring"
63 );
64 }
65 }
66 }
67
68 if !exporters.is_empty() {
69 let names: Vec<&'static str> = exporters.iter().map(|exporter| exporter.name()).collect();
70 info!(exporters = ?names, "external trace exporters enabled");
71 }
72
73 Ok(exporters)
74}
75
76fn load_exporter_plugins() -> HashSet<String> {
77 let dir = env::var(EXPORTER_PLUGIN_DIR_ENV)
78 .unwrap_or_else(|_| DEFAULT_EXPORTER_PLUGIN_DIR.to_string());
79 let path = Path::new(&dir);
80 if !path.exists() {
81 return HashSet::new();
82 }
83
84 let mut allowed = HashSet::new();
85 match fs::read_dir(path) {
86 Ok(entries) => {
87 for entry in entries.flatten() {
88 let entry_path = entry.path();
89 if !entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
90 continue;
91 }
92 match parse_plugin_manifest(&entry_path) {
93 Ok(Some(name)) => {
94 allowed.insert(name);
95 }
96 Ok(None) => {}
97 Err(err) => {
98 warn!(path = ?entry_path, error = %err, "failed to parse exporter plugin manifest")
99 }
100 }
101 }
102 }
103 Err(err) => warn!(
104 directory = %dir,
105 error = %err,
106 "failed to read exporter plugin directory"
107 ),
108 }
109
110 allowed
111}
112
113#[derive(serde::Deserialize)]
114struct ExporterPluginManifest {
115 name: Option<String>,
116 #[serde(default = "default_enabled")]
117 enabled: bool,
118}
119
120const fn default_enabled() -> bool {
121 true
122}
123
124fn parse_plugin_manifest(path: &Path) -> Result<Option<String>> {
125 let stem = path
126 .file_stem()
127 .and_then(|value| value.to_str())
128 .map(|value| value.to_string())
129 .unwrap_or_else(|| "exporter".to_string());
130 let contents = fs::read_to_string(path)
131 .with_context(|| format!("failed to read exporter plugin {:?}", path))?;
132 let trimmed = contents.trim();
133 if trimmed.is_empty() {
134 return Ok(Some(stem.to_ascii_lowercase()));
135 }
136
137 let is_json = path
138 .extension()
139 .and_then(|ext| ext.to_str())
140 .map(|ext| ext.eq_ignore_ascii_case("json"))
141 .unwrap_or(false);
142
143 let manifest: ExporterPluginManifest = if is_json {
144 serde_json::from_str(trimmed)
145 .with_context(|| format!("failed to parse JSON plugin manifest {:?}", path))?
146 } else {
147 toml::from_str(trimmed)
148 .with_context(|| format!("failed to parse TOML plugin manifest {:?}", path))?
149 };
150
151 if !manifest.enabled {
152 return Ok(None);
153 }
154
155 let name = manifest.name.unwrap_or(stem).to_ascii_lowercase();
156
157 Ok(Some(name))
158}
159
160fn normalize_event(event: &OutboxEvent) -> Value {
161 json!({
162 "id": event.id,
163 "run_id": event.run_id.to_string(),
164 "step_id": event.step_id.map(|id| id.to_string()),
165 "kind": event.kind,
166 "created_at": event.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
167 "payload": event.payload
168 })
169}
170
171fn extract_trace_id(payload: &Value) -> Option<String> {
172 payload
173 .get("trace")
174 .and_then(|value| value.get("trace_id"))
175 .and_then(Value::as_str)
176 .map(|value| value.to_string())
177}
178
179#[derive(Clone)]
180struct LangSmithExporter {
181 client: Client,
182 endpoint: Url,
183 api_key: String,
184 dataset_id: Option<String>,
185}
186
187impl LangSmithExporter {
188 fn from_env() -> Result<Self> {
189 let base_url = env::var("LANGSMITH_API_URL")
190 .unwrap_or_else(|_| "https://api.smith.langchain.com/".to_string());
191 let api_key = env::var("LANGSMITH_API_KEY")
192 .context("LANGSMITH_API_KEY is required for LangSmith exporter")?;
193 let endpoint = Url::parse(&base_url)
194 .context("invalid LANGSMITH_API_URL")?
195 .join("v1/runs")
196 .context("failed to build LangSmith endpoint URL")?;
197 let dataset_id = env::var("LANGSMITH_DATASET_ID")
198 .ok()
199 .filter(|value| !value.is_empty());
200
201 let client = Client::builder()
202 .user_agent(user_agent())
203 .build()
204 .context("failed to build LangSmith HTTP client")?;
205
206 Ok(Self {
207 client,
208 endpoint,
209 api_key,
210 dataset_id,
211 })
212 }
213}
214
215#[async_trait::async_trait]
216impl OutboxExporter for LangSmithExporter {
217 fn name(&self) -> &'static str {
218 "langsmith"
219 }
220
221 async fn export(&self, event: &OutboxEvent) -> Result<()> {
222 let mut body = normalize_event(event);
223 if let Some(dataset) = &self.dataset_id {
224 if let Some(map) = body.as_object_mut() {
225 map.insert("dataset_id".to_string(), Value::String(dataset.clone()));
226 }
227 }
228
229 let response = self
230 .client
231 .post(self.endpoint.clone())
232 .header("X-API-KEY", &self.api_key)
233 .json(&body)
234 .send()
235 .await
236 .context("failed to send LangSmith request")?;
237
238 if !response.status().is_success() {
239 let status = response.status();
240 let text = response.text().await.unwrap_or_default();
241 return Err(anyhow!(
242 "LangSmith exporter error (status {status}): {text}"
243 ));
244 }
245
246 Ok(())
247 }
248}
249
250#[derive(Clone)]
251struct LangfuseExporter {
252 client: Client,
253 endpoint: Url,
254 auth_header: String,
255}
256
257impl LangfuseExporter {
258 fn from_env() -> Result<Self> {
259 let base_url = env::var("LANGFUSE_BASE_URL")
260 .unwrap_or_else(|_| "https://cloud.langfuse.com/api/public/".to_string());
261 let public_key = env::var("LANGFUSE_PUBLIC_KEY")
262 .context("LANGFUSE_PUBLIC_KEY is required for Langfuse exporter")?;
263 let secret_key = env::var("LANGFUSE_SECRET_KEY")
264 .context("LANGFUSE_SECRET_KEY is required for Langfuse exporter")?;
265 let endpoint = Url::parse(&base_url)
266 .context("invalid LANGFUSE_BASE_URL")?
267 .join("ingest")
268 .context("failed to build Langfuse endpoint URL")?;
269
270 let credentials = format!("{public_key}:{secret_key}");
271 let auth_header = format!("Basic {}", BASE64_STANDARD.encode(credentials));
272
273 let client = Client::builder()
274 .user_agent(user_agent())
275 .build()
276 .context("failed to build Langfuse HTTP client")?;
277
278 Ok(Self {
279 client,
280 endpoint,
281 auth_header,
282 })
283 }
284}
285
286#[async_trait::async_trait]
287impl OutboxExporter for LangfuseExporter {
288 fn name(&self) -> &'static str {
289 "langfuse"
290 }
291
292 async fn export(&self, event: &OutboxEvent) -> Result<()> {
293 let mut body = normalize_event(event);
294 if let Some(trace_id) = extract_trace_id(&event.payload) {
295 if let Some(map) = body.as_object_mut() {
296 map.insert("trace_id".to_string(), Value::String(trace_id));
297 }
298 }
299
300 let response = self
301 .client
302 .post(self.endpoint.clone())
303 .header("Authorization", &self.auth_header)
304 .json(&body)
305 .send()
306 .await
307 .context("failed to send Langfuse request")?;
308
309 if !response.status().is_success() {
310 let status = response.status();
311 let text = response.text().await.unwrap_or_default();
312 return Err(anyhow!("Langfuse exporter error (status {status}): {text}"));
313 }
314
315 Ok(())
316 }
317}
318
319#[derive(Clone)]
320struct PhoenixExporter {
321 client: Client,
322 endpoint: Url,
323 api_key: String,
324 workspace: Option<String>,
325}
326
327impl PhoenixExporter {
328 fn from_env() -> Result<Self> {
329 let base_url = env::var("PHOENIX_BASE_URL")
330 .unwrap_or_else(|_| "https://app.phoenix.arize.com/api/".to_string());
331 let api_key = env::var("PHOENIX_API_KEY")
332 .context("PHOENIX_API_KEY is required for Phoenix exporter")?;
333 let endpoint = Url::parse(&base_url)
334 .context("invalid PHOENIX_BASE_URL")?
335 .join("v1/ingest")
336 .context("failed to build Phoenix endpoint URL")?;
337 let workspace = env::var("PHOENIX_WORKSPACE")
338 .ok()
339 .filter(|value| !value.is_empty());
340
341 let client = Client::builder()
342 .user_agent(user_agent())
343 .build()
344 .context("failed to build Phoenix HTTP client")?;
345
346 Ok(Self {
347 client,
348 endpoint,
349 api_key,
350 workspace,
351 })
352 }
353}
354
355#[async_trait::async_trait]
356impl OutboxExporter for PhoenixExporter {
357 fn name(&self) -> &'static str {
358 "phoenix"
359 }
360
361 async fn export(&self, event: &OutboxEvent) -> Result<()> {
362 let mut body = normalize_event(event);
363 if let Some(trace_id) = extract_trace_id(&event.payload) {
364 if let Some(map) = body.as_object_mut() {
365 map.insert("trace_id".to_string(), Value::String(trace_id));
366 }
367 }
368 if let Some(workspace) = &self.workspace {
369 if let Some(map) = body.as_object_mut() {
370 map.insert("workspace".to_string(), Value::String(workspace.clone()));
371 }
372 }
373
374 let response = self
375 .client
376 .post(self.endpoint.clone())
377 .bearer_auth(&self.api_key)
378 .json(&body)
379 .send()
380 .await
381 .context("failed to send Phoenix request")?;
382
383 if !response.status().is_success() {
384 let status = response.status();
385 let text = response.text().await.unwrap_or_default();
386 return Err(anyhow!("Phoenix exporter error (status {status}): {text}"));
387 }
388
389 Ok(())
390 }
391}
392
393fn user_agent() -> String {
394 format!("fleetforge-exporter/{}", env!("CARGO_PKG_VERSION"))
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    // The parent module's visible imports only bring in `chrono::SecondsFormat`,
    // so `Utc` has to be imported here for `Utc::now()` to resolve.
    use chrono::Utc;

    /// The normalised JSON keeps the event id and embeds the payload unchanged.
    #[test]
    fn normalize_event_copies_payload() {
        let event = OutboxEvent {
            id: 1,
            run_id: uuid::Uuid::nil(),
            step_id: None,
            kind: "run_started".to_string(),
            payload: json!({"trace": {"trace_id": "abc"}}),
            created_at: Utc::now(),
            published_at: None,
        };

        let normalized = normalize_event(&event);
        assert_eq!(normalized["id"], 1);
        assert_eq!(normalized["payload"]["trace"]["trace_id"], "abc");
    }

    /// Only a string at `trace.trace_id` is returned; anything else is `None`.
    #[test]
    fn extract_trace_id_reads_nested_string() {
        assert_eq!(
            extract_trace_id(&json!({"trace": {"trace_id": "abc"}})),
            Some("abc".to_string())
        );
        assert_eq!(extract_trace_id(&json!({"trace": {}})), None);
        assert_eq!(extract_trace_id(&json!({"trace": {"trace_id": 7}})), None);
        assert_eq!(extract_trace_id(&json!(null)), None);
    }
}