fleetforge_telemetry/analytics.rs

1//! Analytics sinks that persist normalized run/step telemetry into ClickHouse.
2
3use std::env;
4
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use clickhouse::{Client, Row};
8use fleetforge_contracts_telemetry::TrustMetadata;
9use serde::Serialize;
10use serde_json::Value;
11use uuid::Uuid;
12
/// One normalized run-level telemetry row, written by
/// [`ClickhouseSink::insert_run_event`].
///
/// The integration-test DDL in this module declares columns with exactly these
/// field names, so keep field names in sync with the target table.
// NOTE(review): `DateTime<Utc>`, `Uuid` and `serde_json::Value` are serialized with
// their generic serde impls here (no `#[serde(with = ...)]`). chrono's default is an
// RFC 3339 string, which may not match a ClickHouse `DateTime` column in the crate's
// binary insert format — TODO confirm these columns round-trip against the real schema.
#[derive(Debug, Clone, Row, Serialize)]
pub struct RunEvent {
    /// Identifier of the run this row describes.
    pub run_id: Uuid,
    /// Run status label (the tests in this module use `"succeeded"`).
    pub status: String,
    /// Creation timestamp of the run (UTC).
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp of the run (UTC).
    pub updated_at: DateTime<Utc>,
    /// Run duration in milliseconds (per the `_ms` suffix), if recorded.
    pub duration_ms: Option<u64>,
    /// Free-form JSON attributes attached to the run.
    pub attributes: Value,
    /// Optional trust/provenance metadata; `None` serializes as null.
    pub trust: Option<TrustMetadata>,
}
23
/// One normalized step-level telemetry row, written by
/// [`ClickhouseSink::insert_step_event`].
///
/// The integration-test DDL in this module declares columns with exactly these
/// field names, so keep field names in sync with the target table.
// NOTE(review): as with `RunEvent`, the timestamp/UUID/JSON fields use their generic
// serde encodings; confirm they match the ClickHouse column types (e.g. `DateTime`,
// `Nullable(DateTime)`) used in production — TODO verify.
#[derive(Debug, Clone, Row, Serialize)]
pub struct StepEvent {
    /// Identifier of the run that owns this step.
    pub run_id: Uuid,
    /// Identifier of the step itself.
    pub step_id: Uuid,
    /// Step status label (the tests in this module use `"failed"` / `"completed"`).
    pub status: String,
    /// Attempt number for this step execution.
    pub attempt: i32,
    /// Start timestamp of this attempt (UTC).
    pub started_at: DateTime<Utc>,
    /// Finish timestamp (UTC); `None` when not yet finished or not recorded.
    pub finished_at: Option<DateTime<Utc>>,
    /// Step latency in milliseconds (per the `_ms` suffix), if recorded.
    pub latency_ms: Option<u64>,
    /// Free-form JSON attributes attached to the step.
    pub attributes: Value,
    /// Optional trust/provenance metadata; `None` serializes as null.
    pub trust: Option<TrustMetadata>,
}
36
37/// Configurable ClickHouse-backed analytics sink.
38#[derive(Clone)]
39pub struct ClickhouseSink {
40    client: Client,
41    run_table: String,
42    step_table: String,
43}
44
45impl ClickhouseSink {
46    /// Build a sink from environment variables or sensible defaults.
47    pub fn from_env() -> Result<Self> {
48        let url = env::var("CLICKHOUSE_DSN").unwrap_or_else(|_| "http://localhost:8123".into());
49        let database = env::var("CLICKHOUSE_DATABASE").unwrap_or_else(|_| "telemetry".into());
50        let run_table = env::var("CLICKHOUSE_RUN_TABLE").unwrap_or_else(|_| "run_events".into());
51        let step_table = env::var("CLICKHOUSE_STEP_TABLE").unwrap_or_else(|_| "step_events".into());
52
53        let client = Client::default()
54            .with_url(url)
55            .with_database(database.clone());
56
57        Ok(Self {
58            client,
59            run_table: format!("{database}.{run_table}"),
60            step_table: format!("{database}.{step_table}"),
61        })
62    }
63
64    pub fn with_client(
65        client: Client,
66        run_table: impl Into<String>,
67        step_table: impl Into<String>,
68    ) -> Self {
69        Self {
70            client,
71            run_table: run_table.into(),
72            step_table: step_table.into(),
73        }
74    }
75
76    /// Inserts a normalized run event row.
77    pub async fn insert_run_event(&self, event: RunEvent) -> Result<()> {
78        let mut insert = self
79            .client
80            .insert(&self.run_table)
81            .context("failed to prepare ClickHouse run_events insert")?;
82        insert.write(&event).await?;
83        insert.end().await?;
84        Ok(())
85    }
86
87    /// Inserts a normalized step event row.
88    pub async fn insert_step_event(&self, event: StepEvent) -> Result<()> {
89        let mut insert = self
90            .client
91            .insert(&self.step_table)
92            .context("failed to prepare ClickHouse step_events insert")?;
93        insert.write(&event).await?;
94        insert.end().await?;
95        Ok(())
96    }
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Pins the serde shape of a run row, including explicit nulls for `None`.
    #[test]
    fn run_event_serializes() {
        let now = Utc::now();
        let event = RunEvent {
            run_id: Uuid::nil(),
            status: "succeeded".into(),
            created_at: now,
            updated_at: now,
            duration_ms: Some(1234),
            attributes: json!({"tokens": 10}),
            trust: None,
        };
        let value = serde_json::to_value(&event).expect("serialize");
        assert_eq!(value["run_id"], Uuid::nil().to_string());
        assert_eq!(value["status"], "succeeded");
        assert_eq!(value["duration_ms"], 1234);
        assert_eq!(value["attributes"]["tokens"], 10);
        // `None` must appear as an explicit null, not be omitted from the row.
        assert_eq!(value.get("trust"), Some(&Value::Null));
    }

    /// Pins the serde shape of a step row, including explicit nulls for `None`.
    #[test]
    fn step_event_serializes() {
        let event = StepEvent {
            run_id: Uuid::nil(),
            step_id: Uuid::nil(),
            status: "failed".into(),
            attempt: 2,
            started_at: Utc::now(),
            finished_at: None,
            latency_ms: Some(42),
            attributes: json!({"error": "timeout"}),
            trust: None,
        };
        let value = serde_json::to_value(&event).expect("serialize");
        assert_eq!(value["status"], "failed");
        assert_eq!(value["attempt"], 2);
        assert_eq!(value["latency_ms"], 42);
        assert_eq!(value["attributes"]["error"], "timeout");
        assert_eq!(value.get("finished_at"), Some(&Value::Null));
        assert_eq!(value.get("trust"), Some(&Value::Null));
    }
}
138
#[cfg(all(test, feature = "clickhouse-tests"))]
mod clickhouse_integration_tests {
    use super::*;
    use anyhow::Result;
    use serde_json::json;
    use std::env;
    use uuid::Uuid;

    /// Round-trips one run row and one step row through a throwaway database.
    ///
    /// Skipped (reported as passing) when `CLICKHOUSE_DSN` is unset. Checks inside
    /// the body report failure via `Err` rather than panicking. That way the
    /// `DROP DATABASE` cleanup below always runs instead of being skipped by unwinding.
    #[tokio::test(flavor = "multi_thread")]
    async fn clickhouse_sink_inserts_rows() -> Result<()> {
        let dsn = match env::var("CLICKHOUSE_DSN") {
            Ok(value) => value,
            Err(_) => {
                eprintln!("CLICKHOUSE_DSN not set; skipping ClickHouse sink integration test");
                return Ok(());
            }
        };

        let db_name = format!("telemetry_test_{}", Uuid::new_v4().simple());
        let client = Client::default().with_url(&dsn);

        let result = async {
            // `Client::query` takes `&str`, hence the borrowed `format!` results.
            client
                .query(&format!("CREATE DATABASE IF NOT EXISTS {db_name}"))
                .execute()
                .await?;

            let run_table = format!("{db_name}.run_events");
            let step_table = format!("{db_name}.step_events");

            // NOTE(review): ClickHouse generally rejects JSON/Object types wrapped in
            // `Nullable(...)`, and the JSON column type may require an experimental
            // setting depending on server version — confirm this DDL matches production.
            client
                .query(&format!(
                    "CREATE TABLE {run_table} (
                        run_id UUID,
                        status String,
                        created_at DateTime,
                        updated_at DateTime,
                        duration_ms Nullable(UInt64),
                        attributes JSON,
                        trust Nullable(JSON)
                    ) ENGINE = MergeTree ORDER BY run_id"
                ))
                .execute()
                .await?;

            client
                .query(&format!(
                    "CREATE TABLE {step_table} (
                        run_id UUID,
                        step_id UUID,
                        status String,
                        attempt Int32,
                        started_at DateTime,
                        finished_at Nullable(DateTime),
                        latency_ms Nullable(UInt64),
                        attributes JSON,
                        trust Nullable(JSON)
                    ) ENGINE = MergeTree ORDER BY (run_id, step_id)"
                ))
                .execute()
                .await?;

            let scoped = client.clone().with_database(&db_name);
            let sink = ClickhouseSink::with_client(scoped, run_table.clone(), step_table.clone());

            sink.insert_run_event(RunEvent {
                run_id: Uuid::new_v4(),
                status: "succeeded".into(),
                created_at: Utc::now(),
                updated_at: Utc::now(),
                duration_ms: Some(100),
                attributes: json!({"test": true}),
                trust: Some(TrustMetadata {
                    subject: "test".into(),
                    provenance: json!({"source": "integration"}),
                }),
            })
            .await?;

            sink.insert_step_event(StepEvent {
                run_id: Uuid::new_v4(),
                step_id: Uuid::new_v4(),
                status: "completed".into(),
                attempt: 1,
                started_at: Utc::now(),
                finished_at: Some(Utc::now()),
                latency_ms: Some(42),
                attributes: json!({"step": 1}),
                trust: None,
            })
            .await?;

            let run_count: u64 = client
                .query(&format!("SELECT count() FROM {run_table}"))
                .fetch_one()
                .await?;
            let step_count: u64 = client
                .query(&format!("SELECT count() FROM {step_table}"))
                .fetch_one()
                .await?;

            // `ensure!` returns `Err` instead of panicking, so cleanup still runs.
            anyhow::ensure!(
                run_count == 1,
                "expected 1 row in {}, found {}",
                run_table,
                run_count
            );
            anyhow::ensure!(
                step_count == 1,
                "expected 1 row in {}, found {}",
                step_table,
                step_count
            );

            Ok::<_, anyhow::Error>(())
        }
        .await;

        // Best-effort cleanup: a failed drop should not mask the test outcome,
        // but it is reported so leaked scratch databases are noticed.
        if let Err(err) = client
            .query(&format!("DROP DATABASE IF EXISTS {db_name}"))
            .execute()
            .await
        {
            eprintln!("failed to drop scratch database {db_name}: {err}");
        }

        result
    }
}