1use std::any::type_name;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::marker::PhantomData;
5use std::rc::Rc;
6
7#[cfg(feature = "build")]
8use compiled::CompiledFlow;
9#[cfg(feature = "build")]
10use deploy::{DeployFlow, DeployResult};
11use stageleft::*;
12
13#[cfg(feature = "build")]
14use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy};
15use crate::ir::HydroLeaf;
16use crate::location::{Cluster, ExternalProcess, Process};
17use crate::staging_util::Invariant;
18
19#[cfg(feature = "build")]
20pub mod built;
21#[cfg(feature = "build")]
22pub mod compiled;
23#[cfg(feature = "build")]
24pub mod deploy;
25
26pub struct FlowStateInner {
27 pub(crate) leaves: Option<Vec<HydroLeaf>>,
31
32 pub(crate) next_external_out: usize,
34
35 pub(crate) cycle_counts: HashMap<usize, usize>,
37
38 pub(crate) next_clock_id: usize,
40
41 pub(crate) next_node_id: usize,
43}
44
45pub type FlowState = Rc<RefCell<FlowStateInner>>;
46
47pub const FLOW_USED_MESSAGE: &str = "Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.";
48
49pub struct FlowBuilder<'a> {
50 flow_state: FlowState,
51 processes: RefCell<Vec<(usize, String)>>,
52 clusters: RefCell<Vec<(usize, String)>>,
53 externals: RefCell<Vec<(usize, String)>>,
54
55 next_location_id: RefCell<usize>,
56
57 finalized: bool,
60
61 _phantom: Invariant<'a>,
66}
67
68impl Drop for FlowBuilder<'_> {
69 fn drop(&mut self) {
70 if !self.finalized {
71 panic!(
72 "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
73 );
74 }
75 }
76}
77
78impl QuotedContext for FlowBuilder<'_> {
79 fn create() -> Self {
80 FlowBuilder::new()
81 }
82}
83
84impl<'a> FlowBuilder<'a> {
85 #[expect(
86 clippy::new_without_default,
87 reason = "call `new` explicitly, not `default`"
88 )]
89 pub fn new() -> FlowBuilder<'a> {
90 FlowBuilder {
91 flow_state: Rc::new(RefCell::new(FlowStateInner {
92 leaves: Some(vec![]),
93 next_external_out: 0,
94 cycle_counts: HashMap::new(),
95 next_clock_id: 0,
96 next_node_id: 0,
97 })),
98 processes: RefCell::new(vec![]),
99 clusters: RefCell::new(vec![]),
100 externals: RefCell::new(vec![]),
101 next_location_id: RefCell::new(0),
102 finalized: false,
103 _phantom: PhantomData,
104 }
105 }
106
107 #[cfg(feature = "build")]
108 pub fn finalize(mut self) -> built::BuiltFlow<'a> {
109 self.finalized = true;
110
111 built::BuiltFlow {
112 ir: self.flow_state.borrow_mut().leaves.take().unwrap(),
113 process_id_name: self.processes.replace(vec![]),
114 cluster_id_name: self.clusters.replace(vec![]),
115 external_id_name: self.externals.replace(vec![]),
116 used: false,
117 _phantom: PhantomData,
118 }
119 }
120
121 #[cfg(feature = "build")]
122 pub fn with_default_optimize<D: LocalDeploy<'a>>(self) -> DeployFlow<'a, D> {
123 self.finalize().with_default_optimize()
124 }
125
126 #[cfg(feature = "build")]
127 pub fn optimize_with(self, f: impl FnOnce(&mut [HydroLeaf])) -> built::BuiltFlow<'a> {
128 self.finalize().optimize_with(f)
129 }
130
131 pub fn flow_state(&self) -> &FlowState {
132 &self.flow_state
133 }
134
135 pub fn process<P>(&self) -> Process<'a, P> {
136 let mut next_location_id = self.next_location_id.borrow_mut();
137 let id = *next_location_id;
138 *next_location_id += 1;
139
140 self.processes
141 .borrow_mut()
142 .push((id, type_name::<P>().to_string()));
143
144 Process {
145 id,
146 flow_state: self.flow_state().clone(),
147 _phantom: PhantomData,
148 }
149 }
150
151 pub fn external_process<P>(&self) -> ExternalProcess<'a, P> {
152 let mut next_location_id = self.next_location_id.borrow_mut();
153 let id = *next_location_id;
154 *next_location_id += 1;
155
156 self.externals
157 .borrow_mut()
158 .push((id, type_name::<P>().to_string()));
159
160 ExternalProcess {
161 id,
162 flow_state: self.flow_state().clone(),
163 _phantom: PhantomData,
164 }
165 }
166
167 pub fn cluster<C>(&self) -> Cluster<'a, C> {
168 let mut next_location_id = self.next_location_id.borrow_mut();
169 let id = *next_location_id;
170 *next_location_id += 1;
171
172 self.clusters
173 .borrow_mut()
174 .push((id, type_name::<C>().to_string()));
175
176 Cluster {
177 id,
178 flow_state: self.flow_state().clone(),
179 _phantom: PhantomData,
180 }
181 }
182
183 #[cfg(feature = "build")]
184 pub fn with_process<P, D: LocalDeploy<'a>>(
185 self,
186 process: &Process<P>,
187 spec: impl IntoProcessSpec<'a, D>,
188 ) -> DeployFlow<'a, D> {
189 self.with_default_optimize().with_process(process, spec)
190 }
191
192 #[cfg(feature = "build")]
193 pub fn with_remaining_processes<D: LocalDeploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
194 self,
195 spec: impl Fn() -> S,
196 ) -> DeployFlow<'a, D> {
197 self.with_default_optimize().with_remaining_processes(spec)
198 }
199
200 #[cfg(feature = "build")]
201 pub fn with_external<P, D: LocalDeploy<'a>>(
202 self,
203 process: &ExternalProcess<P>,
204 spec: impl ExternalSpec<'a, D>,
205 ) -> DeployFlow<'a, D> {
206 self.with_default_optimize().with_external(process, spec)
207 }
208
209 #[cfg(feature = "build")]
210 pub fn with_remaining_externals<D: LocalDeploy<'a>, S: ExternalSpec<'a, D> + 'a>(
211 self,
212 spec: impl Fn() -> S,
213 ) -> DeployFlow<'a, D> {
214 self.with_default_optimize().with_remaining_externals(spec)
215 }
216
217 #[cfg(feature = "build")]
218 pub fn with_cluster<C, D: LocalDeploy<'a>>(
219 self,
220 cluster: &Cluster<C>,
221 spec: impl ClusterSpec<'a, D>,
222 ) -> DeployFlow<'a, D> {
223 self.with_default_optimize().with_cluster(cluster, spec)
224 }
225
226 #[cfg(feature = "build")]
227 pub fn with_remaining_clusters<D: LocalDeploy<'a>, S: ClusterSpec<'a, D> + 'a>(
228 self,
229 spec: impl Fn() -> S,
230 ) -> DeployFlow<'a, D> {
231 self.with_default_optimize().with_remaining_clusters(spec)
232 }
233
234 #[cfg(feature = "build")]
235 pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
236 self.with_default_optimize::<D>().compile(env)
237 }
238
239 #[cfg(feature = "build")]
240 pub fn compile_no_network<D: LocalDeploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
241 self.with_default_optimize::<D>().compile_no_network()
242 }
243
244 #[cfg(feature = "build")]
245 pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
246 self,
247 env: &mut D::InstantiateEnv,
248 ) -> DeployResult<'a, D> {
249 self.with_default_optimize().deploy(env)
250 }
251}