fleetforge_runtime/
model.rs

1use std::collections::BTreeMap;
2use std::fmt;
3
4use anyhow::{Context, Result};
5use chrono::{DateTime, Utc};
6use serde::de::Error as DeError;
7use serde::ser::Serializer;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use uuid::Uuid;
11
12use crate::{validate_step_spec, StepSpecError};
13use fleetforge_trust::{Trust, TrustOrigin};
14
/// Stable identifier assigned to every run stored in FleetForge.
///
/// Newtype over [`Uuid`]; `#[serde(transparent)]` makes it serialize and
/// deserialize exactly like the wrapped UUID.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(transparent)]
pub struct RunId(pub Uuid);
19
20impl From<Uuid> for RunId {
21    fn from(value: Uuid) -> Self {
22        Self(value)
23    }
24}
25
26impl From<RunId> for Uuid {
27    fn from(value: RunId) -> Self {
28        value.0
29    }
30}
31
32impl fmt::Display for RunId {
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        write!(f, "{}", self.0)
35    }
36}
37
/// Unique identifier for an individual step within a run.
///
/// Newtype over [`Uuid`]; `#[serde(transparent)]` makes it serialize and
/// deserialize exactly like the wrapped UUID.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(transparent)]
pub struct StepId(pub Uuid);
42
43impl From<Uuid> for StepId {
44    fn from(value: Uuid) -> Self {
45        Self(value)
46    }
47}
48
49impl From<StepId> for Uuid {
50    fn from(value: StepId) -> Self {
51        value.0
52    }
53}
54
55impl fmt::Display for StepId {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        write!(f, "{}", self.0)
58    }
59}
60
/// High-level lifecycle phases for a run.
///
/// Variants are (de)serialized in `snake_case`, e.g. `PausedAtStep` is
/// written as `"paused_at_step"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    Pending,
    Running,
    Succeeded,
    Failed,
    Canceled,
    PausedAtStep,
}
72
73impl RunStatus {
74    pub fn as_str(self) -> &'static str {
75        match self {
76            RunStatus::Pending => "pending",
77            RunStatus::Running => "running",
78            RunStatus::Succeeded => "succeeded",
79            RunStatus::Failed => "failed",
80            RunStatus::Canceled => "canceled",
81            RunStatus::PausedAtStep => "paused_at_step",
82        }
83    }
84}
85
86impl From<fleetforge_storage::models::RunStatus> for RunStatus {
87    fn from(value: fleetforge_storage::models::RunStatus) -> Self {
88        match value {
89            fleetforge_storage::models::RunStatus::Pending => RunStatus::Pending,
90            fleetforge_storage::models::RunStatus::Running => RunStatus::Running,
91            fleetforge_storage::models::RunStatus::Succeeded => RunStatus::Succeeded,
92            fleetforge_storage::models::RunStatus::Failed => RunStatus::Failed,
93            fleetforge_storage::models::RunStatus::Canceled => RunStatus::Canceled,
94            fleetforge_storage::models::RunStatus::PausedAtStep => RunStatus::PausedAtStep,
95        }
96    }
97}
98
99impl From<RunStatus> for fleetforge_storage::models::RunStatus {
100    fn from(value: RunStatus) -> Self {
101        match value {
102            RunStatus::Pending => fleetforge_storage::models::RunStatus::Pending,
103            RunStatus::Running => fleetforge_storage::models::RunStatus::Running,
104            RunStatus::Succeeded => fleetforge_storage::models::RunStatus::Succeeded,
105            RunStatus::Failed => fleetforge_storage::models::RunStatus::Failed,
106            RunStatus::Canceled => fleetforge_storage::models::RunStatus::Canceled,
107            RunStatus::PausedAtStep => fleetforge_storage::models::RunStatus::PausedAtStep,
108        }
109    }
110}
111
/// Scheduler-facing lifecycle phases for an individual step.
///
/// Variants are (de)serialized in `snake_case` (`"queued"`, `"blocked"`, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
#[serde(rename_all = "snake_case")]
pub enum StepStatus {
    Queued,
    Blocked,
    Leased,
    Running,
    Succeeded,
    Failed,
    Skipped,
}
124
125impl StepStatus {
126    pub fn as_str(self) -> &'static str {
127        match self {
128            StepStatus::Queued => "queued",
129            StepStatus::Blocked => "blocked",
130            StepStatus::Leased => "leased",
131            StepStatus::Running => "running",
132            StepStatus::Succeeded => "succeeded",
133            StepStatus::Failed => "failed",
134            StepStatus::Skipped => "skipped",
135        }
136    }
137}
138
139impl From<fleetforge_storage::models::StepStatus> for StepStatus {
140    fn from(value: fleetforge_storage::models::StepStatus) -> Self {
141        match value {
142            fleetforge_storage::models::StepStatus::Queued => StepStatus::Queued,
143            fleetforge_storage::models::StepStatus::Blocked => StepStatus::Blocked,
144            fleetforge_storage::models::StepStatus::Leased => StepStatus::Leased,
145            fleetforge_storage::models::StepStatus::Running => StepStatus::Running,
146            fleetforge_storage::models::StepStatus::Succeeded => StepStatus::Succeeded,
147            fleetforge_storage::models::StepStatus::Failed => StepStatus::Failed,
148            fleetforge_storage::models::StepStatus::Skipped => StepStatus::Skipped,
149        }
150    }
151}
152
153impl From<StepStatus> for fleetforge_storage::models::StepStatus {
154    fn from(value: StepStatus) -> Self {
155        match value {
156            StepStatus::Queued => fleetforge_storage::models::StepStatus::Queued,
157            StepStatus::Blocked => fleetforge_storage::models::StepStatus::Blocked,
158            StepStatus::Leased => fleetforge_storage::models::StepStatus::Leased,
159            StepStatus::Running => fleetforge_storage::models::StepStatus::Running,
160            StepStatus::Succeeded => fleetforge_storage::models::StepStatus::Succeeded,
161            StepStatus::Failed => fleetforge_storage::models::StepStatus::Failed,
162            StepStatus::Skipped => fleetforge_storage::models::StepStatus::Skipped,
163        }
164    }
165}
166
/// Canonical step execution classes understood by the runtime.
///
/// Variants are (de)serialized in `snake_case` (`"llm"`, `"tool"`, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
#[serde(rename_all = "snake_case")]
pub enum StepType {
    Llm,
    Tool,
    Http,
    Map,
    Reduce,
    Branch,
    Replay,
    Agent,
}
180
181impl StepType {
182    pub fn as_str(self) -> &'static str {
183        match self {
184            StepType::Llm => "llm",
185            StepType::Tool => "tool",
186            StepType::Http => "http",
187            StepType::Map => "map",
188            StepType::Reduce => "reduce",
189            StepType::Branch => "branch",
190            StepType::Replay => "replay",
191            StepType::Agent => "agent",
192        }
193    }
194}
195
196impl std::str::FromStr for StepType {
197    type Err = anyhow::Error;
198
199    fn from_str(s: &str) -> Result<Self, Self::Err> {
200        match s {
201            "llm" => Ok(StepType::Llm),
202            "tool" => Ok(StepType::Tool),
203            "http" => Ok(StepType::Http),
204            "map" => Ok(StepType::Map),
205            "reduce" => Ok(StepType::Reduce),
206            "branch" => Ok(StepType::Branch),
207            "replay" => Ok(StepType::Replay),
208            "agent" => Ok(StepType::Agent),
209            other => Err(anyhow::anyhow!("unknown step type '{}'", other)),
210        }
211    }
212}
213
214impl fmt::Display for StepType {
215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216        write!(f, "{}", self.as_str())
217    }
218}
219
/// Fully resolved specification for a step scheduled by the runtime.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepSpec {
    pub id: StepId,
    // Stored under the JSON key `type`; the raw identifier is needed because
    // `type` is a Rust keyword.
    #[serde(rename = "type")]
    pub r#type: StepType,
    // Untyped inputs blob; for LLM steps `try_from_value` also decodes it into
    // `llm_inputs`.
    pub inputs: Value,
    // Defaults to an empty JSON object when the key is absent.
    #[serde(default = "default_policy_blob")]
    pub policy: Value,
    // Falls back to `StepExecution::default()` when the key is absent.
    #[serde(default)]
    pub execution: StepExecution,
    // The three optional fields below are omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub slug: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trust: Option<Trust>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trust_origin: Option<TrustOrigin>,
    // Never (de)serialized; derived from `inputs` by `StepSpec::try_from_value`
    // and therefore `None` after a plain serde round trip.
    #[serde(skip)]
    pub llm_inputs: Option<LlmInputs>,
}
240
241fn default_policy_blob() -> Value {
242    Value::Object(Default::default())
243}
244
/// Execution policy configuration embedded in a [`StepSpec`].
///
/// The container-level `#[serde(default)]` means any field missing from the
/// input takes its value from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct StepExecution {
    pub max_attempts: u32,
    // Presumably milliseconds, per the `_ms` suffix.
    pub retry_backoff_ms: u64,
    pub retry_backoff_factor: f64,
    // Presumably milliseconds, per the `_ms` suffix.
    pub deadline_ms: Option<u64>,
    pub checkpoint: bool,
    pub compensation: Option<CompensationSpec>,
    pub cost_hint: Option<CostHint>,
}
257
258impl Default for StepExecution {
259    fn default() -> Self {
260        Self {
261            max_attempts: 1,
262            retry_backoff_ms: 0,
263            retry_backoff_factor: 1.0,
264            deadline_ms: None,
265            checkpoint: false,
266            compensation: None,
267            cost_hint: None,
268        }
269    }
270}
271
/// Compensation step reference invoked when a primary step fails.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompensationSpec {
    // Required; a string reference to the compensating step.
    pub step: String,
    // Optional; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}
279
/// Optional cost metadata used for planning and budgeting.
///
/// Both fields are optional and are omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CostHint {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tokens: Option<i64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub usd: Option<f64>,
}
288
/// Chat roles accepted by the LLM adapters.
///
/// Variants are (de)serialized in `snake_case` (`"system"`, `"user"`,
/// `"assistant"`, `"tool"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ChatRole {
    System,
    User,
    Assistant,
    Tool,
}
298
/// Message envelope passed to language models.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
    pub role: ChatRole,
    // Arbitrary JSON; not restricted to a string.
    pub content: Value,
    // `name`, `tool_call_id` and `metadata` may be absent on input, but since
    // they carry no `skip_serializing_if` they are written as `null` when `None`.
    #[serde(default)]
    pub name: Option<String>,
    #[serde(default)]
    pub tool_call_id: Option<String>,
    #[serde(default)]
    pub metadata: Option<Value>,
    // The trust fields are omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trust: Option<Trust>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trust_origin: Option<TrustOrigin>,
}
315
/// Declarative tool description exposed to the LLM.
///
/// The container-level `#[serde(default)]` means any field missing from the
/// input takes its value from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ToolSpec {
    pub name: String,
    pub description: String,
    pub schema: Value,
    // Collects every key not matched by a named field and writes those keys
    // back at the same level on serialization.
    #[serde(flatten)]
    pub extra: BTreeMap<String, Value>,
    // The trust fields are omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trust: Option<Trust>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trust_origin: Option<TrustOrigin>,
}
330
331impl Default for ToolSpec {
332    fn default() -> Self {
333        Self {
334            name: String::new(),
335            description: String::new(),
336            schema: Value::Null,
337            extra: BTreeMap::new(),
338            trust: None,
339            trust_origin: None,
340        }
341    }
342}
343
/// Retrieval or memory source that augments LLM prompts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextSource {
    // Required on input: this struct has no container-level serde default.
    pub kind: ContextSourceKind,
    // The optional selectors below default to `None` when absent and are
    // written as `null` when `None`.
    #[serde(default)]
    pub namespace: Option<String>,
    #[serde(default)]
    pub key: Option<String>,
    #[serde(default)]
    pub top_k: Option<u32>,
    #[serde(default)]
    pub query: Option<String>,
    // Collects every key not matched by a named field.
    #[serde(flatten)]
    pub extra: BTreeMap<String, Value>,
}
359
360impl Default for ContextSource {
361    fn default() -> Self {
362        Self {
363            kind: ContextSourceKind::Input,
364            namespace: None,
365            key: None,
366            top_k: None,
367            query: None,
368            extra: BTreeMap::new(),
369        }
370    }
371}
372
/// Supported context source kinds.
///
/// Each variant has an explicit wire name: `"memory:kv"`, `"memory:vector"`,
/// `"artifact"`, and `"input"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ContextSourceKind {
    #[serde(rename = "memory:kv")]
    MemoryKv,
    #[serde(rename = "memory:vector")]
    MemoryVector,
    #[serde(rename = "artifact")]
    Artifact,
    #[serde(rename = "input")]
    Input,
}
385
386/// Schema reference that constrains structured responses.
387#[derive(Debug, Clone, Serialize, Deserialize)]
388#[serde(untagged)]
389pub enum ResponseSchema {
390    Inline(Value),
391    Reference(String),
392}
393
/// Strategy the LLM should use when deciding whether to call a tool.
///
/// Serialized as a plain string by the hand-written serde impls in this file:
/// `"auto"`, `"none"`, or `"required:<tool name>"`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToolChoice {
    Auto,
    None,
    // Carries the name of the tool.
    Required(String),
}
401
402impl Default for ToolChoice {
403    fn default() -> Self {
404        ToolChoice::Auto
405    }
406}
407
408impl serde::Serialize for ToolChoice {
409    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
410    where
411        S: Serializer,
412    {
413        match self {
414            ToolChoice::Auto => serializer.serialize_str("auto"),
415            ToolChoice::None => serializer.serialize_str("none"),
416            ToolChoice::Required(name) => serializer.serialize_str(&format!("required:{name}")),
417        }
418    }
419}
420
421impl<'de> serde::Deserialize<'de> for ToolChoice {
422    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
423    where
424        D: serde::Deserializer<'de>,
425    {
426        let value = String::deserialize(deserializer)?;
427        let trimmed = value.trim();
428        match trimmed.to_ascii_lowercase().as_str() {
429            "auto" => Ok(ToolChoice::Auto),
430            "none" => Ok(ToolChoice::None),
431            other => {
432                if let Some(name) = other.strip_prefix("required:") {
433                    if name.is_empty() {
434                        return Err(DeError::custom(
435                            "tool_choice 'required:' must include tool name",
436                        ));
437                    }
438                    Ok(ToolChoice::Required(name.to_string()))
439                } else {
440                    Err(DeError::custom(format!(
441                        "unsupported tool_choice '{value}'"
442                    )))
443                }
444            }
445        }
446    }
447}
448
/// Normalized inputs for an LLM step, extracted from the `inputs` blob.
///
/// Every named field is optional; with the container-level `#[serde(default)]`
/// a missing key simply yields `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct LlmInputs {
    pub messages: Option<Vec<ChatMessage>>,
    pub prompt_ref: Option<String>,
    pub prompt_params: Option<Value>,
    pub prompt: Option<String>,
    pub model: Option<String>,
    pub temperature: Option<f64>,
    pub tools: Option<Vec<ToolSpec>>,
    pub tool_choice: Option<ToolChoice>,
    pub response_schema: Option<ResponseSchema>,
    pub response_strict: Option<bool>,
    pub context_sources: Option<Vec<ContextSource>>,
    pub params: Option<Value>,
    // Collects every input key not matched by a named field above.
    #[serde(flatten)]
    pub extra: BTreeMap<String, Value>,
}
468
/// Errors surfaced while decoding a [`StepSpec`] from JSON.
#[derive(Debug, thiserror::Error)]
pub enum StepSpecDecodeError {
    // Validation failure; the message embeds the error's `Debug` output (`{0:?}`).
    #[error("step spec validation failed: {0:?}")]
    Validation(#[from] StepSpecError),
    // serde could not map the JSON onto the typed structures.
    #[error("step spec deserialization failed: {0}")]
    Deserialize(#[from] serde_json::Error),
    // Feature-gate rejection; the carried string is the full message.
    #[error("{0}")]
    FeatureGate(String),
}
479
480impl StepSpec {
481    /// Validates a JSON blob, enriches it with derived fields, and returns a [`StepSpec`].
482    pub fn try_from_value(value: Value) -> Result<Self, StepSpecDecodeError> {
483        validate_step_spec(&value)?;
484        let mut spec: StepSpec = serde_json::from_value(value)?;
485        enforce_prompt_v2_gate(&spec)?;
486        if spec.r#type == StepType::Llm {
487            let typed: LlmInputs = serde_json::from_value(spec.inputs.clone())?;
488            spec.llm_inputs = Some(typed);
489        }
490        Ok(spec)
491    }
492}
493
/// Snapshot of a queued step fetched from storage for scheduling.
///
/// Built from `fleetforge_storage::models::Step` by the `TryFrom` impl in this
/// file, which copies the scalar columns as-is, wraps the ids in their
/// newtypes, and decodes the stored spec JSON into `spec`.
#[derive(Debug, Clone)]
pub struct QueuedStep {
    pub run_id: RunId,
    pub step_id: StepId,
    pub idx: i32,
    pub priority: i16,
    pub status: StepStatus,
    pub attempt: i32,
    pub created_at: DateTime<Utc>,
    // The retry settings keep the storage row's signed integer types, which
    // differ from the unsigned types used by `StepExecution`.
    pub max_attempts: i32,
    pub retry_backoff_ms: i64,
    pub retry_backoff_factor: f64,
    pub not_before: Option<DateTime<Utc>>,
    pub deadline_at: Option<DateTime<Utc>>,
    // Opaque JSON payload here, unlike the boolean `StepExecution::checkpoint`.
    pub checkpoint: Option<Value>,
    pub compensation_step: Option<StepId>,
    pub compensation_scheduled: bool,
    // Decoded and validated step specification.
    pub spec: StepSpec,
}
514
515impl TryFrom<fleetforge_storage::models::Step> for QueuedStep {
516    type Error = anyhow::Error;
517
518    fn try_from(value: fleetforge_storage::models::Step) -> Result<Self> {
519        let fleetforge_storage::models::Step {
520            run_id,
521            step_id,
522            idx,
523            priority,
524            spec_json,
525            status,
526            attempt,
527            created_at,
528            max_attempts,
529            retry_backoff_ms,
530            retry_backoff_factor,
531            not_before,
532            deadline_at,
533            leased_by: _,
534            lease_expires_at: _,
535            output_json: _,
536            error_json: _,
537            checkpoint,
538            compensation_step,
539            compensation_scheduled,
540        } = value;
541
542        let spec = StepSpec::try_from_value(spec_json)
543            .map_err(|err| anyhow::anyhow!("invalid step spec stored in DB: {err}"))?;
544
545        Ok(Self {
546            run_id: RunId(run_id),
547            step_id: StepId(step_id),
548            idx,
549            priority,
550            status: StepStatus::from(status),
551            attempt,
552            created_at,
553            max_attempts,
554            retry_backoff_ms,
555            retry_backoff_factor,
556            not_before,
557            deadline_at,
558            checkpoint,
559            compensation_step: compensation_step.map(StepId),
560            compensation_scheduled,
561            spec,
562        })
563    }
564}
565
#[cfg(test)]
mod tests {
    use super::*;
    use crate::validate_step_spec;
    use proptest::prelude::*;
    use serde_json::{json, Value};
    use std::env;
    use std::sync::{Mutex, MutexGuard, PoisonError};
    use uuid::Uuid;

    /// Serializes tests that mutate `PROMPTS_V2_FLAG`. The environment is
    /// process-wide and the test harness runs tests on parallel threads, so
    /// unsynchronized `set_var`/`remove_var` calls race with each other.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Holds `ENV_LOCK` for the duration of a test and clears the flag on drop,
    /// including when the test panics.
    struct FlagGuard {
        _lock: MutexGuard<'static, ()>,
    }

    impl Drop for FlagGuard {
        fn drop(&mut self) {
            // Runs before `_lock` is released, so the flag is cleared while the
            // lock is still held.
            env::remove_var(PROMPTS_V2_FLAG);
        }
    }

    /// Takes the environment lock and sets (`Some`) or clears (`None`) the flag.
    fn with_flag(value: Option<&str>) -> FlagGuard {
        // A poisoned lock only means another flag test panicked; the guarded
        // state is `()`, so it is safe to keep going.
        let lock = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
        match value {
            Some(v) => env::set_var(PROMPTS_V2_FLAG, v),
            None => env::remove_var(PROMPTS_V2_FLAG),
        }
        FlagGuard { _lock: lock }
    }

    #[test]
    fn valid_step_spec_passes() {
        let spec = json!({
            "id": "550e8400-e29b-41d4-a716-446655440000",
            "type": "map",
            "inputs": {
                "workers": 4
            }
        });
        validate_step_spec(&spec).expect("valid step spec should pass");
    }

    #[test]
    fn invalid_step_spec_fails() {
        let spec = json!({
            "id": "not-a-uuid",
            "type": "map",
            "inputs": {}
        });
        let err = validate_step_spec(&spec).expect_err("invalid spec must fail");
        assert!(!err.details.is_empty());
        assert!(
            err.details
                .iter()
                .any(|detail| detail.instance_path.contains("id")),
            "error details should reference the invalid id field"
        );
    }

    #[test]
    fn prompt_v2_fields_require_flag() {
        let _flag = with_flag(None);
        let spec = json!({
            "id": "00000000-0000-4000-8000-000000000001",
            "type": "llm",
            "inputs": {
                "messages": [
                    {"role": "user", "content": "Hello"}
                ]
            }
        });
        let err = StepSpec::try_from_value(spec)
            .expect_err("feature gate should reject prompts v2 fields");
        match err {
            StepSpecDecodeError::FeatureGate(message) => {
                assert!(message.contains(PROMPTS_V2_FLAG));
            }
            other => panic!("expected feature gate error, found {other:?}"),
        }
    }

    #[test]
    fn prompt_v2_fields_parse_with_flag() {
        let _flag = with_flag(Some("1"));
        let spec_value = json!({
            "id": "00000000-0000-4000-8000-000000000002",
            "type": "llm",
            "inputs": {
                "messages": [
                    {"role": "system", "content": "system"},
                    {"role": "user", "content": "collect"}
                ],
                "tools": [
                    {
                        "name": "web_search",
                        "description": "search",
                        "schema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}
                    }
                ],
                "response_schema": {
                    "type": "object",
                    "required": ["insights"],
                    "properties": {
                        "insights": {
                            "type": "array",
                            "items": {"type": "object", "required": ["summary"], "properties": {"summary": {"type": "string"}}}
                        }
                    }
                },
                "tool_choice": "auto",
                "response_strict": true
            }
        });
        let spec = StepSpec::try_from_value(spec_value)
            .expect("llm inputs should parse with feature flag");
        let llm = spec.llm_inputs.expect("llm inputs should be populated");
        let messages = llm.messages.expect("messages present");
        assert_eq!(messages.len(), 2);
        let tools = llm.tools.expect("tools present");
        assert_eq!(tools.len(), 1);
        assert!(matches!(llm.tool_choice, Some(ToolChoice::Auto)));
    }

    // NOTE(review): "http" and "agent" are never generated here although
    // `StepType` defines them; confirm whether that omission is intentional.
    fn step_type_strategy() -> impl Strategy<Value = Value> {
        prop_oneof![
            Just(json!("llm")),
            Just(json!("tool")),
            Just(json!("map")),
            Just(json!("reduce")),
            Just(json!("branch")),
            Just(json!("replay")),
        ]
    }

    fn key_strategy() -> impl Strategy<Value = String> {
        proptest::string::string_regex("[a-zA-Z0-9_]{1,8}").unwrap()
    }

    fn json_object_strategy() -> impl Strategy<Value = Value> {
        prop::collection::hash_map(key_strategy(), any_json_value(), 0..4).prop_map(|map| {
            let map: serde_json::Map<String, Value> = map.into_iter().collect();
            Value::Object(map)
        })
    }

    fn any_json_value() -> impl Strategy<Value = Value> {
        prop_oneof![
            Just(Value::Null),
            any::<bool>().prop_map(Value::Bool),
            any::<i64>().prop_map(|i| Value::Number(i.into())),
            // `Number::from_f64` returns `None` for NaN/infinity, which filters
            // those values out.
            any::<f64>()
                .prop_filter_map("finite", |f| serde_json::Number::from_f64(f))
                .prop_map(Value::Number),
            "[a-zA-Z0-9_]{0,12}".prop_map(Value::String),
            prop::collection::vec(any::<i64>().prop_map(|i| Value::Number(i.into())), 0..4)
                .prop_map(Value::Array),
        ]
    }

    fn valid_spec_strategy() -> impl Strategy<Value = Value> {
        (
            any::<Uuid>(),
            step_type_strategy(),
            json_object_strategy(),
            json_object_strategy(),
        )
            .prop_map(|(id, ty, inputs, policy)| {
                let mut obj = serde_json::Map::new();
                obj.insert("id".into(), Value::String(id.to_string()));
                obj.insert("type".into(), ty);
                obj.insert("inputs".into(), inputs);
                obj.insert("policy".into(), policy);
                Value::Object(obj)
            })
    }

    /// Valid specs broken in one of three ways: missing `id`, unknown `type`,
    /// or non-object `inputs`.
    fn invalid_spec_strategy() -> impl Strategy<Value = Value> {
        prop_oneof![
            valid_spec_strategy().prop_map(|mut v| {
                if let Value::Object(ref mut obj) = v {
                    obj.remove("id");
                }
                v
            }),
            valid_spec_strategy().prop_map(|mut v| {
                if let Value::Object(ref mut obj) = v {
                    obj.insert("type".to_string(), Value::String("unknown".to_string()));
                }
                v
            }),
            valid_spec_strategy().prop_map(|mut v| {
                if let Value::Object(ref mut obj) = v {
                    obj.insert("inputs".to_string(), Value::Bool(true));
                }
                v
            }),
        ]
    }

    proptest! {
        #[test]
        fn prop_valid_specs_round_trip(spec in valid_spec_strategy()) {
            validate_step_spec(&spec).unwrap();
            let decoded = StepSpec::try_from_value(spec.clone()).expect("decode StepSpec");
            let reencoded = serde_json::to_value(decoded).expect("encode StepSpec");
            validate_step_spec(&reencoded).unwrap();
        }

        #[test]
        fn prop_invalid_specs_rejected(spec in invalid_spec_strategy()) {
            prop_assert!(validate_step_spec(&spec).is_err());
            prop_assert!(StepSpec::try_from_value(spec).is_err());
        }
    }

    #[test]
    fn missing_type_field_is_rejected() {
        let spec = json!({
            "id": "550e8400-e29b-41d4-a716-446655440000",
            "inputs": {}
        });
        let err = validate_step_spec(&spec).expect_err("missing type should fail validation");
        assert!(
            err.details
                .iter()
                .any(|detail| detail.instance_path.contains("/type")),
            "expected error referencing type field, got {err:?}"
        );
    }

    #[test]
    fn non_object_policy_is_rejected() {
        let spec = json!({
            "id": "550e8400-e29b-41d4-a716-446655440001",
            "type": "tool",
            "inputs": {},
            "policy": "invalid"
        });
        let err = validate_step_spec(&spec).expect_err("non-object policy should fail validation");
        assert!(
            err.details
                .iter()
                .any(|detail| detail.instance_path.contains("/policy")),
            "expected error referencing policy field, got {err:?}"
        );
    }
}
793
/// Name of the environment variable that enables prompt v2 input fields on
/// LLM steps; read by `prompts_v2_enabled`.
const PROMPTS_V2_FLAG: &str = "FLEETFORGE_PROMPTS_V2";
795
796fn enforce_prompt_v2_gate(spec: &StepSpec) -> Result<(), StepSpecDecodeError> {
797    if prompts_v2_enabled() || spec.r#type != StepType::Llm {
798        return Ok(());
799    }
800    if !has_prompt_v2_fields(&spec.inputs) {
801        return Ok(());
802    }
803    Err(StepSpecDecodeError::FeatureGate(format!(
804        "prompt v2 fields require setting {PROMPTS_V2_FLAG}=1 (or true/yes/on) for step {}",
805        spec.id
806    )))
807}
808
809fn prompts_v2_enabled() -> bool {
810    std::env::var(PROMPTS_V2_FLAG)
811        .ok()
812        .map(|value| match value.trim().to_ascii_lowercase().as_str() {
813            "1" | "true" | "yes" | "on" => true,
814            _ => false,
815        })
816        .unwrap_or(false)
817}
818
819fn has_prompt_v2_fields(inputs: &Value) -> bool {
820    let Some(map) = inputs.as_object() else {
821        return false;
822    };
823    const PROMPT_FIELDS: &[&str] = &[
824        "messages",
825        "prompt_ref",
826        "prompt_params",
827        "tools",
828        "tool_choice",
829        "response_schema",
830        "response_strict",
831        "context_sources",
832    ];
833    map.keys().any(|key| PROMPT_FIELDS.contains(&key.as_str()))
834}