hydro_lang/deploy/
mod.rs

1use std::future::Future;
2use std::io::Error;
3use std::pin::Pin;
4
5use dfir_lang::graph::DfirGraph;
6use dfir_rs::bytes::Bytes;
7use dfir_rs::futures::{Sink, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use stageleft::QuotedWithContext;
11
12pub mod macro_runtime;
13pub use macro_runtime::*;
14
15#[cfg(feature = "deploy")]
16#[cfg(stageleft_runtime)]
17pub(crate) mod trybuild;
18
19#[cfg(feature = "deploy")]
20#[cfg(stageleft_runtime)]
21mod trybuild_rewriters;
22
23#[cfg(feature = "deploy")]
24#[cfg(stageleft_runtime)]
25pub use trybuild::init_test;
26
27#[cfg(feature = "deploy")]
28#[cfg(stageleft_runtime)]
29pub mod deploy_graph;
30
31#[cfg(feature = "deploy")]
32#[cfg(stageleft_runtime)]
33pub use deploy_graph::*;
34
35pub mod in_memory_graph;
36pub use in_memory_graph::*;
37
38pub trait LocalDeploy<'a> {
39    type Process: Node<Meta = Self::Meta>;
40    type Cluster: Node<Meta = Self::Meta>;
41    type ExternalProcess: Node<Meta = Self::Meta>;
42    type Meta: Default;
43    type GraphId;
44
45    fn has_trivial_node() -> bool {
46        false
47    }
48
49    fn trivial_process(_id: usize) -> Self::Process {
50        panic!("No trivial process")
51    }
52
53    fn trivial_cluster(_id: usize) -> Self::Cluster {
54        panic!("No trivial cluster")
55    }
56
57    fn trivial_external(_id: usize) -> Self::ExternalProcess {
58        panic!("No trivial external")
59    }
60}
61
62pub trait Deploy<'a> {
63    type InstantiateEnv;
64    type CompileEnv;
65
66    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
67    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
68    type ExternalProcess: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
69        + RegisterPort<'a, Self>;
70    type Port: Clone;
71    type ExternalRawPort;
72    type Meta: Default;
73
74    /// Type of ID used to switch between different subgraphs at runtime.
75    type GraphId;
76
77    fn has_trivial_node() -> bool {
78        false
79    }
80
81    fn trivial_process(_id: usize) -> Self::Process {
82        panic!("No trivial process")
83    }
84
85    fn trivial_cluster(_id: usize) -> Self::Cluster {
86        panic!("No trivial cluster")
87    }
88
89    fn allocate_process_port(process: &Self::Process) -> Self::Port;
90    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
91    fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port;
92
93    fn o2o_sink_source(
94        compile_env: &Self::CompileEnv,
95        p1: &Self::Process,
96        p1_port: &Self::Port,
97        p2: &Self::Process,
98        p2_port: &Self::Port,
99    ) -> (syn::Expr, syn::Expr);
100    fn o2o_connect(
101        p1: &Self::Process,
102        p1_port: &Self::Port,
103        p2: &Self::Process,
104        p2_port: &Self::Port,
105    ) -> Box<dyn FnOnce()>;
106
107    fn o2m_sink_source(
108        compile_env: &Self::CompileEnv,
109        p1: &Self::Process,
110        p1_port: &Self::Port,
111        c2: &Self::Cluster,
112        c2_port: &Self::Port,
113    ) -> (syn::Expr, syn::Expr);
114    fn o2m_connect(
115        p1: &Self::Process,
116        p1_port: &Self::Port,
117        c2: &Self::Cluster,
118        c2_port: &Self::Port,
119    ) -> Box<dyn FnOnce()>;
120
121    fn m2o_sink_source(
122        compile_env: &Self::CompileEnv,
123        c1: &Self::Cluster,
124        c1_port: &Self::Port,
125        p2: &Self::Process,
126        p2_port: &Self::Port,
127    ) -> (syn::Expr, syn::Expr);
128    fn m2o_connect(
129        c1: &Self::Cluster,
130        c1_port: &Self::Port,
131        p2: &Self::Process,
132        p2_port: &Self::Port,
133    ) -> Box<dyn FnOnce()>;
134
135    fn m2m_sink_source(
136        compile_env: &Self::CompileEnv,
137        c1: &Self::Cluster,
138        c1_port: &Self::Port,
139        c2: &Self::Cluster,
140        c2_port: &Self::Port,
141    ) -> (syn::Expr, syn::Expr);
142    fn m2m_connect(
143        c1: &Self::Cluster,
144        c1_port: &Self::Port,
145        c2: &Self::Cluster,
146        c2_port: &Self::Port,
147    ) -> Box<dyn FnOnce()>;
148
149    fn e2o_source(
150        compile_env: &Self::CompileEnv,
151        p1: &Self::ExternalProcess,
152        p1_port: &Self::Port,
153        p2: &Self::Process,
154        p2_port: &Self::Port,
155    ) -> syn::Expr;
156    fn e2o_connect(
157        p1: &Self::ExternalProcess,
158        p1_port: &Self::Port,
159        p2: &Self::Process,
160        p2_port: &Self::Port,
161    ) -> Box<dyn FnOnce()>;
162
163    fn o2e_sink(
164        compile_env: &Self::CompileEnv,
165        p1: &Self::Process,
166        p1_port: &Self::Port,
167        p2: &Self::ExternalProcess,
168        p2_port: &Self::Port,
169    ) -> syn::Expr;
170    fn o2e_connect(
171        p1: &Self::Process,
172        p1_port: &Self::Port,
173        p2: &Self::ExternalProcess,
174        p2_port: &Self::Port,
175    ) -> Box<dyn FnOnce()>;
176
177    fn cluster_ids(
178        env: &Self::CompileEnv,
179        of_cluster: usize,
180    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
181    fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
182}
183
184impl<
185    'a,
186    T: Deploy<'a, Process = N, Cluster = C, ExternalProcess = E, Meta = M, GraphId = R>,
187    N: Node<Meta = M>,
188    C: Node<Meta = M>,
189    E: Node<Meta = M>,
190    M: Default,
191    R,
192> LocalDeploy<'a> for T
193{
194    type Process = N;
195    type Cluster = C;
196    type ExternalProcess = E;
197    type Meta = M;
198    type GraphId = R;
199
200    fn has_trivial_node() -> bool {
201        <T as Deploy<'a>>::has_trivial_node()
202    }
203
204    fn trivial_process(id: usize) -> Self::Process {
205        <T as Deploy<'a>>::trivial_process(id)
206    }
207
208    fn trivial_cluster(id: usize) -> Self::Cluster {
209        <T as Deploy<'a>>::trivial_cluster(id)
210    }
211}
212
213pub trait ProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
214    fn build(self, id: usize, name_hint: &str) -> D::Process;
215}
216
217pub trait IntoProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
218    type ProcessSpec: ProcessSpec<'a, D>;
219    fn into_process_spec(self) -> Self::ProcessSpec;
220}
221
222impl<'a, D: LocalDeploy<'a> + ?Sized, T: ProcessSpec<'a, D>> IntoProcessSpec<'a, D> for T {
223    type ProcessSpec = T;
224    fn into_process_spec(self) -> Self::ProcessSpec {
225        self
226    }
227}
228
229pub trait ClusterSpec<'a, D: LocalDeploy<'a> + ?Sized> {
230    fn build(self, id: usize, name_hint: &str) -> D::Cluster;
231}
232
233pub trait ExternalSpec<'a, D: LocalDeploy<'a> + ?Sized> {
234    fn build(self, id: usize, name_hint: &str) -> D::ExternalProcess;
235}
236
237pub trait Node {
238    type Port;
239    type Meta;
240    type InstantiateEnv;
241
242    fn next_port(&self) -> Self::Port;
243
244    fn update_meta(&mut self, meta: &Self::Meta);
245
246    fn instantiate(
247        &self,
248        env: &mut Self::InstantiateEnv,
249        meta: &mut Self::Meta,
250        graph: DfirGraph,
251        extra_stmts: Vec<syn::Stmt>,
252    );
253}
254
255pub trait RegisterPort<'a, D: Deploy<'a> + ?Sized>: Clone {
256    fn register(&self, key: usize, port: D::Port);
257    fn raw_port(&self, key: usize) -> D::ExternalRawPort;
258
259    fn as_bytes_sink(
260        &self,
261        key: usize,
262    ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a;
263
264    fn as_bincode_sink<T: Serialize + 'static>(
265        &self,
266        key: usize,
267    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a;
268
269    fn as_bytes_source(
270        &self,
271        key: usize,
272    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a;
273
274    fn as_bincode_source<T: DeserializeOwned + 'static>(
275        &self,
276        key: usize,
277    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a;
278}