fleetforge_telemetry/exporters/
mod.rs

1pub mod otlp;
2
3use std::{collections::HashSet, env, fs, path::Path, sync::Arc};
4
5use anyhow::{anyhow, Context, Result};
6use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
7use base64::Engine;
8use chrono::SecondsFormat;
9use fleetforge_bus::OutboxExporter;
10use fleetforge_storage::models::OutboxEvent;
11use reqwest::{Client, Url};
12use serde_json::{json, Value};
13use tracing::{info, warn};
14
/// Environment variable holding the comma-separated list of trace exporters to enable.
const EXPORTERS_ENV: &str = "FLEETFORGE_TRACE_EXPORTERS";
/// Environment variable that overrides the directory scanned for exporter plugin manifests.
const EXPORTER_PLUGIN_DIR_ENV: &str = "FLEETFORGE_EXPORTER_PLUGIN_DIR";
/// Manifest directory used when `FLEETFORGE_EXPORTER_PLUGIN_DIR` is not set.
const DEFAULT_EXPORTER_PLUGIN_DIR: &str = "plugins/exporters";
18
19/// Builds the configured set of external exporters from environment variables.
20pub fn from_env() -> Result<Vec<Arc<dyn OutboxExporter>>> {
21    let raw = match env::var(EXPORTERS_ENV) {
22        Ok(value) => value,
23        Err(_) => return Ok(Vec::new()),
24    };
25
26    let mut exporters: Vec<Arc<dyn OutboxExporter>> = Vec::new();
27    let plugin_allowlist = load_exporter_plugins();
28
29    for token in raw.split(',') {
30        let trimmed = token.trim();
31        if trimmed.is_empty() {
32            continue;
33        }
34
35        let normalized = trimmed.to_ascii_lowercase();
36        if !plugin_allowlist.contains(normalized.as_str()) {
37            warn!(
38                exporter = trimmed,
39                "exporter requested but no plugin manifest found; skipping (see docs/reference/adapters.md)"
40            );
41            continue;
42        }
43
44        match normalized.as_str() {
45            "langsmith" => match LangSmithExporter::from_env() {
46                Ok(exporter) => exporters.push(Arc::new(exporter)),
47                Err(err) => {
48                    warn!(error = ?err, "failed to initialise LangSmith exporter; skipping")
49                }
50            },
51            "langfuse" => match LangfuseExporter::from_env() {
52                Ok(exporter) => exporters.push(Arc::new(exporter)),
53                Err(err) => warn!(error = ?err, "failed to initialise Langfuse exporter; skipping"),
54            },
55            "phoenix" => match PhoenixExporter::from_env() {
56                Ok(exporter) => exporters.push(Arc::new(exporter)),
57                Err(err) => warn!(error = ?err, "failed to initialise Phoenix exporter; skipping"),
58            },
59            other => {
60                warn!(
61                    exporter = other,
62                    "unknown trace exporter requested; ignoring"
63                );
64            }
65        }
66    }
67
68    if !exporters.is_empty() {
69        let names: Vec<&'static str> = exporters.iter().map(|exporter| exporter.name()).collect();
70        info!(exporters = ?names, "external trace exporters enabled");
71    }
72
73    Ok(exporters)
74}
75
76fn load_exporter_plugins() -> HashSet<String> {
77    let dir = env::var(EXPORTER_PLUGIN_DIR_ENV)
78        .unwrap_or_else(|_| DEFAULT_EXPORTER_PLUGIN_DIR.to_string());
79    let path = Path::new(&dir);
80    if !path.exists() {
81        return HashSet::new();
82    }
83
84    let mut allowed = HashSet::new();
85    match fs::read_dir(path) {
86        Ok(entries) => {
87            for entry in entries.flatten() {
88                let entry_path = entry.path();
89                if !entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
90                    continue;
91                }
92                match parse_plugin_manifest(&entry_path) {
93                    Ok(Some(name)) => {
94                        allowed.insert(name);
95                    }
96                    Ok(None) => {}
97                    Err(err) => {
98                        warn!(path = ?entry_path, error = %err, "failed to parse exporter plugin manifest")
99                    }
100                }
101            }
102        }
103        Err(err) => warn!(
104            directory = %dir,
105            error = %err,
106            "failed to read exporter plugin directory"
107        ),
108    }
109
110    allowed
111}
112
/// On-disk description of an exporter plugin, deserialised from a JSON or TOML manifest.
#[derive(serde::Deserialize)]
struct ExporterPluginManifest {
    /// Exporter name; when absent, `parse_plugin_manifest` falls back to the file stem.
    name: Option<String>,
    /// Whether the plugin is active; a manifest that omits the key is treated as enabled.
    #[serde(default = "default_enabled")]
    enabled: bool,
}

/// Serde default for `ExporterPluginManifest::enabled`: always `true`.
const fn default_enabled() -> bool {
    true
}
123
124fn parse_plugin_manifest(path: &Path) -> Result<Option<String>> {
125    let stem = path
126        .file_stem()
127        .and_then(|value| value.to_str())
128        .map(|value| value.to_string())
129        .unwrap_or_else(|| "exporter".to_string());
130    let contents = fs::read_to_string(path)
131        .with_context(|| format!("failed to read exporter plugin {:?}", path))?;
132    let trimmed = contents.trim();
133    if trimmed.is_empty() {
134        return Ok(Some(stem.to_ascii_lowercase()));
135    }
136
137    let is_json = path
138        .extension()
139        .and_then(|ext| ext.to_str())
140        .map(|ext| ext.eq_ignore_ascii_case("json"))
141        .unwrap_or(false);
142
143    let manifest: ExporterPluginManifest = if is_json {
144        serde_json::from_str(trimmed)
145            .with_context(|| format!("failed to parse JSON plugin manifest {:?}", path))?
146    } else {
147        toml::from_str(trimmed)
148            .with_context(|| format!("failed to parse TOML plugin manifest {:?}", path))?
149    };
150
151    if !manifest.enabled {
152        return Ok(None);
153    }
154
155    let name = manifest.name.unwrap_or(stem).to_ascii_lowercase();
156
157    Ok(Some(name))
158}
159
160fn normalize_event(event: &OutboxEvent) -> Value {
161    json!({
162        "id": event.id,
163        "run_id": event.run_id.to_string(),
164        "step_id": event.step_id.map(|id| id.to_string()),
165        "kind": event.kind,
166        "created_at": event.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
167        "payload": event.payload
168    })
169}
170
171fn extract_trace_id(payload: &Value) -> Option<String> {
172    payload
173        .get("trace")
174        .and_then(|value| value.get("trace_id"))
175        .and_then(Value::as_str)
176        .map(|value| value.to_string())
177}
178
/// Exporter that POSTs normalised outbox events to a LangSmith endpoint.
#[derive(Clone)]
struct LangSmithExporter {
    /// HTTP client built once in `from_env` and reused for every export.
    client: Client,
    /// Fully resolved URL that events are POSTed to.
    endpoint: Url,
    /// Value sent in the `X-API-KEY` request header.
    api_key: String,
    /// Optional dataset identifier added to each request body as `dataset_id`.
    dataset_id: Option<String>,
}
186
187impl LangSmithExporter {
188    fn from_env() -> Result<Self> {
189        let base_url = env::var("LANGSMITH_API_URL")
190            .unwrap_or_else(|_| "https://api.smith.langchain.com/".to_string());
191        let api_key = env::var("LANGSMITH_API_KEY")
192            .context("LANGSMITH_API_KEY is required for LangSmith exporter")?;
193        let endpoint = Url::parse(&base_url)
194            .context("invalid LANGSMITH_API_URL")?
195            .join("v1/runs")
196            .context("failed to build LangSmith endpoint URL")?;
197        let dataset_id = env::var("LANGSMITH_DATASET_ID")
198            .ok()
199            .filter(|value| !value.is_empty());
200
201        let client = Client::builder()
202            .user_agent(user_agent())
203            .build()
204            .context("failed to build LangSmith HTTP client")?;
205
206        Ok(Self {
207            client,
208            endpoint,
209            api_key,
210            dataset_id,
211        })
212    }
213}
214
215#[async_trait::async_trait]
216impl OutboxExporter for LangSmithExporter {
217    fn name(&self) -> &'static str {
218        "langsmith"
219    }
220
221    async fn export(&self, event: &OutboxEvent) -> Result<()> {
222        let mut body = normalize_event(event);
223        if let Some(dataset) = &self.dataset_id {
224            if let Some(map) = body.as_object_mut() {
225                map.insert("dataset_id".to_string(), Value::String(dataset.clone()));
226            }
227        }
228
229        let response = self
230            .client
231            .post(self.endpoint.clone())
232            .header("X-API-KEY", &self.api_key)
233            .json(&body)
234            .send()
235            .await
236            .context("failed to send LangSmith request")?;
237
238        if !response.status().is_success() {
239            let status = response.status();
240            let text = response.text().await.unwrap_or_default();
241            return Err(anyhow!(
242                "LangSmith exporter error (status {status}): {text}"
243            ));
244        }
245
246        Ok(())
247    }
248}
249
/// Exporter that POSTs normalised outbox events to a Langfuse ingest endpoint.
#[derive(Clone)]
struct LangfuseExporter {
    /// HTTP client built once in `from_env` and reused for every export.
    client: Client,
    /// Fully resolved URL that events are POSTed to.
    endpoint: Url,
    /// Pre-computed `Authorization` header value (`Basic <base64(public:secret)>`).
    auth_header: String,
}
256
257impl LangfuseExporter {
258    fn from_env() -> Result<Self> {
259        let base_url = env::var("LANGFUSE_BASE_URL")
260            .unwrap_or_else(|_| "https://cloud.langfuse.com/api/public/".to_string());
261        let public_key = env::var("LANGFUSE_PUBLIC_KEY")
262            .context("LANGFUSE_PUBLIC_KEY is required for Langfuse exporter")?;
263        let secret_key = env::var("LANGFUSE_SECRET_KEY")
264            .context("LANGFUSE_SECRET_KEY is required for Langfuse exporter")?;
265        let endpoint = Url::parse(&base_url)
266            .context("invalid LANGFUSE_BASE_URL")?
267            .join("ingest")
268            .context("failed to build Langfuse endpoint URL")?;
269
270        let credentials = format!("{public_key}:{secret_key}");
271        let auth_header = format!("Basic {}", BASE64_STANDARD.encode(credentials));
272
273        let client = Client::builder()
274            .user_agent(user_agent())
275            .build()
276            .context("failed to build Langfuse HTTP client")?;
277
278        Ok(Self {
279            client,
280            endpoint,
281            auth_header,
282        })
283    }
284}
285
286#[async_trait::async_trait]
287impl OutboxExporter for LangfuseExporter {
288    fn name(&self) -> &'static str {
289        "langfuse"
290    }
291
292    async fn export(&self, event: &OutboxEvent) -> Result<()> {
293        let mut body = normalize_event(event);
294        if let Some(trace_id) = extract_trace_id(&event.payload) {
295            if let Some(map) = body.as_object_mut() {
296                map.insert("trace_id".to_string(), Value::String(trace_id));
297            }
298        }
299
300        let response = self
301            .client
302            .post(self.endpoint.clone())
303            .header("Authorization", &self.auth_header)
304            .json(&body)
305            .send()
306            .await
307            .context("failed to send Langfuse request")?;
308
309        if !response.status().is_success() {
310            let status = response.status();
311            let text = response.text().await.unwrap_or_default();
312            return Err(anyhow!("Langfuse exporter error (status {status}): {text}"));
313        }
314
315        Ok(())
316    }
317}
318
/// Exporter that POSTs normalised outbox events to a Phoenix ingest endpoint.
#[derive(Clone)]
struct PhoenixExporter {
    /// HTTP client built once in `from_env` and reused for every export.
    client: Client,
    /// Fully resolved URL that events are POSTed to.
    endpoint: Url,
    /// Token sent as a bearer credential on every request.
    api_key: String,
    /// Optional workspace name added to each request body as `workspace`.
    workspace: Option<String>,
}
326
327impl PhoenixExporter {
328    fn from_env() -> Result<Self> {
329        let base_url = env::var("PHOENIX_BASE_URL")
330            .unwrap_or_else(|_| "https://app.phoenix.arize.com/api/".to_string());
331        let api_key = env::var("PHOENIX_API_KEY")
332            .context("PHOENIX_API_KEY is required for Phoenix exporter")?;
333        let endpoint = Url::parse(&base_url)
334            .context("invalid PHOENIX_BASE_URL")?
335            .join("v1/ingest")
336            .context("failed to build Phoenix endpoint URL")?;
337        let workspace = env::var("PHOENIX_WORKSPACE")
338            .ok()
339            .filter(|value| !value.is_empty());
340
341        let client = Client::builder()
342            .user_agent(user_agent())
343            .build()
344            .context("failed to build Phoenix HTTP client")?;
345
346        Ok(Self {
347            client,
348            endpoint,
349            api_key,
350            workspace,
351        })
352    }
353}
354
355#[async_trait::async_trait]
356impl OutboxExporter for PhoenixExporter {
357    fn name(&self) -> &'static str {
358        "phoenix"
359    }
360
361    async fn export(&self, event: &OutboxEvent) -> Result<()> {
362        let mut body = normalize_event(event);
363        if let Some(trace_id) = extract_trace_id(&event.payload) {
364            if let Some(map) = body.as_object_mut() {
365                map.insert("trace_id".to_string(), Value::String(trace_id));
366            }
367        }
368        if let Some(workspace) = &self.workspace {
369            if let Some(map) = body.as_object_mut() {
370                map.insert("workspace".to_string(), Value::String(workspace.clone()));
371            }
372        }
373
374        let response = self
375            .client
376            .post(self.endpoint.clone())
377            .bearer_auth(&self.api_key)
378            .json(&body)
379            .send()
380            .await
381            .context("failed to send Phoenix request")?;
382
383        if !response.status().is_success() {
384            let status = response.status();
385            let text = response.text().await.unwrap_or_default();
386            return Err(anyhow!("Phoenix exporter error (status {status}): {text}"));
387        }
388
389        Ok(())
390    }
391}
392
393fn user_agent() -> String {
394    format!("fleetforge-exporter/{}", env!("CARGO_PKG_VERSION"))
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    // The parent module imports only `chrono::SecondsFormat`, so `Utc` is not
    // reachable through `super::*` and has to be imported here.
    use chrono::Utc;

    /// The normalised JSON must expose the event id and embed the payload unchanged.
    #[test]
    fn normalize_event_copies_payload() {
        let event = OutboxEvent {
            id: 1,
            run_id: uuid::Uuid::nil(),
            step_id: None,
            kind: "run_started".to_string(),
            payload: json!({"trace": {"trace_id": "abc"}}),
            created_at: Utc::now(),
            published_at: None,
        };

        let normalized = normalize_event(&event);
        assert_eq!(normalized["id"], 1);
        assert_eq!(normalized["payload"]["trace"]["trace_id"], "abc");
    }
}