Skip to main content

hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16    ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37    AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38    manual_proof,
39};
40
41pub mod networking;
42
43/// Streaming elements of type `V` grouped by a key of type `K`.
44///
45/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
46/// order of keys is non-deterministic but the order *within* each group may be deterministic.
47///
48/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
49/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
50/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
51///
52/// Type Parameters:
53/// - `K`: the type of the key for each group
54/// - `V`: the type of the elements inside each group
55/// - `Loc`: the [`Location`] where the keyed stream is materialized
56/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
57/// - `Order`: tracks whether the elements within each group have deterministic order
58///   ([`TotalOrder`]) or not ([`NoOrder`])
59/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
60///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
61pub struct KeyedStream<
62    K,
63    V,
64    Loc,
65    Bound: Boundedness = Unbounded,
66    Order: Ordering = TotalOrder,
67    Retry: Retries = ExactlyOnce,
68> {
69    pub(crate) location: Loc,
70    pub(crate) ir_node: RefCell<HydroNode>,
71    pub(crate) flow_state: FlowState,
72
73    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
74}
75
76impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
77    fn drop(&mut self) {
78        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
79        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
80            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
81                input: Box::new(ir_node),
82                op_metadata: HydroIrOpMetadata::new(),
83            });
84        }
85    }
86}
87
88impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
89    for KeyedStream<K, V, L, Unbounded, O, R>
90where
91    L: Location<'a>,
92{
93    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
94        let new_meta = stream
95            .location
96            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
97
98        KeyedStream {
99            location: stream.location.clone(),
100            flow_state: stream.flow_state.clone(),
101            ir_node: RefCell::new(HydroNode::Cast {
102                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
103                metadata: new_meta,
104            }),
105            _phantom: PhantomData,
106        }
107    }
108}
109
110impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
111    for KeyedStream<K, V, L, B, NoOrder, R>
112where
113    L: Location<'a>,
114{
115    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
116        stream.weaken_ordering()
117    }
118}
119
120impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
121where
122    L: Location<'a>,
123{
124    fn defer_tick(self) -> Self {
125        KeyedStream::defer_tick(self)
126    }
127}
128
129impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
130    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
131where
132    L: Location<'a>,
133{
134    type Location = Tick<L>;
135
136    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
137        KeyedStream {
138            flow_state: location.flow_state().clone(),
139            location: location.clone(),
140            ir_node: RefCell::new(HydroNode::CycleSource {
141                cycle_id,
142                metadata: location.new_node_metadata(
143                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
144                ),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
152    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
153where
154    L: Location<'a>,
155{
156    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162
163        self.location
164            .flow_state()
165            .borrow_mut()
166            .push_root(HydroRoot::CycleSink {
167                cycle_id,
168                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
169                op_metadata: HydroIrOpMetadata::new(),
170            });
171    }
172}
173
174impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
175    for KeyedStream<K, V, L, B, O, R>
176where
177    L: Location<'a>,
178{
179    type Location = L;
180
181    fn create_source(cycle_id: CycleId, location: L) -> Self {
182        KeyedStream {
183            flow_state: location.flow_state().clone(),
184            location: location.clone(),
185            ir_node: RefCell::new(HydroNode::CycleSource {
186                cycle_id,
187                metadata: location
188                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
189            }),
190            _phantom: PhantomData,
191        }
192    }
193}
194
195impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
196    for KeyedStream<K, V, L, B, O, R>
197where
198    L: Location<'a>,
199{
200    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201        assert_eq!(
202            Location::id(&self.location),
203            expected_location,
204            "locations do not match"
205        );
206        self.location
207            .flow_state()
208            .borrow_mut()
209            .push_root(HydroRoot::CycleSink {
210                cycle_id,
211                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214    }
215}
216
217impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
218    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
219{
220    fn clone(&self) -> Self {
221        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
222            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
223            *self.ir_node.borrow_mut() = HydroNode::Tee {
224                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
225                metadata: self.location.new_node_metadata(Self::collection_kind()),
226            };
227        }
228
229        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
230            KeyedStream {
231                location: self.location.clone(),
232                flow_state: self.flow_state.clone(),
233                ir_node: HydroNode::Tee {
234                    inner: SharedNode(inner.0.clone()),
235                    metadata: metadata.clone(),
236                }
237                .into(),
238                _phantom: PhantomData,
239            }
240        } else {
241            unreachable!()
242        }
243    }
244}
245
246/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
247/// control the processing of future elements.
248pub enum Generate<T> {
249    /// Emit the provided element, and keep processing future inputs.
250    Yield(T),
251    /// Emit the provided element as the _final_ element, do not process future inputs.
252    Return(T),
253    /// Do not emit anything, but continue processing future inputs.
254    Continue,
255    /// Do not emit anything, and do not process further inputs.
256    Break,
257}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260    KeyedStream<K, V, L, B, O, R>
261{
262    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
263        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
264        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
265
266        let flow_state = location.flow_state().clone();
267        KeyedStream {
268            location,
269            flow_state,
270            ir_node: RefCell::new(ir_node),
271            _phantom: PhantomData,
272        }
273    }
274
275    /// Returns the [`CollectionKind`] corresponding to this type.
276    pub fn collection_kind() -> CollectionKind {
277        CollectionKind::KeyedStream {
278            bound: B::BOUND_KIND,
279            value_order: O::ORDERING_KIND,
280            value_retry: R::RETRIES_KIND,
281            key_type: stageleft::quote_type::<K>().into(),
282            value_type: stageleft::quote_type::<V>().into(),
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed stream is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290
291    /// Weakens the consistency of this live collection to not guarantee any consistency across
292    /// cluster members (if this collection is on a cluster).
293    pub fn weaken_consistency(self) -> KeyedStream<K, V, L::DropConsistency, B, O, R>
294    where
295        L: Location<'a>,
296    {
297        if L::consistency()
298            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
299        {
300            // already no consistency
301            KeyedStream::new(
302                self.location.drop_consistency(),
303                self.ir_node.replace(HydroNode::Placeholder),
304            )
305        } else {
306            KeyedStream::new(
307                self.location.drop_consistency(),
308                HydroNode::Cast {
309                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
310                    metadata: self
311                        .location
312                        .drop_consistency()
313                        .new_node_metadata(
314                            KeyedStream::<K, V, L::DropConsistency, B>::collection_kind(),
315                        ),
316                },
317            )
318        }
319    }
320
321    /// Casts this live collection to have the consistency guarantees specified in the given
322    /// location type parameter. The developer must ensure that the strengthened consistency
323    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
324    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
325        self,
326        _proof: impl crate::properties::ConsistencyProof,
327    ) -> KeyedStream<K, V, L2, B, O, R>
328    where
329        L: Location<'a>,
330    {
331        if L::consistency() == L2::consistency() {
332            KeyedStream::new(
333                self.location.with_consistency_of(),
334                self.ir_node.replace(HydroNode::Placeholder),
335            )
336        } else {
337            KeyedStream::new(
338                self.location.with_consistency_of(),
339                HydroNode::AssertIsConsistent {
340                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
341                    trusted: false,
342                    metadata: self
343                        .location
344                        .clone()
345                        .with_consistency_of::<L2>()
346                        .new_node_metadata(KeyedStream::<K, V, L2, B, O, R>::collection_kind()),
347                },
348            )
349        }
350    }
351
352    pub(crate) fn assert_has_consistency_of_trusted<
353        L2: Location<'a, DropConsistency = L::DropConsistency>,
354    >(
355        self,
356        _proof: impl crate::properties::ConsistencyProof,
357    ) -> KeyedStream<K, V, L2, B, O, R>
358    where
359        L: Location<'a>,
360    {
361        if L::consistency() == L2::consistency() {
362            KeyedStream::new(
363                self.location.with_consistency_of(),
364                self.ir_node.replace(HydroNode::Placeholder),
365            )
366        } else {
367            KeyedStream::new(
368                self.location.with_consistency_of(),
369                HydroNode::AssertIsConsistent {
370                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
371                    trusted: true,
372                    metadata: self
373                        .location
374                        .clone()
375                        .with_consistency_of::<L2>()
376                        .new_node_metadata(KeyedStream::<K, V, L2, B, O, R>::collection_kind()),
377                },
378            )
379        }
380    }
381
382    /// Turns this [`KeyedStream`] into a [`Stream`] preserving ordering, under the invariant
383    /// assumption that there is at most one key. If this invariant is broken, the program
384    /// may exhibit undefined behavior, so uses must be carefully vetted.
385    pub(crate) fn cast_at_most_one_key(self) -> Stream<(K, V), L, B, O, R> {
386        Stream::new(
387            self.location.clone(),
388            HydroNode::Cast {
389                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
390                metadata: self
391                    .location
392                    .new_node_metadata(Stream::<(K, V), L, B, O, R>::collection_kind()),
393            },
394        )
395    }
396
397    /// Turns this [`KeyedStream`] into a [`KeyedSingleton`], under the invariant assumption that
398    /// there is at most one entry per key. If this invariant is broken, the program may exhibit
399    /// undefined behavior, so uses must be carefully vetted.
400    pub(crate) fn cast_at_most_one_entry_per_key(
401        self,
402    ) -> KeyedSingleton<K, V, L, B::WithBoundedValue> {
403        KeyedSingleton::new(
404            self.location.clone(),
405            HydroNode::Cast {
406                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
407                metadata: self.location.new_node_metadata(KeyedSingleton::<
408                    K,
409                    V,
410                    L,
411                    B::WithBoundedValue,
412                >::collection_kind()),
413            },
414        )
415    }
416
417    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> KeyedStream<K, V, L, B, O2, R> {
418        if O::ORDERING_KIND == O2::ORDERING_KIND {
419            KeyedStream::new(
420                self.location.clone(),
421                self.ir_node.replace(HydroNode::Placeholder),
422            )
423        } else {
424            panic!(
425                "Runtime ordering {:?} did not match requested cast {:?}.",
426                O::ORDERING_KIND,
427                O2::ORDERING_KIND
428            )
429        }
430    }
431
432    /// Explicitly "casts" the keyed stream to a type with a different ordering
433    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
434    /// by the type-system.
435    ///
436    /// # Non-Determinism
437    /// This function is used as an escape hatch, and any mistakes in the
438    /// provided ordering guarantee will propagate into the guarantees
439    /// for the rest of the program.
440    pub fn assume_ordering<O2: Ordering>(
441        self,
442        _nondet: NonDet,
443    ) -> KeyedStream<K, V, L::DropConsistency, B, O2, R> {
444        if O::ORDERING_KIND == O2::ORDERING_KIND {
445            self.use_ordering_type().weaken_consistency()
446        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
447            // We can always weaken the ordering guarantee
448            let target_location = self.location.drop_consistency();
449            KeyedStream::new(
450                target_location.clone(),
451                HydroNode::Cast {
452                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
453                    metadata: target_location
454                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
455                },
456            )
457        } else {
458            let target_location = self.location.drop_consistency();
459            KeyedStream::new(
460                target_location.clone(),
461                HydroNode::ObserveNonDet {
462                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
463                    trusted: false,
464                    metadata: target_location
465                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
466                },
467            )
468        }
469    }
470
471    fn assume_ordering_trusted<O2: Ordering>(
472        self,
473        _nondet: NonDet,
474    ) -> KeyedStream<K, V, L, B, O2, R> {
475        if O::ORDERING_KIND == O2::ORDERING_KIND {
476            KeyedStream::new(
477                self.location.clone(),
478                self.ir_node.replace(HydroNode::Placeholder),
479            )
480        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
481            // We can always weaken the ordering guarantee
482            KeyedStream::new(
483                self.location.clone(),
484                HydroNode::Cast {
485                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
486                    metadata: self
487                        .location
488                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
489                },
490            )
491        } else {
492            KeyedStream::new(
493                self.location.clone(),
494                HydroNode::ObserveNonDet {
495                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
496                    trusted: true,
497                    metadata: self
498                        .location
499                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
500                },
501            )
502        }
503    }
504
505    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
506    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
507    /// which is always safe because that is the weakest possible guarantee.
508    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
509        self.weaken_ordering::<NoOrder>()
510    }
511
512    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
513    /// enforcing that `O2` is weaker than the input ordering guarantee.
514    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
515        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
516        self.assume_ordering_trusted::<O2>(nondet)
517    }
518
519    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
520    /// implies that `O == TotalOrder`.
521    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
522    where
523        O: IsOrdered,
524    {
525        self.assume_ordering_trusted(nondet!(/** no-op */))
526    }
527
528    /// Explicitly "casts" the keyed stream to a type with a different retries
529    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
530    /// be proven by the type-system.
531    ///
532    /// # Non-Determinism
533    /// This function is used as an escape hatch, and any mistakes in the
534    /// provided retries guarantee will propagate into the guarantees
535    /// for the rest of the program.
536    pub fn assume_retries<R2: Retries>(
537        self,
538        _nondet: NonDet,
539    ) -> KeyedStream<K, V, L::DropConsistency, B, O, R2> {
540        if R::RETRIES_KIND == R2::RETRIES_KIND {
541            KeyedStream::new(
542                self.location.drop_consistency(),
543                self.ir_node.replace(HydroNode::Placeholder),
544            )
545        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
546            // We can always weaken the retries guarantee
547            let target_location = self.location.drop_consistency();
548            KeyedStream::new(
549                target_location.clone(),
550                HydroNode::Cast {
551                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
552                    metadata: target_location
553                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
554                },
555            )
556        } else {
557            let target_location = self.location.drop_consistency();
558            KeyedStream::new(
559                target_location.clone(),
560                HydroNode::ObserveNonDet {
561                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
562                    trusted: false,
563                    metadata: target_location
564                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
565                },
566            )
567        }
568    }
569
570    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
571    // is not observable
572    fn assume_retries_trusted<R2: Retries>(
573        self,
574        _nondet: NonDet,
575    ) -> KeyedStream<K, V, L, B, O, R2> {
576        if R::RETRIES_KIND == R2::RETRIES_KIND {
577            KeyedStream::new(
578                self.location.clone(),
579                self.ir_node.replace(HydroNode::Placeholder),
580            )
581        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
582            // We can always weaken the retries guarantee
583            KeyedStream::new(
584                self.location.clone(),
585                HydroNode::Cast {
586                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
587                    metadata: self
588                        .location
589                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
590                },
591            )
592        } else {
593            KeyedStream::new(
594                self.location.clone(),
595                HydroNode::ObserveNonDet {
596                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
597                    trusted: true,
598                    metadata: self
599                        .location
600                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
601                },
602            )
603        }
604    }
605
606    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
607    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
608    /// which is always safe because that is the weakest possible guarantee.
609    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
610        self.weaken_retries::<AtLeastOnce>()
611    }
612
613    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
614    /// enforcing that `R2` is weaker than the input retries guarantee.
615    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
616        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
617        self.assume_retries_trusted::<R2>(nondet)
618    }
619
620    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
621    /// implies that `R == ExactlyOnce`.
622    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
623    where
624        R: IsExactlyOnce,
625    {
626        self.assume_retries_trusted(nondet!(/** no-op */))
627    }
628
629    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
630    /// implies that `B == Bounded`.
631    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
632    where
633        B: IsBounded,
634    {
635        self.weaken_boundedness()
636    }
637
638    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
639    /// which implies that `B == Bounded`.
640    pub fn weaken_boundedness<B2: Boundedness>(self) -> KeyedStream<K, V, L, B2, O, R> {
641        if B::BOUNDED == B2::BOUNDED {
642            KeyedStream::new(
643                self.location.clone(),
644                self.ir_node.replace(HydroNode::Placeholder),
645            )
646        } else {
647            // We can always weaken the boundedness
648            KeyedStream::new(
649                self.location.clone(),
650                HydroNode::Cast {
651                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
652                    metadata: self
653                        .location
654                        .new_node_metadata(KeyedStream::<K, V, L, B2, O, R>::collection_kind()),
655                },
656            )
657        }
658    }
659
660    /// Flattens the keyed stream into an unordered stream of key-value pairs.
661    ///
662    /// # Example
663    /// ```rust
664    /// # #[cfg(feature = "deploy")] {
665    /// # use hydro_lang::prelude::*;
666    /// # use futures::StreamExt;
667    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668    /// process
669    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
670    ///     .into_keyed()
671    ///     .entries()
672    /// # }, |mut stream| async move {
673    /// // (1, 2), (1, 3), (2, 4) in any order
674    /// # let mut results = Vec::new();
675    /// # for _ in 0..3 {
676    /// #     results.push(stream.next().await.unwrap());
677    /// # }
678    /// # results.sort();
679    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
680    /// # }));
681    /// # }
682    /// ```
683    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
684        Stream::new(
685            self.location.clone(),
686            HydroNode::Cast {
687                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
688                metadata: self
689                    .location
690                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
691            },
692        )
693    }
694
695    /// Flattens the keyed stream into a totally ordered stream of key-value pairs,
696    /// preserving the order of values within each key group but non-deterministically
697    /// interleaving across keys.
698    ///
699    /// Requires the keyed stream to be totally ordered within each group (`O: IsOrdered`).
700    ///
701    /// # Non-Determinism
702    /// The interleaving of entries across different keys is non-deterministic.
703    /// Within each key, the original order is preserved.
704    pub fn entries_partially_ordered(
705        self,
706        _nondet: NonDet,
707    ) -> Stream<(K, V), L::DropConsistency, B, TotalOrder, R>
708    where
709        O: IsOrdered,
710    {
711        let target_location = self.location.drop_consistency();
712        Stream::new(
713            target_location.clone(),
714            HydroNode::ObserveNonDet {
715                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
716                trusted: false,
717                metadata: target_location
718                    .new_node_metadata(Stream::<(K, V), L, B, TotalOrder, R>::collection_kind()),
719            },
720        )
721    }
722
723    /// Flattens the keyed stream into an unordered stream of only the values.
724    ///
725    /// # Example
726    /// ```rust
727    /// # #[cfg(feature = "deploy")] {
728    /// # use hydro_lang::prelude::*;
729    /// # use futures::StreamExt;
730    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
731    /// process
732    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
733    ///     .into_keyed()
734    ///     .values()
735    /// # }, |mut stream| async move {
736    /// // 2, 3, 4 in any order
737    /// # let mut results = Vec::new();
738    /// # for _ in 0..3 {
739    /// #     results.push(stream.next().await.unwrap());
740    /// # }
741    /// # results.sort();
742    /// # assert_eq!(results, vec![2, 3, 4]);
743    /// # }));
744    /// # }
745    /// ```
746    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
747        self.entries().map(q!(|(_, v)| v))
748    }
749
750    /// Flattens the keyed stream into an unordered stream of just the keys.
751    ///
752    /// # Example
753    /// ```rust
754    /// # #[cfg(feature = "deploy")] {
755    /// # use hydro_lang::prelude::*;
756    /// # use futures::StreamExt;
757    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
758    /// # process
759    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
760    /// #     .into_keyed()
761    /// #     .keys()
762    /// # }, |mut stream| async move {
763    /// // 1, 2 in any order
764    /// # let mut results = Vec::new();
765    /// # for _ in 0..2 {
766    /// #     results.push(stream.next().await.unwrap());
767    /// # }
768    /// # results.sort();
769    /// # assert_eq!(results, vec![1, 2]);
770    /// # }));
771    /// # }
772    /// ```
773    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
774    where
775        K: Eq + Hash,
776    {
777        self.entries().map(q!(|(k, _)| k)).unique()
778    }
779
780    /// Transforms each value by invoking `f` on each element, with keys staying the same
781    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
782    ///
783    /// If you do not want to modify the stream and instead only want to view
784    /// each item use [`KeyedStream::inspect`] instead.
785    ///
786    /// # Example
787    /// ```rust
788    /// # #[cfg(feature = "deploy")] {
789    /// # use hydro_lang::prelude::*;
790    /// # use futures::StreamExt;
791    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
792    /// process
793    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
794    ///     .into_keyed()
795    ///     .map(q!(|v| v + 1))
796    /// #   .entries()
797    /// # }, |mut stream| async move {
798    /// // { 1: [3, 4], 2: [5] }
799    /// # let mut results = Vec::new();
800    /// # for _ in 0..3 {
801    /// #     results.push(stream.next().await.unwrap());
802    /// # }
803    /// # results.sort();
804    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
805    /// # }));
806    /// # }
807    /// ```
808    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
809    where
810        F: Fn(V) -> U + 'a,
811    {
812        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
813        let map_f = q!({
814            let orig = f;
815            move |(k, v)| (k, orig(v))
816        })
817        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
818        .into();
819
820        KeyedStream::new(
821            self.location.clone(),
822            HydroNode::Map {
823                f: map_f,
824                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
825                metadata: self
826                    .location
827                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
828            },
829        )
830    }
831
832    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
833    /// re-grouped even they are tuples; instead they will be grouped under the original key.
834    ///
835    /// If you do not want to modify the stream and instead only want to view
836    /// each item use [`KeyedStream::inspect_with_key`] instead.
837    ///
838    /// # Example
839    /// ```rust
840    /// # #[cfg(feature = "deploy")] {
841    /// # use hydro_lang::prelude::*;
842    /// # use futures::StreamExt;
843    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844    /// process
845    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
846    ///     .into_keyed()
847    ///     .map_with_key(q!(|(k, v)| k + v))
848    /// #   .entries()
849    /// # }, |mut stream| async move {
850    /// // { 1: [3, 4], 2: [6] }
851    /// # let mut results = Vec::new();
852    /// # for _ in 0..3 {
853    /// #     results.push(stream.next().await.unwrap());
854    /// # }
855    /// # results.sort();
856    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
857    /// # }));
858    /// # }
859    /// ```
860    pub fn map_with_key<U, F>(
861        self,
862        f: impl IntoQuotedMut<'a, F, L> + Copy,
863    ) -> KeyedStream<K, U, L, B, O, R>
864    where
865        F: Fn((K, V)) -> U + 'a,
866        K: Clone,
867    {
868        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
869        let map_f = q!({
870            let orig = f;
871            move |(k, v)| {
872                let out = orig((Clone::clone(&k), v));
873                (k, out)
874            }
875        })
876        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
877        .into();
878
879        KeyedStream::new(
880            self.location.clone(),
881            HydroNode::Map {
882                f: map_f,
883                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
884                metadata: self
885                    .location
886                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
887            },
888        )
889    }
890
891    /// Prepends a new value to the key of each element in the stream, producing a new
892    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
893    /// occurs and the elements in each group preserve their original order.
894    ///
895    /// # Example
896    /// ```rust
897    /// # #[cfg(feature = "deploy")] {
898    /// # use hydro_lang::prelude::*;
899    /// # use futures::StreamExt;
900    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
901    /// process
902    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
903    ///     .into_keyed()
904    ///     .prefix_key(q!(|&(k, _)| k % 2))
905    /// #   .entries()
906    /// # }, |mut stream| async move {
907    /// // { (1, 1): [2, 3], (0, 2): [4] }
908    /// # let mut results = Vec::new();
909    /// # for _ in 0..3 {
910    /// #     results.push(stream.next().await.unwrap());
911    /// # }
912    /// # results.sort();
913    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
914    /// # }));
915    /// # }
916    /// ```
917    pub fn prefix_key<K2, F>(
918        self,
919        f: impl IntoQuotedMut<'a, F, L> + Copy,
920    ) -> KeyedStream<(K2, K), V, L, B, O, R>
921    where
922        F: Fn(&(K, V)) -> K2 + 'a,
923    {
924        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
925        let map_f = q!({
926            let orig = f;
927            move |kv| {
928                let out = orig(&kv);
929                ((out, kv.0), kv.1)
930            }
931        })
932        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
933        .into();
934
935        KeyedStream::new(
936            self.location.clone(),
937            HydroNode::Map {
938                f: map_f,
939                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
940                metadata: self
941                    .location
942                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
943            },
944        )
945    }
946
947    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
948    /// `f`, preserving the order of the elements within the group.
949    ///
950    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
951    /// not modify or take ownership of the values. If you need to modify the values while filtering
952    /// use [`KeyedStream::filter_map`] instead.
953    ///
954    /// # Example
955    /// ```rust
956    /// # #[cfg(feature = "deploy")] {
957    /// # use hydro_lang::prelude::*;
958    /// # use futures::StreamExt;
959    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
960    /// process
961    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
962    ///     .into_keyed()
963    ///     .filter(q!(|&x| x > 2))
964    /// #   .entries()
965    /// # }, |mut stream| async move {
966    /// // { 1: [3], 2: [4] }
967    /// # let mut results = Vec::new();
968    /// # for _ in 0..2 {
969    /// #     results.push(stream.next().await.unwrap());
970    /// # }
971    /// # results.sort();
972    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
973    /// # }));
974    /// # }
975    /// ```
976    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
977    where
978        F: Fn(&V) -> bool + 'a,
979    {
980        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
981        let filter_f = q!({
982            let orig = f;
983            move |t: &(_, _)| orig(&t.1)
984        })
985        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
986        .into();
987
988        KeyedStream::new(
989            self.location.clone(),
990            HydroNode::Filter {
991                f: filter_f,
992                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
993                metadata: self.location.new_node_metadata(Self::collection_kind()),
994            },
995        )
996    }
997
998    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
999    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
1000    ///
1001    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
1002    /// not modify or take ownership of the values. If you need to modify the values while filtering
1003    /// use [`KeyedStream::filter_map_with_key`] instead.
1004    ///
1005    /// # Example
1006    /// ```rust
1007    /// # #[cfg(feature = "deploy")] {
1008    /// # use hydro_lang::prelude::*;
1009    /// # use futures::StreamExt;
1010    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1011    /// process
1012    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1013    ///     .into_keyed()
1014    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
1015    /// #   .entries()
1016    /// # }, |mut stream| async move {
1017    /// // { 1: [3], 2: [4] }
1018    /// # let mut results = Vec::new();
1019    /// # for _ in 0..2 {
1020    /// #     results.push(stream.next().await.unwrap());
1021    /// # }
1022    /// # results.sort();
1023    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
1024    /// # }));
1025    /// # }
1026    /// ```
1027    pub fn filter_with_key<F>(
1028        self,
1029        f: impl IntoQuotedMut<'a, F, L> + Copy,
1030    ) -> KeyedStream<K, V, L, B, O, R>
1031    where
1032        F: Fn(&(K, V)) -> bool + 'a,
1033    {
1034        let filter_f = f
1035            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1036            .into();
1037
1038        KeyedStream::new(
1039            self.location.clone(),
1040            HydroNode::Filter {
1041                f: filter_f,
1042                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1043                metadata: self.location.new_node_metadata(Self::collection_kind()),
1044            },
1045        )
1046    }
1047
1048    /// An operator that both filters and maps each value, with keys staying the same.
1049    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
1050    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
1051    ///
1052    /// # Example
1053    /// ```rust
1054    /// # #[cfg(feature = "deploy")] {
1055    /// # use hydro_lang::prelude::*;
1056    /// # use futures::StreamExt;
1057    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058    /// process
1059    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
1060    ///     .into_keyed()
1061    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
1062    /// #   .entries()
1063    /// # }, |mut stream| async move {
1064    /// // { 1: [2], 2: [4] }
1065    /// # let mut results = Vec::new();
1066    /// # for _ in 0..2 {
1067    /// #     results.push(stream.next().await.unwrap());
1068    /// # }
1069    /// # results.sort();
1070    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1071    /// # }));
1072    /// # }
1073    /// ```
1074    pub fn filter_map<U, F>(
1075        self,
1076        f: impl IntoQuotedMut<'a, F, L> + Copy,
1077    ) -> KeyedStream<K, U, L, B, O, R>
1078    where
1079        F: Fn(V) -> Option<U> + 'a,
1080    {
1081        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1082        let filter_map_f = q!({
1083            let orig = f;
1084            move |(k, v)| orig(v).map(|o| (k, o))
1085        })
1086        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1087        .into();
1088
1089        KeyedStream::new(
1090            self.location.clone(),
1091            HydroNode::FilterMap {
1092                f: filter_map_f,
1093                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1094                metadata: self
1095                    .location
1096                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1097            },
1098        )
1099    }
1100
1101    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
1102    /// re-grouped even they are tuples; instead they will be grouped under the original key.
1103    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
1104    ///
1105    /// # Example
1106    /// ```rust
1107    /// # #[cfg(feature = "deploy")] {
1108    /// # use hydro_lang::prelude::*;
1109    /// # use futures::StreamExt;
1110    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1111    /// process
1112    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
1113    ///     .into_keyed()
1114    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
1115    /// #   .entries()
1116    /// # }, |mut stream| async move {
1117    /// // { 2: [2] }
1118    /// # let mut results = Vec::new();
1119    /// # for _ in 0..1 {
1120    /// #     results.push(stream.next().await.unwrap());
1121    /// # }
1122    /// # results.sort();
1123    /// # assert_eq!(results, vec![(2, 2)]);
1124    /// # }));
1125    /// # }
1126    /// ```
1127    pub fn filter_map_with_key<U, F>(
1128        self,
1129        f: impl IntoQuotedMut<'a, F, L> + Copy,
1130    ) -> KeyedStream<K, U, L, B, O, R>
1131    where
1132        F: Fn((K, V)) -> Option<U> + 'a,
1133        K: Clone,
1134    {
1135        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1136        let filter_map_f = q!({
1137            let orig = f;
1138            move |(k, v)| {
1139                let out = orig((Clone::clone(&k), v));
1140                out.map(|o| (k, o))
1141            }
1142        })
1143        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1144        .into();
1145
1146        KeyedStream::new(
1147            self.location.clone(),
1148            HydroNode::FilterMap {
1149                f: filter_map_f,
1150                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1151                metadata: self
1152                    .location
1153                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1154            },
1155        )
1156    }
1157
1158    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
1159    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
1160    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
1161    ///
1162    /// # Example
1163    /// ```rust
1164    /// # #[cfg(feature = "deploy")] {
1165    /// # use hydro_lang::prelude::*;
1166    /// # use futures::StreamExt;
1167    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1168    /// let tick = process.tick();
1169    /// let batch = process
1170    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
1171    ///   .into_keyed()
1172    ///   .batch(&tick, nondet!(/** test */));
1173    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
1174    /// batch.cross_singleton(count).all_ticks().entries()
1175    /// # }, |mut stream| async move {
1176    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
1177    /// # let mut results = Vec::new();
1178    /// # for _ in 0..3 {
1179    /// #     results.push(stream.next().await.unwrap());
1180    /// # }
1181    /// # results.sort();
1182    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
1183    /// # }));
1184    /// # }
1185    /// ```
1186    pub fn cross_singleton<O2>(
1187        self,
1188        other: impl Into<Optional<O2, L, Bounded>>,
1189    ) -> KeyedStream<K, (V, O2), L, B, O, R>
1190    where
1191        O2: Clone,
1192    {
1193        let other: Optional<O2, L, Bounded> = other.into();
1194        check_matching_location(&self.location, &other.location);
1195
1196        Stream::new(
1197            self.location.clone(),
1198            HydroNode::CrossSingleton {
1199                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1200                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1201                metadata: self
1202                    .location
1203                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
1204            },
1205        )
1206        .map(q!(|((k, v), o2)| (k, (v, o2))))
1207        .into_keyed()
1208    }
1209
1210    /// For each value `v` in each group, transform `v` using `f` and then treat the
1211    /// result as an [`Iterator`] to produce values one by one within the same group.
1212    /// The implementation for [`Iterator`] for the output type `I` must produce items
1213    /// in a **deterministic** order.
1214    ///
1215    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
1216    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
1217    ///
1218    /// # Example
1219    /// ```rust
1220    /// # #[cfg(feature = "deploy")] {
1221    /// # use hydro_lang::prelude::*;
1222    /// # use futures::StreamExt;
1223    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1224    /// process
1225    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1226    ///     .into_keyed()
1227    ///     .flat_map_ordered(q!(|x| x))
1228    /// #   .entries()
1229    /// # }, |mut stream| async move {
1230    /// // { 1: [2, 3, 4], 2: [5, 6] }
1231    /// # let mut results = Vec::new();
1232    /// # for _ in 0..5 {
1233    /// #     results.push(stream.next().await.unwrap());
1234    /// # }
1235    /// # results.sort();
1236    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1237    /// # }));
1238    /// # }
1239    /// ```
1240    pub fn flat_map_ordered<U, I, F>(
1241        self,
1242        f: impl IntoQuotedMut<'a, F, L> + Copy,
1243    ) -> KeyedStream<K, U, L, B, O, R>
1244    where
1245        I: IntoIterator<Item = U>,
1246        F: Fn(V) -> I + 'a,
1247        K: Clone,
1248    {
1249        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1250        let flat_map_f = q!({
1251            let orig = f;
1252            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1253        })
1254        .splice_fn1_ctx::<(K, V), _>(&self.location)
1255        .into();
1256
1257        KeyedStream::new(
1258            self.location.clone(),
1259            HydroNode::FlatMap {
1260                f: flat_map_f,
1261                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1262                metadata: self
1263                    .location
1264                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1265            },
1266        )
1267    }
1268
1269    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1270    /// for the output type `I` to produce items in any order.
1271    ///
1272    /// # Example
1273    /// ```rust
1274    /// # #[cfg(feature = "deploy")] {
1275    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1276    /// # use futures::StreamExt;
1277    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1278    /// process
1279    ///     .source_iter(q!(vec![
1280    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1281    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1282    ///     ]))
1283    ///     .into_keyed()
1284    ///     .flat_map_unordered(q!(|x| x))
1285    /// #   .entries()
1286    /// # }, |mut stream| async move {
1287    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1288    /// # let mut results = Vec::new();
1289    /// # for _ in 0..4 {
1290    /// #     results.push(stream.next().await.unwrap());
1291    /// # }
1292    /// # results.sort();
1293    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1294    /// # }));
1295    /// # }
1296    /// ```
1297    pub fn flat_map_unordered<U, I, F>(
1298        self,
1299        f: impl IntoQuotedMut<'a, F, L> + Copy,
1300    ) -> KeyedStream<K, U, L, B, NoOrder, R>
1301    where
1302        I: IntoIterator<Item = U>,
1303        F: Fn(V) -> I + 'a,
1304        K: Clone,
1305    {
1306        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1307        let flat_map_f = q!({
1308            let orig = f;
1309            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1310        })
1311        .splice_fn1_ctx::<(K, V), _>(&self.location)
1312        .into();
1313
1314        KeyedStream::new(
1315            self.location.clone(),
1316            HydroNode::FlatMap {
1317                f: flat_map_f,
1318                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1319                metadata: self
1320                    .location
1321                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1322            },
1323        )
1324    }
1325
1326    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1327    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1328    /// items in a **deterministic** order.
1329    ///
1330    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1331    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1332    ///
1333    /// # Example
1334    /// ```rust
1335    /// # #[cfg(feature = "deploy")] {
1336    /// # use hydro_lang::prelude::*;
1337    /// # use futures::StreamExt;
1338    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1339    /// process
1340    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1341    ///     .into_keyed()
1342    ///     .flatten_ordered()
1343    /// #   .entries()
1344    /// # }, |mut stream| async move {
1345    /// // { 1: [2, 3, 4], 2: [5, 6] }
1346    /// # let mut results = Vec::new();
1347    /// # for _ in 0..5 {
1348    /// #     results.push(stream.next().await.unwrap());
1349    /// # }
1350    /// # results.sort();
1351    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1352    /// # }));
1353    /// # }
1354    /// ```
1355    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1356    where
1357        V: IntoIterator<Item = U>,
1358        K: Clone,
1359    {
1360        self.flat_map_ordered(q!(|d| d))
1361    }
1362
1363    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1364    /// for the value type `V` to produce items in any order.
1365    ///
1366    /// # Example
1367    /// ```rust
1368    /// # #[cfg(feature = "deploy")] {
1369    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1370    /// # use futures::StreamExt;
1371    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1372    /// process
1373    ///     .source_iter(q!(vec![
1374    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1375    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1376    ///     ]))
1377    ///     .into_keyed()
1378    ///     .flatten_unordered()
1379    /// #   .entries()
1380    /// # }, |mut stream| async move {
1381    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1382    /// # let mut results = Vec::new();
1383    /// # for _ in 0..4 {
1384    /// #     results.push(stream.next().await.unwrap());
1385    /// # }
1386    /// # results.sort();
1387    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1388    /// # }));
1389    /// # }
1390    /// ```
1391    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1392    where
1393        V: IntoIterator<Item = U>,
1394        K: Clone,
1395    {
1396        self.flat_map_unordered(q!(|d| d))
1397    }
1398
1399    /// An operator which allows you to "inspect" each element of a stream without
1400    /// modifying it. The closure `f` is called on a reference to each value. This is
1401    /// mainly useful for debugging, and should not be used to generate side-effects.
1402    ///
1403    /// # Example
1404    /// ```rust
1405    /// # #[cfg(feature = "deploy")] {
1406    /// # use hydro_lang::prelude::*;
1407    /// # use futures::StreamExt;
1408    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1409    /// process
1410    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1411    ///     .into_keyed()
1412    ///     .inspect(q!(|v| println!("{}", v)))
1413    /// #   .entries()
1414    /// # }, |mut stream| async move {
1415    /// # let mut results = Vec::new();
1416    /// # for _ in 0..3 {
1417    /// #     results.push(stream.next().await.unwrap());
1418    /// # }
1419    /// # results.sort();
1420    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1421    /// # }));
1422    /// # }
1423    /// ```
1424    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1425    where
1426        F: Fn(&V) + 'a,
1427    {
1428        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1429        let inspect_f = q!({
1430            let orig = f;
1431            move |t: &(_, _)| orig(&t.1)
1432        })
1433        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1434        .into();
1435
1436        KeyedStream::new(
1437            self.location.clone(),
1438            HydroNode::Inspect {
1439                f: inspect_f,
1440                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1441                metadata: self.location.new_node_metadata(Self::collection_kind()),
1442            },
1443        )
1444    }
1445
1446    /// An operator which allows you to "inspect" each element of a stream without
1447    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1448    /// mainly useful for debugging, and should not be used to generate side-effects.
1449    ///
1450    /// # Example
1451    /// ```rust
1452    /// # #[cfg(feature = "deploy")] {
1453    /// # use hydro_lang::prelude::*;
1454    /// # use futures::StreamExt;
1455    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1456    /// process
1457    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1458    ///     .into_keyed()
1459    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1460    /// #   .entries()
1461    /// # }, |mut stream| async move {
1462    /// # let mut results = Vec::new();
1463    /// # for _ in 0..3 {
1464    /// #     results.push(stream.next().await.unwrap());
1465    /// # }
1466    /// # results.sort();
1467    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1468    /// # }));
1469    /// # }
1470    /// ```
1471    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1472    where
1473        F: Fn(&(K, V)) + 'a,
1474    {
1475        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1476
1477        KeyedStream::new(
1478            self.location.clone(),
1479            HydroNode::Inspect {
1480                f: inspect_f,
1481                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1482                metadata: self.location.new_node_metadata(Self::collection_kind()),
1483            },
1484        )
1485    }
1486
1487    /// An operator which allows you to "name" a `HydroNode`.
1488    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1489    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1490        {
1491            let mut node = self.ir_node.borrow_mut();
1492            let metadata = node.metadata_mut();
1493            metadata.tag = Some(name.to_owned());
1494        }
1495        self
1496    }
1497
1498    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1499    ///
1500    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1501    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1502    /// early by returning `None`.
1503    ///
1504    /// The function takes a mutable reference to the accumulator and the current element, and returns
1505    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1506    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1507    ///
1508    /// # Example
1509    /// ```rust
1510    /// # #[cfg(feature = "deploy")] {
1511    /// # use hydro_lang::prelude::*;
1512    /// # use futures::StreamExt;
1513    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1514    /// process
1515    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1516    ///     .into_keyed()
1517    ///     .scan(
1518    ///         q!(|| 0),
1519    ///         q!(|acc, x| {
1520    ///             *acc += x;
1521    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1522    ///         }),
1523    ///     )
1524    /// #   .entries()
1525    /// # }, |mut stream| async move {
1526    /// // Output: { 0: [1], 1: [3, 7] }
1527    /// # let mut results = Vec::new();
1528    /// # for _ in 0..3 {
1529    /// #     results.push(stream.next().await.unwrap());
1530    /// # }
1531    /// # results.sort();
1532    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1533    /// # }));
1534    /// # }
1535    /// ```
1536    pub fn scan<A, U, I, F>(
1537        self,
1538        init: impl IntoQuotedMut<'a, I, L> + Copy,
1539        f: impl IntoQuotedMut<'a, F, L> + Copy,
1540    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1541    where
1542        O: IsOrdered,
1543        R: IsExactlyOnce,
1544        K: Clone + Eq + Hash,
1545        I: Fn() -> A + 'a,
1546        F: Fn(&mut A, V) -> Option<U> + 'a,
1547    {
1548        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1549        self.make_totally_ordered().make_exactly_once().generator(
1550            init,
1551            q!({
1552                let orig = f;
1553                move |state, v| {
1554                    if let Some(out) = orig(state, v) {
1555                        Generate::Yield(out)
1556                    } else {
1557                        Generate::Break
1558                    }
1559                }
1560            }),
1561        )
1562    }
1563
1564    /// Iteratively processes the elements in each group using a state machine that can yield
1565    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1566    /// syntax in Rust, without requiring special syntax.
1567    ///
1568    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1569    /// state for each group. The second argument defines the processing logic, taking in a
1570    /// mutable reference to the group's state and the value to be processed. It emits a
1571    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1572    /// should be processed.
1573    ///
1574    /// # Example
1575    /// ```rust
1576    /// # #[cfg(feature = "deploy")] {
1577    /// # use hydro_lang::prelude::*;
1578    /// # use futures::StreamExt;
1579    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1580    /// process
1581    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1582    ///     .into_keyed()
1583    ///     .generator(
1584    ///         q!(|| 0),
1585    ///         q!(|acc, x| {
1586    ///             *acc += x;
1587    ///             if *acc > 100 {
1588    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1589    ///                     "done!".to_owned()
1590    ///                 )
1591    ///             } else if *acc % 2 == 0 {
1592    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1593    ///                     "even".to_owned()
1594    ///                 )
1595    ///             } else {
1596    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1597    ///             }
1598    ///         }),
1599    ///     )
1600    /// #   .entries()
1601    /// # }, |mut stream| async move {
1602    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1603    /// # let mut results = Vec::new();
1604    /// # for _ in 0..3 {
1605    /// #     results.push(stream.next().await.unwrap());
1606    /// # }
1607    /// # results.sort();
1608    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1609    /// # }));
1610    /// # }
1611    /// ```
1612    pub fn generator<A, U, I, F>(
1613        self,
1614        init: impl IntoQuotedMut<'a, I, L> + Copy,
1615        f: impl IntoQuotedMut<'a, F, L> + Copy,
1616    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1617    where
1618        O: IsOrdered,
1619        R: IsExactlyOnce,
1620        K: Clone + Eq + Hash,
1621        I: Fn() -> A + 'a,
1622        F: Fn(&mut A, V) -> Generate<U> + 'a,
1623    {
1624        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1625        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1626
1627        let this = self.make_totally_ordered().make_exactly_once();
1628
1629        let scan_init = q!(|| HashMap::new())
1630            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
1631            .into();
1632        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1633            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1634            if let Some(existing_state_value) = existing_state {
1635                match f(existing_state_value, v) {
1636                    Generate::Yield(out) => Some(Some((k, out))),
1637                    Generate::Return(out) => {
1638                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1639                        Some(Some((k, out)))
1640                    }
1641                    Generate::Break => {
1642                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1643                        Some(None)
1644                    }
1645                    Generate::Continue => Some(None),
1646                }
1647            } else {
1648                Some(None)
1649            }
1650        })
1651        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
1652        .into();
1653
1654        let scan_node = HydroNode::Scan {
1655            init: scan_init,
1656            acc: scan_f,
1657            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1658            metadata: this.location.new_node_metadata(Stream::<
1659                Option<(K, U)>,
1660                L,
1661                B,
1662                TotalOrder,
1663                ExactlyOnce,
1664            >::collection_kind()),
1665        };
1666
1667        let flatten_f = q!(|d| d)
1668            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
1669            .into();
1670        let flatten_node = HydroNode::FlatMap {
1671            f: flatten_f,
1672            input: Box::new(scan_node),
1673            metadata: this.location.new_node_metadata(KeyedStream::<
1674                K,
1675                U,
1676                L,
1677                B,
1678                TotalOrder,
1679                ExactlyOnce,
1680            >::collection_kind()),
1681        };
1682
1683        KeyedStream::new(this.location.clone(), flatten_node)
1684    }
1685
1686    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1687    /// in-order across the values in each group. But the aggregation function returns a boolean,
1688    /// which when true indicates that the aggregated result is complete and can be released to
1689    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1690    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1691    /// normal stream elements.
1692    ///
1693    /// # Example
1694    /// ```rust
1695    /// # #[cfg(feature = "deploy")] {
1696    /// # use hydro_lang::prelude::*;
1697    /// # use futures::StreamExt;
1698    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1699    /// process
1700    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1701    ///     .into_keyed()
1702    ///     .fold_early_stop(
1703    ///         q!(|| 0),
1704    ///         q!(|acc, x| {
1705    ///             *acc += x;
1706    ///             x % 2 == 0
1707    ///         }),
1708    ///     )
1709    /// #   .entries()
1710    /// # }, |mut stream| async move {
1711    /// // Output: { 0: 2, 1: 9 }
1712    /// # let mut results = Vec::new();
1713    /// # for _ in 0..2 {
1714    /// #     results.push(stream.next().await.unwrap());
1715    /// # }
1716    /// # results.sort();
1717    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1718    /// # }));
1719    /// # }
1720    /// ```
1721    pub fn fold_early_stop<A, I, F>(
1722        self,
1723        init: impl IntoQuotedMut<'a, I, L> + Copy,
1724        f: impl IntoQuotedMut<'a, F, L> + Copy,
1725    ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
1726    where
1727        O: IsOrdered,
1728        R: IsExactlyOnce,
1729        K: Clone + Eq + Hash,
1730        I: Fn() -> A + 'a,
1731        F: Fn(&mut A, V) -> bool + 'a,
1732    {
1733        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1734        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1735        let out_without_bound_cast = self.generator(
1736            q!(move || Some(init())),
1737            q!(move |key_state, v| {
1738                if let Some(key_state_value) = key_state.as_mut() {
1739                    if f(key_state_value, v) {
1740                        Generate::Return(key_state.take().unwrap())
1741                    } else {
1742                        Generate::Continue
1743                    }
1744                } else {
1745                    unreachable!()
1746                }
1747            }),
1748        );
1749
1750        // SAFETY: The generator will only ever return at most one value per key, since once it
1751        // returns a value for a key it will never process any more values for that key.
1752        out_without_bound_cast.cast_at_most_one_entry_per_key()
1753    }
1754
1755    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1756    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1757    /// otherwise the first element would be non-deterministic.
1758    ///
1759    /// # Example
1760    /// ```rust
1761    /// # #[cfg(feature = "deploy")] {
1762    /// # use hydro_lang::prelude::*;
1763    /// # use futures::StreamExt;
1764    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1765    /// process
1766    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1767    ///     .into_keyed()
1768    ///     .first()
1769    /// #   .entries()
1770    /// # }, |mut stream| async move {
1771    /// // Output: { 0: 2, 1: 3 }
1772    /// # let mut results = Vec::new();
1773    /// # for _ in 0..2 {
1774    /// #     results.push(stream.next().await.unwrap());
1775    /// # }
1776    /// # results.sort();
1777    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1778    /// # }));
1779    /// # }
1780    /// ```
1781    pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
1782    where
1783        O: IsOrdered,
1784        R: IsExactlyOnce,
1785        K: Clone + Eq + Hash,
1786    {
1787        self.fold_early_stop(
1788            q!(|| None),
1789            q!(|acc, v| {
1790                *acc = Some(v);
1791                true
1792            }),
1793        )
1794        .map(q!(|v| v.unwrap()))
1795    }
1796
1797    /// Returns a keyed stream containing at most the first `n` values per key,
1798    /// preserving the original order within each group. Similar to SQL `LIMIT`
1799    /// applied per group.
1800    ///
1801    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1802    /// retries, since the result depends on the order and cardinality of elements
1803    /// within each group.
1804    ///
1805    /// # Example
1806    /// ```rust
1807    /// # #[cfg(feature = "deploy")] {
1808    /// # use hydro_lang::prelude::*;
1809    /// # use futures::StreamExt;
1810    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1811    /// process
1812    ///     .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
1813    ///     .into_keyed()
1814    ///     .limit(q!(2))
1815    /// #   .entries()
1816    /// # }, |mut stream| async move {
1817    /// // { 1: [10, 20], 2: [40, 50] }
1818    /// # let mut results = Vec::new();
1819    /// # for _ in 0..4 {
1820    /// #     results.push(stream.next().await.unwrap());
1821    /// # }
1822    /// # results.sort();
1823    /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
1824    /// # }));
1825    /// # }
1826    /// ```
1827    pub fn limit(
1828        self,
1829        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1830    ) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1831    where
1832        O: IsOrdered,
1833        R: IsExactlyOnce,
1834        K: Clone + Eq + Hash,
1835    {
1836        self.generator(
1837            q!(|| 0usize),
1838            q!(move |count, item| {
1839                if *count == n {
1840                    Generate::Break
1841                } else {
1842                    *count += 1;
1843                    if *count == n {
1844                        Generate::Return(item)
1845                    } else {
1846                        Generate::Yield(item)
1847                    }
1848                }
1849            }),
1850        )
1851    }
1852
1853    /// Assigns a zero-based index to each value within each key group, emitting
1854    /// `(K, (index, V))` tuples with per-key sequential indices.
1855    ///
1856    /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1857    /// This is a streaming operator that processes elements as they arrive.
1858    ///
1859    /// # Example
1860    /// ```rust
1861    /// # #[cfg(feature = "deploy")] {
1862    /// # use hydro_lang::prelude::*;
1863    /// # use futures::StreamExt;
1864    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1865    /// process
1866    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1867    ///     .into_keyed()
1868    ///     .enumerate()
1869    /// # .entries()
1870    /// # }, |mut stream| async move {
1871    /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1872    /// # let mut results = Vec::new();
1873    /// # for _ in 0..3 {
1874    /// #     results.push(stream.next().await.unwrap());
1875    /// # }
1876    /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1877    /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1878    /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1879    /// # assert_eq!(key2, vec![(0, 20)]);
1880    /// # }));
1881    /// # }
1882    /// ```
1883    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1884    where
1885        O: IsOrdered,
1886        R: IsExactlyOnce,
1887        K: Eq + Hash + Clone,
1888    {
1889        self.scan(
1890            q!(|| 0),
1891            q!(|acc, next| {
1892                let curr = *acc;
1893                *acc += 1;
1894                Some((curr, next))
1895            }),
1896        )
1897    }
1898
1899    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1900    ///
1901    /// # Example
1902    /// ```rust
1903    /// # #[cfg(feature = "deploy")] {
1904    /// # use hydro_lang::prelude::*;
1905    /// # use futures::StreamExt;
1906    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1907    /// let tick = process.tick();
1908    /// let numbers = process
1909    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1910    ///     .into_keyed();
1911    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1912    /// batch
1913    ///     .value_counts()
1914    ///     .entries()
1915    ///     .all_ticks()
1916    /// # }, |mut stream| async move {
1917    /// // (1, 3), (2, 2)
1918    /// # let mut results = Vec::new();
1919    /// # for _ in 0..2 {
1920    /// #     results.push(stream.next().await.unwrap());
1921    /// # }
1922    /// # results.sort();
1923    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1924    /// # }));
1925    /// # }
1926    /// ```
1927    pub fn value_counts(
1928        self,
1929    ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
1930    where
1931        R: IsExactlyOnce,
1932        K: Eq + Hash,
1933    {
1934        self.make_exactly_once()
1935            .assume_ordering_trusted(
1936                nondet!(/** ordering within each group affects neither result nor intermediates */),
1937            )
1938            .fold(
1939                q!(|| 0),
1940                q!(
1941                    |acc, _| *acc += 1,
1942                    monotone = manual_proof!(/** += 1 is monotonic */)
1943                ),
1944            )
1945    }
1946
1947    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1948    /// group via the `comb` closure.
1949    ///
1950    /// Depending on the input stream guarantees, the closure may need to be commutative
1951    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1952    ///
1953    /// If the input and output value types are the same and do not require initialization then use
1954    /// [`KeyedStream::reduce`].
1955    ///
1956    /// # Example
1957    /// ```rust
1958    /// # #[cfg(feature = "deploy")] {
1959    /// # use hydro_lang::prelude::*;
1960    /// # use futures::StreamExt;
1961    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1962    /// let tick = process.tick();
1963    /// let numbers = process
1964    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1965    ///     .into_keyed();
1966    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1967    /// batch
1968    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1969    ///     .entries()
1970    ///     .all_ticks()
1971    /// # }, |mut stream| async move {
1972    /// // (1, false), (2, true)
1973    /// # let mut results = Vec::new();
1974    /// # for _ in 0..2 {
1975    /// #     results.push(stream.next().await.unwrap());
1976    /// # }
1977    /// # results.sort();
1978    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1979    /// # }));
1980    /// # }
1981    /// ```
1982    pub fn fold<A, I: Fn() -> A + 'a, F: 'a + Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
1983        self,
1984        init: impl IntoQuotedMut<'a, I, L>,
1985        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1986    ) -> KeyedSingleton<K, A, L, B2>
1987    where
1988        K: Eq + Hash,
1989        C: ValidCommutativityFor<O> + crate::properties::IsProved,
1990        Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
1991        B: ApplyMonotoneKeyedStream<M, B2>,
1992    {
1993        let init = init.splice_fn0_ctx(&self.location).into();
1994        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1995        proof.register_proof(&comb);
1996
1997        let retried = self
1998            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */));
1999
2000        KeyedSingleton::new(
2001            retried.location.clone(),
2002            HydroNode::FoldKeyed {
2003                init,
2004                acc: comb.into(),
2005                input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
2006                is_commutative: C::IS_PROVED,
2007                is_idempotent: Idemp::IS_PROVED,
2008                metadata: retried
2009                    .location
2010                    .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
2011            },
2012        )
2013        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
2014    }
2015
2016    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
2017    /// group via the `comb` closure.
2018    ///
2019    /// Depending on the input stream guarantees, the closure may need to be commutative
2020    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
2021    ///
2022    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
2023    ///
2024    /// # Example
2025    /// ```rust
2026    /// # #[cfg(feature = "deploy")] {
2027    /// # use hydro_lang::prelude::*;
2028    /// # use futures::StreamExt;
2029    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2030    /// let tick = process.tick();
2031    /// let numbers = process
2032    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
2033    ///     .into_keyed();
2034    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2035    /// batch
2036    ///     .reduce(q!(|acc, x| *acc |= x))
2037    ///     .entries()
2038    ///     .all_ticks()
2039    /// # }, |mut stream| async move {
2040    /// // (1, false), (2, true)
2041    /// # let mut results = Vec::new();
2042    /// # for _ in 0..2 {
2043    /// #     results.push(stream.next().await.unwrap());
2044    /// # }
2045    /// # results.sort();
2046    /// # assert_eq!(results, vec![(1, false), (2, true)]);
2047    /// # }));
2048    /// # }
2049    /// ```
2050    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
2051        self,
2052        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
2053    ) -> KeyedSingleton<K, V, L, B>
2054    where
2055        K: Eq + Hash,
2056        C: ValidCommutativityFor<O> + crate::properties::IsProved,
2057        Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
2058    {
2059        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
2060        proof.register_proof(&f);
2061
2062        let ordered = self
2063            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2064            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
2065
2066        KeyedSingleton::new(
2067            ordered.location.clone(),
2068            HydroNode::ReduceKeyed {
2069                f: f.into(),
2070                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
2071                is_commutative: C::IS_PROVED,
2072                is_idempotent: Idemp::IS_PROVED,
2073                metadata: ordered
2074                    .location
2075                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
2076            },
2077        )
2078        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
2079    }
2080
2081    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
2082    /// are automatically deleted.
2083    ///
2084    /// Depending on the input stream guarantees, the closure may need to be commutative
2085    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
2086    ///
2087    /// # Example
2088    /// ```rust
2089    /// # #[cfg(feature = "deploy")] {
2090    /// # use hydro_lang::prelude::*;
2091    /// # use futures::StreamExt;
2092    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2093    /// let tick = process.tick();
2094    /// let watermark = tick.singleton(q!(2));
2095    /// let numbers = process
2096    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
2097    ///     .into_keyed();
2098    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2099    /// batch
2100    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
2101    ///     .entries()
2102    ///     .all_ticks()
2103    /// # }, |mut stream| async move {
2104    /// // (2, true)
2105    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2106    /// # }));
2107    /// # }
2108    /// ```
2109    pub fn reduce_watermark<O2, F, C, Idemp>(
2110        self,
2111        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
2112        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
2113    ) -> KeyedSingleton<K, V, L, B>
2114    where
2115        K: Eq + Hash,
2116        O2: Clone,
2117        F: Fn(&mut V, V) + 'a,
2118        C: ValidCommutativityFor<O> + crate::properties::IsProved,
2119        Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
2120    {
2121        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
2122        check_matching_location(&self.location.root(), other.location.outer());
2123        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
2124        proof.register_proof(&f);
2125
2126        let ordered = self
2127            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2128            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
2129
2130        KeyedSingleton::new(
2131            ordered.location.clone(),
2132            HydroNode::ReduceKeyedWatermark {
2133                f: f.into(),
2134                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
2135                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2136                is_commutative: C::IS_PROVED,
2137                is_idempotent: Idemp::IS_PROVED,
2138                metadata: ordered
2139                    .location
2140                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
2141            },
2142        )
2143        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
2144    }
2145
2146    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
2147    /// whose keys are not in the bounded stream.
2148    ///
2149    /// # Example
2150    /// ```rust
2151    /// # #[cfg(feature = "deploy")] {
2152    /// # use hydro_lang::prelude::*;
2153    /// # use futures::StreamExt;
2154    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2155    /// let tick = process.tick();
2156    /// let keyed_stream = process
2157    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2158    ///     .batch(&tick, nondet!(/** test */))
2159    ///     .into_keyed();
2160    /// let keys_to_remove = process
2161    ///     .source_iter(q!(vec![1, 2]))
2162    ///     .batch(&tick, nondet!(/** test */));
2163    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
2164    /// #   .entries()
2165    /// # }, |mut stream| async move {
2166    /// // { 3: ['c'], 4: ['d'] }
2167    /// # let mut results = Vec::new();
2168    /// # for _ in 0..2 {
2169    /// #     results.push(stream.next().await.unwrap());
2170    /// # }
2171    /// # results.sort();
2172    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
2173    /// # }));
2174    /// # }
2175    /// ```
2176    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
2177        self,
2178        other: Stream<K, L, Bounded, O2, R2>,
2179    ) -> Self
2180    where
2181        K: Eq + Hash,
2182    {
2183        check_matching_location(&self.location, &other.location);
2184
2185        KeyedStream::new(
2186            self.location.clone(),
2187            HydroNode::AntiJoin {
2188                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2189                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2190                metadata: self.location.new_node_metadata(Self::collection_kind()),
2191            },
2192        )
2193    }
2194
2195    /// Emit a keyed stream containing keys shared between two keyed streams,
2196    /// where each value in the output keyed stream is a tuple of
2197    /// (self's value, other's value).
2198    /// If there are multiple values for the same key, this performs a cross product
2199    /// for each matching key.
2200    ///
2201    /// # Example
2202    /// ```rust
2203    /// # #[cfg(feature = "deploy")] {
2204    /// # use hydro_lang::prelude::*;
2205    /// # use futures::StreamExt;
2206    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2207    /// let tick = process.tick();
2208    /// let keyed_data = process
2209    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2210    ///     .into_keyed()
2211    ///     .batch(&tick, nondet!(/** test */));
2212    /// let other_data = process
2213    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
2214    ///     .into_keyed()
2215    ///     .batch(&tick, nondet!(/** test */));
2216    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
2217    /// # }, |mut stream| async move {
2218    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
2219    /// # let mut results = vec![];
2220    /// # for _ in 0..4 {
2221    /// #     results.push(stream.next().await.unwrap());
2222    /// # }
2223    /// # results.sort();
2224    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2225    /// # }));
2226    /// # }
2227    /// ```
2228    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2229    pub fn join_keyed_stream<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2230        self,
2231        other: KeyedStream<K, V2, L, B2, O2, R2>,
2232    ) -> KeyedStream<
2233        K,
2234        (V, V2),
2235        L,
2236        B,
2237        B2::PreserveOrderIfBounded<NoOrder>,
2238        <R as MinRetries<R2>>::Min,
2239    >
2240    where
2241        K: Eq + Hash + Clone,
2242        R: MinRetries<R2>,
2243        V: Clone,
2244        V2: Clone,
2245    {
2246        self.entries().join(other.entries()).into_keyed()
2247    }
2248
2249    /// Deduplicates values within each key group, emitting each unique value per key
2250    /// exactly once.
2251    ///
2252    /// # Example
2253    /// ```rust
2254    /// # #[cfg(feature = "deploy")] {
2255    /// # use hydro_lang::prelude::*;
2256    /// # use futures::StreamExt;
2257    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2258    /// process
2259    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2260    ///     .into_keyed()
2261    ///     .unique()
2262    /// # .entries()
2263    /// # }, |mut stream| async move {
2264    /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2265    /// # let mut results = Vec::new();
2266    /// # for _ in 0..4 {
2267    /// #     results.push(stream.next().await.unwrap());
2268    /// # }
2269    /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2270    /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2271    /// # key1.sort();
2272    /// # key2.sort();
2273    /// # assert_eq!(key1, vec![10, 20]);
2274    /// # assert_eq!(key2, vec![20, 30]);
2275    /// # }));
2276    /// # }
2277    /// ```
2278    pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2279    where
2280        K: Eq + Hash + Clone,
2281        V: Eq + Hash + Clone,
2282    {
2283        self.entries().unique().into_keyed()
2284    }
2285
2286    /// Sorts the values within each key group in ascending order.
2287    ///
2288    /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2289    /// each group. This operator will block until all elements in the input stream
2290    /// are available, so it requires the input stream to be [`Bounded`].
2291    ///
2292    /// # Example
2293    /// ```rust
2294    /// # #[cfg(feature = "deploy")] {
2295    /// # use hydro_lang::prelude::*;
2296    /// # use futures::StreamExt;
2297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2298    /// let tick = process.tick();
2299    /// let numbers = process
2300    ///     .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2301    ///     .into_keyed();
2302    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2303    /// batch.sort().all_ticks()
2304    /// # .entries()
2305    /// # }, |mut stream| async move {
2306    /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2307    /// # let mut results = Vec::new();
2308    /// # for _ in 0..4 {
2309    /// #     results.push(stream.next().await.unwrap());
2310    /// # }
2311    /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2312    /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2313    /// # assert_eq!(key1_vals, vec![1, 3]);
2314    /// # assert_eq!(key2_vals, vec![1, 2]);
2315    /// # }));
2316    /// # }
2317    /// ```
2318    pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2319    where
2320        B: IsBounded,
2321        K: Ord,
2322        V: Ord,
2323    {
2324        self.entries().sort().into_keyed()
2325    }
2326
2327    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2328    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2329    /// is only present in one of the inputs, its values are passed through as-is). The output has
2330    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2331    ///
2332    /// Currently, both input streams must be [`Bounded`]. This operator will block
2333    /// on the first stream until all its elements are available. In a future version,
2334    /// we will relax the requirement on the `other` stream.
2335    ///
2336    /// # Example
2337    /// ```rust
2338    /// # #[cfg(feature = "deploy")] {
2339    /// # use hydro_lang::prelude::*;
2340    /// # use futures::StreamExt;
2341    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2342    /// let tick = process.tick();
2343    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2344    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2345    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2346    /// # .entries()
2347    /// # }, |mut stream| async move {
2348    /// // { 0: [2, 1], 1: [4, 3] }
2349    /// # let mut results = Vec::new();
2350    /// # for _ in 0..4 {
2351    /// #     results.push(stream.next().await.unwrap());
2352    /// # }
2353    /// # results.sort();
2354    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2355    /// # }));
2356    /// # }
2357    /// ```
2358    pub fn chain<O2: Ordering, R2: Retries>(
2359        self,
2360        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2361    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2362    where
2363        B: IsBounded,
2364        O: MinOrder<O2>,
2365        R: MinRetries<R2>,
2366    {
2367        let this = self.make_bounded();
2368        check_matching_location(&this.location, &other.location);
2369
2370        KeyedStream::new(
2371            this.location.clone(),
2372            HydroNode::Chain {
2373                first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2374                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2375                metadata: this.location.new_node_metadata(KeyedStream::<
2376                    K,
2377                    V,
2378                    L,
2379                    Bounded,
2380                    <O as MinOrder<O2>>::Min,
2381                    <R as MinRetries<R2>>::Min,
2382                >::collection_kind()),
2383            },
2384        )
2385    }
2386
2387    /// Emit a keyed stream containing keys shared between the keyed stream and the
2388    /// keyed singleton, where each value in the output keyed stream is a tuple of
2389    /// (the keyed stream's value, the keyed singleton's value).
2390    ///
2391    /// # Example
2392    /// ```rust
2393    /// # #[cfg(feature = "deploy")] {
2394    /// # use hydro_lang::prelude::*;
2395    /// # use futures::StreamExt;
2396    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2397    /// let tick = process.tick();
2398    /// let keyed_data = process
2399    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2400    ///     .into_keyed()
2401    ///     .batch(&tick, nondet!(/** test */));
2402    /// let singleton_data = process
2403    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
2404    ///     .into_keyed()
2405    ///     .batch(&tick, nondet!(/** test */))
2406    ///     .first();
2407    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2408    /// # }, |mut stream| async move {
2409    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2410    /// # let mut results = vec![];
2411    /// # for _ in 0..3 {
2412    /// #     results.push(stream.next().await.unwrap());
2413    /// # }
2414    /// # results.sort();
2415    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2416    /// # }));
2417    /// # }
2418    /// ```
2419    pub fn join_keyed_singleton<V2: Clone, B2: IsBounded>(
2420        self,
2421        other: KeyedSingleton<K, V2, L, B2>,
2422    ) -> KeyedStream<K, (V, V2), L, B, O, R>
2423    where
2424        K: Eq + Hash + Clone,
2425        V: Clone,
2426    {
2427        let ir_node = if B2::BOUNDED {
2428            HydroNode::JoinHalf {
2429                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2430                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2431                metadata: self
2432                    .location
2433                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2434            }
2435        } else {
2436            HydroNode::Join {
2437                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2438                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2439                metadata: self
2440                    .location
2441                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2442            }
2443        };
2444
2445        KeyedStream::new(self.location.clone(), ir_node)
2446    }
2447
2448    /// Gets the values associated with a specific key from the keyed stream.
2449    /// Returns an empty stream if the key is `None` or there are no associated values.
2450    ///
2451    /// # Example
2452    /// ```rust
2453    /// # #[cfg(feature = "deploy")] {
2454    /// # use hydro_lang::prelude::*;
2455    /// # use futures::StreamExt;
2456    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2457    /// let tick = process.tick();
2458    /// let keyed_data = process
2459    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2460    ///     .into_keyed()
2461    ///     .batch(&tick, nondet!(/** test */));
2462    /// let key = tick.singleton(q!(1));
2463    /// keyed_data.get(key).all_ticks()
2464    /// # }, |mut stream| async move {
2465    /// // 10, 11
2466    /// # let mut results = vec![];
2467    /// # for _ in 0..2 {
2468    /// #     results.push(stream.next().await.unwrap());
2469    /// # }
2470    /// # results.sort();
2471    /// # assert_eq!(results, vec![10, 11]);
2472    /// # }));
2473    /// # }
2474    /// ```
2475    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, B, O, R>
2476    where
2477        K: Eq + Hash + Clone,
2478        V: Clone,
2479    {
2480        let joined =
2481            self.join_keyed_singleton(key.into().map(q!(|k| (k, ()))).into_keyed_singleton());
2482
2483        if O::ORDERING_KIND == StreamOrder::TotalOrder {
2484            joined
2485                .use_ordering_type::<TotalOrder>()
2486                .cast_at_most_one_key()
2487                .map(q!(|(_, (v, _))| v))
2488                .weaken_ordering()
2489        } else {
2490            joined.values().map(q!(|(v, _)| v)).use_ordering_type()
2491        }
2492    }
2493
2494    /// For each value in `self`, find the matching key in `lookup`.
2495    /// The output is a keyed stream with the key from `self`, and a value
2496    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2497    /// If the key is not present in `lookup`, the option will be [`None`].
2498    ///
2499    /// # Example
2500    /// ```rust
2501    /// # #[cfg(feature = "deploy")] {
2502    /// # use hydro_lang::prelude::*;
2503    /// # use futures::StreamExt;
2504    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2505    /// # let tick = process.tick();
2506    /// let requests = // { 1: [10, 11], 2: 20 }
2507    /// # process
2508    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2509    /// #     .into_keyed()
2510    /// #     .batch(&tick, nondet!(/** test */));
2511    /// let other_data = // { 10: 100, 11: 110 }
2512    /// # process
2513    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2514    /// #     .into_keyed()
2515    /// #     .batch(&tick, nondet!(/** test */))
2516    /// #     .first();
2517    /// requests.lookup_keyed_singleton(other_data)
2518    /// # .entries().all_ticks()
2519    /// # }, |mut stream| async move {
2520    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2521    /// # let mut results = vec![];
2522    /// # for _ in 0..3 {
2523    /// #     results.push(stream.next().await.unwrap());
2524    /// # }
2525    /// # results.sort();
2526    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2527    /// # }));
2528    /// # }
2529    /// ```
2530    pub fn lookup_keyed_singleton<V2>(
2531        self,
2532        lookup: KeyedSingleton<V, V2, L, Bounded>,
2533    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2534    where
2535        B: IsBounded,
2536        K: Eq + Hash + Clone,
2537        V: Eq + Hash + Clone,
2538        V2: Clone,
2539    {
2540        self.lookup_keyed_stream(lookup.into_keyed_stream().weaken_retries::<R>())
2541    }
2542
2543    /// For each value in `self`, find the matching key in `lookup`.
2544    /// The output is a keyed stream with the key from `self`, and a value
2545    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2546    /// If the key is not present in `lookup`, the option will be [`None`].
2547    ///
2548    /// # Example
2549    /// ```rust
2550    /// # #[cfg(feature = "deploy")] {
2551    /// # use hydro_lang::prelude::*;
2552    /// # use futures::StreamExt;
2553    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2554    /// # let tick = process.tick();
2555    /// let requests = // { 1: [10, 11], 2: 20 }
2556    /// # process
2557    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2558    /// #     .into_keyed()
2559    /// #     .batch(&tick, nondet!(/** test */));
2560    /// let other_data = // { 10: [100, 101], 11: 110 }
2561    /// # process
2562    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2563    /// #     .into_keyed()
2564    /// #     .batch(&tick, nondet!(/** test */));
2565    /// requests.lookup_keyed_stream(other_data)
2566    /// # .entries().all_ticks()
2567    /// # }, |mut stream| async move {
2568    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2569    /// # let mut results = vec![];
2570    /// # for _ in 0..4 {
2571    /// #     results.push(stream.next().await.unwrap());
2572    /// # }
2573    /// # results.sort();
2574    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2575    /// # }));
2576    /// # }
2577    /// ```
2578    #[expect(clippy::type_complexity, reason = "retries propagation")]
2579    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2580        self,
2581        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2582    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2583    where
2584        B: IsBounded,
2585        K: Eq + Hash + Clone,
2586        V: Eq + Hash + Clone,
2587        V2: Clone,
2588        R: MinRetries<R2>,
2589    {
2590        let inverted = self
2591            .make_bounded()
2592            .entries()
2593            .map(q!(|(key, lookup_value)| (lookup_value, key)))
2594            .into_keyed();
2595        let found = inverted
2596            .clone()
2597            .join_keyed_stream(lookup.clone())
2598            .entries()
2599            .map(q!(|(lookup_value, (key, value))| (
2600                key,
2601                (lookup_value, Some(value))
2602            )))
2603            .into_keyed();
2604        let not_found = inverted
2605            .filter_key_not_in(lookup.keys())
2606            .entries()
2607            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2608            .into_keyed();
2609
2610        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2611    }
2612
2613    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2614    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2615    ///
2616    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2617    /// processed before an acknowledgement is emitted.
2618    pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2619        let id = self.location.flow_state().borrow_mut().next_clock_id();
2620        let out_location = Atomic {
2621            tick: Tick {
2622                id,
2623                l: self.location.clone(),
2624            },
2625        };
2626        KeyedStream::new(
2627            out_location.clone(),
2628            HydroNode::BeginAtomic {
2629                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2630                metadata: out_location
2631                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2632            },
2633        )
2634    }
2635
2636    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2637    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2638    /// the order of the input.
2639    ///
2640    /// # Non-Determinism
2641    /// The batch boundaries are non-deterministic and may change across executions.
2642    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2643        self,
2644        tick: &Tick<L2>,
2645        nondet: NonDet,
2646    ) -> KeyedStream<K, V, Tick<L::DropConsistency>, Bounded, O, R> {
2647        let _ = nondet;
2648        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2649        KeyedStream::new(
2650            tick.drop_consistency(),
2651            HydroNode::Batch {
2652                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2653                metadata: tick.new_node_metadata(
2654                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2655                ),
2656            },
2657        )
2658    }
2659}
2660
2661impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2662    KeyedStream<(K1, K2), V, L, B, O, R>
2663{
2664    /// Produces a new keyed stream by dropping the first element of the compound key.
2665    ///
2666    /// Because multiple keys may share the same suffix, this operation results in re-grouping
2667    /// of the values under the new keys. The values across groups with the same new key
2668    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2669    ///
2670    /// # Example
2671    /// ```rust
2672    /// # #[cfg(feature = "deploy")] {
2673    /// # use hydro_lang::prelude::*;
2674    /// # use futures::StreamExt;
2675    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2676    /// process
2677    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2678    ///     .into_keyed()
2679    ///     .drop_key_prefix()
2680    /// #   .entries()
2681    /// # }, |mut stream| async move {
2682    /// // { 10: [2, 3], 20: [4] }
2683    /// # let mut results = Vec::new();
2684    /// # for _ in 0..3 {
2685    /// #     results.push(stream.next().await.unwrap());
2686    /// # }
2687    /// # results.sort();
2688    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2689    /// # }));
2690    /// # }
2691    /// ```
2692    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2693        self.entries()
2694            .map(q!(|((_k1, k2), v)| (k2, v)))
2695            .into_keyed()
2696    }
2697}
2698
2699impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> KeyedStream<K, V, L, Unbounded, O, R> {
2700    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2701    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2702    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2703    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2704    ///
2705    /// Currently, both input streams must be [`Unbounded`].
2706    ///
2707    /// # Example
2708    /// ```rust
2709    /// # #[cfg(feature = "deploy")] {
2710    /// # use hydro_lang::prelude::*;
2711    /// # use futures::StreamExt;
2712    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2713    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2714    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2715    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2716    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2717    /// numbers1.merge_unordered(numbers2)
2718    /// #   .entries()
2719    /// # }, |mut stream| async move {
2720    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2721    /// # let mut results = Vec::new();
2722    /// # for _ in 0..4 {
2723    /// #     results.push(stream.next().await.unwrap());
2724    /// # }
2725    /// # results.sort();
2726    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2727    /// # }));
2728    /// # }
2729    /// ```
2730    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2731        self,
2732        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2733    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2734    where
2735        R: MinRetries<R2>,
2736    {
2737        KeyedStream::new(
2738            self.location.clone(),
2739            HydroNode::Chain {
2740                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2741                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2742                metadata: self.location.new_node_metadata(KeyedStream::<
2743                    K,
2744                    V,
2745                    L,
2746                    Unbounded,
2747                    NoOrder,
2748                    <R as MinRetries<R2>>::Min,
2749                >::collection_kind()),
2750            },
2751        )
2752    }
2753
2754    /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2755    #[deprecated(note = "use `merge_unordered` instead")]
2756    pub fn interleave<O2: Ordering, R2: Retries>(
2757        self,
2758        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2759    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2760    where
2761        R: MinRetries<R2>,
2762    {
2763        self.merge_unordered(other)
2764    }
2765}
2766
2767impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2768where
2769    L: Location<'a>,
2770{
2771    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2772    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2773    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2774    /// used to create the atomic section.
2775    ///
2776    /// # Non-Determinism
2777    /// The batch boundaries are non-deterministic and may change across executions.
2778    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2779        self,
2780        tick: &Tick<L2>,
2781        nondet: NonDet,
2782    ) -> KeyedStream<K, V, Tick<L::DropConsistency>, Bounded, O, R> {
2783        let _ = nondet;
2784        KeyedStream::new(
2785            tick.drop_consistency(),
2786            HydroNode::Batch {
2787                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2788                metadata: tick.new_node_metadata(
2789                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2790                ),
2791            },
2792        )
2793    }
2794
2795    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2796    /// See [`KeyedStream::atomic`] for more details.
2797    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2798        KeyedStream::new(
2799            self.location.tick.l.clone(),
2800            HydroNode::EndAtomic {
2801                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2802                metadata: self
2803                    .location
2804                    .tick
2805                    .l
2806                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2807            },
2808        )
2809    }
2810}
2811
2812impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2813where
2814    L: Location<'a>,
2815{
2816    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2817    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2818    /// each key.
2819    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2820        KeyedStream::new(
2821            self.location.outer().clone(),
2822            HydroNode::YieldConcat {
2823                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2824                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2825                    K,
2826                    V,
2827                    L,
2828                    Unbounded,
2829                    O,
2830                    R,
2831                >::collection_kind(
2832                )),
2833            },
2834        )
2835    }
2836
2837    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2838    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2839    /// each key.
2840    ///
2841    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2842    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2843    /// stream's [`Tick`] context.
2844    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2845        let out_location = Atomic {
2846            tick: self.location.clone(),
2847        };
2848
2849        KeyedStream::new(
2850            out_location.clone(),
2851            HydroNode::YieldConcat {
2852                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2853                metadata: out_location.new_node_metadata(KeyedStream::<
2854                    K,
2855                    V,
2856                    Atomic<L>,
2857                    Unbounded,
2858                    O,
2859                    R,
2860                >::collection_kind()),
2861            },
2862        )
2863    }
2864
2865    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2866    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2867    ///
2868    /// This API is particularly useful for stateful computation on batches of data, such as
2869    /// maintaining an accumulated state that is up to date with the current batch.
2870    ///
2871    /// # Example
2872    /// ```rust
2873    /// # #[cfg(feature = "deploy")] {
2874    /// # use hydro_lang::prelude::*;
2875    /// # use futures::StreamExt;
2876    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2877    /// let tick = process.tick();
2878    /// # // ticks are lazy by default, forces the second tick to run
2879    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2880    /// # let batch_first_tick = process
2881    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2882    /// #   .into_keyed()
2883    /// #   .batch(&tick, nondet!(/** test */));
2884    /// # let batch_second_tick = process
2885    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2886    /// #   .into_keyed()
2887    /// #   .batch(&tick, nondet!(/** test */))
2888    /// #   .defer_tick(); // appears on the second tick
2889    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2890    ///
2891    /// input.batch(&tick, nondet!(/** test */))
2892    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2893    ///         *sum += new;
2894    ///     }))).entries().all_ticks()
2895    /// # }, |mut stream| async move {
2896    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2897    /// # let mut results = Vec::new();
2898    /// # for _ in 0..4 {
2899    /// #     results.push(stream.next().await.unwrap());
2900    /// # }
2901    /// # results.sort();
2902    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2903    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2904    /// # results.clear();
2905    /// # for _ in 0..4 {
2906    /// #     results.push(stream.next().await.unwrap());
2907    /// # }
2908    /// # results.sort();
2909    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2910    /// # }));
2911    /// # }
2912    /// ```
2913    pub fn across_ticks<Out: BatchAtomic<'a>>(
2914        self,
2915        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2916    ) -> Out::Batched {
2917        thunk(self.all_ticks_atomic()).batched_atomic()
2918    }
2919
2920    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2921    /// tick `T` always has the entries of `self` at tick `T - 1`.
2922    ///
2923    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2924    ///
2925    /// This operator enables stateful iterative processing with ticks, by sending data from one
2926    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2927    ///
2928    /// # Example
2929    /// ```rust
2930    /// # #[cfg(feature = "deploy")] {
2931    /// # use hydro_lang::prelude::*;
2932    /// # use futures::StreamExt;
2933    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2934    /// let tick = process.tick();
2935    /// # // ticks are lazy by default, forces the second tick to run
2936    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2937    /// # let batch_first_tick = process
2938    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2939    /// #   .batch(&tick, nondet!(/** test */))
2940    /// #   .into_keyed();
2941    /// # let batch_second_tick = process
2942    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2943    /// #   .batch(&tick, nondet!(/** test */))
2944    /// #   .defer_tick()
2945    /// #   .into_keyed(); // appears on the second tick
2946    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2947    /// # batch_first_tick.chain(batch_second_tick);
2948    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2949    ///     changes_across_ticks // from the current tick
2950    /// )
2951    /// # .entries().all_ticks()
2952    /// # }, |mut stream| async move {
2953    /// // First tick: { 1: [2, 3] }
2954    /// # let mut results = Vec::new();
2955    /// # for _ in 0..2 {
2956    /// #     results.push(stream.next().await.unwrap());
2957    /// # }
2958    /// # results.sort();
2959    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2960    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2961    /// # results.clear();
2962    /// # for _ in 0..4 {
2963    /// #     results.push(stream.next().await.unwrap());
2964    /// # }
2965    /// # results.sort();
2966    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2967    /// // Third tick: { 1: [4], 2: [5] }
2968    /// # results.clear();
2969    /// # for _ in 0..2 {
2970    /// #     results.push(stream.next().await.unwrap());
2971    /// # }
2972    /// # results.sort();
2973    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2974    /// # }));
2975    /// # }
2976    /// ```
2977    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2978        KeyedStream::new(
2979            self.location.clone(),
2980            HydroNode::DeferTick {
2981                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2982                metadata: self.location.new_node_metadata(KeyedStream::<
2983                    K,
2984                    V,
2985                    Tick<L>,
2986                    Bounded,
2987                    O,
2988                    R,
2989                >::collection_kind()),
2990            },
2991        )
2992    }
2993}
2994
2995#[cfg(test)]
2996mod tests {
2997    #[cfg(feature = "deploy")]
2998    use futures::{SinkExt, StreamExt};
2999    #[cfg(feature = "deploy")]
3000    use hydro_deploy::Deployment;
3001    #[cfg(any(feature = "deploy", feature = "sim"))]
3002    use stageleft::q;
3003
3004    #[cfg(any(feature = "deploy", feature = "sim"))]
3005    use crate::compile::builder::FlowBuilder;
3006    #[cfg(feature = "deploy")]
3007    use crate::live_collections::stream::ExactlyOnce;
3008    #[cfg(feature = "sim")]
3009    use crate::live_collections::stream::{NoOrder, TotalOrder};
3010    #[cfg(any(feature = "deploy", feature = "sim"))]
3011    use crate::location::Location;
3012    #[cfg(feature = "sim")]
3013    use crate::networking::TCP;
3014    #[cfg(any(feature = "deploy", feature = "sim"))]
3015    use crate::nondet::nondet;
3016    #[cfg(feature = "deploy")]
3017    use crate::properties::manual_proof;
3018
3019    #[cfg(feature = "deploy")]
3020    #[tokio::test]
3021    async fn get_unbounded_keyed_stream_bounded_singleton() {
3022        let mut deployment = Deployment::new();
3023
3024        let mut flow = FlowBuilder::new();
3025        let node = flow.process::<()>();
3026        let external = flow.external::<()>();
3027
3028        let (input_send, input_stream) =
3029            node.source_external_bincode::<_, (i32, i32), _, ExactlyOnce>(&external);
3030
3031        let key = node.singleton(q!(1));
3032
3033        let out = input_stream
3034            .into_keyed()
3035            .get(key)
3036            .send_bincode_external(&external);
3037
3038        let nodes = flow
3039            .with_process(&node, deployment.Localhost())
3040            .with_external(&external, deployment.Localhost())
3041            .deploy(&mut deployment);
3042
3043        deployment.deploy().await.unwrap();
3044
3045        let mut input_send = nodes.connect(input_send).await;
3046        let mut out = nodes.connect(out).await;
3047
3048        deployment.start().await.unwrap();
3049
3050        // First batch
3051        input_send.send((1, 10)).await.unwrap();
3052        input_send.send((2, 20)).await.unwrap();
3053        assert_eq!(out.next().await.unwrap(), 10);
3054
3055        // Second batch
3056        input_send.send((1, 11)).await.unwrap();
3057        input_send.send((2, 21)).await.unwrap();
3058        assert_eq!(out.next().await.unwrap(), 11);
3059    }
3060
3061    #[cfg(feature = "deploy")]
3062    #[tokio::test]
3063    async fn reduce_watermark_filter() {
3064        let mut deployment = Deployment::new();
3065
3066        let mut flow = FlowBuilder::new();
3067        let node = flow.process::<()>();
3068        let external = flow.external::<()>();
3069
3070        let node_tick = node.tick();
3071        let watermark = node_tick.singleton(q!(2));
3072
3073        let sum = node
3074            .source_stream(q!(tokio_stream::iter([
3075                (0, 100),
3076                (1, 101),
3077                (2, 102),
3078                (2, 102)
3079            ])))
3080            .into_keyed()
3081            .reduce_watermark(
3082                watermark,
3083                q!(|acc, v| {
3084                    *acc += v;
3085                }),
3086            )
3087            .snapshot(&node_tick, nondet!(/** test */))
3088            .entries()
3089            .all_ticks()
3090            .send_bincode_external(&external);
3091
3092        let nodes = flow
3093            .with_process(&node, deployment.Localhost())
3094            .with_external(&external, deployment.Localhost())
3095            .deploy(&mut deployment);
3096
3097        deployment.deploy().await.unwrap();
3098
3099        let mut out = nodes.connect(sum).await;
3100
3101        deployment.start().await.unwrap();
3102
3103        assert_eq!(out.next().await.unwrap(), (2, 204));
3104    }
3105
3106    #[cfg(feature = "deploy")]
3107    #[tokio::test]
3108    async fn reduce_watermark_bounded() {
3109        let mut deployment = Deployment::new();
3110
3111        let mut flow = FlowBuilder::new();
3112        let node = flow.process::<()>();
3113        let external = flow.external::<()>();
3114
3115        let node_tick = node.tick();
3116        let watermark = node_tick.singleton(q!(2));
3117
3118        let sum = node
3119            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
3120            .into_keyed()
3121            .reduce_watermark(
3122                watermark,
3123                q!(|acc, v| {
3124                    *acc += v;
3125                }),
3126            )
3127            .entries()
3128            .send_bincode_external(&external);
3129
3130        let nodes = flow
3131            .with_process(&node, deployment.Localhost())
3132            .with_external(&external, deployment.Localhost())
3133            .deploy(&mut deployment);
3134
3135        deployment.deploy().await.unwrap();
3136
3137        let mut out = nodes.connect(sum).await;
3138
3139        deployment.start().await.unwrap();
3140
3141        assert_eq!(out.next().await.unwrap(), (2, 204));
3142    }
3143
3144    #[cfg(feature = "deploy")]
3145    #[tokio::test]
3146    async fn reduce_watermark_garbage_collect() {
3147        let mut deployment = Deployment::new();
3148
3149        let mut flow = FlowBuilder::new();
3150        let node = flow.process::<()>();
3151        let external = flow.external::<()>();
3152        let (tick_send, tick_trigger) =
3153            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
3154
3155        let node_tick = node.tick();
3156        let (watermark_complete_cycle, watermark) =
3157            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
3158        let next_watermark = watermark.clone().map(q!(|v| v + 1));
3159        watermark_complete_cycle.complete_next_tick(next_watermark);
3160
3161        let tick_triggered_input = node_tick
3162            .singleton(q!((3, 103)))
3163            .into_stream()
3164            .filter_if(
3165                tick_trigger
3166                    .clone()
3167                    .batch(&node_tick, nondet!(/** test */))
3168                    .first()
3169                    .is_some(),
3170            )
3171            .all_ticks();
3172
3173        let sum = node
3174            .source_stream(q!(tokio_stream::iter([
3175                (0, 100),
3176                (1, 101),
3177                (2, 102),
3178                (2, 102)
3179            ])))
3180            .merge_unordered(tick_triggered_input)
3181            .into_keyed()
3182            .reduce_watermark(
3183                watermark,
3184                q!(
3185                    |acc, v| {
3186                        *acc += v;
3187                    },
3188                    commutative = manual_proof!(/** integer addition is commutative */)
3189                ),
3190            )
3191            .snapshot(&node_tick, nondet!(/** test */))
3192            .entries()
3193            .all_ticks()
3194            .send_bincode_external(&external);
3195
3196        let nodes = flow
3197            .with_default_optimize()
3198            .with_process(&node, deployment.Localhost())
3199            .with_external(&external, deployment.Localhost())
3200            .deploy(&mut deployment);
3201
3202        deployment.deploy().await.unwrap();
3203
3204        let mut tick_send = nodes.connect(tick_send).await;
3205        let mut out_recv = nodes.connect(sum).await;
3206
3207        deployment.start().await.unwrap();
3208
3209        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
3210
3211        tick_send.send(()).await.unwrap();
3212
3213        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
3214    }
3215
3216    #[cfg(feature = "sim")]
3217    #[test]
3218    #[should_panic]
3219    fn sim_batch_nondet_size() {
3220        let mut flow = FlowBuilder::new();
3221        let node = flow.process::<()>();
3222
3223        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3224
3225        let tick = node.tick();
3226        let out_recv = input
3227            .batch(&tick, nondet!(/** test */))
3228            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
3229            .entries()
3230            .all_ticks()
3231            .sim_output();
3232
3233        flow.sim().exhaustive(async || {
3234            out_recv
3235                .assert_yields_only_unordered([(1, vec![1, 2])])
3236                .await;
3237        });
3238    }
3239
3240    #[cfg(feature = "sim")]
3241    #[test]
3242    fn sim_batch_preserves_group_order() {
3243        let mut flow = FlowBuilder::new();
3244        let node = flow.process::<()>();
3245
3246        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3247
3248        let tick = node.tick();
3249        let out_recv = input
3250            .batch(&tick, nondet!(/** test */))
3251            .all_ticks()
3252            .fold_early_stop(
3253                q!(|| 0),
3254                q!(|acc, v| {
3255                    *acc = std::cmp::max(v, *acc);
3256                    *acc >= 2
3257                }),
3258            )
3259            .entries()
3260            .sim_output();
3261
3262        let instances = flow.sim().exhaustive(async || {
3263            out_recv
3264                .assert_yields_only_unordered([(1, 2), (2, 3)])
3265                .await;
3266        });
3267
3268        assert_eq!(instances, 8);
3269        // - three cases: all three in a separate tick (pick where (2, 3) is)
3270        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
3271        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
3272        // - one case: all three together
3273    }
3274
3275    #[cfg(feature = "sim")]
3276    #[test]
3277    fn sim_batch_unordered_shuffles() {
3278        let mut flow = FlowBuilder::new();
3279        let node = flow.process::<()>();
3280
3281        let input = node
3282            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
3283            .into_keyed()
3284            .weaken_ordering::<NoOrder>();
3285
3286        let tick = node.tick();
3287        let out_recv = input
3288            .batch(&tick, nondet!(/** test */))
3289            .all_ticks()
3290            .entries()
3291            .sim_output();
3292
3293        let instances = flow.sim().exhaustive(async || {
3294            out_recv
3295                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
3296                .await;
3297        });
3298
3299        assert_eq!(instances, 13);
3300        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
3301        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3302        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
3303        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3304    }
3305
3306    #[cfg(feature = "sim")]
3307    #[test]
3308    #[should_panic]
3309    fn sim_observe_order_batched() {
3310        let mut flow = FlowBuilder::new();
3311        let node = flow.process::<()>();
3312
3313        let (port, input) = node.sim_input::<_, NoOrder, _>();
3314
3315        let tick = node.tick();
3316        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3317        let out_recv = batch
3318            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3319            .all_ticks()
3320            .first()
3321            .entries()
3322            .sim_output();
3323
3324        flow.sim().exhaustive(async || {
3325            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3326            out_recv
3327                .assert_yields_only_unordered([(1, 1), (2, 1)])
3328                .await; // fails with assume_ordering
3329        });
3330    }
3331
3332    #[cfg(feature = "sim")]
3333    #[test]
3334    fn sim_observe_order_batched_count() {
3335        let mut flow = FlowBuilder::new();
3336        let node = flow.process::<()>();
3337
3338        let (port, input) = node.sim_input::<_, NoOrder, _>();
3339
3340        let tick = node.tick();
3341        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3342        let out_recv = batch
3343            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3344            .all_ticks()
3345            .entries()
3346            .sim_output();
3347
3348        let instance_count = flow.sim().exhaustive(async || {
3349            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3350            let _ = out_recv.collect_sorted::<Vec<_>>().await;
3351        });
3352
3353        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
3354    }
3355
3356    #[cfg(feature = "sim")]
3357    #[test]
3358    fn sim_top_level_assume_ordering() {
3359        use std::collections::HashMap;
3360
3361        let mut flow = FlowBuilder::new();
3362        let node = flow.process::<()>();
3363
3364        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3365
3366        let out_recv = input
3367            .into_keyed()
3368            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3369            .fold_early_stop(
3370                q!(|| Vec::new()),
3371                q!(|acc, v| {
3372                    acc.push(v);
3373                    acc.len() >= 2
3374                }),
3375            )
3376            .entries()
3377            .sim_output();
3378
3379        let instance_count = flow.sim().exhaustive(async || {
3380            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
3381            let out: HashMap<_, _> = out_recv
3382                .collect_sorted::<Vec<_>>()
3383                .await
3384                .into_iter()
3385                .collect();
3386            // Each key accumulates its values; we get one entry per key
3387            assert_eq!(out.len(), 2);
3388        });
3389
3390        assert_eq!(instance_count, 24)
3391    }
3392
3393    #[cfg(feature = "sim")]
3394    #[test]
3395    fn sim_top_level_assume_ordering_cycle_back() {
3396        use std::collections::HashMap;
3397
3398        let mut flow = FlowBuilder::new();
3399        let node = flow.process::<()>();
3400        let node2 = flow.process::<()>();
3401
3402        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3403
3404        let (complete_cycle_back, cycle_back) =
3405            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3406        let ordered = input
3407            .into_keyed()
3408            .merge_unordered(cycle_back)
3409            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3410        complete_cycle_back.complete(
3411            ordered
3412                .clone()
3413                .map(q!(|v| v + 1))
3414                .filter(q!(|v| v % 2 == 1))
3415                .entries()
3416                .send(&node2, TCP.fail_stop().bincode())
3417                .send(&node, TCP.fail_stop().bincode())
3418                .into_keyed(),
3419        );
3420
3421        let out_recv = ordered
3422            .fold_early_stop(
3423                q!(|| Vec::new()),
3424                q!(|acc, v| {
3425                    acc.push(v);
3426                    acc.len() >= 2
3427                }),
3428            )
3429            .entries()
3430            .sim_output();
3431
3432        let mut saw = false;
3433        let instance_count = flow.sim().exhaustive(async || {
3434            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
3435            // We want to see [0, 1] - the cycled back value interleaved
3436            in_send.send_many_unordered([(1, 0), (1, 2)]);
3437            let out: HashMap<_, _> = out_recv
3438                .collect_sorted::<Vec<_>>()
3439                .await
3440                .into_iter()
3441                .collect();
3442
3443            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
3444            if let Some(values) = out.get(&1)
3445                && *values == vec![0, 1]
3446            {
3447                saw = true;
3448            }
3449        });
3450
3451        assert!(
3452            saw,
3453            "did not see an instance with key 1 having [0, 1] in order"
3454        );
3455        assert_eq!(instance_count, 6);
3456    }
3457
3458    #[cfg(feature = "sim")]
3459    #[test]
3460    fn sim_top_level_assume_ordering_cross_key_cycle() {
3461        use std::collections::HashMap;
3462
3463        // This test demonstrates why releasing one entry at a time is important:
3464        // When one key's observed order cycles back into a different key, we need
3465        // to be able to interleave the cycled-back entry with pending items for
3466        // that other key.
3467        let mut flow = FlowBuilder::new();
3468        let node = flow.process::<()>();
3469        let node2 = flow.process::<()>();
3470
3471        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3472
3473        let (complete_cycle_back, cycle_back) =
3474            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3475        let ordered = input
3476            .into_keyed()
3477            .merge_unordered(cycle_back)
3478            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3479
3480        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
3481        complete_cycle_back.complete(
3482            ordered
3483                .clone()
3484                .filter(q!(|v| *v == 10))
3485                .map(q!(|_| 100))
3486                .entries()
3487                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
3488                .send(&node2, TCP.fail_stop().bincode())
3489                .send(&node, TCP.fail_stop().bincode())
3490                .into_keyed(),
3491        );
3492
3493        let out_recv = ordered
3494            .fold_early_stop(
3495                q!(|| Vec::new()),
3496                q!(|acc, v| {
3497                    acc.push(v);
3498                    acc.len() >= 2
3499                }),
3500            )
3501            .entries()
3502            .sim_output();
3503
3504        // We want to see an instance where:
3505        // - (1, 10) is released first
3506        // - This causes (2, 100) to be cycled back
3507        // - (2, 100) is released BEFORE (2, 20) which was already pending
3508        let mut saw_cross_key_interleave = false;
3509        let instance_count = flow.sim().exhaustive(async || {
3510            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
3511            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
3512            let out: HashMap<_, _> = out_recv
3513                .collect_sorted::<Vec<_>>()
3514                .await
3515                .into_iter()
3516                .collect();
3517
3518            // Check if we see the cross-key interleaving:
3519            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
3520            if let Some(values) = out.get(&2)
3521                && values.len() >= 2
3522                && values[0] == 100
3523            {
3524                saw_cross_key_interleave = true;
3525            }
3526        });
3527
3528        assert!(
3529            saw_cross_key_interleave,
3530            "did not see an instance where cycled-back 100 was released before pending items for key 2"
3531        );
3532        assert_eq!(instance_count, 60);
3533    }
3534
3535    #[cfg(feature = "sim")]
3536    #[test]
3537    fn sim_top_level_assume_ordering_cycle_back_tick() {
3538        use std::collections::HashMap;
3539
3540        let mut flow = FlowBuilder::new();
3541        let node = flow.process::<()>();
3542        let node2 = flow.process::<()>();
3543
3544        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3545
3546        let (complete_cycle_back, cycle_back) =
3547            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3548        let ordered = input
3549            .into_keyed()
3550            .merge_unordered(cycle_back)
3551            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3552        complete_cycle_back.complete(
3553            ordered
3554                .clone()
3555                .batch(&node.tick(), nondet!(/** test */))
3556                .all_ticks()
3557                .map(q!(|v| v + 1))
3558                .filter(q!(|v| v % 2 == 1))
3559                .entries()
3560                .send(&node2, TCP.fail_stop().bincode())
3561                .send(&node, TCP.fail_stop().bincode())
3562                .into_keyed(),
3563        );
3564
3565        let out_recv = ordered
3566            .fold_early_stop(
3567                q!(|| Vec::new()),
3568                q!(|acc, v| {
3569                    acc.push(v);
3570                    acc.len() >= 2
3571                }),
3572            )
3573            .entries()
3574            .sim_output();
3575
3576        let mut saw = false;
3577        let instance_count = flow.sim().exhaustive(async || {
3578            in_send.send_many_unordered([(1, 0), (1, 2)]);
3579            let out: HashMap<_, _> = out_recv
3580                .collect_sorted::<Vec<_>>()
3581                .await
3582                .into_iter()
3583                .collect();
3584
3585            if let Some(values) = out.get(&1)
3586                && *values == vec![0, 1]
3587            {
3588                saw = true;
3589            }
3590        });
3591
3592        assert!(
3593            saw,
3594            "did not see an instance with key 1 having [0, 1] in order"
3595        );
3596        assert_eq!(instance_count, 58);
3597    }
3598
3599    #[cfg(feature = "sim")]
3600    #[test]
3601    fn sim_entries_partially_ordered_bounded() {
3602        let mut flow = FlowBuilder::new();
3603        let node = flow.process::<()>();
3604
3605        let (port, input) = node.sim_input::<_, TotalOrder, _>();
3606
3607        let tick = node.tick();
3608        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3609        let out_recv = batch
3610            .entries_partially_ordered(nondet!(/** test */))
3611            .all_ticks()
3612            .sim_output();
3613
3614        let instance_count = flow.sim().exhaustive(async || {
3615            port.send((1, 'a'));
3616            port.send((1, 'b'));
3617            port.send((2, 'c'));
3618            let _: Vec<(i32, char)> = out_recv.collect().await;
3619        });
3620
3621        assert_eq!(instance_count, 12);
3622    }
3623
3624    #[cfg(feature = "sim")]
3625    #[test]
3626    fn sim_entries_partially_ordered_top_level() {
3627        let mut flow = FlowBuilder::new();
3628        let node = flow.process::<()>();
3629
3630        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3631
3632        let out_recv = input
3633            .into_keyed()
3634            .entries_partially_ordered(nondet!(/** test */))
3635            .sim_output();
3636
3637        let instance_count = flow.sim().exhaustive(async || {
3638            in_send.send((1, 'a'));
3639            in_send.send((1, 'b'));
3640            in_send.send((2, 'c'));
3641            let _: Vec<(i32, char)> = out_recv.collect().await;
3642        });
3643
3644        assert_eq!(instance_count, 3);
3645    }
3646
3647    #[cfg(feature = "sim")]
3648    #[test]
3649    fn sim_entries_partially_ordered_cycle_back() {
3650        let mut flow = FlowBuilder::new();
3651        let node = flow.process::<()>();
3652        let node2 = flow.process::<()>();
3653
3654        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3655
3656        let (complete_cycle_back, cycle_back) =
3657            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3658        let ordered = input
3659            .into_keyed()
3660            .merge_unordered(cycle_back)
3661            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3662
3663        let flat = ordered
3664            .clone()
3665            .entries_partially_ordered(nondet!(/** test */));
3666
3667        complete_cycle_back.complete(
3668            flat.clone()
3669                .map(q!(|(k, v): (i32, i32)| (k, v + 1)))
3670                .filter(q!(|(_, v)| *v % 2 == 1))
3671                .send(&node2, TCP.fail_stop().bincode())
3672                .send(&node, TCP.fail_stop().bincode())
3673                .into_keyed(),
3674        );
3675
3676        let out_recv = flat.sim_output();
3677
3678        let mut saw = false;
3679        let instance_count = flow.sim().exhaustive(async || {
3680            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back as (1, 1).
3681            // We want to see (1, 1) before (1, 2) - the cycled back value beats the pending one
3682            in_send.send_many_unordered([(1, 0), (1, 2)]);
3683            let results: Vec<(i32, i32)> = out_recv.collect().await;
3684
3685            let pos_1 = results.iter().position(|v| *v == (1, 1));
3686            let pos_2 = results.iter().position(|v| *v == (1, 2));
3687            if let (Some(p1), Some(p2)) = (pos_1, pos_2)
3688                && p1 < p2
3689            {
3690                saw = true;
3691            }
3692        });
3693
3694        assert!(saw, "did not see an instance with (1, 1) before (1, 2)");
3695        assert_eq!(instance_count, 78);
3696    }
3697}