hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16 ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28 AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37 AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38 manual_proof,
39};
40
41pub mod networking;
42
43/// Streaming elements of type `V` grouped by a key of type `K`.
44///
45/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
46/// order of keys is non-deterministic but the order *within* each group may be deterministic.
47///
48/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
49/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
50/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
51///
52/// Type Parameters:
53/// - `K`: the type of the key for each group
54/// - `V`: the type of the elements inside each group
55/// - `Loc`: the [`Location`] where the keyed stream is materialized
56/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
57/// - `Order`: tracks whether the elements within each group have deterministic order
58/// ([`TotalOrder`]) or not ([`NoOrder`])
59/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
60/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
61pub struct KeyedStream<
62 K,
63 V,
64 Loc,
65 Bound: Boundedness = Unbounded,
66 Order: Ordering = TotalOrder,
67 Retry: Retries = ExactlyOnce,
68> {
69 pub(crate) location: Loc,
70 pub(crate) ir_node: RefCell<HydroNode>,
71 pub(crate) flow_state: FlowState,
72
73 _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
74}
75
76impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
77 fn drop(&mut self) {
78 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
79 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
80 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
81 input: Box::new(ir_node),
82 op_metadata: HydroIrOpMetadata::new(),
83 });
84 }
85 }
86}
87
88impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
89 for KeyedStream<K, V, L, Unbounded, O, R>
90where
91 L: Location<'a>,
92{
93 fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
94 let new_meta = stream
95 .location
96 .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
97
98 KeyedStream {
99 location: stream.location.clone(),
100 flow_state: stream.flow_state.clone(),
101 ir_node: RefCell::new(HydroNode::Cast {
102 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
103 metadata: new_meta,
104 }),
105 _phantom: PhantomData,
106 }
107 }
108}
109
110impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
111 for KeyedStream<K, V, L, B, NoOrder, R>
112where
113 L: Location<'a>,
114{
115 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
116 stream.weaken_ordering()
117 }
118}
119
120impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
121where
122 L: Location<'a>,
123{
124 fn defer_tick(self) -> Self {
125 KeyedStream::defer_tick(self)
126 }
127}
128
129impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
130 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
131where
132 L: Location<'a>,
133{
134 type Location = Tick<L>;
135
136 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
137 KeyedStream {
138 flow_state: location.flow_state().clone(),
139 location: location.clone(),
140 ir_node: RefCell::new(HydroNode::CycleSource {
141 cycle_id,
142 metadata: location.new_node_metadata(
143 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
144 ),
145 }),
146 _phantom: PhantomData,
147 }
148 }
149}
150
151impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
152 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
153where
154 L: Location<'a>,
155{
156 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162
163 self.location
164 .flow_state()
165 .borrow_mut()
166 .push_root(HydroRoot::CycleSink {
167 cycle_id,
168 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
169 op_metadata: HydroIrOpMetadata::new(),
170 });
171 }
172}
173
174impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
175 for KeyedStream<K, V, L, B, O, R>
176where
177 L: Location<'a>,
178{
179 type Location = L;
180
181 fn create_source(cycle_id: CycleId, location: L) -> Self {
182 KeyedStream {
183 flow_state: location.flow_state().clone(),
184 location: location.clone(),
185 ir_node: RefCell::new(HydroNode::CycleSource {
186 cycle_id,
187 metadata: location
188 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
189 }),
190 _phantom: PhantomData,
191 }
192 }
193}
194
195impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
196 for KeyedStream<K, V, L, B, O, R>
197where
198 L: Location<'a>,
199{
200 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201 assert_eq!(
202 Location::id(&self.location),
203 expected_location,
204 "locations do not match"
205 );
206 self.location
207 .flow_state()
208 .borrow_mut()
209 .push_root(HydroRoot::CycleSink {
210 cycle_id,
211 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212 op_metadata: HydroIrOpMetadata::new(),
213 });
214 }
215}
216
217impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
218 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
219{
220 fn clone(&self) -> Self {
221 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
222 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
223 *self.ir_node.borrow_mut() = HydroNode::Tee {
224 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
225 metadata: self.location.new_node_metadata(Self::collection_kind()),
226 };
227 }
228
229 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
230 KeyedStream {
231 location: self.location.clone(),
232 flow_state: self.flow_state.clone(),
233 ir_node: HydroNode::Tee {
234 inner: SharedNode(inner.0.clone()),
235 metadata: metadata.clone(),
236 }
237 .into(),
238 _phantom: PhantomData,
239 }
240 } else {
241 unreachable!()
242 }
243 }
244}
245
/// One step of a Hydro generator built with [`KeyedStream::generator`]. Each step decides
/// whether to emit an element and whether subsequent inputs are still processed.
pub enum Generate<T> {
    /// Output the element and keep consuming later inputs.
    Yield(T),
    /// Output the element as the _last_ one; later inputs are not processed.
    Return(T),
    /// Output nothing, but keep consuming later inputs.
    Continue,
    /// Output nothing and stop consuming inputs.
    Break,
}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260 KeyedStream<K, V, L, B, O, R>
261{
262 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
263 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
264 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
265
266 let flow_state = location.flow_state().clone();
267 KeyedStream {
268 location,
269 flow_state,
270 ir_node: RefCell::new(ir_node),
271 _phantom: PhantomData,
272 }
273 }
274
275 /// Returns the [`CollectionKind`] corresponding to this type.
276 pub fn collection_kind() -> CollectionKind {
277 CollectionKind::KeyedStream {
278 bound: B::BOUND_KIND,
279 value_order: O::ORDERING_KIND,
280 value_retry: R::RETRIES_KIND,
281 key_type: stageleft::quote_type::<K>().into(),
282 value_type: stageleft::quote_type::<V>().into(),
283 }
284 }
285
286 /// Returns the [`Location`] where this keyed stream is being materialized.
287 pub fn location(&self) -> &L {
288 &self.location
289 }
290
291 /// Weakens the consistency of this live collection to not guarantee any consistency across
292 /// cluster members (if this collection is on a cluster).
293 pub fn weaken_consistency(self) -> KeyedStream<K, V, L::DropConsistency, B, O, R>
294 where
295 L: Location<'a>,
296 {
297 if L::consistency()
298 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
299 {
300 // already no consistency
301 KeyedStream::new(
302 self.location.drop_consistency(),
303 self.ir_node.replace(HydroNode::Placeholder),
304 )
305 } else {
306 KeyedStream::new(
307 self.location.drop_consistency(),
308 HydroNode::Cast {
309 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
310 metadata: self
311 .location
312 .drop_consistency()
313 .new_node_metadata(
314 KeyedStream::<K, V, L::DropConsistency, B>::collection_kind(),
315 ),
316 },
317 )
318 }
319 }
320
321 /// Casts this live collection to have the consistency guarantees specified in the given
322 /// location type parameter. The developer must ensure that the strengthened consistency
323 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
324 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
325 self,
326 _proof: impl crate::properties::ConsistencyProof,
327 ) -> KeyedStream<K, V, L2, B, O, R>
328 where
329 L: Location<'a>,
330 {
331 if L::consistency() == L2::consistency() {
332 KeyedStream::new(
333 self.location.with_consistency_of(),
334 self.ir_node.replace(HydroNode::Placeholder),
335 )
336 } else {
337 KeyedStream::new(
338 self.location.with_consistency_of(),
339 HydroNode::AssertIsConsistent {
340 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
341 trusted: false,
342 metadata: self
343 .location
344 .clone()
345 .with_consistency_of::<L2>()
346 .new_node_metadata(KeyedStream::<K, V, L2, B, O, R>::collection_kind()),
347 },
348 )
349 }
350 }
351
352 pub(crate) fn assert_has_consistency_of_trusted<
353 L2: Location<'a, DropConsistency = L::DropConsistency>,
354 >(
355 self,
356 _proof: impl crate::properties::ConsistencyProof,
357 ) -> KeyedStream<K, V, L2, B, O, R>
358 where
359 L: Location<'a>,
360 {
361 if L::consistency() == L2::consistency() {
362 KeyedStream::new(
363 self.location.with_consistency_of(),
364 self.ir_node.replace(HydroNode::Placeholder),
365 )
366 } else {
367 KeyedStream::new(
368 self.location.with_consistency_of(),
369 HydroNode::AssertIsConsistent {
370 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
371 trusted: true,
372 metadata: self
373 .location
374 .clone()
375 .with_consistency_of::<L2>()
376 .new_node_metadata(KeyedStream::<K, V, L2, B, O, R>::collection_kind()),
377 },
378 )
379 }
380 }
381
382 /// Turns this [`KeyedStream`] into a [`Stream`] preserving ordering, under the invariant
383 /// assumption that there is at most one key. If this invariant is broken, the program
384 /// may exhibit undefined behavior, so uses must be carefully vetted.
385 pub(crate) fn cast_at_most_one_key(self) -> Stream<(K, V), L, B, O, R> {
386 Stream::new(
387 self.location.clone(),
388 HydroNode::Cast {
389 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
390 metadata: self
391 .location
392 .new_node_metadata(Stream::<(K, V), L, B, O, R>::collection_kind()),
393 },
394 )
395 }
396
397 /// Turns this [`KeyedStream`] into a [`KeyedSingleton`], under the invariant assumption that
398 /// there is at most one entry per key. If this invariant is broken, the program may exhibit
399 /// undefined behavior, so uses must be carefully vetted.
400 pub(crate) fn cast_at_most_one_entry_per_key(
401 self,
402 ) -> KeyedSingleton<K, V, L, B::WithBoundedValue> {
403 KeyedSingleton::new(
404 self.location.clone(),
405 HydroNode::Cast {
406 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
407 metadata: self.location.new_node_metadata(KeyedSingleton::<
408 K,
409 V,
410 L,
411 B::WithBoundedValue,
412 >::collection_kind()),
413 },
414 )
415 }
416
417 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> KeyedStream<K, V, L, B, O2, R> {
418 if O::ORDERING_KIND == O2::ORDERING_KIND {
419 KeyedStream::new(
420 self.location.clone(),
421 self.ir_node.replace(HydroNode::Placeholder),
422 )
423 } else {
424 panic!(
425 "Runtime ordering {:?} did not match requested cast {:?}.",
426 O::ORDERING_KIND,
427 O2::ORDERING_KIND
428 )
429 }
430 }
431
432 /// Explicitly "casts" the keyed stream to a type with a different ordering
433 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
434 /// by the type-system.
435 ///
436 /// # Non-Determinism
437 /// This function is used as an escape hatch, and any mistakes in the
438 /// provided ordering guarantee will propagate into the guarantees
439 /// for the rest of the program.
440 pub fn assume_ordering<O2: Ordering>(
441 self,
442 _nondet: NonDet,
443 ) -> KeyedStream<K, V, L::DropConsistency, B, O2, R> {
444 if O::ORDERING_KIND == O2::ORDERING_KIND {
445 self.use_ordering_type().weaken_consistency()
446 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
447 // We can always weaken the ordering guarantee
448 let target_location = self.location.drop_consistency();
449 KeyedStream::new(
450 target_location.clone(),
451 HydroNode::Cast {
452 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
453 metadata: target_location
454 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
455 },
456 )
457 } else {
458 let target_location = self.location.drop_consistency();
459 KeyedStream::new(
460 target_location.clone(),
461 HydroNode::ObserveNonDet {
462 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
463 trusted: false,
464 metadata: target_location
465 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
466 },
467 )
468 }
469 }
470
471 fn assume_ordering_trusted<O2: Ordering>(
472 self,
473 _nondet: NonDet,
474 ) -> KeyedStream<K, V, L, B, O2, R> {
475 if O::ORDERING_KIND == O2::ORDERING_KIND {
476 KeyedStream::new(
477 self.location.clone(),
478 self.ir_node.replace(HydroNode::Placeholder),
479 )
480 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
481 // We can always weaken the ordering guarantee
482 KeyedStream::new(
483 self.location.clone(),
484 HydroNode::Cast {
485 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
486 metadata: self
487 .location
488 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
489 },
490 )
491 } else {
492 KeyedStream::new(
493 self.location.clone(),
494 HydroNode::ObserveNonDet {
495 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
496 trusted: true,
497 metadata: self
498 .location
499 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
500 },
501 )
502 }
503 }
504
505 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
506 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
507 /// which is always safe because that is the weakest possible guarantee.
508 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
509 self.weaken_ordering::<NoOrder>()
510 }
511
512 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
513 /// enforcing that `O2` is weaker than the input ordering guarantee.
514 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
515 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
516 self.assume_ordering_trusted::<O2>(nondet)
517 }
518
519 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
520 /// implies that `O == TotalOrder`.
521 pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
522 where
523 O: IsOrdered,
524 {
525 self.assume_ordering_trusted(nondet!(/** no-op */))
526 }
527
528 /// Explicitly "casts" the keyed stream to a type with a different retries
529 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
530 /// be proven by the type-system.
531 ///
532 /// # Non-Determinism
533 /// This function is used as an escape hatch, and any mistakes in the
534 /// provided retries guarantee will propagate into the guarantees
535 /// for the rest of the program.
536 pub fn assume_retries<R2: Retries>(
537 self,
538 _nondet: NonDet,
539 ) -> KeyedStream<K, V, L::DropConsistency, B, O, R2> {
540 if R::RETRIES_KIND == R2::RETRIES_KIND {
541 KeyedStream::new(
542 self.location.drop_consistency(),
543 self.ir_node.replace(HydroNode::Placeholder),
544 )
545 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
546 // We can always weaken the retries guarantee
547 let target_location = self.location.drop_consistency();
548 KeyedStream::new(
549 target_location.clone(),
550 HydroNode::Cast {
551 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
552 metadata: target_location
553 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
554 },
555 )
556 } else {
557 let target_location = self.location.drop_consistency();
558 KeyedStream::new(
559 target_location.clone(),
560 HydroNode::ObserveNonDet {
561 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
562 trusted: false,
563 metadata: target_location
564 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
565 },
566 )
567 }
568 }
569
570 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
571 // is not observable
572 fn assume_retries_trusted<R2: Retries>(
573 self,
574 _nondet: NonDet,
575 ) -> KeyedStream<K, V, L, B, O, R2> {
576 if R::RETRIES_KIND == R2::RETRIES_KIND {
577 KeyedStream::new(
578 self.location.clone(),
579 self.ir_node.replace(HydroNode::Placeholder),
580 )
581 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
582 // We can always weaken the retries guarantee
583 KeyedStream::new(
584 self.location.clone(),
585 HydroNode::Cast {
586 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
587 metadata: self
588 .location
589 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
590 },
591 )
592 } else {
593 KeyedStream::new(
594 self.location.clone(),
595 HydroNode::ObserveNonDet {
596 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
597 trusted: true,
598 metadata: self
599 .location
600 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
601 },
602 )
603 }
604 }
605
606 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
607 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
608 /// which is always safe because that is the weakest possible guarantee.
609 pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
610 self.weaken_retries::<AtLeastOnce>()
611 }
612
613 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
614 /// enforcing that `R2` is weaker than the input retries guarantee.
615 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
616 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
617 self.assume_retries_trusted::<R2>(nondet)
618 }
619
620 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
621 /// implies that `R == ExactlyOnce`.
622 pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
623 where
624 R: IsExactlyOnce,
625 {
626 self.assume_retries_trusted(nondet!(/** no-op */))
627 }
628
629 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
630 /// implies that `B == Bounded`.
631 pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
632 where
633 B: IsBounded,
634 {
635 self.weaken_boundedness()
636 }
637
638 /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
639 /// which implies that `B == Bounded`.
640 pub fn weaken_boundedness<B2: Boundedness>(self) -> KeyedStream<K, V, L, B2, O, R> {
641 if B::BOUNDED == B2::BOUNDED {
642 KeyedStream::new(
643 self.location.clone(),
644 self.ir_node.replace(HydroNode::Placeholder),
645 )
646 } else {
647 // We can always weaken the boundedness
648 KeyedStream::new(
649 self.location.clone(),
650 HydroNode::Cast {
651 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
652 metadata: self
653 .location
654 .new_node_metadata(KeyedStream::<K, V, L, B2, O, R>::collection_kind()),
655 },
656 )
657 }
658 }
659
660 /// Flattens the keyed stream into an unordered stream of key-value pairs.
661 ///
662 /// # Example
663 /// ```rust
664 /// # #[cfg(feature = "deploy")] {
665 /// # use hydro_lang::prelude::*;
666 /// # use futures::StreamExt;
667 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668 /// process
669 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
670 /// .into_keyed()
671 /// .entries()
672 /// # }, |mut stream| async move {
673 /// // (1, 2), (1, 3), (2, 4) in any order
674 /// # let mut results = Vec::new();
675 /// # for _ in 0..3 {
676 /// # results.push(stream.next().await.unwrap());
677 /// # }
678 /// # results.sort();
679 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
680 /// # }));
681 /// # }
682 /// ```
683 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
684 Stream::new(
685 self.location.clone(),
686 HydroNode::Cast {
687 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
688 metadata: self
689 .location
690 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
691 },
692 )
693 }
694
695 /// Flattens the keyed stream into a totally ordered stream of key-value pairs,
696 /// preserving the order of values within each key group but non-deterministically
697 /// interleaving across keys.
698 ///
699 /// Requires the keyed stream to be totally ordered within each group (`O: IsOrdered`).
700 ///
701 /// # Non-Determinism
702 /// The interleaving of entries across different keys is non-deterministic.
703 /// Within each key, the original order is preserved.
704 pub fn entries_partially_ordered(
705 self,
706 _nondet: NonDet,
707 ) -> Stream<(K, V), L::DropConsistency, B, TotalOrder, R>
708 where
709 O: IsOrdered,
710 {
711 let target_location = self.location.drop_consistency();
712 Stream::new(
713 target_location.clone(),
714 HydroNode::ObserveNonDet {
715 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
716 trusted: false,
717 metadata: target_location
718 .new_node_metadata(Stream::<(K, V), L, B, TotalOrder, R>::collection_kind()),
719 },
720 )
721 }
722
723 /// Flattens the keyed stream into an unordered stream of only the values.
724 ///
725 /// # Example
726 /// ```rust
727 /// # #[cfg(feature = "deploy")] {
728 /// # use hydro_lang::prelude::*;
729 /// # use futures::StreamExt;
730 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
731 /// process
732 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
733 /// .into_keyed()
734 /// .values()
735 /// # }, |mut stream| async move {
736 /// // 2, 3, 4 in any order
737 /// # let mut results = Vec::new();
738 /// # for _ in 0..3 {
739 /// # results.push(stream.next().await.unwrap());
740 /// # }
741 /// # results.sort();
742 /// # assert_eq!(results, vec![2, 3, 4]);
743 /// # }));
744 /// # }
745 /// ```
746 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
747 self.entries().map(q!(|(_, v)| v))
748 }
749
750 /// Flattens the keyed stream into an unordered stream of just the keys.
751 ///
752 /// # Example
753 /// ```rust
754 /// # #[cfg(feature = "deploy")] {
755 /// # use hydro_lang::prelude::*;
756 /// # use futures::StreamExt;
757 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
758 /// # process
759 /// # .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
760 /// # .into_keyed()
761 /// # .keys()
762 /// # }, |mut stream| async move {
763 /// // 1, 2 in any order
764 /// # let mut results = Vec::new();
765 /// # for _ in 0..2 {
766 /// # results.push(stream.next().await.unwrap());
767 /// # }
768 /// # results.sort();
769 /// # assert_eq!(results, vec![1, 2]);
770 /// # }));
771 /// # }
772 /// ```
773 pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
774 where
775 K: Eq + Hash,
776 {
777 self.entries().map(q!(|(k, _)| k)).unique()
778 }
779
780 /// Transforms each value by invoking `f` on each element, with keys staying the same
781 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
782 ///
783 /// If you do not want to modify the stream and instead only want to view
784 /// each item use [`KeyedStream::inspect`] instead.
785 ///
786 /// # Example
787 /// ```rust
788 /// # #[cfg(feature = "deploy")] {
789 /// # use hydro_lang::prelude::*;
790 /// # use futures::StreamExt;
791 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
792 /// process
793 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
794 /// .into_keyed()
795 /// .map(q!(|v| v + 1))
796 /// # .entries()
797 /// # }, |mut stream| async move {
798 /// // { 1: [3, 4], 2: [5] }
799 /// # let mut results = Vec::new();
800 /// # for _ in 0..3 {
801 /// # results.push(stream.next().await.unwrap());
802 /// # }
803 /// # results.sort();
804 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
805 /// # }));
806 /// # }
807 /// ```
808 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
809 where
810 F: Fn(V) -> U + 'a,
811 {
812 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
813 let map_f = q!({
814 let orig = f;
815 move |(k, v)| (k, orig(v))
816 })
817 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
818 .into();
819
820 KeyedStream::new(
821 self.location.clone(),
822 HydroNode::Map {
823 f: map_f,
824 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
825 metadata: self
826 .location
827 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
828 },
829 )
830 }
831
832 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
833 /// re-grouped even they are tuples; instead they will be grouped under the original key.
834 ///
835 /// If you do not want to modify the stream and instead only want to view
836 /// each item use [`KeyedStream::inspect_with_key`] instead.
837 ///
838 /// # Example
839 /// ```rust
840 /// # #[cfg(feature = "deploy")] {
841 /// # use hydro_lang::prelude::*;
842 /// # use futures::StreamExt;
843 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844 /// process
845 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
846 /// .into_keyed()
847 /// .map_with_key(q!(|(k, v)| k + v))
848 /// # .entries()
849 /// # }, |mut stream| async move {
850 /// // { 1: [3, 4], 2: [6] }
851 /// # let mut results = Vec::new();
852 /// # for _ in 0..3 {
853 /// # results.push(stream.next().await.unwrap());
854 /// # }
855 /// # results.sort();
856 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
857 /// # }));
858 /// # }
859 /// ```
860 pub fn map_with_key<U, F>(
861 self,
862 f: impl IntoQuotedMut<'a, F, L> + Copy,
863 ) -> KeyedStream<K, U, L, B, O, R>
864 where
865 F: Fn((K, V)) -> U + 'a,
866 K: Clone,
867 {
868 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
869 let map_f = q!({
870 let orig = f;
871 move |(k, v)| {
872 let out = orig((Clone::clone(&k), v));
873 (k, out)
874 }
875 })
876 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
877 .into();
878
879 KeyedStream::new(
880 self.location.clone(),
881 HydroNode::Map {
882 f: map_f,
883 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
884 metadata: self
885 .location
886 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
887 },
888 )
889 }
890
891 /// Prepends a new value to the key of each element in the stream, producing a new
892 /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
893 /// occurs and the elements in each group preserve their original order.
894 ///
895 /// # Example
896 /// ```rust
897 /// # #[cfg(feature = "deploy")] {
898 /// # use hydro_lang::prelude::*;
899 /// # use futures::StreamExt;
900 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
901 /// process
902 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
903 /// .into_keyed()
904 /// .prefix_key(q!(|&(k, _)| k % 2))
905 /// # .entries()
906 /// # }, |mut stream| async move {
907 /// // { (1, 1): [2, 3], (0, 2): [4] }
908 /// # let mut results = Vec::new();
909 /// # for _ in 0..3 {
910 /// # results.push(stream.next().await.unwrap());
911 /// # }
912 /// # results.sort();
913 /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
914 /// # }));
915 /// # }
916 /// ```
917 pub fn prefix_key<K2, F>(
918 self,
919 f: impl IntoQuotedMut<'a, F, L> + Copy,
920 ) -> KeyedStream<(K2, K), V, L, B, O, R>
921 where
922 F: Fn(&(K, V)) -> K2 + 'a,
923 {
924 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
925 let map_f = q!({
926 let orig = f;
927 move |kv| {
928 let out = orig(&kv);
929 ((out, kv.0), kv.1)
930 }
931 })
932 .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
933 .into();
934
935 KeyedStream::new(
936 self.location.clone(),
937 HydroNode::Map {
938 f: map_f,
939 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
940 metadata: self
941 .location
942 .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
943 },
944 )
945 }
946
947 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
948 /// `f`, preserving the order of the elements within the group.
949 ///
950 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
951 /// not modify or take ownership of the values. If you need to modify the values while filtering
952 /// use [`KeyedStream::filter_map`] instead.
953 ///
954 /// # Example
955 /// ```rust
956 /// # #[cfg(feature = "deploy")] {
957 /// # use hydro_lang::prelude::*;
958 /// # use futures::StreamExt;
959 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
960 /// process
961 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
962 /// .into_keyed()
963 /// .filter(q!(|&x| x > 2))
964 /// # .entries()
965 /// # }, |mut stream| async move {
966 /// // { 1: [3], 2: [4] }
967 /// # let mut results = Vec::new();
968 /// # for _ in 0..2 {
969 /// # results.push(stream.next().await.unwrap());
970 /// # }
971 /// # results.sort();
972 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
973 /// # }));
974 /// # }
975 /// ```
976 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
977 where
978 F: Fn(&V) -> bool + 'a,
979 {
980 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
981 let filter_f = q!({
982 let orig = f;
983 move |t: &(_, _)| orig(&t.1)
984 })
985 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
986 .into();
987
988 KeyedStream::new(
989 self.location.clone(),
990 HydroNode::Filter {
991 f: filter_f,
992 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
993 metadata: self.location.new_node_metadata(Self::collection_kind()),
994 },
995 )
996 }
997
998 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
999 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
1000 ///
1001 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
1002 /// not modify or take ownership of the values. If you need to modify the values while filtering
1003 /// use [`KeyedStream::filter_map_with_key`] instead.
1004 ///
1005 /// # Example
1006 /// ```rust
1007 /// # #[cfg(feature = "deploy")] {
1008 /// # use hydro_lang::prelude::*;
1009 /// # use futures::StreamExt;
1010 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1011 /// process
1012 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1013 /// .into_keyed()
1014 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
1015 /// # .entries()
1016 /// # }, |mut stream| async move {
1017 /// // { 1: [3], 2: [4] }
1018 /// # let mut results = Vec::new();
1019 /// # for _ in 0..2 {
1020 /// # results.push(stream.next().await.unwrap());
1021 /// # }
1022 /// # results.sort();
1023 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
1024 /// # }));
1025 /// # }
1026 /// ```
1027 pub fn filter_with_key<F>(
1028 self,
1029 f: impl IntoQuotedMut<'a, F, L> + Copy,
1030 ) -> KeyedStream<K, V, L, B, O, R>
1031 where
1032 F: Fn(&(K, V)) -> bool + 'a,
1033 {
1034 let filter_f = f
1035 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1036 .into();
1037
1038 KeyedStream::new(
1039 self.location.clone(),
1040 HydroNode::Filter {
1041 f: filter_f,
1042 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1043 metadata: self.location.new_node_metadata(Self::collection_kind()),
1044 },
1045 )
1046 }
1047
1048 /// An operator that both filters and maps each value, with keys staying the same.
1049 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
1050 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
1051 ///
1052 /// # Example
1053 /// ```rust
1054 /// # #[cfg(feature = "deploy")] {
1055 /// # use hydro_lang::prelude::*;
1056 /// # use futures::StreamExt;
1057 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058 /// process
1059 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
1060 /// .into_keyed()
1061 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
1062 /// # .entries()
1063 /// # }, |mut stream| async move {
1064 /// // { 1: [2], 2: [4] }
1065 /// # let mut results = Vec::new();
1066 /// # for _ in 0..2 {
1067 /// # results.push(stream.next().await.unwrap());
1068 /// # }
1069 /// # results.sort();
1070 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1071 /// # }));
1072 /// # }
1073 /// ```
1074 pub fn filter_map<U, F>(
1075 self,
1076 f: impl IntoQuotedMut<'a, F, L> + Copy,
1077 ) -> KeyedStream<K, U, L, B, O, R>
1078 where
1079 F: Fn(V) -> Option<U> + 'a,
1080 {
1081 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1082 let filter_map_f = q!({
1083 let orig = f;
1084 move |(k, v)| orig(v).map(|o| (k, o))
1085 })
1086 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1087 .into();
1088
1089 KeyedStream::new(
1090 self.location.clone(),
1091 HydroNode::FilterMap {
1092 f: filter_map_f,
1093 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1094 metadata: self
1095 .location
1096 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1097 },
1098 )
1099 }
1100
1101 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
1102 /// re-grouped even they are tuples; instead they will be grouped under the original key.
1103 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
1104 ///
1105 /// # Example
1106 /// ```rust
1107 /// # #[cfg(feature = "deploy")] {
1108 /// # use hydro_lang::prelude::*;
1109 /// # use futures::StreamExt;
1110 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1111 /// process
1112 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
1113 /// .into_keyed()
1114 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
1115 /// # .entries()
1116 /// # }, |mut stream| async move {
1117 /// // { 2: [2] }
1118 /// # let mut results = Vec::new();
1119 /// # for _ in 0..1 {
1120 /// # results.push(stream.next().await.unwrap());
1121 /// # }
1122 /// # results.sort();
1123 /// # assert_eq!(results, vec![(2, 2)]);
1124 /// # }));
1125 /// # }
1126 /// ```
1127 pub fn filter_map_with_key<U, F>(
1128 self,
1129 f: impl IntoQuotedMut<'a, F, L> + Copy,
1130 ) -> KeyedStream<K, U, L, B, O, R>
1131 where
1132 F: Fn((K, V)) -> Option<U> + 'a,
1133 K: Clone,
1134 {
1135 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1136 let filter_map_f = q!({
1137 let orig = f;
1138 move |(k, v)| {
1139 let out = orig((Clone::clone(&k), v));
1140 out.map(|o| (k, o))
1141 }
1142 })
1143 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1144 .into();
1145
1146 KeyedStream::new(
1147 self.location.clone(),
1148 HydroNode::FilterMap {
1149 f: filter_map_f,
1150 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1151 metadata: self
1152 .location
1153 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1154 },
1155 )
1156 }
1157
1158 /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
1159 /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
1160 /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
1161 ///
1162 /// # Example
1163 /// ```rust
1164 /// # #[cfg(feature = "deploy")] {
1165 /// # use hydro_lang::prelude::*;
1166 /// # use futures::StreamExt;
1167 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1168 /// let tick = process.tick();
1169 /// let batch = process
1170 /// .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
1171 /// .into_keyed()
1172 /// .batch(&tick, nondet!(/** test */));
1173 /// let count = batch.clone().entries().count(); // `count()` returns a singleton
1174 /// batch.cross_singleton(count).all_ticks().entries()
1175 /// # }, |mut stream| async move {
1176 /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
1177 /// # let mut results = Vec::new();
1178 /// # for _ in 0..3 {
1179 /// # results.push(stream.next().await.unwrap());
1180 /// # }
1181 /// # results.sort();
1182 /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
1183 /// # }));
1184 /// # }
1185 /// ```
1186 pub fn cross_singleton<O2>(
1187 self,
1188 other: impl Into<Optional<O2, L, Bounded>>,
1189 ) -> KeyedStream<K, (V, O2), L, B, O, R>
1190 where
1191 O2: Clone,
1192 {
1193 let other: Optional<O2, L, Bounded> = other.into();
1194 check_matching_location(&self.location, &other.location);
1195
1196 Stream::new(
1197 self.location.clone(),
1198 HydroNode::CrossSingleton {
1199 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1200 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1201 metadata: self
1202 .location
1203 .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
1204 },
1205 )
1206 .map(q!(|((k, v), o2)| (k, (v, o2))))
1207 .into_keyed()
1208 }
1209
1210 /// For each value `v` in each group, transform `v` using `f` and then treat the
1211 /// result as an [`Iterator`] to produce values one by one within the same group.
1212 /// The implementation for [`Iterator`] for the output type `I` must produce items
1213 /// in a **deterministic** order.
1214 ///
1215 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
1216 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
1217 ///
1218 /// # Example
1219 /// ```rust
1220 /// # #[cfg(feature = "deploy")] {
1221 /// # use hydro_lang::prelude::*;
1222 /// # use futures::StreamExt;
1223 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1224 /// process
1225 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1226 /// .into_keyed()
1227 /// .flat_map_ordered(q!(|x| x))
1228 /// # .entries()
1229 /// # }, |mut stream| async move {
1230 /// // { 1: [2, 3, 4], 2: [5, 6] }
1231 /// # let mut results = Vec::new();
1232 /// # for _ in 0..5 {
1233 /// # results.push(stream.next().await.unwrap());
1234 /// # }
1235 /// # results.sort();
1236 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1237 /// # }));
1238 /// # }
1239 /// ```
1240 pub fn flat_map_ordered<U, I, F>(
1241 self,
1242 f: impl IntoQuotedMut<'a, F, L> + Copy,
1243 ) -> KeyedStream<K, U, L, B, O, R>
1244 where
1245 I: IntoIterator<Item = U>,
1246 F: Fn(V) -> I + 'a,
1247 K: Clone,
1248 {
1249 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1250 let flat_map_f = q!({
1251 let orig = f;
1252 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1253 })
1254 .splice_fn1_ctx::<(K, V), _>(&self.location)
1255 .into();
1256
1257 KeyedStream::new(
1258 self.location.clone(),
1259 HydroNode::FlatMap {
1260 f: flat_map_f,
1261 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1262 metadata: self
1263 .location
1264 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1265 },
1266 )
1267 }
1268
1269 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1270 /// for the output type `I` to produce items in any order.
1271 ///
1272 /// # Example
1273 /// ```rust
1274 /// # #[cfg(feature = "deploy")] {
1275 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1276 /// # use futures::StreamExt;
1277 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1278 /// process
1279 /// .source_iter(q!(vec![
1280 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1281 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1282 /// ]))
1283 /// .into_keyed()
1284 /// .flat_map_unordered(q!(|x| x))
1285 /// # .entries()
1286 /// # }, |mut stream| async move {
1287 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1288 /// # let mut results = Vec::new();
1289 /// # for _ in 0..4 {
1290 /// # results.push(stream.next().await.unwrap());
1291 /// # }
1292 /// # results.sort();
1293 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1294 /// # }));
1295 /// # }
1296 /// ```
1297 pub fn flat_map_unordered<U, I, F>(
1298 self,
1299 f: impl IntoQuotedMut<'a, F, L> + Copy,
1300 ) -> KeyedStream<K, U, L, B, NoOrder, R>
1301 where
1302 I: IntoIterator<Item = U>,
1303 F: Fn(V) -> I + 'a,
1304 K: Clone,
1305 {
1306 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1307 let flat_map_f = q!({
1308 let orig = f;
1309 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1310 })
1311 .splice_fn1_ctx::<(K, V), _>(&self.location)
1312 .into();
1313
1314 KeyedStream::new(
1315 self.location.clone(),
1316 HydroNode::FlatMap {
1317 f: flat_map_f,
1318 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1319 metadata: self
1320 .location
1321 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1322 },
1323 )
1324 }
1325
1326 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1327 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1328 /// items in a **deterministic** order.
1329 ///
1330 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1331 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1332 ///
1333 /// # Example
1334 /// ```rust
1335 /// # #[cfg(feature = "deploy")] {
1336 /// # use hydro_lang::prelude::*;
1337 /// # use futures::StreamExt;
1338 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1339 /// process
1340 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1341 /// .into_keyed()
1342 /// .flatten_ordered()
1343 /// # .entries()
1344 /// # }, |mut stream| async move {
1345 /// // { 1: [2, 3, 4], 2: [5, 6] }
1346 /// # let mut results = Vec::new();
1347 /// # for _ in 0..5 {
1348 /// # results.push(stream.next().await.unwrap());
1349 /// # }
1350 /// # results.sort();
1351 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1352 /// # }));
1353 /// # }
1354 /// ```
1355 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1356 where
1357 V: IntoIterator<Item = U>,
1358 K: Clone,
1359 {
1360 self.flat_map_ordered(q!(|d| d))
1361 }
1362
1363 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1364 /// for the value type `V` to produce items in any order.
1365 ///
1366 /// # Example
1367 /// ```rust
1368 /// # #[cfg(feature = "deploy")] {
1369 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1370 /// # use futures::StreamExt;
1371 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1372 /// process
1373 /// .source_iter(q!(vec![
1374 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1375 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1376 /// ]))
1377 /// .into_keyed()
1378 /// .flatten_unordered()
1379 /// # .entries()
1380 /// # }, |mut stream| async move {
1381 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1382 /// # let mut results = Vec::new();
1383 /// # for _ in 0..4 {
1384 /// # results.push(stream.next().await.unwrap());
1385 /// # }
1386 /// # results.sort();
1387 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1388 /// # }));
1389 /// # }
1390 /// ```
1391 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1392 where
1393 V: IntoIterator<Item = U>,
1394 K: Clone,
1395 {
1396 self.flat_map_unordered(q!(|d| d))
1397 }
1398
1399 /// An operator which allows you to "inspect" each element of a stream without
1400 /// modifying it. The closure `f` is called on a reference to each value. This is
1401 /// mainly useful for debugging, and should not be used to generate side-effects.
1402 ///
1403 /// # Example
1404 /// ```rust
1405 /// # #[cfg(feature = "deploy")] {
1406 /// # use hydro_lang::prelude::*;
1407 /// # use futures::StreamExt;
1408 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1409 /// process
1410 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1411 /// .into_keyed()
1412 /// .inspect(q!(|v| println!("{}", v)))
1413 /// # .entries()
1414 /// # }, |mut stream| async move {
1415 /// # let mut results = Vec::new();
1416 /// # for _ in 0..3 {
1417 /// # results.push(stream.next().await.unwrap());
1418 /// # }
1419 /// # results.sort();
1420 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1421 /// # }));
1422 /// # }
1423 /// ```
1424 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1425 where
1426 F: Fn(&V) + 'a,
1427 {
1428 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1429 let inspect_f = q!({
1430 let orig = f;
1431 move |t: &(_, _)| orig(&t.1)
1432 })
1433 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1434 .into();
1435
1436 KeyedStream::new(
1437 self.location.clone(),
1438 HydroNode::Inspect {
1439 f: inspect_f,
1440 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1441 metadata: self.location.new_node_metadata(Self::collection_kind()),
1442 },
1443 )
1444 }
1445
1446 /// An operator which allows you to "inspect" each element of a stream without
1447 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1448 /// mainly useful for debugging, and should not be used to generate side-effects.
1449 ///
1450 /// # Example
1451 /// ```rust
1452 /// # #[cfg(feature = "deploy")] {
1453 /// # use hydro_lang::prelude::*;
1454 /// # use futures::StreamExt;
1455 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1456 /// process
1457 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1458 /// .into_keyed()
1459 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1460 /// # .entries()
1461 /// # }, |mut stream| async move {
1462 /// # let mut results = Vec::new();
1463 /// # for _ in 0..3 {
1464 /// # results.push(stream.next().await.unwrap());
1465 /// # }
1466 /// # results.sort();
1467 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1468 /// # }));
1469 /// # }
1470 /// ```
1471 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1472 where
1473 F: Fn(&(K, V)) + 'a,
1474 {
1475 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1476
1477 KeyedStream::new(
1478 self.location.clone(),
1479 HydroNode::Inspect {
1480 f: inspect_f,
1481 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1482 metadata: self.location.new_node_metadata(Self::collection_kind()),
1483 },
1484 )
1485 }
1486
1487 /// An operator which allows you to "name" a `HydroNode`.
1488 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1489 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1490 {
1491 let mut node = self.ir_node.borrow_mut();
1492 let metadata = node.metadata_mut();
1493 metadata.tag = Some(name.to_owned());
1494 }
1495 self
1496 }
1497
1498 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1499 ///
1500 /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1501 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1502 /// early by returning `None`.
1503 ///
1504 /// The function takes a mutable reference to the accumulator and the current element, and returns
1505 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1506 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1507 ///
1508 /// # Example
1509 /// ```rust
1510 /// # #[cfg(feature = "deploy")] {
1511 /// # use hydro_lang::prelude::*;
1512 /// # use futures::StreamExt;
1513 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1514 /// process
1515 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1516 /// .into_keyed()
1517 /// .scan(
1518 /// q!(|| 0),
1519 /// q!(|acc, x| {
1520 /// *acc += x;
1521 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1522 /// }),
1523 /// )
1524 /// # .entries()
1525 /// # }, |mut stream| async move {
1526 /// // Output: { 0: [1], 1: [3, 7] }
1527 /// # let mut results = Vec::new();
1528 /// # for _ in 0..3 {
1529 /// # results.push(stream.next().await.unwrap());
1530 /// # }
1531 /// # results.sort();
1532 /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1533 /// # }));
1534 /// # }
1535 /// ```
1536 pub fn scan<A, U, I, F>(
1537 self,
1538 init: impl IntoQuotedMut<'a, I, L> + Copy,
1539 f: impl IntoQuotedMut<'a, F, L> + Copy,
1540 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1541 where
1542 O: IsOrdered,
1543 R: IsExactlyOnce,
1544 K: Clone + Eq + Hash,
1545 I: Fn() -> A + 'a,
1546 F: Fn(&mut A, V) -> Option<U> + 'a,
1547 {
1548 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1549 self.make_totally_ordered().make_exactly_once().generator(
1550 init,
1551 q!({
1552 let orig = f;
1553 move |state, v| {
1554 if let Some(out) = orig(state, v) {
1555 Generate::Yield(out)
1556 } else {
1557 Generate::Break
1558 }
1559 }
1560 }),
1561 )
1562 }
1563
1564 /// Iteratively processes the elements in each group using a state machine that can yield
1565 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1566 /// syntax in Rust, without requiring special syntax.
1567 ///
1568 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1569 /// state for each group. The second argument defines the processing logic, taking in a
1570 /// mutable reference to the group's state and the value to be processed. It emits a
1571 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1572 /// should be processed.
1573 ///
1574 /// # Example
1575 /// ```rust
1576 /// # #[cfg(feature = "deploy")] {
1577 /// # use hydro_lang::prelude::*;
1578 /// # use futures::StreamExt;
1579 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1580 /// process
1581 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1582 /// .into_keyed()
1583 /// .generator(
1584 /// q!(|| 0),
1585 /// q!(|acc, x| {
1586 /// *acc += x;
1587 /// if *acc > 100 {
1588 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
1589 /// "done!".to_owned()
1590 /// )
1591 /// } else if *acc % 2 == 0 {
1592 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
1593 /// "even".to_owned()
1594 /// )
1595 /// } else {
1596 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1597 /// }
1598 /// }),
1599 /// )
1600 /// # .entries()
1601 /// # }, |mut stream| async move {
1602 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1603 /// # let mut results = Vec::new();
1604 /// # for _ in 0..3 {
1605 /// # results.push(stream.next().await.unwrap());
1606 /// # }
1607 /// # results.sort();
1608 /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1609 /// # }));
1610 /// # }
1611 /// ```
1612 pub fn generator<A, U, I, F>(
1613 self,
1614 init: impl IntoQuotedMut<'a, I, L> + Copy,
1615 f: impl IntoQuotedMut<'a, F, L> + Copy,
1616 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1617 where
1618 O: IsOrdered,
1619 R: IsExactlyOnce,
1620 K: Clone + Eq + Hash,
1621 I: Fn() -> A + 'a,
1622 F: Fn(&mut A, V) -> Generate<U> + 'a,
1623 {
1624 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1625 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1626
1627 let this = self.make_totally_ordered().make_exactly_once();
1628
1629 let scan_init = q!(|| HashMap::new())
1630 .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
1631 .into();
1632 let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1633 let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1634 if let Some(existing_state_value) = existing_state {
1635 match f(existing_state_value, v) {
1636 Generate::Yield(out) => Some(Some((k, out))),
1637 Generate::Return(out) => {
1638 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1639 Some(Some((k, out)))
1640 }
1641 Generate::Break => {
1642 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1643 Some(None)
1644 }
1645 Generate::Continue => Some(None),
1646 }
1647 } else {
1648 Some(None)
1649 }
1650 })
1651 .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
1652 .into();
1653
1654 let scan_node = HydroNode::Scan {
1655 init: scan_init,
1656 acc: scan_f,
1657 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1658 metadata: this.location.new_node_metadata(Stream::<
1659 Option<(K, U)>,
1660 L,
1661 B,
1662 TotalOrder,
1663 ExactlyOnce,
1664 >::collection_kind()),
1665 };
1666
1667 let flatten_f = q!(|d| d)
1668 .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
1669 .into();
1670 let flatten_node = HydroNode::FlatMap {
1671 f: flatten_f,
1672 input: Box::new(scan_node),
1673 metadata: this.location.new_node_metadata(KeyedStream::<
1674 K,
1675 U,
1676 L,
1677 B,
1678 TotalOrder,
1679 ExactlyOnce,
1680 >::collection_kind()),
1681 };
1682
1683 KeyedStream::new(this.location.clone(), flatten_node)
1684 }
1685
1686 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1687 /// in-order across the values in each group. But the aggregation function returns a boolean,
1688 /// which when true indicates that the aggregated result is complete and can be released to
1689 /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1690 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1691 /// normal stream elements.
1692 ///
1693 /// # Example
1694 /// ```rust
1695 /// # #[cfg(feature = "deploy")] {
1696 /// # use hydro_lang::prelude::*;
1697 /// # use futures::StreamExt;
1698 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1699 /// process
1700 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1701 /// .into_keyed()
1702 /// .fold_early_stop(
1703 /// q!(|| 0),
1704 /// q!(|acc, x| {
1705 /// *acc += x;
1706 /// x % 2 == 0
1707 /// }),
1708 /// )
1709 /// # .entries()
1710 /// # }, |mut stream| async move {
1711 /// // Output: { 0: 2, 1: 9 }
1712 /// # let mut results = Vec::new();
1713 /// # for _ in 0..2 {
1714 /// # results.push(stream.next().await.unwrap());
1715 /// # }
1716 /// # results.sort();
1717 /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1718 /// # }));
1719 /// # }
1720 /// ```
1721 pub fn fold_early_stop<A, I, F>(
1722 self,
1723 init: impl IntoQuotedMut<'a, I, L> + Copy,
1724 f: impl IntoQuotedMut<'a, F, L> + Copy,
1725 ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
1726 where
1727 O: IsOrdered,
1728 R: IsExactlyOnce,
1729 K: Clone + Eq + Hash,
1730 I: Fn() -> A + 'a,
1731 F: Fn(&mut A, V) -> bool + 'a,
1732 {
1733 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1734 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1735 let out_without_bound_cast = self.generator(
1736 q!(move || Some(init())),
1737 q!(move |key_state, v| {
1738 if let Some(key_state_value) = key_state.as_mut() {
1739 if f(key_state_value, v) {
1740 Generate::Return(key_state.take().unwrap())
1741 } else {
1742 Generate::Continue
1743 }
1744 } else {
1745 unreachable!()
1746 }
1747 }),
1748 );
1749
1750 // SAFETY: The generator will only ever return at most one value per key, since once it
1751 // returns a value for a key it will never process any more values for that key.
1752 out_without_bound_cast.cast_at_most_one_entry_per_key()
1753 }
1754
1755 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1756 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1757 /// otherwise the first element would be non-deterministic.
1758 ///
1759 /// # Example
1760 /// ```rust
1761 /// # #[cfg(feature = "deploy")] {
1762 /// # use hydro_lang::prelude::*;
1763 /// # use futures::StreamExt;
1764 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1765 /// process
1766 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1767 /// .into_keyed()
1768 /// .first()
1769 /// # .entries()
1770 /// # }, |mut stream| async move {
1771 /// // Output: { 0: 2, 1: 3 }
1772 /// # let mut results = Vec::new();
1773 /// # for _ in 0..2 {
1774 /// # results.push(stream.next().await.unwrap());
1775 /// # }
1776 /// # results.sort();
1777 /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1778 /// # }));
1779 /// # }
1780 /// ```
1781 pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
1782 where
1783 O: IsOrdered,
1784 R: IsExactlyOnce,
1785 K: Clone + Eq + Hash,
1786 {
1787 self.fold_early_stop(
1788 q!(|| None),
1789 q!(|acc, v| {
1790 *acc = Some(v);
1791 true
1792 }),
1793 )
1794 .map(q!(|v| v.unwrap()))
1795 }
1796
1797 /// Returns a keyed stream containing at most the first `n` values per key,
1798 /// preserving the original order within each group. Similar to SQL `LIMIT`
1799 /// applied per group.
1800 ///
1801 /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1802 /// retries, since the result depends on the order and cardinality of elements
1803 /// within each group.
1804 ///
1805 /// # Example
1806 /// ```rust
1807 /// # #[cfg(feature = "deploy")] {
1808 /// # use hydro_lang::prelude::*;
1809 /// # use futures::StreamExt;
1810 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1811 /// process
1812 /// .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
1813 /// .into_keyed()
1814 /// .limit(q!(2))
1815 /// # .entries()
1816 /// # }, |mut stream| async move {
1817 /// // { 1: [10, 20], 2: [40, 50] }
1818 /// # let mut results = Vec::new();
1819 /// # for _ in 0..4 {
1820 /// # results.push(stream.next().await.unwrap());
1821 /// # }
1822 /// # results.sort();
1823 /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
1824 /// # }));
1825 /// # }
1826 /// ```
1827 pub fn limit(
1828 self,
1829 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1830 ) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1831 where
1832 O: IsOrdered,
1833 R: IsExactlyOnce,
1834 K: Clone + Eq + Hash,
1835 {
1836 self.generator(
1837 q!(|| 0usize),
1838 q!(move |count, item| {
1839 if *count == n {
1840 Generate::Break
1841 } else {
1842 *count += 1;
1843 if *count == n {
1844 Generate::Return(item)
1845 } else {
1846 Generate::Yield(item)
1847 }
1848 }
1849 }),
1850 )
1851 }
1852
1853 /// Assigns a zero-based index to each value within each key group, emitting
1854 /// `(K, (index, V))` tuples with per-key sequential indices.
1855 ///
1856 /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1857 /// This is a streaming operator that processes elements as they arrive.
1858 ///
1859 /// # Example
1860 /// ```rust
1861 /// # #[cfg(feature = "deploy")] {
1862 /// # use hydro_lang::prelude::*;
1863 /// # use futures::StreamExt;
1864 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1865 /// process
1866 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1867 /// .into_keyed()
1868 /// .enumerate()
1869 /// # .entries()
1870 /// # }, |mut stream| async move {
1871 /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1872 /// # let mut results = Vec::new();
1873 /// # for _ in 0..3 {
1874 /// # results.push(stream.next().await.unwrap());
1875 /// # }
1876 /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1877 /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1878 /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1879 /// # assert_eq!(key2, vec![(0, 20)]);
1880 /// # }));
1881 /// # }
1882 /// ```
1883 pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1884 where
1885 O: IsOrdered,
1886 R: IsExactlyOnce,
1887 K: Eq + Hash + Clone,
1888 {
1889 self.scan(
1890 q!(|| 0),
1891 q!(|acc, next| {
1892 let curr = *acc;
1893 *acc += 1;
1894 Some((curr, next))
1895 }),
1896 )
1897 }
1898
1899 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1900 ///
1901 /// # Example
1902 /// ```rust
1903 /// # #[cfg(feature = "deploy")] {
1904 /// # use hydro_lang::prelude::*;
1905 /// # use futures::StreamExt;
1906 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1907 /// let tick = process.tick();
1908 /// let numbers = process
1909 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1910 /// .into_keyed();
1911 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1912 /// batch
1913 /// .value_counts()
1914 /// .entries()
1915 /// .all_ticks()
1916 /// # }, |mut stream| async move {
1917 /// // (1, 3), (2, 2)
1918 /// # let mut results = Vec::new();
1919 /// # for _ in 0..2 {
1920 /// # results.push(stream.next().await.unwrap());
1921 /// # }
1922 /// # results.sort();
1923 /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1924 /// # }));
1925 /// # }
1926 /// ```
1927 pub fn value_counts(
1928 self,
1929 ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
1930 where
1931 R: IsExactlyOnce,
1932 K: Eq + Hash,
1933 {
1934 self.make_exactly_once()
1935 .assume_ordering_trusted(
1936 nondet!(/** ordering within each group affects neither result nor intermediates */),
1937 )
1938 .fold(
1939 q!(|| 0),
1940 q!(
1941 |acc, _| *acc += 1,
1942 monotone = manual_proof!(/** += 1 is monotonic */)
1943 ),
1944 )
1945 }
1946
    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
    /// group via the `comb` closure.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// If the input and output value types are the same and do not require initialization then use
    /// [`KeyedStream::reduce`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, false), (2, true)
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true)]);
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I: Fn() -> A + 'a, F: 'a + Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
    ) -> KeyedSingleton<K, A, L, B2>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O> + crate::properties::IsProved,
        Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
        B: ApplyMonotoneKeyedStream<M, B2>,
    {
        // Splice the initializer and combinator into this location. Splicing the combinator
        // also yields the proof value carried by its `AggFuncAlgebra` props, which is registered
        // against the spliced closure.
        let init = init.splice_fn0_ctx(&self.location).into();
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // Retries are assumed away on the grounds that the combinator is idempotent
        // (constrained by `Idemp: ValidIdempotenceFor<R>`).
        // NOTE(review): unlike `reduce`, no `assume_ordering` is applied here; ordering is only
        // constrained by `C: ValidCommutativityFor<O>`. Confirm this asymmetry is intended.
        let retried = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */));

        KeyedSingleton::new(
            retried.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb.into(),
                input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
                // Record whether the commutativity / idempotence markers are proved.
                is_commutative: C::IS_PROVED,
                is_idempotent: Idemp::IS_PROVED,
                metadata: retried
                    .location
                    .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
            },
        )
        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
    }
2015
    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
    /// group via the `comb` closure.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .reduce(q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, false), (2, true)
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true)]);
    /// # }));
    /// # }
    /// ```
    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O> + crate::properties::IsProved,
        Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
    {
        // Splice the combinator and register the proof value returned alongside it.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Normalize the input to exactly-once, totally-ordered. The justifications rely on the
        // combinator being idempotent (`Idemp: ValidIdempotenceFor<R>`) and commutative
        // (`C: ValidCommutativityFor<O>`) whenever the input guarantees are weaker.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyed {
                f: f.into(),
                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
                // Record whether the commutativity / idempotence markers are proved.
                is_commutative: C::IS_PROVED,
                is_idempotent: Idemp::IS_PROVED,
                metadata: ordered
                    .location
                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
            },
        )
        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
    }
2080
    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
    /// are automatically deleted.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let watermark = tick.singleton(q!(2));
    /// let numbers = process
    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (2, true)
    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
    /// # }));
    /// # }
    /// ```
    pub fn reduce_watermark<O2, F, C, Idemp>(
        self,
        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B>
    where
        K: Eq + Hash,
        O2: Clone,
        F: Fn(&mut V, V) + 'a,
        C: ValidCommutativityFor<O> + crate::properties::IsProved,
        Idemp: ValidIdempotenceFor<R> + crate::properties::IsProved,
    {
        // The watermark lives in a tick of `L::Root`. That tick's outer location must match
        // this stream's root location.
        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
        check_matching_location(&self.location.root(), other.location.outer());
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Same normalization as `reduce`: justified by the idempotence / commutativity bounds.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyedWatermark {
                f: f.into(),
                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                // Record whether the commutativity / idempotence markers are proved.
                is_commutative: C::IS_PROVED,
                is_idempotent: Idemp::IS_PROVED,
                metadata: ordered
                    .location
                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
            },
        )
        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
    }
2145
2146 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
2147 /// whose keys are not in the bounded stream.
2148 ///
2149 /// # Example
2150 /// ```rust
2151 /// # #[cfg(feature = "deploy")] {
2152 /// # use hydro_lang::prelude::*;
2153 /// # use futures::StreamExt;
2154 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2155 /// let tick = process.tick();
2156 /// let keyed_stream = process
2157 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2158 /// .batch(&tick, nondet!(/** test */))
2159 /// .into_keyed();
2160 /// let keys_to_remove = process
2161 /// .source_iter(q!(vec![1, 2]))
2162 /// .batch(&tick, nondet!(/** test */));
2163 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
2164 /// # .entries()
2165 /// # }, |mut stream| async move {
2166 /// // { 3: ['c'], 4: ['d'] }
2167 /// # let mut results = Vec::new();
2168 /// # for _ in 0..2 {
2169 /// # results.push(stream.next().await.unwrap());
2170 /// # }
2171 /// # results.sort();
2172 /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
2173 /// # }));
2174 /// # }
2175 /// ```
2176 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
2177 self,
2178 other: Stream<K, L, Bounded, O2, R2>,
2179 ) -> Self
2180 where
2181 K: Eq + Hash,
2182 {
2183 check_matching_location(&self.location, &other.location);
2184
2185 KeyedStream::new(
2186 self.location.clone(),
2187 HydroNode::AntiJoin {
2188 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2189 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2190 metadata: self.location.new_node_metadata(Self::collection_kind()),
2191 },
2192 )
2193 }
2194
2195 /// Emit a keyed stream containing keys shared between two keyed streams,
2196 /// where each value in the output keyed stream is a tuple of
2197 /// (self's value, other's value).
2198 /// If there are multiple values for the same key, this performs a cross product
2199 /// for each matching key.
2200 ///
2201 /// # Example
2202 /// ```rust
2203 /// # #[cfg(feature = "deploy")] {
2204 /// # use hydro_lang::prelude::*;
2205 /// # use futures::StreamExt;
2206 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2207 /// let tick = process.tick();
2208 /// let keyed_data = process
2209 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2210 /// .into_keyed()
2211 /// .batch(&tick, nondet!(/** test */));
2212 /// let other_data = process
2213 /// .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
2214 /// .into_keyed()
2215 /// .batch(&tick, nondet!(/** test */));
2216 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
2217 /// # }, |mut stream| async move {
2218 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
2219 /// # let mut results = vec![];
2220 /// # for _ in 0..4 {
2221 /// # results.push(stream.next().await.unwrap());
2222 /// # }
2223 /// # results.sort();
2224 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2225 /// # }));
2226 /// # }
2227 /// ```
2228 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2229 pub fn join_keyed_stream<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2230 self,
2231 other: KeyedStream<K, V2, L, B2, O2, R2>,
2232 ) -> KeyedStream<
2233 K,
2234 (V, V2),
2235 L,
2236 B,
2237 B2::PreserveOrderIfBounded<NoOrder>,
2238 <R as MinRetries<R2>>::Min,
2239 >
2240 where
2241 K: Eq + Hash + Clone,
2242 R: MinRetries<R2>,
2243 V: Clone,
2244 V2: Clone,
2245 {
2246 self.entries().join(other.entries()).into_keyed()
2247 }
2248
2249 /// Deduplicates values within each key group, emitting each unique value per key
2250 /// exactly once.
2251 ///
2252 /// # Example
2253 /// ```rust
2254 /// # #[cfg(feature = "deploy")] {
2255 /// # use hydro_lang::prelude::*;
2256 /// # use futures::StreamExt;
2257 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2258 /// process
2259 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2260 /// .into_keyed()
2261 /// .unique()
2262 /// # .entries()
2263 /// # }, |mut stream| async move {
2264 /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2265 /// # let mut results = Vec::new();
2266 /// # for _ in 0..4 {
2267 /// # results.push(stream.next().await.unwrap());
2268 /// # }
2269 /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2270 /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2271 /// # key1.sort();
2272 /// # key2.sort();
2273 /// # assert_eq!(key1, vec![10, 20]);
2274 /// # assert_eq!(key2, vec![20, 30]);
2275 /// # }));
2276 /// # }
2277 /// ```
2278 pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2279 where
2280 K: Eq + Hash + Clone,
2281 V: Eq + Hash + Clone,
2282 {
2283 self.entries().unique().into_keyed()
2284 }
2285
2286 /// Sorts the values within each key group in ascending order.
2287 ///
2288 /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2289 /// each group. This operator will block until all elements in the input stream
2290 /// are available, so it requires the input stream to be [`Bounded`].
2291 ///
2292 /// # Example
2293 /// ```rust
2294 /// # #[cfg(feature = "deploy")] {
2295 /// # use hydro_lang::prelude::*;
2296 /// # use futures::StreamExt;
2297 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2298 /// let tick = process.tick();
2299 /// let numbers = process
2300 /// .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2301 /// .into_keyed();
2302 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2303 /// batch.sort().all_ticks()
2304 /// # .entries()
2305 /// # }, |mut stream| async move {
2306 /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2307 /// # let mut results = Vec::new();
2308 /// # for _ in 0..4 {
2309 /// # results.push(stream.next().await.unwrap());
2310 /// # }
2311 /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2312 /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2313 /// # assert_eq!(key1_vals, vec![1, 3]);
2314 /// # assert_eq!(key2_vals, vec![1, 2]);
2315 /// # }));
2316 /// # }
2317 /// ```
2318 pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2319 where
2320 B: IsBounded,
2321 K: Ord,
2322 V: Ord,
2323 {
2324 self.entries().sort().into_keyed()
2325 }
2326
    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
    /// elements of the `self` stream, and then emitting the elements of the `other` stream (if a
    /// key is only present in one of the inputs, its values are passed through as-is). The output
    /// has a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
    ///
    /// Currently, both input streams must be [`Bounded`]. This operator will block
    /// on the first stream until all its elements are available. In a future version,
    /// we will relax the requirement on the `other` stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 0: [2, 1], 1: [4, 3] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn chain<O2: Ordering, R2: Retries>(
        self,
        other: KeyedStream<K, V, L, Bounded, O2, R2>,
    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        O: MinOrder<O2>,
        R: MinRetries<R2>,
    {
        // Re-type `self` as `Bounded` (allowed by `B: IsBounded`) so both inputs agree.
        let this = self.make_bounded();
        // Both inputs must be placed at the same location.
        check_matching_location(&this.location, &other.location);

        KeyedStream::new(
            this.location.clone(),
            // `first` is emitted before `second`; ordering/retries are the weaker of the two.
            HydroNode::Chain {
                first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: this.location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    L,
                    Bounded,
                    <O as MinOrder<O2>>::Min,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }
2386
2387 /// Emit a keyed stream containing keys shared between the keyed stream and the
2388 /// keyed singleton, where each value in the output keyed stream is a tuple of
2389 /// (the keyed stream's value, the keyed singleton's value).
2390 ///
2391 /// # Example
2392 /// ```rust
2393 /// # #[cfg(feature = "deploy")] {
2394 /// # use hydro_lang::prelude::*;
2395 /// # use futures::StreamExt;
2396 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2397 /// let tick = process.tick();
2398 /// let keyed_data = process
2399 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2400 /// .into_keyed()
2401 /// .batch(&tick, nondet!(/** test */));
2402 /// let singleton_data = process
2403 /// .source_iter(q!(vec![(1, 100), (2, 200)]))
2404 /// .into_keyed()
2405 /// .batch(&tick, nondet!(/** test */))
2406 /// .first();
2407 /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2408 /// # }, |mut stream| async move {
2409 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2410 /// # let mut results = vec![];
2411 /// # for _ in 0..3 {
2412 /// # results.push(stream.next().await.unwrap());
2413 /// # }
2414 /// # results.sort();
2415 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2416 /// # }));
2417 /// # }
2418 /// ```
2419 pub fn join_keyed_singleton<V2: Clone, B2: IsBounded>(
2420 self,
2421 other: KeyedSingleton<K, V2, L, B2>,
2422 ) -> KeyedStream<K, (V, V2), L, B, O, R>
2423 where
2424 K: Eq + Hash + Clone,
2425 V: Clone,
2426 {
2427 let ir_node = if B2::BOUNDED {
2428 HydroNode::JoinHalf {
2429 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2430 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2431 metadata: self
2432 .location
2433 .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2434 }
2435 } else {
2436 HydroNode::Join {
2437 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2438 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2439 metadata: self
2440 .location
2441 .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2442 }
2443 };
2444
2445 KeyedStream::new(self.location.clone(), ir_node)
2446 }
2447
2448 /// Gets the values associated with a specific key from the keyed stream.
2449 /// Returns an empty stream if the key is `None` or there are no associated values.
2450 ///
2451 /// # Example
2452 /// ```rust
2453 /// # #[cfg(feature = "deploy")] {
2454 /// # use hydro_lang::prelude::*;
2455 /// # use futures::StreamExt;
2456 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2457 /// let tick = process.tick();
2458 /// let keyed_data = process
2459 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2460 /// .into_keyed()
2461 /// .batch(&tick, nondet!(/** test */));
2462 /// let key = tick.singleton(q!(1));
2463 /// keyed_data.get(key).all_ticks()
2464 /// # }, |mut stream| async move {
2465 /// // 10, 11
2466 /// # let mut results = vec![];
2467 /// # for _ in 0..2 {
2468 /// # results.push(stream.next().await.unwrap());
2469 /// # }
2470 /// # results.sort();
2471 /// # assert_eq!(results, vec![10, 11]);
2472 /// # }));
2473 /// # }
2474 /// ```
2475 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, B, O, R>
2476 where
2477 K: Eq + Hash + Clone,
2478 V: Clone,
2479 {
2480 let joined =
2481 self.join_keyed_singleton(key.into().map(q!(|k| (k, ()))).into_keyed_singleton());
2482
2483 if O::ORDERING_KIND == StreamOrder::TotalOrder {
2484 joined
2485 .use_ordering_type::<TotalOrder>()
2486 .cast_at_most_one_key()
2487 .map(q!(|(_, (v, _))| v))
2488 .weaken_ordering()
2489 } else {
2490 joined.values().map(q!(|(v, _)| v)).use_ordering_type()
2491 }
2492 }
2493
2494 /// For each value in `self`, find the matching key in `lookup`.
2495 /// The output is a keyed stream with the key from `self`, and a value
2496 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2497 /// If the key is not present in `lookup`, the option will be [`None`].
2498 ///
2499 /// # Example
2500 /// ```rust
2501 /// # #[cfg(feature = "deploy")] {
2502 /// # use hydro_lang::prelude::*;
2503 /// # use futures::StreamExt;
2504 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2505 /// # let tick = process.tick();
2506 /// let requests = // { 1: [10, 11], 2: 20 }
2507 /// # process
2508 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2509 /// # .into_keyed()
2510 /// # .batch(&tick, nondet!(/** test */));
2511 /// let other_data = // { 10: 100, 11: 110 }
2512 /// # process
2513 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
2514 /// # .into_keyed()
2515 /// # .batch(&tick, nondet!(/** test */))
2516 /// # .first();
2517 /// requests.lookup_keyed_singleton(other_data)
2518 /// # .entries().all_ticks()
2519 /// # }, |mut stream| async move {
2520 /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2521 /// # let mut results = vec![];
2522 /// # for _ in 0..3 {
2523 /// # results.push(stream.next().await.unwrap());
2524 /// # }
2525 /// # results.sort();
2526 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2527 /// # }));
2528 /// # }
2529 /// ```
2530 pub fn lookup_keyed_singleton<V2>(
2531 self,
2532 lookup: KeyedSingleton<V, V2, L, Bounded>,
2533 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2534 where
2535 B: IsBounded,
2536 K: Eq + Hash + Clone,
2537 V: Eq + Hash + Clone,
2538 V2: Clone,
2539 {
2540 self.lookup_keyed_stream(lookup.into_keyed_stream().weaken_retries::<R>())
2541 }
2542
2543 /// For each value in `self`, find the matching key in `lookup`.
2544 /// The output is a keyed stream with the key from `self`, and a value
2545 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2546 /// If the key is not present in `lookup`, the option will be [`None`].
2547 ///
2548 /// # Example
2549 /// ```rust
2550 /// # #[cfg(feature = "deploy")] {
2551 /// # use hydro_lang::prelude::*;
2552 /// # use futures::StreamExt;
2553 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2554 /// # let tick = process.tick();
2555 /// let requests = // { 1: [10, 11], 2: 20 }
2556 /// # process
2557 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2558 /// # .into_keyed()
2559 /// # .batch(&tick, nondet!(/** test */));
2560 /// let other_data = // { 10: [100, 101], 11: 110 }
2561 /// # process
2562 /// # .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2563 /// # .into_keyed()
2564 /// # .batch(&tick, nondet!(/** test */));
2565 /// requests.lookup_keyed_stream(other_data)
2566 /// # .entries().all_ticks()
2567 /// # }, |mut stream| async move {
2568 /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2569 /// # let mut results = vec![];
2570 /// # for _ in 0..4 {
2571 /// # results.push(stream.next().await.unwrap());
2572 /// # }
2573 /// # results.sort();
2574 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2575 /// # }));
2576 /// # }
2577 /// ```
2578 #[expect(clippy::type_complexity, reason = "retries propagation")]
2579 pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2580 self,
2581 lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2582 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2583 where
2584 B: IsBounded,
2585 K: Eq + Hash + Clone,
2586 V: Eq + Hash + Clone,
2587 V2: Clone,
2588 R: MinRetries<R2>,
2589 {
2590 let inverted = self
2591 .make_bounded()
2592 .entries()
2593 .map(q!(|(key, lookup_value)| (lookup_value, key)))
2594 .into_keyed();
2595 let found = inverted
2596 .clone()
2597 .join_keyed_stream(lookup.clone())
2598 .entries()
2599 .map(q!(|(lookup_value, (key, value))| (
2600 key,
2601 (lookup_value, Some(value))
2602 )))
2603 .into_keyed();
2604 let not_found = inverted
2605 .filter_key_not_in(lookup.keys())
2606 .entries()
2607 .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2608 .into_keyed();
2609
2610 found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2611 }
2612
2613 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2614 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2615 ///
2616 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2617 /// processed before an acknowledgement is emitted.
2618 pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2619 let id = self.location.flow_state().borrow_mut().next_clock_id();
2620 let out_location = Atomic {
2621 tick: Tick {
2622 id,
2623 l: self.location.clone(),
2624 },
2625 };
2626 KeyedStream::new(
2627 out_location.clone(),
2628 HydroNode::BeginAtomic {
2629 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2630 metadata: out_location
2631 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2632 },
2633 )
2634 }
2635
    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Non-Determinism
    /// The batch boundaries are non-deterministic and may change across executions.
    ///
    /// # Panics
    /// Panics if the outer location of `tick` is not this stream's location.
    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
        self,
        tick: &Tick<L2>,
        nondet: NonDet,
    ) -> KeyedStream<K, V, Tick<L::DropConsistency>, Bounded, O, R> {
        // `nondet` only acknowledges the non-deterministic batch boundaries; it is otherwise unused.
        let _ = nondet;
        // The tick must be nested directly in the location this stream lives at.
        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
        KeyedStream::new(
            // The batched output is placed at the location returned by `tick.drop_consistency()`.
            tick.drop_consistency(),
            HydroNode::Batch {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // NOTE(review): the collection kind is computed for `Tick<L>`, while the returned
                // stream is typed with `Tick<L::DropConsistency>`. This presumably relies on
                // `collection_kind` not depending on the location type; confirm.
                metadata: tick.new_node_metadata(
                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
                ),
            },
        )
    }
2659}
2660
2661impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2662 KeyedStream<(K1, K2), V, L, B, O, R>
2663{
2664 /// Produces a new keyed stream by dropping the first element of the compound key.
2665 ///
2666 /// Because multiple keys may share the same suffix, this operation results in re-grouping
2667 /// of the values under the new keys. The values across groups with the same new key
2668 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2669 ///
2670 /// # Example
2671 /// ```rust
2672 /// # #[cfg(feature = "deploy")] {
2673 /// # use hydro_lang::prelude::*;
2674 /// # use futures::StreamExt;
2675 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2676 /// process
2677 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2678 /// .into_keyed()
2679 /// .drop_key_prefix()
2680 /// # .entries()
2681 /// # }, |mut stream| async move {
2682 /// // { 10: [2, 3], 20: [4] }
2683 /// # let mut results = Vec::new();
2684 /// # for _ in 0..3 {
2685 /// # results.push(stream.next().await.unwrap());
2686 /// # }
2687 /// # results.sort();
2688 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2689 /// # }));
2690 /// # }
2691 /// ```
2692 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2693 self.entries()
2694 .map(q!(|((_k1, k2), v)| (k2, v)))
2695 .into_keyed()
2696 }
2697}
2698
2699impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> KeyedStream<K, V, L, Unbounded, O, R> {
2700 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2701 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2702 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2703 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2704 ///
2705 /// Currently, both input streams must be [`Unbounded`].
2706 ///
2707 /// # Example
2708 /// ```rust
2709 /// # #[cfg(feature = "deploy")] {
2710 /// # use hydro_lang::prelude::*;
2711 /// # use futures::StreamExt;
2712 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2713 /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2714 /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2715 /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2716 /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2717 /// numbers1.merge_unordered(numbers2)
2718 /// # .entries()
2719 /// # }, |mut stream| async move {
2720 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2721 /// # let mut results = Vec::new();
2722 /// # for _ in 0..4 {
2723 /// # results.push(stream.next().await.unwrap());
2724 /// # }
2725 /// # results.sort();
2726 /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2727 /// # }));
2728 /// # }
2729 /// ```
2730 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2731 self,
2732 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2733 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2734 where
2735 R: MinRetries<R2>,
2736 {
2737 KeyedStream::new(
2738 self.location.clone(),
2739 HydroNode::Chain {
2740 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2741 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2742 metadata: self.location.new_node_metadata(KeyedStream::<
2743 K,
2744 V,
2745 L,
2746 Unbounded,
2747 NoOrder,
2748 <R as MinRetries<R2>>::Min,
2749 >::collection_kind()),
2750 },
2751 )
2752 }
2753
2754 /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2755 #[deprecated(note = "use `merge_unordered` instead")]
2756 pub fn interleave<O2: Ordering, R2: Retries>(
2757 self,
2758 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2759 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2760 where
2761 R: MinRetries<R2>,
2762 {
2763 self.merge_unordered(other)
2764 }
2765}
2766
2767impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2768where
2769 L: Location<'a>,
2770{
2771 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2772 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2773 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2774 /// used to create the atomic section.
2775 ///
2776 /// # Non-Determinism
2777 /// The batch boundaries are non-deterministic and may change across executions.
2778 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2779 self,
2780 tick: &Tick<L2>,
2781 nondet: NonDet,
2782 ) -> KeyedStream<K, V, Tick<L::DropConsistency>, Bounded, O, R> {
2783 let _ = nondet;
2784 KeyedStream::new(
2785 tick.drop_consistency(),
2786 HydroNode::Batch {
2787 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2788 metadata: tick.new_node_metadata(
2789 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2790 ),
2791 },
2792 )
2793 }
2794
2795 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2796 /// See [`KeyedStream::atomic`] for more details.
2797 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2798 KeyedStream::new(
2799 self.location.tick.l.clone(),
2800 HydroNode::EndAtomic {
2801 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2802 metadata: self
2803 .location
2804 .tick
2805 .l
2806 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2807 },
2808 )
2809 }
2810}
2811
2812impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2813where
2814 L: Location<'a>,
2815{
2816 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2817 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2818 /// each key.
2819 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2820 KeyedStream::new(
2821 self.location.outer().clone(),
2822 HydroNode::YieldConcat {
2823 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2824 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2825 K,
2826 V,
2827 L,
2828 Unbounded,
2829 O,
2830 R,
2831 >::collection_kind(
2832 )),
2833 },
2834 )
2835 }
2836
2837 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2838 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2839 /// each key.
2840 ///
2841 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2842 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2843 /// stream's [`Tick`] context.
2844 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2845 let out_location = Atomic {
2846 tick: self.location.clone(),
2847 };
2848
2849 KeyedStream::new(
2850 out_location.clone(),
2851 HydroNode::YieldConcat {
2852 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2853 metadata: out_location.new_node_metadata(KeyedStream::<
2854 K,
2855 V,
2856 Atomic<L>,
2857 Unbounded,
2858 O,
2859 R,
2860 >::collection_kind()),
2861 },
2862 )
2863 }
2864
2865 /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2866 /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2867 ///
2868 /// This API is particularly useful for stateful computation on batches of data, such as
2869 /// maintaining an accumulated state that is up to date with the current batch.
2870 ///
2871 /// # Example
2872 /// ```rust
2873 /// # #[cfg(feature = "deploy")] {
2874 /// # use hydro_lang::prelude::*;
2875 /// # use futures::StreamExt;
2876 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2877 /// let tick = process.tick();
2878 /// # // ticks are lazy by default, forces the second tick to run
2879 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2880 /// # let batch_first_tick = process
2881 /// # .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2882 /// # .into_keyed()
2883 /// # .batch(&tick, nondet!(/** test */));
2884 /// # let batch_second_tick = process
2885 /// # .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2886 /// # .into_keyed()
2887 /// # .batch(&tick, nondet!(/** test */))
2888 /// # .defer_tick(); // appears on the second tick
2889 /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2890 ///
2891 /// input.batch(&tick, nondet!(/** test */))
2892 /// .across_ticks(|s| s.reduce(q!(|sum, new| {
2893 /// *sum += new;
2894 /// }))).entries().all_ticks()
2895 /// # }, |mut stream| async move {
2896 /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2897 /// # let mut results = Vec::new();
2898 /// # for _ in 0..4 {
2899 /// # results.push(stream.next().await.unwrap());
2900 /// # }
2901 /// # results.sort();
2902 /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2903 /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2904 /// # results.clear();
2905 /// # for _ in 0..4 {
2906 /// # results.push(stream.next().await.unwrap());
2907 /// # }
2908 /// # results.sort();
2909 /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2910 /// # }));
2911 /// # }
2912 /// ```
2913 pub fn across_ticks<Out: BatchAtomic<'a>>(
2914 self,
2915 thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2916 ) -> Out::Batched {
2917 thunk(self.all_ticks_atomic()).batched_atomic()
2918 }
2919
2920 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2921 /// tick `T` always has the entries of `self` at tick `T - 1`.
2922 ///
2923 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2924 ///
2925 /// This operator enables stateful iterative processing with ticks, by sending data from one
2926 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2927 ///
2928 /// # Example
2929 /// ```rust
2930 /// # #[cfg(feature = "deploy")] {
2931 /// # use hydro_lang::prelude::*;
2932 /// # use futures::StreamExt;
2933 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2934 /// let tick = process.tick();
2935 /// # // ticks are lazy by default, forces the second tick to run
2936 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2937 /// # let batch_first_tick = process
2938 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2939 /// # .batch(&tick, nondet!(/** test */))
2940 /// # .into_keyed();
2941 /// # let batch_second_tick = process
2942 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2943 /// # .batch(&tick, nondet!(/** test */))
2944 /// # .defer_tick()
2945 /// # .into_keyed(); // appears on the second tick
2946 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2947 /// # batch_first_tick.chain(batch_second_tick);
2948 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2949 /// changes_across_ticks // from the current tick
2950 /// )
2951 /// # .entries().all_ticks()
2952 /// # }, |mut stream| async move {
2953 /// // First tick: { 1: [2, 3] }
2954 /// # let mut results = Vec::new();
2955 /// # for _ in 0..2 {
2956 /// # results.push(stream.next().await.unwrap());
2957 /// # }
2958 /// # results.sort();
2959 /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2960 /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2961 /// # results.clear();
2962 /// # for _ in 0..4 {
2963 /// # results.push(stream.next().await.unwrap());
2964 /// # }
2965 /// # results.sort();
2966 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2967 /// // Third tick: { 1: [4], 2: [5] }
2968 /// # results.clear();
2969 /// # for _ in 0..2 {
2970 /// # results.push(stream.next().await.unwrap());
2971 /// # }
2972 /// # results.sort();
2973 /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2974 /// # }));
2975 /// # }
2976 /// ```
2977 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2978 KeyedStream::new(
2979 self.location.clone(),
2980 HydroNode::DeferTick {
2981 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2982 metadata: self.location.new_node_metadata(KeyedStream::<
2983 K,
2984 V,
2985 Tick<L>,
2986 Bounded,
2987 O,
2988 R,
2989 >::collection_kind()),
2990 },
2991 )
2992 }
2993}
2994
2995#[cfg(test)]
2996mod tests {
2997 #[cfg(feature = "deploy")]
2998 use futures::{SinkExt, StreamExt};
2999 #[cfg(feature = "deploy")]
3000 use hydro_deploy::Deployment;
3001 #[cfg(any(feature = "deploy", feature = "sim"))]
3002 use stageleft::q;
3003
3004 #[cfg(any(feature = "deploy", feature = "sim"))]
3005 use crate::compile::builder::FlowBuilder;
3006 #[cfg(feature = "deploy")]
3007 use crate::live_collections::stream::ExactlyOnce;
3008 #[cfg(feature = "sim")]
3009 use crate::live_collections::stream::{NoOrder, TotalOrder};
3010 #[cfg(any(feature = "deploy", feature = "sim"))]
3011 use crate::location::Location;
3012 #[cfg(feature = "sim")]
3013 use crate::networking::TCP;
3014 #[cfg(any(feature = "deploy", feature = "sim"))]
3015 use crate::nondet::nondet;
3016 #[cfg(feature = "deploy")]
3017 use crate::properties::manual_proof;
3018
3019 #[cfg(feature = "deploy")]
3020 #[tokio::test]
3021 async fn get_unbounded_keyed_stream_bounded_singleton() {
3022 let mut deployment = Deployment::new();
3023
3024 let mut flow = FlowBuilder::new();
3025 let node = flow.process::<()>();
3026 let external = flow.external::<()>();
3027
3028 let (input_send, input_stream) =
3029 node.source_external_bincode::<_, (i32, i32), _, ExactlyOnce>(&external);
3030
3031 let key = node.singleton(q!(1));
3032
3033 let out = input_stream
3034 .into_keyed()
3035 .get(key)
3036 .send_bincode_external(&external);
3037
3038 let nodes = flow
3039 .with_process(&node, deployment.Localhost())
3040 .with_external(&external, deployment.Localhost())
3041 .deploy(&mut deployment);
3042
3043 deployment.deploy().await.unwrap();
3044
3045 let mut input_send = nodes.connect(input_send).await;
3046 let mut out = nodes.connect(out).await;
3047
3048 deployment.start().await.unwrap();
3049
3050 // First batch
3051 input_send.send((1, 10)).await.unwrap();
3052 input_send.send((2, 20)).await.unwrap();
3053 assert_eq!(out.next().await.unwrap(), 10);
3054
3055 // Second batch
3056 input_send.send((1, 11)).await.unwrap();
3057 input_send.send((2, 21)).await.unwrap();
3058 assert_eq!(out.next().await.unwrap(), 11);
3059 }
3060
3061 #[cfg(feature = "deploy")]
3062 #[tokio::test]
3063 async fn reduce_watermark_filter() {
3064 let mut deployment = Deployment::new();
3065
3066 let mut flow = FlowBuilder::new();
3067 let node = flow.process::<()>();
3068 let external = flow.external::<()>();
3069
3070 let node_tick = node.tick();
3071 let watermark = node_tick.singleton(q!(2));
3072
3073 let sum = node
3074 .source_stream(q!(tokio_stream::iter([
3075 (0, 100),
3076 (1, 101),
3077 (2, 102),
3078 (2, 102)
3079 ])))
3080 .into_keyed()
3081 .reduce_watermark(
3082 watermark,
3083 q!(|acc, v| {
3084 *acc += v;
3085 }),
3086 )
3087 .snapshot(&node_tick, nondet!(/** test */))
3088 .entries()
3089 .all_ticks()
3090 .send_bincode_external(&external);
3091
3092 let nodes = flow
3093 .with_process(&node, deployment.Localhost())
3094 .with_external(&external, deployment.Localhost())
3095 .deploy(&mut deployment);
3096
3097 deployment.deploy().await.unwrap();
3098
3099 let mut out = nodes.connect(sum).await;
3100
3101 deployment.start().await.unwrap();
3102
3103 assert_eq!(out.next().await.unwrap(), (2, 204));
3104 }
3105
3106 #[cfg(feature = "deploy")]
3107 #[tokio::test]
3108 async fn reduce_watermark_bounded() {
3109 let mut deployment = Deployment::new();
3110
3111 let mut flow = FlowBuilder::new();
3112 let node = flow.process::<()>();
3113 let external = flow.external::<()>();
3114
3115 let node_tick = node.tick();
3116 let watermark = node_tick.singleton(q!(2));
3117
3118 let sum = node
3119 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
3120 .into_keyed()
3121 .reduce_watermark(
3122 watermark,
3123 q!(|acc, v| {
3124 *acc += v;
3125 }),
3126 )
3127 .entries()
3128 .send_bincode_external(&external);
3129
3130 let nodes = flow
3131 .with_process(&node, deployment.Localhost())
3132 .with_external(&external, deployment.Localhost())
3133 .deploy(&mut deployment);
3134
3135 deployment.deploy().await.unwrap();
3136
3137 let mut out = nodes.connect(sum).await;
3138
3139 deployment.start().await.unwrap();
3140
3141 assert_eq!(out.next().await.unwrap(), (2, 204));
3142 }
3143
3144 #[cfg(feature = "deploy")]
3145 #[tokio::test]
3146 async fn reduce_watermark_garbage_collect() {
3147 let mut deployment = Deployment::new();
3148
3149 let mut flow = FlowBuilder::new();
3150 let node = flow.process::<()>();
3151 let external = flow.external::<()>();
3152 let (tick_send, tick_trigger) =
3153 node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
3154
3155 let node_tick = node.tick();
3156 let (watermark_complete_cycle, watermark) =
3157 node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
3158 let next_watermark = watermark.clone().map(q!(|v| v + 1));
3159 watermark_complete_cycle.complete_next_tick(next_watermark);
3160
3161 let tick_triggered_input = node_tick
3162 .singleton(q!((3, 103)))
3163 .into_stream()
3164 .filter_if(
3165 tick_trigger
3166 .clone()
3167 .batch(&node_tick, nondet!(/** test */))
3168 .first()
3169 .is_some(),
3170 )
3171 .all_ticks();
3172
3173 let sum = node
3174 .source_stream(q!(tokio_stream::iter([
3175 (0, 100),
3176 (1, 101),
3177 (2, 102),
3178 (2, 102)
3179 ])))
3180 .merge_unordered(tick_triggered_input)
3181 .into_keyed()
3182 .reduce_watermark(
3183 watermark,
3184 q!(
3185 |acc, v| {
3186 *acc += v;
3187 },
3188 commutative = manual_proof!(/** integer addition is commutative */)
3189 ),
3190 )
3191 .snapshot(&node_tick, nondet!(/** test */))
3192 .entries()
3193 .all_ticks()
3194 .send_bincode_external(&external);
3195
3196 let nodes = flow
3197 .with_default_optimize()
3198 .with_process(&node, deployment.Localhost())
3199 .with_external(&external, deployment.Localhost())
3200 .deploy(&mut deployment);
3201
3202 deployment.deploy().await.unwrap();
3203
3204 let mut tick_send = nodes.connect(tick_send).await;
3205 let mut out_recv = nodes.connect(sum).await;
3206
3207 deployment.start().await.unwrap();
3208
3209 assert_eq!(out_recv.next().await.unwrap(), (2, 204));
3210
3211 tick_send.send(()).await.unwrap();
3212
3213 assert_eq!(out_recv.next().await.unwrap(), (3, 103));
3214 }
3215
3216 #[cfg(feature = "sim")]
3217 #[test]
3218 #[should_panic]
3219 fn sim_batch_nondet_size() {
3220 let mut flow = FlowBuilder::new();
3221 let node = flow.process::<()>();
3222
3223 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3224
3225 let tick = node.tick();
3226 let out_recv = input
3227 .batch(&tick, nondet!(/** test */))
3228 .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
3229 .entries()
3230 .all_ticks()
3231 .sim_output();
3232
3233 flow.sim().exhaustive(async || {
3234 out_recv
3235 .assert_yields_only_unordered([(1, vec![1, 2])])
3236 .await;
3237 });
3238 }
3239
3240 #[cfg(feature = "sim")]
3241 #[test]
3242 fn sim_batch_preserves_group_order() {
3243 let mut flow = FlowBuilder::new();
3244 let node = flow.process::<()>();
3245
3246 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3247
3248 let tick = node.tick();
3249 let out_recv = input
3250 .batch(&tick, nondet!(/** test */))
3251 .all_ticks()
3252 .fold_early_stop(
3253 q!(|| 0),
3254 q!(|acc, v| {
3255 *acc = std::cmp::max(v, *acc);
3256 *acc >= 2
3257 }),
3258 )
3259 .entries()
3260 .sim_output();
3261
3262 let instances = flow.sim().exhaustive(async || {
3263 out_recv
3264 .assert_yields_only_unordered([(1, 2), (2, 3)])
3265 .await;
3266 });
3267
3268 assert_eq!(instances, 8);
3269 // - three cases: all three in a separate tick (pick where (2, 3) is)
3270 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
3271 // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
3272 // - one case: all three together
3273 }
3274
3275 #[cfg(feature = "sim")]
3276 #[test]
3277 fn sim_batch_unordered_shuffles() {
3278 let mut flow = FlowBuilder::new();
3279 let node = flow.process::<()>();
3280
3281 let input = node
3282 .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
3283 .into_keyed()
3284 .weaken_ordering::<NoOrder>();
3285
3286 let tick = node.tick();
3287 let out_recv = input
3288 .batch(&tick, nondet!(/** test */))
3289 .all_ticks()
3290 .entries()
3291 .sim_output();
3292
3293 let instances = flow.sim().exhaustive(async || {
3294 out_recv
3295 .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
3296 .await;
3297 });
3298
3299 assert_eq!(instances, 13);
3300 // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
3301 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3302 // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
3303 // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3304 }
3305
3306 #[cfg(feature = "sim")]
3307 #[test]
3308 #[should_panic]
3309 fn sim_observe_order_batched() {
3310 let mut flow = FlowBuilder::new();
3311 let node = flow.process::<()>();
3312
3313 let (port, input) = node.sim_input::<_, NoOrder, _>();
3314
3315 let tick = node.tick();
3316 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3317 let out_recv = batch
3318 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3319 .all_ticks()
3320 .first()
3321 .entries()
3322 .sim_output();
3323
3324 flow.sim().exhaustive(async || {
3325 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3326 out_recv
3327 .assert_yields_only_unordered([(1, 1), (2, 1)])
3328 .await; // fails with assume_ordering
3329 });
3330 }
3331
3332 #[cfg(feature = "sim")]
3333 #[test]
3334 fn sim_observe_order_batched_count() {
3335 let mut flow = FlowBuilder::new();
3336 let node = flow.process::<()>();
3337
3338 let (port, input) = node.sim_input::<_, NoOrder, _>();
3339
3340 let tick = node.tick();
3341 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3342 let out_recv = batch
3343 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3344 .all_ticks()
3345 .entries()
3346 .sim_output();
3347
3348 let instance_count = flow.sim().exhaustive(async || {
3349 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3350 let _ = out_recv.collect_sorted::<Vec<_>>().await;
3351 });
3352
3353 assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
3354 }
3355
3356 #[cfg(feature = "sim")]
3357 #[test]
3358 fn sim_top_level_assume_ordering() {
3359 use std::collections::HashMap;
3360
3361 let mut flow = FlowBuilder::new();
3362 let node = flow.process::<()>();
3363
3364 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3365
3366 let out_recv = input
3367 .into_keyed()
3368 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3369 .fold_early_stop(
3370 q!(|| Vec::new()),
3371 q!(|acc, v| {
3372 acc.push(v);
3373 acc.len() >= 2
3374 }),
3375 )
3376 .entries()
3377 .sim_output();
3378
3379 let instance_count = flow.sim().exhaustive(async || {
3380 in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
3381 let out: HashMap<_, _> = out_recv
3382 .collect_sorted::<Vec<_>>()
3383 .await
3384 .into_iter()
3385 .collect();
3386 // Each key accumulates its values; we get one entry per key
3387 assert_eq!(out.len(), 2);
3388 });
3389
3390 assert_eq!(instance_count, 24)
3391 }
3392
3393 #[cfg(feature = "sim")]
3394 #[test]
3395 fn sim_top_level_assume_ordering_cycle_back() {
3396 use std::collections::HashMap;
3397
3398 let mut flow = FlowBuilder::new();
3399 let node = flow.process::<()>();
3400 let node2 = flow.process::<()>();
3401
3402 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3403
3404 let (complete_cycle_back, cycle_back) =
3405 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3406 let ordered = input
3407 .into_keyed()
3408 .merge_unordered(cycle_back)
3409 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3410 complete_cycle_back.complete(
3411 ordered
3412 .clone()
3413 .map(q!(|v| v + 1))
3414 .filter(q!(|v| v % 2 == 1))
3415 .entries()
3416 .send(&node2, TCP.fail_stop().bincode())
3417 .send(&node, TCP.fail_stop().bincode())
3418 .into_keyed(),
3419 );
3420
3421 let out_recv = ordered
3422 .fold_early_stop(
3423 q!(|| Vec::new()),
3424 q!(|acc, v| {
3425 acc.push(v);
3426 acc.len() >= 2
3427 }),
3428 )
3429 .entries()
3430 .sim_output();
3431
3432 let mut saw = false;
3433 let instance_count = flow.sim().exhaustive(async || {
3434 // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
3435 // We want to see [0, 1] - the cycled back value interleaved
3436 in_send.send_many_unordered([(1, 0), (1, 2)]);
3437 let out: HashMap<_, _> = out_recv
3438 .collect_sorted::<Vec<_>>()
3439 .await
3440 .into_iter()
3441 .collect();
3442
3443 // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
3444 if let Some(values) = out.get(&1)
3445 && *values == vec![0, 1]
3446 {
3447 saw = true;
3448 }
3449 });
3450
3451 assert!(
3452 saw,
3453 "did not see an instance with key 1 having [0, 1] in order"
3454 );
3455 assert_eq!(instance_count, 6);
3456 }
3457
3458 #[cfg(feature = "sim")]
3459 #[test]
3460 fn sim_top_level_assume_ordering_cross_key_cycle() {
3461 use std::collections::HashMap;
3462
3463 // This test demonstrates why releasing one entry at a time is important:
3464 // When one key's observed order cycles back into a different key, we need
3465 // to be able to interleave the cycled-back entry with pending items for
3466 // that other key.
3467 let mut flow = FlowBuilder::new();
3468 let node = flow.process::<()>();
3469 let node2 = flow.process::<()>();
3470
3471 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3472
3473 let (complete_cycle_back, cycle_back) =
3474 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3475 let ordered = input
3476 .into_keyed()
3477 .merge_unordered(cycle_back)
3478 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3479
3480 // Cycle back: when we see (1, 10), emit (2, 100) to key 2
3481 complete_cycle_back.complete(
3482 ordered
3483 .clone()
3484 .filter(q!(|v| *v == 10))
3485 .map(q!(|_| 100))
3486 .entries()
3487 .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
3488 .send(&node2, TCP.fail_stop().bincode())
3489 .send(&node, TCP.fail_stop().bincode())
3490 .into_keyed(),
3491 );
3492
3493 let out_recv = ordered
3494 .fold_early_stop(
3495 q!(|| Vec::new()),
3496 q!(|acc, v| {
3497 acc.push(v);
3498 acc.len() >= 2
3499 }),
3500 )
3501 .entries()
3502 .sim_output();
3503
3504 // We want to see an instance where:
3505 // - (1, 10) is released first
3506 // - This causes (2, 100) to be cycled back
3507 // - (2, 100) is released BEFORE (2, 20) which was already pending
3508 let mut saw_cross_key_interleave = false;
3509 let instance_count = flow.sim().exhaustive(async || {
3510 // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
3511 in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
3512 let out: HashMap<_, _> = out_recv
3513 .collect_sorted::<Vec<_>>()
3514 .await
3515 .into_iter()
3516 .collect();
3517
3518 // Check if we see the cross-key interleaving:
3519 // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
3520 if let Some(values) = out.get(&2)
3521 && values.len() >= 2
3522 && values[0] == 100
3523 {
3524 saw_cross_key_interleave = true;
3525 }
3526 });
3527
3528 assert!(
3529 saw_cross_key_interleave,
3530 "did not see an instance where cycled-back 100 was released before pending items for key 2"
3531 );
3532 assert_eq!(instance_count, 60);
3533 }
3534
3535 #[cfg(feature = "sim")]
3536 #[test]
3537 fn sim_top_level_assume_ordering_cycle_back_tick() {
3538 use std::collections::HashMap;
3539
3540 let mut flow = FlowBuilder::new();
3541 let node = flow.process::<()>();
3542 let node2 = flow.process::<()>();
3543
3544 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3545
3546 let (complete_cycle_back, cycle_back) =
3547 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3548 let ordered = input
3549 .into_keyed()
3550 .merge_unordered(cycle_back)
3551 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3552 complete_cycle_back.complete(
3553 ordered
3554 .clone()
3555 .batch(&node.tick(), nondet!(/** test */))
3556 .all_ticks()
3557 .map(q!(|v| v + 1))
3558 .filter(q!(|v| v % 2 == 1))
3559 .entries()
3560 .send(&node2, TCP.fail_stop().bincode())
3561 .send(&node, TCP.fail_stop().bincode())
3562 .into_keyed(),
3563 );
3564
3565 let out_recv = ordered
3566 .fold_early_stop(
3567 q!(|| Vec::new()),
3568 q!(|acc, v| {
3569 acc.push(v);
3570 acc.len() >= 2
3571 }),
3572 )
3573 .entries()
3574 .sim_output();
3575
3576 let mut saw = false;
3577 let instance_count = flow.sim().exhaustive(async || {
3578 in_send.send_many_unordered([(1, 0), (1, 2)]);
3579 let out: HashMap<_, _> = out_recv
3580 .collect_sorted::<Vec<_>>()
3581 .await
3582 .into_iter()
3583 .collect();
3584
3585 if let Some(values) = out.get(&1)
3586 && *values == vec![0, 1]
3587 {
3588 saw = true;
3589 }
3590 });
3591
3592 assert!(
3593 saw,
3594 "did not see an instance with key 1 having [0, 1] in order"
3595 );
3596 assert_eq!(instance_count, 58);
3597 }
3598
3599 #[cfg(feature = "sim")]
3600 #[test]
3601 fn sim_entries_partially_ordered_bounded() {
3602 let mut flow = FlowBuilder::new();
3603 let node = flow.process::<()>();
3604
3605 let (port, input) = node.sim_input::<_, TotalOrder, _>();
3606
3607 let tick = node.tick();
3608 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3609 let out_recv = batch
3610 .entries_partially_ordered(nondet!(/** test */))
3611 .all_ticks()
3612 .sim_output();
3613
3614 let instance_count = flow.sim().exhaustive(async || {
3615 port.send((1, 'a'));
3616 port.send((1, 'b'));
3617 port.send((2, 'c'));
3618 let _: Vec<(i32, char)> = out_recv.collect().await;
3619 });
3620
3621 assert_eq!(instance_count, 12);
3622 }
3623
3624 #[cfg(feature = "sim")]
3625 #[test]
3626 fn sim_entries_partially_ordered_top_level() {
3627 let mut flow = FlowBuilder::new();
3628 let node = flow.process::<()>();
3629
3630 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3631
3632 let out_recv = input
3633 .into_keyed()
3634 .entries_partially_ordered(nondet!(/** test */))
3635 .sim_output();
3636
3637 let instance_count = flow.sim().exhaustive(async || {
3638 in_send.send((1, 'a'));
3639 in_send.send((1, 'b'));
3640 in_send.send((2, 'c'));
3641 let _: Vec<(i32, char)> = out_recv.collect().await;
3642 });
3643
3644 assert_eq!(instance_count, 3);
3645 }
3646
3647 #[cfg(feature = "sim")]
3648 #[test]
3649 fn sim_entries_partially_ordered_cycle_back() {
3650 let mut flow = FlowBuilder::new();
3651 let node = flow.process::<()>();
3652 let node2 = flow.process::<()>();
3653
3654 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3655
3656 let (complete_cycle_back, cycle_back) =
3657 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3658 let ordered = input
3659 .into_keyed()
3660 .merge_unordered(cycle_back)
3661 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3662
3663 let flat = ordered
3664 .clone()
3665 .entries_partially_ordered(nondet!(/** test */));
3666
3667 complete_cycle_back.complete(
3668 flat.clone()
3669 .map(q!(|(k, v): (i32, i32)| (k, v + 1)))
3670 .filter(q!(|(_, v)| *v % 2 == 1))
3671 .send(&node2, TCP.fail_stop().bincode())
3672 .send(&node, TCP.fail_stop().bincode())
3673 .into_keyed(),
3674 );
3675
3676 let out_recv = flat.sim_output();
3677
3678 let mut saw = false;
3679 let instance_count = flow.sim().exhaustive(async || {
3680 // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back as (1, 1).
3681 // We want to see (1, 1) before (1, 2) - the cycled back value beats the pending one
3682 in_send.send_many_unordered([(1, 0), (1, 2)]);
3683 let results: Vec<(i32, i32)> = out_recv.collect().await;
3684
3685 let pos_1 = results.iter().position(|v| *v == (1, 1));
3686 let pos_2 = results.iter().position(|v| *v == (1, 2));
3687 if let (Some(p1), Some(p2)) = (pos_1, pos_2)
3688 && p1 < p2
3689 {
3690 saw = true;
3691 }
3692 });
3693
3694 assert!(saw, "did not see an instance with (1, 1) before (1, 2)");
3695 assert_eq!(instance_count, 78);
3696 }
3697}