mina_tree/scan_state/
parallel_scan.rs

1use std::{
2    collections::{BTreeMap, VecDeque},
3    fmt::Debug,
4    io::Write,
5    ops::ControlFlow,
6    sync::Arc,
7};
8
9use itertools::Itertools;
10use serde::{Deserialize, Serialize};
11use sha2::{
12    digest::{generic_array::GenericArray, typenum::U32},
13    Digest, Sha256,
14};
15use ControlFlow::{Break, Continue};
16
17use super::scan_state::transaction_snark::{LedgerProofWithSokMessage, TransactionWithWitness};
18
/// Sequence number for jobs in the scan state that corresponds to the order in
/// which they were added.
///
/// A newtype over `u64`; equality and ordering are derived from the wrapped
/// integer.
#[derive(Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct SequenceNumber(u64);
23
24impl Debug for SequenceNumber {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        f.write_fmt(format_args!("{}", self.0))
27    }
28}
29
30impl SequenceNumber {
31    pub(super) fn new(number: u64) -> Self {
32        Self(number)
33    }
34
35    fn zero() -> Self {
36        Self(0)
37    }
38
39    pub fn incr(&self) -> Self {
40        Self(self.0 + 1)
41    }
42
43    fn is_u64_max(&self) -> bool {
44        self.0 == u64::MAX
45    }
46
47    pub fn as_u64(&self) -> u64 {
48        self.0
49    }
50}
51
52impl std::ops::Sub for &'_ SequenceNumber {
53    type Output = SequenceNumber;
54
55    fn sub(self, rhs: &'_ SequenceNumber) -> Self::Output {
56        SequenceNumber(self.0 - rhs.0)
57    }
58}
59
/// Each node on the tree is viewed as a job that needs to be completed. When a
/// job is completed, it creates a new "Todo" job and marks the old job as "Done"
#[derive(Clone, Debug)]
pub enum JobStatus {
    /// The job still has to be completed.
    Todo,
    /// The job has been completed.
    Done,
}
67
68impl JobStatus {
69    pub fn is_done(&self) -> bool {
70        matches!(self, Self::Done)
71    }
72
73    fn as_str(&self) -> &'static str {
74        match self {
75            JobStatus::Todo => "Todo",
76            JobStatus::Done => "Done",
77        }
78    }
79}
80
/// The number of new jobs (base and merge) that can be added to this tree.
/// Each node has a weight associated with it, and the new jobs received are
/// distributed across the tree based on this number.
#[derive(Clone)]
pub struct Weight {
    /// Count tracked for base jobs.
    pub(super) base: u64,
    /// Count tracked for merge jobs.
    pub(super) merge: u64,
}
89
90impl Debug for Weight {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        write!(f, "{{ base: {} merge: {} }}", self.base, self.merge)
93    }
94}
95
96impl Weight {
97    fn zero() -> Self {
98        Self { base: 0, merge: 0 }
99    }
100}
101
/// Accessor for one `Value`-typed component of a `Target`.
///
/// The implementor decides which component is addressed.
trait Lens {
    /// Type of the addressed component.
    type Value;
    /// Type of the structure that contains the component.
    type Target;
    /// Borrows the addressed component out of `target`.
    fn get<'a>(&self, target: &'a Self::Target) -> &'a Self::Value;
    /// Returns a new `Target` whose addressed component is `value`;
    /// `target` is only borrowed.
    fn set(&self, target: &Self::Target, value: Self::Value) -> Self::Target;
}
108
/// Selects which counter of a `Weight` a lens operation addresses.
enum WeightLens {
    /// Addresses `Weight::base`.
    Base,
    /// Addresses `Weight::merge`.
    Merge,
}
113
114impl Lens for WeightLens {
115    type Value = u64;
116    type Target = Weight;
117
118    fn get<'a>(&self, target: &'a Self::Target) -> &'a Self::Value {
119        match self {
120            WeightLens::Base => &target.base,
121            WeightLens::Merge => &target.merge,
122        }
123    }
124
125    fn set(&self, target: &Self::Target, value: Self::Value) -> Self::Target {
126        match self {
127            WeightLens::Base => Self::Target {
128                base: value,
129                merge: target.merge,
130            },
131            WeightLens::Merge => Self::Target {
132                base: target.base,
133                merge: value,
134            },
135        }
136    }
137}
138
/// Distinguishes between a "current" and a "next" tree.
///
/// NOTE(review): no use of this enum is visible in this part of the file;
/// presumably it selects which tree's work is being computed — confirm against
/// the callers.
#[derive(Debug)]
enum WorkForTree {
    Current,
    Next,
}
144
145/// For base proofs (Proving new transactions)
146pub mod base {
147    use super::*;
148
149    #[derive(Clone)]
150    pub struct Record<BaseJob> {
151        pub job: BaseJob,
152        pub seq_no: SequenceNumber,
153        pub state: JobStatus,
154    }
155
156    impl<BaseJob> Debug for Record<BaseJob> {
157        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158            let Self {
159                job: _,
160                seq_no,
161                state,
162            } = self;
163            f.write_fmt(format_args!("seq_no: {:?}, state: {:?}", seq_no, state))
164        }
165    }
166
167    #[derive(Clone)]
168    pub enum Job<BaseJob> {
169        Empty,
170        Full(Record<BaseJob>),
171    }
172
173    impl<BaseJob> Debug for Job<BaseJob> {
174        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175            match self {
176                Self::Empty => write!(f, "Empty"),
177                Self::Full(arg0) => f.write_fmt(format_args!("Full {{ {:?} }}", arg0)),
178            }
179        }
180    }
181
182    #[derive(Clone, Debug)]
183    pub struct Base<BaseJob> {
184        pub weight: Weight,
185        pub job: Job<BaseJob>,
186    }
187
188    impl<BaseJob: Clone> Record<BaseJob> {
189        pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
190            Self {
191                job: fun(&self.job),
192                seq_no: self.seq_no.clone(),
193                state: self.state.clone(),
194            }
195        }
196
197        pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
198            Self {
199                seq_no: no,
200                state: self.state.clone(),
201                job: self.job.clone(),
202            }
203        }
204    }
205
206    impl<BaseJob: Clone> Job<BaseJob> {
207        pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
208            match self {
209                Job::Empty => Self::Empty,
210                Job::Full(r) => Job::Full(r.map(fun)),
211            }
212        }
213    }
214
215    impl<BaseJob: Clone> Base<BaseJob> {
216        pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
217            Self {
218                weight: self.weight.clone(),
219                job: self.job.map(fun),
220            }
221        }
222
223        pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
224            Self {
225                weight: self.weight.clone(),
226                job: match &self.job {
227                    Job::Full(record) => Job::Full(record.with_seq_no(no)),
228                    x => x.clone(),
229                },
230            }
231        }
232    }
233}
234
235/// For merge proofs: Merging two base proofs or two merge proofs
236pub mod merge {
237    use super::*;
238
239    #[derive(Clone)]
240    pub struct Record<MergeJob> {
241        pub left: MergeJob,
242        pub right: MergeJob,
243        pub seq_no: SequenceNumber,
244        pub state: JobStatus,
245    }
246
247    impl<MergeJob> Debug for Record<MergeJob> {
248        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249            let Self {
250                left: _,
251                right: _,
252                seq_no,
253                state,
254            } = self;
255            f.write_fmt(format_args!("seq_no: {:?}, state: {:?}", seq_no, state))
256        }
257    }
258
259    impl<MergeJob: Clone> Record<MergeJob> {
260        pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
261            Self {
262                seq_no: no,
263                left: self.left.clone(),
264                right: self.right.clone(),
265                state: self.state.clone(),
266            }
267        }
268    }
269
270    #[derive(Clone)]
271    pub enum Job<MergeJob> {
272        Empty,
273        Part(MergeJob), // left
274        Full(Record<MergeJob>),
275    }
276
277    impl<MergeJob> Debug for Job<MergeJob> {
278        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279            match self {
280                Self::Empty => write!(f, "Empty"),
281                Self::Part(_) => write!(f, "Part(merge)"),
282                Self::Full(arg0) => f.write_fmt(format_args!("Full {{ {:?} }}", arg0)),
283            }
284        }
285    }
286
287    #[derive(Clone, Debug)]
288    pub struct Merge<MergeJob> {
289        pub weight: (Weight, Weight),
290        pub job: Job<MergeJob>,
291    }
292
293    impl<MergeJob> Record<MergeJob> {
294        pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
295            Self {
296                left: fun(&self.left),
297                right: fun(&self.right),
298                seq_no: self.seq_no.clone(),
299                state: self.state.clone(),
300            }
301        }
302    }
303
304    impl<MergeJob> Job<MergeJob> {
305        pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
306            match self {
307                Job::Empty => Self::Empty,
308                Job::Part(j) => Job::Part(fun(j)),
309                Job::Full(r) => Job::Full(r.map(fun)),
310            }
311        }
312    }
313
314    impl<MergeJob: Clone> Merge<MergeJob> {
315        pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
316            Self {
317                weight: self.weight.clone(),
318                job: self.job.map(fun),
319            }
320        }
321
322        pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
323            Self {
324                weight: self.weight.clone(),
325                job: match &self.job {
326                    Job::Full(record) => Job::Full(record.with_seq_no(no)),
327                    x => x.clone(),
328                },
329            }
330        }
331    }
332}
333
/// All the jobs on a tree that can be done. Base.Full and Merge.Full
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AvailableJob<BaseJob, MergeJob> {
    /// A base job, carrying its payload.
    Base(BaseJob),
    /// A merge job, carrying its left and right inputs.
    Merge { left: MergeJob, right: MergeJob },
}
340
/// New jobs to be added (including new transactions or new merge jobs)
#[derive(Clone, Debug)]
enum Job<BaseJob, MergeJob> {
    /// Payload for a base job.
    Base(BaseJob),
    /// Payload for a merge job (a single input, not a left/right pair).
    Merge(MergeJob),
}
347
/// Space available and number of jobs required to enqueue data.
///
/// * `first`: space on the current tree and the number of jobs required to
///   be completed.
/// * `second`: if the current-tree space is less than `max_base_jobs`, the
///   remaining number of slots on a new tree and the corresponding job count.
///
/// In both tuples the first element is the space and the second the job
/// count (this is how the `Debug` impl labels them).
#[derive(Clone)]
pub struct SpacePartition {
    pub first: (u64, u64),
    pub second: Option<(u64, u64)>,
}
359
360impl Debug for SpacePartition {
361    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
362        struct Detail {
363            space_available: u64,
364            njobs_to_be_completed: u64,
365        }
366
367        impl Debug for Detail {
368            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369                f.write_fmt(format_args!(
370                    "space_available: {}, njobs_to_be_completed={}",
371                    self.space_available, self.njobs_to_be_completed
372                ))
373            }
374        }
375
376        f.debug_struct("SpacePartition")
377            .field(
378                "first(current_tree)",
379                &Detail {
380                    space_available: self.first.0,
381                    njobs_to_be_completed: self.first.1,
382                },
383            )
384            .field(
385                "second(next_tree)",
386                &self.second.map(|second| Detail {
387                    space_available: second.0,
388                    njobs_to_be_completed: second.1,
389                }),
390            )
391            .finish()
392    }
393}
394
/// A `Debug`-bounded trait that can hand back a reference to the underlying
/// `T`.
///
/// NOTE(review): the name suggests use as a trait object; no use is visible
/// in this part of the file — confirm.
trait WithVTable<T>: Debug {
    /// Returns a reference to the underlying value.
    fn by_ref(&self) -> &T;
}

/// Every `Debug` type is trivially a `WithVTable` of itself.
impl<T> WithVTable<T> for T
where
    T: Debug,
{
    fn by_ref(&self) -> &T {
        self
    }
}
404
/// One slot of a [`Tree`]: either a leaf holding a `B` or an inner node
/// holding an `M`.
#[derive(Clone, Debug)]
pub enum Value<B, M> {
    Leaf(B),
    Node(M),
}
410
/// A single tree with number of leaves = max_base_jobs = 2**transaction_capacity_log_2
#[derive(Clone)]
pub struct Tree<B, M> {
    /// All slots of the tree in one flat vector. The code in this file
    /// derives a slot's depth from its index (`btree::depth_at`) and finds
    /// the children of index `i` at `2 * i + 1` and `2 * i + 2`, i.e. the
    /// slots are laid out level by level with the root at index 0.
    pub(super) values: Vec<Value<B, M>>,
}
416
417impl<B, M> Debug for Tree<base::Base<B>, merge::Merge<M>>
418where
419    B: Debug,
420    M: Debug,
421{
422    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
423        enum BaseOrMerge<'a, B, M> {
424            Base(&'a base::Base<B>),
425            Merge(&'a merge::Merge<M>),
426        }
427
428        impl<B, M> Debug for BaseOrMerge<'_, B, M>
429        where
430            B: Debug,
431            M: Debug,
432        {
433            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434                match self {
435                    Self::Base(arg0) => write!(f, "{:?}", arg0),
436                    Self::Merge(arg0) => write!(f, "{:?}", arg0),
437                }
438            }
439        }
440
441        let mut by_depth = BTreeMap::<usize, Vec<_>>::default();
442
443        for (index, v) in self.values.iter().enumerate() {
444            let vec = by_depth.entry(btree::depth_at(index) as usize).or_default();
445            let v = match v {
446                Value::Leaf(b) => BaseOrMerge::Base(b),
447                Value::Node(m) => BaseOrMerge::Merge(m),
448            };
449            vec.push(v);
450        }
451
452        for (depth, values) in by_depth.iter() {
453            writeln!(f, "depth={} {:#?}", depth, values)?;
454        }
455
456        Ok(())
457    }
458}
459
/// Index arithmetic for a binary tree stored level by level in a flat array
/// (root at index 0).
mod btree {
    /// Depth of the node stored at `index`; the root (index 0) has depth 0.
    ///
    /// This is `floor(log2(index + 1))`, computed with integer arithmetic so
    /// the result is exact for every index. (Going through `f32` is not: its
    /// 24-bit mantissa rounds e.g. `2^25 - 1` up to `2^25`, which reports a
    /// depth one too large.)
    // <https://stackoverflow.com/a/31147495/5717561>
    pub fn depth_at(index: usize) -> u64 {
        // `position` is at least 1, so it has at most `BITS - 1` leading
        // zeros and the subtraction below cannot underflow.
        let position = index + 1;
        u64::from(usize::BITS - 1 - position.leading_zeros())
    }

    /// Index of the left child of the node at `index`.
    pub fn child_left(index: usize) -> usize {
        (index * 2) + 1
    }

    /// Index of the right child of the node at `index`.
    pub fn child_right(index: usize) -> usize {
        (index * 2) + 2
    }

    /// Index of the parent of the node at `index`, or `None` for the root.
    pub fn parent(index: usize) -> Option<usize> {
        Some(index.checked_sub(1)? / 2)
    }

    /// Half-open range of the indices located at `depth`:
    /// `2^depth - 1 .. 2^(depth + 1) - 1` (which is `0..1` for the root level).
    pub fn range_at_depth(depth: u64) -> std::ops::Range<usize> {
        let start = (1 << depth) - 1;
        let end = (1 << (depth + 1)) - 1;

        start..end
    }
}
492
493impl<B, M> Tree<B, M>
494where
495    B: Debug + 'static,
496    M: Debug + 'static,
497{
498    /// mapi where i is the level of the tree
499    fn map_depth<FunMerge, FunBase>(&self, fun_merge: &FunMerge, fun_base: &FunBase) -> Self
500    where
501        FunMerge: for<'a> Fn(u64, &'a M) -> M,
502        FunBase: for<'a> Fn(&'a B) -> B,
503    {
504        let values = self
505            .values
506            .iter()
507            .enumerate()
508            .map(|(index, value)| match value {
509                Value::Leaf(base) => Value::Leaf(fun_base(base)),
510                Value::Node(merge) => Value::Node(fun_merge(btree::depth_at(index), merge)),
511            })
512            .collect();
513
514        Self { values }
515    }
516
517    /// Use for binprot conversion only
518    pub(super) fn values_by_depth(&self) -> BTreeMap<u64, Value<Vec<&B>, Vec<&M>>> {
519        let mut values: BTreeMap<u64, Value<Vec<&B>, Vec<&M>>> = BTreeMap::default();
520
521        for (index, value) in self.values.iter().enumerate() {
522            values
523                .entry(btree::depth_at(index))
524                .and_modify(|for_depth: &mut Value<_, _>| match for_depth {
525                    Value::Leaf(vec) => {
526                        let Value::Leaf(leaf) = value else {
527                            panic!("invalid")
528                        };
529                        vec.push(leaf);
530                    }
531                    Value::Node(vec) => {
532                        let Value::Node(node) = value else {
533                            panic!("invalid")
534                        };
535                        vec.push(node);
536                    }
537                })
538                .or_insert({
539                    match value {
540                        Value::Leaf(leaf) => {
541                            let mut vec = Vec::with_capacity(255);
542                            vec.push(leaf);
543                            Value::Leaf(vec)
544                        }
545                        Value::Node(node) => {
546                            let mut vec = Vec::with_capacity(255);
547                            vec.push(node);
548                            Value::Node(vec)
549                        }
550                    }
551                });
552        }
553
554        values
555    }
556
557    fn map<FunMerge, FunBase>(&self, fun_merge: FunMerge, fun_base: FunBase) -> Self
558    where
559        FunMerge: Fn(&M) -> M,
560        FunBase: Fn(&B) -> B,
561    {
562        self.map_depth(&|_, m| fun_merge(m), &fun_base)
563    }
564
565    /// foldi where i is the cur_level
566    fn fold_depth_until_prime<Accum, Final, FunMerge, FunBase>(
567        &self,
568        fun_merge: &FunMerge,
569        fun_base: &FunBase,
570        init: Accum,
571    ) -> ControlFlow<Final, Accum>
572    where
573        FunMerge: Fn(u64, Accum, &M) -> ControlFlow<Final, Accum>,
574        FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
575    {
576        let mut accum = init;
577
578        for (index, value) in self.values.iter().enumerate() {
579            accum = match value {
580                Value::Leaf(base) => fun_base(accum, base)?,
581                Value::Node(merge) => fun_merge(btree::depth_at(index), accum, merge)?,
582            };
583        }
584
585        Continue(accum)
586    }
587
588    /// foldi where i is the cur_level
589    fn fold_depth_until_prime_err<Accum, FunMerge, FunBase>(
590        &self,
591        fun_merge: &FunMerge,
592        fun_base: &FunBase,
593        init: Accum,
594    ) -> Result<Accum, String>
595    where
596        FunMerge: Fn(u64, Accum, &M) -> Result<Accum, String>,
597        FunBase: Fn(Accum, &B) -> Result<Accum, String>,
598    {
599        let mut accum = init;
600
601        for (index, value) in self.values.iter().enumerate() {
602            accum = match value {
603                Value::Leaf(base) => fun_base(accum, base)?,
604                Value::Node(merge) => fun_merge(btree::depth_at(index), accum, merge)?,
605            };
606        }
607
608        Ok(accum)
609    }
610
611    fn fold_depth_until<Accum, Final, FunFinish, FunMerge, FunBase>(
612        &self,
613        fun_merge: FunMerge,
614        fun_base: FunBase,
615        fun_finish: FunFinish,
616        init: Accum,
617    ) -> Final
618    where
619        FunMerge: Fn(u64, Accum, &M) -> ControlFlow<Final, Accum>,
620        FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
621        FunFinish: Fn(Accum) -> Final,
622    {
623        match self.fold_depth_until_prime(&fun_merge, &fun_base, init) {
624            Continue(accum) => fun_finish(accum),
625            Break(value) => value,
626        }
627    }
628
629    fn fold_depth<Accum, FunMerge, FunBase>(
630        &self,
631        fun_merge: FunMerge,
632        fun_base: FunBase,
633        init: Accum,
634    ) -> Accum
635    where
636        FunMerge: Fn(u64, Accum, &M) -> Accum,
637        FunBase: Fn(Accum, &B) -> Accum,
638    {
639        self.fold_depth_until(
640            |i, accum, a| Continue(fun_merge(i, accum, a)),
641            |accum, d| Continue(fun_base(accum, d)),
642            |x| x,
643            init,
644        )
645    }
646
647    fn fold<Accum, FunMerge, FunBase>(
648        &self,
649        fun_merge: FunMerge,
650        fun_base: FunBase,
651        init: Accum,
652    ) -> Accum
653    where
654        FunMerge: Fn(Accum, &M) -> Accum,
655        FunBase: Fn(Accum, &B) -> Accum,
656    {
657        self.fold_depth(|_, accum, a| fun_merge(accum, a), fun_base, init)
658    }
659
660    fn fold_until<Accum, Final, FunFinish, FunMerge, FunBase>(
661        &self,
662        fun_merge: FunMerge,
663        fun_base: FunBase,
664        fun_finish: FunFinish,
665        init: Accum,
666    ) -> Final
667    where
668        FunMerge: Fn(Accum, &M) -> ControlFlow<Final, Accum>,
669        FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
670        FunFinish: Fn(Accum) -> Final,
671    {
672        self.fold_depth_until(
673            |_, accum, a| fun_merge(accum, a),
674            fun_base,
675            fun_finish,
676            init,
677        )
678    }
679
680    fn update_split<Data, FunJobs, FunWeight, FunMerge, FunBase, Weight, R>(
681        &self,
682        fun_merge: &FunMerge,
683        fun_base: &FunBase,
684        weight_merge: &FunWeight,
685        jobs: &[Data],
686        update_level: u64,
687        jobs_split: &FunJobs,
688    ) -> Result<(Self, Option<R>), ()>
689    where
690        FunMerge: Fn(&[Data], u64, &M) -> Result<(M, Option<R>), ()>,
691        FunBase: Fn(&[Data], B) -> Result<B, ()>,
692        FunWeight: Fn(&M) -> (Weight, Weight),
693        FunJobs: Fn((Weight, Weight), &[Data]) -> (&[Data], &[Data]),
694        Data: Clone,
695        M: Clone,
696        B: Clone,
697    {
698        let mut values = Vec::with_capacity(self.values.len());
699        let mut scan_result = None;
700
701        // Because our tree is a perfect binary tree, two values pushed
702        // at the back of `jobs_fifo` by a node will be popped by its
703        // left and right children, respectively
704        let mut jobs_fifo = VecDeque::with_capacity(self.values.len());
705
706        jobs_fifo.push_back(jobs);
707
708        for (index, value) in self.values.iter().enumerate() {
709            let depth = btree::depth_at(index);
710
711            if depth > update_level {
712                values.push(value.clone());
713                continue;
714            }
715
716            let jobs_for_this = jobs_fifo.pop_front().unwrap();
717
718            let value = match value {
719                Value::Leaf(base) => Value::Leaf(fun_base(jobs_for_this, base.clone())?),
720                Value::Node(merge) => {
721                    let (jobs_left, jobs_right) = jobs_split(weight_merge(merge), jobs_for_this);
722                    jobs_fifo.push_back(jobs_left);
723                    jobs_fifo.push_back(jobs_right);
724
725                    let (value, result) = fun_merge(jobs_for_this, depth, merge)?;
726
727                    if scan_result.is_none() {
728                        scan_result = Some(result);
729                    }
730
731                    Value::Node(value)
732                }
733            };
734
735            values.push(value);
736        }
737
738        assert_eq!(jobs_fifo.capacity(), self.values.len());
739
740        Ok(((Self { values }), scan_result.flatten()))
741    }
742
743    fn update_accumulate<Data, FunMerge, FunBase>(
744        &self,
745        fun_merge: &FunMerge,
746        fun_base: &FunBase,
747    ) -> (Self, Data)
748    where
749        FunMerge: Fn((Data, Data), &M) -> (M, Data),
750        FunBase: Fn(&B) -> (B, Data),
751        Data: Clone,
752    {
753        let mut datas = vec![None; self.values.len()];
754
755        let childs_of = |data: &mut [Option<Data>], index: usize| -> Option<(Data, Data)> {
756            let left = data
757                .get_mut(btree::child_left(index))
758                .and_then(Option::take)?;
759            let right = data
760                .get_mut(btree::child_right(index))
761                .and_then(Option::take)?;
762
763            Some((left, right))
764        };
765
766        let mut values: Vec<_> = self
767            .values
768            .iter()
769            .enumerate()
770            .rev()
771            .map(|(index, value)| match value {
772                Value::Leaf(base) => {
773                    let (new_base, count_list) = fun_base(base);
774                    datas[index] = Some(count_list);
775                    Value::Leaf(new_base)
776                }
777                Value::Node(merge) => {
778                    let (left, right) = childs_of(datas.as_mut(), index).unwrap();
779                    let (value, count_list) = fun_merge((left, right), merge);
780                    datas[index] = Some(count_list);
781                    Value::Node(value)
782                }
783            })
784            .collect();
785
786        values.reverse();
787
788        (Self { values }, datas[0].take().unwrap())
789    }
790
791    fn iter(&self) -> impl Iterator<Item = (u64, &Value<B, M>)> {
792        self.values
793            .iter()
794            .enumerate()
795            .map(|(index, value)| (btree::depth_at(index), value))
796    }
797}
798
799impl Tree<base::Base<Arc<TransactionWithWitness>>, merge::Merge<Arc<LedgerProofWithSokMessage>>> {
800    pub fn view(&self) -> impl Iterator<Item = JobValueWithIndex<'_>> {
801        self.values
802            .iter()
803            .enumerate()
804            .map(|(index, value)| JobValueWithIndex {
805                index,
806                job: match value {
807                    Value::Leaf(base) => Value::Leaf(&base.job),
808                    Value::Node(merge) => Value::Node(&merge.job),
809                },
810            })
811    }
812}
813
/// Borrowed job of one tree slot: a base job for a leaf, a merge job for an
/// inner node.
pub type JobValue<'a> = super::parallel_scan::Value<
    &'a base::Job<Arc<TransactionWithWitness>>,
    &'a merge::Job<Arc<LedgerProofWithSokMessage>>,
>;
818
/// A borrowed job together with the index of its slot in the tree's flat
/// storage, as produced by `Tree::view`.
pub struct JobValueWithIndex<'a> {
    /// Position of the slot in the tree's `values` vector.
    index: usize,
    /// The job stored in that slot.
    pub job: JobValue<'a>,
}
823
824impl JobValueWithIndex<'_> {
825    pub fn index(&self) -> usize {
826        self.index
827    }
828
829    pub fn depth(&self) -> usize {
830        btree::depth_at(self.index) as usize
831    }
832
833    pub fn parent(&self) -> Option<usize> {
834        btree::parent(self.index)
835    }
836
837    pub fn child_left(&self) -> usize {
838        btree::child_left(self.index)
839    }
840
841    pub fn child_right(&self) -> usize {
842        btree::child_right(self.index)
843    }
844
845    /// Returns sibling index and if the sibling is on the left.
846    pub fn bundle_sibling(&self) -> Option<(usize, bool)> {
847        let parent = self.parent()?;
848        let try_sibling =
849            |parent, index| btree::parent(index).filter(|p| *p == parent).map(|_| index);
850        try_sibling(parent, self.index - 1)
851            .map(|i| (i, true))
852            .or_else(|| try_sibling(parent, self.index + 1).map(|i| (i, false)))
853    }
854}
855
/// The scan state: a collection of trees plus bookkeeping shared by them.
#[derive(Clone, Debug)]
pub struct ParallelScan<BaseJob, MergeJob> {
    /// The trees; leaves hold base jobs, inner nodes hold merge jobs.
    pub trees: Vec<Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
    /// last emitted proof and the corresponding transactions
    pub(super) acc: Option<(MergeJob, Vec<BaseJob>)>,
    /// Sequence number for the jobs added every block
    pub curr_job_seq_no: SequenceNumber,
    /// transaction_capacity_log_2
    ///
    /// NOTE(review): the `Tree` docs in this file state
    /// `max_base_jobs = 2**transaction_capacity_log_2` — confirm whether this
    /// field stores the job count or the exponent.
    pub(super) max_base_jobs: u64,
    /// NOTE(review): meaning and unit are not visible in this part of the
    /// file — confirm against the code that reads it.
    pub(super) delay: u64,
}
867
/// Selects what a reset applies to: the base part, the merge part, or both.
///
/// NOTE(review): the code consuming this enum is not visible in this part of
/// the file; presumably it chooses which weights get reset — confirm.
pub(super) enum ResetKind {
    Base,
    Merge,
    Both,
}
873
874impl<BaseJob, MergeJob> Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>
875where
876    BaseJob: Clone + Debug + 'static,
877    MergeJob: Clone + Debug + 'static,
878{
879    fn update(
880        &self,
881        completed_jobs: &[Job<BaseJob, MergeJob>],
882        update_level: u64,
883        sequence_no: SequenceNumber,
884        lens: WeightLens,
885    ) -> Result<(Self, Option<MergeJob>), ()> {
886        let add_merges = |jobs: &[Job<BaseJob, MergeJob>],
887                          current_level: u64,
888                          merge_job: &merge::Merge<MergeJob>|
889         -> Result<(merge::Merge<MergeJob>, Option<MergeJob>), ()> {
890            use merge::{
891                Job::{Empty, Full, Part},
892                Record,
893            };
894            use Job::{Base, Merge};
895
896            let weight = &merge_job.weight;
897            let m = &merge_job.job;
898
899            let (w1, w2) = &weight;
900            let (left, right) = (*lens.get(w1), *lens.get(w2));
901
902            // println!("current_level={} update_level={}", current_level, update_level);
903            if update_level > 0 && current_level == update_level - 1 {
904                // Create new jobs from the completed ones
905                let (new_weight, new_m) = match (jobs, m) {
906                    ([], m) => (weight.clone(), m.clone()),
907                    ([Merge(a), Merge(b)], Empty) => {
908                        let w1 = lens.set(w1, left - 1);
909                        let w2 = lens.set(w2, right - 1);
910
911                        (
912                            (w1, w2),
913                            Full(Record {
914                                left: a.clone(),
915                                right: b.clone(),
916                                seq_no: sequence_no.clone(),
917                                state: JobStatus::Todo,
918                            }),
919                        )
920                    }
921                    ([Merge(a)], Empty) => {
922                        let w1 = lens.set(w1, left - 1);
923                        let w2 = lens.set(w2, right);
924
925                        ((w1, w2), Part(a.clone()))
926                    }
927                    ([Merge(b)], Part(a)) => {
928                        let w1 = lens.set(w1, left);
929                        let w2 = lens.set(w2, right - 1);
930
931                        (
932                            (w1, w2),
933                            Full(Record {
934                                left: a.clone(),
935                                right: b.clone(),
936                                seq_no: sequence_no.clone(),
937                                state: JobStatus::Todo,
938                            }),
939                        )
940                    }
941                    ([Base(_)], Empty) => {
942                        // Depending on whether this is the first or second of the two base jobs
943
944                        let weight = if left == 0 {
945                            let w1 = lens.set(w1, left);
946                            let w2 = lens.set(w2, right - 1);
947                            (w1, w2)
948                        } else {
949                            let w1 = lens.set(w1, left - 1);
950                            let w2 = lens.set(w2, right);
951                            (w1, w2)
952                        };
953
954                        (weight, Empty)
955                    }
956                    ([Base(_), Base(_)], Empty) => {
957                        let w1 = lens.set(w1, left - 1);
958                        let w2 = lens.set(w2, right - 1);
959
960                        ((w1, w2), Empty)
961                    }
962                    (xs, m) => {
963                        panic!(
964                            "Got {} jobs when updating level {} and when one of the merge \
965                             nodes at level {} is {:?}",
966                            xs.len(),
967                            update_level,
968                            current_level,
969                            m
970                        );
971                    }
972                };
973
974                Ok((
975                    merge::Merge {
976                        weight: new_weight,
977                        job: new_m,
978                    },
979                    None::<MergeJob>,
980                ))
981            } else if current_level == update_level {
982                // Mark completed jobs as Done
983
984                match (jobs, m) {
985                    (
986                        [Merge(a)],
987                        Full(
988                            x @ Record {
989                                state: JobStatus::Todo,
990                                ..
991                            },
992                        ),
993                    ) => {
994                        let mut x = x.clone();
995                        x.state = JobStatus::Done;
996                        let new_job = Full(x);
997
998                        let (scan_result, weight) = if current_level == 0 {
999                            let w1 = lens.set(w1, 0);
1000                            let w2 = lens.set(w2, 0);
1001
1002                            (Some(a.clone()), (w1, w2))
1003                        } else {
1004                            (None, weight.clone())
1005                        };
1006
1007                        Ok((
1008                            merge::Merge {
1009                                weight,
1010                                job: new_job,
1011                            },
1012                            scan_result,
1013                        ))
1014                    }
1015                    ([], m) => Ok((
1016                        merge::Merge {
1017                            weight: weight.clone(),
1018                            job: m.clone(),
1019                        },
1020                        None,
1021                    )),
1022                    // ([], m) => Ok(((weight, m), None)),
1023                    (xs, m) => {
1024                        panic!(
1025                            "Got {} jobs when updating level {} and when one of the merge \
1026                             nodes at level {} is {:?}",
1027                            xs.len(),
1028                            update_level,
1029                            current_level,
1030                            m
1031                        );
1032                    }
1033                }
1034            } else if update_level > 0 && (current_level < update_level - 1) {
1035                // Update the job count for all the level above
1036                match jobs {
1037                    [] => Ok((
1038                        merge::Merge {
1039                            weight: weight.clone(),
1040                            job: m.clone(),
1041                        },
1042                        None,
1043                    )),
1044                    _ => {
1045                        let jobs_length = jobs.len() as u64;
1046                        let jobs_sent_left = jobs_length.min(left);
1047                        let jobs_sent_right = (jobs_length - jobs_sent_left).min(right);
1048
1049                        let w1 = lens.set(w1, left - jobs_sent_left);
1050                        let w2 = lens.set(w2, right - jobs_sent_right);
1051                        let weight = (w1, w2);
1052
1053                        Ok((
1054                            merge::Merge {
1055                                weight,
1056                                job: m.clone(),
1057                            },
1058                            None,
1059                        ))
1060                    }
1061                }
1062            } else {
1063                Ok((
1064                    merge::Merge {
1065                        weight: weight.clone(),
1066                        job: m.clone(),
1067                    },
1068                    None,
1069                ))
1070            }
1071        };
1072
1073        let add_bases = |jobs: &[Job<BaseJob, MergeJob>], base: base::Base<BaseJob>| {
1074            use base::Job::{Empty, Full};
1075            use Job::{Base, Merge};
1076
1077            let w = base.weight;
1078            let d = base.job;
1079
1080            let weight = lens.get(&w);
1081
1082            // println!("add_bases jobs={:?} w={}", jobs.len(), weight);
1083            match (jobs, d) {
1084                ([], d) => Ok(base::Base { weight: w, job: d }),
1085                ([Base(d)], Empty) => {
1086                    let w = lens.set(&w, weight - 1);
1087
1088                    Ok(base::Base {
1089                        weight: w,
1090                        job: Full(base::Record {
1091                            job: d.clone(),
1092                            seq_no: sequence_no.clone(),
1093                            state: JobStatus::Todo,
1094                        }),
1095                    })
1096                }
1097                ([Merge(_)], Full(mut b)) => {
1098                    b.state = JobStatus::Done;
1099
1100                    Ok(base::Base {
1101                        weight: w,
1102                        job: Full(b),
1103                    })
1104                }
1105                (xs, d) => {
1106                    panic!(
1107                        "Got {} jobs when updating level {} and when one of the base nodes \
1108                         is {:?}",
1109                        xs.len(),
1110                        update_level,
1111                        d
1112                    );
1113                }
1114            }
1115        };
1116
1117        self.update_split(
1118            &add_merges,
1119            &add_bases,
1120            &|merge| merge.weight.clone(),
1121            completed_jobs,
1122            update_level,
1123            &|(w1, w2), a: &[Job<BaseJob, MergeJob>]| {
1124                let l = *lens.get(&w1) as usize;
1125                let r = *lens.get(&w2) as usize;
1126
1127                // println!("split l={} r={} len={}", l, r, a.len());
1128
1129                (take(a, l), take_at(a, l, r))
1130            },
1131        )
1132    }
1133
1134    fn reset_weights(&self, reset_kind: ResetKind) -> Self {
1135        let fun_base = |base: &base::Base<BaseJob>| {
1136            let set_one = |lens: WeightLens, weight: &Weight| lens.set(weight, 1);
1137            let set_zero = |lens: WeightLens, weight: &Weight| lens.set(weight, 0);
1138
1139            use base::{
1140                Job::{Empty, Full},
1141                Record,
1142            };
1143            use JobStatus::Todo;
1144
1145            let update_merge_weight = |weight: &Weight| {
1146                // When updating the merge-weight of base nodes, only the nodes with
1147                // "Todo" status needs to be included
1148                match &base.job {
1149                    Full(Record { state: Todo, .. }) => set_one(WeightLens::Merge, weight),
1150                    _ => set_zero(WeightLens::Merge, weight),
1151                }
1152            };
1153
1154            let update_base_weight = |weight: &Weight| {
1155                // When updating the base-weight of base nodes, only the Empty nodes
1156                // need to be included
1157                match &base.job {
1158                    Empty => set_one(WeightLens::Base, weight),
1159                    Full(_) => set_zero(WeightLens::Base, weight),
1160                }
1161            };
1162
1163            let weight = &base.weight;
1164            let (new_weight, dummy_right_for_base_nodes) = match reset_kind {
1165                ResetKind::Merge => (
1166                    update_merge_weight(weight),
1167                    set_zero(WeightLens::Merge, weight),
1168                ),
1169                ResetKind::Base => (
1170                    update_base_weight(weight),
1171                    set_zero(WeightLens::Base, weight),
1172                ),
1173                ResetKind::Both => {
1174                    let w = update_base_weight(weight);
1175                    (update_merge_weight(&w), Weight::zero())
1176                }
1177            };
1178
1179            let base = base::Base {
1180                weight: new_weight.clone(),
1181                job: base.job.clone(),
1182            };
1183
1184            (base, (new_weight, dummy_right_for_base_nodes))
1185        };
1186
1187        let fun_merge = |lst: ((Weight, Weight), (Weight, Weight)),
1188                         merge: &merge::Merge<MergeJob>| {
1189            let ((w1, w2), (w3, w4)) = &lst;
1190
1191            let reset = |lens: WeightLens, w: &Weight, ww: &Weight| {
1192                // Weights of all other jobs is sum of weights of its children
1193                (
1194                    lens.set(w, lens.get(w1) + lens.get(w2)),
1195                    lens.set(ww, lens.get(w3) + lens.get(w4)),
1196                )
1197            };
1198
1199            use merge::{Job::Full, Record};
1200            use JobStatus::Todo;
1201
1202            let ww = match reset_kind {
1203                ResetKind::Merge => {
1204                    // When updating the merge-weight of merge nodes, only the nodes
1205                    // with "Todo" status needs to be included
1206                    let lens = WeightLens::Merge;
1207                    match (&merge.weight, &merge.job) {
1208                        ((w1, w2), Full(Record { state: Todo, .. })) => {
1209                            (lens.set(w1, 1), lens.set(w2, 0))
1210                        }
1211                        ((w1, w2), _) => reset(lens, w1, w2),
1212                    }
1213                }
1214                ResetKind::Base => {
1215                    // The base-weight of merge nodes is the sum of weights of its
1216                    // children
1217                    let w = &merge.weight;
1218                    reset(WeightLens::Base, &w.0, &w.1)
1219                }
1220                ResetKind::Both => {
1221                    let w = &merge.weight;
1222                    let w = reset(WeightLens::Base, &w.0, &w.1);
1223                    reset(WeightLens::Merge, &w.0, &w.1)
1224                }
1225            };
1226
1227            let merge = merge::Merge {
1228                weight: ww.clone(),
1229                job: merge.job.clone(),
1230            };
1231
1232            (merge, ww)
1233        };
1234
1235        let (result, _) = self.update_accumulate(&fun_merge, &fun_base);
1236        result
1237    }
1238
1239    fn jobs_on_level(&self, depth: u64, level: u64) -> Vec<AvailableJob<BaseJob, MergeJob>> {
1240        use JobStatus::Todo;
1241
1242        // self.iter()
1243        //     .filter(|(d, _)| *d == depth)
1244        //     .filter_map(|(_, value)| match value {
1245        //         Value::Leaf(base::Base {
1246        //             job:
1247        //                 base::Job::Full(base::Record {
1248        //                     job, state: Todo, ..
1249        //                 }),
1250        //             ..
1251        //         }) => Some(AvailableJob::Base(job.clone())),
1252        //         Value::Node(merge::Merge {
1253        //             job:
1254        //                 merge::Job::Full(merge::Record {
1255        //                     left,
1256        //                     right,
1257        //                     state: Todo,
1258        //                     ..
1259        //                 }),
1260        //             ..
1261        //         }) => Some(AvailableJob::Merge {
1262        //             left: left.clone(),
1263        //             right: right.clone(),
1264        //         }),
1265        //         _ => None,
1266        //     })
1267        //     .collect::<Vec<_>>();
1268
1269        self.fold_depth(
1270            |i, mut acc, a| {
1271                use merge::{Job::Full, Record};
1272
1273                if let (
1274                    true,
1275                    Full(Record {
1276                        left,
1277                        right,
1278                        state: Todo,
1279                        ..
1280                    }),
1281                ) = (i == level, &a.job)
1282                {
1283                    acc.push(AvailableJob::Merge {
1284                        left: left.clone(),
1285                        right: right.clone(),
1286                    });
1287                };
1288                acc
1289            },
1290            |mut acc, d| {
1291                use base::{Job::Full, Record};
1292
1293                if let (
1294                    true,
1295                    Full(Record {
1296                        job, state: Todo, ..
1297                    }),
1298                ) = (level == depth, &d.job)
1299                {
1300                    acc.push(AvailableJob::Base(job.clone()));
1301                }
1302                acc
1303            },
1304            Vec::with_capacity(256),
1305        )
1306    }
1307
1308    fn to_hashable_jobs(&self) -> Vec<Job<base::Base<BaseJob>, merge::Merge<MergeJob>>> {
1309        use JobStatus::Done;
1310
1311        self.fold(
1312            |mut acc, a| {
1313                match &a.job {
1314                    merge::Job::Full(merge::Record { state: Done, .. }) => {}
1315                    _ => {
1316                        acc.push(Job::Merge(a.clone()));
1317                    }
1318                }
1319                acc
1320            },
1321            |mut acc, d| {
1322                match &d.job {
1323                    base::Job::Full(base::Record { state: Done, .. }) => {}
1324                    _ => {
1325                        acc.push(Job::Base(d.clone()));
1326                    }
1327                }
1328                acc
1329            },
1330            Vec::with_capacity(256),
1331        )
1332    }
1333
1334    fn jobs_records(&self) -> Vec<Job<base::Record<BaseJob>, merge::Record<MergeJob>>> {
1335        self.fold(
1336            |mut acc, a: &merge::Merge<MergeJob>| {
1337                if let merge::Job::Full(x) = &a.job {
1338                    acc.push(Job::Merge(x.clone()));
1339                }
1340                acc
1341            },
1342            |mut acc, d: &base::Base<BaseJob>| {
1343                if let base::Job::Full(j) = &d.job {
1344                    acc.push(Job::Base(j.clone()));
1345                }
1346                acc
1347            },
1348            Vec::with_capacity(256),
1349        )
1350    }
1351
1352    fn base_jobs(&self) -> Vec<BaseJob> {
1353        self.fold_depth(
1354            |_, acc, _| acc,
1355            |mut acc, d| {
1356                if let base::Job::Full(base::Record { job, .. }) = &d.job {
1357                    acc.push(job.clone());
1358                };
1359                acc
1360            },
1361            Vec::with_capacity(256),
1362        )
1363    }
1364
1365    /// calculates the number of base and merge jobs that is currently with the Todo status
1366    fn todo_job_count(&self) -> (u64, u64) {
1367        use JobStatus::Todo;
1368
1369        self.fold_depth(
1370            |_, (b, m), j| match &j.job {
1371                merge::Job::Full(merge::Record { state: Todo, .. }) => (b, m + 1),
1372                _ => (b, m),
1373            },
1374            |(b, m), d| match &d.job {
1375                base::Job::Full(base::Record { state: Todo, .. }) => (b + 1, m),
1376                _ => (b, m),
1377            },
1378            (0, 0),
1379        )
1380    }
1381
1382    fn leaves(&self) -> Vec<base::Base<BaseJob>> {
1383        self.fold_depth(
1384            |_, acc, _| acc,
1385            |mut acc, d| {
1386                if let base::Job::Full(_) = &d.job {
1387                    acc.push(d.clone());
1388                };
1389                acc
1390            },
1391            Vec::with_capacity(256),
1392        )
1393    }
1394
1395    fn required_job_count(&self) -> u64 {
1396        match &self.values[0] {
1397            Value::Node(value) => {
1398                let (w1, w2) = &value.weight;
1399                w1.merge + w2.merge
1400            }
1401            Value::Leaf(base) => base.weight.merge,
1402        }
1403    }
1404
1405    fn available_space(&self) -> u64 {
1406        match &self.values[0] {
1407            Value::Node(value) => {
1408                let (w1, w2) = &value.weight;
1409                w1.base + w2.base
1410            }
1411            Value::Leaf(base) => base.weight.base,
1412        }
1413    }
1414
1415    fn create_tree_for_level(
1416        level: i64,
1417        depth: u64,
1418        merge_job: merge::Job<MergeJob>,
1419        base_job: base::Job<BaseJob>,
1420    ) -> Self {
1421        let base_weight = if level == -1 {
1422            Weight::zero()
1423        } else {
1424            Weight { base: 1, merge: 0 }
1425        };
1426
1427        let make_base = || base::Base {
1428            weight: base_weight.clone(),
1429            job: base_job.clone(),
1430        };
1431
1432        let make_merge = |d: u64| {
1433            let weight = if level == -1 {
1434                (Weight::zero(), Weight::zero())
1435            } else {
1436                let x = 2u64.pow(level as u32) / 2u64.pow(d as u32 + 1);
1437                (Weight { base: x, merge: 0 }, Weight { base: x, merge: 0 })
1438            };
1439            merge::Merge {
1440                weight,
1441                job: merge_job.clone(),
1442            }
1443        };
1444
1445        let nnodes = 2u64.pow((depth + 1) as u32) - 1;
1446
1447        let values: Vec<_> = (0..nnodes)
1448            .map(|index| {
1449                let node_depth = btree::depth_at(index as usize);
1450
1451                if node_depth == depth {
1452                    Value::Leaf(make_base())
1453                } else {
1454                    Value::Node(make_merge(node_depth))
1455                }
1456            })
1457            .collect();
1458
1459        // println!("first={:?}", values[0]);
1460
1461        // println!("nnodes={:?} len={:?} nbases={:?}", nnodes, values.len(), values.iter().filter(|v| {
1462        //   matches!(v, Value::Leaf(_))
1463        // }).count());
1464
1465        Self { values }
1466    }
1467
1468    fn create_tree(depth: u64) -> Self {
1469        let level: i64 = depth.try_into().unwrap();
1470        Self::create_tree_for_level(level, depth, merge::Job::Empty, base::Job::Empty)
1471    }
1472}
1473
1474impl<BaseJob, MergeJob> ParallelScan<BaseJob, MergeJob>
1475where
1476    BaseJob: Debug + Clone + 'static,
1477    MergeJob: Debug + Clone + 'static,
1478{
1479    fn with_leaner_trees(&self) -> Self {
1480        use JobStatus::Done;
1481
1482        let trees = self
1483            .trees
1484            .iter()
1485            .map(|tree| {
1486                tree.map(
1487                    |merge_node| match &merge_node.job {
1488                        merge::Job::Full(merge::Record { state: Done, .. }) => merge::Merge {
1489                            weight: merge_node.weight.clone(),
1490                            job: merge::Job::Empty,
1491                        },
1492                        _ => merge_node.clone(),
1493                    },
1494                    |b| b.clone(),
1495                )
1496            })
1497            .collect();
1498
1499        Self {
1500            trees,
1501            acc: self.acc.clone(),
1502            curr_job_seq_no: self.curr_job_seq_no.clone(),
1503            max_base_jobs: self.max_base_jobs,
1504            delay: self.delay,
1505        }
1506    }
1507
1508    pub fn empty(max_base_jobs: u64, delay: u64) -> Self {
1509        let depth = ceil_log2(max_base_jobs);
1510        // println!("empty depth={:?}", depth);
1511
1512        let first_tree = Tree::create_tree(depth);
1513
1514        let mut trees = Vec::with_capacity(32);
1515        trees.push(first_tree);
1516
1517        Self {
1518            trees,
1519            acc: None,
1520            curr_job_seq_no: SequenceNumber(0),
1521            max_base_jobs,
1522            delay,
1523        }
1524    }
1525
1526    fn map<F1, F2>(&self, f1: F1, f2: F2) -> Self
1527    where
1528        F1: Fn(&MergeJob) -> MergeJob,
1529        F2: Fn(&BaseJob) -> BaseJob,
1530    {
1531        let trees = self
1532            .trees
1533            .iter()
1534            .map(|tree| tree.map_depth(&|_, m| m.map(&f1), &|a| a.map(&f2)))
1535            .collect();
1536
1537        let acc = self
1538            .acc
1539            .as_ref()
1540            .map(|(m, bs)| (f1(m), bs.iter().map(&f2).collect()));
1541
1542        Self {
1543            trees,
1544            acc,
1545            curr_job_seq_no: self.curr_job_seq_no.clone(),
1546            max_base_jobs: self.max_base_jobs,
1547            delay: self.delay,
1548        }
1549    }
1550
1551    pub fn hash<FunMerge, FunBase>(
1552        &self,
1553        fun_merge: FunMerge,
1554        fun_base: FunBase,
1555    ) -> GenericArray<u8, U32>
1556    where
1557        FunMerge: Fn(&mut Vec<u8>, &MergeJob),
1558        FunBase: Fn(&mut Vec<u8>, &BaseJob),
1559    {
1560        const BUFFER_CAPACITY: usize = 128 * 1024;
1561
1562        let Self {
1563            trees,
1564            acc,
1565            curr_job_seq_no,
1566            max_base_jobs,
1567            delay,
1568        } = self.with_leaner_trees();
1569
1570        let mut sha: Sha256 = Sha256::new();
1571        let mut buffer = Vec::with_capacity(BUFFER_CAPACITY);
1572        let buffer = &mut buffer;
1573
1574        let add_weight =
1575            |buffer: &mut Vec<u8>, w: &Weight| write!(buffer, "{}{}", w.base, w.merge).unwrap();
1576
1577        let add_two_weights = |buffer: &mut Vec<u8>, (w1, w2): &(Weight, Weight)| {
1578            add_weight(buffer, w1);
1579            add_weight(buffer, w2);
1580        };
1581
1582        for job in trees.iter().flat_map(Tree::to_hashable_jobs) {
1583            buffer.clear();
1584
1585            match &job {
1586                Job::Base(base) => match &base.job {
1587                    base::Job::Empty => {
1588                        add_weight(buffer, &base.weight);
1589                        write!(buffer, "Empty").unwrap();
1590                    }
1591                    base::Job::Full(base::Record { job, seq_no, state }) => {
1592                        add_weight(buffer, &base.weight);
1593                        write!(buffer, "Full{}{}", seq_no.0, state.as_str()).unwrap();
1594
1595                        fun_base(buffer, job);
1596                    }
1597                },
1598                Job::Merge(merge) => match &merge.job {
1599                    merge::Job::Empty => {
1600                        add_two_weights(buffer, &merge.weight);
1601                        write!(buffer, "Empty").unwrap();
1602                    }
1603                    merge::Job::Part(job) => {
1604                        add_two_weights(buffer, &merge.weight);
1605                        write!(buffer, "Part").unwrap();
1606
1607                        fun_merge(buffer, job);
1608                    }
1609                    merge::Job::Full(merge::Record {
1610                        left,
1611                        right,
1612                        seq_no,
1613                        state,
1614                    }) => {
1615                        add_two_weights(buffer, &merge.weight);
1616                        write!(buffer, "Full{}{}", seq_no.0, state.as_str()).unwrap();
1617
1618                        fun_merge(buffer, left);
1619                        fun_merge(buffer, right);
1620                    }
1621                },
1622            }
1623
1624            sha.update(buffer.as_slice());
1625
1626            // TODO: Remove this assert once we know it's a good capacity
1627            //       (buffer is not resized for serialization)
1628            assert_eq!(buffer.capacity(), BUFFER_CAPACITY);
1629        }
1630
1631        match &acc {
1632            Some((a, d_lst)) => {
1633                buffer.clear();
1634
1635                fun_merge(buffer, a);
1636                for j in d_lst {
1637                    fun_base(buffer, j);
1638                }
1639
1640                sha.update(&buffer);
1641            }
1642            None => {
1643                sha.update("None");
1644            }
1645        };
1646
1647        buffer.clear();
1648        write!(buffer, "{}{}{}", curr_job_seq_no.0, max_base_jobs, delay).unwrap();
1649        sha.update(&buffer);
1650
1651        sha.finalize()
1652    }
1653
1654    pub fn fold_chronological_until<Accum, Final, FunMerge, FunBase, FunFinish>(
1655        &self,
1656        init: Accum,
1657        fun_merge: FunMerge,
1658        fun_base: FunBase,
1659        fun_finish: FunFinish,
1660    ) -> Final
1661    where
1662        FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> ControlFlow<Final, Accum>,
1663        FunBase: Fn(Accum, &base::Base<BaseJob>) -> ControlFlow<Final, Accum>,
1664        FunFinish: Fn(Accum) -> Final,
1665    {
1666        let mut accum = init;
1667
1668        for tree in self.trees.iter().rev() {
1669            match tree.fold_depth_until_prime(&|_, acc, m| fun_merge(acc, m), &fun_base, accum) {
1670                Continue(acc) => accum = acc,
1671                Break(v) => return v,
1672            }
1673        }
1674
1675        fun_finish(accum)
1676    }
1677
1678    pub fn fold_chronological_until_err<Accum, FunMerge, FunBase, FunFinish>(
1679        &self,
1680        init: Accum,
1681        fun_merge: FunMerge,
1682        fun_base: FunBase,
1683        fun_finish: FunFinish,
1684    ) -> Result<Accum, String>
1685    where
1686        FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> Result<Accum, String>,
1687        FunBase: Fn(Accum, &base::Base<BaseJob>) -> Result<Accum, String>,
1688        FunFinish: Fn(Accum) -> Accum,
1689    {
1690        let mut accum = init;
1691
1692        for tree in self.trees.iter().rev() {
1693            match tree.fold_depth_until_prime_err(&|_, acc, m| fun_merge(acc, m), &fun_base, accum)
1694            {
1695                Ok(acc) => accum = acc,
1696                Err(e) => return Err(e),
1697            }
1698        }
1699
1700        Ok(fun_finish(accum))
1701    }
1702
1703    fn fold_chronological<Accum, FunMerge, FunBase>(
1704        &self,
1705        init: Accum,
1706        fun_merge: FunMerge,
1707        fun_base: FunBase,
1708    ) -> Accum
1709    where
1710        FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> Accum,
1711        FunBase: Fn(Accum, &base::Base<BaseJob>) -> Accum,
1712    {
1713        self.fold_chronological_until(
1714            init,
1715            |acc, a| Continue(fun_merge(acc, a)),
1716            |acc, d| Continue(fun_base(acc, d)),
1717            |v| v,
1718        )
1719    }
1720
1721    fn max_trees(&self) -> u64 {
1722        ((ceil_log2(self.max_base_jobs) + 1) * (self.delay + 1)) + 1
1723    }
1724
1725    fn work_for_tree(&self, data_tree: WorkForTree) -> Vec<AvailableJob<BaseJob, MergeJob>> {
1726        let delay = self.delay + 1;
1727
1728        // TODO: Not sure if skip(1) is correct below
1729        let trees = match data_tree {
1730            WorkForTree::Current => &self.trees[1..],
1731            WorkForTree::Next => &self.trees,
1732        };
1733
1734        // println!("WORK_FOR_TREE len={} delay={}", trees.len(), delay);
1735
1736        work(trees, delay, self.max_base_jobs)
1737    }
1738
1739    /// work on all the level and all the trees
1740    fn all_work(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
1741        let depth = ceil_log2(self.max_base_jobs);
1742        // TODO: Not sure if it's correct
1743        let set1 = self.work_for_tree(WorkForTree::Current);
1744        // let setaaa = self.work_for_tree(WorkForTree::Next);
1745
1746        // println!(
1747        //     "ntrees={} delay={} set1={}",
1748        //     self.trees.len(),
1749        //     self.delay,
1750        //     set1.len()
1751        // );
1752        // println!("ntrees={} set1={} setaa={}", self.trees.len(), set1.len(), setaaa.len());
1753
1754        let mut this = self.clone();
1755        this.trees.reserve(self.delay as usize + 1);
1756
1757        // println!("set1={:?}", set1.len());
1758
1759        let mut other_set = Vec::with_capacity(256);
1760        other_set.push(set1);
1761
1762        for _ in 0..self.delay + 1 {
1763            // println!("trees={}", this.trees.len());
1764            this.trees.insert(0, Tree::create_tree(depth));
1765            let work = this.work_for_tree(WorkForTree::Current);
1766
1767            // println!("work={}", work.len());
1768
1769            if !work.is_empty() {
1770                other_set.push(work);
1771            }
1772        }
1773
1774        other_set
1775    }
1776
1777    // let all_work :
1778    //     type merge base. (merge, base) t -> (merge, base) Available_job.t list list
1779    //     =
1780    //  fun t ->
1781    //   let depth = Int.ceil_log2 t.max_base_jobs in
1782    //   Printf.eprintf "ntrees=%d\n%!" (Non_empty_list.length t.trees);
1783    //   let set1 = work_for_tree t ~data_tree:`Current in
1784    //   let _, other_sets =
1785    //     List.fold ~init:(t, [])
1786    //       (List.init ~f:Fn.id (t.delay + 1))
1787    //       ~f:(fun (t, work_list) _ ->
1788    //         Printf.eprintf "trees=%d\n%!" (Non_empty_list.length t.trees);
1789    //         let trees' = Non_empty_list.cons (create_tree ~depth) t.trees in
1790    //         let t' = { t with trees = trees' } in
1791    //         let work = work_for_tree t' ~data_tree:`Current in
1792    //         Printf.eprintf "work=%d\n%!" (List.length work);
1793    //         match work_for_tree t' ~data_tree:`Current with
1794    //         | [] ->
1795    //             (t', work_list)
1796    //         | work ->
1797    //             (t', work :: work_list) )
1798    //   in
1799    //   Printf.eprintf "set1=%d\n%!" (List.length set1);
1800    //   if List.is_empty set1 then List.rev other_sets
1801    //   else set1 :: List.rev other_sets
1802
1803    fn work_for_next_update(&self, data_count: u64) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
1804        let delay = self.delay + 1;
1805        let current_tree_space = self.trees[0].available_space() as usize;
1806
1807        let mut set1 = work(&self.trees[1..], delay, self.max_base_jobs);
1808        let count = data_count.min(self.max_base_jobs) as usize;
1809
1810        if current_tree_space < count {
1811            let mut set2 = work(&self.trees, delay, self.max_base_jobs);
1812            set2.truncate((count - current_tree_space) * 2);
1813
1814            [set1, set2].into_iter().filter(|v| !v.is_empty()).collect()
1815        } else {
1816            set1.truncate(2 * count);
1817
1818            if set1.is_empty() {
1819                vec![]
1820            } else {
1821                vec![set1]
1822            }
1823        }
1824    }
1825
1826    fn free_space_on_current_tree(&self) -> u64 {
1827        self.trees[0].available_space()
1828    }
1829
    /// Applies `completed_jobs` (finished merge work) to the trees after the
    /// first one (`self.trees[1..]`), leaving `self.trees[0]` untouched.
    ///
    /// Returns `Ok(None)` right away when `completed_jobs` is empty. Otherwise
    /// returns `Ok(Some((result, base_jobs)))` when the last updated tree
    /// produced a result: that tree is removed from the list and its base jobs
    /// are returned alongside the result. Errors from `Tree::update` are
    /// propagated with `?`.
    ///
    /// # Panics
    ///
    /// Panics if more jobs are supplied than `work_for_tree(WorkForTree::Current)`
    /// reports as required.
    fn add_merge_jobs(
        &mut self,
        completed_jobs: &[MergeJob],
    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
        // First `n` elements, or the whole slice when it is shorter than `n`.
        fn take<T>(slice: &[T], n: usize) -> &[T] {
            slice.get(..n).unwrap_or(slice)
        }

        // Everything after the first `n` elements, or an empty slice when the
        // slice is shorter than `n`.
        fn drop<T>(slice: &[T], n: usize) -> &[T] {
            slice.get(n..).unwrap_or(&[])
        }

        if completed_jobs.is_empty() {
            return Ok(None);
        }

        // Only every `delay`-th tree (with `delay = self.delay + 1`) is updated below.
        let delay = self.delay + 1;
        let udelay = delay as usize;
        let depth = ceil_log2(self.max_base_jobs);

        let completed_jobs_len = completed_jobs.len();
        let merge_jobs: Vec<_> = completed_jobs.iter().cloned().map(Job::Merge).collect();
        let jobs_required = self.work_for_tree(WorkForTree::Current);

        assert!(
            merge_jobs.len() <= jobs_required.len(),
            "More work than required"
        );

        let curr_tree = &self.trees[0];
        let to_be_updated_trees = &self.trees[1..];

        // Logging bookkeeping only: (index, (level, njobs))
        let mut stats = BTreeMap::<u64, (u64, usize)>::new();

        let (mut updated_trees, result_opt) = {
            // Jobs not yet handed to a tree; shrinks as trees consume them.
            let mut jobs = merge_jobs.as_slice();

            let mut updated_trees = Vec::with_capacity(to_be_updated_trees.len());
            let mut scan_result = None;

            for (i, tree) in to_be_updated_trees.iter().enumerate() {
                // Every nth (n=delay) tree
                if (i % udelay == udelay - 1) && !jobs.is_empty() {
                    let nrequired = tree.required_job_count() as usize;
                    let completed_jobs = take(jobs, nrequired);
                    let i = i as u64;

                    // The level updated moves one step up for every `delay` trees.
                    let level = depth - (i / delay);
                    let old = stats.insert(i, (level, completed_jobs.len()));
                    assert!(old.is_none());

                    let (tree, result) = tree.update(
                        completed_jobs,
                        depth - (i / delay),
                        self.curr_job_seq_no.clone(),
                        WeightLens::Merge,
                    )?;

                    updated_trees.push(tree);
                    // Overwritten on every update: only the last updated
                    // tree's result survives the loop.
                    scan_result = result;
                    jobs = drop(jobs, nrequired);
                } else {
                    // Trees that are skipped are carried over unchanged.
                    updated_trees.push(tree.clone());
                }
            }

            (updated_trees, scan_result)
        };

        // Print a per-tree summary of what was just applied.
        for (index, (level, njobs)) in stats.iter().rev() {
            let index = self.trees.len() - *index as usize - 2;

            if result_opt.is_some() && index == 0 {
                println!(
                    "- tree[{:>02}] level={} {:>3} completed jobs, a proof is emitted, tree is removed",
                    index,
                    level,
                    njobs
                );
            } else {
                println!(
                    "- tree[{:>02}] level={} {:>3} completed jobs (DONE)",
                    index, level, njobs
                );
                println!(
                    "           level={} {:>3} new merge jobs (TODO)",
                    level - 1,
                    njobs / 2,
                );
            }
        }

        let (mut updated_trees, result_opt) = {
            // When a result was produced, remove the last tree and return its
            // base jobs together with the result.
            let (updated_trees, result_opt) = match result_opt {
                Some(scan_result) if !updated_trees.is_empty() => {
                    let last = updated_trees.pop().unwrap();
                    let tree_data = last.base_jobs();
                    (updated_trees, Some((scan_result, tree_data)))
                }
                _ => (updated_trees, None),
            };

            // TODO: Not sure if priority is same as OCaml here
            // NOTE: in Rust `&&` binds tighter than `||`, so this reads as
            // `is_some || (fewer than max trees && exact number of jobs)`.
            if result_opt.is_some()
                || (updated_trees.len() + 1) < self.max_trees() as usize
                    && (completed_jobs_len == jobs_required.len())
            {
                let updated_trees = updated_trees
                    .into_iter()
                    .map(|tree| tree.reset_weights(ResetKind::Merge))
                    .collect();
                (updated_trees, result_opt)
            } else {
                (updated_trees, result_opt)
            }
        };

        // Put the untouched first tree back at the front.
        updated_trees.insert(0, curr_tree.clone());

        self.trees = updated_trees;

        Ok(result_opt)
    }
1954
1955    fn add_data(
1956        &mut self,
1957        data: Vec<BaseJob>,
1958        base_kind: impl Fn(&BaseJob) -> usize,
1959    ) -> Result<(), ()> {
1960        if data.is_empty() {
1961            return Ok(());
1962        }
1963
1964        let data_len = data.len();
1965        let depth = ceil_log2(self.max_base_jobs);
1966        let tree = &self.trees[0];
1967
1968        let base_jobs: Vec<_> = data.into_iter().map(Job::Base).collect();
1969        let available_space = tree.available_space() as usize;
1970
1971        assert!(
1972            data_len <= available_space,
1973            "Data count ({}) exceeded available space ({})",
1974            data_len,
1975            available_space
1976        );
1977
1978        // println!(
1979        //     "base_jobs={:?} available_space={:?} depth={:?}",
1980        //     base_jobs.len(),
1981        //     available_space,
1982        //     depth
1983        // );
1984
1985        let (tree, _) = tree
1986            .update(
1987                &base_jobs,
1988                depth,
1989                self.curr_job_seq_no.clone(),
1990                WeightLens::Base,
1991            )
1992            .expect("Error while adding a base job to the tree");
1993
1994        let bases = tree.base_jobs();
1995        let nbase = bases.len();
1996
1997        let updated_trees = if data_len == available_space {
1998            let new_tree = Tree::create_tree(depth);
1999            let tree = tree.reset_weights(ResetKind::Both);
2000            vec![new_tree, tree]
2001        } else {
2002            let tree = tree.reset_weights(ResetKind::Merge);
2003            vec![tree]
2004        };
2005
2006        println!(
2007            "- tree[{:>02}] level=7 {:>3} new base jobs (TODO), total_nbase_jobs={:?}",
2008            self.trees.len() - 1,
2009            data_len,
2010            nbase,
2011        );
2012
2013        let max_base_jobs = self.max_base_jobs as usize;
2014
2015        let mut folded = bases.iter().fold(
2016            Vec::<(usize, usize)>::with_capacity(max_base_jobs),
2017            |mut accum, b| {
2018                let kind = base_kind(b);
2019                match accum.last_mut() {
2020                    Some(last) if last.0 == kind => last.1 += 1,
2021                    _ => accum.push((kind, 1)),
2022                }
2023                accum
2024            },
2025        );
2026
2027        let total_folded: usize = folded.iter().map(|(_, n)| *n).sum();
2028
2029        if total_folded != max_base_jobs {
2030            folded.push((10, max_base_jobs - total_folded));
2031        }
2032
2033        let to_s = |n: usize, s: &str| {
2034            if n == 1 {
2035                s.to_string()
2036            } else {
2037                format!("{n} {s}")
2038            }
2039        };
2040
2041        let s = folded
2042            .iter()
2043            .map(|(kind, n)| match kind {
2044                0 => to_s(*n, "CMD"),
2045                1 => to_s(*n, "FT"),
2046                2 => to_s(*n, "CB"),
2047                10 => to_s(*n, "EMPTY"),
2048                _ => panic!(),
2049            })
2050            .join("|");
2051
2052        println!("           level=7 has the following jobs (in this order):");
2053        println!("           {s}");
2054
2055        // println!(
2056        //     "updated_trees={} self_trees={:?}",
2057        //     updated_trees.len(),
2058        //     self.trees.len()
2059        // );
2060
2061        // println!("WORK1={:?}", work(updated_trees.as_slice(), self.delay + 1, self.max_base_jobs));
2062        // println!("WORK2={:?}", work(self.trees.as_slice(), self.delay + 1, self.max_base_jobs));
2063
2064        // self.trees.append(&mut updated_trees);
2065
2066        // let tail = &self.trees[1..];
2067        // self.trees = updated_trees.into_iter().zip(tail.iter().cloned().collect()).collect();
2068
2069        let mut old = std::mem::replace(&mut self.trees, updated_trees);
2070        old.remove(0);
2071        self.trees.append(&mut old);
2072
2073        // println!("WORK3={:?}", work(self.trees.as_slice(), self.delay + 1, self.max_base_jobs));
2074
2075        Ok(())
2076    }
2077
2078    fn reset_seq_no(&mut self) {
2079        let last = self.trees.last().unwrap();
2080        let oldest_seq_no = match last.leaves().first() {
2081            Some(base::Base {
2082                job: base::Job::Full(base::Record { seq_no, .. }),
2083                ..
2084            }) => seq_no.clone(),
2085            _ => SequenceNumber::zero(),
2086        };
2087
2088        let new_seq = |seq: &SequenceNumber| (seq - &oldest_seq_no).incr();
2089
2090        let fun_merge = |m: &merge::Merge<MergeJob>| match &m.job {
2091            merge::Job::Full(merge::Record { seq_no, .. }) => m.with_seq_no(new_seq(seq_no)),
2092            _ => m.clone(),
2093        };
2094
2095        let fun_base = |m: &base::Base<BaseJob>| match &m.job {
2096            base::Job::Full(base::Record { seq_no, .. }) => m.with_seq_no(new_seq(seq_no)),
2097            _ => m.clone(),
2098        };
2099
2100        let mut max_seq = SequenceNumber::zero();
2101        let mut updated_trees = Vec::with_capacity(self.trees.len());
2102
2103        for tree in &self.trees {
2104            use base::{Base, Job::Full, Record};
2105
2106            let tree = tree.map(fun_merge, fun_base);
2107            updated_trees.push(tree.clone());
2108
2109            let leaves = tree.leaves();
2110
2111            let last = match leaves.last() {
2112                Some(last) => last,
2113                None => continue,
2114            };
2115
2116            if let Base {
2117                job: Full(Record { seq_no, .. }),
2118                ..
2119            } = last
2120            {
2121                max_seq = max_seq.max(seq_no.clone());
2122            };
2123        }
2124
2125        self.curr_job_seq_no = max_seq;
2126        self.trees = updated_trees;
2127    }
2128
2129    fn incr_sequence_no(&mut self) {
2130        let next_seq_no = self.curr_job_seq_no.incr();
2131
2132        if next_seq_no.is_u64_max() {
2133            self.reset_seq_no();
2134        } else {
2135            self.curr_job_seq_no = next_seq_no;
2136        }
2137    }
2138
2139    fn update_helper(
2140        &mut self,
2141        data: Vec<BaseJob>,
2142        completed_jobs: Vec<MergeJob>,
2143        base_kind: impl Fn(&BaseJob) -> usize,
2144    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
2145        fn split<T>(slice: &[T], n: usize) -> (&[T], &[T]) {
2146            (
2147                slice.get(..n).unwrap_or(slice),
2148                slice.get(n..).unwrap_or(&[]),
2149            )
2150        }
2151
2152        let data_count = data.len() as u64;
2153
2154        assert!(
2155            data_count <= self.max_base_jobs,
2156            "Data count ({}) exceeded maximum ({})",
2157            data_count,
2158            self.max_base_jobs
2159        );
2160
2161        let required_jobs_count = self
2162            .work_for_next_update(data_count)
2163            .into_iter()
2164            .flatten()
2165            .count();
2166
2167        {
2168            let required = (required_jobs_count + 1) / 2;
2169            let got = (completed_jobs.len() + 1) / 2;
2170
2171            // println!("required={:?} got={:?}", required, got);
2172
2173            let max_base_jobs = self.max_base_jobs as usize;
2174            assert!(
2175                !(got < required && data.len() > max_base_jobs - required + got),
2176                "Insufficient jobs (Data count {}): Required- {} got- {}",
2177                data_count,
2178                required,
2179                got
2180            )
2181        }
2182
2183        let delay = self.delay + 1;
2184
2185        // Increment the sequence number
2186        self.incr_sequence_no();
2187
2188        let latest_tree = &self.trees[0];
2189        let available_space = latest_tree.available_space();
2190
2191        // Possible that new base jobs is added to a new tree within an
2192        // update i.e., part of it is added to the first tree and the rest
2193        // of it to a new tree. This happens when the throughput is not max.
2194        // This also requires merge jobs to be done on two different set of trees*)
2195
2196        let (data1, data2) = split(&data, available_space as usize);
2197
2198        // println!(
2199        //     "delay={} available_space={} data1={} data2={}",
2200        //     delay,
2201        //     available_space,
2202        //     data1.len(),
2203        //     data2.len()
2204        // );
2205
2206        let required_jobs_for_current_tree =
2207            work(&self.trees[1..], delay, self.max_base_jobs).len();
2208        let (jobs1, jobs2) = split(&completed_jobs, required_jobs_for_current_tree);
2209
2210        // println!(
2211        //     "required_jobs_for_current_tree={} jobs1={} jobs2={}",
2212        //     required_jobs_for_current_tree,
2213        //     jobs1.len(),
2214        //     jobs2.len()
2215        // );
2216
2217        // TODO: For logs, consider when works are splitted on 2 trees
2218        println!("scan_state update:");
2219
2220        // update first set of jobs and data
2221        let result_opt = self.add_merge_jobs(jobs1)?;
2222        self.add_data(data1.to_vec(), &base_kind)?;
2223
2224        if !jobs2.is_empty() || !data2.is_empty() {
2225            println!("scan_state update: (2nd set of jobs, new transactions didn't fit in latest/current tree)");
2226        }
2227
2228        // update second set of jobs and data.
2229        // This will be empty if all the data fit in the current tree
2230        self.add_merge_jobs(jobs2)?;
2231        self.add_data(data2.to_vec(), base_kind)?;
2232
2233        // update the latest emitted value
2234        if result_opt.is_some() {
2235            self.acc.clone_from(&result_opt);
2236        };
2237
2238        assert!(
2239            self.trees.len() <= self.max_trees() as usize,
2240            "Tree list length ({}) exceeded maximum ({})",
2241            self.trees.len(),
2242            self.max_trees()
2243        );
2244
2245        Ok(result_opt)
2246    }
2247
2248    pub fn update(
2249        &mut self,
2250        data: Vec<BaseJob>,
2251        completed_jobs: Vec<MergeJob>,
2252        base_kind: impl Fn(&BaseJob) -> usize,
2253    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
2254        self.update_helper(data, completed_jobs, base_kind)
2255    }
2256
2257    pub fn all_jobs(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
2258        self.all_work()
2259    }
2260
2261    pub fn jobs_for_next_update(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
2262        self.work_for_next_update(self.max_base_jobs)
2263    }
2264
2265    pub fn jobs_for_slots(&self, slots: u64) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
2266        self.work_for_next_update(slots)
2267    }
2268
2269    pub fn free_space(&self) -> u64 {
2270        self.max_base_jobs
2271    }
2272
2273    pub fn last_emitted_value(&self) -> Option<&(MergeJob, Vec<BaseJob>)> {
2274        self.acc.as_ref()
2275    }
2276
2277    fn current_job_sequence_number(&self) -> SequenceNumber {
2278        self.curr_job_seq_no.clone()
2279    }
2280
2281    pub fn base_jobs_on_latest_tree(&self) -> impl Iterator<Item = BaseJob> {
2282        let depth = ceil_log2(self.max_base_jobs);
2283        let level = depth;
2284
2285        self.trees[0]
2286            .jobs_on_level(depth, level)
2287            .into_iter()
2288            .filter_map(|job| match job {
2289                AvailableJob::Base(base) => Some(base),
2290                AvailableJob::Merge { .. } => None,
2291            })
2292    }
2293
2294    // 0-based indexing, so 0 indicates next-to-latest tree
2295    pub fn base_jobs_on_earlier_tree(&self, index: usize) -> impl Iterator<Item = BaseJob> {
2296        let depth = ceil_log2(self.max_base_jobs);
2297        let level = depth;
2298
2299        let earlier_trees = &self.trees[1..];
2300
2301        let base_job = |job| match job {
2302            AvailableJob::Base(base) => Some(base),
2303            AvailableJob::Merge { .. } => None,
2304        };
2305
2306        match earlier_trees.get(index) {
2307            None => {
2308                // Use `Vec::new().into_iter().filter_map` to returns same concrete type
2309                // than the `Some(_)` branch
2310                Vec::new().into_iter().filter_map(base_job)
2311            }
2312            Some(tree) => tree
2313                .jobs_on_level(depth, level)
2314                .into_iter()
2315                .filter_map(base_job),
2316        }
2317    }
2318
2319    pub fn partition_if_overflowing(&self) -> SpacePartition {
2320        let cur_tree_space = self.free_space_on_current_tree();
2321
2322        // Check actual work count because it would be zero initially
2323
2324        let work_count = self.work_for_tree(WorkForTree::Current).len() as u64;
2325        let work_count_new_tree = self.work_for_tree(WorkForTree::Next).len() as u64;
2326
2327        SpacePartition {
2328            first: (cur_tree_space, work_count),
2329            second: {
2330                if cur_tree_space < self.max_base_jobs {
2331                    let slots = self.max_base_jobs - cur_tree_space;
2332                    Some((slots, work_count_new_tree.min(2 * slots)))
2333                } else {
2334                    None
2335                }
2336            },
2337        }
2338    }
2339
2340    pub fn next_on_new_tree(&self) -> bool {
2341        let curr_tree_space = self.free_space_on_current_tree();
2342        curr_tree_space == self.max_base_jobs
2343    }
2344
2345    pub fn pending_data(&self) -> Vec<Vec<BaseJob>> {
2346        self.trees.iter().rev().map(Tree::base_jobs).collect()
2347    }
2348
2349    // #[cfg(test)]
2350    fn job_count(&self) -> (f64, f64) {
2351        use JobStatus::{Done, Todo};
2352
2353        self.fold_chronological(
2354            (0.0, 0.0),
2355            |(ntodo, ndone), merge| {
2356                use merge::{
2357                    Job::{Empty, Full, Part},
2358                    Record,
2359                };
2360
2361                let (todo, done) = match &merge.job {
2362                    Part(_) => (0.5, 0.0),
2363                    Full(Record { state: Todo, .. }) => (1.0, 0.0),
2364                    Full(Record { state: Done, .. }) => (0.0, 1.0),
2365                    Empty => (0.0, 0.0),
2366                };
2367                (ntodo + todo, ndone + done)
2368            },
2369            |(ntodo, ndone), base| {
2370                use base::{
2371                    Job::{Empty, Full},
2372                    Record,
2373                };
2374
2375                let (todo, done) = match &base.job {
2376                    Empty => (0.0, 0.0),
2377                    Full(Record { state: Todo, .. }) => (1.0, 0.0),
2378                    Full(Record { state: Done, .. }) => (0.0, 1.0),
2379                };
2380
2381                (ntodo + todo, ndone + done)
2382            },
2383        )
2384    }
2385}
2386
2387fn work_to_do<'a, BaseJob, MergeJob, I>(
2388    trees: I,
2389    max_base_jobs: u64,
2390) -> Vec<AvailableJob<BaseJob, MergeJob>>
2391where
2392    I: Iterator<Item = &'a Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
2393    BaseJob: Debug + Clone + 'static,
2394    MergeJob: Debug + Clone + 'static,
2395{
2396    let depth = ceil_log2(max_base_jobs);
2397
2398    // let trees: Vec<_> = trees.collect();
2399
2400    // println!("work_to_do length={}", trees.len());
2401
2402    trees
2403        // .iter()
2404        .enumerate()
2405        .flat_map(|(i, tree)| {
2406            let level = depth - i as u64;
2407            tree.jobs_on_level(depth, level)
2408        })
2409        .collect()
2410}
2411
2412fn work<'a, BaseJob, MergeJob, I>(
2413    trees: I,
2414    delay: u64,
2415    max_base_jobs: u64,
2416) -> Vec<AvailableJob<BaseJob, MergeJob>>
2417where
2418    I: IntoIterator<Item = &'a Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
2419    BaseJob: Debug + Clone + 'static,
2420    MergeJob: Debug + Clone + 'static,
2421{
2422    let depth = ceil_log2(max_base_jobs) as usize;
2423    let delay = delay as usize;
2424
2425    // println!("WORK_DELAY={}", delay);
2426
2427    let work_trees = trees
2428        .into_iter()
2429        .enumerate()
2430        .filter_map(|(i, t)| {
2431            if i % delay == delay - 1 {
2432                Some(t)
2433            } else {
2434                None
2435            }
2436        })
2437        .take(depth + 1);
2438
2439    work_to_do(work_trees, max_base_jobs)
2440}
2441
/// Returns the first `n` elements of `slice`, or the whole slice when it
/// holds fewer than `n` elements.
fn take<T>(slice: &[T], n: usize) -> &[T] {
    let end = n.min(slice.len());
    &slice[..end]
}
2445
2446fn take_at<T>(slice: &[T], skip: usize, n: usize) -> &[T] {
2447    slice.get(skip..).map(|s| take(s, n)).unwrap_or(&[])
2448}
2449
/// Returns the base-2 logarithm of `n`, rounded up.
///
/// Follows the OCaml `Int.ceil_log2` definition: `0` for `n == 1`, otherwise
/// `num_bits - clz (n - 1)`.
///
/// # Panics
///
/// Panics when `n` is zero.
pub const fn ceil_log2(n: u64) -> u64 {
    assert!(n > 0);
    match n {
        1 => 0,
        _ => (u64::BITS - (n - 1).leading_zeros()) as u64,
    }
}
2463
/// Concatenates the inner vectors of `v` into one vector, preserving order.
fn flatten<T>(v: Vec<Vec<T>>) -> Vec<T> {
    let total = v.iter().map(Vec::len).sum();
    let mut flat = Vec::with_capacity(total);
    for mut inner in v {
        flat.append(&mut inner);
    }
    flat
}
2467
2468// #[cfg(test)]
2469fn assert_job_count<B, M>(
2470    s1: &ParallelScan<B, M>,
2471    s2: &ParallelScan<B, M>,
2472    completed_job_count: f64,
2473    base_job_count: f64,
2474    value_emitted: bool,
2475) where
2476    B: Debug + Clone + 'static,
2477    M: Debug + Clone + 'static,
2478{
2479    // println!("s1={:#?}", s1);
2480    // println!("s2={:?}", s2);
2481
2482    let (todo_before, done_before) = s1.job_count();
2483    let (todo_after, done_after) = s2.job_count();
2484
2485    // println!(
2486    //     "before todo={:?} done={:?}",
2487    //     s1.job_count().0,
2488    //     s1.job_count().1
2489    // );
2490    // println!(
2491    //     "after  todo={:?} done={:?}",
2492    //     s2.job_count().0,
2493    //     s2.job_count().1
2494    // );
2495
2496    // ordered list of jobs that is actually called when distributing work
2497    let all_jobs = flatten(s2.all_jobs());
2498
2499    // list of jobs
2500
2501    let all_jobs_expected_count = s2
2502        .trees
2503        .iter()
2504        .fold(Vec::with_capacity(s2.trees.len()), |mut acc, tree| {
2505            let mut records = tree.jobs_records();
2506            acc.append(&mut records);
2507            acc
2508        })
2509        .into_iter()
2510        .filter(|job| {
2511            matches!(
2512                job,
2513                Job::Base(base::Record {
2514                    state: JobStatus::Todo,
2515                    ..
2516                }) | Job::Merge(merge::Record {
2517                    state: JobStatus::Todo,
2518                    ..
2519                })
2520            )
2521        })
2522        .count();
2523
2524    // println!(
2525    //     "all_jobs={} all_jobs_expected={}",
2526    //     all_jobs.len(),
2527    //     all_jobs_expected_count
2528    // );
2529
2530    assert_eq!(all_jobs.len(), all_jobs_expected_count);
2531
2532    let expected_todo_after = {
2533        let new_jobs = if value_emitted {
2534            (completed_job_count - 1.0) / 2.0
2535        } else {
2536            completed_job_count / 2.0
2537        };
2538        todo_before + base_job_count - completed_job_count + new_jobs
2539    };
2540
2541    let expected_done_after = {
2542        let jobs_from_delete_tree = if value_emitted {
2543            ((2 * s1.max_base_jobs) - 1) as f64
2544        } else {
2545            0.0
2546        };
2547        done_before + completed_job_count - jobs_from_delete_tree
2548    };
2549
2550    assert_eq!(todo_after, expected_todo_after);
2551    assert_eq!(done_after, expected_done_after);
2552}
2553
2554fn test_update<B, M>(
2555    s1: &ParallelScan<B, M>,
2556    data: Vec<B>,
2557    completed_jobs: Vec<M>,
2558) -> (Option<(M, Vec<B>)>, ParallelScan<B, M>)
2559where
2560    B: Debug + Clone + 'static,
2561    M: Debug + Clone + 'static,
2562{
2563    let mut s2 = s1.clone();
2564    let result_opt = s2
2565        .update(data.clone(), completed_jobs.clone(), |_| 0)
2566        .unwrap();
2567
2568    assert_job_count(
2569        s1,
2570        &s2,
2571        completed_jobs.len() as f64,
2572        data.len() as f64,
2573        result_opt.is_some(),
2574    );
2575    (result_opt, s2)
2576}
2577
/// Formats `u` as a decimal string.
fn int_to_string(u: &usize) -> String {
    format!("{u}")
}
2581
2582// fn sint_to_string(i: &i64) -> String {
2583//     i.to_string()
2584// }
2585
/// Appends the decimal representation of `i` to `buffer`.
fn sint_to_string(buffer: &mut Vec<u8>, i: &i64) {
    let digits = i.to_string();
    buffer.write_all(digits.as_bytes()).unwrap();
}
2589
2590fn hash(state: &ParallelScan<i64, i64>) -> String {
2591    hex::encode(state.hash(sint_to_string, sint_to_string))
2592}
2593
2594#[cfg(test)]
2595mod tests {
2596    use std::{
2597        array,
2598        sync::{
2599            atomic::{AtomicBool, Ordering::Relaxed},
2600            mpsc::{sync_channel, Receiver, SyncSender},
2601        },
2602    };
2603
2604    use openmina_core::thread;
2605    use rand::Rng;
2606
2607    use super::*;
2608
2609    #[test]
2610    fn test_ceil_log2() {
2611        for a in 1..50u64 {
2612            let v = (a as f32).log2().ceil() as u64;
2613            let w = ceil_log2(a);
2614            // println!("{} {} {}", a, v, w);
2615            assert_eq!(v, w);
2616        }
2617    }
2618
2619    // Make sure that sha256 produces same result when data is splitted or not
2620    #[test]
2621    fn test_sha256() {
2622        let array: [u8; 2 * 1024] = array::from_fn(|i| (i % 256) as u8);
2623        let mut slice = &array[..];
2624
2625        let mut sha256 = sha2::Sha256::new();
2626        for byte in slice.iter().copied() {
2627            sha256.update(&[byte][..]);
2628        }
2629        let first = sha256.finalize();
2630
2631        let mut sha256 = sha2::Sha256::new();
2632        let mut n = 1;
2633        while !slice.is_empty() {
2634            sha256.update(slice.get(..n).unwrap_or(slice));
2635            slice = slice.get(n..).unwrap_or(&[]);
2636
2637            n += 2;
2638        }
2639        let second = sha256.finalize();
2640
2641        assert_eq!(first, second);
2642    }
2643
2644    #[test]
2645    fn test_range_at_depth() {
2646        let ranges: Vec<_> = (0..10u64).map(btree::range_at_depth).collect();
2647
2648        assert_eq!(
2649            ranges,
2650            [
2651                0..1,
2652                1..3,
2653                3..7,
2654                7..15,
2655                15..31,
2656                31..63,
2657                63..127,
2658                127..255,
2659                255..511,
2660                511..1023,
2661            ]
2662        );
2663    }
2664
2665    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1525>
2666    #[test]
2667    fn always_max_base_jobs() {
2668        const MAX_BASE_JOS: u64 = 512;
2669
2670        // let int_to_string = |i: &usize| i.to_string();
2671
2672        // let state = ParallelScan::<usize, usize>::empty(8, 3);
2673        // println!("STATE len={:?} ={:#?}", state.trees.len(), state);
2674        // println!("hash={:?}", hash(&state));
2675
2676        let mut state = ParallelScan::<usize, usize>::empty(MAX_BASE_JOS, 3);
2677        let mut expected_result: Vec<Vec<usize>> = vec![];
2678
2679        // println!("hash={:?}", hex::encode(state.hash(int_to_string, int_to_string)));
2680        // println!("hash={:?}", state.hash(int_to_string, int_to_string));
2681
2682        for i in 0..100 {
2683            println!("####### LOOP {:?} #########", i);
2684
2685            let data: Vec<_> = (0..MAX_BASE_JOS as usize).map(|j| i + j).collect();
2686
2687            expected_result.push(data.clone());
2688
2689            let work: Vec<_> = state
2690                .work_for_next_update(data.len() as u64)
2691                .into_iter()
2692                .flatten()
2693                .collect();
2694
2695            let new_merges: Vec<_> = work
2696                .iter()
2697                .map(|job| match job {
2698                    AvailableJob::Base(i) => *i,
2699                    AvailableJob::Merge { left, right } => *left + *right,
2700                })
2701                .collect();
2702
2703            println!("work={:?} new_merges={:?}", work.len(), new_merges.len());
2704            // println!("hash_s1={:?}", hash(&state));
2705
2706            // let mut s2 = state.clone();
2707            // let result_opt = s2.update(data.clone(), new_merges.clone()).unwrap();
2708            // println!("hash_s2={:?}", hash(&s2));
2709
2710            let (result_opt, s) = test_update(&state, data, new_merges);
2711
2712            // assert!(result_opt.is_none());
2713
2714            let (expected_result_, remaining_expected_results) = {
2715                match result_opt {
2716                    None => ((0, vec![]), expected_result.clone()),
2717                    Some(ref r) => {
2718                        println!("RESULT_OPT.0={} len={}", r.0, r.1.len());
2719                        // println!("expected_result={:?}", expected_result);
2720                        // Printf.eprintf "RESULT_OPT.0=%d len=%d\n%!" a (List.length l);
2721                        if expected_result.is_empty() {
2722                            ((0, vec![]), vec![])
2723                        } else {
2724                            let first = expected_result[0].clone();
2725                            let sum: usize = first.iter().sum();
2726
2727                            ((sum, first), expected_result[1..].to_vec())
2728                        }
2729                    }
2730                }
2731            };
2732
2733            assert_eq!(
2734                result_opt.as_ref().unwrap_or(&expected_result_),
2735                &expected_result_
2736            );
2737
2738            expected_result = remaining_expected_results;
2739            state = s;
2740        }
2741    }
2742
2743    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1562>
2744    #[test]
2745    fn random_base_jobs() {
2746        const MAX_BASE_JOS: usize = 512;
2747
2748        let mut state = ParallelScan::<usize, usize>::empty(MAX_BASE_JOS as u64, 3);
2749        let mut rng = rand::thread_rng();
2750        let expected_result = (MAX_BASE_JOS, vec![1usize; MAX_BASE_JOS]);
2751
2752        for _ in 0..1_000 {
2753            let mut data = vec![1; rng.gen_range(0..=30)];
2754            data.truncate(MAX_BASE_JOS);
2755            let data_len = data.len();
2756
2757            println!("list_length={}", data_len);
2758
2759            let work: Vec<_> = state
2760                .work_for_next_update(data_len as u64)
2761                .into_iter()
2762                .flatten()
2763                .take(data_len * 2)
2764                .collect();
2765            let new_merges: Vec<_> = work
2766                .iter()
2767                .map(|job| match job {
2768                    AvailableJob::Base(i) => *i,
2769                    AvailableJob::Merge { left, right } => left + right,
2770                })
2771                .collect();
2772
2773            let (result_opt, s) = test_update(&state, data, new_merges);
2774
2775            assert_eq!(
2776                result_opt.as_ref().unwrap_or(&expected_result),
2777                &expected_result
2778            );
2779
2780            state = s;
2781        }
2782    }
2783
2784    fn gen<FunDone, FunAcc>(fun_job_done: FunDone, fun_acc: FunAcc) -> ParallelScan<i64, i64>
2785    where
2786        FunDone: Fn(&AvailableJob<i64, i64>) -> i64,
2787        FunAcc: Fn(Option<(i64, Vec<i64>)>, (i64, Vec<i64>)) -> Option<(i64, Vec<i64>)>,
2788    {
2789        let mut rng = rand::thread_rng();
2790
2791        let depth = rng.gen_range(2..5);
2792        let delay = rng.gen_range(0..=3);
2793
2794        let max_base_jobs = 2u64.pow(depth);
2795
2796        let mut s = ParallelScan::<i64, i64>::empty(max_base_jobs, delay);
2797
2798        let ndatas = rng.gen_range(2..=20);
2799        let datas: Vec<Vec<i64>> = (1..ndatas)
2800            .map(|_| {
2801                std::iter::repeat_with(|| rng.gen())
2802                    .take(max_base_jobs as usize)
2803                    .collect()
2804            })
2805            .collect();
2806
2807        // let datas = vec![
2808        //     // vec![-58823712978749242i64 as i64, 25103, 33363641, -1611555993190i64 as i64]
2809        //     // std::iter::repeat_with(|| rng.gen_range(0..1000))
2810        //     std::iter::repeat_with(|| rng.gen())
2811        //     // std::iter::repeat_with(|| 1)
2812        //         .take(max_base_jobs as usize)
2813        //         .collect::<Vec<_>>()
2814        // ];
2815
2816        for data in datas {
2817            println!("ndata={}", data.len());
2818
2819            let jobs = flatten(s.work_for_next_update(data.len() as u64));
2820
2821            let jobs_done: Vec<i64> = jobs.iter().map(&fun_job_done).collect();
2822
2823            println!("jobs_donea={}", jobs_done.len());
2824
2825            let old_tuple = s.acc.clone();
2826
2827            let res_opt = s.update(data, jobs_done, |_| 0).unwrap();
2828
2829            if let Some(res) = res_opt {
2830                let tuple = if old_tuple.is_some() {
2831                    old_tuple
2832                } else {
2833                    s.acc.clone()
2834                };
2835
2836                s.acc = fun_acc(tuple, res);
2837            }
2838            println!("s.acc.is_some={:?}", s.acc.is_some());
2839        }
2840
2841        s
2842    }
2843
2844    fn fun_merge_up(
2845        state: Option<(i64, Vec<i64>)>,
2846        mut x: (i64, Vec<i64>),
2847    ) -> Option<(i64, Vec<i64>)> {
2848        let mut acc = state?;
2849        acc.1.append(&mut x.1);
2850        Some((acc.0.wrapping_add(x.0), acc.1))
2851    }
2852
2853    fn job_done(job: &AvailableJob<i64, i64>) -> i64 {
2854        match job {
2855            AvailableJob::Base(x) => *x,
2856            // AvailableJob::Merge { left, right } => left + right,
2857            AvailableJob::Merge { left, right } => {
2858                // let left = *left as i64;
2859                // let right = *right as i64;
2860                left.wrapping_add(*right)
2861                // left + right
2862            }
2863        }
2864    }
2865
2866    /// scan (+) over ints
2867    ///
2868    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1677>
2869    #[test]
2870    fn split_on_if_enqueuing_onto_the_next_queue() {
2871        let mut rng = rand::thread_rng();
2872
2873        let p = 4;
2874        let max_base_jobs = 2u64.pow(p);
2875
2876        for _ in 0..100000 {
2877            let state = ParallelScan::<i64, i64>::empty(max_base_jobs, 1);
2878            println!("hash_state={:?}", hash(&state));
2879            let i = rng.gen_range(0..max_base_jobs);
2880
2881            let data: Vec<i64> = (0..i as i64).collect();
2882
2883            let partition = state.partition_if_overflowing();
2884            let jobs = flatten(state.work_for_next_update(data.len() as u64));
2885
2886            let jobs_done: Vec<i64> = jobs.iter().map(job_done).collect();
2887
2888            let tree_count_before = state.trees.len();
2889
2890            let (_, s) = test_update(&state, data, jobs_done);
2891
2892            println!("second={:?}", partition.second.is_some());
2893
2894            match partition.second {
2895                None => {
2896                    let tree_count_after = s.trees.len();
2897                    let expected_tree_count = if partition.first.0 == i {
2898                        tree_count_before + 1
2899                    } else {
2900                        tree_count_before
2901                    };
2902                    assert_eq!(tree_count_after, expected_tree_count);
2903                }
2904                Some(_) => {
2905                    let tree_count_after = s.trees.len();
2906                    let expected_tree_count = if i > partition.first.0 {
2907                        tree_count_before + 1
2908                    } else {
2909                        tree_count_before
2910                    };
2911                    assert_eq!(tree_count_after, expected_tree_count);
2912                }
2913            }
2914        }
2915    }
2916
2917    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1722>
2918    #[test]
2919    fn sequence_number_reset() {
2920        let p = 3;
2921        let max_base_jobs = 2u64.pow(p);
2922
2923        let jobs = |state: &ParallelScan<i64, i64>| -> Vec<Vec<Job<base::Record<i64>, merge::Record<i64>>>> {
2924            state.trees.iter().map(|t| t.jobs_records()).rev().collect()
2925        };
2926
2927        let verify_sequence_number = |state: &ParallelScan<i64, i64>| {
2928            let mut state = state.clone();
2929            state.reset_seq_no();
2930
2931            let jobs_list = jobs(&state);
2932
2933            let depth = ceil_log2(max_base_jobs + 1);
2934
2935            for (i, jobs) in jobs_list.iter().enumerate() {
2936                // each tree has jobs up till a level below the older tree
2937                //  and have the following sequence numbers after reset
2938                //             4
2939                //         3       3
2940                //       2   2   2   2
2941                //      1 1 1 1 1 1 1 1
2942
2943                let cur_levels = depth - i as u64;
2944
2945                let seq_sum = (0..cur_levels).fold(0, |acc, j| {
2946                    let j = j + i as u64;
2947                    acc + (2u64.pow(j as u32) * (depth - j))
2948                });
2949
2950                let offset = i as u64;
2951
2952                let sum_of_all_seq_numbers: u64 = jobs
2953                    .iter()
2954                    .map(|job| match job {
2955                        Job::Base(base::Record { seq_no, .. }) => seq_no.0 - offset,
2956                        Job::Merge(merge::Record { seq_no, .. }) => seq_no.0 - offset,
2957                    })
2958                    .sum();
2959
2960                dbg!(sum_of_all_seq_numbers, seq_sum);
2961                assert_eq!(sum_of_all_seq_numbers, seq_sum);
2962            }
2963        };
2964
2965        let mut state = ParallelScan::<i64, i64>::empty(max_base_jobs, 0);
2966        let mut counter = 0;
2967
2968        for _ in 0..50 {
2969            let jobs = flatten(state.jobs_for_next_update());
2970            let jobs_done: Vec<_> = jobs.iter().map(job_done).collect();
2971            let data: Vec<i64> = (0..max_base_jobs as i64).collect();
2972
2973            let (res_opt, s) = test_update(&state, data, jobs_done);
2974
2975            state = s;
2976
2977            if res_opt.is_some() {
2978                if counter > p {
2979                    verify_sequence_number(&state);
2980                } else {
2981                    counter += 1;
2982                }
2983            };
2984        }
2985
2986        assert_eq!(
2987            hash(&state),
2988            "931a0dc0a488289000c195ae361138cc713deddc179b5d22bfa6344508d0cfb5"
2989        );
2990    }
2991
2992    fn step_on_free_space<F, FAcc, B, M>(
2993        state: &mut ParallelScan<B, M>,
2994        w: &SyncSender<Option<(M, Vec<B>)>>,
2995        mut ds: Vec<B>,
2996        f: F,
2997        f_acc: FAcc,
2998    ) where
2999        F: Fn(&AvailableJob<B, M>) -> M,
3000        FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3001        B: Debug + Clone + 'static,
3002        M: Debug + Clone + 'static,
3003    {
3004        loop {
3005            let data = take(&ds, state.max_base_jobs as usize);
3006
3007            let jobs = flatten(state.work_for_next_update(data.len() as u64));
3008            let jobs_done: Vec<_> = jobs.iter().map(&f).collect();
3009
3010            let old_tuple = state.acc.clone();
3011
3012            let (res_opt, mut s) = test_update(state, data.to_vec(), jobs_done);
3013
3014            let s = match res_opt {
3015                Some(x) => {
3016                    let tuple = if old_tuple.is_some() {
3017                        f_acc(old_tuple, x)
3018                    } else {
3019                        s.acc.clone()
3020                    };
3021                    s.acc = tuple;
3022                    s
3023                }
3024                None => s,
3025            };
3026
3027            w.send(s.acc.clone()).unwrap();
3028
3029            *state = s;
3030
3031            let rem_ds = ds.get(state.max_base_jobs as usize..).unwrap_or(&[]);
3032
3033            if rem_ds.is_empty() {
3034                return;
3035            } else {
3036                ds = rem_ds.to_vec();
3037            }
3038        }
3039    }
3040
3041    fn do_steps<F, FAcc, B, M>(
3042        state: &mut ParallelScan<B, M>,
3043        recv: &Receiver<B>,
3044        f: F,
3045        f_acc: FAcc,
3046        w: SyncSender<Option<(M, Vec<B>)>>,
3047    ) where
3048        F: Fn(&AvailableJob<B, M>) -> M,
3049        FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3050        B: Debug + Clone + 'static,
3051        M: Debug + Clone + 'static,
3052    {
3053        let data = recv.recv().unwrap();
3054        step_on_free_space(state, &w, vec![data], &f, &f_acc);
3055    }
3056
3057    fn scan<F, FAcc, B, M>(
3058        s: &mut ParallelScan<B, M>,
3059        data: &Receiver<B>,
3060        f: F,
3061        f_acc: FAcc,
3062    ) -> Receiver<Option<(M, Vec<B>)>>
3063    where
3064        F: Fn(&AvailableJob<B, M>) -> M,
3065        FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3066        B: Debug + Clone + 'static,
3067        M: Debug + Clone + 'static,
3068    {
3069        let (send, rec) = std::sync::mpsc::sync_channel::<Option<(M, Vec<B>)>>(1);
3070        do_steps(s, data, f, f_acc, send);
3071        rec
3072    }
3073
3074    fn step_repeatedly<F, FAcc, B, M>(
3075        state: &mut ParallelScan<B, M>,
3076        data: &Receiver<B>,
3077        f: F,
3078        f_acc: FAcc,
3079    ) -> Receiver<Option<(M, Vec<B>)>>
3080    where
3081        F: Fn(&AvailableJob<B, M>) -> M,
3082        FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3083        B: Debug + Clone + 'static,
3084        M: Debug + Clone + 'static,
3085    {
3086        let (send, rec) = std::sync::mpsc::sync_channel::<Option<(M, Vec<B>)>>(1);
3087        do_steps(state, data, f, f_acc, send);
3088        rec
3089    }
3090
3091    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1803>
3092    #[test]
3093    fn scan_can_be_initialized_from_intermediate_state() {
3094        for _ in 0..10 {
3095            let mut state = gen(job_done, fun_merge_up);
3096
3097            println!("state={:#?}", state);
3098
3099            let do_one_next = Arc::new(AtomicBool::new(false));
3100
3101            let do_one_next_clone = Arc::clone(&do_one_next);
3102            let (send, recv) = sync_channel(1);
3103
3104            thread::spawn(move || loop {
3105                let v = if do_one_next_clone.load(Relaxed) {
3106                    do_one_next_clone.store(false, Relaxed);
3107                    1i64
3108                } else {
3109                    0i64
3110                };
3111                if send.send(v).is_err() {
3112                    return;
3113                }
3114            });
3115
3116            let one_then_zeros = recv;
3117
3118            let parallelism = state.max_base_jobs * ceil_log2(state.max_base_jobs);
3119            let old_acc = state.acc.as_ref().cloned().unwrap_or((0, vec![]));
3120
3121            let fill_some_zero =
3122                |state: &mut ParallelScan<i64, i64>, v: i64, r: &Receiver<i64>| -> i64 {
3123                    (0..parallelism * parallelism).fold(v, |acc, _| {
3124                        let pipe = step_repeatedly(state, r, job_done, fun_merge_up);
3125
3126                        match pipe.recv() {
3127                            Ok(Some((v, _))) => v,
3128                            Ok(None) => acc,
3129                            Err(_) => acc,
3130                        }
3131                    })
3132                };
3133
3134            let v = fill_some_zero(&mut state, 0, &one_then_zeros);
3135
3136            do_one_next.store(true, Relaxed);
3137
3138            let acc = { state.acc.clone().unwrap() };
3139
3140            assert_ne!(acc.0, old_acc.0);
3141
3142            fill_some_zero(&mut state, v, &one_then_zeros);
3143
3144            let acc_plus_one = { state.acc.unwrap() };
3145            assert_eq!(acc_plus_one.0, acc.0.wrapping_add(1));
3146        }
3147    }
3148
3149    /// scan (+) over ints, map from string
3150    ///
3151    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1879>
3152    #[test]
3153    fn scan_behaves_like_a_fold_long_term() {
3154        fn fun_merge_up(
3155            tuple: Option<(i64, Vec<String>)>,
3156            mut x: (i64, Vec<String>),
3157        ) -> Option<(i64, Vec<String>)> {
3158            let mut acc = tuple?;
3159            acc.1.append(&mut x.1);
3160
3161            Some((acc.0.wrapping_add(x.0), acc.1))
3162        }
3163
3164        fn job_done(job: &AvailableJob<String, i64>) -> i64 {
3165            match job {
3166                AvailableJob::Base(x) => x.parse().unwrap(),
3167                AvailableJob::Merge { left, right } => left.wrapping_add(*right),
3168            }
3169        }
3170
3171        let (send, recv) = sync_channel(1);
3172
3173        let depth = 7;
3174        let count: i64 = 1000;
3175
3176        thread::spawn(move || {
3177            let mut count = count;
3178            let x = count;
3179            loop {
3180                let next = if count <= 0 {
3181                    "0".to_string()
3182                } else {
3183                    (x - count).to_string()
3184                };
3185
3186                count -= 1;
3187
3188                if send.send(next).is_err() {
3189                    return;
3190                }
3191            }
3192        });
3193
3194        let mut s = ParallelScan::<String, i64>::empty(2u64.pow(depth as u32), 1);
3195
3196        let after_3n = (0..4 * count).fold(0i64, |acc, _| {
3197            let result = scan(&mut s, &recv, job_done, fun_merge_up);
3198            match result.recv() {
3199                Ok(Some((v, _))) => v,
3200                Ok(None) => acc,
3201                Err(_) => acc,
3202            }
3203        });
3204
3205        let expected = (0..count).fold(0i64, |a, b| a.wrapping_add(b));
3206
3207        assert_eq!(after_3n, expected);
3208    }
3209
3210    /// scan performs operation in correct order with \
3211    /// non-commutative semigroup
3212    ///
3213    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1917>
3214    #[test]
3215    fn scan_concat_over_strings() {
3216        fn fun_merge_up(
3217            tuple: Option<(String, Vec<String>)>,
3218            mut x: (String, Vec<String>),
3219        ) -> Option<(String, Vec<String>)> {
3220            let mut acc = tuple?;
3221            acc.1.append(&mut x.1);
3222
3223            Some((format!("{}{}", acc.0, x.0), acc.1))
3224        }
3225
3226        fn job_done(job: &AvailableJob<String, String>) -> String {
3227            match job {
3228                AvailableJob::Base(x) => x.clone(),
3229                AvailableJob::Merge { left, right } => {
3230                    format!("{}{}", left, right)
3231                }
3232            }
3233        }
3234
3235        let (send, recv) = sync_channel(1);
3236
3237        let depth = 7;
3238        let count: i64 = 100;
3239
3240        thread::spawn(move || {
3241            let mut count = count;
3242            let x = count;
3243            loop {
3244                let next = if count <= 0 {
3245                    "".to_string()
3246                } else {
3247                    let n = (x - count).to_string();
3248                    format!("{},", n)
3249                };
3250
3251                count -= 1;
3252
3253                if send.send(next).is_err() {
3254                    return;
3255                }
3256            }
3257        });
3258
3259        let mut s = ParallelScan::<String, String>::empty(2u64.pow(depth as u32), 1);
3260
3261        let after_3n = (0..42 * count).fold(String::new(), |acc, _| {
3262            let result = scan(&mut s, &recv, job_done, fun_merge_up);
3263            match result.recv() {
3264                Ok(Some((v, _))) => v,
3265                Ok(None) => acc,
3266                Err(_) => acc,
3267            }
3268        });
3269
3270        let expected = (0..count)
3271            .map(|i| format!("{},", i))
3272            .fold(String::new(), |a, b| format!("{}{}", a, b));
3273
3274        assert_eq!(after_3n, expected);
3275    }
3276}