1use std::collections::BTreeMap;
2use std::fmt;
3
4use anyhow::{Context, Result};
5use chrono::{DateTime, Utc};
6use serde::de::Error as DeError;
7use serde::ser::Serializer;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use uuid::Uuid;
11
12use crate::{validate_step_spec, StepSpecError};
13use fleetforge_trust::{Trust, TrustOrigin};
14
15#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
17#[serde(transparent)]
18pub struct RunId(pub Uuid);
19
20impl From<Uuid> for RunId {
21 fn from(value: Uuid) -> Self {
22 Self(value)
23 }
24}
25
26impl From<RunId> for Uuid {
27 fn from(value: RunId) -> Self {
28 value.0
29 }
30}
31
32impl fmt::Display for RunId {
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 write!(f, "{}", self.0)
35 }
36}
37
38#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
40#[serde(transparent)]
41pub struct StepId(pub Uuid);
42
43impl From<Uuid> for StepId {
44 fn from(value: Uuid) -> Self {
45 Self(value)
46 }
47}
48
49impl From<StepId> for Uuid {
50 fn from(value: StepId) -> Self {
51 value.0
52 }
53}
54
55impl fmt::Display for StepId {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 write!(f, "{}", self.0)
58 }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
63#[serde(rename_all = "snake_case")]
64pub enum RunStatus {
65 Pending,
66 Running,
67 Succeeded,
68 Failed,
69 Canceled,
70 PausedAtStep,
71}
72
73impl RunStatus {
74 pub fn as_str(self) -> &'static str {
75 match self {
76 RunStatus::Pending => "pending",
77 RunStatus::Running => "running",
78 RunStatus::Succeeded => "succeeded",
79 RunStatus::Failed => "failed",
80 RunStatus::Canceled => "canceled",
81 RunStatus::PausedAtStep => "paused_at_step",
82 }
83 }
84}
85
86impl From<fleetforge_storage::models::RunStatus> for RunStatus {
87 fn from(value: fleetforge_storage::models::RunStatus) -> Self {
88 match value {
89 fleetforge_storage::models::RunStatus::Pending => RunStatus::Pending,
90 fleetforge_storage::models::RunStatus::Running => RunStatus::Running,
91 fleetforge_storage::models::RunStatus::Succeeded => RunStatus::Succeeded,
92 fleetforge_storage::models::RunStatus::Failed => RunStatus::Failed,
93 fleetforge_storage::models::RunStatus::Canceled => RunStatus::Canceled,
94 fleetforge_storage::models::RunStatus::PausedAtStep => RunStatus::PausedAtStep,
95 }
96 }
97}
98
99impl From<RunStatus> for fleetforge_storage::models::RunStatus {
100 fn from(value: RunStatus) -> Self {
101 match value {
102 RunStatus::Pending => fleetforge_storage::models::RunStatus::Pending,
103 RunStatus::Running => fleetforge_storage::models::RunStatus::Running,
104 RunStatus::Succeeded => fleetforge_storage::models::RunStatus::Succeeded,
105 RunStatus::Failed => fleetforge_storage::models::RunStatus::Failed,
106 RunStatus::Canceled => fleetforge_storage::models::RunStatus::Canceled,
107 RunStatus::PausedAtStep => fleetforge_storage::models::RunStatus::PausedAtStep,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
114#[serde(rename_all = "snake_case")]
115pub enum StepStatus {
116 Queued,
117 Blocked,
118 Leased,
119 Running,
120 Succeeded,
121 Failed,
122 Skipped,
123}
124
125impl StepStatus {
126 pub fn as_str(self) -> &'static str {
127 match self {
128 StepStatus::Queued => "queued",
129 StepStatus::Blocked => "blocked",
130 StepStatus::Leased => "leased",
131 StepStatus::Running => "running",
132 StepStatus::Succeeded => "succeeded",
133 StepStatus::Failed => "failed",
134 StepStatus::Skipped => "skipped",
135 }
136 }
137}
138
139impl From<fleetforge_storage::models::StepStatus> for StepStatus {
140 fn from(value: fleetforge_storage::models::StepStatus) -> Self {
141 match value {
142 fleetforge_storage::models::StepStatus::Queued => StepStatus::Queued,
143 fleetforge_storage::models::StepStatus::Blocked => StepStatus::Blocked,
144 fleetforge_storage::models::StepStatus::Leased => StepStatus::Leased,
145 fleetforge_storage::models::StepStatus::Running => StepStatus::Running,
146 fleetforge_storage::models::StepStatus::Succeeded => StepStatus::Succeeded,
147 fleetforge_storage::models::StepStatus::Failed => StepStatus::Failed,
148 fleetforge_storage::models::StepStatus::Skipped => StepStatus::Skipped,
149 }
150 }
151}
152
153impl From<StepStatus> for fleetforge_storage::models::StepStatus {
154 fn from(value: StepStatus) -> Self {
155 match value {
156 StepStatus::Queued => fleetforge_storage::models::StepStatus::Queued,
157 StepStatus::Blocked => fleetforge_storage::models::StepStatus::Blocked,
158 StepStatus::Leased => fleetforge_storage::models::StepStatus::Leased,
159 StepStatus::Running => fleetforge_storage::models::StepStatus::Running,
160 StepStatus::Succeeded => fleetforge_storage::models::StepStatus::Succeeded,
161 StepStatus::Failed => fleetforge_storage::models::StepStatus::Failed,
162 StepStatus::Skipped => fleetforge_storage::models::StepStatus::Skipped,
163 }
164 }
165}
166
167#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
169#[serde(rename_all = "snake_case")]
170pub enum StepType {
171 Llm,
172 Tool,
173 Http,
174 Map,
175 Reduce,
176 Branch,
177 Replay,
178 Agent,
179}
180
181impl StepType {
182 pub fn as_str(self) -> &'static str {
183 match self {
184 StepType::Llm => "llm",
185 StepType::Tool => "tool",
186 StepType::Http => "http",
187 StepType::Map => "map",
188 StepType::Reduce => "reduce",
189 StepType::Branch => "branch",
190 StepType::Replay => "replay",
191 StepType::Agent => "agent",
192 }
193 }
194}
195
196impl std::str::FromStr for StepType {
197 type Err = anyhow::Error;
198
199 fn from_str(s: &str) -> Result<Self, Self::Err> {
200 match s {
201 "llm" => Ok(StepType::Llm),
202 "tool" => Ok(StepType::Tool),
203 "http" => Ok(StepType::Http),
204 "map" => Ok(StepType::Map),
205 "reduce" => Ok(StepType::Reduce),
206 "branch" => Ok(StepType::Branch),
207 "replay" => Ok(StepType::Replay),
208 "agent" => Ok(StepType::Agent),
209 other => Err(anyhow::anyhow!("unknown step type '{}'", other)),
210 }
211 }
212}
213
214impl fmt::Display for StepType {
215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216 write!(f, "{}", self.as_str())
217 }
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct StepSpec {
223 pub id: StepId,
224 #[serde(rename = "type")]
225 pub r#type: StepType,
226 pub inputs: Value,
227 #[serde(default = "default_policy_blob")]
228 pub policy: Value,
229 #[serde(default)]
230 pub execution: StepExecution,
231 #[serde(default, skip_serializing_if = "Option::is_none")]
232 pub slug: Option<String>,
233 #[serde(default, skip_serializing_if = "Option::is_none")]
234 pub trust: Option<Trust>,
235 #[serde(default, skip_serializing_if = "Option::is_none")]
236 pub trust_origin: Option<TrustOrigin>,
237 #[serde(skip)]
238 pub llm_inputs: Option<LlmInputs>,
239}
240
241fn default_policy_blob() -> Value {
242 Value::Object(Default::default())
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
247#[serde(default)]
248pub struct StepExecution {
249 pub max_attempts: u32,
250 pub retry_backoff_ms: u64,
251 pub retry_backoff_factor: f64,
252 pub deadline_ms: Option<u64>,
253 pub checkpoint: bool,
254 pub compensation: Option<CompensationSpec>,
255 pub cost_hint: Option<CostHint>,
256}
257
258impl Default for StepExecution {
259 fn default() -> Self {
260 Self {
261 max_attempts: 1,
262 retry_backoff_ms: 0,
263 retry_backoff_factor: 1.0,
264 deadline_ms: None,
265 checkpoint: false,
266 compensation: None,
267 cost_hint: None,
268 }
269 }
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
274pub struct CompensationSpec {
275 pub step: String,
276 #[serde(default, skip_serializing_if = "Option::is_none")]
277 pub reason: Option<String>,
278}
279
280#[derive(Debug, Clone, Serialize, Deserialize, Default)]
282pub struct CostHint {
283 #[serde(default, skip_serializing_if = "Option::is_none")]
284 pub tokens: Option<i64>,
285 #[serde(default, skip_serializing_if = "Option::is_none")]
286 pub usd: Option<f64>,
287}
288
289#[derive(Debug, Clone, Serialize, Deserialize)]
291#[serde(rename_all = "snake_case")]
292pub enum ChatRole {
293 System,
294 User,
295 Assistant,
296 Tool,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct ChatMessage {
302 pub role: ChatRole,
303 pub content: Value,
304 #[serde(default)]
305 pub name: Option<String>,
306 #[serde(default)]
307 pub tool_call_id: Option<String>,
308 #[serde(default)]
309 pub metadata: Option<Value>,
310 #[serde(default, skip_serializing_if = "Option::is_none")]
311 pub trust: Option<Trust>,
312 #[serde(default, skip_serializing_if = "Option::is_none")]
313 pub trust_origin: Option<TrustOrigin>,
314}
315
316#[derive(Debug, Clone, Serialize, Deserialize)]
318#[serde(default)]
319pub struct ToolSpec {
320 pub name: String,
321 pub description: String,
322 pub schema: Value,
323 #[serde(flatten)]
324 pub extra: BTreeMap<String, Value>,
325 #[serde(default, skip_serializing_if = "Option::is_none")]
326 pub trust: Option<Trust>,
327 #[serde(default, skip_serializing_if = "Option::is_none")]
328 pub trust_origin: Option<TrustOrigin>,
329}
330
331impl Default for ToolSpec {
332 fn default() -> Self {
333 Self {
334 name: String::new(),
335 description: String::new(),
336 schema: Value::Null,
337 extra: BTreeMap::new(),
338 trust: None,
339 trust_origin: None,
340 }
341 }
342}
343
344#[derive(Debug, Clone, Serialize, Deserialize)]
346pub struct ContextSource {
347 pub kind: ContextSourceKind,
348 #[serde(default)]
349 pub namespace: Option<String>,
350 #[serde(default)]
351 pub key: Option<String>,
352 #[serde(default)]
353 pub top_k: Option<u32>,
354 #[serde(default)]
355 pub query: Option<String>,
356 #[serde(flatten)]
357 pub extra: BTreeMap<String, Value>,
358}
359
360impl Default for ContextSource {
361 fn default() -> Self {
362 Self {
363 kind: ContextSourceKind::Input,
364 namespace: None,
365 key: None,
366 top_k: None,
367 query: None,
368 extra: BTreeMap::new(),
369 }
370 }
371}
372
373#[derive(Debug, Clone, Serialize, Deserialize)]
375pub enum ContextSourceKind {
376 #[serde(rename = "memory:kv")]
377 MemoryKv,
378 #[serde(rename = "memory:vector")]
379 MemoryVector,
380 #[serde(rename = "artifact")]
381 Artifact,
382 #[serde(rename = "input")]
383 Input,
384}
385
386#[derive(Debug, Clone, Serialize, Deserialize)]
388#[serde(untagged)]
389pub enum ResponseSchema {
390 Inline(Value),
391 Reference(String),
392}
393
/// Tool selection mode of an LLM step.
///
/// `Default` is derived; the default is [`ToolChoice::Auto`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ToolChoice {
    /// Written on the wire as `"auto"`.
    #[default]
    Auto,
    /// Written on the wire as `"none"`.
    None,
    /// Written on the wire as `"required:<name>"`; carries the tool name.
    Required(String),
}
407
408impl serde::Serialize for ToolChoice {
409 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
410 where
411 S: Serializer,
412 {
413 match self {
414 ToolChoice::Auto => serializer.serialize_str("auto"),
415 ToolChoice::None => serializer.serialize_str("none"),
416 ToolChoice::Required(name) => serializer.serialize_str(&format!("required:{name}")),
417 }
418 }
419}
420
421impl<'de> serde::Deserialize<'de> for ToolChoice {
422 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
423 where
424 D: serde::Deserializer<'de>,
425 {
426 let value = String::deserialize(deserializer)?;
427 let trimmed = value.trim();
428 match trimmed.to_ascii_lowercase().as_str() {
429 "auto" => Ok(ToolChoice::Auto),
430 "none" => Ok(ToolChoice::None),
431 other => {
432 if let Some(name) = other.strip_prefix("required:") {
433 if name.is_empty() {
434 return Err(DeError::custom(
435 "tool_choice 'required:' must include tool name",
436 ));
437 }
438 Ok(ToolChoice::Required(name.to_string()))
439 } else {
440 Err(DeError::custom(format!(
441 "unsupported tool_choice '{value}'"
442 )))
443 }
444 }
445 }
446 }
447}
448
449#[derive(Debug, Clone, Serialize, Deserialize, Default)]
451#[serde(default)]
452pub struct LlmInputs {
453 pub messages: Option<Vec<ChatMessage>>,
454 pub prompt_ref: Option<String>,
455 pub prompt_params: Option<Value>,
456 pub prompt: Option<String>,
457 pub model: Option<String>,
458 pub temperature: Option<f64>,
459 pub tools: Option<Vec<ToolSpec>>,
460 pub tool_choice: Option<ToolChoice>,
461 pub response_schema: Option<ResponseSchema>,
462 pub response_strict: Option<bool>,
463 pub context_sources: Option<Vec<ContextSource>>,
464 pub params: Option<Value>,
465 #[serde(flatten)]
466 pub extra: BTreeMap<String, Value>,
467}
468
469#[derive(Debug, thiserror::Error)]
471pub enum StepSpecDecodeError {
472 #[error("step spec validation failed: {0:?}")]
473 Validation(#[from] StepSpecError),
474 #[error("step spec deserialization failed: {0}")]
475 Deserialize(#[from] serde_json::Error),
476 #[error("{0}")]
477 FeatureGate(String),
478}
479
480impl StepSpec {
481 pub fn try_from_value(value: Value) -> Result<Self, StepSpecDecodeError> {
483 validate_step_spec(&value)?;
484 let mut spec: StepSpec = serde_json::from_value(value)?;
485 enforce_prompt_v2_gate(&spec)?;
486 if spec.r#type == StepType::Llm {
487 let typed: LlmInputs = serde_json::from_value(spec.inputs.clone())?;
488 spec.llm_inputs = Some(typed);
489 }
490 Ok(spec)
491 }
492}
493
494#[derive(Debug, Clone)]
496pub struct QueuedStep {
497 pub run_id: RunId,
498 pub step_id: StepId,
499 pub idx: i32,
500 pub priority: i16,
501 pub status: StepStatus,
502 pub attempt: i32,
503 pub created_at: DateTime<Utc>,
504 pub max_attempts: i32,
505 pub retry_backoff_ms: i64,
506 pub retry_backoff_factor: f64,
507 pub not_before: Option<DateTime<Utc>>,
508 pub deadline_at: Option<DateTime<Utc>>,
509 pub checkpoint: Option<Value>,
510 pub compensation_step: Option<StepId>,
511 pub compensation_scheduled: bool,
512 pub spec: StepSpec,
513}
514
515impl TryFrom<fleetforge_storage::models::Step> for QueuedStep {
516 type Error = anyhow::Error;
517
518 fn try_from(value: fleetforge_storage::models::Step) -> Result<Self> {
519 let fleetforge_storage::models::Step {
520 run_id,
521 step_id,
522 idx,
523 priority,
524 spec_json,
525 status,
526 attempt,
527 created_at,
528 max_attempts,
529 retry_backoff_ms,
530 retry_backoff_factor,
531 not_before,
532 deadline_at,
533 leased_by: _,
534 lease_expires_at: _,
535 output_json: _,
536 error_json: _,
537 checkpoint,
538 compensation_step,
539 compensation_scheduled,
540 } = value;
541
542 let spec = StepSpec::try_from_value(spec_json)
543 .map_err(|err| anyhow::anyhow!("invalid step spec stored in DB: {err}"))?;
544
545 Ok(Self {
546 run_id: RunId(run_id),
547 step_id: StepId(step_id),
548 idx,
549 priority,
550 status: StepStatus::from(status),
551 attempt,
552 created_at,
553 max_attempts,
554 retry_backoff_ms,
555 retry_backoff_factor,
556 not_before,
557 deadline_at,
558 checkpoint,
559 compensation_step: compensation_step.map(StepId),
560 compensation_scheduled,
561 spec,
562 })
563 }
564}
565
#[cfg(test)]
mod tests {
    use super::*;
    use crate::validate_step_spec;
    use proptest::prelude::*;
    use serde_json::{json, Value};
    use std::env;
    use std::sync::{Mutex, MutexGuard};
    use uuid::Uuid;

    /// Guards the process-wide `PROMPTS_V2_FLAG` environment variable.
    ///
    /// The test harness runs tests on several threads, and two tests below
    /// set/remove the same variable; without this lock they race.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the environment lock, recovering it if another test panicked
    /// while holding it (the guarded data is `()`, so poisoning is harmless).
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn valid_step_spec_passes() {
        let spec = json!({
            "id": "550e8400-e29b-41d4-a716-446655440000",
            "type": "map",
            "inputs": {
                "workers": 4
            }
        });
        validate_step_spec(&spec).expect("valid step spec should pass");
    }

    #[test]
    fn invalid_step_spec_fails() {
        let spec = json!({
            "id": "not-a-uuid",
            "type": "map",
            "inputs": {}
        });
        let err = validate_step_spec(&spec).expect_err("invalid spec must fail");
        assert!(!err.details.is_empty());
        assert!(
            err.details
                .iter()
                .any(|detail| detail.instance_path.contains("id")),
            "error details should reference the invalid id field"
        );
    }

    #[test]
    fn prompt_v2_fields_require_flag() {
        let _env = lock_env();
        env::remove_var(PROMPTS_V2_FLAG);
        let spec = json!({
            "id": "00000000-0000-4000-8000-000000000001",
            "type": "llm",
            "inputs": {
                "messages": [
                    {"role": "user", "content": "Hello"}
                ]
            }
        });
        let err = StepSpec::try_from_value(spec)
            .expect_err("feature gate should reject prompts v2 fields");
        match err {
            StepSpecDecodeError::FeatureGate(message) => {
                assert!(message.contains(PROMPTS_V2_FLAG));
            }
            other => panic!("expected feature gate error, found {other:?}"),
        }
    }

    #[test]
    fn prompt_v2_fields_parse_with_flag() {
        let _env = lock_env();
        env::set_var(PROMPTS_V2_FLAG, "1");
        let spec_value = json!({
            "id": "00000000-0000-4000-8000-000000000002",
            "type": "llm",
            "inputs": {
                "messages": [
                    {"role": "system", "content": "system"},
                    {"role": "user", "content": "collect"}
                ],
                "tools": [
                    {
                        "name": "web_search",
                        "description": "search",
                        "schema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}
                    }
                ],
                "response_schema": {
                    "type": "object",
                    "required": ["insights"],
                    "properties": {
                        "insights": {
                            "type": "array",
                            "items": {"type": "object", "required": ["summary"], "properties": {"summary": {"type": "string"}}}
                        }
                    }
                },
                "tool_choice": "auto",
                "response_strict": true
            }
        });
        let spec = StepSpec::try_from_value(spec_value)
            .expect("llm inputs should parse with feature flag");
        let llm = spec.llm_inputs.expect("llm inputs should be populated");
        let messages = llm.messages.expect("messages present");
        assert_eq!(messages.len(), 2);
        let tools = llm.tools.expect("tools present");
        assert_eq!(tools.len(), 1);
        assert!(matches!(llm.tool_choice, Some(ToolChoice::Auto)));
        env::remove_var(PROMPTS_V2_FLAG);
    }

    /// Step types exercised by the property tests.
    fn step_type_strategy() -> impl Strategy<Value = Value> {
        prop_oneof![
            Just(json!("llm")),
            Just(json!("tool")),
            Just(json!("map")),
            Just(json!("reduce")),
            Just(json!("branch")),
            Just(json!("replay")),
        ]
    }

    /// Short alphanumeric/underscore object keys.
    fn key_strategy() -> impl Strategy<Value = String> {
        proptest::string::string_regex("[a-zA-Z0-9_]{1,8}").unwrap()
    }

    /// Small JSON objects with arbitrary scalar or flat-array values.
    fn json_object_strategy() -> impl Strategy<Value = Value> {
        prop::collection::hash_map(key_strategy(), any_json_value(), 0..4).prop_map(|map| {
            let map: serde_json::Map<String, Value> = map.into_iter().collect();
            Value::Object(map)
        })
    }

    /// Arbitrary JSON scalars plus flat arrays of integers.
    fn any_json_value() -> impl Strategy<Value = Value> {
        prop_oneof![
            Just(Value::Null),
            any::<bool>().prop_map(Value::Bool),
            any::<i64>().prop_map(|i| Value::Number(i.into())),
            any::<f64>()
                .prop_filter_map("finite", |f| serde_json::Number::from_f64(f))
                .prop_map(Value::Number),
            "[a-zA-Z0-9_]{0,12}".prop_map(Value::String),
            prop::collection::vec(any::<i64>().prop_map(|i| Value::Number(i.into())), 0..4)
                .prop_map(Value::Array),
        ]
    }

    /// Specs with a UUID id, a known step type and object-valued inputs and
    /// policy.
    fn valid_spec_strategy() -> impl Strategy<Value = Value> {
        (
            // Built from raw bits so no proptest `Arbitrary` impl for `Uuid`
            // is needed.
            any::<u128>().prop_map(Uuid::from_u128),
            step_type_strategy(),
            json_object_strategy(),
            json_object_strategy(),
        )
            .prop_map(|(id, ty, inputs, policy)| {
                let mut obj = serde_json::Map::new();
                obj.insert("id".into(), Value::String(id.to_string()));
                obj.insert("type".into(), ty);
                obj.insert("inputs".into(), inputs);
                obj.insert("policy".into(), policy);
                Value::Object(obj)
            })
    }

    /// Valid specs broken in one of three ways: missing id, unknown type, or
    /// non-object inputs.
    fn invalid_spec_strategy() -> impl Strategy<Value = Value> {
        prop_oneof![
            valid_spec_strategy().prop_map(|mut v| {
                if let Value::Object(ref mut obj) = v {
                    obj.remove("id");
                }
                v
            }),
            valid_spec_strategy().prop_map(|mut v| {
                if let Value::Object(ref mut obj) = v {
                    obj.insert("type".to_string(), Value::String("unknown".to_string()));
                }
                v
            }),
            valid_spec_strategy().prop_map(|mut v| {
                if let Value::Object(ref mut obj) = v {
                    obj.insert("inputs".to_string(), Value::Bool(true));
                }
                v
            }),
        ]
    }

    proptest! {
        #[test]
        fn prop_valid_specs_round_trip(spec in valid_spec_strategy()) {
            validate_step_spec(&spec).unwrap();
            let decoded = StepSpec::try_from_value(spec.clone()).expect("decode StepSpec");
            let reencoded = serde_json::to_value(decoded).expect("encode StepSpec");
            validate_step_spec(&reencoded).unwrap();
        }

        #[test]
        fn prop_invalid_specs_rejected(spec in invalid_spec_strategy()) {
            prop_assert!(validate_step_spec(&spec).is_err());
            prop_assert!(StepSpec::try_from_value(spec).is_err());
        }
    }

    #[test]
    fn missing_type_field_is_rejected() {
        let spec = json!({
            "id": "550e8400-e29b-41d4-a716-446655440000",
            "inputs": {}
        });
        let err = validate_step_spec(&spec).expect_err("missing type should fail validation");
        assert!(
            err.details
                .iter()
                .any(|detail| detail.instance_path.contains("/type")),
            "expected error referencing type field, got {err:?}"
        );
    }

    #[test]
    fn non_object_policy_is_rejected() {
        let spec = json!({
            "id": "550e8400-e29b-41d4-a716-446655440001",
            "type": "tool",
            "inputs": {},
            "policy": "invalid"
        });
        let err = validate_step_spec(&spec).expect_err("non-object policy should fail validation");
        assert!(
            err.details
                .iter()
                .any(|detail| detail.instance_path.contains("/policy")),
            "expected error referencing policy field, got {err:?}"
        );
    }
}
793
794const PROMPTS_V2_FLAG: &str = "FLEETFORGE_PROMPTS_V2";
795
796fn enforce_prompt_v2_gate(spec: &StepSpec) -> Result<(), StepSpecDecodeError> {
797 if prompts_v2_enabled() || spec.r#type != StepType::Llm {
798 return Ok(());
799 }
800 if !has_prompt_v2_fields(&spec.inputs) {
801 return Ok(());
802 }
803 Err(StepSpecDecodeError::FeatureGate(format!(
804 "prompt v2 fields require setting {PROMPTS_V2_FLAG}=1 (or true/yes/on) for step {}",
805 spec.id
806 )))
807}
808
809fn prompts_v2_enabled() -> bool {
810 std::env::var(PROMPTS_V2_FLAG)
811 .ok()
812 .map(|value| match value.trim().to_ascii_lowercase().as_str() {
813 "1" | "true" | "yes" | "on" => true,
814 _ => false,
815 })
816 .unwrap_or(false)
817}
818
819fn has_prompt_v2_fields(inputs: &Value) -> bool {
820 let Some(map) = inputs.as_object() else {
821 return false;
822 };
823 const PROMPT_FIELDS: &[&str] = &[
824 "messages",
825 "prompt_ref",
826 "prompt_params",
827 "tools",
828 "tool_choice",
829 "response_schema",
830 "response_strict",
831 "context_sources",
832 ];
833 map.keys().any(|key| PROMPT_FIELDS.contains(&key.as_str()))
834}