1use std::future::Future;
2use std::io::Error;
3use std::pin::Pin;
4
5use dfir_lang::graph::DfirGraph;
6use dfir_rs::bytes::Bytes;
7use dfir_rs::futures::{Sink, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use stageleft::QuotedWithContext;
11
12pub mod macro_runtime;
13pub use macro_runtime::*;
14
15#[cfg(feature = "deploy")]
16#[cfg(stageleft_runtime)]
17pub(crate) mod trybuild;
18
19#[cfg(feature = "deploy")]
20#[cfg(stageleft_runtime)]
21mod trybuild_rewriters;
22
23#[cfg(feature = "deploy")]
24#[cfg(stageleft_runtime)]
25pub use trybuild::init_test;
26
27#[cfg(feature = "deploy")]
28#[cfg(stageleft_runtime)]
29pub mod deploy_graph;
30
31#[cfg(feature = "deploy")]
32#[cfg(stageleft_runtime)]
33pub use deploy_graph::*;
34
35pub mod in_memory_graph;
36pub use in_memory_graph::*;
37
38pub trait LocalDeploy<'a> {
39 type Process: Node<Meta = Self::Meta>;
40 type Cluster: Node<Meta = Self::Meta>;
41 type ExternalProcess: Node<Meta = Self::Meta>;
42 type Meta: Default;
43 type GraphId;
44
45 fn has_trivial_node() -> bool {
46 false
47 }
48
49 fn trivial_process(_id: usize) -> Self::Process {
50 panic!("No trivial process")
51 }
52
53 fn trivial_cluster(_id: usize) -> Self::Cluster {
54 panic!("No trivial cluster")
55 }
56
57 fn trivial_external(_id: usize) -> Self::ExternalProcess {
58 panic!("No trivial external")
59 }
60}
61
62pub trait Deploy<'a> {
63 type InstantiateEnv;
64 type CompileEnv;
65
66 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
67 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
68 type ExternalProcess: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
69 + RegisterPort<'a, Self>;
70 type Port: Clone;
71 type ExternalRawPort;
72 type Meta: Default;
73
74 type GraphId;
76
77 fn has_trivial_node() -> bool {
78 false
79 }
80
81 fn trivial_process(_id: usize) -> Self::Process {
82 panic!("No trivial process")
83 }
84
85 fn trivial_cluster(_id: usize) -> Self::Cluster {
86 panic!("No trivial cluster")
87 }
88
89 fn allocate_process_port(process: &Self::Process) -> Self::Port;
90 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
91 fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port;
92
93 fn o2o_sink_source(
94 compile_env: &Self::CompileEnv,
95 p1: &Self::Process,
96 p1_port: &Self::Port,
97 p2: &Self::Process,
98 p2_port: &Self::Port,
99 ) -> (syn::Expr, syn::Expr);
100 fn o2o_connect(
101 p1: &Self::Process,
102 p1_port: &Self::Port,
103 p2: &Self::Process,
104 p2_port: &Self::Port,
105 ) -> Box<dyn FnOnce()>;
106
107 fn o2m_sink_source(
108 compile_env: &Self::CompileEnv,
109 p1: &Self::Process,
110 p1_port: &Self::Port,
111 c2: &Self::Cluster,
112 c2_port: &Self::Port,
113 ) -> (syn::Expr, syn::Expr);
114 fn o2m_connect(
115 p1: &Self::Process,
116 p1_port: &Self::Port,
117 c2: &Self::Cluster,
118 c2_port: &Self::Port,
119 ) -> Box<dyn FnOnce()>;
120
121 fn m2o_sink_source(
122 compile_env: &Self::CompileEnv,
123 c1: &Self::Cluster,
124 c1_port: &Self::Port,
125 p2: &Self::Process,
126 p2_port: &Self::Port,
127 ) -> (syn::Expr, syn::Expr);
128 fn m2o_connect(
129 c1: &Self::Cluster,
130 c1_port: &Self::Port,
131 p2: &Self::Process,
132 p2_port: &Self::Port,
133 ) -> Box<dyn FnOnce()>;
134
135 fn m2m_sink_source(
136 compile_env: &Self::CompileEnv,
137 c1: &Self::Cluster,
138 c1_port: &Self::Port,
139 c2: &Self::Cluster,
140 c2_port: &Self::Port,
141 ) -> (syn::Expr, syn::Expr);
142 fn m2m_connect(
143 c1: &Self::Cluster,
144 c1_port: &Self::Port,
145 c2: &Self::Cluster,
146 c2_port: &Self::Port,
147 ) -> Box<dyn FnOnce()>;
148
149 fn e2o_source(
150 compile_env: &Self::CompileEnv,
151 p1: &Self::ExternalProcess,
152 p1_port: &Self::Port,
153 p2: &Self::Process,
154 p2_port: &Self::Port,
155 ) -> syn::Expr;
156 fn e2o_connect(
157 p1: &Self::ExternalProcess,
158 p1_port: &Self::Port,
159 p2: &Self::Process,
160 p2_port: &Self::Port,
161 ) -> Box<dyn FnOnce()>;
162
163 fn o2e_sink(
164 compile_env: &Self::CompileEnv,
165 p1: &Self::Process,
166 p1_port: &Self::Port,
167 p2: &Self::ExternalProcess,
168 p2_port: &Self::Port,
169 ) -> syn::Expr;
170 fn o2e_connect(
171 p1: &Self::Process,
172 p1_port: &Self::Port,
173 p2: &Self::ExternalProcess,
174 p2_port: &Self::Port,
175 ) -> Box<dyn FnOnce()>;
176
177 fn cluster_ids(
178 env: &Self::CompileEnv,
179 of_cluster: usize,
180 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
181 fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
182}
183
184impl<
185 'a,
186 T: Deploy<'a, Process = N, Cluster = C, ExternalProcess = E, Meta = M, GraphId = R>,
187 N: Node<Meta = M>,
188 C: Node<Meta = M>,
189 E: Node<Meta = M>,
190 M: Default,
191 R,
192> LocalDeploy<'a> for T
193{
194 type Process = N;
195 type Cluster = C;
196 type ExternalProcess = E;
197 type Meta = M;
198 type GraphId = R;
199
200 fn has_trivial_node() -> bool {
201 <T as Deploy<'a>>::has_trivial_node()
202 }
203
204 fn trivial_process(id: usize) -> Self::Process {
205 <T as Deploy<'a>>::trivial_process(id)
206 }
207
208 fn trivial_cluster(id: usize) -> Self::Cluster {
209 <T as Deploy<'a>>::trivial_cluster(id)
210 }
211}
212
213pub trait ProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
214 fn build(self, id: usize, name_hint: &str) -> D::Process;
215}
216
217pub trait IntoProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
218 type ProcessSpec: ProcessSpec<'a, D>;
219 fn into_process_spec(self) -> Self::ProcessSpec;
220}
221
222impl<'a, D: LocalDeploy<'a> + ?Sized, T: ProcessSpec<'a, D>> IntoProcessSpec<'a, D> for T {
223 type ProcessSpec = T;
224 fn into_process_spec(self) -> Self::ProcessSpec {
225 self
226 }
227}
228
229pub trait ClusterSpec<'a, D: LocalDeploy<'a> + ?Sized> {
230 fn build(self, id: usize, name_hint: &str) -> D::Cluster;
231}
232
233pub trait ExternalSpec<'a, D: LocalDeploy<'a> + ?Sized> {
234 fn build(self, id: usize, name_hint: &str) -> D::ExternalProcess;
235}
236
237pub trait Node {
238 type Port;
239 type Meta;
240 type InstantiateEnv;
241
242 fn next_port(&self) -> Self::Port;
243
244 fn update_meta(&mut self, meta: &Self::Meta);
245
246 fn instantiate(
247 &self,
248 env: &mut Self::InstantiateEnv,
249 meta: &mut Self::Meta,
250 graph: DfirGraph,
251 extra_stmts: Vec<syn::Stmt>,
252 );
253}
254
255pub trait RegisterPort<'a, D: Deploy<'a> + ?Sized>: Clone {
256 fn register(&self, key: usize, port: D::Port);
257 fn raw_port(&self, key: usize) -> D::ExternalRawPort;
258
259 fn as_bytes_sink(
260 &self,
261 key: usize,
262 ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a;
263
264 fn as_bincode_sink<T: Serialize + 'static>(
265 &self,
266 key: usize,
267 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a;
268
269 fn as_bytes_source(
270 &self,
271 key: usize,
272 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a;
273
274 fn as_bincode_source<T: DeserializeOwned + 'static>(
275 &self,
276 key: usize,
277 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a;
278}