1use std::{
2 collections::{BTreeMap, VecDeque},
3 fmt::Debug,
4 io::Write,
5 ops::ControlFlow,
6 sync::Arc,
7};
8
9use itertools::Itertools;
10use serde::{Deserialize, Serialize};
11use sha2::{
12 digest::{generic_array::GenericArray, typenum::U32},
13 Digest, Sha256,
14};
15use ControlFlow::{Break, Continue};
16
17use super::scan_state::transaction_snark::{LedgerProofWithSokMessage, TransactionWithWitness};
18
/// Monotonically increasing counter identifying the update batch in which a
/// job was added to the scan state.
#[derive(Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct SequenceNumber(u64);
23
24impl Debug for SequenceNumber {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 f.write_fmt(format_args!("{}", self.0))
27 }
28}
29
impl SequenceNumber {
    /// Wraps a raw counter value.
    pub(super) fn new(number: u64) -> Self {
        Self(number)
    }

    /// The initial sequence number.
    fn zero() -> Self {
        Self(0)
    }

    /// The next sequence number. `self.0 + 1` overflows at `u64::MAX`
    /// (panicking in debug builds); callers can check `is_u64_max` first.
    pub fn incr(&self) -> Self {
        Self(self.0 + 1)
    }

    /// True when the counter has reached its maximum and `incr` would
    /// overflow.
    fn is_u64_max(&self) -> bool {
        self.0 == u64::MAX
    }

    /// Raw counter value.
    pub fn as_u64(&self) -> u64 {
        self.0
    }
}
51
impl std::ops::Sub for &'_ SequenceNumber {
    type Output = SequenceNumber;

    /// Difference between two sequence numbers. Underflows (panicking in
    /// debug builds) if `rhs` is larger than `self`.
    fn sub(self, rhs: &'_ SequenceNumber) -> Self::Output {
        SequenceNumber(self.0 - rhs.0)
    }
}
59
/// Lifecycle of a job in the scan state: still waiting for its proof, or
/// completed.
#[derive(Clone, Debug)]
pub enum JobStatus {
    Todo,
    Done,
}
67
68impl JobStatus {
69 pub fn is_done(&self) -> bool {
70 matches!(self, Self::Done)
71 }
72
73 fn as_str(&self) -> &'static str {
74 match self {
75 JobStatus::Todo => "Todo",
76 JobStatus::Done => "Done",
77 }
78 }
79}
80
/// Capacity counters attached to every tree node: how many base jobs and how
/// many merge jobs the node's subtree can still absorb.
#[derive(Clone)]
pub struct Weight {
    pub(super) base: u64,
    pub(super) merge: u64,
}
89
90impl Debug for Weight {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 write!(f, "{{ base: {} merge: {} }}", self.base, self.merge)
93 }
94}
95
impl Weight {
    /// A weight with no remaining capacity in either component.
    fn zero() -> Self {
        Self { base: 0, merge: 0 }
    }
}
101
/// Minimal functional lens: read or immutably "write" one field of a target.
trait Lens {
    type Value;
    type Target;
    /// Borrows the focused field of `target`.
    fn get<'a>(&self, target: &'a Self::Target) -> &'a Self::Value;
    /// Returns a copy of `target` with the focused field replaced by `value`.
    fn set(&self, target: &Self::Target, value: Self::Value) -> Self::Target;
}
108
/// Selects which component of a [`Weight`] a lens operation touches.
enum WeightLens {
    Base,
    Merge,
}
113
114impl Lens for WeightLens {
115 type Value = u64;
116 type Target = Weight;
117
118 fn get<'a>(&self, target: &'a Self::Target) -> &'a Self::Value {
119 match self {
120 WeightLens::Base => &target.base,
121 WeightLens::Merge => &target.merge,
122 }
123 }
124
125 fn set(&self, target: &Self::Target, value: Self::Value) -> Self::Target {
126 match self {
127 WeightLens::Base => Self::Target {
128 base: value,
129 merge: target.merge,
130 },
131 WeightLens::Merge => Self::Target {
132 base: target.base,
133 merge: value,
134 },
135 }
136 }
137}
138
/// Distinguishes the tree currently being filled from the one that will be
/// filled next.
#[derive(Debug)]
enum WorkForTree {
    Current,
    Next,
}
144
145pub mod base {
147 use super::*;
148
149 #[derive(Clone)]
150 pub struct Record<BaseJob> {
151 pub job: BaseJob,
152 pub seq_no: SequenceNumber,
153 pub state: JobStatus,
154 }
155
156 impl<BaseJob> Debug for Record<BaseJob> {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 let Self {
159 job: _,
160 seq_no,
161 state,
162 } = self;
163 f.write_fmt(format_args!("seq_no: {:?}, state: {:?}", seq_no, state))
164 }
165 }
166
167 #[derive(Clone)]
168 pub enum Job<BaseJob> {
169 Empty,
170 Full(Record<BaseJob>),
171 }
172
173 impl<BaseJob> Debug for Job<BaseJob> {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 match self {
176 Self::Empty => write!(f, "Empty"),
177 Self::Full(arg0) => f.write_fmt(format_args!("Full {{ {:?} }}", arg0)),
178 }
179 }
180 }
181
182 #[derive(Clone, Debug)]
183 pub struct Base<BaseJob> {
184 pub weight: Weight,
185 pub job: Job<BaseJob>,
186 }
187
188 impl<BaseJob: Clone> Record<BaseJob> {
189 pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
190 Self {
191 job: fun(&self.job),
192 seq_no: self.seq_no.clone(),
193 state: self.state.clone(),
194 }
195 }
196
197 pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
198 Self {
199 seq_no: no,
200 state: self.state.clone(),
201 job: self.job.clone(),
202 }
203 }
204 }
205
206 impl<BaseJob: Clone> Job<BaseJob> {
207 pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
208 match self {
209 Job::Empty => Self::Empty,
210 Job::Full(r) => Job::Full(r.map(fun)),
211 }
212 }
213 }
214
215 impl<BaseJob: Clone> Base<BaseJob> {
216 pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
217 Self {
218 weight: self.weight.clone(),
219 job: self.job.map(fun),
220 }
221 }
222
223 pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
224 Self {
225 weight: self.weight.clone(),
226 job: match &self.job {
227 Job::Full(record) => Job::Full(record.with_seq_no(no)),
228 x => x.clone(),
229 },
230 }
231 }
232 }
233}
234
235pub mod merge {
237 use super::*;
238
239 #[derive(Clone)]
240 pub struct Record<MergeJob> {
241 pub left: MergeJob,
242 pub right: MergeJob,
243 pub seq_no: SequenceNumber,
244 pub state: JobStatus,
245 }
246
247 impl<MergeJob> Debug for Record<MergeJob> {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 let Self {
250 left: _,
251 right: _,
252 seq_no,
253 state,
254 } = self;
255 f.write_fmt(format_args!("seq_no: {:?}, state: {:?}", seq_no, state))
256 }
257 }
258
259 impl<MergeJob: Clone> Record<MergeJob> {
260 pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
261 Self {
262 seq_no: no,
263 left: self.left.clone(),
264 right: self.right.clone(),
265 state: self.state.clone(),
266 }
267 }
268 }
269
270 #[derive(Clone)]
271 pub enum Job<MergeJob> {
272 Empty,
273 Part(MergeJob), Full(Record<MergeJob>),
275 }
276
277 impl<MergeJob> Debug for Job<MergeJob> {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 match self {
280 Self::Empty => write!(f, "Empty"),
281 Self::Part(_) => write!(f, "Part(merge)"),
282 Self::Full(arg0) => f.write_fmt(format_args!("Full {{ {:?} }}", arg0)),
283 }
284 }
285 }
286
287 #[derive(Clone, Debug)]
288 pub struct Merge<MergeJob> {
289 pub weight: (Weight, Weight),
290 pub job: Job<MergeJob>,
291 }
292
293 impl<MergeJob> Record<MergeJob> {
294 pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
295 Self {
296 left: fun(&self.left),
297 right: fun(&self.right),
298 seq_no: self.seq_no.clone(),
299 state: self.state.clone(),
300 }
301 }
302 }
303
304 impl<MergeJob> Job<MergeJob> {
305 pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
306 match self {
307 Job::Empty => Self::Empty,
308 Job::Part(j) => Job::Part(fun(j)),
309 Job::Full(r) => Job::Full(r.map(fun)),
310 }
311 }
312 }
313
314 impl<MergeJob: Clone> Merge<MergeJob> {
315 pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
316 Self {
317 weight: self.weight.clone(),
318 job: self.job.map(fun),
319 }
320 }
321
322 pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
323 Self {
324 weight: self.weight.clone(),
325 job: match &self.job {
326 Job::Full(record) => Job::Full(record.with_seq_no(no)),
327 x => x.clone(),
328 },
329 }
330 }
331 }
332}
333
/// A unit of work exposed to provers: either a pending base job or a pending
/// merge of two child results.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AvailableJob<BaseJob, MergeJob> {
    Base(BaseJob),
    Merge { left: MergeJob, right: MergeJob },
}
340
/// Tags a value as belonging to a leaf (base) position or an inner (merge)
/// position; used both for completed jobs fed back into `Tree::update` and
/// for node dumps such as `to_hashable_jobs`.
#[derive(Clone, Debug)]
enum Job<BaseJob, MergeJob> {
    Base(BaseJob),
    Merge(MergeJob),
}
347
/// Capacity report over the current tree and, optionally, the next one; each
/// entry is `(space_available, njobs_to_be_completed)` (see the `Debug`
/// impl's field labels).
#[derive(Clone)]
pub struct SpacePartition {
    pub first: (u64, u64),
    pub second: Option<(u64, u64)>,
}
359
impl Debug for SpacePartition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Local helper so the raw (u64, u64) tuples print with named fields.
        struct Detail {
            space_available: u64,
            njobs_to_be_completed: u64,
        }

        impl Debug for Detail {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_fmt(format_args!(
                    "space_available: {}, njobs_to_be_completed={}",
                    self.space_available, self.njobs_to_be_completed
                ))
            }
        }

        // Labels tie `first`/`second` to their meaning: current vs next tree.
        f.debug_struct("SpacePartition")
            .field(
                "first(current_tree)",
                &Detail {
                    space_available: self.first.0,
                    njobs_to_be_completed: self.first.1,
                },
            )
            .field(
                "second(next_tree)",
                &self.second.map(|second| Detail {
                    space_available: second.0,
                    njobs_to_be_completed: second.1,
                }),
            )
            .finish()
    }
}
394
/// Object-safe wrapper exposing a `Debug` value by reference.
trait WithVTable<T>: Debug {
    fn by_ref(&self) -> &T;
}
398
// Blanket impl: any `Debug` type can stand in for itself.
impl<T: Debug> WithVTable<T> for T {
    fn by_ref(&self) -> &Self {
        self
    }
}
404
/// A node of the scan tree: a leaf (`B`, base position) or an inner node
/// (`M`, merge position).
#[derive(Clone, Debug)]
pub enum Value<B, M> {
    Leaf(B),
    Node(M),
}
410
/// A complete binary tree stored as a flat array in breadth-first order:
/// root at index 0, children of `i` at `2i + 1` / `2i + 2` (see the `btree`
/// helpers).
#[derive(Clone)]
pub struct Tree<B, M> {
    pub(super) values: Vec<Value<B, M>>,
}
416
impl<B, M> Debug for Tree<base::Base<B>, merge::Merge<M>>
where
    B: Debug,
    M: Debug,
{
    /// Prints the tree one line per depth, each line listing the nodes at
    /// that depth in left-to-right order.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Borrowed either-kind wrapper so both node kinds can share one Vec.
        enum BaseOrMerge<'a, B, M> {
            Base(&'a base::Base<B>),
            Merge(&'a merge::Merge<M>),
        }

        impl<B, M> Debug for BaseOrMerge<'_, B, M>
        where
            B: Debug,
            M: Debug,
        {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match self {
                    Self::Base(arg0) => write!(f, "{:?}", arg0),
                    Self::Merge(arg0) => write!(f, "{:?}", arg0),
                }
            }
        }

        // Bucket the flat array by depth; BTreeMap keeps depths sorted.
        let mut by_depth = BTreeMap::<usize, Vec<_>>::default();

        for (index, v) in self.values.iter().enumerate() {
            let vec = by_depth.entry(btree::depth_at(index) as usize).or_default();
            let v = match v {
                Value::Leaf(b) => BaseOrMerge::Base(b),
                Value::Node(m) => BaseOrMerge::Merge(m),
            };
            vec.push(v);
        }

        for (depth, values) in by_depth.iter() {
            writeln!(f, "depth={} {:#?}", depth, values)?;
        }

        Ok(())
    }
}
459
mod btree {
    //! Index arithmetic for a complete binary tree stored as a flat array in
    //! breadth-first order (root at index 0, children of `i` at `2i + 1` and
    //! `2i + 2`).

    /// Depth of the node stored at `index` (the root is at depth 0), i.e.
    /// `floor(log2(index + 1))`.
    ///
    /// Computed with pure integer arithmetic. The previous implementation
    /// went through `f32::log2`, whose 24-bit mantissa rounds large values
    /// and returns an off-by-one depth near powers of two (already wrong at
    /// index `2^21 - 2`, which yielded 21 instead of 20).
    pub fn depth_at(index: usize) -> u64 {
        let n = index + 1; // n >= 1, so it has at least one set bit
        (usize::BITS - 1 - n.leading_zeros()) as u64
    }

    /// Index of the left child of the node at `index`.
    pub fn child_left(index: usize) -> usize {
        (index * 2) + 1
    }

    /// Index of the right child of the node at `index`.
    pub fn child_right(index: usize) -> usize {
        (index * 2) + 2
    }

    /// Index of the parent of the node at `index`, or `None` for the root.
    pub fn parent(index: usize) -> Option<usize> {
        Some(index.checked_sub(1)? / 2)
    }

    /// Half-open range of flat-array indexes occupied by the nodes at
    /// `depth`: `[2^depth - 1, 2^(depth+1) - 1)`.
    pub fn range_at_depth(depth: u64) -> std::ops::Range<usize> {
        if depth == 0 {
            return 0..1;
        }

        let start = (1 << depth) - 1;
        let end = (1 << (depth + 1)) - 1;

        start..end
    }
}
492
impl<B, M> Tree<B, M>
where
    B: Debug + 'static,
    M: Debug + 'static,
{
    /// Rebuilds the tree by applying `fun_base` to every leaf and `fun_merge`
    /// (which also receives the node's depth) to every inner node.
    fn map_depth<FunMerge, FunBase>(&self, fun_merge: &FunMerge, fun_base: &FunBase) -> Self
    where
        FunMerge: for<'a> Fn(u64, &'a M) -> M,
        FunBase: for<'a> Fn(&'a B) -> B,
    {
        let values = self
            .values
            .iter()
            .enumerate()
            .map(|(index, value)| match value {
                Value::Leaf(base) => Value::Leaf(fun_base(base)),
                Value::Node(merge) => Value::Node(fun_merge(btree::depth_at(index), merge)),
            })
            .collect();

        Self { values }
    }

    /// Groups borrows of the node values by depth. In a complete tree each
    /// depth holds nodes of a single kind, so every entry is either
    /// all-leaves or all-inner-nodes; a mixed depth panics.
    pub(super) fn values_by_depth(&self) -> BTreeMap<u64, Value<Vec<&B>, Vec<&M>>> {
        let mut values: BTreeMap<u64, Value<Vec<&B>, Vec<&M>>> = BTreeMap::default();

        for (index, value) in self.values.iter().enumerate() {
            values
                .entry(btree::depth_at(index))
                .and_modify(|for_depth: &mut Value<_, _>| match for_depth {
                    Value::Leaf(vec) => {
                        let Value::Leaf(leaf) = value else {
                            panic!("invalid")
                        };
                        vec.push(leaf);
                    }
                    Value::Node(vec) => {
                        let Value::Node(node) = value else {
                            panic!("invalid")
                        };
                        vec.push(node);
                    }
                })
                // NOTE(review): `or_insert` evaluates its argument eagerly, so
                // a 255-capacity Vec is allocated on every iteration even when
                // the entry already exists — `or_insert_with` would avoid it.
                .or_insert({
                    match value {
                        Value::Leaf(leaf) => {
                            let mut vec = Vec::with_capacity(255);
                            vec.push(leaf);
                            Value::Leaf(vec)
                        }
                        Value::Node(node) => {
                            let mut vec = Vec::with_capacity(255);
                            vec.push(node);
                            Value::Node(vec)
                        }
                    }
                });
        }

        values
    }

    /// Depth-insensitive variant of [`Self::map_depth`].
    fn map<FunMerge, FunBase>(&self, fun_merge: FunMerge, fun_base: FunBase) -> Self
    where
        FunMerge: Fn(&M) -> M,
        FunBase: Fn(&B) -> B,
    {
        self.map_depth(&|_, m| fun_merge(m), &fun_base)
    }

    /// Short-circuiting fold over all nodes in flat-array order: `Break`
    /// aborts immediately with the final value, `Continue` threads the
    /// accumulator through.
    fn fold_depth_until_prime<Accum, Final, FunMerge, FunBase>(
        &self,
        fun_merge: &FunMerge,
        fun_base: &FunBase,
        init: Accum,
    ) -> ControlFlow<Final, Accum>
    where
        FunMerge: Fn(u64, Accum, &M) -> ControlFlow<Final, Accum>,
        FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
    {
        let mut accum = init;

        for (index, value) in self.values.iter().enumerate() {
            accum = match value {
                Value::Leaf(base) => fun_base(accum, base)?,
                Value::Node(merge) => fun_merge(btree::depth_at(index), accum, merge)?,
            };
        }

        Continue(accum)
    }

    /// Like [`Self::fold_depth_until_prime`], but with `Result`-based
    /// short-circuiting: the first `Err` aborts the walk.
    fn fold_depth_until_prime_err<Accum, FunMerge, FunBase>(
        &self,
        fun_merge: &FunMerge,
        fun_base: &FunBase,
        init: Accum,
    ) -> Result<Accum, String>
    where
        FunMerge: Fn(u64, Accum, &M) -> Result<Accum, String>,
        FunBase: Fn(Accum, &B) -> Result<Accum, String>,
    {
        let mut accum = init;

        for (index, value) in self.values.iter().enumerate() {
            accum = match value {
                Value::Leaf(base) => fun_base(accum, base)?,
                Value::Node(merge) => fun_merge(btree::depth_at(index), accum, merge)?,
            };
        }

        Ok(accum)
    }

    /// Short-circuiting fold that maps a completed (non-broken) walk through
    /// `fun_finish`; a `Break` value is returned as-is.
    fn fold_depth_until<Accum, Final, FunFinish, FunMerge, FunBase>(
        &self,
        fun_merge: FunMerge,
        fun_base: FunBase,
        fun_finish: FunFinish,
        init: Accum,
    ) -> Final
    where
        FunMerge: Fn(u64, Accum, &M) -> ControlFlow<Final, Accum>,
        FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
        FunFinish: Fn(Accum) -> Final,
    {
        match self.fold_depth_until_prime(&fun_merge, &fun_base, init) {
            Continue(accum) => fun_finish(accum),
            Break(value) => value,
        }
    }

    /// Depth-aware fold over all nodes, with no early exit.
    fn fold_depth<Accum, FunMerge, FunBase>(
        &self,
        fun_merge: FunMerge,
        fun_base: FunBase,
        init: Accum,
    ) -> Accum
    where
        FunMerge: Fn(u64, Accum, &M) -> Accum,
        FunBase: Fn(Accum, &B) -> Accum,
    {
        self.fold_depth_until(
            |i, accum, a| Continue(fun_merge(i, accum, a)),
            |accum, d| Continue(fun_base(accum, d)),
            |x| x,
            init,
        )
    }

    /// Plain fold over all nodes, ignoring depth, with no early exit.
    fn fold<Accum, FunMerge, FunBase>(
        &self,
        fun_merge: FunMerge,
        fun_base: FunBase,
        init: Accum,
    ) -> Accum
    where
        FunMerge: Fn(Accum, &M) -> Accum,
        FunBase: Fn(Accum, &B) -> Accum,
    {
        self.fold_depth(|_, accum, a| fun_merge(accum, a), fun_base, init)
    }

    /// Depth-insensitive variant of [`Self::fold_depth_until`].
    fn fold_until<Accum, Final, FunFinish, FunMerge, FunBase>(
        &self,
        fun_merge: FunMerge,
        fun_base: FunBase,
        fun_finish: FunFinish,
        init: Accum,
    ) -> Final
    where
        FunMerge: Fn(Accum, &M) -> ControlFlow<Final, Accum>,
        FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
        FunFinish: Fn(Accum) -> Final,
    {
        self.fold_depth_until(
            |_, accum, a| fun_merge(accum, a),
            fun_base,
            fun_finish,
            init,
        )
    }

    /// Core update walk: distributes `jobs` over the tree down to
    /// `update_level`, pairing each visited node with the slice of jobs its
    /// parent routed to it. `jobs_split` partitions a node's jobs between its
    /// two children using the weights returned by `weight_merge`; nodes
    /// deeper than `update_level` are copied unchanged. The optional `R` is
    /// the first result produced by `fun_merge` (the root is visited first).
    ///
    /// Note: the `Weight` type parameter here shadows the crate's `Weight`
    /// struct — it is a free type chosen by the caller.
    fn update_split<Data, FunJobs, FunWeight, FunMerge, FunBase, Weight, R>(
        &self,
        fun_merge: &FunMerge,
        fun_base: &FunBase,
        weight_merge: &FunWeight,
        jobs: &[Data],
        update_level: u64,
        jobs_split: &FunJobs,
    ) -> Result<(Self, Option<R>), ()>
    where
        FunMerge: Fn(&[Data], u64, &M) -> Result<(M, Option<R>), ()>,
        FunBase: Fn(&[Data], B) -> Result<B, ()>,
        FunWeight: Fn(&M) -> (Weight, Weight),
        FunJobs: Fn((Weight, Weight), &[Data]) -> (&[Data], &[Data]),
        Data: Clone,
        M: Clone,
        B: Clone,
    {
        let mut values = Vec::with_capacity(self.values.len());
        let mut scan_result = None;

        // The flat array is laid out breadth-first, so a FIFO of job slices
        // pairs each visited node with the jobs assigned by its parent.
        let mut jobs_fifo = VecDeque::with_capacity(self.values.len());

        jobs_fifo.push_back(jobs);

        for (index, value) in self.values.iter().enumerate() {
            let depth = btree::depth_at(index);

            // Nodes below the level being updated are copied verbatim.
            if depth > update_level {
                values.push(value.clone());
                continue;
            }

            let jobs_for_this = jobs_fifo.pop_front().unwrap();

            let value = match value {
                Value::Leaf(base) => Value::Leaf(fun_base(jobs_for_this, base.clone())?),
                Value::Node(merge) => {
                    // Partition this node's jobs between its children before
                    // transforming the node itself.
                    let (jobs_left, jobs_right) = jobs_split(weight_merge(merge), jobs_for_this);
                    jobs_fifo.push_back(jobs_left);
                    jobs_fifo.push_back(jobs_right);

                    let (value, result) = fun_merge(jobs_for_this, depth, merge)?;

                    // Keep only the first produced result (the root's).
                    if scan_result.is_none() {
                        scan_result = Some(result);
                    }

                    Value::Node(value)
                }
            };

            values.push(value);
        }

        // NOTE(review): this compares `capacity()` (not length) to the node
        // count, presumably to assert the FIFO never grew past its initial
        // allocation. `VecDeque::with_capacity` is allowed to over-allocate,
        // so this equality looks fragile — confirm.
        assert_eq!(jobs_fifo.capacity(), self.values.len());

        Ok(((Self { values }), scan_result.flatten()))
    }

    /// Bottom-up rebuild. Iterates the flat array in reverse so both children
    /// of a node are processed (and their `Data` recorded) before the node
    /// itself: leaves seed their `Data` via `fun_base`, inner nodes combine
    /// their children's via `fun_merge`. Returns the new tree together with
    /// the root's accumulated `Data`.
    fn update_accumulate<Data, FunMerge, FunBase>(
        &self,
        fun_merge: &FunMerge,
        fun_base: &FunBase,
    ) -> (Self, Data)
    where
        FunMerge: Fn((Data, Data), &M) -> (M, Data),
        FunBase: Fn(&B) -> (B, Data),
        Data: Clone,
    {
        let mut datas = vec![None; self.values.len()];

        // Takes (and clears) the recorded data of both children of `index`.
        let childs_of = |data: &mut [Option<Data>], index: usize| -> Option<(Data, Data)> {
            let left = data
                .get_mut(btree::child_left(index))
                .and_then(Option::take)?;
            let right = data
                .get_mut(btree::child_right(index))
                .and_then(Option::take)?;

            Some((left, right))
        };

        let mut values: Vec<_> = self
            .values
            .iter()
            .enumerate()
            .rev()
            .map(|(index, value)| match value {
                Value::Leaf(base) => {
                    let (new_base, count_list) = fun_base(base);
                    datas[index] = Some(count_list);
                    Value::Leaf(new_base)
                }
                Value::Node(merge) => {
                    // Children were already visited thanks to `.rev()`.
                    let (left, right) = childs_of(datas.as_mut(), index).unwrap();
                    let (value, count_list) = fun_merge((left, right), merge);
                    datas[index] = Some(count_list);
                    Value::Node(value)
                }
            })
            .collect();

        // The map ran in reverse; restore flat-array (breadth-first) order.
        values.reverse();

        (Self { values }, datas[0].take().unwrap())
    }

    /// Iterates `(depth, node)` pairs in flat-array order.
    fn iter(&self) -> impl Iterator<Item = (u64, &Value<B, M>)> {
        self.values
            .iter()
            .enumerate()
            .map(|(index, value)| (btree::depth_at(index), value))
    }
}
798
799impl Tree<base::Base<Arc<TransactionWithWitness>>, merge::Merge<Arc<LedgerProofWithSokMessage>>> {
800 pub fn view(&self) -> impl Iterator<Item = JobValueWithIndex<'_>> {
801 self.values
802 .iter()
803 .enumerate()
804 .map(|(index, value)| JobValueWithIndex {
805 index,
806 job: match value {
807 Value::Leaf(base) => Value::Leaf(&base.job),
808 Value::Node(merge) => Value::Node(&merge.job),
809 },
810 })
811 }
812}
813
/// Borrowed view of one node's job slot in the transaction scan state: a
/// leaf's base job or an inner node's merge job.
pub type JobValue<'a> = super::parallel_scan::Value<
    &'a base::Job<Arc<TransactionWithWitness>>,
    &'a merge::Job<Arc<LedgerProofWithSokMessage>>,
>;
818
/// A borrowed job slot paired with its flat-array index in the tree.
pub struct JobValueWithIndex<'a> {
    index: usize,
    pub job: JobValue<'a>,
}
823
824impl JobValueWithIndex<'_> {
825 pub fn index(&self) -> usize {
826 self.index
827 }
828
829 pub fn depth(&self) -> usize {
830 btree::depth_at(self.index) as usize
831 }
832
833 pub fn parent(&self) -> Option<usize> {
834 btree::parent(self.index)
835 }
836
837 pub fn child_left(&self) -> usize {
838 btree::child_left(self.index)
839 }
840
841 pub fn child_right(&self) -> usize {
842 btree::child_right(self.index)
843 }
844
845 pub fn bundle_sibling(&self) -> Option<(usize, bool)> {
847 let parent = self.parent()?;
848 let try_sibling =
849 |parent, index| btree::parent(index).filter(|p| *p == parent).map(|_| index);
850 try_sibling(parent, self.index - 1)
851 .map(|i| (i, true))
852 .or_else(|| try_sibling(parent, self.index + 1).map(|i| (i, false)))
853 }
854}
855
/// The full parallel-scan state: a queue of binary trees of base and merge
/// jobs.
#[derive(Clone, Debug)]
pub struct ParallelScan<BaseJob, MergeJob> {
    pub trees: Vec<Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
    // NOTE(review): presumably the most recently emitted result together with
    // the base jobs it covers — confirm against the update path.
    pub(super) acc: Option<(MergeJob, Vec<BaseJob>)>,
    // Sequence number to stamp on jobs added by the next update; starts at 0.
    pub curr_job_seq_no: SequenceNumber,
    // Leaf capacity of a single tree (`empty` sizes trees with depth
    // `ceil_log2(max_base_jobs)`).
    pub(super) max_base_jobs: u64,
    // NOTE(review): presumably the work delay — number of trees by which
    // merges lag behind base jobs — confirm.
    pub(super) delay: u64,
}
867
/// Which weight component(s) `reset_weights` recomputes.
pub(super) enum ResetKind {
    Base,
    Merge,
    Both,
}
873
impl<BaseJob, MergeJob> Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>
where
    BaseJob: Clone + Debug + 'static,
    MergeJob: Clone + Debug + 'static,
{
    /// Applies `completed_jobs` to the nodes at `update_level`, marking
    /// filled-in work and decrementing the weight component selected by
    /// `lens` along the way. Returns the updated tree plus the root's
    /// finished merge result, if this update completed one.
    fn update(
        &self,
        completed_jobs: &[Job<BaseJob, MergeJob>],
        update_level: u64,
        sequence_no: SequenceNumber,
        lens: WeightLens,
    ) -> Result<(Self, Option<MergeJob>), ()> {
        // How a merge (inner) node absorbs the jobs routed to it.
        let add_merges = |jobs: &[Job<BaseJob, MergeJob>],
                          current_level: u64,
                          merge_job: &merge::Merge<MergeJob>|
         -> Result<(merge::Merge<MergeJob>, Option<MergeJob>), ()> {
            use merge::{
                Job::{Empty, Full, Part},
                Record,
            };
            use Job::{Base, Merge};

            let weight = &merge_job.weight;
            let m = &merge_job.job;

            let (w1, w2) = &weight;
            // Remaining capacity (per the selected lens) of each child side.
            let (left, right) = (*lens.get(w1), *lens.get(w2));

            if update_level > 0 && current_level == update_level - 1 {
                // Parents of the updated level: completed child results land
                // here as new (possibly partial) merge jobs.
                let (new_weight, new_m) = match (jobs, m) {
                    ([], m) => (weight.clone(), m.clone()),
                    ([Merge(a), Merge(b)], Empty) => {
                        let w1 = lens.set(w1, left - 1);
                        let w2 = lens.set(w2, right - 1);

                        (
                            (w1, w2),
                            Full(Record {
                                left: a.clone(),
                                right: b.clone(),
                                seq_no: sequence_no.clone(),
                                state: JobStatus::Todo,
                            }),
                        )
                    }
                    ([Merge(a)], Empty) => {
                        // Only the left half arrived so far.
                        let w1 = lens.set(w1, left - 1);
                        let w2 = lens.set(w2, right);

                        ((w1, w2), Part(a.clone()))
                    }
                    ([Merge(b)], Part(a)) => {
                        // The right half completes an existing partial job.
                        let w1 = lens.set(w1, left);
                        let w2 = lens.set(w2, right - 1);

                        (
                            (w1, w2),
                            Full(Record {
                                left: a.clone(),
                                right: b.clone(),
                                seq_no: sequence_no.clone(),
                                state: JobStatus::Todo,
                            }),
                        )
                    }
                    ([Base(_)], Empty) => {
                        // A base job passing through only consumes capacity;
                        // whichever child side still has room pays for it.
                        let weight = if left == 0 {
                            let w1 = lens.set(w1, left);
                            let w2 = lens.set(w2, right - 1);
                            (w1, w2)
                        } else {
                            let w1 = lens.set(w1, left - 1);
                            let w2 = lens.set(w2, right);
                            (w1, w2)
                        };

                        (weight, Empty)
                    }
                    ([Base(_), Base(_)], Empty) => {
                        let w1 = lens.set(w1, left - 1);
                        let w2 = lens.set(w2, right - 1);

                        ((w1, w2), Empty)
                    }
                    (xs, m) => {
                        panic!(
                            "Got {} jobs when updating level {} and when one of the merge \
                             nodes at level {} is {:?}",
                            xs.len(),
                            update_level,
                            current_level,
                            m
                        );
                    }
                };

                Ok((
                    merge::Merge {
                        weight: new_weight,
                        job: new_m,
                    },
                    None::<MergeJob>,
                ))
            } else if current_level == update_level {
                // The level being updated: a Todo record receives its proof
                // and flips to Done; at the root the merged result is
                // surfaced as the scan result.
                match (jobs, m) {
                    (
                        [Merge(a)],
                        Full(
                            x @ Record {
                                state: JobStatus::Todo,
                                ..
                            },
                        ),
                    ) => {
                        let mut x = x.clone();
                        x.state = JobStatus::Done;
                        let new_job = Full(x);

                        let (scan_result, weight) = if current_level == 0 {
                            let w1 = lens.set(w1, 0);
                            let w2 = lens.set(w2, 0);

                            (Some(a.clone()), (w1, w2))
                        } else {
                            (None, weight.clone())
                        };

                        Ok((
                            merge::Merge {
                                weight,
                                job: new_job,
                            },
                            scan_result,
                        ))
                    }
                    ([], m) => Ok((
                        merge::Merge {
                            weight: weight.clone(),
                            job: m.clone(),
                        },
                        None,
                    )),
                    (xs, m) => {
                        panic!(
                            "Got {} jobs when updating level {} and when one of the merge \
                             nodes at level {} is {:?}",
                            xs.len(),
                            update_level,
                            current_level,
                            m
                        );
                    }
                }
            } else if update_level > 0 && (current_level < update_level - 1) {
                // Levels above the updated one: jobs only pass through, so
                // just account for how many go to each child side.
                match jobs {
                    [] => Ok((
                        merge::Merge {
                            weight: weight.clone(),
                            job: m.clone(),
                        },
                        None,
                    )),
                    _ => {
                        let jobs_length = jobs.len() as u64;
                        let jobs_sent_left = jobs_length.min(left);
                        let jobs_sent_right = (jobs_length - jobs_sent_left).min(right);

                        let w1 = lens.set(w1, left - jobs_sent_left);
                        let w2 = lens.set(w2, right - jobs_sent_right);
                        let weight = (w1, w2);

                        Ok((
                            merge::Merge {
                                weight,
                                job: m.clone(),
                            },
                            None,
                        ))
                    }
                }
            } else {
                // Deeper than the update level: untouched.
                Ok((
                    merge::Merge {
                        weight: weight.clone(),
                        job: m.clone(),
                    },
                    None,
                ))
            }
        };

        // How a base (leaf) node absorbs the jobs routed to it.
        let add_bases = |jobs: &[Job<BaseJob, MergeJob>], base: base::Base<BaseJob>| {
            use base::Job::{Empty, Full};
            use Job::{Base, Merge};

            let w = base.weight;
            let d = base.job;

            let weight = lens.get(&w);

            match (jobs, d) {
                // Nothing routed here: unchanged.
                ([], d) => Ok(base::Base { weight: w, job: d }),
                // A new base job fills an empty leaf as Todo.
                ([Base(d)], Empty) => {
                    let w = lens.set(&w, weight - 1);

                    Ok(base::Base {
                        weight: w,
                        job: Full(base::Record {
                            job: d.clone(),
                            seq_no: sequence_no.clone(),
                            state: JobStatus::Todo,
                        }),
                    })
                }
                // The proof for this leaf's job arrived: mark it Done.
                ([Merge(_)], Full(mut b)) => {
                    b.state = JobStatus::Done;

                    Ok(base::Base {
                        weight: w,
                        job: Full(b),
                    })
                }
                (xs, d) => {
                    panic!(
                        "Got {} jobs when updating level {} and when one of the base nodes \
                         is {:?}",
                        xs.len(),
                        update_level,
                        d
                    );
                }
            }
        };

        self.update_split(
            &add_merges,
            &add_bases,
            &|merge| merge.weight.clone(),
            completed_jobs,
            update_level,
            &|(w1, w2), a: &[Job<BaseJob, MergeJob>]| {
                // Route a node's jobs to its children by capacity: the first
                // `l` go left, the next `r` go right.
                let l = *lens.get(&w1) as usize;
                let r = *lens.get(&w2) as usize;

                (take(a, l), take_at(a, l, r))
            },
        )
    }

    /// Recomputes the weight annotations of the whole tree from scratch.
    /// `reset_kind` selects which component is rebuilt (base capacity, merge
    /// capacity, or both). Implemented as a bottom-up pass: each leaf seeds
    /// its counts and every inner node sums its children's.
    fn reset_weights(&self, reset_kind: ResetKind) -> Self {
        let fun_base = |base: &base::Base<BaseJob>| {
            let set_one = |lens: WeightLens, weight: &Weight| lens.set(weight, 1);
            let set_zero = |lens: WeightLens, weight: &Weight| lens.set(weight, 0);

            use base::{
                Job::{Empty, Full},
                Record,
            };
            use JobStatus::Todo;

            // A leaf contributes merge-weight 1 only while it holds a Todo
            // job (one proof still expected for it).
            let update_merge_weight = |weight: &Weight| {
                match &base.job {
                    Full(Record { state: Todo, .. }) => set_one(WeightLens::Merge, weight),
                    _ => set_zero(WeightLens::Merge, weight),
                }
            };

            // A leaf contributes base-weight 1 only while its slot is empty.
            let update_base_weight = |weight: &Weight| {
                match &base.job {
                    Empty => set_one(WeightLens::Base, weight),
                    Full(_) => set_zero(WeightLens::Base, weight),
                }
            };

            let weight = &base.weight;
            let (new_weight, dummy_right_for_base_nodes) = match reset_kind {
                ResetKind::Merge => (
                    update_merge_weight(weight),
                    set_zero(WeightLens::Merge, weight),
                ),
                ResetKind::Base => (
                    update_base_weight(weight),
                    set_zero(WeightLens::Base, weight),
                ),
                ResetKind::Both => {
                    let w = update_base_weight(weight);
                    (update_merge_weight(&w), Weight::zero())
                }
            };

            let base = base::Base {
                weight: new_weight.clone(),
                job: base.job.clone(),
            };

            // Leaves have no children; the second weight is a placeholder so
            // the accumulated data has the same shape as for inner nodes.
            (base, (new_weight, dummy_right_for_base_nodes))
        };

        let fun_merge = |lst: ((Weight, Weight), (Weight, Weight)),
                         merge: &merge::Merge<MergeJob>| {
            // `lst` carries each child's pair of weights, accumulated by
            // `update_accumulate`.
            let ((w1, w2), (w3, w4)) = &lst;

            // New (left, right) weights: the sum of each child's pair.
            let reset = |lens: WeightLens, w: &Weight, ww: &Weight| {
                (
                    lens.set(w, lens.get(w1) + lens.get(w2)),
                    lens.set(ww, lens.get(w3) + lens.get(w4)),
                )
            };

            use merge::{Job::Full, Record};
            use JobStatus::Todo;

            let ww = match reset_kind {
                ResetKind::Merge => {
                    let lens = WeightLens::Merge;
                    // A node holding a Todo merge job claims weight (1, 0)
                    // for itself instead of summing its children.
                    match (&merge.weight, &merge.job) {
                        ((w1, w2), Full(Record { state: Todo, .. })) => {
                            (lens.set(w1, 1), lens.set(w2, 0))
                        }
                        ((w1, w2), _) => reset(lens, w1, w2),
                    }
                }
                ResetKind::Base => {
                    let w = &merge.weight;
                    reset(WeightLens::Base, &w.0, &w.1)
                }
                ResetKind::Both => {
                    let w = &merge.weight;
                    let w = reset(WeightLens::Base, &w.0, &w.1);
                    reset(WeightLens::Merge, &w.0, &w.1)
                }
            };

            let merge = merge::Merge {
                weight: ww.clone(),
                job: merge.job.clone(),
            };

            (merge, ww)
        };

        let (result, _) = self.update_accumulate(&fun_merge, &fun_base);
        result
    }

    /// Collects the Todo jobs sitting exactly at `level` as `AvailableJob`s;
    /// `depth` is the leaf level, where base jobs live.
    fn jobs_on_level(&self, depth: u64, level: u64) -> Vec<AvailableJob<BaseJob, MergeJob>> {
        use JobStatus::Todo;

        self.fold_depth(
            |i, mut acc, a| {
                use merge::{Job::Full, Record};

                if let (
                    true,
                    Full(Record {
                        left,
                        right,
                        state: Todo,
                        ..
                    }),
                ) = (i == level, &a.job)
                {
                    acc.push(AvailableJob::Merge {
                        left: left.clone(),
                        right: right.clone(),
                    });
                };
                acc
            },
            |mut acc, d| {
                use base::{Job::Full, Record};

                if let (
                    true,
                    Full(Record {
                        job, state: Todo, ..
                    }),
                ) = (level == depth, &d.job)
                {
                    acc.push(AvailableJob::Base(job.clone()));
                }
                acc
            },
            Vec::with_capacity(256),
        )
    }

    /// Every node that still matters for hashing: all nodes except those
    /// whose job is already marked Done.
    fn to_hashable_jobs(&self) -> Vec<Job<base::Base<BaseJob>, merge::Merge<MergeJob>>> {
        use JobStatus::Done;

        self.fold(
            |mut acc, a| {
                match &a.job {
                    merge::Job::Full(merge::Record { state: Done, .. }) => {}
                    _ => {
                        acc.push(Job::Merge(a.clone()));
                    }
                }
                acc
            },
            |mut acc, d| {
                match &d.job {
                    base::Job::Full(base::Record { state: Done, .. }) => {}
                    _ => {
                        acc.push(Job::Base(d.clone()));
                    }
                }
                acc
            },
            Vec::with_capacity(256),
        )
    }

    /// All `Full` records in the tree, regardless of Todo/Done status.
    fn jobs_records(&self) -> Vec<Job<base::Record<BaseJob>, merge::Record<MergeJob>>> {
        self.fold(
            |mut acc, a: &merge::Merge<MergeJob>| {
                if let merge::Job::Full(x) = &a.job {
                    acc.push(Job::Merge(x.clone()));
                }
                acc
            },
            |mut acc, d: &base::Base<BaseJob>| {
                if let base::Job::Full(j) = &d.job {
                    acc.push(Job::Base(j.clone()));
                }
                acc
            },
            Vec::with_capacity(256),
        )
    }

    /// The payloads of every filled leaf, left to right.
    fn base_jobs(&self) -> Vec<BaseJob> {
        self.fold_depth(
            |_, acc, _| acc,
            |mut acc, d| {
                if let base::Job::Full(base::Record { job, .. }) = &d.job {
                    acc.push(job.clone());
                };
                acc
            },
            Vec::with_capacity(256),
        )
    }

    /// Counts Todo jobs as `(base_count, merge_count)`.
    fn todo_job_count(&self) -> (u64, u64) {
        use JobStatus::Todo;

        self.fold_depth(
            |_, (b, m), j| match &j.job {
                merge::Job::Full(merge::Record { state: Todo, .. }) => (b, m + 1),
                _ => (b, m),
            },
            |(b, m), d| match &d.job {
                base::Job::Full(base::Record { state: Todo, .. }) => (b + 1, m),
                _ => (b, m),
            },
            (0, 0),
        )
    }

    /// Clones of every filled leaf node (weight included).
    fn leaves(&self) -> Vec<base::Base<BaseJob>> {
        self.fold_depth(
            |_, acc, _| acc,
            |mut acc, d| {
                if let base::Job::Full(_) = &d.job {
                    acc.push(d.clone());
                };
                acc
            },
            Vec::with_capacity(256),
        )
    }

    /// Number of merge results this tree still needs, read off the root's
    /// merge weights.
    fn required_job_count(&self) -> u64 {
        match &self.values[0] {
            Value::Node(value) => {
                let (w1, w2) = &value.weight;
                w1.merge + w2.merge
            }
            // Degenerate single-node tree: the root is a leaf.
            Value::Leaf(base) => base.weight.merge,
        }
    }

    /// Number of base jobs this tree can still accept, read off the root's
    /// base weights.
    fn available_space(&self) -> u64 {
        match &self.values[0] {
            Value::Node(value) => {
                let (w1, w2) = &value.weight;
                w1.base + w2.base
            }
            // Degenerate single-node tree: the root is a leaf.
            Value::Leaf(base) => base.weight.base,
        }
    }

    /// Builds a complete tree of `depth` levels whose nodes are all
    /// initialized from copies of `merge_job`/`base_job`. `level == -1`
    /// creates a zero-weight tree; otherwise weights are seeded with the
    /// base capacity of each subtree.
    fn create_tree_for_level(
        level: i64,
        depth: u64,
        merge_job: merge::Job<MergeJob>,
        base_job: base::Job<BaseJob>,
    ) -> Self {
        let base_weight = if level == -1 {
            Weight::zero()
        } else {
            Weight { base: 1, merge: 0 }
        };

        let make_base = || base::Base {
            weight: base_weight.clone(),
            job: base_job.clone(),
        };

        let make_merge = |d: u64| {
            let weight = if level == -1 {
                (Weight::zero(), Weight::zero())
            } else {
                // Leaf capacity of each child subtree of a node at depth `d`.
                let x = 2u64.pow(level as u32) / 2u64.pow(d as u32 + 1);
                (Weight { base: x, merge: 0 }, Weight { base: x, merge: 0 })
            };
            merge::Merge {
                weight,
                job: merge_job.clone(),
            }
        };

        // A complete binary tree of `depth + 1` levels.
        let nnodes = 2u64.pow((depth + 1) as u32) - 1;

        let values: Vec<_> = (0..nnodes)
            .map(|index| {
                let node_depth = btree::depth_at(index as usize);

                if node_depth == depth {
                    Value::Leaf(make_base())
                } else {
                    Value::Node(make_merge(node_depth))
                }
            })
            .collect();

        Self { values }
    }

    /// An empty tree of `depth` levels with full capacity weights.
    fn create_tree(depth: u64) -> Self {
        let level: i64 = depth.try_into().unwrap();
        Self::create_tree_for_level(level, depth, merge::Job::Empty, base::Job::Empty)
    }
}
1473
1474impl<BaseJob, MergeJob> ParallelScan<BaseJob, MergeJob>
1475where
1476 BaseJob: Debug + Clone + 'static,
1477 MergeJob: Debug + Clone + 'static,
1478{
1479 fn with_leaner_trees(&self) -> Self {
1480 use JobStatus::Done;
1481
1482 let trees = self
1483 .trees
1484 .iter()
1485 .map(|tree| {
1486 tree.map(
1487 |merge_node| match &merge_node.job {
1488 merge::Job::Full(merge::Record { state: Done, .. }) => merge::Merge {
1489 weight: merge_node.weight.clone(),
1490 job: merge::Job::Empty,
1491 },
1492 _ => merge_node.clone(),
1493 },
1494 |b| b.clone(),
1495 )
1496 })
1497 .collect();
1498
1499 Self {
1500 trees,
1501 acc: self.acc.clone(),
1502 curr_job_seq_no: self.curr_job_seq_no.clone(),
1503 max_base_jobs: self.max_base_jobs,
1504 delay: self.delay,
1505 }
1506 }
1507
1508 pub fn empty(max_base_jobs: u64, delay: u64) -> Self {
1509 let depth = ceil_log2(max_base_jobs);
1510 let first_tree = Tree::create_tree(depth);
1513
1514 let mut trees = Vec::with_capacity(32);
1515 trees.push(first_tree);
1516
1517 Self {
1518 trees,
1519 acc: None,
1520 curr_job_seq_no: SequenceNumber(0),
1521 max_base_jobs,
1522 delay,
1523 }
1524 }
1525
1526 fn map<F1, F2>(&self, f1: F1, f2: F2) -> Self
1527 where
1528 F1: Fn(&MergeJob) -> MergeJob,
1529 F2: Fn(&BaseJob) -> BaseJob,
1530 {
1531 let trees = self
1532 .trees
1533 .iter()
1534 .map(|tree| tree.map_depth(&|_, m| m.map(&f1), &|a| a.map(&f2)))
1535 .collect();
1536
1537 let acc = self
1538 .acc
1539 .as_ref()
1540 .map(|(m, bs)| (f1(m), bs.iter().map(&f2).collect()));
1541
1542 Self {
1543 trees,
1544 acc,
1545 curr_job_seq_no: self.curr_job_seq_no.clone(),
1546 max_base_jobs: self.max_base_jobs,
1547 delay: self.delay,
1548 }
1549 }
1550
    /// Computes a SHA-256 digest of the entire scan state.
    ///
    /// `fun_merge` / `fun_base` serialize a merge/base job into the byte
    /// buffer. Completed merge payloads are stripped first
    /// (`with_leaner_trees`), so only their weights and status contribute.
    /// The byte layout (weights, then a status tag, then the payload) must
    /// stay stable: any change alters the resulting hash.
    pub fn hash<FunMerge, FunBase>(
        &self,
        fun_merge: FunMerge,
        fun_base: FunBase,
    ) -> GenericArray<u8, U32>
    where
        FunMerge: Fn(&mut Vec<u8>, &MergeJob),
        FunBase: Fn(&mut Vec<u8>, &BaseJob),
    {
        const BUFFER_CAPACITY: usize = 128 * 1024;

        let Self {
            trees,
            acc,
            curr_job_seq_no,
            max_base_jobs,
            delay,
        } = self.with_leaner_trees();

        let mut sha: Sha256 = Sha256::new();
        let mut buffer = Vec::with_capacity(BUFFER_CAPACITY);
        let buffer = &mut buffer;

        // Weights are serialized as bare decimal digits, with no separator.
        let add_weight =
            |buffer: &mut Vec<u8>, w: &Weight| write!(buffer, "{}{}", w.base, w.merge).unwrap();

        let add_two_weights = |buffer: &mut Vec<u8>, (w1, w2): &(Weight, Weight)| {
            add_weight(buffer, w1);
            add_weight(buffer, w2);
        };

        // Serialize one job at a time into the reused buffer and feed it to
        // the hasher.
        for job in trees.iter().flat_map(Tree::to_hashable_jobs) {
            buffer.clear();

            match &job {
                Job::Base(base) => match &base.job {
                    base::Job::Empty => {
                        add_weight(buffer, &base.weight);
                        write!(buffer, "Empty").unwrap();
                    }
                    base::Job::Full(base::Record { job, seq_no, state }) => {
                        add_weight(buffer, &base.weight);
                        write!(buffer, "Full{}{}", seq_no.0, state.as_str()).unwrap();

                        fun_base(buffer, job);
                    }
                },
                Job::Merge(merge) => match &merge.job {
                    merge::Job::Empty => {
                        add_two_weights(buffer, &merge.weight);
                        write!(buffer, "Empty").unwrap();
                    }
                    merge::Job::Part(job) => {
                        add_two_weights(buffer, &merge.weight);
                        write!(buffer, "Part").unwrap();

                        fun_merge(buffer, job);
                    }
                    merge::Job::Full(merge::Record {
                        left,
                        right,
                        seq_no,
                        state,
                    }) => {
                        add_two_weights(buffer, &merge.weight);
                        write!(buffer, "Full{}{}", seq_no.0, state.as_str()).unwrap();

                        fun_merge(buffer, left);
                        fun_merge(buffer, right);
                    }
                },
            }

            sha.update(buffer.as_slice());

            // The buffer must never have grown (reallocated): a single
            // serialized job is expected to fit in BUFFER_CAPACITY.
            assert_eq!(buffer.capacity(), BUFFER_CAPACITY);
        }

        // Hash the accumulator (last emitted value), or the literal "None".
        match &acc {
            Some((a, d_lst)) => {
                buffer.clear();

                fun_merge(buffer, a);
                for j in d_lst {
                    fun_base(buffer, j);
                }

                sha.update(&buffer);
            }
            None => {
                sha.update("None");
            }
        };

        // Finally mix in the scalar parameters of the scan state.
        buffer.clear();
        write!(buffer, "{}{}{}", curr_job_seq_no.0, max_base_jobs, delay).unwrap();
        sha.update(&buffer);

        sha.finalize()
    }
1653
1654 pub fn fold_chronological_until<Accum, Final, FunMerge, FunBase, FunFinish>(
1655 &self,
1656 init: Accum,
1657 fun_merge: FunMerge,
1658 fun_base: FunBase,
1659 fun_finish: FunFinish,
1660 ) -> Final
1661 where
1662 FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> ControlFlow<Final, Accum>,
1663 FunBase: Fn(Accum, &base::Base<BaseJob>) -> ControlFlow<Final, Accum>,
1664 FunFinish: Fn(Accum) -> Final,
1665 {
1666 let mut accum = init;
1667
1668 for tree in self.trees.iter().rev() {
1669 match tree.fold_depth_until_prime(&|_, acc, m| fun_merge(acc, m), &fun_base, accum) {
1670 Continue(acc) => accum = acc,
1671 Break(v) => return v,
1672 }
1673 }
1674
1675 fun_finish(accum)
1676 }
1677
1678 pub fn fold_chronological_until_err<Accum, FunMerge, FunBase, FunFinish>(
1679 &self,
1680 init: Accum,
1681 fun_merge: FunMerge,
1682 fun_base: FunBase,
1683 fun_finish: FunFinish,
1684 ) -> Result<Accum, String>
1685 where
1686 FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> Result<Accum, String>,
1687 FunBase: Fn(Accum, &base::Base<BaseJob>) -> Result<Accum, String>,
1688 FunFinish: Fn(Accum) -> Accum,
1689 {
1690 let mut accum = init;
1691
1692 for tree in self.trees.iter().rev() {
1693 match tree.fold_depth_until_prime_err(&|_, acc, m| fun_merge(acc, m), &fun_base, accum)
1694 {
1695 Ok(acc) => accum = acc,
1696 Err(e) => return Err(e),
1697 }
1698 }
1699
1700 Ok(fun_finish(accum))
1701 }
1702
1703 fn fold_chronological<Accum, FunMerge, FunBase>(
1704 &self,
1705 init: Accum,
1706 fun_merge: FunMerge,
1707 fun_base: FunBase,
1708 ) -> Accum
1709 where
1710 FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> Accum,
1711 FunBase: Fn(Accum, &base::Base<BaseJob>) -> Accum,
1712 {
1713 self.fold_chronological_until(
1714 init,
1715 |acc, a| Continue(fun_merge(acc, a)),
1716 |acc, d| Continue(fun_base(acc, d)),
1717 |v| v,
1718 )
1719 }
1720
1721 fn max_trees(&self) -> u64 {
1722 ((ceil_log2(self.max_base_jobs) + 1) * (self.delay + 1)) + 1
1723 }
1724
1725 fn work_for_tree(&self, data_tree: WorkForTree) -> Vec<AvailableJob<BaseJob, MergeJob>> {
1726 let delay = self.delay + 1;
1727
1728 let trees = match data_tree {
1730 WorkForTree::Current => &self.trees[1..],
1731 WorkForTree::Next => &self.trees,
1732 };
1733
1734 work(trees, delay, self.max_base_jobs)
1737 }
1738
    /// Returns all job sets that could ever become available: the jobs ready
    /// now, plus the job sets that would become ready as up to `delay + 1`
    /// new (empty) trees are prepended.
    fn all_work(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        let depth = ceil_log2(self.max_base_jobs);
        let set1 = self.work_for_tree(WorkForTree::Current);
        // Work on a scratch copy so the real state is untouched.
        let mut this = self.clone();
        this.trees.reserve(self.delay as usize + 1);

        let mut other_set = Vec::with_capacity(256);
        // NOTE(review): `set1` is pushed even when empty, unlike the sets
        // below which are filtered — confirm callers rely on this.
        other_set.push(set1);

        for _ in 0..self.delay + 1 {
            // Simulate adding a fresh tree in front, which shifts which
            // trees are eligible for work.
            this.trees.insert(0, Tree::create_tree(depth));
            let work = this.work_for_tree(WorkForTree::Current);

            if !work.is_empty() {
                other_set.push(work);
            }
        }

        other_set
    }
1776
    /// Jobs that must be completed before `data_count` new base jobs can be
    /// added.
    ///
    /// If the incoming data fits in the current tree, only work from the
    /// older trees (capped at `2 * count`) is needed. Otherwise a second set
    /// is returned for the overflow that will land on a new tree.
    fn work_for_next_update(&self, data_count: u64) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        let delay = self.delay + 1;
        let current_tree_space = self.trees[0].available_space() as usize;

        let mut set1 = work(&self.trees[1..], delay, self.max_base_jobs);
        let count = data_count.min(self.max_base_jobs) as usize;

        if current_tree_space < count {
            // Overflow: the extra `count - current_tree_space` base jobs go
            // to a new tree, requiring 2 completed jobs per base job.
            let mut set2 = work(&self.trees, delay, self.max_base_jobs);
            set2.truncate((count - current_tree_space) * 2);

            [set1, set2].into_iter().filter(|v| !v.is_empty()).collect()
        } else {
            set1.truncate(2 * count);

            if set1.is_empty() {
                vec![]
            } else {
                vec![set1]
            }
        }
    }
1825
1826 fn free_space_on_current_tree(&self) -> u64 {
1827 self.trees[0].available_space()
1828 }
1829
    /// Distributes `completed_jobs` (results of merge work) across the older
    /// trees — every `delay + 1`-th tree receives as many as it requires —
    /// and returns the emitted proof plus its base jobs if the oldest tree
    /// completed.
    ///
    /// # Errors
    /// Propagates `Err(())` from `Tree::update` if a tree update fails.
    fn add_merge_jobs(
        &mut self,
        completed_jobs: &[MergeJob],
    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
        // Saturating slice helpers (no panic when `n` exceeds the length).
        fn take<T>(slice: &[T], n: usize) -> &[T] {
            slice.get(..n).unwrap_or(slice)
        }

        fn drop<T>(slice: &[T], n: usize) -> &[T] {
            slice.get(n..).unwrap_or(&[])
        }

        if completed_jobs.is_empty() {
            return Ok(None);
        }

        let delay = self.delay + 1;
        let udelay = delay as usize;
        let depth = ceil_log2(self.max_base_jobs);

        let completed_jobs_len = completed_jobs.len();
        let merge_jobs: Vec<_> = completed_jobs.iter().cloned().map(Job::Merge).collect();
        let jobs_required = self.work_for_tree(WorkForTree::Current);

        assert!(
            merge_jobs.len() <= jobs_required.len(),
            "More work than required"
        );

        // The current (newest) tree never receives merge jobs; it is
        // re-attached unchanged at the end.
        let curr_tree = &self.trees[0];
        let to_be_updated_trees = &self.trees[1..];

        // index -> (level, number of jobs applied); used only for logging.
        let mut stats = BTreeMap::<u64, (u64, usize)>::new();

        let (mut updated_trees, result_opt) = {
            let mut jobs = merge_jobs.as_slice();

            let mut updated_trees = Vec::with_capacity(to_be_updated_trees.len());
            let mut scan_result = None;

            for (i, tree) in to_be_updated_trees.iter().enumerate() {
                // Only every `udelay`-th tree is eligible for merge work.
                if (i % udelay == udelay - 1) && !jobs.is_empty() {
                    let nrequired = tree.required_job_count() as usize;
                    let completed_jobs = take(jobs, nrequired);
                    let i = i as u64;

                    let level = depth - (i / delay);
                    let old = stats.insert(i, (level, completed_jobs.len()));
                    assert!(old.is_none());

                    let (tree, result) = tree.update(
                        completed_jobs,
                        depth - (i / delay),
                        self.curr_job_seq_no.clone(),
                        WeightLens::Merge,
                    )?;

                    updated_trees.push(tree);
                    scan_result = result;
                    jobs = drop(jobs, nrequired);
                } else {
                    updated_trees.push(tree.clone());
                }
            }

            (updated_trees, scan_result)
        };

        // Logging only: report, oldest-first, what each tree received.
        for (index, (level, njobs)) in stats.iter().rev() {
            let index = self.trees.len() - *index as usize - 2;

            if result_opt.is_some() && index == 0 {
                println!(
                    "- tree[{:>02}] level={} {:>3} completed jobs, a proof is emitted, tree is removed",
                    index,
                    level,
                    njobs
                );
            } else {
                println!(
                    "- tree[{:>02}] level={} {:>3} completed jobs (DONE)",
                    index, level, njobs
                );
                println!(
                    " level={} {:>3} new merge jobs (TODO)",
                    level - 1,
                    njobs / 2,
                );
            }
        }

        let (mut updated_trees, result_opt) = {
            // If the oldest tree emitted its final proof, remove it and
            // return its base jobs alongside the result.
            let (updated_trees, result_opt) = match result_opt {
                Some(scan_result) if !updated_trees.is_empty() => {
                    let last = updated_trees.pop().unwrap();
                    let tree_data = last.base_jobs();
                    (updated_trees, Some((scan_result, tree_data)))
                }
                _ => (updated_trees, None),
            };

            // NOTE(review): `||` binds looser than `&&`, so this reads as
            // `emitted || (room-for-another-tree && all-required-done)` —
            // confirm that grouping is intended.
            if result_opt.is_some()
                || (updated_trees.len() + 1) < self.max_trees() as usize
                    && (completed_jobs_len == jobs_required.len())
            {
                let updated_trees = updated_trees
                    .into_iter()
                    .map(|tree| tree.reset_weights(ResetKind::Merge))
                    .collect();
                (updated_trees, result_opt)
            } else {
                (updated_trees, result_opt)
            }
        };

        updated_trees.insert(0, curr_tree.clone());

        self.trees = updated_trees;

        Ok(result_opt)
    }
1954
    /// Adds new base jobs to the current (newest) tree, prepending a fresh
    /// tree when this one becomes full. `base_kind` classifies each base job
    /// (0=CMD, 1=FT, 2=CB per the log labels below) — used for logging only.
    ///
    /// Panics if `data` does not fit in the current tree's available space.
    fn add_data(
        &mut self,
        data: Vec<BaseJob>,
        base_kind: impl Fn(&BaseJob) -> usize,
    ) -> Result<(), ()> {
        if data.is_empty() {
            return Ok(());
        }

        let data_len = data.len();
        let depth = ceil_log2(self.max_base_jobs);
        let tree = &self.trees[0];

        let base_jobs: Vec<_> = data.into_iter().map(Job::Base).collect();
        let available_space = tree.available_space() as usize;

        assert!(
            data_len <= available_space,
            "Data count ({}) exceeded available space ({})",
            data_len,
            available_space
        );

        let (tree, _) = tree
            .update(
                &base_jobs,
                depth,
                self.curr_job_seq_no.clone(),
                WeightLens::Base,
            )
            .expect("Error while adding a base job to the tree");

        let bases = tree.base_jobs();
        let nbase = bases.len();

        // Exactly filled: open a fresh tree in front of this one.
        let updated_trees = if data_len == available_space {
            let new_tree = Tree::create_tree(depth);
            let tree = tree.reset_weights(ResetKind::Both);
            vec![new_tree, tree]
        } else {
            let tree = tree.reset_weights(ResetKind::Merge);
            vec![tree]
        };

        println!(
            "- tree[{:>02}] level=7 {:>3} new base jobs (TODO), total_nbase_jobs={:?}",
            self.trees.len() - 1,
            data_len,
            nbase,
        );

        let max_base_jobs = self.max_base_jobs as usize;

        // Run-length encode the leaf kinds for the log line below.
        let mut folded = bases.iter().fold(
            Vec::<(usize, usize)>::with_capacity(max_base_jobs),
            |mut accum, b| {
                let kind = base_kind(b);
                match accum.last_mut() {
                    Some(last) if last.0 == kind => last.1 += 1,
                    _ => accum.push((kind, 1)),
                }
                accum
            },
        );

        let total_folded: usize = folded.iter().map(|(_, n)| *n).sum();

        // Kind 10 stands for the remaining EMPTY slots.
        if total_folded != max_base_jobs {
            folded.push((10, max_base_jobs - total_folded));
        }

        let to_s = |n: usize, s: &str| {
            if n == 1 {
                s.to_string()
            } else {
                format!("{n} {s}")
            }
        };

        let s = folded
            .iter()
            .map(|(kind, n)| match kind {
                0 => to_s(*n, "CMD"),
                1 => to_s(*n, "FT"),
                2 => to_s(*n, "CB"),
                10 => to_s(*n, "EMPTY"),
                _ => panic!(),
            })
            .join("|");

        println!(" level=7 has the following jobs (in this order):");
        println!(" {s}");

        // Replace trees[0] with `updated_trees` (one or two trees), keeping
        // the rest of the list in order.
        let mut old = std::mem::replace(&mut self.trees, updated_trees);
        old.remove(0);
        self.trees.append(&mut old);

        Ok(())
    }
2077
    /// Renumbers every job's sequence number relative to the oldest one
    /// still present, so numbering restarts near 1. Used to avoid `u64`
    /// overflow of the running counter; `curr_job_seq_no` becomes the new
    /// maximum.
    fn reset_seq_no(&mut self) {
        // The oldest surviving sequence number lives on the first leaf of
        // the last (oldest) tree.
        let last = self.trees.last().unwrap();
        let oldest_seq_no = match last.leaves().first() {
            Some(base::Base {
                job: base::Job::Full(base::Record { seq_no, .. }),
                ..
            }) => seq_no.clone(),
            _ => SequenceNumber::zero(),
        };

        // New number = (old - oldest) + 1, so the oldest job becomes 1.
        let new_seq = |seq: &SequenceNumber| (seq - &oldest_seq_no).incr();

        let fun_merge = |m: &merge::Merge<MergeJob>| match &m.job {
            merge::Job::Full(merge::Record { seq_no, .. }) => m.with_seq_no(new_seq(seq_no)),
            _ => m.clone(),
        };

        let fun_base = |m: &base::Base<BaseJob>| match &m.job {
            base::Job::Full(base::Record { seq_no, .. }) => m.with_seq_no(new_seq(seq_no)),
            _ => m.clone(),
        };

        let mut max_seq = SequenceNumber::zero();
        let mut updated_trees = Vec::with_capacity(self.trees.len());

        for tree in &self.trees {
            use base::{Base, Job::Full, Record};

            let tree = tree.map(fun_merge, fun_base);
            updated_trees.push(tree.clone());

            let leaves = tree.leaves();

            // Track the largest renumbered value (taken from each tree's
            // last occupied leaf) to restart the counter from it.
            let last = match leaves.last() {
                Some(last) => last,
                None => continue,
            };

            if let Base {
                job: Full(Record { seq_no, .. }),
                ..
            } = last
            {
                max_seq = max_seq.max(seq_no.clone());
            };
        }

        self.curr_job_seq_no = max_seq;
        self.trees = updated_trees;
    }
2128
2129 fn incr_sequence_no(&mut self) {
2130 let next_seq_no = self.curr_job_seq_no.incr();
2131
2132 if next_seq_no.is_u64_max() {
2133 self.reset_seq_no();
2134 } else {
2135 self.curr_job_seq_no = next_seq_no;
2136 }
2137 }
2138
    /// Core of [`Self::update`]: validates the request, then splits both the
    /// new data and the completed jobs between the current tree and (when
    /// the data overflows) a freshly started tree, applying each half via
    /// `add_merge_jobs` / `add_data`. Returns the emitted proof, if any.
    ///
    /// # Errors
    /// Propagates `Err(())` from the underlying tree updates.
    fn update_helper(
        &mut self,
        data: Vec<BaseJob>,
        completed_jobs: Vec<MergeJob>,
        base_kind: impl Fn(&BaseJob) -> usize,
    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
        // Saturating split (no panic when `n` exceeds the length).
        fn split<T>(slice: &[T], n: usize) -> (&[T], &[T]) {
            (
                slice.get(..n).unwrap_or(slice),
                slice.get(n..).unwrap_or(&[]),
            )
        }

        let data_count = data.len() as u64;

        assert!(
            data_count <= self.max_base_jobs,
            "Data count ({}) exceeded maximum ({})",
            data_count,
            self.max_base_jobs
        );

        let required_jobs_count = self
            .work_for_next_update(data_count)
            .into_iter()
            .flatten()
            .count();

        {
            // Enough completed work must accompany this much data.
            let required = (required_jobs_count + 1) / 2;
            let got = (completed_jobs.len() + 1) / 2;

            let max_base_jobs = self.max_base_jobs as usize;
            assert!(
                !(got < required && data.len() > max_base_jobs - required + got),
                "Insufficient jobs (Data count {}): Required- {} got- {}",
                data_count,
                required,
                got
            )
        }

        let delay = self.delay + 1;

        self.incr_sequence_no();

        let latest_tree = &self.trees[0];
        let available_space = latest_tree.available_space();

        // data1 fills the current tree; data2 overflows to the next one.
        let (data1, data2) = split(&data, available_space as usize);

        // jobs1 serve the trees as they are; jobs2 serve them after the new
        // tree is started.
        let required_jobs_for_current_tree =
            work(&self.trees[1..], delay, self.max_base_jobs).len();
        let (jobs1, jobs2) = split(&completed_jobs, required_jobs_for_current_tree);

        println!("scan_state update:");

        let result_opt = self.add_merge_jobs(jobs1)?;
        self.add_data(data1.to_vec(), &base_kind)?;

        if !jobs2.is_empty() || !data2.is_empty() {
            println!("scan_state update: (2nd set of jobs, new transactions didn't fit in latest/current tree)");
        }

        self.add_merge_jobs(jobs2)?;
        self.add_data(data2.to_vec(), base_kind)?;

        // Remember the last emitted value.
        if result_opt.is_some() {
            self.acc.clone_from(&result_opt);
        };

        assert!(
            self.trees.len() <= self.max_trees() as usize,
            "Tree list length ({}) exceeded maximum ({})",
            self.trees.len(),
            self.max_trees()
        );

        Ok(result_opt)
    }
2247
    /// Adds `data` (new base jobs) together with `completed_jobs` (results
    /// for previously requested work) to the scan state, returning the
    /// emitted (proof, base jobs) pair when a tree completes. See
    /// `update_helper` for the details.
    pub fn update(
        &mut self,
        data: Vec<BaseJob>,
        completed_jobs: Vec<MergeJob>,
        base_kind: impl Fn(&BaseJob) -> usize,
    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
        self.update_helper(data, completed_jobs, base_kind)
    }
2256
    /// All job sets that could ever become available (see `all_work`).
    pub fn all_jobs(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        self.all_work()
    }
2260
    /// Jobs required before a maximal (`max_base_jobs`) batch of new data
    /// can be added.
    pub fn jobs_for_next_update(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        self.work_for_next_update(self.max_base_jobs)
    }
2264
    /// Jobs required before `slots` new base jobs can be added.
    pub fn jobs_for_slots(&self, slots: u64) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        self.work_for_next_update(slots)
    }
2268
    /// Per-update capacity: the maximum number of base jobs acceptable in a
    /// single `update` call. NOTE(review): despite the name this returns the
    /// fixed `max_base_jobs`, not the remaining space of the current tree
    /// (see `free_space_on_current_tree` for that) — confirm callers expect
    /// this.
    pub fn free_space(&self) -> u64 {
        self.max_base_jobs
    }
2272
    /// The most recently emitted (proof, base jobs) pair, if any.
    pub fn last_emitted_value(&self) -> Option<&(MergeJob, Vec<BaseJob>)> {
        self.acc.as_ref()
    }
2276
    /// The sequence number that jobs added by the current update carry.
    fn current_job_sequence_number(&self) -> SequenceNumber {
        self.curr_job_seq_no.clone()
    }
2280
2281 pub fn base_jobs_on_latest_tree(&self) -> impl Iterator<Item = BaseJob> {
2282 let depth = ceil_log2(self.max_base_jobs);
2283 let level = depth;
2284
2285 self.trees[0]
2286 .jobs_on_level(depth, level)
2287 .into_iter()
2288 .filter_map(|job| match job {
2289 AvailableJob::Base(base) => Some(base),
2290 AvailableJob::Merge { .. } => None,
2291 })
2292 }
2293
2294 pub fn base_jobs_on_earlier_tree(&self, index: usize) -> impl Iterator<Item = BaseJob> {
2296 let depth = ceil_log2(self.max_base_jobs);
2297 let level = depth;
2298
2299 let earlier_trees = &self.trees[1..];
2300
2301 let base_job = |job| match job {
2302 AvailableJob::Base(base) => Some(base),
2303 AvailableJob::Merge { .. } => None,
2304 };
2305
2306 match earlier_trees.get(index) {
2307 None => {
2308 Vec::new().into_iter().filter_map(base_job)
2311 }
2312 Some(tree) => tree
2313 .jobs_on_level(depth, level)
2314 .into_iter()
2315 .filter_map(base_job),
2316 }
2317 }
2318
2319 pub fn partition_if_overflowing(&self) -> SpacePartition {
2320 let cur_tree_space = self.free_space_on_current_tree();
2321
2322 let work_count = self.work_for_tree(WorkForTree::Current).len() as u64;
2325 let work_count_new_tree = self.work_for_tree(WorkForTree::Next).len() as u64;
2326
2327 SpacePartition {
2328 first: (cur_tree_space, work_count),
2329 second: {
2330 if cur_tree_space < self.max_base_jobs {
2331 let slots = self.max_base_jobs - cur_tree_space;
2332 Some((slots, work_count_new_tree.min(2 * slots)))
2333 } else {
2334 None
2335 }
2336 },
2337 }
2338 }
2339
2340 pub fn next_on_new_tree(&self) -> bool {
2341 let curr_tree_space = self.free_space_on_current_tree();
2342 curr_tree_space == self.max_base_jobs
2343 }
2344
    /// Base jobs of every tree, oldest tree first.
    pub fn pending_data(&self) -> Vec<Vec<BaseJob>> {
        self.trees.iter().rev().map(Tree::base_jobs).collect()
    }
2348
    /// Counts `(todo, done)` jobs across the whole state. Counts are `f64`
    /// because a partially-filled merge (`Part`) counts as half a todo job.
    fn job_count(&self) -> (f64, f64) {
        use JobStatus::{Done, Todo};

        self.fold_chronological(
            (0.0, 0.0),
            |(ntodo, ndone), merge| {
                use merge::{
                    Job::{Empty, Full, Part},
                    Record,
                };

                let (todo, done) = match &merge.job {
                    // Only one of the two inputs has arrived: half a job.
                    Part(_) => (0.5, 0.0),
                    Full(Record { state: Todo, .. }) => (1.0, 0.0),
                    Full(Record { state: Done, .. }) => (0.0, 1.0),
                    Empty => (0.0, 0.0),
                };
                (ntodo + todo, ndone + done)
            },
            |(ntodo, ndone), base| {
                use base::{
                    Job::{Empty, Full},
                    Record,
                };

                let (todo, done) = match &base.job {
                    Empty => (0.0, 0.0),
                    Full(Record { state: Todo, .. }) => (1.0, 0.0),
                    Full(Record { state: Done, .. }) => (0.0, 1.0),
                };

                (ntodo + todo, ndone + done)
            },
        )
    }
2385}
2386
/// Collects the available jobs from the given trees, querying the i-th tree
/// at level `depth - i` (earlier trees are further along, so their pending
/// work sits higher in the tree).
///
/// NOTE(review): `depth - i as u64` underflows if more than `depth + 1`
/// trees are supplied; the caller (`work`) caps the iterator with
/// `.take(depth + 1)` — confirm before reusing elsewhere.
fn work_to_do<'a, BaseJob, MergeJob, I>(
    trees: I,
    max_base_jobs: u64,
) -> Vec<AvailableJob<BaseJob, MergeJob>>
where
    I: Iterator<Item = &'a Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
    BaseJob: Debug + Clone + 'static,
    MergeJob: Debug + Clone + 'static,
{
    let depth = ceil_log2(max_base_jobs);

    trees
        .enumerate()
        .flat_map(|(i, tree)| {
            let level = depth - i as u64;
            tree.jobs_on_level(depth, level)
        })
        .collect()
}
2411
2412fn work<'a, BaseJob, MergeJob, I>(
2413 trees: I,
2414 delay: u64,
2415 max_base_jobs: u64,
2416) -> Vec<AvailableJob<BaseJob, MergeJob>>
2417where
2418 I: IntoIterator<Item = &'a Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
2419 BaseJob: Debug + Clone + 'static,
2420 MergeJob: Debug + Clone + 'static,
2421{
2422 let depth = ceil_log2(max_base_jobs) as usize;
2423 let delay = delay as usize;
2424
2425 let work_trees = trees
2428 .into_iter()
2429 .enumerate()
2430 .filter_map(|(i, t)| {
2431 if i % delay == delay - 1 {
2432 Some(t)
2433 } else {
2434 None
2435 }
2436 })
2437 .take(depth + 1);
2438
2439 work_to_do(work_trees, max_base_jobs)
2440}
2441
/// First `n` elements of `slice`, or the whole slice when it is shorter.
fn take<T>(slice: &[T], n: usize) -> &[T] {
    let len = n.min(slice.len());
    &slice[..len]
}
2445
/// `n` elements of `slice` starting at `skip`: empty when `skip` is out of
/// bounds, shorter when fewer than `n` elements remain.
fn take_at<T>(slice: &[T], skip: usize, n: usize) -> &[T] {
    match slice.get(skip..) {
        Some(rest) => rest.get(..n).unwrap_or(rest),
        None => &[],
    }
}
2449
/// Ceiling of the base-2 logarithm of `n` (`ceil(log2(n))`).
///
/// # Panics
/// Panics when `n == 0`.
pub const fn ceil_log2(n: u64) -> u64 {
    assert!(n > 0);
    match n {
        1 => 0,
        // Position of the highest set bit of `n - 1`, plus one.
        _ => (u64::BITS - (n - 1).leading_zeros()) as u64,
    }
}
2463
/// Flattens a vec of vecs into a single vec, preserving order.
fn flatten<T>(v: Vec<Vec<T>>) -> Vec<T> {
    let total: usize = v.iter().map(Vec::len).sum();
    let mut flat = Vec::with_capacity(total);
    for inner in v {
        flat.extend(inner);
    }
    flat
}
2467
/// Test helper: checks the todo/done job accounting after an update.
///
/// Compares `s1` (before) against `s2` (after an update that supplied
/// `completed_job_count` finished jobs and `base_job_count` new base jobs,
/// emitting a value iff `value_emitted`). Counts are `f64` because a `Part`
/// merge counts as half a job (see `job_count`).
fn assert_job_count<B, M>(
    s1: &ParallelScan<B, M>,
    s2: &ParallelScan<B, M>,
    completed_job_count: f64,
    base_job_count: f64,
    value_emitted: bool,
) where
    B: Debug + Clone + 'static,
    M: Debug + Clone + 'static,
{
    let (todo_before, done_before) = s1.job_count();
    let (todo_after, done_after) = s2.job_count();

    let all_jobs = flatten(s2.all_jobs());

    // Every `Todo` record in the trees must show up as an available job.
    let all_jobs_expected_count = s2
        .trees
        .iter()
        .fold(Vec::with_capacity(s2.trees.len()), |mut acc, tree| {
            let mut records = tree.jobs_records();
            acc.append(&mut records);
            acc
        })
        .into_iter()
        .filter(|job| {
            matches!(
                job,
                Job::Base(base::Record {
                    state: JobStatus::Todo,
                    ..
                }) | Job::Merge(merge::Record {
                    state: JobStatus::Todo,
                    ..
                })
            )
        })
        .count();

    assert_eq!(all_jobs.len(), all_jobs_expected_count);

    // Each pair of completed jobs spawns one new merge todo; an emitted
    // value consumes one completed job without spawning.
    let expected_todo_after = {
        let new_jobs = if value_emitted {
            (completed_job_count - 1.0) / 2.0
        } else {
            completed_job_count / 2.0
        };
        todo_before + base_job_count - completed_job_count + new_jobs
    };

    // An emitted value removes a whole finished tree (2*max_base_jobs - 1
    // done jobs) from the state.
    let expected_done_after = {
        let jobs_from_delete_tree = if value_emitted {
            ((2 * s1.max_base_jobs) - 1) as f64
        } else {
            0.0
        };
        done_before + completed_job_count - jobs_from_delete_tree
    };

    assert_eq!(todo_after, expected_todo_after);
    assert_eq!(done_after, expected_done_after);
}
2553
/// Test helper: clones `s1`, applies `update` with the given data and
/// completed jobs, verifies job accounting with `assert_job_count`, and
/// returns the emitted value together with the updated state.
fn test_update<B, M>(
    s1: &ParallelScan<B, M>,
    data: Vec<B>,
    completed_jobs: Vec<M>,
) -> (Option<(M, Vec<B>)>, ParallelScan<B, M>)
where
    B: Debug + Clone + 'static,
    M: Debug + Clone + 'static,
{
    let mut s2 = s1.clone();
    let result_opt = s2
        .update(data.clone(), completed_jobs.clone(), |_| 0)
        .unwrap();

    assert_job_count(
        s1,
        &s2,
        completed_jobs.len() as f64,
        data.len() as f64,
        result_opt.is_some(),
    );
    (result_opt, s2)
}
2577
/// Renders a `usize` as its decimal string.
fn int_to_string(u: &usize) -> String {
    ToString::to_string(u)
}
2581
/// Appends the decimal representation of `i` to `buffer`.
fn sint_to_string(buffer: &mut Vec<u8>, i: &i64) {
    buffer.extend_from_slice(i.to_string().as_bytes());
}
2589
/// Hex-encoded SHA-256 of an `i64`-parameterized scan state; both merge and
/// base jobs are serialized as their decimal representation.
fn hash(state: &ParallelScan<i64, i64>) -> String {
    hex::encode(state.hash(sint_to_string, sint_to_string))
}
2593
2594#[cfg(test)]
2595mod tests {
2596 use std::{
2597 array,
2598 sync::{
2599 atomic::{AtomicBool, Ordering::Relaxed},
2600 mpsc::{sync_channel, Receiver, SyncSender},
2601 },
2602 };
2603
2604 use openmina_core::thread;
2605 use rand::Rng;
2606
2607 use super::*;
2608
2609 #[test]
2610 fn test_ceil_log2() {
2611 for a in 1..50u64 {
2612 let v = (a as f32).log2().ceil() as u64;
2613 let w = ceil_log2(a);
2614 assert_eq!(v, w);
2616 }
2617 }
2618
2619 #[test]
2621 fn test_sha256() {
2622 let array: [u8; 2 * 1024] = array::from_fn(|i| (i % 256) as u8);
2623 let mut slice = &array[..];
2624
2625 let mut sha256 = sha2::Sha256::new();
2626 for byte in slice.iter().copied() {
2627 sha256.update(&[byte][..]);
2628 }
2629 let first = sha256.finalize();
2630
2631 let mut sha256 = sha2::Sha256::new();
2632 let mut n = 1;
2633 while !slice.is_empty() {
2634 sha256.update(slice.get(..n).unwrap_or(slice));
2635 slice = slice.get(n..).unwrap_or(&[]);
2636
2637 n += 2;
2638 }
2639 let second = sha256.finalize();
2640
2641 assert_eq!(first, second);
2642 }
2643
2644 #[test]
2645 fn test_range_at_depth() {
2646 let ranges: Vec<_> = (0..10u64).map(btree::range_at_depth).collect();
2647
2648 assert_eq!(
2649 ranges,
2650 [
2651 0..1,
2652 1..3,
2653 3..7,
2654 7..15,
2655 15..31,
2656 31..63,
2657 63..127,
2658 127..255,
2659 255..511,
2660 511..1023,
2661 ]
2662 );
2663 }
2664
2665 #[test]
2667 fn always_max_base_jobs() {
2668 const MAX_BASE_JOS: u64 = 512;
2669
2670 let mut state = ParallelScan::<usize, usize>::empty(MAX_BASE_JOS, 3);
2677 let mut expected_result: Vec<Vec<usize>> = vec![];
2678
2679 for i in 0..100 {
2683 println!("####### LOOP {:?} #########", i);
2684
2685 let data: Vec<_> = (0..MAX_BASE_JOS as usize).map(|j| i + j).collect();
2686
2687 expected_result.push(data.clone());
2688
2689 let work: Vec<_> = state
2690 .work_for_next_update(data.len() as u64)
2691 .into_iter()
2692 .flatten()
2693 .collect();
2694
2695 let new_merges: Vec<_> = work
2696 .iter()
2697 .map(|job| match job {
2698 AvailableJob::Base(i) => *i,
2699 AvailableJob::Merge { left, right } => *left + *right,
2700 })
2701 .collect();
2702
2703 println!("work={:?} new_merges={:?}", work.len(), new_merges.len());
2704 let (result_opt, s) = test_update(&state, data, new_merges);
2711
2712 let (expected_result_, remaining_expected_results) = {
2715 match result_opt {
2716 None => ((0, vec![]), expected_result.clone()),
2717 Some(ref r) => {
2718 println!("RESULT_OPT.0={} len={}", r.0, r.1.len());
2719 if expected_result.is_empty() {
2722 ((0, vec![]), vec![])
2723 } else {
2724 let first = expected_result[0].clone();
2725 let sum: usize = first.iter().sum();
2726
2727 ((sum, first), expected_result[1..].to_vec())
2728 }
2729 }
2730 }
2731 };
2732
2733 assert_eq!(
2734 result_opt.as_ref().unwrap_or(&expected_result_),
2735 &expected_result_
2736 );
2737
2738 expected_result = remaining_expected_results;
2739 state = s;
2740 }
2741 }
2742
2743 #[test]
2745 fn random_base_jobs() {
2746 const MAX_BASE_JOS: usize = 512;
2747
2748 let mut state = ParallelScan::<usize, usize>::empty(MAX_BASE_JOS as u64, 3);
2749 let mut rng = rand::thread_rng();
2750 let expected_result = (MAX_BASE_JOS, vec![1usize; MAX_BASE_JOS]);
2751
2752 for _ in 0..1_000 {
2753 let mut data = vec![1; rng.gen_range(0..=30)];
2754 data.truncate(MAX_BASE_JOS);
2755 let data_len = data.len();
2756
2757 println!("list_length={}", data_len);
2758
2759 let work: Vec<_> = state
2760 .work_for_next_update(data_len as u64)
2761 .into_iter()
2762 .flatten()
2763 .take(data_len * 2)
2764 .collect();
2765 let new_merges: Vec<_> = work
2766 .iter()
2767 .map(|job| match job {
2768 AvailableJob::Base(i) => *i,
2769 AvailableJob::Merge { left, right } => left + right,
2770 })
2771 .collect();
2772
2773 let (result_opt, s) = test_update(&state, data, new_merges);
2774
2775 assert_eq!(
2776 result_opt.as_ref().unwrap_or(&expected_result),
2777 &expected_result
2778 );
2779
2780 state = s;
2781 }
2782 }
2783
    /// Builds a scan state of random depth/delay and drives it with random
    /// data. `fun_job_done` computes the result for each requested job;
    /// `fun_acc` folds each emitted value into the accumulator.
    fn gen<FunDone, FunAcc>(fun_job_done: FunDone, fun_acc: FunAcc) -> ParallelScan<i64, i64>
    where
        FunDone: Fn(&AvailableJob<i64, i64>) -> i64,
        FunAcc: Fn(Option<(i64, Vec<i64>)>, (i64, Vec<i64>)) -> Option<(i64, Vec<i64>)>,
    {
        let mut rng = rand::thread_rng();

        let depth = rng.gen_range(2..5);
        let delay = rng.gen_range(0..=3);

        let max_base_jobs = 2u64.pow(depth);

        let mut s = ParallelScan::<i64, i64>::empty(max_base_jobs, delay);

        // Between 1 and 19 full batches of random base jobs.
        let ndatas = rng.gen_range(2..=20);
        let datas: Vec<Vec<i64>> = (1..ndatas)
            .map(|_| {
                std::iter::repeat_with(|| rng.gen())
                    .take(max_base_jobs as usize)
                    .collect()
            })
            .collect();

        for data in datas {
            println!("ndata={}", data.len());

            let jobs = flatten(s.work_for_next_update(data.len() as u64));

            let jobs_done: Vec<i64> = jobs.iter().map(&fun_job_done).collect();

            println!("jobs_donea={}", jobs_done.len());

            let old_tuple = s.acc.clone();

            let res_opt = s.update(data, jobs_done, |_| 0).unwrap();

            // Fold the emitted value into the accumulator kept from before
            // the update (or the fresh one if there was none).
            if let Some(res) = res_opt {
                let tuple = if old_tuple.is_some() {
                    old_tuple
                } else {
                    s.acc.clone()
                };

                s.acc = fun_acc(tuple, res);
            }
            println!("s.acc.is_some={:?}", s.acc.is_some());
        }

        s
    }
2843
2844 fn fun_merge_up(
2845 state: Option<(i64, Vec<i64>)>,
2846 mut x: (i64, Vec<i64>),
2847 ) -> Option<(i64, Vec<i64>)> {
2848 let mut acc = state?;
2849 acc.1.append(&mut x.1);
2850 Some((acc.0.wrapping_add(x.0), acc.1))
2851 }
2852
2853 fn job_done(job: &AvailableJob<i64, i64>) -> i64 {
2854 match job {
2855 AvailableJob::Base(x) => *x,
2856 AvailableJob::Merge { left, right } => {
2858 left.wrapping_add(*right)
2861 }
2863 }
2864 }
2865
2866 #[test]
2870 fn split_on_if_enqueuing_onto_the_next_queue() {
2871 let mut rng = rand::thread_rng();
2872
2873 let p = 4;
2874 let max_base_jobs = 2u64.pow(p);
2875
2876 for _ in 0..100000 {
2877 let state = ParallelScan::<i64, i64>::empty(max_base_jobs, 1);
2878 println!("hash_state={:?}", hash(&state));
2879 let i = rng.gen_range(0..max_base_jobs);
2880
2881 let data: Vec<i64> = (0..i as i64).collect();
2882
2883 let partition = state.partition_if_overflowing();
2884 let jobs = flatten(state.work_for_next_update(data.len() as u64));
2885
2886 let jobs_done: Vec<i64> = jobs.iter().map(job_done).collect();
2887
2888 let tree_count_before = state.trees.len();
2889
2890 let (_, s) = test_update(&state, data, jobs_done);
2891
2892 println!("second={:?}", partition.second.is_some());
2893
2894 match partition.second {
2895 None => {
2896 let tree_count_after = s.trees.len();
2897 let expected_tree_count = if partition.first.0 == i {
2898 tree_count_before + 1
2899 } else {
2900 tree_count_before
2901 };
2902 assert_eq!(tree_count_after, expected_tree_count);
2903 }
2904 Some(_) => {
2905 let tree_count_after = s.trees.len();
2906 let expected_tree_count = if i > partition.first.0 {
2907 tree_count_before + 1
2908 } else {
2909 tree_count_before
2910 };
2911 assert_eq!(tree_count_after, expected_tree_count);
2912 }
2913 }
2914 }
2915 }
2916
    /// After `reset_seq_no`, the per-tree sum of job sequence numbers must
    /// match a closed-form expression, and the final state hash must be
    /// reproducible bit-for-bit.
    #[test]
    fn sequence_number_reset() {
        let p = 3;
        let max_base_jobs = 2u64.pow(p);

        // Per-tree job records, reversed so newer trees come first.
        let jobs = |state: &ParallelScan<i64, i64>| -> Vec<Vec<Job<base::Record<i64>, merge::Record<i64>>>> {
            state.trees.iter().map(|t| t.jobs_records()).rev().collect()
        };

        let verify_sequence_number = |state: &ParallelScan<i64, i64>| {
            // Work on a clone so the caller's state is left untouched.
            let mut state = state.clone();
            state.reset_seq_no();

            let jobs_list = jobs(&state);

            let depth = ceil_log2(max_base_jobs + 1);

            for (i, jobs) in jobs_list.iter().enumerate() {
                // NOTE(review): presumably the i-th tree (newest first) only
                // holds jobs in its top `depth - i` levels after a reset —
                // the closed form below is built on that assumption.
                let cur_levels = depth - i as u64;

                // Expected sum: for each level j (global, i.e. shifted by the
                // tree index), add 2^j jobs each carrying seq-no `depth - j`.
                let seq_sum = (0..cur_levels).fold(0, |acc, j| {
                    let j = j + i as u64;
                    acc + (2u64.pow(j as u32) * (depth - j))
                });

                // Sequence numbers in older trees are shifted by the tree
                // index; subtract the offset before comparing.
                let offset = i as u64;

                let sum_of_all_seq_numbers: u64 = jobs
                    .iter()
                    .map(|job| match job {
                        Job::Base(base::Record { seq_no, .. }) => seq_no.0 - offset,
                        Job::Merge(merge::Record { seq_no, .. }) => seq_no.0 - offset,
                    })
                    .sum();

                dbg!(sum_of_all_seq_numbers, seq_sum);
                assert_eq!(sum_of_all_seq_numbers, seq_sum);
            }
        };

        let mut state = ParallelScan::<i64, i64>::empty(max_base_jobs, 0);
        let mut counter = 0;

        for _ in 0..50 {
            let jobs = flatten(state.jobs_for_next_update());
            let jobs_done: Vec<_> = jobs.iter().map(job_done).collect();
            let data: Vec<i64> = (0..max_base_jobs as i64).collect();

            let (res_opt, s) = test_update(&state, data, jobs_done);

            state = s;

            // Skip the first p+1 emissions so the pipeline is fully warmed up
            // before the sequence-number invariant is checked.
            if res_opt.is_some() {
                if counter > p {
                    verify_sequence_number(&state);
                } else {
                    counter += 1;
                }
            };
        }

        // Regression pin: the final state must hash to a known value.
        assert_eq!(
            hash(&state),
            "931a0dc0a488289000c195ae361138cc713deddc179b5d22bfa6344508d0cfb5"
        );
    }
2991
2992 fn step_on_free_space<F, FAcc, B, M>(
2993 state: &mut ParallelScan<B, M>,
2994 w: &SyncSender<Option<(M, Vec<B>)>>,
2995 mut ds: Vec<B>,
2996 f: F,
2997 f_acc: FAcc,
2998 ) where
2999 F: Fn(&AvailableJob<B, M>) -> M,
3000 FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3001 B: Debug + Clone + 'static,
3002 M: Debug + Clone + 'static,
3003 {
3004 loop {
3005 let data = take(&ds, state.max_base_jobs as usize);
3006
3007 let jobs = flatten(state.work_for_next_update(data.len() as u64));
3008 let jobs_done: Vec<_> = jobs.iter().map(&f).collect();
3009
3010 let old_tuple = state.acc.clone();
3011
3012 let (res_opt, mut s) = test_update(state, data.to_vec(), jobs_done);
3013
3014 let s = match res_opt {
3015 Some(x) => {
3016 let tuple = if old_tuple.is_some() {
3017 f_acc(old_tuple, x)
3018 } else {
3019 s.acc.clone()
3020 };
3021 s.acc = tuple;
3022 s
3023 }
3024 None => s,
3025 };
3026
3027 w.send(s.acc.clone()).unwrap();
3028
3029 *state = s;
3030
3031 let rem_ds = ds.get(state.max_base_jobs as usize..).unwrap_or(&[]);
3032
3033 if rem_ds.is_empty() {
3034 return;
3035 } else {
3036 ds = rem_ds.to_vec();
3037 }
3038 }
3039 }
3040
3041 fn do_steps<F, FAcc, B, M>(
3042 state: &mut ParallelScan<B, M>,
3043 recv: &Receiver<B>,
3044 f: F,
3045 f_acc: FAcc,
3046 w: SyncSender<Option<(M, Vec<B>)>>,
3047 ) where
3048 F: Fn(&AvailableJob<B, M>) -> M,
3049 FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3050 B: Debug + Clone + 'static,
3051 M: Debug + Clone + 'static,
3052 {
3053 let data = recv.recv().unwrap();
3054 step_on_free_space(state, &w, vec![data], &f, &f_acc);
3055 }
3056
3057 fn scan<F, FAcc, B, M>(
3058 s: &mut ParallelScan<B, M>,
3059 data: &Receiver<B>,
3060 f: F,
3061 f_acc: FAcc,
3062 ) -> Receiver<Option<(M, Vec<B>)>>
3063 where
3064 F: Fn(&AvailableJob<B, M>) -> M,
3065 FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3066 B: Debug + Clone + 'static,
3067 M: Debug + Clone + 'static,
3068 {
3069 let (send, rec) = std::sync::mpsc::sync_channel::<Option<(M, Vec<B>)>>(1);
3070 do_steps(s, data, f, f_acc, send);
3071 rec
3072 }
3073
3074 fn step_repeatedly<F, FAcc, B, M>(
3075 state: &mut ParallelScan<B, M>,
3076 data: &Receiver<B>,
3077 f: F,
3078 f_acc: FAcc,
3079 ) -> Receiver<Option<(M, Vec<B>)>>
3080 where
3081 F: Fn(&AvailableJob<B, M>) -> M,
3082 FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3083 B: Debug + Clone + 'static,
3084 M: Debug + Clone + 'static,
3085 {
3086 let (send, rec) = std::sync::mpsc::sync_channel::<Option<(M, Vec<B>)>>(1);
3087 do_steps(state, data, f, f_acc, send);
3088 rec
3089 }
3090
    /// A scan can be resumed from a generated intermediate state: after
    /// flushing the pipeline with zeros, feeding a single `1` must bump the
    /// accumulator's sum component by exactly one.
    #[test]
    fn scan_can_be_initialized_from_intermediate_state() {
        for _ in 0..10 {
            // Start from a randomly generated, partially-filled scan state.
            let mut state = gen(job_done, fun_merge_up);

            println!("state={:#?}", state);

            // Flag flipped by this thread to make the feeder emit one `1`.
            let do_one_next = Arc::new(AtomicBool::new(false));

            let do_one_next_clone = Arc::clone(&do_one_next);
            let (send, recv) = sync_channel(1);

            // Feeder thread: emits `0` until the flag is set, then a single
            // `1` (resetting the flag). Exits once the receiver is dropped.
            thread::spawn(move || loop {
                let v = if do_one_next_clone.load(Relaxed) {
                    do_one_next_clone.store(false, Relaxed);
                    1i64
                } else {
                    0i64
                };
                if send.send(v).is_err() {
                    return;
                }
            });

            let one_then_zeros = recv;

            let parallelism = state.max_base_jobs * ceil_log2(state.max_base_jobs);
            let old_acc = state.acc.as_ref().cloned().unwrap_or((0, vec![]));

            // Drives parallelism^2 steps; the fold keeps the last emitted
            // accumulator sum, or the seed `v` if nothing was emitted.
            let fill_some_zero =
                |state: &mut ParallelScan<i64, i64>, v: i64, r: &Receiver<i64>| -> i64 {
                    (0..parallelism * parallelism).fold(v, |acc, _| {
                        let pipe = step_repeatedly(state, r, job_done, fun_merge_up);

                        match pipe.recv() {
                            Ok(Some((v, _))) => v,
                            Ok(None) => acc,
                            Err(_) => acc,
                        }
                    })
                };

            let v = fill_some_zero(&mut state, 0, &one_then_zeros);

            // Request a single `1` from the feeder on its next send.
            do_one_next.store(true, Relaxed);

            let acc = { state.acc.clone().unwrap() };

            // Running the scan must have moved the accumulator away from the
            // generated starting value.
            assert_ne!(acc.0, old_acc.0);

            fill_some_zero(&mut state, v, &one_then_zeros);

            // The lone `1` shows up as exactly +1 (wrapping) on the sum.
            let acc_plus_one = { state.acc.unwrap() };
            assert_eq!(acc_plus_one.0, acc.0.wrapping_add(1));
        }
    }
3148
    /// Long term, the scan's result equals a plain fold over the inputs:
    /// summing 0..count through the merge tree yields the same total as a
    /// direct fold of 0..count.
    #[test]
    fn scan_behaves_like_a_fold_long_term() {
        // Merges an emitted (sum, inputs) pair into the accumulator, or
        // yields `None` when there is no accumulator yet.
        fn fun_merge_up(
            tuple: Option<(i64, Vec<String>)>,
            mut x: (i64, Vec<String>),
        ) -> Option<(i64, Vec<String>)> {
            let mut acc = tuple?;
            acc.1.append(&mut x.1);

            Some((acc.0.wrapping_add(x.0), acc.1))
        }

        // Base jobs carry stringified integers; merges add both children.
        fn job_done(job: &AvailableJob<String, i64>) -> i64 {
            match job {
                AvailableJob::Base(x) => x.parse().unwrap(),
                AvailableJob::Merge { left, right } => left.wrapping_add(*right),
            }
        }

        let (send, recv) = sync_channel(1);

        let depth = 7;
        let count: i64 = 1000;

        // Feeder thread: emits "0", "1", ..., "999", then "0" forever so the
        // zero-padding never changes the sum. Exits when the receiver drops.
        thread::spawn(move || {
            let mut count = count;
            let x = count;
            loop {
                let next = if count <= 0 {
                    "0".to_string()
                } else {
                    (x - count).to_string()
                };

                count -= 1;

                if send.send(next).is_err() {
                    return;
                }
            }
        });

        let mut s = ParallelScan::<String, i64>::empty(2u64.pow(depth as u32), 1);

        // 4 * count steps — enough rounds for every real input to travel all
        // the way through the tree; the fold keeps the last emitted sum.
        let after_3n = (0..4 * count).fold(0i64, |acc, _| {
            let result = scan(&mut s, &recv, job_done, fun_merge_up);
            match result.recv() {
                Ok(Some((v, _))) => v,
                Ok(None) => acc,
                Err(_) => acc,
            }
        });

        // Reference value: direct wrapping fold over the same inputs.
        let expected = (0..count).fold(0i64, |a, b| a.wrapping_add(b));

        assert_eq!(after_3n, expected);
    }
3209
    /// Same fold property as above, but with string concatenation — a
    /// non-commutative operation — so this also checks that input order is
    /// preserved through the merge tree.
    #[test]
    fn scan_concat_over_strings() {
        // Concatenates an emitted (string, inputs) pair onto the accumulator,
        // or yields `None` when there is no accumulator yet.
        fn fun_merge_up(
            tuple: Option<(String, Vec<String>)>,
            mut x: (String, Vec<String>),
        ) -> Option<(String, Vec<String>)> {
            let mut acc = tuple?;
            acc.1.append(&mut x.1);

            Some((format!("{}{}", acc.0, x.0), acc.1))
        }

        // Base jobs pass strings through; merges concatenate left then right.
        fn job_done(job: &AvailableJob<String, String>) -> String {
            match job {
                AvailableJob::Base(x) => x.clone(),
                AvailableJob::Merge { left, right } => {
                    format!("{}{}", left, right)
                }
            }
        }

        let (send, recv) = sync_channel(1);

        let depth = 7;
        let count: i64 = 100;

        // Feeder thread: emits "0,", "1,", ..., "99,", then "" forever so the
        // padding never alters the concatenation. Exits when receiver drops.
        thread::spawn(move || {
            let mut count = count;
            let x = count;
            loop {
                let next = if count <= 0 {
                    "".to_string()
                } else {
                    let n = (x - count).to_string();
                    format!("{},", n)
                };

                count -= 1;

                if send.send(next).is_err() {
                    return;
                }
            }
        });

        let mut s = ParallelScan::<String, String>::empty(2u64.pow(depth as u32), 1);

        // 42 * count steps — enough rounds for every real input to travel all
        // the way through the tree; the fold keeps the last emitted string.
        let after_3n = (0..42 * count).fold(String::new(), |acc, _| {
            let result = scan(&mut s, &recv, job_done, fun_merge_up);
            match result.recv() {
                Ok(Some((v, _))) => v,
                Ok(None) => acc,
                Err(_) => acc,
            }
        });

        // Reference value: direct in-order concatenation of the same inputs.
        let expected = (0..count)
            .map(|i| format!("{},", i))
            .fold(String::new(), |a, b| format!("{}{}", a, b));

        assert_eq!(after_3n, expected);
    }
3276}