fleetforge_runtime/
otel.rs

1use fleetforge_telemetry::otel::{get_active_span, KeyValue, SpanContext, Value as OtelValue};
2use tracing::{info_span, Span as TracingSpan};
3
4use fleetforge_telemetry::{
5    context::{StepScope, ToolCallScope, TraceContext},
6    metrics,
7};
8use serde_json::Value;
9
10use crate::{
11    features,
12    model::{QueuedStep, StepSpec},
13};
14
15fn set_attributes<K, V>(attributes: impl IntoIterator<Item = (K, V)>)
16where
17    K: Into<String>,
18    V: Into<OtelValue>,
19{
20    get_active_span(|span| {
21        for (key, value) in attributes.into_iter() {
22            span.set_attribute(KeyValue::new(key.into(), value.into()));
23        }
24    });
25}
26
27fn request_model(spec: &StepSpec) -> Option<String> {
28    spec.inputs
29        .get("model")
30        .and_then(Value::as_str)
31        .map(|s| s.to_string())
32}
33
34fn request_max_tokens(spec: &StepSpec) -> Option<i64> {
35    spec.inputs.get("max_tokens").and_then(Value::as_i64)
36}
37
38fn request_temperature(spec: &StepSpec) -> Option<f64> {
39    spec.inputs.get("temperature").and_then(Value::as_f64)
40}
41
42fn finish_reasons(value: &Value) -> Option<String> {
43    value.as_array().map(|items| {
44        items
45            .iter()
46            .filter_map(Value::as_str)
47            .collect::<Vec<_>>()
48            .join(",")
49    })
50}
51
52pub fn span_run(parent: &TraceContext) -> (TracingSpan, TraceContext) {
53    let run_scope = parent.run.clone();
54    let mut span = info_span!(
55        "fleetforge.runtime.run",
56        run_id = %run_scope.run_id,
57        workspace_id = %run_scope.workspace_id,
58        app_id = %run_scope.app_id,
59        attempt_id = %run_scope.attempt_id
60    );
61    span.set_parent(parent.as_parent_context());
62    {
63        let _guard = span.enter();
64        set_attributes(parent.attributes());
65    }
66    let span_context = span.context().span().span_context().clone();
67    let derived = TraceContext::from_span_context(
68        run_scope,
69        None,
70        None,
71        None,
72        parent.budget.clone(),
73        Some(parent.span_id()),
74        span_context,
75        parent.seed,
76        parent.baggage.clone(),
77    );
78    (span, derived)
79}
80
81pub fn span_step<'a>(
82    parent: &'a TraceContext,
83    step: &QueuedStep,
84    spec: &StepSpec,
85    worker_id: &str,
86) -> (TracingSpan, TraceContext) {
87    let step_index = if step.idx >= 0 { step.idx as u32 } else { 0 };
88    let step_attempt = if step.attempt >= 0 {
89        (step.attempt + 1) as u32
90    } else {
91        0
92    };
93
94    let mut span = info_span!(
95        "fleetforge.runtime.step",
96        run_id = %step.run_id,
97        step_id = %step.step_id,
98        step_kind = %spec.r#type,
99        step_idx = step.idx,
100        attempt = step_attempt,
101        scheduler = %worker_id,
102        fleetforge_tokens_total = tracing::field::Empty,
103        fleetforge_tokens_input = tracing::field::Empty,
104        fleetforge_tokens_output = tracing::field::Empty,
105        fleetforge_cost_usd = tracing::field::Empty,
106        fleetforge_duration_ms = tracing::field::Empty,
107        fleetforge_status = tracing::field::Empty,
108        error_message = tracing::field::Empty,
109        fleetforge_policy_pack = tracing::field::Empty,
110        fleetforge_policy_effect = tracing::field::Empty
111    );
112    span.set_parent(parent.as_parent_context());
113
114    {
115        let _guard = span.enter();
116        set_attributes([
117            ("gen_ai.system", "fleetforge"),
118            ("gen_ai.operation.name", spec.r#type.as_str()),
119        ]);
120        if let Some(model) = request_model(spec) {
121            set_attributes([("gen_ai.request.model", model.as_str())]);
122        }
123        if let Some(max_tokens) = request_max_tokens(spec) {
124            set_attributes([("gen_ai.request.max_tokens", max_tokens as i64)]);
125        }
126        if let Some(temp) = request_temperature(spec) {
127            set_attributes([("gen_ai.request.temperature", temp)]);
128        }
129    }
130
131    let step_scope = StepScope::new(step.step_id.to_string())
132        .with_index(step_index)
133        .with_attempt(step_attempt)
134        .with_kind(spec.r#type.to_string())
135        .with_scheduler(worker_id.to_string());
136    let step_trace = TraceContext::from_span_context(
137        parent.run.clone(),
138        Some(step_scope),
139        None,
140        None,
141        parent.budget.clone(),
142        Some(parent.span_id()),
143        span.context().span().span_context().clone(),
144        parent.seed,
145        parent.baggage.clone(),
146    );
147
148    (span, step_trace)
149}
150
151pub fn span_tool(
152    parent: &TraceContext,
153    scope: ToolCallScope,
154    span_name: &str,
155    tool_name: &str,
156    attrs: Vec<(String, String)>,
157) -> (TracingSpan, TraceContext) {
158    let mut span = info_span!(
159        "fleetforge.runtime.tool",
160        run_id = %parent.run.run_id,
161        tool_call_id = %scope.tool_call_id,
162        tool_name = tool_name,
163        span_name = span_name,
164        tool_status = tracing::field::Empty
165    );
166    span.set_parent(parent.as_parent_context());
167    {
168        let _guard = span.enter();
169        set_attributes([("gen_ai.tool.name", tool_name)]);
170        for (key, value) in attrs {
171            set_attributes([(key, value)]);
172        }
173    }
174
175    let tool_trace = TraceContext::from_span_context(
176        parent.run.clone(),
177        parent.step.clone(),
178        Some(scope),
179        None,
180        parent.budget.clone(),
181        Some(parent.span_id()),
182        span.context().span().span_context().clone(),
183        parent.seed,
184        parent.baggage.clone(),
185    );
186
187    (span, tool_trace)
188}
189
190pub fn record_tool_completion(
191    usage_tokens_in: Option<u64>,
192    usage_tokens_out: Option<u64>,
193    cost_usd: Option<f64>,
194    finish: Option<&Value>,
195) {
196    if let Some(input_tokens) = usage_tokens_in {
197        set_attributes([("gen_ai.response.usage.prompt_tokens", input_tokens as i64)]);
198    }
199    if let Some(output_tokens) = usage_tokens_out {
200        set_attributes([(
201            "gen_ai.response.usage.completion_tokens",
202            output_tokens as i64,
203        )]);
204    }
205    if let Some(cost) = cost_usd {
206        set_attributes([("gen_ai.response.cost", cost)]);
207    }
208    if let Some(reasons) = finish.and_then(finish_reasons) {
209        set_attributes([("gen_ai.response.finish_reasons", reasons)]);
210    }
211}
212
213pub fn event_policy(pack: &str, effect: &str, summary: &str, artifact: Option<&str>) {
214    get_active_span(|span| {
215        let mut kv = vec![
216            KeyValue::new("fleetforge.policy.pack", pack.to_string()),
217            KeyValue::new("fleetforge.policy.effect", effect.to_string()),
218            KeyValue::new("fleetforge.policy.summary", summary.to_string()),
219        ];
220        if let Some(artifact_id) = artifact {
221            kv.push(KeyValue::new(
222                "fleetforge.policy.decisions_artifact",
223                artifact_id.to_string(),
224            ));
225        }
226        span.add_event("policy", kv.into_iter());
227    });
228    metrics::record_policy_event(effect, Some(pack));
229}
230
231pub fn record_attribute_str(key: &str, value: &str) {
232    set_attributes([(key.to_string(), value.to_string())]);
233}
234
235pub fn record_attribute_i64(key: &str, value: i64) {
236    set_attributes([(key.to_string(), value.into())]);
237}
238
239pub fn record_attribute_f64(key: &str, value: f64) {
240    set_attributes([(key.to_string(), value.into())]);
241}
242
243pub fn record_trust_attributes(
244    attestation_ids: &[String],
245    subject: Option<&str>,
246    policy_decision_id: Option<&str>,
247) {
248    if !features::trust_mesh_alpha_enabled() {
249        return;
250    }
251    if !attestation_ids.is_empty() {
252        let joined = attestation_ids.join(",");
253        set_attributes([("trust.attestation_ids".to_string(), joined.into())]);
254        if let Some(first) = attestation_ids.first() {
255            set_attributes([("trust.attestation_id".to_string(), first.clone().into())]);
256        }
257    }
258    if let Some(subject) = subject {
259        set_attributes([("trust.subject".to_string(), subject.to_string().into())]);
260    }
261    if let Some(policy_id) = policy_decision_id {
262        set_attributes([(
263            "trust.policy_decision_id".to_string(),
264            policy_id.to_string().into(),
265        )]);
266    }
267}
268
269#[macro_export]
270macro_rules! span_step {
271    (parent: $parent:expr, step: $step:expr, spec: $spec:expr, worker: $worker:expr) => {
272        $crate::otel::span_step($parent, $step, $spec, $worker)
273    };
274}
275
276#[macro_export]
277macro_rules! span_run {
278    (parent: $parent:expr) => {
279        $crate::otel::span_run($parent)
280    };
281}
282
283#[macro_export]
284macro_rules! span_tool {
285    (parent: $parent:expr, scope: $scope:expr, span: $span_name:expr, name: $tool_name:expr, attrs: $attrs:expr) => {
286        $crate::otel::span_tool($parent, $scope, $span_name, $tool_name, $attrs)
287    };
288}
289
290#[macro_export]
291macro_rules! event_policy {
292    (pack: $pack:expr, effect: $effect:expr, summary: $summary:expr, artifact: $artifact:expr) => {
293        $crate::otel::event_policy($pack, $effect, $summary, $artifact)
294    };
295    (pack: $pack:expr, effect: $effect:expr, summary: $summary:expr) => {
296        $crate::otel::event_policy($pack, $effect, $summary, None::<&str>)
297    };
298}