1use std::env;
4
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use clickhouse::{Client, Row};
8use fleetforge_contracts_telemetry::TrustMetadata;
9use serde::Serialize;
10use serde_json::Value;
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Row, Serialize)]
14pub struct RunEvent {
15 pub run_id: Uuid,
16 pub status: String,
17 pub created_at: DateTime<Utc>,
18 pub updated_at: DateTime<Utc>,
19 pub duration_ms: Option<u64>,
20 pub attributes: Value,
21 pub trust: Option<TrustMetadata>,
22}
23
24#[derive(Debug, Clone, Row, Serialize)]
25pub struct StepEvent {
26 pub run_id: Uuid,
27 pub step_id: Uuid,
28 pub status: String,
29 pub attempt: i32,
30 pub started_at: DateTime<Utc>,
31 pub finished_at: Option<DateTime<Utc>>,
32 pub latency_ms: Option<u64>,
33 pub attributes: Value,
34 pub trust: Option<TrustMetadata>,
35}
36
37#[derive(Clone)]
39pub struct ClickhouseSink {
40 client: Client,
41 run_table: String,
42 step_table: String,
43}
44
45impl ClickhouseSink {
46 pub fn from_env() -> Result<Self> {
48 let url = env::var("CLICKHOUSE_DSN").unwrap_or_else(|_| "http://localhost:8123".into());
49 let database = env::var("CLICKHOUSE_DATABASE").unwrap_or_else(|_| "telemetry".into());
50 let run_table = env::var("CLICKHOUSE_RUN_TABLE").unwrap_or_else(|_| "run_events".into());
51 let step_table = env::var("CLICKHOUSE_STEP_TABLE").unwrap_or_else(|_| "step_events".into());
52
53 let client = Client::default()
54 .with_url(url)
55 .with_database(database.clone());
56
57 Ok(Self {
58 client,
59 run_table: format!("{database}.{run_table}"),
60 step_table: format!("{database}.{step_table}"),
61 })
62 }
63
64 pub fn with_client(
65 client: Client,
66 run_table: impl Into<String>,
67 step_table: impl Into<String>,
68 ) -> Self {
69 Self {
70 client,
71 run_table: run_table.into(),
72 step_table: step_table.into(),
73 }
74 }
75
76 pub async fn insert_run_event(&self, event: RunEvent) -> Result<()> {
78 let mut insert = self
79 .client
80 .insert(&self.run_table)
81 .context("failed to prepare ClickHouse run_events insert")?;
82 insert.write(&event).await?;
83 insert.end().await?;
84 Ok(())
85 }
86
87 pub async fn insert_step_event(&self, event: StepEvent) -> Result<()> {
89 let mut insert = self
90 .client
91 .insert(&self.step_table)
92 .context("failed to prepare ClickHouse step_events insert")?;
93 insert.write(&event).await?;
94 insert.end().await?;
95 Ok(())
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use super::*;
102 use serde_json::json;
103
104 #[test]
105 fn run_event_serializes() {
106 let event = RunEvent {
107 run_id: Uuid::nil(),
108 status: "succeeded".into(),
109 created_at: Utc::now(),
110 updated_at: Utc::now(),
111 duration_ms: Some(1234),
112 attributes: json!({"tokens": 10}),
113 trust: None,
114 };
115 let value = serde_json::to_value(&event).expect("serialize");
116 assert_eq!(value["status"], "succeeded");
117 assert_eq!(value["attributes"]["tokens"], 10);
118 }
119
120 #[test]
121 fn step_event_serializes() {
122 let event = StepEvent {
123 run_id: Uuid::nil(),
124 step_id: Uuid::nil(),
125 status: "failed".into(),
126 attempt: 2,
127 started_at: Utc::now(),
128 finished_at: None,
129 latency_ms: Some(42),
130 attributes: json!({"error": "timeout"}),
131 trust: None,
132 };
133 let value = serde_json::to_value(&event).expect("serialize");
134 assert_eq!(value["attempt"], 2);
135 assert_eq!(value["attributes"]["error"], "timeout");
136 }
137}
138
139#[cfg(all(test, feature = "clickhouse-tests"))]
140mod clickhouse_integration_tests {
141 use super::*;
142 use anyhow::Result;
143 use serde_json::json;
144 use std::env;
145 use uuid::Uuid;
146
147 #[tokio::test(flavor = "multi_thread")]
148 async fn clickhouse_sink_inserts_rows() -> Result<()> {
149 let dsn = match env::var("CLICKHOUSE_DSN") {
150 Ok(value) => value,
151 Err(_) => {
152 eprintln!("CLICKHOUSE_DSN not set; skipping ClickHouse sink integration test");
153 return Ok(());
154 }
155 };
156
157 let db_name = format!("telemetry_test_{}", Uuid::new_v4().simple());
158 let client = Client::default().with_url(&dsn);
159
160 let result = async {
161 client
162 .query(format!("CREATE DATABASE IF NOT EXISTS {db_name}"))
163 .execute()
164 .await?;
165
166 let run_table = format!("{db_name}.run_events");
167 let step_table = format!("{db_name}.step_events");
168
169 client
170 .query(format!(
171 "CREATE TABLE {run_table} (
172 run_id UUID,
173 status String,
174 created_at DateTime,
175 updated_at DateTime,
176 duration_ms Nullable(UInt64),
177 attributes JSON,
178 trust Nullable(JSON)
179 ) ENGINE = MergeTree ORDER BY run_id"
180 ))
181 .execute()
182 .await?;
183
184 client
185 .query(format!(
186 "CREATE TABLE {step_table} (
187 run_id UUID,
188 step_id UUID,
189 status String,
190 attempt Int32,
191 started_at DateTime,
192 finished_at Nullable(DateTime),
193 latency_ms Nullable(UInt64),
194 attributes JSON,
195 trust Nullable(JSON)
196 ) ENGINE = MergeTree ORDER BY (run_id, step_id)"
197 ))
198 .execute()
199 .await?;
200
201 let scoped = client.clone().with_database(&db_name);
202 let sink = ClickhouseSink::with_client(scoped, run_table.clone(), step_table.clone());
203
204 sink.insert_run_event(RunEvent {
205 run_id: Uuid::new_v4(),
206 status: "succeeded".into(),
207 created_at: Utc::now(),
208 updated_at: Utc::now(),
209 duration_ms: Some(100),
210 attributes: json!({"test": true}),
211 trust: Some(TrustMetadata {
212 subject: "test".into(),
213 provenance: json!({"source": "integration"}),
214 }),
215 })
216 .await?;
217
218 sink.insert_step_event(StepEvent {
219 run_id: Uuid::new_v4(),
220 step_id: Uuid::new_v4(),
221 status: "completed".into(),
222 attempt: 1,
223 started_at: Utc::now(),
224 finished_at: Some(Utc::now()),
225 latency_ms: Some(42),
226 attributes: json!({"step": 1}),
227 trust: None,
228 })
229 .await?;
230
231 let run_count: u64 = client
232 .clone()
233 .query(format!("SELECT count() FROM {run_table}"))
234 .fetch_one()
235 .await?;
236 let step_count: u64 = client
237 .clone()
238 .query(format!("SELECT count() FROM {step_table}"))
239 .fetch_one()
240 .await?;
241
242 assert_eq!(run_count, 1);
243 assert_eq!(step_count, 1);
244
245 Ok::<_, anyhow::Error>(())
246 }
247 .await;
248
249 client
250 .query(format!("DROP DATABASE IF EXISTS {db_name}"))
251 .execute()
252 .await
253 .ok();
254
255 result
256 }
257}