Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds require every ordering to be combinable (via [`MinOrder`]) with
/// itself and with [`TotalOrder`] to yield itself, and with [`NoOrder`] to yield [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
48
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants, so it can only be used as a type-level marker.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    // The `StreamOrder` value that stands for this marker (see `Stream::collection_kind`).
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
58
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This enum has no variants, so it can only be used as a type-level marker.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    // The `StreamOrder` value that stands for this marker (see `Stream::collection_kind`).
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
70
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
///
/// An ordering also counts as "weaker than" itself, because the blanket implementation below
/// only requires that the minimum of the two orderings is `Self`.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than (or equal to) `O2` exactly when `min(O, O2) == O`.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
/// Helper trait for determining the weakest of two orderings.
///
/// `<A as MinOrder<B>>::Min` is the ordering guarantee that holds when only the weaker of
/// `A` and `B` can be relied upon.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// `TotalOrder` is the strongest guarantee, so the minimum is always the other ordering.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest guarantee, so the minimum is always `NoOrder`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
95
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds require every retry guarantee to be combinable (via [`MinRetries`])
/// with itself and with [`ExactlyOnce`] to yield itself, and with [`AtLeastOnce`] to yield
/// [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
106
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants, so it can only be used as a type-level marker.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    // The `StreamRetry` value that stands for this marker (see `Stream::collection_kind`).
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
115
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants, so it can only be used as a type-level marker.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    // The `StreamRetry` value that stands for this marker (see `Stream::collection_kind`).
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
124
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
///
/// A retry guarantee also counts as "weaker than" itself, because the blanket implementation
/// below only requires that the minimum of the two guarantees is `Self`.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than (or equal to) `R2` exactly when `min(R, R2) == R`.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
/// Helper trait for determining the weakest of two retry guarantees.
///
/// `<A as MinRetries<B>>::Min` is the retry guarantee that holds when only the weaker of
/// `A` and `B` can be relied upon.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    ///
    /// The bounds additionally state that the result is weaker than (or equal to) both inputs.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee, so the minimum is always the other guarantee.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee, so the minimum is always `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
149
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
///
/// The `on_unimplemented` attribute on this trait customizes the compiler error that is
/// reported when an ordering that does not implement it is used where it is required.
pub trait IsOrdered: Ordering {}

// Only `TotalOrder` implements the marker.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
162
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
///
/// The `on_unimplemented` attribute on this trait customizes the compiler error that is
/// reported when a retry guarantee that does not implement it is used where it is required.
pub trait IsExactlyOnce: Retries {}

// Only `ExactlyOnce` implements the marker.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
175
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
///   (default is [`Unbounded`])
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this stream. Operators that consume the stream take the node
    /// out of the cell and leave [`HydroNode::Placeholder`] behind.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state handle; the `Drop` implementation uses it to register a root for a node
    /// that was never consumed.
    pub(crate) flow_state: FlowState,

    // Carries the type parameters, which are not otherwise stored in a field.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210    fn drop(&mut self) {
211        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214                input: Box::new(ir_node),
215                op_metadata: HydroIrOpMetadata::new(),
216            });
217        }
218    }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222    for Stream<T, L, Unbounded, O, R>
223where
224    L: Location<'a>,
225{
226    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227        let new_meta = stream
228            .location
229            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231        Stream {
232            location: stream.location.clone(),
233            flow_state: stream.flow_state.clone(),
234            ir_node: RefCell::new(HydroNode::Cast {
235                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236                metadata: new_meta,
237            }),
238            _phantom: PhantomData,
239        }
240    }
241}
242
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    /// Converts a totally-ordered stream into an unordered one by delegating to
    /// `weaken_ordering`.
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
252
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    /// Converts an exactly-once stream into an at-least-once one by delegating to
    /// `weaken_retries`.
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
262
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Forwards to `Stream::defer_tick`.
    fn defer_tick(self) -> Self {
        // NOTE(review): written as a path call rather than `self.defer_tick()`; presumably this
        // targets an inherent `Stream::defer_tick` defined outside this part of the file —
        // confirm before changing the call form.
        Stream::defer_tick(self)
    }
}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273    for Stream<T, Tick<L>, Bounded, O, R>
274where
275    L: Location<'a>,
276{
277    type Location = Tick<L>;
278
279    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280        Stream::new(
281            location.clone(),
282            HydroNode::CycleSource {
283                cycle_id,
284                metadata: location.new_node_metadata(Self::collection_kind()),
285            },
286        )
287    }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291    for Stream<T, Tick<L>, Bounded, O, R>
292where
293    L: Location<'a>,
294{
295    type Location = Tick<L>;
296
297    fn location(&self) -> &Self::Location {
298        self.location()
299    }
300
301    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
302        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
303            location.clone(),
304            HydroNode::DeferTick {
305                input: Box::new(HydroNode::CycleSource {
306                    cycle_id,
307                    metadata: location.new_node_metadata(Self::collection_kind()),
308                }),
309                metadata: location.new_node_metadata(Self::collection_kind()),
310            },
311        );
312
313        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
314    }
315}
316
317impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
318    for Stream<T, Tick<L>, Bounded, O, R>
319where
320    L: Location<'a>,
321{
322    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
323        assert_eq!(
324            Location::id(&self.location),
325            expected_location,
326            "locations do not match"
327        );
328        self.location
329            .flow_state()
330            .borrow_mut()
331            .push_root(HydroRoot::CycleSink {
332                cycle_id,
333                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334                op_metadata: HydroIrOpMetadata::new(),
335            });
336    }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
340    for Stream<T, L, B, O, R>
341where
342    L: Location<'a>,
343{
344    type Location = L;
345
346    fn create_source(cycle_id: CycleId, location: L) -> Self {
347        Stream::new(
348            location.clone(),
349            HydroNode::CycleSource {
350                cycle_id,
351                metadata: location.new_node_metadata(Self::collection_kind()),
352            },
353        )
354    }
355}
356
357impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
358    for Stream<T, L, B, O, R>
359where
360    L: Location<'a>,
361{
362    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
363        assert_eq!(
364            Location::id(&self.location),
365            expected_location,
366            "locations do not match"
367        );
368        self.location
369            .flow_state()
370            .borrow_mut()
371            .push_root(HydroRoot::CycleSink {
372                cycle_id,
373                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
374                op_metadata: HydroIrOpMetadata::new(),
375            });
376    }
377}
378
379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
380where
381    T: Clone,
382    L: Location<'a>,
383{
384    fn clone(&self) -> Self {
385        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
386            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
387            *self.ir_node.borrow_mut() = HydroNode::Tee {
388                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
389                metadata: self.location.new_node_metadata(Self::collection_kind()),
390            };
391        }
392
393        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
394            unreachable!()
395        };
396        Stream {
397            location: self.location.clone(),
398            flow_state: self.flow_state.clone(),
399            ir_node: HydroNode::Tee {
400                inner: SharedNode(inner.0.clone()),
401                metadata: metadata.clone(),
402            }
403            .into(),
404            _phantom: PhantomData,
405        }
406    }
407}
408
409impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
410where
411    L: Location<'a>,
412{
413    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
414        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
415        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
416
417        let flow_state = location.flow_state().clone();
418        Stream {
419            location,
420            flow_state,
421            ir_node: RefCell::new(ir_node),
422            _phantom: PhantomData,
423        }
424    }
425
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// The location is borrowed from the stream; it is not cloned.
    pub fn location(&self) -> &L {
        &self.location
    }
430
431    /// Weakens the consistency of this live collection to not guarantee any consistency across
432    /// cluster members (if this collection is on a cluster).
433    pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
434    where
435        L: Location<'a>,
436    {
437        if L::consistency()
438            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
439        {
440            // already no consistency
441            Stream::new(
442                self.location.drop_consistency(),
443                self.ir_node.replace(HydroNode::Placeholder),
444            )
445        } else {
446            Stream::new(
447                self.location.drop_consistency(),
448                HydroNode::Cast {
449                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
450                    metadata: self.location.drop_consistency().new_node_metadata(Stream::<
451                        T,
452                        L::DropConsistency,
453                        B,
454                        O,
455                        R,
456                    >::collection_kind(
457                    )),
458                },
459            )
460        }
461    }
462
463    /// Casts this live collection to have the consistency guarantees specified in the given
464    /// location type parameter. The developer must ensure that the strengthened consistency
465    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
466    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
467        self,
468        _proof: impl crate::properties::ConsistencyProof,
469    ) -> Stream<T, L2, B, O, R>
470    where
471        L: Location<'a>,
472    {
473        if L::consistency() == L2::consistency() {
474            Stream::new(
475                self.location.with_consistency_of(),
476                self.ir_node.replace(HydroNode::Placeholder),
477            )
478        } else {
479            Stream::new(
480                self.location.with_consistency_of(),
481                HydroNode::AssertIsConsistent {
482                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
483                    trusted: false,
484                    metadata: self
485                        .location
486                        .clone()
487                        .with_consistency_of::<L2>()
488                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
489                },
490            )
491        }
492    }
493
494    pub(crate) fn assert_has_consistency_of_trusted<
495        L2: Location<'a, DropConsistency = L::DropConsistency>,
496    >(
497        self,
498        _proof: impl crate::properties::ConsistencyProof,
499    ) -> Stream<T, L2, B, O, R>
500    where
501        L: Location<'a>,
502    {
503        if L::consistency() == L2::consistency() {
504            Stream::new(
505                self.location.with_consistency_of(),
506                self.ir_node.replace(HydroNode::Placeholder),
507            )
508        } else {
509            Stream::new(
510                self.location.with_consistency_of(),
511                HydroNode::AssertIsConsistent {
512                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
513                    trusted: true,
514                    metadata: self
515                        .location
516                        .clone()
517                        .with_consistency_of::<L2>()
518                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
519                },
520            )
521        }
522    }
523
524    pub(crate) fn collection_kind() -> CollectionKind {
525        CollectionKind::Stream {
526            bound: B::BOUND_KIND,
527            order: O::ORDERING_KIND,
528            retry: R::RETRIES_KIND,
529            element_type: quote_type::<T>().into(),
530        }
531    }
532
533    /// Produces a stream based on invoking `f` on each element.
534    /// If you do not want to modify the stream and instead only want to view
535    /// each item use [`Stream::inspect`] instead.
536    ///
537    /// # Example
538    /// ```rust
539    /// # #[cfg(feature = "deploy")] {
540    /// # use hydro_lang::prelude::*;
541    /// # use futures::StreamExt;
542    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
543    /// let words = process.source_iter(q!(vec!["hello", "world"]));
544    /// words.map(q!(|x| x.to_uppercase()))
545    /// # }, |mut stream| async move {
546    /// # for w in vec!["HELLO", "WORLD"] {
547    /// #     assert_eq!(stream.next().await.unwrap(), w);
548    /// # }
549    /// # }));
550    /// # }
551    /// ```
552    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
553    where
554        F: Fn(T) -> U + 'a,
555    {
556        let f = crate::singleton_ref::with_singleton_capture(|| {
557            f.splice_fn1_ctx(&self.location).into()
558        });
559        Stream::new(
560            self.location.clone(),
561            HydroNode::Map {
562                f,
563                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
564                metadata: self
565                    .location
566                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
567            },
568        )
569    }
570
571    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
572    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
573    /// for the output type `U` must produce items in a **deterministic** order.
574    ///
575    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
576    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
577    ///
578    /// # Example
579    /// ```rust
580    /// # #[cfg(feature = "deploy")] {
581    /// # use hydro_lang::prelude::*;
582    /// # use futures::StreamExt;
583    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
584    /// process
585    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
586    ///     .flat_map_ordered(q!(|x| x))
587    /// # }, |mut stream| async move {
588    /// // 1, 2, 3, 4
589    /// # for w in (1..5) {
590    /// #     assert_eq!(stream.next().await.unwrap(), w);
591    /// # }
592    /// # }));
593    /// # }
594    /// ```
595    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
596    where
597        I: IntoIterator<Item = U>,
598        F: Fn(T) -> I + 'a,
599    {
600        let f = crate::singleton_ref::with_singleton_capture(|| {
601            f.splice_fn1_ctx(&self.location).into()
602        });
603        Stream::new(
604            self.location.clone(),
605            HydroNode::FlatMap {
606                f,
607                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
608                metadata: self
609                    .location
610                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
611            },
612        )
613    }
614
615    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
616    /// for the output type `U` to produce items in any order.
617    ///
618    /// # Example
619    /// ```rust
620    /// # #[cfg(feature = "deploy")] {
621    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
622    /// # use futures::StreamExt;
623    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
624    /// process
625    ///     .source_iter(q!(vec![
626    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
627    ///         std::collections::HashSet::from_iter(vec![3, 4]),
628    ///     ]))
629    ///     .flat_map_unordered(q!(|x| x))
630    /// # }, |mut stream| async move {
631    /// // 1, 2, 3, 4, but in no particular order
632    /// # let mut results = Vec::new();
633    /// # for w in (1..5) {
634    /// #     results.push(stream.next().await.unwrap());
635    /// # }
636    /// # results.sort();
637    /// # assert_eq!(results, vec![1, 2, 3, 4]);
638    /// # }));
639    /// # }
640    /// ```
641    pub fn flat_map_unordered<U, I, F>(
642        self,
643        f: impl IntoQuotedMut<'a, F, L>,
644    ) -> Stream<U, L, B, NoOrder, R>
645    where
646        I: IntoIterator<Item = U>,
647        F: Fn(T) -> I + 'a,
648    {
649        let f = crate::singleton_ref::with_singleton_capture(|| {
650            f.splice_fn1_ctx(&self.location).into()
651        });
652        Stream::new(
653            self.location.clone(),
654            HydroNode::FlatMap {
655                f,
656                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
657                metadata: self
658                    .location
659                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
660            },
661        )
662    }
663
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// This is shorthand for [`Stream::flat_map_ordered`] with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|d| d))
    }
692
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// This is shorthand for [`Stream::flat_map_unordered`] with the identity closure, so the
    /// resulting stream is [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|d| d))
    }
725
726    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
727    /// then emit the elements of that stream one by one. When the inner stream yields
728    /// `Pending`, this operator yields as well.
729    pub fn flat_map_stream_blocking<U, S, F>(
730        self,
731        f: impl IntoQuotedMut<'a, F, L>,
732    ) -> Stream<U, L, B, O, R>
733    where
734        S: futures::Stream<Item = U>,
735        F: Fn(T) -> S + 'a,
736    {
737        let f = f.splice_fn1_ctx(&self.location).into();
738        Stream::new(
739            self.location.clone(),
740            HydroNode::FlatMapStreamBlocking {
741                f,
742                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
743                metadata: self
744                    .location
745                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
746            },
747        )
748    }
749
    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
    /// yields as well.
    ///
    /// This is shorthand for `flat_map_stream_blocking` with the identity closure.
    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
    where
        T: futures::Stream<Item = U>,
    {
        self.flat_map_stream_blocking(q!(|d| d))
    }
759
760    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
761    /// `f`, preserving the order of the elements.
762    ///
763    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
764    /// not modify or take ownership of the values. If you need to modify the values while filtering
765    /// use [`Stream::filter_map`] instead.
766    ///
767    /// # Example
768    /// ```rust
769    /// # #[cfg(feature = "deploy")] {
770    /// # use hydro_lang::prelude::*;
771    /// # use futures::StreamExt;
772    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
773    /// process
774    ///     .source_iter(q!(vec![1, 2, 3, 4]))
775    ///     .filter(q!(|&x| x > 2))
776    /// # }, |mut stream| async move {
777    /// // 3, 4
778    /// # for w in (3..5) {
779    /// #     assert_eq!(stream.next().await.unwrap(), w);
780    /// # }
781    /// # }));
782    /// # }
783    /// ```
784    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
785    where
786        F: Fn(&T) -> bool + 'a,
787    {
788        let f = crate::singleton_ref::with_singleton_capture(|| {
789            f.splice_fn1_borrow_ctx(&self.location).into()
790        });
791        Stream::new(
792            self.location.clone(),
793            HydroNode::Filter {
794                f,
795                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
796                metadata: self.location.new_node_metadata(Self::collection_kind()),
797            },
798        )
799    }
800
801    /// Splits the stream into two streams based on a predicate, without cloning elements.
802    ///
803    /// Elements for which `f` returns `true` are sent to the first output stream,
804    /// and elements for which `f` returns `false` are sent to the second output stream.
805    ///
806    /// Unlike using `filter` twice, this only evaluates the predicate once per element
807    /// and does not require `T: Clone`.
808    ///
809    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
810    /// the predicate is only used for routing; the element itself is moved to the
811    /// appropriate output stream.
812    ///
813    /// # Example
814    /// ```rust
815    /// # #[cfg(feature = "deploy")] {
816    /// # use hydro_lang::prelude::*;
817    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
818    /// # use futures::StreamExt;
819    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
820    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
821    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
822    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
823    /// evens.map(q!(|x| (x, true)))
824    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
825    /// # }, |mut stream| async move {
826    /// # let mut results = Vec::new();
827    /// # for _ in 0..6 {
828    /// #     results.push(stream.next().await.unwrap());
829    /// # }
830    /// # results.sort();
831    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
832    /// # }));
833    /// # }
834    /// ```
835    #[expect(
836        clippy::type_complexity,
837        reason = "return type mirrors the input stream type"
838    )]
839    pub fn partition<F>(
840        self,
841        f: impl IntoQuotedMut<'a, F, L>,
842    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
843    where
844        F: Fn(&T) -> bool + 'a,
845    {
846        let f = crate::singleton_ref::with_singleton_capture(|| {
847            f.splice_fn1_borrow_ctx(&self.location).into()
848        });
849        let shared = SharedNode(Rc::new(RefCell::new(
850            self.ir_node.replace(HydroNode::Placeholder),
851        )));
852
853        let true_stream = Stream::new(
854            self.location.clone(),
855            HydroNode::Partition {
856                inner: SharedNode(shared.0.clone()),
857                f: f.clone(),
858                is_true: true,
859                metadata: self.location.new_node_metadata(Self::collection_kind()),
860            },
861        );
862
863        let false_stream = Stream::new(
864            self.location.clone(),
865            HydroNode::Partition {
866                inner: SharedNode(shared.0),
867                f,
868                is_true: false,
869                metadata: self.location.new_node_metadata(Self::collection_kind()),
870            },
871        );
872
873        (true_stream, false_stream)
874    }
875
876    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
877    ///
878    /// # Example
879    /// ```rust
880    /// # #[cfg(feature = "deploy")] {
881    /// # use hydro_lang::prelude::*;
882    /// # use futures::StreamExt;
883    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884    /// process
885    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
886    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
887    /// # }, |mut stream| async move {
888    /// // 1, 2
889    /// # for w in (1..3) {
890    /// #     assert_eq!(stream.next().await.unwrap(), w);
891    /// # }
892    /// # }));
893    /// # }
894    /// ```
895    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
896    where
897        F: Fn(T) -> Option<U> + 'a,
898    {
899        let f = f.splice_fn1_ctx(&self.location).into();
900        Stream::new(
901            self.location.clone(),
902            HydroNode::FilterMap {
903                f,
904                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
905                metadata: self
906                    .location
907                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
908            },
909        )
910    }
911
912    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
913    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
914    /// If `other` is an empty [`Optional`], no values will be produced.
915    ///
916    /// # Example
917    /// ```rust
918    /// # #[cfg(feature = "deploy")] {
919    /// # use hydro_lang::prelude::*;
920    /// # use futures::StreamExt;
921    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
922    /// let tick = process.tick();
923    /// let batch = process
924    ///   .source_iter(q!(vec![1, 2, 3, 4]))
925    ///   .batch(&tick, nondet!(/** test */));
926    /// let count = batch.clone().count(); // `count()` returns a singleton
927    /// batch.cross_singleton(count).all_ticks()
928    /// # }, |mut stream| async move {
929    /// // (1, 4), (2, 4), (3, 4), (4, 4)
930    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
931    /// #     assert_eq!(stream.next().await.unwrap(), w);
932    /// # }
933    /// # }));
934    /// # }
935    /// ```
936    pub fn cross_singleton<O2>(
937        self,
938        other: impl Into<Optional<O2, L, Bounded>>,
939    ) -> Stream<(T, O2), L, B, O, R>
940    where
941        O2: Clone,
942    {
943        let other: Optional<O2, L, Bounded> = other.into();
944        check_matching_location(&self.location, &other.location);
945
946        Stream::new(
947            self.location.clone(),
948            HydroNode::CrossSingleton {
949                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
950                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
951                metadata: self
952                    .location
953                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
954            },
955        )
956    }
957
958    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
959    ///
960    /// # Example
961    /// ```rust
962    /// # #[cfg(feature = "deploy")] {
963    /// # use hydro_lang::prelude::*;
964    /// # use futures::StreamExt;
965    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
966    /// let tick = process.tick();
967    /// // ticks are lazy by default, forces the second tick to run
968    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
969    ///
970    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
971    /// let batch_first_tick = process
972    ///   .source_iter(q!(vec![1, 2, 3, 4]))
973    ///   .batch(&tick, nondet!(/** test */));
974    /// let batch_second_tick = process
975    ///   .source_iter(q!(vec![5, 6, 7, 8]))
976    ///   .batch(&tick, nondet!(/** test */))
977    ///   .defer_tick();
978    /// batch_first_tick.chain(batch_second_tick)
979    ///   .filter_if(signal)
980    ///   .all_ticks()
981    /// # }, |mut stream| async move {
982    /// // [1, 2, 3, 4]
983    /// # for w in vec![1, 2, 3, 4] {
984    /// #     assert_eq!(stream.next().await.unwrap(), w);
985    /// # }
986    /// # }));
987    /// # }
988    /// ```
989    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
990        self.cross_singleton(signal.filter(q!(|b| *b)))
991            .map(q!(|(d, _)| d))
992    }
993
994    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
995    ///
996    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
997    /// leader of a cluster.
998    ///
999    /// # Example
1000    /// ```rust
1001    /// # #[cfg(feature = "deploy")] {
1002    /// # use hydro_lang::prelude::*;
1003    /// # use futures::StreamExt;
1004    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1005    /// let tick = process.tick();
1006    /// // ticks are lazy by default, forces the second tick to run
1007    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1008    ///
1009    /// let batch_first_tick = process
1010    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1011    ///   .batch(&tick, nondet!(/** test */));
1012    /// let batch_second_tick = process
1013    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1014    ///   .batch(&tick, nondet!(/** test */))
1015    ///   .defer_tick(); // appears on the second tick
1016    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1017    /// batch_first_tick.chain(batch_second_tick)
1018    ///   .filter_if_some(some_on_first_tick)
1019    ///   .all_ticks()
1020    /// # }, |mut stream| async move {
1021    /// // [1, 2, 3, 4]
1022    /// # for w in vec![1, 2, 3, 4] {
1023    /// #     assert_eq!(stream.next().await.unwrap(), w);
1024    /// # }
1025    /// # }));
1026    /// # }
1027    /// ```
1028    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1029    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1030        self.filter_if(signal.is_some())
1031    }
1032
1033    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1034    ///
1035    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1036    /// some local state.
1037    ///
1038    /// # Example
1039    /// ```rust
1040    /// # #[cfg(feature = "deploy")] {
1041    /// # use hydro_lang::prelude::*;
1042    /// # use futures::StreamExt;
1043    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1044    /// let tick = process.tick();
1045    /// // ticks are lazy by default, forces the second tick to run
1046    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1047    ///
1048    /// let batch_first_tick = process
1049    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1050    ///   .batch(&tick, nondet!(/** test */));
1051    /// let batch_second_tick = process
1052    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1053    ///   .batch(&tick, nondet!(/** test */))
1054    ///   .defer_tick(); // appears on the second tick
1055    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1056    /// batch_first_tick.chain(batch_second_tick)
1057    ///   .filter_if_none(some_on_first_tick)
1058    ///   .all_ticks()
1059    /// # }, |mut stream| async move {
1060    /// // [5, 6, 7, 8]
1061    /// # for w in vec![5, 6, 7, 8] {
1062    /// #     assert_eq!(stream.next().await.unwrap(), w);
1063    /// # }
1064    /// # }));
1065    /// # }
1066    /// ```
1067    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1068    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1069        self.filter_if(other.is_none())
1070    }
1071
1072    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1073    /// returning all tupled pairs.
1074    ///
1075    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1076    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1077    /// symmetric hash join is used and ordering is [`NoOrder`].
1078    ///
1079    /// # Example
1080    /// ```rust
1081    /// # #[cfg(feature = "deploy")] {
1082    /// # use hydro_lang::prelude::*;
1083    /// # use std::collections::HashSet;
1084    /// # use futures::StreamExt;
1085    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1086    /// let tick = process.tick();
1087    /// let stream1 = process.source_iter(q!(vec![1, 2]));
1088    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1089    /// stream1.cross_product(stream2)
1090    /// # }, |mut stream| async move {
1091    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1092    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1093    /// # stream.map(|i| assert!(expected.contains(&i)));
1094    /// # }));
1095    /// # }
1096    #[expect(
1097        clippy::type_complexity,
1098        reason = "MinRetries projection in return type"
1099    )]
1100    pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1101        self,
1102        other: Stream<T2, L, B2, O2, R2>,
1103    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1104    where
1105        T: Clone,
1106        T2: Clone,
1107        R: MinRetries<R2>,
1108    {
1109        self.map(q!(|v| ((), v)))
1110            .join(other.map(q!(|v| ((), v))))
1111            .map(q!(|((), (v1, v2))| (v1, v2)))
1112    }
1113
1114    /// Takes one stream as input and filters out any duplicate occurrences. The output
1115    /// contains all unique values from the input.
1116    ///
1117    /// # Example
1118    /// ```rust
1119    /// # #[cfg(feature = "deploy")] {
1120    /// # use hydro_lang::prelude::*;
1121    /// # use futures::StreamExt;
1122    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123    /// let tick = process.tick();
1124    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1125    /// # }, |mut stream| async move {
1126    /// # for w in vec![1, 2, 3, 4] {
1127    /// #     assert_eq!(stream.next().await.unwrap(), w);
1128    /// # }
1129    /// # }));
1130    /// # }
1131    /// ```
1132    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1133    where
1134        T: Eq + Hash,
1135    {
1136        Stream::new(
1137            self.location.clone(),
1138            HydroNode::Unique {
1139                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1140                metadata: self
1141                    .location
1142                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1143            },
1144        )
1145    }
1146
1147    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1148    ///
1149    /// The `other` stream must be [`Bounded`], since this function will wait until
1150    /// all its elements are available before producing any output.
1151    /// # Example
1152    /// ```rust
1153    /// # #[cfg(feature = "deploy")] {
1154    /// # use hydro_lang::prelude::*;
1155    /// # use futures::StreamExt;
1156    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1157    /// let tick = process.tick();
1158    /// let stream = process
1159    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1160    ///   .batch(&tick, nondet!(/** test */));
1161    /// let batch = process
1162    ///   .source_iter(q!(vec![1, 2]))
1163    ///   .batch(&tick, nondet!(/** test */));
1164    /// stream.filter_not_in(batch).all_ticks()
1165    /// # }, |mut stream| async move {
1166    /// # for w in vec![3, 4] {
1167    /// #     assert_eq!(stream.next().await.unwrap(), w);
1168    /// # }
1169    /// # }));
1170    /// # }
1171    /// ```
1172    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1173    where
1174        T: Eq + Hash,
1175        B2: IsBounded,
1176    {
1177        check_matching_location(&self.location, &other.location);
1178
1179        Stream::new(
1180            self.location.clone(),
1181            HydroNode::Difference {
1182                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1183                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1184                metadata: self
1185                    .location
1186                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1187            },
1188        )
1189    }
1190
1191    /// An operator which allows you to "inspect" each element of a stream without
1192    /// modifying it. The closure `f` is called on a reference to each item. This is
1193    /// mainly useful for debugging, and should not be used to generate side-effects.
1194    ///
1195    /// # Example
1196    /// ```rust
1197    /// # #[cfg(feature = "deploy")] {
1198    /// # use hydro_lang::prelude::*;
1199    /// # use futures::StreamExt;
1200    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1201    /// let nums = process.source_iter(q!(vec![1, 2]));
1202    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1203    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1204    /// # }, |mut stream| async move {
1205    /// # for w in vec![1, 2] {
1206    /// #     assert_eq!(stream.next().await.unwrap(), w);
1207    /// # }
1208    /// # }));
1209    /// # }
1210    /// ```
1211    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L::DropConsistency>) -> Self
1212    where
1213        F: Fn(&T) + 'a,
1214    {
1215        let f = crate::singleton_ref::with_singleton_capture(|| {
1216            f.splice_fn1_borrow_ctx(&self.location.drop_consistency())
1217                .into()
1218        });
1219
1220        Stream::new(
1221            self.location.clone(),
1222            HydroNode::Inspect {
1223                f,
1224                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1225                metadata: self.location.new_node_metadata(Self::collection_kind()),
1226            },
1227        )
1228    }
1229
1230    /// Executes the provided closure for every element in this stream.
1231    ///
1232    /// Because the closure may have side effects, the stream must have deterministic order
1233    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1234    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1235    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1236    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1237    where
1238        O: IsOrdered,
1239        R: IsExactlyOnce,
1240    {
1241        let f = f.splice_fn1_ctx(&self.location).into();
1242        self.location
1243            .flow_state()
1244            .borrow_mut()
1245            .push_root(HydroRoot::ForEach {
1246                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1247                f,
1248                op_metadata: HydroIrOpMetadata::new(),
1249            });
1250    }
1251
1252    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1253    /// TCP socket to some other server. You should _not_ use this API for interacting with
1254    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1255    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1256    /// interaction with asynchronous sinks.
1257    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1258    where
1259        O: IsOrdered,
1260        R: IsExactlyOnce,
1261        S: 'a + futures::Sink<T> + Unpin,
1262    {
1263        self.location
1264            .flow_state()
1265            .borrow_mut()
1266            .push_root(HydroRoot::DestSink {
1267                sink: sink.splice_typed_ctx(&self.location).into(),
1268                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1269                op_metadata: HydroIrOpMetadata::new(),
1270            });
1271    }
1272
1273    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1274    ///
1275    /// # Example
1276    /// ```rust
1277    /// # #[cfg(feature = "deploy")] {
1278    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1279    /// # use futures::StreamExt;
1280    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1281    /// let tick = process.tick();
1282    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1283    /// numbers.enumerate()
1284    /// # }, |mut stream| async move {
1285    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1286    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1287    /// #     assert_eq!(stream.next().await.unwrap(), w);
1288    /// # }
1289    /// # }));
1290    /// # }
1291    /// ```
1292    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1293    where
1294        O: IsOrdered,
1295        R: IsExactlyOnce,
1296    {
1297        Stream::new(
1298            self.location.clone(),
1299            HydroNode::Enumerate {
1300                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1301                metadata: self.location.new_node_metadata(Stream::<
1302                    (usize, T),
1303                    L,
1304                    B,
1305                    TotalOrder,
1306                    ExactlyOnce,
1307                >::collection_kind()),
1308            },
1309        )
1310    }
1311
1312    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1313    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1314    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1315    ///
1316    /// Depending on the input stream guarantees, the closure may need to be commutative
1317    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1318    ///
1319    /// # Example
1320    /// ```rust
1321    /// # #[cfg(feature = "deploy")] {
1322    /// # use hydro_lang::prelude::*;
1323    /// # use futures::StreamExt;
1324    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1325    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1326    /// words
1327    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1328    ///     .into_stream()
1329    /// # }, |mut stream| async move {
1330    /// // "HELLOWORLD"
1331    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1332    /// # }));
1333    /// # }
1334    /// ```
1335    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1336        self,
1337        init: impl IntoQuotedMut<'a, I, L>,
1338        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1339    ) -> Singleton<A, L, B2>
1340    where
1341        I: Fn() -> A + 'a,
1342        F: 'a + Fn(&mut A, T),
1343        C: ValidCommutativityFor<O> + crate::properties::IsProved,
1344        Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
1345        B: ApplyMonotoneStream<M, B2>,
1346    {
1347        let init = init.splice_fn0_ctx(&self.location).into();
1348        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1349        proof.register_proof(&comb);
1350
1351        // Only assume_retries (for idempotence), not assume_ordering.
1352        // The fold hook in the simulator handles ordering non-determinism directly.
1353        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1354        let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1355
1356        let core = HydroNode::Fold {
1357            init,
1358            acc: comb.into(),
1359            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1360            is_commutative: C::IS_PROVED,
1361            is_idempotent: Idemp::IS_PROVED,
1362            metadata: retried
1363                .location
1364                .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1365            // we do not guarantee consistency at this point because if the algebraic properties
1366            // do not hold in practice, replica consistency may fail to be maintained, so we
1367            // would like the simulator to assert consistency; in the future, this will be dynamic
1368            // based on the proof mechanism
1369        };
1370
1371        Singleton::new(retried.location.clone(), core)
1372            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1373    }
1374
1375    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1376    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1377    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1378    /// reference, so that it can be modified in place.
1379    ///
1380    /// Depending on the input stream guarantees, the closure may need to be commutative
1381    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1382    ///
1383    /// # Example
1384    /// ```rust
1385    /// # #[cfg(feature = "deploy")] {
1386    /// # use hydro_lang::prelude::*;
1387    /// # use futures::StreamExt;
1388    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1389    /// let bools = process.source_iter(q!(vec![false, true, false]));
1390    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1391    /// # }, |mut stream| async move {
1392    /// // true
1393    /// # assert_eq!(stream.next().await.unwrap(), true);
1394    /// # }));
1395    /// # }
1396    /// ```
1397    pub fn reduce<F, C, Idemp>(
1398        self,
1399        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1400    ) -> Optional<T, L, B>
1401    where
1402        F: Fn(&mut T, T) + 'a,
1403        C: ValidCommutativityFor<O> + crate::properties::IsProved,
1404        Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
1405    {
1406        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1407        proof.register_proof(&f);
1408
1409        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1410        let ordered_etc: Stream<T, L::DropConsistency, B> =
1411            self.assume_retries(nondet).assume_ordering(nondet);
1412
1413        let core = HydroNode::Reduce {
1414            f: f.into(),
1415            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1416            is_commutative: C::IS_PROVED,
1417            is_idempotent: Idemp::IS_PROVED,
1418            metadata: ordered_etc
1419                .location
1420                .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1421        };
1422
1423        Optional::new(ordered_etc.location.clone(), core)
1424            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1425    }
1426
1427    /// Computes the maximum element in the stream as an [`Optional`], which
1428    /// will be empty until the first element in the input arrives.
1429    ///
1430    /// # Example
1431    /// ```rust
1432    /// # #[cfg(feature = "deploy")] {
1433    /// # use hydro_lang::prelude::*;
1434    /// # use futures::StreamExt;
1435    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1436    /// let tick = process.tick();
1437    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1438    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1439    /// batch.max().all_ticks()
1440    /// # }, |mut stream| async move {
1441    /// // 4
1442    /// # assert_eq!(stream.next().await.unwrap(), 4);
1443    /// # }));
1444    /// # }
1445    /// ```
1446    pub fn max(self) -> Optional<T, L, B>
1447    where
1448        T: Ord,
1449    {
1450        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1451            .assume_ordering_trusted_bounded::<TotalOrder>(
1452                nondet!(/** max is commutative, but order affects intermediates */),
1453            )
1454            .reduce(q!(|curr, new| {
1455                if new > *curr {
1456                    *curr = new;
1457                }
1458            }))
1459    }
1460
1461    /// Computes the minimum element in the stream as an [`Optional`], which
1462    /// will be empty until the first element in the input arrives.
1463    ///
1464    /// # Example
1465    /// ```rust
1466    /// # #[cfg(feature = "deploy")] {
1467    /// # use hydro_lang::prelude::*;
1468    /// # use futures::StreamExt;
1469    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1470    /// let tick = process.tick();
1471    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1472    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1473    /// batch.min().all_ticks()
1474    /// # }, |mut stream| async move {
1475    /// // 1
1476    /// # assert_eq!(stream.next().await.unwrap(), 1);
1477    /// # }));
1478    /// # }
1479    /// ```
1480    pub fn min(self) -> Optional<T, L, B>
1481    where
1482        T: Ord,
1483    {
1484        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1485            .assume_ordering_trusted_bounded::<TotalOrder>(
1486                nondet!(/** max is commutative, but order affects intermediates */),
1487            )
1488            .reduce(q!(|curr, new| {
1489                if new < *curr {
1490                    *curr = new;
1491                }
1492            }))
1493    }
1494
1495    /// Computes the first element in the stream as an [`Optional`], which
1496    /// will be empty until the first element in the input arrives.
1497    ///
1498    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1499    /// re-ordering of elements may cause the first element to change.
1500    ///
1501    /// # Example
1502    /// ```rust
1503    /// # #[cfg(feature = "deploy")] {
1504    /// # use hydro_lang::prelude::*;
1505    /// # use futures::StreamExt;
1506    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1507    /// let tick = process.tick();
1508    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1509    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1510    /// batch.first().all_ticks()
1511    /// # }, |mut stream| async move {
1512    /// // 1
1513    /// # assert_eq!(stream.next().await.unwrap(), 1);
1514    /// # }));
1515    /// # }
1516    /// ```
1517    pub fn first(self) -> Optional<T, L, B>
1518    where
1519        O: IsOrdered,
1520    {
1521        self.make_totally_ordered()
1522            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1523            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1524            .reduce(q!(|_, _| {}))
1525    }
1526
1527    /// Computes the last element in the stream as an [`Optional`], which
1528    /// will be empty until an element in the input arrives.
1529    ///
1530    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1531    /// re-ordering of elements may cause the last element to change.
1532    ///
1533    /// # Example
1534    /// ```rust
1535    /// # #[cfg(feature = "deploy")] {
1536    /// # use hydro_lang::prelude::*;
1537    /// # use futures::StreamExt;
1538    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1539    /// let tick = process.tick();
1540    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1541    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1542    /// batch.last().all_ticks()
1543    /// # }, |mut stream| async move {
1544    /// // 4
1545    /// # assert_eq!(stream.next().await.unwrap(), 4);
1546    /// # }));
1547    /// # }
1548    /// ```
1549    pub fn last(self) -> Optional<T, L, B>
1550    where
1551        O: IsOrdered,
1552    {
1553        self.make_totally_ordered()
1554            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1555            .reduce(q!(|curr, new| *curr = new))
1556    }
1557
1558    /// Returns a stream containing at most the first `n` elements of the input stream,
1559    /// preserving the original order. Similar to `LIMIT` in SQL.
1560    ///
1561    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1562    /// retries, since the result depends on the order and cardinality of elements.
1563    ///
1564    /// # Example
1565    /// ```rust
1566    /// # #[cfg(feature = "deploy")] {
1567    /// # use hydro_lang::prelude::*;
1568    /// # use futures::StreamExt;
1569    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1570    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1571    /// numbers.limit(q!(3))
1572    /// # }, |mut stream| async move {
1573    /// // 10, 20, 30
1574    /// # for w in vec![10, 20, 30] {
1575    /// #     assert_eq!(stream.next().await.unwrap(), w);
1576    /// # }
1577    /// # }));
1578    /// # }
1579    /// ```
1580    pub fn limit(
1581        self,
1582        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1583    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1584    where
1585        O: IsOrdered,
1586        R: IsExactlyOnce,
1587    {
1588        self.generator(
1589            q!(|| 0usize),
1590            q!(move |count, item| {
1591                if *count == n {
1592                    Generate::Break
1593                } else {
1594                    *count += 1;
1595                    if *count == n {
1596                        Generate::Return(item)
1597                    } else {
1598                        Generate::Yield(item)
1599                    }
1600                }
1601            }),
1602        )
1603    }
1604
1605    /// Collects all the elements of this stream into a single [`Vec`] element.
1606    ///
1607    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1608    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1609    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1610    /// the vector at an arbitrary point in time.
1611    ///
1612    /// # Example
1613    /// ```rust
1614    /// # #[cfg(feature = "deploy")] {
1615    /// # use hydro_lang::prelude::*;
1616    /// # use futures::StreamExt;
1617    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1618    /// let tick = process.tick();
1619    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1620    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1621    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1622    /// # }, |mut stream| async move {
1623    /// // [ vec![1, 2, 3, 4] ]
1624    /// # for w in vec![vec![1, 2, 3, 4]] {
1625    /// #     assert_eq!(stream.next().await.unwrap(), w);
1626    /// # }
1627    /// # }));
1628    /// # }
1629    /// ```
1630    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1631    where
1632        O: IsOrdered,
1633        R: IsExactlyOnce,
1634    {
1635        self.make_totally_ordered().make_exactly_once().fold(
1636            q!(|| vec![]),
1637            q!(|acc, v| {
1638                acc.push(v);
1639            }),
1640        )
1641    }
1642
1643    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1644    /// and emitting each intermediate result.
1645    ///
1646    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1647    /// containing all intermediate accumulated values. The scan operation can also terminate early
1648    /// by returning `None`.
1649    ///
1650    /// The function takes a mutable reference to the accumulator and the current element, and returns
1651    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1652    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1653    ///
1654    /// # Examples
1655    ///
1656    /// Basic usage - running sum:
1657    /// ```rust
1658    /// # #[cfg(feature = "deploy")] {
1659    /// # use hydro_lang::prelude::*;
1660    /// # use futures::StreamExt;
1661    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1662    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1663    ///     q!(|| 0),
1664    ///     q!(|acc, x| {
1665    ///         *acc += x;
1666    ///         Some(*acc)
1667    ///     }),
1668    /// )
1669    /// # }, |mut stream| async move {
1670    /// // Output: 1, 3, 6, 10
1671    /// # for w in vec![1, 3, 6, 10] {
1672    /// #     assert_eq!(stream.next().await.unwrap(), w);
1673    /// # }
1674    /// # }));
1675    /// # }
1676    /// ```
1677    ///
1678    /// Early termination example:
1679    /// ```rust
1680    /// # #[cfg(feature = "deploy")] {
1681    /// # use hydro_lang::prelude::*;
1682    /// # use futures::StreamExt;
1683    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1684    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1685    ///     q!(|| 1),
1686    ///     q!(|state, x| {
1687    ///         *state = *state * x;
1688    ///         if *state > 6 {
1689    ///             None // Terminate the stream
1690    ///         } else {
1691    ///             Some(-*state)
1692    ///         }
1693    ///     }),
1694    /// )
1695    /// # }, |mut stream| async move {
1696    /// // Output: -1, -2, -6
1697    /// # for w in vec![-1, -2, -6] {
1698    /// #     assert_eq!(stream.next().await.unwrap(), w);
1699    /// # }
1700    /// # }));
1701    /// # }
1702    /// ```
1703    pub fn scan<A, U, I, F>(
1704        self,
1705        init: impl IntoQuotedMut<'a, I, L>,
1706        f: impl IntoQuotedMut<'a, F, L>,
1707    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1708    where
1709        O: IsOrdered,
1710        R: IsExactlyOnce,
1711        I: Fn() -> A + 'a,
1712        F: Fn(&mut A, T) -> Option<U> + 'a,
1713    {
1714        let init = init.splice_fn0_ctx(&self.location).into();
1715        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1716
1717        Stream::new(
1718            self.location.clone(),
1719            HydroNode::Scan {
1720                init,
1721                acc: f,
1722                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1723                metadata: self.location.new_node_metadata(
1724                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1725                ),
1726            },
1727        )
1728    }
1729
1730    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1731    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1732    /// by the function.
1733    ///
1734    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1735    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1736    /// emitted. If it resolves to `None`, the item is filtered out.
1737    ///
1738    /// # Examples
1739    ///
1740    /// ```rust
1741    /// # #[cfg(feature = "deploy")] {
1742    /// # use hydro_lang::prelude::*;
1743    /// # use futures::StreamExt;
1744    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1745    /// process
1746    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1747    ///     .scan_async_blocking(
1748    ///         q!(|| 0),
1749    ///         q!(|acc, x| {
1750    ///             *acc += x;
1751    ///             let val = *acc;
1752    ///             async move { Some(val) }
1753    ///         }),
1754    ///     )
1755    /// # }, |mut stream| async move {
1756    /// // Output: 1, 3, 6, 10
1757    /// # for w in vec![1, 3, 6, 10] {
1758    /// #     assert_eq!(stream.next().await.unwrap(), w);
1759    /// # }
1760    /// # }));
1761    /// # }
1762    /// ```
1763    pub fn scan_async_blocking<A, U, I, F, Fut>(
1764        self,
1765        init: impl IntoQuotedMut<'a, I, L>,
1766        f: impl IntoQuotedMut<'a, F, L>,
1767    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1768    where
1769        O: IsOrdered,
1770        R: IsExactlyOnce,
1771        I: Fn() -> A + 'a,
1772        F: Fn(&mut A, T) -> Fut + 'a,
1773        Fut: Future<Output = Option<U>> + 'a,
1774    {
1775        let init = init.splice_fn0_ctx(&self.location).into();
1776        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1777
1778        Stream::new(
1779            self.location.clone(),
1780            HydroNode::ScanAsyncBlocking {
1781                init,
1782                acc: f,
1783                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1784                metadata: self.location.new_node_metadata(
1785                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1786                ),
1787            },
1788        )
1789    }
1790
1791    /// Iteratively processes the elements of the stream using a state machine that can yield
1792    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1793    /// syntax in Rust, without requiring special syntax.
1794    ///
1795    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1796    /// state. The second argument defines the processing logic, taking in a mutable reference
1797    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1798    /// variants define what is emitted and whether further inputs should be processed.
1799    ///
1800    /// # Example
1801    /// ```rust
1802    /// # #[cfg(feature = "deploy")] {
1803    /// # use hydro_lang::prelude::*;
1804    /// # use futures::StreamExt;
1805    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1806    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1807    ///     q!(|| 0),
1808    ///     q!(|acc, x| {
1809    ///         *acc += x;
1810    ///         if *acc > 100 {
1811    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1812    ///         } else if *acc % 2 == 0 {
1813    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1814    ///         } else {
1815    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1816    ///         }
1817    ///     }),
1818    /// )
1819    /// # }, |mut stream| async move {
1820    /// // Output: "even", "done!"
1821    /// # let mut results = Vec::new();
1822    /// # for _ in 0..2 {
1823    /// #     results.push(stream.next().await.unwrap());
1824    /// # }
1825    /// # results.sort();
1826    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1827    /// # }));
1828    /// # }
1829    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the user-provided closures so they can be referenced (as `init()` / `f(..)`)
        // from inside the quoted scan closure below; each is spliced against the context
        // it is handed.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // Pin the marker types to `TotalOrder` / `ExactlyOnce` (guaranteed by the bounds).
        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        //   None = not yet initialized
        //   Some(Some(a)) = active with state a
        //   Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan closure returns `Option<Option<U>>`: the outer `None` is what the arms
        // below use to stop the scan, while the inner `Option` says whether this input
        // produced an output (`Some(None)` is dropped by the flatten step further down).
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            // Lazily create the user state the first time an element arrives.
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    // Emit a value and keep processing further inputs.
                    Generate::Yield(out) => Some(Some(out)),
                    // Emit a final value and mark the state as terminated, so the next
                    // input (if any) hits the fallthrough arm below.
                    Generate::Return(out) => {
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on
                    // Break/Return because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    // Emit nothing for this input, but keep going.
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        // First IR node: a scan producing a stream of `Option<U>`.
        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Second IR node: flat-map with the identity function over `Option<U>`, which
        // drops the `None`s (from `Generate::Continue`) and unwraps the `Some`s.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1903
1904    /// Given a time interval, returns a stream corresponding to samples taken from the
1905    /// stream roughly at that interval. The output will have elements in the same order
1906    /// as the input, but with arbitrary elements skipped between samples. There is also
1907    /// no guarantee on the exact timing of the samples.
1908    ///
1909    /// # Non-Determinism
1910    /// The output stream is non-deterministic in which elements are sampled, since this
1911    /// is controlled by a clock.
1912    pub fn sample_every(
1913        self,
1914        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1915        nondet: NonDet,
1916    ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1917    where
1918        L: TopLevel<'a>,
1919    {
1920        let samples = self.location.source_interval(interval);
1921
1922        let tick = self.location.tick();
1923        self.batch(&tick, nondet)
1924            .filter_if(samples.batch(&tick, nondet).first().is_some())
1925            .all_ticks()
1926            .weaken_retries()
1927    }
1928
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted an element for longer than that duration (or has never emitted one).
1931    ///
1932    /// # Non-Determinism
1933    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1934    /// samples take place, timeouts may be non-deterministically generated or missed,
1935    /// and the notification of the timeout may be delayed as well. There is also no
1936    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1937    /// detected based on when the next sample is taken.
1938    pub fn timeout(
1939        self,
1940        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
1941        nondet: NonDet,
1942    ) -> Optional<(), L::DropConsistency, Unbounded>
1943    where
1944        L: TopLevel<'a>,
1945    {
1946        let tick = self.location.tick();
1947
1948        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1949            q!(|| None),
1950            q!(
1951                |latest, _| {
1952                    *latest = Some(Instant::now());
1953                },
1954                commutative = manual_proof!(/** TODO */)
1955            ),
1956        );
1957
1958        latest_received
1959            .snapshot(&tick, nondet)
1960            .filter_map(q!(move |latest_received| {
1961                if let Some(latest_received) = latest_received {
1962                    if Instant::now().duration_since(latest_received) > duration {
1963                        Some(())
1964                    } else {
1965                        None
1966                    }
1967                } else {
1968                    Some(())
1969                }
1970            }))
1971            .latest()
1972    }
1973
1974    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1975    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1976    ///
1977    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1978    /// processed before an acknowledgement is emitted.
1979    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1980        let id = self.location.flow_state().borrow_mut().next_clock_id();
1981        let out_location = Atomic {
1982            tick: Tick {
1983                id,
1984                l: self.location.clone(),
1985            },
1986        };
1987        Stream::new(
1988            out_location.clone(),
1989            HydroNode::BeginAtomic {
1990                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1991                metadata: out_location
1992                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1993            },
1994        )
1995    }
1996
1997    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1998    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input. The output stream will execute in the provided [`Tick`]
    /// (with its consistency guarantee dropped), which must belong to this stream's location.
2001    ///
2002    /// # Non-Determinism
2003    /// The batch boundaries are non-deterministic and may change across executions.
2004    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2005        self,
2006        tick: &Tick<L2>,
2007        _nondet: NonDet,
2008    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2009        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2010        Stream::new(
2011            tick.drop_consistency(),
2012            HydroNode::Batch {
2013                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2014                metadata: tick
2015                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2016            },
2017        )
2018    }
2019
2020    /// An operator which allows you to "name" a `HydroNode`.
2021    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2022    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2023        {
2024            let mut node = self.ir_node.borrow_mut();
2025            let metadata = node.metadata_mut();
2026            metadata.tag = Some(name.to_owned());
2027        }
2028        self
2029    }
2030
    /// Turns this [`Stream`] into an [`Optional`], under the invariant assumption that there is at
2032    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2033    /// so uses must be carefully vetted.
2034    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2035    where
2036        B: IsBounded,
2037    {
2038        Optional::new(
2039            self.location.clone(),
2040            HydroNode::Cast {
2041                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2042                metadata: self
2043                    .location
2044                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2045            },
2046        )
2047    }
2048
2049    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2050        if O::ORDERING_KIND == O2::ORDERING_KIND {
2051            Stream::new(
2052                self.location.clone(),
2053                self.ir_node.replace(HydroNode::Placeholder),
2054            )
2055        } else {
2056            panic!(
2057                "Runtime ordering {:?} did not match requested cast {:?}.",
2058                O::ORDERING_KIND,
2059                O2::ORDERING_KIND
2060            )
2061        }
2062    }
2063
2064    /// Explicitly "casts" the stream to a type with a different ordering
2065    /// guarantee. Useful in unsafe code where the ordering cannot be proven
2066    /// by the type-system.
2067    ///
2068    /// # Non-Determinism
2069    /// This function is used as an escape hatch, and any mistakes in the
2070    /// provided ordering guarantee will propagate into the guarantees
2071    /// for the rest of the program.
2072    pub fn assume_ordering<O2: Ordering>(
2073        self,
2074        _nondet: NonDet,
2075    ) -> Stream<T, L::DropConsistency, B, O2, R> {
2076        if O::ORDERING_KIND == O2::ORDERING_KIND {
2077            self.use_ordering_type().weaken_consistency()
2078        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2079            // We can always weaken the ordering guarantee
2080            let target_location = self.location().drop_consistency();
2081            Stream::new(
2082                target_location.clone(),
2083                HydroNode::Cast {
2084                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2085                    metadata: target_location
2086                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2087                },
2088            )
2089        } else {
2090            let target_location = self.location().drop_consistency();
2091            Stream::new(
2092                target_location.clone(),
2093                HydroNode::ObserveNonDet {
2094                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2095                    trusted: false,
2096                    metadata: target_location
2097                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2098                },
2099            )
2100        }
2101    }
2102
2103    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2104    // intermediate states will not be revealed
2105    fn assume_ordering_trusted_bounded<O2: Ordering>(
2106        self,
2107        nondet: NonDet,
2108    ) -> Stream<T, L, B, O2, R> {
2109        if B::BOUNDED {
2110            self.assume_ordering_trusted(nondet)
2111        } else {
2112            let self_location = self.location.clone();
2113            let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2114            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2115        }
2116    }
2117
2118    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2119    // is not observable
2120    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2121        self,
2122        _nondet: NonDet,
2123    ) -> Stream<T, L, B, O2, R> {
2124        if O::ORDERING_KIND == O2::ORDERING_KIND {
2125            self.use_ordering_type()
2126        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2127            // We can always weaken the ordering guarantee
2128            Stream::new(
2129                self.location.clone(),
2130                HydroNode::Cast {
2131                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2132                    metadata: self
2133                        .location
2134                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2135                },
2136            )
2137        } else {
2138            Stream::new(
2139                self.location.clone(),
2140                HydroNode::ObserveNonDet {
2141                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2142                    trusted: true,
2143                    metadata: self
2144                        .location
2145                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2146                },
2147            )
2148        }
2149    }
2150
2151    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2152    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2153    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
        // Deprecated alias: forwards to `weaken_ordering` with the target fixed to `NoOrder`.
        self.weaken_ordering::<NoOrder>()
    }
2157
2158    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2159    /// enforcing that `O2` is weaker than the input ordering guarantee.
2160    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2161        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2162        self.assume_ordering_trusted::<O2>(nondet)
2163    }
2164
2165    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2166    /// implies that `O == TotalOrder`.
2167    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2168    where
2169        O: IsOrdered,
2170    {
2171        self.assume_ordering_trusted(nondet!(/** no-op */))
2172    }
2173
2174    /// Explicitly "casts" the stream to a type with a different retries
2175    /// guarantee. Useful in unsafe code where the lack of retries cannot
2176    /// be proven by the type-system.
2177    ///
2178    /// # Non-Determinism
2179    /// This function is used as an escape hatch, and any mistakes in the
2180    /// provided retries guarantee will propagate into the guarantees
2181    /// for the rest of the program.
2182    pub fn assume_retries<R2: Retries>(
2183        self,
2184        _nondet: NonDet,
2185    ) -> Stream<T, L::DropConsistency, B, O, R2> {
2186        if R::RETRIES_KIND == R2::RETRIES_KIND {
2187            Stream::new(
2188                self.location.drop_consistency(),
2189                self.ir_node.replace(HydroNode::Placeholder),
2190            )
2191        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2192            // We can always weaken the retries guarantee
2193            let target_location = self.location.drop_consistency();
2194            Stream::new(
2195                target_location.clone(),
2196                HydroNode::Cast {
2197                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2198                    metadata: target_location
2199                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2200                },
2201            )
2202        } else {
2203            let target_location = self.location.drop_consistency();
2204            Stream::new(
2205                target_location.clone(),
2206                HydroNode::ObserveNonDet {
2207                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2208                    trusted: false,
2209                    metadata: target_location
2210                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2211                },
2212            )
2213        }
2214    }
2215
2216    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2217    // is not observable
2218    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2219        if R::RETRIES_KIND == R2::RETRIES_KIND {
2220            Stream::new(
2221                self.location.clone(),
2222                self.ir_node.replace(HydroNode::Placeholder),
2223            )
2224        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2225            // We can always weaken the retries guarantee
2226            Stream::new(
2227                self.location.clone(),
2228                HydroNode::Cast {
2229                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2230                    metadata: self
2231                        .location
2232                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2233                },
2234            )
2235        } else {
2236            Stream::new(
2237                self.location.clone(),
2238                HydroNode::ObserveNonDet {
2239                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2240                    trusted: true,
2241                    metadata: self
2242                        .location
2243                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2244                },
2245            )
2246        }
2247    }
2248
2249    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2250    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2251    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
        // Deprecated alias: forwards to `weaken_retries` with the target fixed to `AtLeastOnce`.
        self.weaken_retries::<AtLeastOnce>()
    }
2255
2256    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2257    /// enforcing that `R2` is weaker than the input retries guarantee.
2258    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2259        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2260        self.assume_retries_trusted::<R2>(nondet)
2261    }
2262
2263    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2264    /// implies that `R == ExactlyOnce`.
2265    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2266    where
2267        R: IsExactlyOnce,
2268    {
2269        self.assume_retries_trusted(nondet!(/** no-op */))
2270    }
2271
2272    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2273    /// implies that `B == Bounded`.
    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
    where
        B: IsBounded,
    {
        // Delegates to `weaken_boundedness`; its `B2` is inferred as `Bounded` from the
        // return type of this function.
        self.weaken_boundedness()
    }
2280
    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`. This is intended for
    /// streams where `B: IsBounded` (which implies that `B == Bounded`); note that the signature
    /// does not itself require that bound.
2283    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2284        if B::BOUNDED == B2::BOUNDED {
2285            Stream::new(
2286                self.location.clone(),
2287                self.ir_node.replace(HydroNode::Placeholder),
2288            )
2289        } else {
2290            // We can always weaken the boundedness
2291            Stream::new(
2292                self.location.clone(),
2293                HydroNode::Cast {
2294                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2295                    metadata: self
2296                        .location
2297                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2298                },
2299            )
2300        }
2301    }
2302}
2303
// Methods that are only available when the stream's elements are shared references (`&T`).
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// The boundedness, ordering, and retries markers of the input are carried over
    /// unchanged to the output stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        // Implemented as a plain `map` over the stream with a cloning closure.
        self.map(q!(|d| d.clone()))
    }
}
2332
2333impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2334where
2335    L: Location<'a>,
2336{
2337    /// Computes the number of elements in the stream as a [`Singleton`].
2338    ///
2339    /// # Example
2340    /// ```rust
2341    /// # #[cfg(feature = "deploy")] {
2342    /// # use hydro_lang::prelude::*;
2343    /// # use futures::StreamExt;
2344    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2345    /// let tick = process.tick();
2346    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2347    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2348    /// batch.count().all_ticks()
2349    /// # }, |mut stream| async move {
2350    /// // 4
2351    /// # assert_eq!(stream.next().await.unwrap(), 4);
2352    /// # }));
2353    /// # }
2354    /// ```
2355    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2356        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2357            /// Order does not affect eventual count, and also does not affect intermediate states.
2358        ))
2359        .fold(
2360            q!(|| 0usize),
2361            q!(
2362                |count, _| *count += 1,
2363                monotone = manual_proof!(/** += 1 is monotone */)
2364            ),
2365        )
2366    }
2367}
2368
2369impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2370    /// Produces a new stream that merges the elements of the two input streams.
2371    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2372    ///
2373    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2374    /// [`Bounded`], you can use [`Stream::chain`] instead.
2375    ///
2376    /// # Example
2377    /// ```rust
2378    /// # #[cfg(feature = "deploy")] {
2379    /// # use hydro_lang::prelude::*;
2380    /// # use futures::StreamExt;
2381    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2382    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2383    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2384    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2385    /// # }, |mut stream| async move {
2386    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2387    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2388    /// #     assert_eq!(stream.next().await.unwrap(), w);
2389    /// # }
2390    /// # }));
2391    /// # }
2392    /// ```
2393    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2394        self,
2395        other: Stream<T, L, Unbounded, O2, R2>,
2396    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2397    where
2398        R: MinRetries<R2>,
2399    {
2400        Stream::new(
2401            self.location.clone(),
2402            HydroNode::Chain {
2403                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2404                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2405                metadata: self.location.new_node_metadata(Stream::<
2406                    T,
2407                    L,
2408                    Unbounded,
2409                    NoOrder,
2410                    <R as MinRetries<R2>>::Min,
2411                >::collection_kind()),
2412            },
2413        )
2414    }
2415
2416    /// Deprecated: use [`Stream::merge_unordered`] instead.
2417    #[deprecated(note = "use `merge_unordered` instead")]
2418    pub fn interleave<O2: Ordering, R2: Retries>(
2419        self,
2420        other: Stream<T, L, Unbounded, O2, R2>,
2421    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2422    where
2423        R: MinRetries<R2>,
2424    {
2425        self.merge_unordered(other)
2426    }
2427}
2428
2429impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2430    /// Produces a new stream that combines the elements of the two input streams,
2431    /// preserving the relative order of elements within each input.
2432    ///
2433    /// # Non-Determinism
2434    /// The order in which elements *across* the two streams will be interleaved is
2435    /// non-deterministic, so the order of elements will vary across runs. If the output
2436    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2437    /// but emits an unordered stream. For deterministic first-then-second ordering on
2438    /// bounded streams, use [`Stream::chain`].
2439    ///
2440    /// # Example
2441    /// ```rust
2442    /// # #[cfg(feature = "deploy")] {
2443    /// # use hydro_lang::prelude::*;
2444    /// # use futures::StreamExt;
2445    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2446    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2447    /// # process.source_iter(q!(vec![1, 3])).into();
2448    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2449    /// # }, |mut stream| async move {
2450    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2451    /// # for w in vec![1, 3, 2, 4] {
2452    /// #     assert_eq!(stream.next().await.unwrap(), w);
2453    /// # }
2454    /// # }));
2455    /// # }
2456    /// ```
2457    pub fn merge_ordered<R2: Retries>(
2458        self,
2459        other: Stream<T, L, B, TotalOrder, R2>,
2460        _nondet: NonDet,
2461    ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2462    where
2463        R: MinRetries<R2>,
2464    {
2465        let target_location = self.location().drop_consistency();
2466        Stream::new(
2467            target_location.clone(),
2468            HydroNode::MergeOrdered {
2469                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2470                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2471                metadata: target_location.new_node_metadata(Stream::<
2472                    T,
2473                    L::DropConsistency,
2474                    B,
2475                    TotalOrder,
2476                    <R as MinRetries<R2>>::Min,
2477                >::collection_kind()),
2478            },
2479        )
2480    }
2481}
2482
2483impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2484where
2485    L: Location<'a>,
2486{
2487    /// Produces a new stream that emits the input elements in sorted order.
2488    ///
2489    /// The input stream can have any ordering guarantee, but the output stream
2490    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2491    /// elements in the input stream are available, so it requires the input stream
2492    /// to be [`Bounded`].
2493    ///
2494    /// # Example
2495    /// ```rust
2496    /// # #[cfg(feature = "deploy")] {
2497    /// # use hydro_lang::prelude::*;
2498    /// # use futures::StreamExt;
2499    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2500    /// let tick = process.tick();
2501    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2502    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2503    /// batch.sort().all_ticks()
2504    /// # }, |mut stream| async move {
2505    /// // 1, 2, 3, 4
2506    /// # for w in (1..5) {
2507    /// #     assert_eq!(stream.next().await.unwrap(), w);
2508    /// # }
2509    /// # }));
2510    /// # }
2511    /// ```
2512    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2513    where
2514        B: IsBounded,
2515        T: Ord,
2516    {
2517        let this = self.make_bounded();
2518        Stream::new(
2519            this.location.clone(),
2520            HydroNode::Sort {
2521                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2522                metadata: this
2523                    .location
2524                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2525            },
2526        )
2527    }
2528
2529    /// Produces a new stream that first emits the elements of the `self` stream,
2530    /// and then emits the elements of the `other` stream. The output stream has
2531    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2532    /// [`TotalOrder`] guarantee.
2533    ///
2534    /// Currently, both input streams must be [`Bounded`]. This operator will block
2535    /// on the first stream until all its elements are available. In a future version,
2536    /// we will relax the requirement on the `other` stream.
2537    ///
2538    /// # Example
2539    /// ```rust
2540    /// # #[cfg(feature = "deploy")] {
2541    /// # use hydro_lang::prelude::*;
2542    /// # use futures::StreamExt;
2543    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2544    /// let tick = process.tick();
2545    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2546    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2547    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2548    /// # }, |mut stream| async move {
2549    /// // 2, 3, 4, 5, 1, 2, 3, 4
2550    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2551    /// #     assert_eq!(stream.next().await.unwrap(), w);
2552    /// # }
2553    /// # }));
2554    /// # }
2555    /// ```
2556    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2557        self,
2558        other: Stream<T, L, B2, O2, R2>,
2559    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2560    where
2561        B: IsBounded,
2562        O: MinOrder<O2>,
2563        R: MinRetries<R2>,
2564    {
2565        check_matching_location(&self.location, &other.location);
2566
2567        Stream::new(
2568            self.location.clone(),
2569            HydroNode::Chain {
2570                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2571                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2572                metadata: self.location.new_node_metadata(Stream::<
2573                    T,
2574                    L,
2575                    B2,
2576                    <O as MinOrder<O2>>::Min,
2577                    <R as MinRetries<R2>>::Min,
2578                >::collection_kind()),
2579            },
2580        )
2581    }
2582
2583    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2584    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2585    /// because this is compiled into a nested loop.
2586    #[expect(
2587        clippy::type_complexity,
2588        reason = "MinRetries projection in return type"
2589    )]
2590    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2591        self,
2592        other: Stream<T2, L, Bounded, O2, R2>,
2593    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2594    where
2595        B: IsBounded,
2596        T: Clone,
2597        T2: Clone,
2598        R: MinRetries<R2>,
2599    {
2600        let this = self.make_bounded();
2601        check_matching_location(&this.location, &other.location);
2602
2603        Stream::new(
2604            this.location.clone(),
2605            HydroNode::CrossProduct {
2606                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2607                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2608                metadata: this.location.new_node_metadata(Stream::<
2609                    (T, T2),
2610                    L,
2611                    Bounded,
2612                    <O2 as MinOrder<O>>::Min,
2613                    <R as MinRetries<R2>>::Min,
2614                >::collection_kind()),
2615            },
2616        )
2617    }
2618
2619    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2620    /// `self` used as the values for *each* key.
2621    ///
2622    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2623    /// values. For example, it can be used to send the same set of elements to several cluster
2624    /// members, if the membership information is available as a [`KeyedSingleton`].
2625    ///
2626    /// # Example
2627    /// ```rust
2628    /// # #[cfg(feature = "deploy")] {
2629    /// # use hydro_lang::prelude::*;
2630    /// # use futures::StreamExt;
2631    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2632    /// # let tick = process.tick();
2633    /// let keyed_singleton = // { 1: (), 2: () }
2634    /// # process
2635    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2636    /// #     .into_keyed()
2637    /// #     .batch(&tick, nondet!(/** test */))
2638    /// #     .first();
2639    /// let stream = // [ "a", "b" ]
2640    /// # process
2641    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2642    /// #     .batch(&tick, nondet!(/** test */));
2643    /// stream.repeat_with_keys(keyed_singleton)
2644    /// # .entries().all_ticks()
2645    /// # }, |mut stream| async move {
2646    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2647    /// # let mut results = Vec::new();
2648    /// # for _ in 0..4 {
2649    /// #     results.push(stream.next().await.unwrap());
2650    /// # }
2651    /// # results.sort();
2652    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2653    /// # }));
2654    /// # }
2655    /// ```
2656    pub fn repeat_with_keys<K, V2>(
2657        self,
2658        keys: KeyedSingleton<K, V2, L, Bounded>,
2659    ) -> KeyedStream<K, T, L, Bounded, O, R>
2660    where
2661        B: IsBounded,
2662        K: Clone,
2663        T: Clone,
2664    {
2665        keys.keys()
2666            .assume_ordering_trusted::<TotalOrder>(
2667                nondet!(/** keyed stream does not depend on ordering of keys */),
2668            )
2669            .cross_product_nested_loop(self.make_bounded())
2670            .into_keyed()
2671    }
2672
2673    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2674    /// execution until all results are available. The output order is based on when futures
2675    /// complete, and may be different than the input order.
2676    ///
2677    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2678    /// while futures are pending, this variant blocks until the futures resolve.
2679    ///
2680    /// # Example
2681    /// ```rust
2682    /// # #[cfg(feature = "deploy")] {
2683    /// # use std::collections::HashSet;
2684    /// # use futures::StreamExt;
2685    /// # use hydro_lang::prelude::*;
2686    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2687    /// process
2688    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2689    ///     .map(q!(|x| async move {
2690    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2691    ///         x
2692    ///     }))
2693    ///     .resolve_futures_blocking()
2694    /// #   },
2695    /// #   |mut stream| async move {
2696    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2697    /// #       let mut output = HashSet::new();
2698    /// #       for _ in 1..10 {
2699    /// #           output.insert(stream.next().await.unwrap());
2700    /// #       }
2701    /// #       assert_eq!(
2702    /// #           output,
2703    /// #           HashSet::<i32>::from_iter(1..10)
2704    /// #       );
2705    /// #   },
2706    /// # ));
2707    /// # }
2708    /// ```
2709    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2710    where
2711        T: Future,
2712    {
2713        Stream::new(
2714            self.location.clone(),
2715            HydroNode::ResolveFuturesBlocking {
2716                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2717                metadata: self
2718                    .location
2719                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2720            },
2721        )
2722    }
2723
2724    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2725    ///
2726    /// # Example
2727    /// ```rust
2728    /// # #[cfg(feature = "deploy")] {
2729    /// # use hydro_lang::prelude::*;
2730    /// # use futures::StreamExt;
2731    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2732    /// let tick = process.tick();
2733    /// let empty: Stream<i32, _, Bounded> = process
2734    ///   .source_iter(q!(Vec::<i32>::new()))
2735    ///   .batch(&tick, nondet!(/** test */));
2736    /// empty.is_empty().all_ticks()
2737    /// # }, |mut stream| async move {
2738    /// // true
2739    /// # assert_eq!(stream.next().await.unwrap(), true);
2740    /// # }));
2741    /// # }
2742    /// ```
2743    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2744    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2745    where
2746        B: IsBounded,
2747    {
2748        self.make_bounded()
2749            .assume_ordering_trusted::<TotalOrder>(
2750                nondet!(/** is_empty intermediates unaffected by order */),
2751            )
2752            .first()
2753            .is_none()
2754    }
2755}
2756
2757impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2758where
2759    L: Location<'a>,
2760{
2761    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2762    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2763    /// by equi-joining the two streams on the key attribute `K`.
2764    ///
2765    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2766    /// and streams the left side through, preserving the left side's ordering. When both
2767    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2768    ///
2769    /// # Example
2770    /// ```rust
2771    /// # #[cfg(feature = "deploy")] {
2772    /// # use hydro_lang::prelude::*;
2773    /// # use std::collections::HashSet;
2774    /// # use futures::StreamExt;
2775    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2776    /// let tick = process.tick();
2777    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2778    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2779    /// stream1.join(stream2)
2780    /// # }, |mut stream| async move {
2781    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2782    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2783    /// # stream.map(|i| assert!(expected.contains(&i)));
2784    /// # }));
2785    /// # }
2786    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2787        self,
2788        n: Stream<(K, V2), L, B2, O2, R2>,
2789    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2790    where
2791        K: Eq + Hash + Clone,
2792        R: MinRetries<R2>,
2793        V1: Clone,
2794        V2: Clone,
2795    {
2796        check_matching_location(&self.location, &n.location);
2797
2798        let ir_node = if B2::BOUNDED {
2799            HydroNode::JoinHalf {
2800                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2801                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2802                metadata: self.location.new_node_metadata(Stream::<
2803                    (K, (V1, V2)),
2804                    L,
2805                    B,
2806                    B2::PreserveOrderIfBounded<O>,
2807                    <R as MinRetries<R2>>::Min,
2808                >::collection_kind()),
2809            }
2810        } else {
2811            HydroNode::Join {
2812                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2813                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2814                metadata: self.location.new_node_metadata(Stream::<
2815                    (K, (V1, V2)),
2816                    L,
2817                    B,
2818                    B2::PreserveOrderIfBounded<O>,
2819                    <R as MinRetries<R2>>::Min,
2820                >::collection_kind()),
2821            }
2822        };
2823
2824        Stream::new(self.location.clone(), ir_node)
2825    }
2826
2827    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2828    /// computes the anti-join of the items in the input -- i.e. returns
2829    /// unique items in the first input that do not have a matching key
2830    /// in the second input.
2831    ///
2832    /// # Example
2833    /// ```rust
2834    /// # #[cfg(feature = "deploy")] {
2835    /// # use hydro_lang::prelude::*;
2836    /// # use futures::StreamExt;
2837    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2838    /// let tick = process.tick();
2839    /// let stream = process
2840    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2841    ///   .batch(&tick, nondet!(/** test */));
2842    /// let batch = process
2843    ///   .source_iter(q!(vec![1, 2]))
2844    ///   .batch(&tick, nondet!(/** test */));
2845    /// stream.anti_join(batch).all_ticks()
2846    /// # }, |mut stream| async move {
2847    /// # for w in vec![(3, 'c'), (4, 'd')] {
2848    /// #     assert_eq!(stream.next().await.unwrap(), w);
2849    /// # }
2850    /// # }));
2851    /// # }
2852    pub fn anti_join<O2: Ordering, R2: Retries>(
2853        self,
2854        n: Stream<K, L, Bounded, O2, R2>,
2855    ) -> Stream<(K, V1), L, B, O, R>
2856    where
2857        K: Eq + Hash,
2858    {
2859        check_matching_location(&self.location, &n.location);
2860
2861        Stream::new(
2862            self.location.clone(),
2863            HydroNode::AntiJoin {
2864                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2865                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2866                metadata: self
2867                    .location
2868                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2869            },
2870        )
2871    }
2872}
2873
2874impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2875    Stream<(K, V), L, B, O, R>
2876{
2877    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2878    /// is used as the key and the second element is added to the entries associated with that key.
2879    ///
2880    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2881    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2882    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2883    /// total ordering _within_ each group but no ordering _across_ groups.
2884    ///
2885    /// # Example
2886    /// ```rust
2887    /// # #[cfg(feature = "deploy")] {
2888    /// # use hydro_lang::prelude::*;
2889    /// # use futures::StreamExt;
2890    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2891    /// process
2892    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2893    ///     .into_keyed()
2894    /// #   .entries()
2895    /// # }, |mut stream| async move {
2896    /// // { 1: [2, 3], 2: [4] }
2897    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2898    /// #     assert_eq!(stream.next().await.unwrap(), w);
2899    /// # }
2900    /// # }));
2901    /// # }
2902    /// ```
2903    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2904        KeyedStream::new(
2905            self.location.clone(),
2906            HydroNode::Cast {
2907                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2908                metadata: self
2909                    .location
2910                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2911            },
2912        )
2913    }
2914}
2915
2916impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2917where
2918    K: Eq + Hash,
2919    L: Location<'a>,
2920{
2921    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2922    /// # Example
2923    /// ```rust
2924    /// # #[cfg(feature = "deploy")] {
2925    /// # use hydro_lang::prelude::*;
2926    /// # use futures::StreamExt;
2927    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2928    /// let tick = process.tick();
2929    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2930    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2931    /// batch.keys().all_ticks()
2932    /// # }, |mut stream| async move {
2933    /// // 1, 2
2934    /// # assert_eq!(stream.next().await.unwrap(), 1);
2935    /// # assert_eq!(stream.next().await.unwrap(), 2);
2936    /// # }));
2937    /// # }
2938    /// ```
2939    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2940        self.into_keyed()
2941            .fold(
2942                q!(|| ()),
2943                q!(
2944                    |_, _| {},
2945                    commutative = manual_proof!(/** values are ignored */),
2946                    idempotent = manual_proof!(/** values are ignored */)
2947                ),
2948            )
2949            .keys()
2950    }
2951}
2952
2953impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2954where
2955    L: Location<'a>,
2956{
2957    /// Returns a stream corresponding to the latest batch of elements being atomically
2958    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2959    /// the order of the input.
2960    ///
2961    /// # Non-Determinism
2962    /// The batch boundaries are non-deterministic and may change across executions.
2963    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2964        self,
2965        tick: &Tick<L2>,
2966        _nondet: NonDet,
2967    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2968        Stream::new(
2969            tick.drop_consistency(),
2970            HydroNode::Batch {
2971                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2972                metadata: tick
2973                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2974            },
2975        )
2976    }
2977
2978    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2979    /// See [`Stream::atomic`] for more details.
2980    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2981        Stream::new(
2982            self.location.tick.l.clone(),
2983            HydroNode::EndAtomic {
2984                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2985                metadata: self
2986                    .location
2987                    .tick
2988                    .l
2989                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2990            },
2991        )
2992    }
2993}
2994
2995impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2996where
2997    L: TopLevel<'a>,
2998    F: Future<Output = T>,
2999{
3000    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3001    /// Future outputs are produced as available, regardless of input arrival order.
3002    ///
3003    /// # Example
3004    /// ```rust
3005    /// # #[cfg(feature = "deploy")] {
3006    /// # use std::collections::HashSet;
3007    /// # use futures::StreamExt;
3008    /// # use hydro_lang::prelude::*;
3009    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3010    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3011    ///     .map(q!(|x| async move {
3012    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3013    ///         x
3014    ///     }))
3015    ///     .resolve_futures()
3016    /// #   },
3017    /// #   |mut stream| async move {
3018    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3019    /// #       let mut output = HashSet::new();
3020    /// #       for _ in 1..10 {
3021    /// #           output.insert(stream.next().await.unwrap());
3022    /// #       }
3023    /// #       assert_eq!(
3024    /// #           output,
3025    /// #           HashSet::<i32>::from_iter(1..10)
3026    /// #       );
3027    /// #   },
3028    /// # ));
3029    /// # }
3030    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3031        Stream::new(
3032            self.location.clone(),
3033            HydroNode::ResolveFutures {
3034                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3035                metadata: self
3036                    .location
3037                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3038            },
3039        )
3040    }
3041
3042    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3043    /// Future outputs are produced in the same order as the input stream.
3044    ///
3045    /// # Example
3046    /// ```rust
3047    /// # #[cfg(feature = "deploy")] {
3048    /// # use std::collections::HashSet;
3049    /// # use futures::StreamExt;
3050    /// # use hydro_lang::prelude::*;
3051    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3052    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3053    ///     .map(q!(|x| async move {
3054    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3055    ///         x
3056    ///     }))
3057    ///     .resolve_futures_ordered()
3058    /// #   },
3059    /// #   |mut stream| async move {
3060    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3061    /// #       let mut output = Vec::new();
3062    /// #       for _ in 1..10 {
3063    /// #           output.push(stream.next().await.unwrap());
3064    /// #       }
3065    /// #       assert_eq!(
3066    /// #           output,
3067    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3068    /// #       );
3069    /// #   },
3070    /// # ));
3071    /// # }
3072    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3073        Stream::new(
3074            self.location.clone(),
3075            HydroNode::ResolveFuturesOrdered {
3076                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3077                metadata: self
3078                    .location
3079                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3080            },
3081        )
3082    }
3083}
3084
3085impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3086where
3087    L: Location<'a>,
3088{
3089    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3090    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3091    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3092        Stream::new(
3093            self.location.outer().clone(),
3094            HydroNode::YieldConcat {
3095                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3096                metadata: self
3097                    .location
3098                    .outer()
3099                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3100            },
3101        )
3102    }
3103
3104    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3105    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3106    ///
3107    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3108    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3109    /// stream's [`Tick`] context.
3110    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3111        let out_location = Atomic {
3112            tick: self.location.clone(),
3113        };
3114
3115        Stream::new(
3116            out_location.clone(),
3117            HydroNode::YieldConcat {
3118                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3119                metadata: out_location
3120                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3121            },
3122        )
3123    }
3124
3125    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3126    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3127    /// input.
3128    ///
3129    /// This API is particularly useful for stateful computation on batches of data, such as
3130    /// maintaining an accumulated state that is up to date with the current batch.
3131    ///
3132    /// # Example
3133    /// ```rust
3134    /// # #[cfg(feature = "deploy")] {
3135    /// # use hydro_lang::prelude::*;
3136    /// # use futures::StreamExt;
3137    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3138    /// let tick = process.tick();
3139    /// # // ticks are lazy by default, forces the second tick to run
3140    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3141    /// # let batch_first_tick = process
3142    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3143    /// #  .batch(&tick, nondet!(/** test */));
3144    /// # let batch_second_tick = process
3145    /// #   .source_iter(q!(vec![5, 6, 7]))
3146    /// #   .batch(&tick, nondet!(/** test */))
3147    /// #   .defer_tick(); // appears on the second tick
3148    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3149    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3150    ///
3151    /// input.batch(&tick, nondet!(/** test */))
3152    ///     .across_ticks(|s| s.count()).all_ticks()
3153    /// # }, |mut stream| async move {
3154    /// // [4, 7]
3155    /// assert_eq!(stream.next().await.unwrap(), 4);
3156    /// assert_eq!(stream.next().await.unwrap(), 7);
3157    /// # }));
3158    /// # }
3159    /// ```
3160    pub fn across_ticks<Out: BatchAtomic<'a>>(
3161        self,
3162        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3163    ) -> Out::Batched {
3164        thunk(self.all_ticks_atomic()).batched_atomic()
3165    }
3166
3167    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3168    /// always has the elements of `self` at tick `T - 1`.
3169    ///
3170    /// At tick `0`, the output stream is empty, since there is no previous tick.
3171    ///
3172    /// This operator enables stateful iterative processing with ticks, by sending data from one
3173    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3174    ///
3175    /// # Example
3176    /// ```rust
3177    /// # #[cfg(feature = "deploy")] {
3178    /// # use hydro_lang::prelude::*;
3179    /// # use futures::StreamExt;
3180    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3181    /// let tick = process.tick();
3182    /// // ticks are lazy by default, forces the second tick to run
3183    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3184    ///
3185    /// let batch_first_tick = process
3186    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3187    ///   .batch(&tick, nondet!(/** test */));
3188    /// let batch_second_tick = process
3189    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3190    ///   .batch(&tick, nondet!(/** test */))
3191    ///   .defer_tick(); // appears on the second tick
3192    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3193    ///
3194    /// changes_across_ticks.clone().filter_not_in(
3195    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3196    /// ).all_ticks()
3197    /// # }, |mut stream| async move {
3198    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3199    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3200    /// #     assert_eq!(stream.next().await.unwrap(), w);
3201    /// # }
3202    /// # }));
3203    /// # }
3204    /// ```
3205    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3206        Stream::new(
3207            self.location.clone(),
3208            HydroNode::DeferTick {
3209                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3210                metadata: self
3211                    .location
3212                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3213            },
3214        )
3215    }
3216}
3217
3218#[cfg(test)]
3219mod tests {
3220    #[cfg(feature = "deploy")]
3221    use futures::{SinkExt, StreamExt};
3222    #[cfg(feature = "deploy")]
3223    use hydro_deploy::Deployment;
3224    #[cfg(feature = "deploy")]
3225    use serde::{Deserialize, Serialize};
3226    #[cfg(any(feature = "deploy", feature = "sim"))]
3227    use stageleft::q;
3228
3229    #[cfg(any(feature = "deploy", feature = "sim"))]
3230    use crate::compile::builder::FlowBuilder;
3231    #[cfg(feature = "deploy")]
3232    use crate::live_collections::sliced::sliced;
3233    #[cfg(feature = "deploy")]
3234    use crate::live_collections::stream::ExactlyOnce;
3235    #[cfg(feature = "sim")]
3236    use crate::live_collections::stream::NoOrder;
3237    #[cfg(any(feature = "deploy", feature = "sim"))]
3238    use crate::live_collections::stream::TotalOrder;
3239    #[cfg(any(feature = "deploy", feature = "sim"))]
3240    use crate::location::Location;
3241    #[cfg(feature = "sim")]
3242    use crate::networking::TCP;
3243    #[cfg(any(feature = "deploy", feature = "sim"))]
3244    use crate::nondet::nondet;
3245
3246    mod backtrace_chained_ops;
3247
/// Tag type for the first process in `first_ten_distributed`.
#[cfg(feature = "deploy")]
struct P1 {}
/// Tag type for the second process (and the external endpoint) in `first_ten_distributed`.
#[cfg(feature = "deploy")]
struct P2 {}
3252
/// Serializable payload sent between processes in `first_ten_distributed`.
#[cfg(feature = "deploy")]
#[derive(Serialize, Deserialize, Debug)]
struct SendOverNetwork {
    // The number carried by this message.
    n: u32,
}
3258
/// Sends the numbers `0..10` from one process to a second one over TCP, forwards them to an
/// external endpoint, and checks that they are received in order.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn first_ten_distributed() {
    use crate::networking::TCP;

    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let sender = flow.process::<P1>();
    let receiver = flow.process::<P2>();
    let external = flow.external::<P2>();

    let out_port = sender
        .source_iter(q!(0..10))
        .map(q!(|n| SendOverNetwork { n }))
        .send(&receiver, TCP.fail_stop().bincode())
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&sender, deployment.Localhost())
        .with_process(&receiver, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();
    let mut external_out = nodes.connect(out_port).await;
    deployment.start().await.unwrap();

    for expected in 0..10 {
        let received = external_out.next().await.unwrap();
        assert_eq!(received.n, expected);
    }
}
3293
/// Checks that `first()` on a three-element stream yields exactly one element, by counting
/// the elements of its output.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn first_cardinality() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();
    let tick = node.tick();

    let first_of_three = tick
        .singleton(q!([1, 2, 3]))
        .into_stream()
        .flatten_ordered()
        .first();
    let count_port = first_of_three
        .into_stream()
        .count()
        .all_ticks()
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();
    let mut counts = nodes.connect(count_port).await;
    deployment.start().await.unwrap();

    let observed = counts.next().await.unwrap();
    assert_eq!(observed, 1);
}
3327
/// Checks that `reduce` over an unbounded top-level stream keeps accumulating as new inputs
/// arrive: sending 1 and then 2 yields 1 and then 3.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_reduce_remembers_state() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = node.source_external_bincode(&external);
    let running_sum = input.reduce(q!(|acc, v| *acc += v));
    let output_port = running_sum
        .sample_eager(nondet!(/** test */))
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_in = nodes.connect(input_port).await;
    let mut external_out = nodes.connect(output_port).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, sum expected back).
    for (sent, expected_sum) in [(1, 1), (2, 3)] {
        external_in.send(sent).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), expected_sum);
    }
}
3361
/// Crosses an unbounded external input with a singleton folded from a bounded top-level
/// source, and checks that every input is paired with the same total (6).
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_cross_singleton() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

    // 1 + 2 + 3, folded from a bounded source.
    let total = node
        .source_iter(q!(vec![1, 2, 3]))
        .fold(q!(|| 0), q!(|acc, v| *acc += v));
    let output_port = input
        .cross_singleton(total)
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_in = nodes.connect(input_port).await;
    let mut external_out = nodes.connect(output_port).await;

    deployment.start().await.unwrap();

    for (sent, expected_pair) in [(1, (1, 6)), (2, (2, 6))] {
        external_in.send(sent).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), expected_pair);
    }
}
3399
/// Inside a `sliced!` block, counts the elements of a reduced bounded top-level source and
/// checks that every input observes a count of exactly 1.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_reduce_cardinality() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

    let paired_with_count = sliced! {
        let input = use(input, nondet!(/** test */));
        let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
        input.cross_singleton(v.into_stream().count())
    };
    let output_port = paired_with_count.send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_in = nodes.connect(input_port).await;
    let mut external_out = nodes.connect(output_port).await;

    deployment.start().await.unwrap();

    for (sent, expected_pair) in [(1, (1, 1)), (2, (2, 1))] {
        external_in.send(sent).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), expected_pair);
    }
}
3437
/// Same as the reduce-cardinality check, but the reduced value is first converted with
/// `into_singleton()`; every input must still observe a count of exactly 1.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_into_singleton_cardinality() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

    let paired_with_count = sliced! {
        let input = use(input, nondet!(/** test */));
        let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
        input.cross_singleton(v.into_stream().count())
    };
    let output_port = paired_with_count.send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_in = nodes.connect(input_port).await;
    let mut external_out = nodes.connect(output_port).await;

    deployment.start().await.unwrap();

    for (sent, expected_pair) in [(1, (1, 1)), (2, (2, 1))] {
        external_in.send(sent).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), expected_pair);
    }
}
3475
/// Crosses each input batch with a snapshot of an atomic fold over a bounded source, and
/// checks that the folded total (6) is visible for inputs arriving in separate sends.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn atomic_fold_replays_each_tick() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
    let tick = node.tick();

    let batched_input = input.batch(&tick, nondet!(/** test */));
    let total_snapshot = node
        .source_iter(q!(vec![1, 2, 3]))
        .atomic()
        .fold(q!(|| 0), q!(|acc, v| *acc += v))
        .snapshot_atomic(&tick, nondet!(/** test */));
    let output_port = batched_input
        .cross_singleton(total_snapshot)
        .all_ticks()
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_in = nodes.connect(input_port).await;
    let mut external_out = nodes.connect(output_port).await;

    deployment.start().await.unwrap();

    for (sent, expected_pair) in [(1, (1, 6)), (2, (2, 6))] {
        external_in.send(sent).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), expected_pair);
    }
}
3518
/// Checks that `scan` over an unbounded top-level stream carries its accumulator from one
/// input to the next: sending 1 and then 2 yields 1 and then 3.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_scan_remembers_state() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = node.source_external_bincode(&external);
    let prefix_sums = input.scan(
        q!(|| 0),
        q!(|acc, v| {
            *acc += v;
            Some(*acc)
        }),
    );
    let output_port = prefix_sums.send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_in = nodes.connect(input_port).await;
    let mut external_out = nodes.connect(output_port).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, running sum expected back).
    for (sent, expected_sum) in [(1, 1), (2, 3)] {
        external_in.send(sent).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), expected_sum);
    }
}
3557
/// Checks that `enumerate` over an unbounded top-level stream keeps counting across inputs:
/// the first element gets index 0 and the second index 1.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_enumerate_remembers_state() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = node.source_external_bincode(&external);
    let indexed = input.enumerate();
    let output_port = indexed.send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_in = nodes.connect(input_port).await;
    let mut external_out = nodes.connect(output_port).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, (index, value) expected back).
    for (sent, expected_entry) in [(1, (0, 1)), (2, (1, 2))] {
        external_in.send(sent).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), expected_entry);
    }
}
3588
/// Checks that `unique` over an unbounded top-level stream remembers earlier elements: a
/// repeated `1` is not emitted again, so the next output after it is `3`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_unique_remembers_state() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
    let deduplicated = input.unique();
    let output_port = deduplicated.send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_in = nodes.connect(input_port).await;
    let mut external_out = nodes.connect(output_port).await;

    deployment.start().await.unwrap();

    // Each step sends some values and then expects exactly one new output; the last step
    // re-sends `1` before sending `3`.
    let steps = [(vec![1], 1), (vec![2], 2), (vec![1, 3], 3)];
    for (to_send, expected) in steps {
        for value in to_send {
            external_in.send(value).await.unwrap();
        }
        assert_eq!(external_out.next().await.unwrap(), expected);
    }
}
3624
/// Expected to panic: asserting that the first batch holds all three inputs fails with
/// nondeterministic batching.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_nondet_size() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, TotalOrder, _>();

    let tick = node.tick();
    let batch_sizes = input.batch(&tick, nondet!(/** test */)).count();
    let receiver = batch_sizes.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        for _ in 0..3 {
            sender.send(());
        }

        // fails with nondet batching
        let first_batch_size = receiver.next().await.unwrap();
        assert_eq!(first_batch_size, 3);
    });
}
3649
/// Checks that batching an input stream and concatenating the batches again yields the
/// elements in the order they were sent.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_preserves_order() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, input) = node.sim_input();

    let tick = node.tick();
    let batched = input.batch(&tick, nondet!(/** test */));
    let receiver = batched.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        for value in [1, 2, 3] {
            sender.send(value);
        }

        receiver.assert_yields_only([1, 2, 3]).await;
    });
}
3672
/// Expected to panic: panics in an instance whose per-batch `(min, max)` pairs are
/// `(1, 3)` followed by `(2, 2)`.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_unordered_shuffles() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let batch = input.batch(&tick, nondet!(/** test */));
    let smallest = batch.clone().min();
    let largest = batch.max();
    let receiver = smallest.zip(largest).all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3]);

        let min_max_pairs = receiver.collect::<Vec<_>>().await;
        if min_max_pairs == vec![(1, 3), (2, 2)] {
            panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
        }
    });
}
3699
/// Checks the instance count reported by `exhaustive` when four unordered inputs are
/// batched and re-emitted.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_unordered_shuffles_count() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let receiver = input
        .batch(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        receiver.assert_yields_only_unordered([1, 2, 3, 4]).await;
    });

    // ∑ (k=1 to 4) S(4,k) × k! = 75
    assert_eq!(explored, 75);
}
3722
/// Expected to panic: after `assume_ordering` on an unordered batch, asserting the sorted
/// output order fails.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_observe_order_batched() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let unordered_batch = input.batch(&tick, nondet!(/** test */));
    let assumed_ordered = unordered_batch.assume_ordering::<TotalOrder>(nondet!(/** test */));
    let receiver = assumed_ordered.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        // fails with assume_ordering
        receiver.assert_yields_only([1, 2, 3, 4]).await;
    });
}
3744
/// Checks the instance count reported by `exhaustive` when the order of an unordered batch
/// of four inputs is observed through `assume_ordering`.
#[cfg(feature = "sim")]
#[test]
fn sim_observe_order_batched_count() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let unordered_batch = input.batch(&tick, nondet!(/** test */));
    let assumed_ordered = unordered_batch.assume_ordering::<TotalOrder>(nondet!(/** test */));
    let receiver = assumed_ordered.all_ticks().sim_output();

    let explored = flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        // Drain the output; only the number of instances matters here.
        let _ = receiver.collect::<Vec<_>>().await;
    });

    // 4! * 2^{4 - 1}
    assert_eq!(explored, 192);
}
3770
/// Checks that snapshots of a count over four unordered inputs always end at 4, and checks
/// the instance count reported by `exhaustive`.
#[cfg(feature = "sim")]
#[test]
fn sim_unordered_count_instance_count() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let count_snapshots = input.count().snapshot(&tick, nondet!(/** test */));
    let receiver = count_snapshots.all_ticks().sim_output();

    let explored = flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        let snapshots = receiver.collect::<Vec<_>>().await;
        assert!(snapshots.last().unwrap() == &4);
    });

    // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
    assert_eq!(explored, 16);
}
3796
/// Checks that `assume_ordering` on a top-level unordered stream of three inputs delivers
/// all of them, and that `exhaustive` reports six instances.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();

    let assumed_ordered = input.assume_ordering::<TotalOrder>(nondet!(/** test */));
    let receiver = assumed_ordered.sim_output();

    let explored = flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3]);

        // The order varies per instance, so compare the sorted contents.
        let mut received = receiver.collect::<Vec<_>>().await;
        received.sort();
        assert_eq!(received, vec![1, 2, 3]);
    });

    assert_eq!(explored, 6);
}
3818
/// Feeds odd successors of the ordered stream back into it through a second process, and
/// checks that some instance observes `0, 1, 2` in order and that six instances are explored.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let relay = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let ordered = input
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Round-trip `v + 1` (odd values only) through `relay` and back to `node`.
    let fed_back = ordered
        .clone()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .send(&relay, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    complete_cycle_back.complete(fed_back);

    let receiver = ordered.sim_output();

    let mut saw_in_order = false;
    let explored = flow.sim().exhaustive(async || {
        sender.send_many_unordered([0, 2]);
        let observed = receiver.collect::<Vec<_>>().await;

        if observed.starts_with(&[0, 1, 2]) {
            saw_in_order = true;
        }
    });

    assert!(saw_in_order, "did not see an instance with 0, 1, 2 in order");
    assert_eq!(explored, 6);
}
3857
/// Variant of the cycle-back check in which the fed-back stream is batched into a tick and
/// re-emitted with `all_ticks` before being sent around the cycle; expects 58 instances.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back_tick() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let relay = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let ordered = input
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Round-trip `v + 1` (odd values only) through `relay` and back to `node`, after
    // passing the stream through a tick.
    let fed_back = ordered
        .clone()
        .batch(&node.tick(), nondet!(/** test */))
        .all_ticks()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .send(&relay, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    complete_cycle_back.complete(fed_back);

    let receiver = ordered.sim_output();

    let mut saw_in_order = false;
    let explored = flow.sim().exhaustive(async || {
        sender.send_many_unordered([0, 2]);
        let observed = receiver.collect::<Vec<_>>().await;

        if observed.starts_with(&[0, 1, 2]) {
            saw_in_order = true;
        }
    });

    assert!(saw_in_order, "did not see an instance with 0, 1, 2 in order");
    assert_eq!(explored, 58);
}
3898
/// Chains two `assume_ordering` points with a cycle between them, and checks that some
/// instance observes `0, 3, 1` in order and that 24 instances are explored.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_multiple() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let relay = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();
    // The second input is never sent to; only its stream is merged in below.
    let (_, input2) = node.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let input1_ordered = input
        .clone()
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));
    let shifted_and_merged = input1_ordered
        .clone()
        .map(q!(|v| v + 3))
        .weaken_ordering::<NoOrder>()
        .merge_unordered(input2)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Only the value 3 is sent around the cycle.
    let fed_back = shifted_and_merged
        .filter(q!(|v| *v == 3))
        .send(&relay, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    complete_cycle_back.complete(fed_back);

    let receiver = input1_ordered.sim_output();

    let mut saw_in_order = false;
    let explored = flow.sim().exhaustive(async || {
        sender.send_many_unordered([0, 1]);
        let observed = receiver.collect::<Vec<_>>().await;

        if observed.starts_with(&[0, 3, 1]) {
            saw_in_order = true;
        }
    });

    assert!(saw_in_order, "did not see an instance with 0, 3, 1 in order");
    assert_eq!(explored, 24);
}
3943
/// Variant of the cycle-back check where `assume_ordering` is applied inside an atomic
/// section; every instance must yield four elements and 22 instances are explored.
#[cfg(feature = "sim")]
#[test]
fn sim_atomic_assume_ordering_cycle_back() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let relay = flow.process::<()>();

    let (sender, input) = node.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let ordered = input
        .merge_unordered(cycle_back)
        .atomic()
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .end_atomic();

    // Round-trip `v + 1` (odd values only) through `relay` and back to `node`.
    let fed_back = ordered
        .clone()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .send(&relay, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    complete_cycle_back.complete(fed_back);

    let receiver = ordered.sim_output();

    let explored = flow.sim().exhaustive(async || {
        sender.send_many_unordered([0, 2]);
        let observed = receiver.collect::<Vec<_>>().await;
        assert_eq!(observed.len(), 4);
    });

    assert_eq!(explored, 22);
}
3978
/// Deploy test: `partition` splits `1..=6` into an even stream and an odd
/// stream, each forwarded to an external process and checked after sorting.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn partition_evens_odds() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (evens, odds) = node
        .source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]))
        .partition(q!(|x: &i32| x % 2 == 0));
    let evens_port = evens.send_bincode_external(&external);
    let odds_port = odds.send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut evens_out = nodes.connect(evens_port).await;
    let mut odds_out = nodes.connect(odds_port).await;

    deployment.start().await.unwrap();

    // Three evens are expected; arrival order is not asserted, so sort first.
    let mut evens_seen = Vec::new();
    while evens_seen.len() < 3 {
        let value = evens_out.next().await.unwrap();
        evens_seen.push(value);
    }
    evens_seen.sort();
    assert_eq!(evens_seen, vec![2, 4, 6]);

    // Likewise for the three odds.
    let mut odds_seen = Vec::new();
    while odds_seen.len() < 3 {
        let value = odds_out.next().await.unwrap();
        odds_seen.push(value);
    }
    odds_seen.sort();
    assert_eq!(odds_seen, vec![1, 3, 5]);
}
4019
/// Deploy test: an `inspect` whose resulting stream is never consumed must
/// still execute, which is observed through the process's stdout.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unconsumed_inspect_still_runs() {
    use crate::deploy::DeployCrateWrapper;

    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    // Deliberately discard what `.inspect()` returns: prior to the Null-root
    // fix, a dropped result meant the closure silently never ran.
    node.source_iter(q!(0..5))
        .inspect(q!(|x| println!("inspect: {}", x)));

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut stdout = nodes.get_process(&node).stdout();

    deployment.start().await.unwrap();

    // One printed line per source element; sort before comparing.
    let mut printed = Vec::new();
    while printed.len() < 5 {
        let line = stdout.recv().await.unwrap();
        printed.push(line);
    }
    printed.sort();

    let expected = vec![
        "inspect: 0",
        "inspect: 1",
        "inspect: 2",
        "inspect: 3",
        "inspect: 4",
    ];
    assert_eq!(printed, expected);
}
4061
/// Simulation test: `limit(3)` over five inputs yields only the first three.
#[cfg(feature = "sim")]
#[test]
fn sim_limit() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();
    let limited = input.limit(q!(3));
    let out_recv = limited.sim_output();

    flow.sim().exhaustive(async || {
        // Feed 1 through 5, in that order.
        for value in 1..=5 {
            in_send.send(value);
        }

        out_recv.assert_yields_only([1, 2, 3]).await;
    });
}
4082
/// Simulation test: `limit(0)` yields nothing even when inputs are sent.
#[cfg(feature = "sim")]
#[test]
fn sim_limit_zero() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();
    let limited = input.limit(q!(0));
    let out_recv = limited.sim_output();

    flow.sim().exhaustive(async || {
        for value in [1, 2] {
            in_send.send(value);
        }

        out_recv.assert_yields_only::<i32, _>([]).await;
    });
}
4100
/// Simulation test: `merge_ordered` of two two-element inputs.
///
/// Every explored instance must keep each input's own elements in their
/// original relative order. The interleaving `[1, 3, 2, 4]` must be observed
/// at least once, and exactly six instances must be explored.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();
    let (in_send2, input2) = node.sim_input();

    let merged = input.merge_ordered(input2, nondet!(/** test */));
    let out_recv = merged.sim_output();

    let mut saw_out_of_order = false;
    let instances = flow.sim().exhaustive(async || {
        in_send.send(1);
        in_send.send(2);
        in_send2.send(3);
        in_send2.send(4);

        let out = out_recv.collect::<Vec<_>>().await;

        saw_out_of_order |= out == [1, 3, 2, 4];

        // Project the output back onto each input (values <= 2 came from the
        // first, values > 2 from the second) and check that per-input order
        // survived the merge.
        let from_first: Vec<_> = out.iter().copied().filter(|v| *v <= 2).collect();
        let from_second: Vec<_> = out.iter().copied().filter(|v| *v > 2).collect();
        assert_eq!(
            from_first,
            vec![1, 2],
            "first input order violated: {:?}",
            out
        );
        assert_eq!(
            from_second,
            vec![3, 4],
            "second input order violated: {:?}",
            out
        );

        // Together the two projections must account for every element.
        let mut everything = from_first;
        everything.extend(from_second);
        everything.sort();
        assert_eq!(everything, vec![1, 2, 3, 4]);
    });

    assert!(saw_out_of_order);
    assert_eq!(instances, 6);
}
4152
/// Checks that `merge_ordered` forwards elements unchanged when data is
/// only ever sent on one of its two inputs.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered_one_empty() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();
    // The second sender is kept but never used.
    let (_in_send2, input2) = node.sim_input();

    let merged = input.merge_ordered(input2, nondet!(/** test */));
    let out_recv = merged.sim_output();

    let instances = flow.sim().exhaustive(async || {
        for value in [1, 2] {
            in_send.send(value);
        }

        let out = out_recv.collect::<Vec<_>>().await;
        assert_eq!(out, vec![1, 2]);
    });

    // With one side empty there is a single possible interleaving.
    assert_eq!(instances, 1);
}
4179
/// Checks `merge_ordered` in the presence of a feedback cycle.
///
/// Output elements equal to `1` are mapped to `10` and fed back into one of
/// the merge's inputs. Releasing elements one at a time must let that
/// cycled-back element arrive, and possibly be emitted, ahead of elements
/// still pending on the other input.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered_cycle_back() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();

    // Forward reference standing in for the feedback edge.
    let (complete_cycle_back, cycle_back) =
        node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();

    // External input merged with the feedback edge.
    let merged = input.merge_ordered(cycle_back, nondet!(/** test */));

    // Feedback edge: a 1 in the output re-enters as 10.
    let fed_back = merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10));
    complete_cycle_back.complete(fed_back);

    let out_recv = merged.sim_output();

    // With inputs 1 and 2, the 1 produces a 10 via the cycle, so every valid
    // output has 1 ahead of 10.
    let mut saw_cycle_before_second = false;
    flow.sim().exhaustive(async || {
        in_send.send(1);
        in_send.send(2);

        let out = out_recv.collect::<Vec<_>>().await;

        // Causality: 10 is derived from 1 and can never precede it.
        let idx_origin = out.iter().position(|v| *v == 1).unwrap();
        let idx_echo = out.iter().position(|v| *v == 10).unwrap();
        assert!(idx_origin < idx_echo, "causal order violated: {:?}", out);

        // Record the instance where the cycled element overtakes the second
        // external element, i.e. [1, 10, 2].
        saw_cycle_before_second |= out == [1, 10, 2];

        let mut in_value_order = out;
        in_value_order.sort();
        assert_eq!(in_value_order, vec![1, 2, 10]);
    });

    assert!(
        saw_cycle_before_second,
        "never saw the cycled element arrive before the second input element"
    );
}
4234
/// Checks `merge_ordered` interleaving when one input has a late element.
///
/// Input a carries `[1, <delay>, 2]` and input b carries `[3, 4]`. Some
/// explored instance must output `[1, 3, 4, 2]`, with the delayed `2`
/// landing after both of b's elements.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered_delayed() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();
    let (in_send2, input2) = node.sim_input();

    let merged = input.merge_ordered(input2, nondet!(/** test */));
    let out_recv = merged.sim_output();

    let mut saw_delayed_interleaving = false;
    flow.sim().exhaustive(async || {
        // First wave: 1 on a; 3 and 4 on b.
        in_send.send(1);
        in_send2.send(3);
        in_send2.send(4);

        // Drain whatever has been produced so far.
        let first_batch = out_recv.collect::<Vec<_>>().await;

        // Second wave: the delayed 2 on a.
        in_send.send(2);
        let second_batch = out_recv.collect::<Vec<_>>().await;

        // Stitch the two drains together in arrival order.
        let mut combined = first_batch;
        combined.extend(second_batch);

        // [1, 3, 4, 2] is the delayed interleaving this test looks for.
        saw_delayed_interleaving |= combined == [1, 3, 4, 2];

        combined.sort();
        assert_eq!(combined, vec![1, 2, 3, 4]);
    });

    assert!(saw_delayed_interleaving);
}
4282
/// Deploy test: merge_ordered with a delayed element on one input.
///
/// Sends a=1, b=3, b=4, waits for three outputs, and only then sends a=2.
/// Because a=2 does not exist until the first three outputs have been
/// received, those three must be exactly {1, 3, 4} and the fourth output
/// must be 2. Both facts are asserted, which demonstrates that both inputs
/// are pulled and that the delayed element arrives later.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn deploy_merge_ordered_delayed() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_a_port, input_a) = node.source_external_bincode(&external);
    let (input_b_port, input_b) = node.source_external_bincode(&external);

    let out = input_a
        .assume_ordering(nondet!(/** test */))
        .merge_ordered(
            input_b.assume_ordering(nondet!(/** test */)),
            nondet!(/** test */),
        )
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut ext_a = nodes.connect(input_a_port).await;
    let mut ext_b = nodes.connect(input_b_port).await;
    let mut ext_out = nodes.connect(out).await;

    deployment.start().await.unwrap();

    // Send a=1, b=3, b=4
    ext_a.send(1).await.unwrap();
    ext_b.send(3).await.unwrap();
    ext_b.send(4).await.unwrap();

    // Collect the first 3 elements
    let mut received = Vec::new();
    for _ in 0..3 {
        received.push(ext_out.next().await.unwrap());
    }

    // a=2 has not been sent yet, so the first batch can only contain the
    // three elements sent so far (the interleaving across inputs is free).
    let mut first_batch = received.clone();
    first_batch.sort();
    assert_eq!(
        first_batch,
        vec![1, 3, 4],
        "first three outputs must be the elements sent before the delay: {:?}",
        received
    );

    // Now send the delayed a=2; it is the only element left to arrive.
    ext_a.send(2).await.unwrap();
    let delayed = ext_out.next().await.unwrap();
    assert_eq!(delayed, 2, "delayed element must arrive last");
    received.push(delayed);

    // All elements should be present
    received.sort();
    assert_eq!(received, vec![1, 2, 3, 4]);
}
4339
/// Deploy test: a `fold` declared monotone via `manual_proof!` can feed
/// `threshold_greater_or_equal`.
///
/// The fold sums `1..=6`; the threshold is `7`; the first value read from
/// the external output must be `7`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_fold_threshold() {
    use crate::properties::manual_proof;

    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    // `.into()` converts the source into the default `Stream<_, _>` form.
    let in_unbounded: super::Stream<_, _> =
        node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
    let running_total = in_unbounded.fold(
        q!(|| 0),
        q!(
            |total, v| {
                *total += v;
            },
            monotone = manual_proof!(/** test */)
        ),
    );

    let limit = node.singleton(q!(7));
    let threshold_port = running_total
        .threshold_greater_or_equal(limit)
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut threshold_out = nodes.connect(threshold_port).await;

    deployment.start().await.unwrap();

    let crossed = threshold_out.next().await.unwrap();
    assert_eq!(crossed, 7);
}
4380
/// Deploy test: the result of `count()` can feed
/// `threshold_greater_or_equal` directly.
///
/// Six elements are counted; the threshold is `3`; the first value read
/// from the external output must be `3`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_count_threshold() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let in_unbounded: super::Stream<_, _> =
        node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
    let count = in_unbounded.count();

    let limit = node.singleton(q!(3));
    let threshold_port = count
        .threshold_greater_or_equal(limit)
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut threshold_out = nodes.connect(threshold_port).await;

    deployment.start().await.unwrap();

    let crossed = threshold_out.next().await.unwrap();
    assert_eq!(crossed, 3);
}
4411
/// Deploy test: a `map` annotated `order_preserving` over a monotone fold
/// can still feed `threshold_greater_or_equal`.
///
/// The fold sums `1..=6`, the map doubles it, the threshold is `14`, and the
/// first value read from the external output must be `14`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_map_order_preserving_threshold() {
    use crate::properties::manual_proof;

    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let in_unbounded: super::Stream<_, _> =
        node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
    let running_total = in_unbounded.fold(
        q!(|| 0),
        q!(
            |total, v| {
                *total += v;
            },
            monotone = manual_proof!(/** test */)
        ),
    );

    // The order_preserving annotation is what lets the mapped value keep
    // being usable as a threshold input.
    let doubled = running_total.map(q!(
        |v| v * 2,
        order_preserving = manual_proof!(/** doubling preserves order */)
    ));

    let limit = node.singleton(q!(14));
    let threshold_port = doubled
        .threshold_greater_or_equal(limit)
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut threshold_out = nodes.connect(threshold_port).await;

    deployment.start().await.unwrap();

    let crossed = threshold_out.next().await.unwrap();
    assert_eq!(crossed, 14);
}
4458
4459    // === Compile-time type tests for join/cross_product ordering ===
4460
/// Never-called functions whose only job is to make the compiler check the
/// ordering marker that `join` / `cross_product` produce for different
/// boundedness combinations: each body has to type-check against the
/// return type written in its signature.
#[cfg(any(feature = "deploy", feature = "sim"))]
mod join_ordering_type_tests {
    use crate::live_collections::boundedness::{Bounded, Unbounded};
    use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
    use crate::location::{Location, Process};

    /// Unbounded ordered `join` bounded ordered => `TotalOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_with_bounded_preserves_order<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Unbounded ordered `join` unbounded ordered => `NoOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_with_unbounded_is_no_order<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Bounded ordered `join` bounded ordered => `TotalOrder`, checked for an
    /// arbitrary location `L`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
        lhs: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Unbounded unordered `join` bounded unordered => `NoOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_noorder_with_bounded<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    // === Compile-time type tests for cross_product ordering ===

    /// Unbounded ordered `cross_product` bounded ordered => `TotalOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_unbounded_with_bounded_preserves_order<'a>(
        lhs: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }

    /// Bounded ordered `cross_product` bounded ordered => `TotalOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_bounded_with_bounded_preserves_order<'a>(
        lhs: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }

    /// Unbounded ordered `cross_product` unbounded ordered => `NoOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
        lhs: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }
} // mod join_ordering_type_tests
4525
4526    // === Runtime correctness tests for bounded join/cross_product ===
4527
/// Simulation test: `cross_product` of a `source_iter` stream with a stream
/// that went through `batch(..).all_ticks()` yields every pairing, in any
/// order.
#[cfg(feature = "sim")]
#[test]
fn cross_product_mixed_boundedness_correctness() {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::nondet::nondet;

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let tick = process.tick();

    let numbers = process.source_iter(q!(vec![1, 2]));
    // Route the right-hand side through a tick batch and back out again.
    let letters_source = process.source_iter(q!(vec!['a', 'b']));
    let letters = letters_source
        .batch(&tick, nondet!(/** test */))
        .all_ticks();

    let pairs = numbers.cross_product(letters);
    let out = pairs.sim_output();

    flow.sim().exhaustive(async || {
        let expected = vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')];
        out.assert_yields_only_unordered(expected).await;
    });
}
4553
/// Simulation test: `join` of a `source_iter` stream with a stream that went
/// through `batch(..).all_ticks()` yields one joined tuple per shared key,
/// in any order.
#[cfg(feature = "sim")]
#[test]
fn join_mixed_boundedness_correctness() {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::nondet::nondet;

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let tick = process.tick();

    let keyed_left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    // Route the right-hand side through a tick batch and back out again.
    let right_source = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    let keyed_right = right_source
        .batch(&tick, nondet!(/** test */))
        .all_ticks();

    let joined = keyed_left.join(keyed_right);
    let out = joined.sim_output();

    flow.sim().exhaustive(async || {
        let expected = vec![(1, ('a', 'x')), (2, ('b', 'y'))];
        out.assert_yields_only_unordered(expected).await;
    });
}
4579
/// Simulation test: two inputs that are each made atomic separately can be
/// combined with `merge_unordered`, and both elements come out (in any
/// order) after `end_atomic`.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_unordered_independent_atomics() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
    let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();

    // Each side enters its own atomic section before the merge.
    let atomic_first = input1.atomic();
    let atomic_second = input2.atomic();
    let merged = atomic_first.merge_unordered(atomic_second).end_atomic();
    let out = merged.sim_output();

    flow.sim().exhaustive(async || {
        in1_send.send(1);
        in2_send.send(2);

        let expected = vec![1, 2];
        out.assert_yields_only_unordered(expected).await;
    });
}
4602}