Skip to main content

mina_tree/scan_state/
parallel_scan.rs

1use std::{
2    collections::{BTreeMap, VecDeque},
3    fmt::Debug,
4    io::Write,
5    ops::ControlFlow,
6    sync::Arc,
7};
8
9use itertools::Itertools;
10use serde::{Deserialize, Serialize};
11use sha2::{
12    digest::{generic_array::GenericArray, typenum::U32},
13    Digest, Sha256,
14};
15use ControlFlow::{Break, Continue};
16
17use super::scan_state::transaction_snark::{LedgerProofWithSokMessage, TransactionWithWitness};
18
19/// Sequence number for jobs in the scan state that corresponds to the order in
20/// which they were added
21#[derive(Clone, Eq, Ord, PartialEq, PartialOrd)]
22pub struct SequenceNumber(u64);
23
24impl Debug for SequenceNumber {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        f.write_fmt(format_args!("{}", self.0))
27    }
28}
29
30impl SequenceNumber {
31    pub(super) fn new(number: u64) -> Self {
32        Self(number)
33    }
34
35    fn zero() -> Self {
36        Self(0)
37    }
38
39    pub fn incr(&self) -> Self {
40        Self(self.0 + 1)
41    }
42
43    fn is_u64_max(&self) -> bool {
44        self.0 == u64::MAX
45    }
46
47    pub fn as_u64(&self) -> u64 {
48        self.0
49    }
50}
51
52impl std::ops::Sub for &'_ SequenceNumber {
53    type Output = SequenceNumber;
54
55    fn sub(self, rhs: &'_ SequenceNumber) -> Self::Output {
56        SequenceNumber(self.0 - rhs.0)
57    }
58}
59
60/// Each node on the tree is viewed as a job that needs to be completed. When a
61/// job is completed, it creates a new "Todo" job and marks the old job as "Done"
62#[derive(Clone, Debug)]
63pub enum JobStatus {
64    Todo,
65    Done,
66}
67
68impl JobStatus {
69    pub fn is_done(&self) -> bool {
70        matches!(self, Self::Done)
71    }
72
73    fn as_str(&self) -> &'static str {
74        match self {
75            JobStatus::Todo => "Todo",
76            JobStatus::Done => "Done",
77        }
78    }
79}
80
81/// The number of new jobs- base and merge that can be added to this tree.
82/// Each node has a weight associated to it and the
83/// new jobs received are distributed across the tree based on this number.
84#[derive(Clone)]
85pub struct Weight {
86    pub(super) base: u64,
87    pub(super) merge: u64,
88}
89
90impl Debug for Weight {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        write!(f, "{{ base: {} merge: {} }}", self.base, self.merge)
93    }
94}
95
96impl Weight {
97    fn zero() -> Self {
98        Self { base: 0, merge: 0 }
99    }
100}
101
102trait Lens {
103    type Value;
104    type Target;
105    fn get<'a>(&self, target: &'a Self::Target) -> &'a Self::Value;
106    fn set(&self, target: &Self::Target, value: Self::Value) -> Self::Target;
107}
108
109enum WeightLens {
110    Base,
111    Merge,
112}
113
114impl Lens for WeightLens {
115    type Value = u64;
116    type Target = Weight;
117
118    fn get<'a>(&self, target: &'a Self::Target) -> &'a Self::Value {
119        match self {
120            WeightLens::Base => &target.base,
121            WeightLens::Merge => &target.merge,
122        }
123    }
124
125    fn set(&self, target: &Self::Target, value: Self::Value) -> Self::Target {
126        match self {
127            WeightLens::Base => Self::Target {
128                base: value,
129                merge: target.merge,
130            },
131            WeightLens::Merge => Self::Target {
132                base: target.base,
133                merge: value,
134            },
135        }
136    }
137}
138
139#[derive(Debug)]
140enum WorkForTree {
141    Current,
142    Next,
143}
144
145/// For base proofs (Proving new transactions)
146pub mod base {
147    use super::*;
148
149    #[derive(Clone)]
150    pub struct Record<BaseJob> {
151        pub job: BaseJob,
152        pub seq_no: SequenceNumber,
153        pub state: JobStatus,
154    }
155
156    impl<BaseJob> Debug for Record<BaseJob> {
157        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158            let Self {
159                job: _,
160                seq_no,
161                state,
162            } = self;
163            f.write_fmt(format_args!("seq_no: {:?}, state: {:?}", seq_no, state))
164        }
165    }
166
167    #[derive(Clone)]
168    pub enum Job<BaseJob> {
169        Empty,
170        Full(Record<BaseJob>),
171    }
172
173    impl<BaseJob> Debug for Job<BaseJob> {
174        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175            match self {
176                Self::Empty => write!(f, "Empty"),
177                Self::Full(arg0) => f.write_fmt(format_args!("Full {{ {:?} }}", arg0)),
178            }
179        }
180    }
181
182    #[derive(Clone, Debug)]
183    pub struct Base<BaseJob> {
184        pub weight: Weight,
185        pub job: Job<BaseJob>,
186    }
187
188    impl<BaseJob: Clone> Record<BaseJob> {
189        pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
190            Self {
191                job: fun(&self.job),
192                seq_no: self.seq_no.clone(),
193                state: self.state.clone(),
194            }
195        }
196
197        pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
198            Self {
199                seq_no: no,
200                state: self.state.clone(),
201                job: self.job.clone(),
202            }
203        }
204    }
205
206    impl<BaseJob: Clone> Job<BaseJob> {
207        pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
208            match self {
209                Job::Empty => Self::Empty,
210                Job::Full(r) => Job::Full(r.map(fun)),
211            }
212        }
213    }
214
215    impl<BaseJob: Clone> Base<BaseJob> {
216        pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
217            Self {
218                weight: self.weight.clone(),
219                job: self.job.map(fun),
220            }
221        }
222
223        pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
224            Self {
225                weight: self.weight.clone(),
226                job: match &self.job {
227                    Job::Full(record) => Job::Full(record.with_seq_no(no)),
228                    x => x.clone(),
229                },
230            }
231        }
232    }
233}
234
235/// For merge proofs: Merging two base proofs or two merge proofs
236pub mod merge {
237    use super::*;
238
239    #[derive(Clone)]
240    pub struct Record<MergeJob> {
241        pub left: MergeJob,
242        pub right: MergeJob,
243        pub seq_no: SequenceNumber,
244        pub state: JobStatus,
245    }
246
247    impl<MergeJob> Debug for Record<MergeJob> {
248        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249            let Self {
250                left: _,
251                right: _,
252                seq_no,
253                state,
254            } = self;
255            f.write_fmt(format_args!("seq_no: {:?}, state: {:?}", seq_no, state))
256        }
257    }
258
259    impl<MergeJob: Clone> Record<MergeJob> {
260        pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
261            Self {
262                seq_no: no,
263                left: self.left.clone(),
264                right: self.right.clone(),
265                state: self.state.clone(),
266            }
267        }
268    }
269
270    #[derive(Clone)]
271    pub enum Job<MergeJob> {
272        Empty,
273        Part(MergeJob), // left
274        Full(Record<MergeJob>),
275    }
276
277    impl<MergeJob> Debug for Job<MergeJob> {
278        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279            match self {
280                Self::Empty => write!(f, "Empty"),
281                Self::Part(_) => write!(f, "Part(merge)"),
282                Self::Full(arg0) => f.write_fmt(format_args!("Full {{ {:?} }}", arg0)),
283            }
284        }
285    }
286
287    #[derive(Clone, Debug)]
288    pub struct Merge<MergeJob> {
289        pub weight: (Weight, Weight),
290        pub job: Job<MergeJob>,
291    }
292
293    impl<MergeJob> Record<MergeJob> {
294        pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
295            Self {
296                left: fun(&self.left),
297                right: fun(&self.right),
298                seq_no: self.seq_no.clone(),
299                state: self.state.clone(),
300            }
301        }
302    }
303
304    impl<MergeJob> Job<MergeJob> {
305        pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
306            match self {
307                Job::Empty => Self::Empty,
308                Job::Part(j) => Job::Part(fun(j)),
309                Job::Full(r) => Job::Full(r.map(fun)),
310            }
311        }
312    }
313
314    impl<MergeJob: Clone> Merge<MergeJob> {
315        pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
316            Self {
317                weight: self.weight.clone(),
318                job: self.job.map(fun),
319            }
320        }
321
322        pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
323            Self {
324                weight: self.weight.clone(),
325                job: match &self.job {
326                    Job::Full(record) => Job::Full(record.with_seq_no(no)),
327                    x => x.clone(),
328                },
329            }
330        }
331    }
332}
333
334/// All the jobs on a tree that can be done. Base.Full and Merge.Full
335#[derive(Clone, Debug, Serialize, Deserialize)]
336#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
337pub enum AvailableJob<BaseJob, MergeJob> {
338    Base(BaseJob),
339    Merge { left: MergeJob, right: MergeJob },
340}
341
342/// New jobs to be added (including new transactions or new merge jobs)
343#[derive(Clone, Debug)]
344enum Job<BaseJob, MergeJob> {
345    Base(BaseJob),
346    Merge(MergeJob),
347}
348
349/// Space available and number of jobs required to enqueue data.
350/// first = space on the current tree and number of jobs required
351/// to be completed
352/// second = If the current-tree space is less than <max_base_jobs>
353/// then remaining number of slots on a new tree and the corresponding
354/// job count.
355#[derive(Clone)]
356pub struct SpacePartition {
357    pub first: (u64, u64),
358    pub second: Option<(u64, u64)>,
359}
360
361impl Debug for SpacePartition {
362    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363        struct Detail {
364            space_available: u64,
365            njobs_to_be_completed: u64,
366        }
367
368        impl Debug for Detail {
369            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
370                f.write_fmt(format_args!(
371                    "space_available: {}, njobs_to_be_completed={}",
372                    self.space_available, self.njobs_to_be_completed
373                ))
374            }
375        }
376
377        f.debug_struct("SpacePartition")
378            .field(
379                "first(current_tree)",
380                &Detail {
381                    space_available: self.first.0,
382                    njobs_to_be_completed: self.first.1,
383                },
384            )
385            .field(
386                "second(next_tree)",
387                &self.second.map(|second| Detail {
388                    space_available: second.0,
389                    njobs_to_be_completed: second.1,
390                }),
391            )
392            .finish()
393    }
394}
395
396trait WithVTable<T>: Debug {
397    fn by_ref(&self) -> &T;
398}
399
400impl<T: Debug> WithVTable<T> for T {
401    fn by_ref(&self) -> &Self {
402        self
403    }
404}
405
406#[derive(Clone, Debug)]
407pub enum Value<B, M> {
408    Leaf(B),
409    Node(M),
410}
411
412/// A single tree with number of leaves = max_base_jobs = 2**transaction_capacity_log_2
413#[derive(Clone)]
414pub struct Tree<B, M> {
415    pub(super) values: Vec<Value<B, M>>,
416}
417
418impl<B, M> Debug for Tree<base::Base<B>, merge::Merge<M>>
419where
420    B: Debug,
421    M: Debug,
422{
423    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
424        enum BaseOrMerge<'a, B, M> {
425            Base(&'a base::Base<B>),
426            Merge(&'a merge::Merge<M>),
427        }
428
429        impl<B, M> Debug for BaseOrMerge<'_, B, M>
430        where
431            B: Debug,
432            M: Debug,
433        {
434            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
435                match self {
436                    Self::Base(arg0) => write!(f, "{:?}", arg0),
437                    Self::Merge(arg0) => write!(f, "{:?}", arg0),
438                }
439            }
440        }
441
442        let mut by_depth = BTreeMap::<usize, Vec<_>>::default();
443
444        for (index, v) in self.values.iter().enumerate() {
445            let vec = by_depth.entry(btree::depth_at(index) as usize).or_default();
446            let v = match v {
447                Value::Leaf(b) => BaseOrMerge::Base(b),
448                Value::Node(m) => BaseOrMerge::Merge(m),
449            };
450            vec.push(v);
451        }
452
453        for (depth, values) in by_depth.iter() {
454            writeln!(f, "depth={} {:#?}", depth, values)?;
455        }
456
457        Ok(())
458    }
459}
460
461mod btree {
462    // <https://stackoverflow.com/a/31147495/5717561>
463    pub fn depth_at(index: usize) -> u64 {
464        // Get the depth from its index (in the array)
465        // TODO: Find if there is a faster way to get that
466        let depth = ((index + 1) as f32).log2().floor() as u32;
467        depth as u64
468    }
469
470    pub fn child_left(index: usize) -> usize {
471        (index * 2) + 1
472    }
473
474    pub fn child_right(index: usize) -> usize {
475        (index * 2) + 2
476    }
477
478    pub fn parent(index: usize) -> Option<usize> {
479        Some(index.checked_sub(1)? / 2)
480    }
481
482    pub fn range_at_depth(depth: u64) -> std::ops::Range<usize> {
483        if depth == 0 {
484            return 0..1;
485        }
486
487        let start = (1 << depth) - 1;
488        let end = (1 << (depth + 1)) - 1;
489
490        start..end
491    }
492}
493
494impl<B, M> Tree<B, M>
495where
496    B: Debug + 'static,
497    M: Debug + 'static,
498{
499    /// mapi where i is the level of the tree
500    fn map_depth<FunMerge, FunBase>(&self, fun_merge: &FunMerge, fun_base: &FunBase) -> Self
501    where
502        FunMerge: for<'a> Fn(u64, &'a M) -> M,
503        FunBase: for<'a> Fn(&'a B) -> B,
504    {
505        let values = self
506            .values
507            .iter()
508            .enumerate()
509            .map(|(index, value)| match value {
510                Value::Leaf(base) => Value::Leaf(fun_base(base)),
511                Value::Node(merge) => Value::Node(fun_merge(btree::depth_at(index), merge)),
512            })
513            .collect();
514
515        Self { values }
516    }
517
518    /// Use for binprot conversion only
519    pub(super) fn values_by_depth(&self) -> BTreeMap<u64, Value<Vec<&B>, Vec<&M>>> {
520        let mut values: BTreeMap<u64, Value<Vec<&B>, Vec<&M>>> = BTreeMap::default();
521
522        for (index, value) in self.values.iter().enumerate() {
523            values
524                .entry(btree::depth_at(index))
525                .and_modify(|for_depth: &mut Value<_, _>| match for_depth {
526                    Value::Leaf(vec) => {
527                        let Value::Leaf(leaf) = value else {
528                            panic!("invalid")
529                        };
530                        vec.push(leaf);
531                    }
532                    Value::Node(vec) => {
533                        let Value::Node(node) = value else {
534                            panic!("invalid")
535                        };
536                        vec.push(node);
537                    }
538                })
539                .or_insert({
540                    match value {
541                        Value::Leaf(leaf) => {
542                            let mut vec = Vec::with_capacity(255);
543                            vec.push(leaf);
544                            Value::Leaf(vec)
545                        }
546                        Value::Node(node) => {
547                            let mut vec = Vec::with_capacity(255);
548                            vec.push(node);
549                            Value::Node(vec)
550                        }
551                    }
552                });
553        }
554
555        values
556    }
557
558    fn map<FunMerge, FunBase>(&self, fun_merge: FunMerge, fun_base: FunBase) -> Self
559    where
560        FunMerge: Fn(&M) -> M,
561        FunBase: Fn(&B) -> B,
562    {
563        self.map_depth(&|_, m| fun_merge(m), &fun_base)
564    }
565
566    /// foldi where i is the cur_level
567    fn fold_depth_until_prime<Accum, Final, FunMerge, FunBase>(
568        &self,
569        fun_merge: &FunMerge,
570        fun_base: &FunBase,
571        init: Accum,
572    ) -> ControlFlow<Final, Accum>
573    where
574        FunMerge: Fn(u64, Accum, &M) -> ControlFlow<Final, Accum>,
575        FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
576    {
577        let mut accum = init;
578
579        for (index, value) in self.values.iter().enumerate() {
580            accum = match value {
581                Value::Leaf(base) => fun_base(accum, base)?,
582                Value::Node(merge) => fun_merge(btree::depth_at(index), accum, merge)?,
583            };
584        }
585
586        Continue(accum)
587    }
588
589    /// foldi where i is the cur_level
590    fn fold_depth_until_prime_err<Accum, FunMerge, FunBase>(
591        &self,
592        fun_merge: &FunMerge,
593        fun_base: &FunBase,
594        init: Accum,
595    ) -> Result<Accum, String>
596    where
597        FunMerge: Fn(u64, Accum, &M) -> Result<Accum, String>,
598        FunBase: Fn(Accum, &B) -> Result<Accum, String>,
599    {
600        let mut accum = init;
601
602        for (index, value) in self.values.iter().enumerate() {
603            accum = match value {
604                Value::Leaf(base) => fun_base(accum, base)?,
605                Value::Node(merge) => fun_merge(btree::depth_at(index), accum, merge)?,
606            };
607        }
608
609        Ok(accum)
610    }
611
612    fn fold_depth_until<Accum, Final, FunFinish, FunMerge, FunBase>(
613        &self,
614        fun_merge: FunMerge,
615        fun_base: FunBase,
616        fun_finish: FunFinish,
617        init: Accum,
618    ) -> Final
619    where
620        FunMerge: Fn(u64, Accum, &M) -> ControlFlow<Final, Accum>,
621        FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
622        FunFinish: Fn(Accum) -> Final,
623    {
624        match self.fold_depth_until_prime(&fun_merge, &fun_base, init) {
625            Continue(accum) => fun_finish(accum),
626            Break(value) => value,
627        }
628    }
629
630    fn fold_depth<Accum, FunMerge, FunBase>(
631        &self,
632        fun_merge: FunMerge,
633        fun_base: FunBase,
634        init: Accum,
635    ) -> Accum
636    where
637        FunMerge: Fn(u64, Accum, &M) -> Accum,
638        FunBase: Fn(Accum, &B) -> Accum,
639    {
640        self.fold_depth_until(
641            |i, accum, a| Continue(fun_merge(i, accum, a)),
642            |accum, d| Continue(fun_base(accum, d)),
643            |x| x,
644            init,
645        )
646    }
647
648    fn fold<Accum, FunMerge, FunBase>(
649        &self,
650        fun_merge: FunMerge,
651        fun_base: FunBase,
652        init: Accum,
653    ) -> Accum
654    where
655        FunMerge: Fn(Accum, &M) -> Accum,
656        FunBase: Fn(Accum, &B) -> Accum,
657    {
658        self.fold_depth(|_, accum, a| fun_merge(accum, a), fun_base, init)
659    }
660
661    fn fold_until<Accum, Final, FunFinish, FunMerge, FunBase>(
662        &self,
663        fun_merge: FunMerge,
664        fun_base: FunBase,
665        fun_finish: FunFinish,
666        init: Accum,
667    ) -> Final
668    where
669        FunMerge: Fn(Accum, &M) -> ControlFlow<Final, Accum>,
670        FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
671        FunFinish: Fn(Accum) -> Final,
672    {
673        self.fold_depth_until(
674            |_, accum, a| fun_merge(accum, a),
675            fun_base,
676            fun_finish,
677            init,
678        )
679    }
680
681    fn update_split<Data, FunJobs, FunWeight, FunMerge, FunBase, Weight, R>(
682        &self,
683        fun_merge: &FunMerge,
684        fun_base: &FunBase,
685        weight_merge: &FunWeight,
686        jobs: &[Data],
687        update_level: u64,
688        jobs_split: &FunJobs,
689    ) -> Result<(Self, Option<R>), ()>
690    where
691        FunMerge: Fn(&[Data], u64, &M) -> Result<(M, Option<R>), ()>,
692        FunBase: Fn(&[Data], B) -> Result<B, ()>,
693        FunWeight: Fn(&M) -> (Weight, Weight),
694        FunJobs: Fn((Weight, Weight), &[Data]) -> (&[Data], &[Data]),
695        Data: Clone,
696        M: Clone,
697        B: Clone,
698    {
699        let mut values = Vec::with_capacity(self.values.len());
700        let mut scan_result = None;
701
702        // Because our tree is a perfect binary tree, two values pushed
703        // at the back of `jobs_fifo` by a node will be popped by its
704        // left and right children, respectively
705        let mut jobs_fifo = VecDeque::with_capacity(self.values.len());
706
707        jobs_fifo.push_back(jobs);
708
709        for (index, value) in self.values.iter().enumerate() {
710            let depth = btree::depth_at(index);
711
712            if depth > update_level {
713                values.push(value.clone());
714                continue;
715            }
716
717            let jobs_for_this = jobs_fifo.pop_front().unwrap();
718
719            let value = match value {
720                Value::Leaf(base) => Value::Leaf(fun_base(jobs_for_this, base.clone())?),
721                Value::Node(merge) => {
722                    let (jobs_left, jobs_right) = jobs_split(weight_merge(merge), jobs_for_this);
723                    jobs_fifo.push_back(jobs_left);
724                    jobs_fifo.push_back(jobs_right);
725
726                    let (value, result) = fun_merge(jobs_for_this, depth, merge)?;
727
728                    if scan_result.is_none() {
729                        scan_result = Some(result);
730                    }
731
732                    Value::Node(value)
733                }
734            };
735
736            values.push(value);
737        }
738
739        assert_eq!(jobs_fifo.capacity(), self.values.len());
740
741        Ok(((Self { values }), scan_result.flatten()))
742    }
743
744    fn update_accumulate<Data, FunMerge, FunBase>(
745        &self,
746        fun_merge: &FunMerge,
747        fun_base: &FunBase,
748    ) -> (Self, Data)
749    where
750        FunMerge: Fn((Data, Data), &M) -> (M, Data),
751        FunBase: Fn(&B) -> (B, Data),
752        Data: Clone,
753    {
754        let mut datas = vec![None; self.values.len()];
755
756        let childs_of = |data: &mut [Option<Data>], index: usize| -> Option<(Data, Data)> {
757            let left = data
758                .get_mut(btree::child_left(index))
759                .and_then(Option::take)?;
760            let right = data
761                .get_mut(btree::child_right(index))
762                .and_then(Option::take)?;
763
764            Some((left, right))
765        };
766
767        let mut values: Vec<_> = self
768            .values
769            .iter()
770            .enumerate()
771            .rev()
772            .map(|(index, value)| match value {
773                Value::Leaf(base) => {
774                    let (new_base, count_list) = fun_base(base);
775                    datas[index] = Some(count_list);
776                    Value::Leaf(new_base)
777                }
778                Value::Node(merge) => {
779                    let (left, right) = childs_of(datas.as_mut(), index).unwrap();
780                    let (value, count_list) = fun_merge((left, right), merge);
781                    datas[index] = Some(count_list);
782                    Value::Node(value)
783                }
784            })
785            .collect();
786
787        values.reverse();
788
789        (Self { values }, datas[0].take().unwrap())
790    }
791
792    fn iter(&self) -> impl Iterator<Item = (u64, &Value<B, M>)> {
793        self.values
794            .iter()
795            .enumerate()
796            .map(|(index, value)| (btree::depth_at(index), value))
797    }
798}
799
800impl Tree<base::Base<Arc<TransactionWithWitness>>, merge::Merge<Arc<LedgerProofWithSokMessage>>> {
801    pub fn view(&self) -> impl Iterator<Item = JobValueWithIndex<'_>> {
802        self.values
803            .iter()
804            .enumerate()
805            .map(|(index, value)| JobValueWithIndex {
806                index,
807                job: match value {
808                    Value::Leaf(base) => Value::Leaf(&base.job),
809                    Value::Node(merge) => Value::Node(&merge.job),
810                },
811            })
812    }
813}
814
815pub type JobValue<'a> = super::parallel_scan::Value<
816    &'a base::Job<Arc<TransactionWithWitness>>,
817    &'a merge::Job<Arc<LedgerProofWithSokMessage>>,
818>;
819
820pub struct JobValueWithIndex<'a> {
821    index: usize,
822    pub job: JobValue<'a>,
823}
824
825impl JobValueWithIndex<'_> {
826    pub fn index(&self) -> usize {
827        self.index
828    }
829
830    pub fn depth(&self) -> usize {
831        btree::depth_at(self.index) as usize
832    }
833
834    pub fn parent(&self) -> Option<usize> {
835        btree::parent(self.index)
836    }
837
838    pub fn child_left(&self) -> usize {
839        btree::child_left(self.index)
840    }
841
842    pub fn child_right(&self) -> usize {
843        btree::child_right(self.index)
844    }
845
846    /// Returns sibling index and if the sibling is on the left.
847    pub fn bundle_sibling(&self) -> Option<(usize, bool)> {
848        let parent = self.parent()?;
849        let try_sibling =
850            |parent, index| btree::parent(index).filter(|p| *p == parent).map(|_| index);
851        try_sibling(parent, self.index - 1)
852            .map(|i| (i, true))
853            .or_else(|| try_sibling(parent, self.index + 1).map(|i| (i, false)))
854    }
855}
856
857#[derive(Clone, Debug)]
858pub struct ParallelScan<BaseJob, MergeJob> {
859    pub trees: Vec<Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
860    /// last emitted proof and the corresponding transactions
861    pub(super) acc: Option<(MergeJob, Vec<BaseJob>)>,
862    /// Sequence number for the jobs added every block
863    pub curr_job_seq_no: SequenceNumber,
864    /// transaction_capacity_log_2
865    pub(super) max_base_jobs: u64,
866    pub(super) delay: u64,
867}
868
869pub(super) enum ResetKind {
870    Base,
871    Merge,
872    Both,
873}
874
875impl<BaseJob, MergeJob> Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>
876where
877    BaseJob: Clone + Debug + 'static,
878    MergeJob: Clone + Debug + 'static,
879{
880    fn update(
881        &self,
882        completed_jobs: &[Job<BaseJob, MergeJob>],
883        update_level: u64,
884        sequence_no: SequenceNumber,
885        lens: WeightLens,
886    ) -> Result<(Self, Option<MergeJob>), ()> {
887        let add_merges = |jobs: &[Job<BaseJob, MergeJob>],
888                          current_level: u64,
889                          merge_job: &merge::Merge<MergeJob>|
890         -> Result<(merge::Merge<MergeJob>, Option<MergeJob>), ()> {
891            use merge::{
892                Job::{Empty, Full, Part},
893                Record,
894            };
895            use Job::{Base, Merge};
896
897            let weight = &merge_job.weight;
898            let m = &merge_job.job;
899
900            let (w1, w2) = &weight;
901            let (left, right) = (*lens.get(w1), *lens.get(w2));
902
903            // println!("current_level={} update_level={}", current_level, update_level);
904            if update_level > 0 && current_level == update_level - 1 {
905                // Create new jobs from the completed ones
906                let (new_weight, new_m) = match (jobs, m) {
907                    ([], m) => (weight.clone(), m.clone()),
908                    ([Merge(a), Merge(b)], Empty) => {
909                        let w1 = lens.set(w1, left - 1);
910                        let w2 = lens.set(w2, right - 1);
911
912                        (
913                            (w1, w2),
914                            Full(Record {
915                                left: a.clone(),
916                                right: b.clone(),
917                                seq_no: sequence_no.clone(),
918                                state: JobStatus::Todo,
919                            }),
920                        )
921                    }
922                    ([Merge(a)], Empty) => {
923                        let w1 = lens.set(w1, left - 1);
924                        let w2 = lens.set(w2, right);
925
926                        ((w1, w2), Part(a.clone()))
927                    }
928                    ([Merge(b)], Part(a)) => {
929                        let w1 = lens.set(w1, left);
930                        let w2 = lens.set(w2, right - 1);
931
932                        (
933                            (w1, w2),
934                            Full(Record {
935                                left: a.clone(),
936                                right: b.clone(),
937                                seq_no: sequence_no.clone(),
938                                state: JobStatus::Todo,
939                            }),
940                        )
941                    }
942                    ([Base(_)], Empty) => {
943                        // Depending on whether this is the first or second of the two base jobs
944
945                        let weight = if left == 0 {
946                            let w1 = lens.set(w1, left);
947                            let w2 = lens.set(w2, right - 1);
948                            (w1, w2)
949                        } else {
950                            let w1 = lens.set(w1, left - 1);
951                            let w2 = lens.set(w2, right);
952                            (w1, w2)
953                        };
954
955                        (weight, Empty)
956                    }
957                    ([Base(_), Base(_)], Empty) => {
958                        let w1 = lens.set(w1, left - 1);
959                        let w2 = lens.set(w2, right - 1);
960
961                        ((w1, w2), Empty)
962                    }
963                    (xs, m) => {
964                        panic!(
965                            "Got {} jobs when updating level {} and when one of the merge \
966                             nodes at level {} is {:?}",
967                            xs.len(),
968                            update_level,
969                            current_level,
970                            m
971                        );
972                    }
973                };
974
975                Ok((
976                    merge::Merge {
977                        weight: new_weight,
978                        job: new_m,
979                    },
980                    None::<MergeJob>,
981                ))
982            } else if current_level == update_level {
983                // Mark completed jobs as Done
984
985                match (jobs, m) {
986                    (
987                        [Merge(a)],
988                        Full(
989                            x @ Record {
990                                state: JobStatus::Todo,
991                                ..
992                            },
993                        ),
994                    ) => {
995                        let mut x = x.clone();
996                        x.state = JobStatus::Done;
997                        let new_job = Full(x);
998
999                        let (scan_result, weight) = if current_level == 0 {
1000                            let w1 = lens.set(w1, 0);
1001                            let w2 = lens.set(w2, 0);
1002
1003                            (Some(a.clone()), (w1, w2))
1004                        } else {
1005                            (None, weight.clone())
1006                        };
1007
1008                        Ok((
1009                            merge::Merge {
1010                                weight,
1011                                job: new_job,
1012                            },
1013                            scan_result,
1014                        ))
1015                    }
1016                    ([], m) => Ok((
1017                        merge::Merge {
1018                            weight: weight.clone(),
1019                            job: m.clone(),
1020                        },
1021                        None,
1022                    )),
1023                    // ([], m) => Ok(((weight, m), None)),
1024                    (xs, m) => {
1025                        panic!(
1026                            "Got {} jobs when updating level {} and when one of the merge \
1027                             nodes at level {} is {:?}",
1028                            xs.len(),
1029                            update_level,
1030                            current_level,
1031                            m
1032                        );
1033                    }
1034                }
1035            } else if update_level > 0 && (current_level < update_level - 1) {
1036                // Update the job count for all the level above
1037                match jobs {
1038                    [] => Ok((
1039                        merge::Merge {
1040                            weight: weight.clone(),
1041                            job: m.clone(),
1042                        },
1043                        None,
1044                    )),
1045                    _ => {
1046                        let jobs_length = jobs.len() as u64;
1047                        let jobs_sent_left = jobs_length.min(left);
1048                        let jobs_sent_right = (jobs_length - jobs_sent_left).min(right);
1049
1050                        let w1 = lens.set(w1, left - jobs_sent_left);
1051                        let w2 = lens.set(w2, right - jobs_sent_right);
1052                        let weight = (w1, w2);
1053
1054                        Ok((
1055                            merge::Merge {
1056                                weight,
1057                                job: m.clone(),
1058                            },
1059                            None,
1060                        ))
1061                    }
1062                }
1063            } else {
1064                Ok((
1065                    merge::Merge {
1066                        weight: weight.clone(),
1067                        job: m.clone(),
1068                    },
1069                    None,
1070                ))
1071            }
1072        };
1073
1074        let add_bases = |jobs: &[Job<BaseJob, MergeJob>], base: base::Base<BaseJob>| {
1075            use base::Job::{Empty, Full};
1076            use Job::{Base, Merge};
1077
1078            let w = base.weight;
1079            let d = base.job;
1080
1081            let weight = lens.get(&w);
1082
1083            // println!("add_bases jobs={:?} w={}", jobs.len(), weight);
1084            match (jobs, d) {
1085                ([], d) => Ok(base::Base { weight: w, job: d }),
1086                ([Base(d)], Empty) => {
1087                    let w = lens.set(&w, weight - 1);
1088
1089                    Ok(base::Base {
1090                        weight: w,
1091                        job: Full(base::Record {
1092                            job: d.clone(),
1093                            seq_no: sequence_no.clone(),
1094                            state: JobStatus::Todo,
1095                        }),
1096                    })
1097                }
1098                ([Merge(_)], Full(mut b)) => {
1099                    b.state = JobStatus::Done;
1100
1101                    Ok(base::Base {
1102                        weight: w,
1103                        job: Full(b),
1104                    })
1105                }
1106                (xs, d) => {
1107                    panic!(
1108                        "Got {} jobs when updating level {} and when one of the base nodes \
1109                         is {:?}",
1110                        xs.len(),
1111                        update_level,
1112                        d
1113                    );
1114                }
1115            }
1116        };
1117
1118        self.update_split(
1119            &add_merges,
1120            &add_bases,
1121            &|merge| merge.weight.clone(),
1122            completed_jobs,
1123            update_level,
1124            &|(w1, w2), a: &[Job<BaseJob, MergeJob>]| {
1125                let l = *lens.get(&w1) as usize;
1126                let r = *lens.get(&w2) as usize;
1127
1128                // println!("split l={} r={} len={}", l, r, a.len());
1129
1130                (take(a, l), take_at(a, l, r))
1131            },
1132        )
1133    }
1134
1135    fn reset_weights(&self, reset_kind: ResetKind) -> Self {
1136        let fun_base = |base: &base::Base<BaseJob>| {
1137            let set_one = |lens: WeightLens, weight: &Weight| lens.set(weight, 1);
1138            let set_zero = |lens: WeightLens, weight: &Weight| lens.set(weight, 0);
1139
1140            use base::{
1141                Job::{Empty, Full},
1142                Record,
1143            };
1144            use JobStatus::Todo;
1145
1146            let update_merge_weight = |weight: &Weight| {
1147                // When updating the merge-weight of base nodes, only the nodes with
1148                // "Todo" status needs to be included
1149                match &base.job {
1150                    Full(Record { state: Todo, .. }) => set_one(WeightLens::Merge, weight),
1151                    _ => set_zero(WeightLens::Merge, weight),
1152                }
1153            };
1154
1155            let update_base_weight = |weight: &Weight| {
1156                // When updating the base-weight of base nodes, only the Empty nodes
1157                // need to be included
1158                match &base.job {
1159                    Empty => set_one(WeightLens::Base, weight),
1160                    Full(_) => set_zero(WeightLens::Base, weight),
1161                }
1162            };
1163
1164            let weight = &base.weight;
1165            let (new_weight, dummy_right_for_base_nodes) = match reset_kind {
1166                ResetKind::Merge => (
1167                    update_merge_weight(weight),
1168                    set_zero(WeightLens::Merge, weight),
1169                ),
1170                ResetKind::Base => (
1171                    update_base_weight(weight),
1172                    set_zero(WeightLens::Base, weight),
1173                ),
1174                ResetKind::Both => {
1175                    let w = update_base_weight(weight);
1176                    (update_merge_weight(&w), Weight::zero())
1177                }
1178            };
1179
1180            let base = base::Base {
1181                weight: new_weight.clone(),
1182                job: base.job.clone(),
1183            };
1184
1185            (base, (new_weight, dummy_right_for_base_nodes))
1186        };
1187
1188        let fun_merge = |lst: ((Weight, Weight), (Weight, Weight)),
1189                         merge: &merge::Merge<MergeJob>| {
1190            let ((w1, w2), (w3, w4)) = &lst;
1191
1192            let reset = |lens: WeightLens, w: &Weight, ww: &Weight| {
1193                // Weights of all other jobs is sum of weights of its children
1194                (
1195                    lens.set(w, lens.get(w1) + lens.get(w2)),
1196                    lens.set(ww, lens.get(w3) + lens.get(w4)),
1197                )
1198            };
1199
1200            use merge::{Job::Full, Record};
1201            use JobStatus::Todo;
1202
1203            let ww = match reset_kind {
1204                ResetKind::Merge => {
1205                    // When updating the merge-weight of merge nodes, only the nodes
1206                    // with "Todo" status needs to be included
1207                    let lens = WeightLens::Merge;
1208                    match (&merge.weight, &merge.job) {
1209                        ((w1, w2), Full(Record { state: Todo, .. })) => {
1210                            (lens.set(w1, 1), lens.set(w2, 0))
1211                        }
1212                        ((w1, w2), _) => reset(lens, w1, w2),
1213                    }
1214                }
1215                ResetKind::Base => {
1216                    // The base-weight of merge nodes is the sum of weights of its
1217                    // children
1218                    let w = &merge.weight;
1219                    reset(WeightLens::Base, &w.0, &w.1)
1220                }
1221                ResetKind::Both => {
1222                    let w = &merge.weight;
1223                    let w = reset(WeightLens::Base, &w.0, &w.1);
1224                    reset(WeightLens::Merge, &w.0, &w.1)
1225                }
1226            };
1227
1228            let merge = merge::Merge {
1229                weight: ww.clone(),
1230                job: merge.job.clone(),
1231            };
1232
1233            (merge, ww)
1234        };
1235
1236        let (result, _) = self.update_accumulate(&fun_merge, &fun_base);
1237        result
1238    }
1239
1240    fn jobs_on_level(&self, depth: u64, level: u64) -> Vec<AvailableJob<BaseJob, MergeJob>> {
1241        use JobStatus::Todo;
1242
1243        // self.iter()
1244        //     .filter(|(d, _)| *d == depth)
1245        //     .filter_map(|(_, value)| match value {
1246        //         Value::Leaf(base::Base {
1247        //             job:
1248        //                 base::Job::Full(base::Record {
1249        //                     job, state: Todo, ..
1250        //                 }),
1251        //             ..
1252        //         }) => Some(AvailableJob::Base(job.clone())),
1253        //         Value::Node(merge::Merge {
1254        //             job:
1255        //                 merge::Job::Full(merge::Record {
1256        //                     left,
1257        //                     right,
1258        //                     state: Todo,
1259        //                     ..
1260        //                 }),
1261        //             ..
1262        //         }) => Some(AvailableJob::Merge {
1263        //             left: left.clone(),
1264        //             right: right.clone(),
1265        //         }),
1266        //         _ => None,
1267        //     })
1268        //     .collect::<Vec<_>>();
1269
1270        self.fold_depth(
1271            |i, mut acc, a| {
1272                use merge::{Job::Full, Record};
1273
1274                if let (
1275                    true,
1276                    Full(Record {
1277                        left,
1278                        right,
1279                        state: Todo,
1280                        ..
1281                    }),
1282                ) = (i == level, &a.job)
1283                {
1284                    acc.push(AvailableJob::Merge {
1285                        left: left.clone(),
1286                        right: right.clone(),
1287                    });
1288                };
1289                acc
1290            },
1291            |mut acc, d| {
1292                use base::{Job::Full, Record};
1293
1294                if let (
1295                    true,
1296                    Full(Record {
1297                        job, state: Todo, ..
1298                    }),
1299                ) = (level == depth, &d.job)
1300                {
1301                    acc.push(AvailableJob::Base(job.clone()));
1302                }
1303                acc
1304            },
1305            Vec::with_capacity(256),
1306        )
1307    }
1308
1309    fn to_hashable_jobs(&self) -> Vec<Job<base::Base<BaseJob>, merge::Merge<MergeJob>>> {
1310        use JobStatus::Done;
1311
1312        self.fold(
1313            |mut acc, a| {
1314                match &a.job {
1315                    merge::Job::Full(merge::Record { state: Done, .. }) => {}
1316                    _ => {
1317                        acc.push(Job::Merge(a.clone()));
1318                    }
1319                }
1320                acc
1321            },
1322            |mut acc, d| {
1323                match &d.job {
1324                    base::Job::Full(base::Record { state: Done, .. }) => {}
1325                    _ => {
1326                        acc.push(Job::Base(d.clone()));
1327                    }
1328                }
1329                acc
1330            },
1331            Vec::with_capacity(256),
1332        )
1333    }
1334
1335    fn jobs_records(&self) -> Vec<Job<base::Record<BaseJob>, merge::Record<MergeJob>>> {
1336        self.fold(
1337            |mut acc, a: &merge::Merge<MergeJob>| {
1338                if let merge::Job::Full(x) = &a.job {
1339                    acc.push(Job::Merge(x.clone()));
1340                }
1341                acc
1342            },
1343            |mut acc, d: &base::Base<BaseJob>| {
1344                if let base::Job::Full(j) = &d.job {
1345                    acc.push(Job::Base(j.clone()));
1346                }
1347                acc
1348            },
1349            Vec::with_capacity(256),
1350        )
1351    }
1352
1353    fn base_jobs(&self) -> Vec<BaseJob> {
1354        self.fold_depth(
1355            |_, acc, _| acc,
1356            |mut acc, d| {
1357                if let base::Job::Full(base::Record { job, .. }) = &d.job {
1358                    acc.push(job.clone());
1359                };
1360                acc
1361            },
1362            Vec::with_capacity(256),
1363        )
1364    }
1365
1366    /// calculates the number of base and merge jobs that is currently with the Todo status
1367    fn todo_job_count(&self) -> (u64, u64) {
1368        use JobStatus::Todo;
1369
1370        self.fold_depth(
1371            |_, (b, m), j| match &j.job {
1372                merge::Job::Full(merge::Record { state: Todo, .. }) => (b, m + 1),
1373                _ => (b, m),
1374            },
1375            |(b, m), d| match &d.job {
1376                base::Job::Full(base::Record { state: Todo, .. }) => (b + 1, m),
1377                _ => (b, m),
1378            },
1379            (0, 0),
1380        )
1381    }
1382
1383    fn leaves(&self) -> Vec<base::Base<BaseJob>> {
1384        self.fold_depth(
1385            |_, acc, _| acc,
1386            |mut acc, d| {
1387                if let base::Job::Full(_) = &d.job {
1388                    acc.push(d.clone());
1389                };
1390                acc
1391            },
1392            Vec::with_capacity(256),
1393        )
1394    }
1395
1396    fn required_job_count(&self) -> u64 {
1397        match &self.values[0] {
1398            Value::Node(value) => {
1399                let (w1, w2) = &value.weight;
1400                w1.merge + w2.merge
1401            }
1402            Value::Leaf(base) => base.weight.merge,
1403        }
1404    }
1405
1406    fn available_space(&self) -> u64 {
1407        match &self.values[0] {
1408            Value::Node(value) => {
1409                let (w1, w2) = &value.weight;
1410                w1.base + w2.base
1411            }
1412            Value::Leaf(base) => base.weight.base,
1413        }
1414    }
1415
1416    fn create_tree_for_level(
1417        level: i64,
1418        depth: u64,
1419        merge_job: merge::Job<MergeJob>,
1420        base_job: base::Job<BaseJob>,
1421    ) -> Self {
1422        let base_weight = if level == -1 {
1423            Weight::zero()
1424        } else {
1425            Weight { base: 1, merge: 0 }
1426        };
1427
1428        let make_base = || base::Base {
1429            weight: base_weight.clone(),
1430            job: base_job.clone(),
1431        };
1432
1433        let make_merge = |d: u64| {
1434            let weight = if level == -1 {
1435                (Weight::zero(), Weight::zero())
1436            } else {
1437                let x = 2u64.pow(level as u32) / 2u64.pow(d as u32 + 1);
1438                (Weight { base: x, merge: 0 }, Weight { base: x, merge: 0 })
1439            };
1440            merge::Merge {
1441                weight,
1442                job: merge_job.clone(),
1443            }
1444        };
1445
1446        let nnodes = 2u64.pow((depth + 1) as u32) - 1;
1447
1448        let values: Vec<_> = (0..nnodes)
1449            .map(|index| {
1450                let node_depth = btree::depth_at(index as usize);
1451
1452                if node_depth == depth {
1453                    Value::Leaf(make_base())
1454                } else {
1455                    Value::Node(make_merge(node_depth))
1456                }
1457            })
1458            .collect();
1459
1460        // println!("first={:?}", values[0]);
1461
1462        // println!("nnodes={:?} len={:?} nbases={:?}", nnodes, values.len(), values.iter().filter(|v| {
1463        //   matches!(v, Value::Leaf(_))
1464        // }).count());
1465
1466        Self { values }
1467    }
1468
1469    fn create_tree(depth: u64) -> Self {
1470        let level: i64 = depth.try_into().unwrap();
1471        Self::create_tree_for_level(level, depth, merge::Job::Empty, base::Job::Empty)
1472    }
1473}
1474
1475impl<BaseJob, MergeJob> ParallelScan<BaseJob, MergeJob>
1476where
1477    BaseJob: Debug + Clone + 'static,
1478    MergeJob: Debug + Clone + 'static,
1479{
1480    fn with_leaner_trees(&self) -> Self {
1481        use JobStatus::Done;
1482
1483        let trees = self
1484            .trees
1485            .iter()
1486            .map(|tree| {
1487                tree.map(
1488                    |merge_node| match &merge_node.job {
1489                        merge::Job::Full(merge::Record { state: Done, .. }) => merge::Merge {
1490                            weight: merge_node.weight.clone(),
1491                            job: merge::Job::Empty,
1492                        },
1493                        _ => merge_node.clone(),
1494                    },
1495                    |b| b.clone(),
1496                )
1497            })
1498            .collect();
1499
1500        Self {
1501            trees,
1502            acc: self.acc.clone(),
1503            curr_job_seq_no: self.curr_job_seq_no.clone(),
1504            max_base_jobs: self.max_base_jobs,
1505            delay: self.delay,
1506        }
1507    }
1508
1509    pub fn empty(max_base_jobs: u64, delay: u64) -> Self {
1510        let depth = ceil_log2(max_base_jobs);
1511        // println!("empty depth={:?}", depth);
1512
1513        let first_tree = Tree::create_tree(depth);
1514
1515        let mut trees = Vec::with_capacity(32);
1516        trees.push(first_tree);
1517
1518        Self {
1519            trees,
1520            acc: None,
1521            curr_job_seq_no: SequenceNumber(0),
1522            max_base_jobs,
1523            delay,
1524        }
1525    }
1526
1527    fn map<F1, F2>(&self, f1: F1, f2: F2) -> Self
1528    where
1529        F1: Fn(&MergeJob) -> MergeJob,
1530        F2: Fn(&BaseJob) -> BaseJob,
1531    {
1532        let trees = self
1533            .trees
1534            .iter()
1535            .map(|tree| tree.map_depth(&|_, m| m.map(&f1), &|a| a.map(&f2)))
1536            .collect();
1537
1538        let acc = self
1539            .acc
1540            .as_ref()
1541            .map(|(m, bs)| (f1(m), bs.iter().map(&f2).collect()));
1542
1543        Self {
1544            trees,
1545            acc,
1546            curr_job_seq_no: self.curr_job_seq_no.clone(),
1547            max_base_jobs: self.max_base_jobs,
1548            delay: self.delay,
1549        }
1550    }
1551
1552    pub fn hash<FunMerge, FunBase>(
1553        &self,
1554        fun_merge: FunMerge,
1555        fun_base: FunBase,
1556    ) -> GenericArray<u8, U32>
1557    where
1558        FunMerge: Fn(&mut Vec<u8>, &MergeJob),
1559        FunBase: Fn(&mut Vec<u8>, &BaseJob),
1560    {
1561        const BUFFER_CAPACITY: usize = 128 * 1024;
1562
1563        let Self {
1564            trees,
1565            acc,
1566            curr_job_seq_no,
1567            max_base_jobs,
1568            delay,
1569        } = self.with_leaner_trees();
1570
1571        let mut sha: Sha256 = Sha256::new();
1572        let mut buffer = Vec::with_capacity(BUFFER_CAPACITY);
1573        let buffer = &mut buffer;
1574
1575        let add_weight =
1576            |buffer: &mut Vec<u8>, w: &Weight| write!(buffer, "{}{}", w.base, w.merge).unwrap();
1577
1578        let add_two_weights = |buffer: &mut Vec<u8>, (w1, w2): &(Weight, Weight)| {
1579            add_weight(buffer, w1);
1580            add_weight(buffer, w2);
1581        };
1582
1583        for job in trees.iter().flat_map(Tree::to_hashable_jobs) {
1584            buffer.clear();
1585
1586            match &job {
1587                Job::Base(base) => match &base.job {
1588                    base::Job::Empty => {
1589                        add_weight(buffer, &base.weight);
1590                        write!(buffer, "Empty").unwrap();
1591                    }
1592                    base::Job::Full(base::Record { job, seq_no, state }) => {
1593                        add_weight(buffer, &base.weight);
1594                        write!(buffer, "Full{}{}", seq_no.0, state.as_str()).unwrap();
1595
1596                        fun_base(buffer, job);
1597                    }
1598                },
1599                Job::Merge(merge) => match &merge.job {
1600                    merge::Job::Empty => {
1601                        add_two_weights(buffer, &merge.weight);
1602                        write!(buffer, "Empty").unwrap();
1603                    }
1604                    merge::Job::Part(job) => {
1605                        add_two_weights(buffer, &merge.weight);
1606                        write!(buffer, "Part").unwrap();
1607
1608                        fun_merge(buffer, job);
1609                    }
1610                    merge::Job::Full(merge::Record {
1611                        left,
1612                        right,
1613                        seq_no,
1614                        state,
1615                    }) => {
1616                        add_two_weights(buffer, &merge.weight);
1617                        write!(buffer, "Full{}{}", seq_no.0, state.as_str()).unwrap();
1618
1619                        fun_merge(buffer, left);
1620                        fun_merge(buffer, right);
1621                    }
1622                },
1623            }
1624
1625            sha.update(buffer.as_slice());
1626
1627            // TODO: Remove this assert once we know it's a good capacity
1628            //       (buffer is not resized for serialization)
1629            assert_eq!(buffer.capacity(), BUFFER_CAPACITY);
1630        }
1631
1632        match &acc {
1633            Some((a, d_lst)) => {
1634                buffer.clear();
1635
1636                fun_merge(buffer, a);
1637                for j in d_lst {
1638                    fun_base(buffer, j);
1639                }
1640
1641                sha.update(&buffer);
1642            }
1643            None => {
1644                sha.update("None");
1645            }
1646        };
1647
1648        buffer.clear();
1649        write!(buffer, "{}{}{}", curr_job_seq_no.0, max_base_jobs, delay).unwrap();
1650        sha.update(&buffer);
1651
1652        sha.finalize()
1653    }
1654
1655    pub fn fold_chronological_until<Accum, Final, FunMerge, FunBase, FunFinish>(
1656        &self,
1657        init: Accum,
1658        fun_merge: FunMerge,
1659        fun_base: FunBase,
1660        fun_finish: FunFinish,
1661    ) -> Final
1662    where
1663        FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> ControlFlow<Final, Accum>,
1664        FunBase: Fn(Accum, &base::Base<BaseJob>) -> ControlFlow<Final, Accum>,
1665        FunFinish: Fn(Accum) -> Final,
1666    {
1667        let mut accum = init;
1668
1669        for tree in self.trees.iter().rev() {
1670            match tree.fold_depth_until_prime(&|_, acc, m| fun_merge(acc, m), &fun_base, accum) {
1671                Continue(acc) => accum = acc,
1672                Break(v) => return v,
1673            }
1674        }
1675
1676        fun_finish(accum)
1677    }
1678
1679    pub fn fold_chronological_until_err<Accum, FunMerge, FunBase, FunFinish>(
1680        &self,
1681        init: Accum,
1682        fun_merge: FunMerge,
1683        fun_base: FunBase,
1684        fun_finish: FunFinish,
1685    ) -> Result<Accum, String>
1686    where
1687        FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> Result<Accum, String>,
1688        FunBase: Fn(Accum, &base::Base<BaseJob>) -> Result<Accum, String>,
1689        FunFinish: Fn(Accum) -> Accum,
1690    {
1691        let mut accum = init;
1692
1693        for tree in self.trees.iter().rev() {
1694            match tree.fold_depth_until_prime_err(&|_, acc, m| fun_merge(acc, m), &fun_base, accum)
1695            {
1696                Ok(acc) => accum = acc,
1697                Err(e) => return Err(e),
1698            }
1699        }
1700
1701        Ok(fun_finish(accum))
1702    }
1703
1704    fn fold_chronological<Accum, FunMerge, FunBase>(
1705        &self,
1706        init: Accum,
1707        fun_merge: FunMerge,
1708        fun_base: FunBase,
1709    ) -> Accum
1710    where
1711        FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> Accum,
1712        FunBase: Fn(Accum, &base::Base<BaseJob>) -> Accum,
1713    {
1714        self.fold_chronological_until(
1715            init,
1716            |acc, a| Continue(fun_merge(acc, a)),
1717            |acc, d| Continue(fun_base(acc, d)),
1718            |v| v,
1719        )
1720    }
1721
1722    fn max_trees(&self) -> u64 {
1723        ((ceil_log2(self.max_base_jobs) + 1) * (self.delay + 1)) + 1
1724    }
1725
1726    fn work_for_tree(&self, data_tree: WorkForTree) -> Vec<AvailableJob<BaseJob, MergeJob>> {
1727        let delay = self.delay + 1;
1728
1729        // TODO: Not sure if skip(1) is correct below
1730        let trees = match data_tree {
1731            WorkForTree::Current => &self.trees[1..],
1732            WorkForTree::Next => &self.trees,
1733        };
1734
1735        // println!("WORK_FOR_TREE len={} delay={}", trees.len(), delay);
1736
1737        work(trees, delay, self.max_base_jobs)
1738    }
1739
1740    /// work on all the level and all the trees
1741    fn all_work(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
1742        let depth = ceil_log2(self.max_base_jobs);
1743        // TODO: Not sure if it's correct
1744        let set1 = self.work_for_tree(WorkForTree::Current);
1745        // let setaaa = self.work_for_tree(WorkForTree::Next);
1746
1747        // println!(
1748        //     "ntrees={} delay={} set1={}",
1749        //     self.trees.len(),
1750        //     self.delay,
1751        //     set1.len()
1752        // );
1753        // println!("ntrees={} set1={} setaa={}", self.trees.len(), set1.len(), setaaa.len());
1754
1755        let mut this = self.clone();
1756        this.trees.reserve(self.delay as usize + 1);
1757
1758        // println!("set1={:?}", set1.len());
1759
1760        let mut other_set = Vec::with_capacity(256);
1761        other_set.push(set1);
1762
1763        for _ in 0..self.delay + 1 {
1764            // println!("trees={}", this.trees.len());
1765            this.trees.insert(0, Tree::create_tree(depth));
1766            let work = this.work_for_tree(WorkForTree::Current);
1767
1768            // println!("work={}", work.len());
1769
1770            if !work.is_empty() {
1771                other_set.push(work);
1772            }
1773        }
1774
1775        other_set
1776    }
1777
1778    // let all_work :
1779    //     type merge base. (merge, base) t -> (merge, base) Available_job.t list list
1780    //     =
1781    //  fun t ->
1782    //   let depth = Int.ceil_log2 t.max_base_jobs in
1783    //   Printf.eprintf "ntrees=%d\n%!" (Non_empty_list.length t.trees);
1784    //   let set1 = work_for_tree t ~data_tree:`Current in
1785    //   let _, other_sets =
1786    //     List.fold ~init:(t, [])
1787    //       (List.init ~f:Fn.id (t.delay + 1))
1788    //       ~f:(fun (t, work_list) _ ->
1789    //         Printf.eprintf "trees=%d\n%!" (Non_empty_list.length t.trees);
1790    //         let trees' = Non_empty_list.cons (create_tree ~depth) t.trees in
1791    //         let t' = { t with trees = trees' } in
1792    //         let work = work_for_tree t' ~data_tree:`Current in
1793    //         Printf.eprintf "work=%d\n%!" (List.length work);
1794    //         match work_for_tree t' ~data_tree:`Current with
1795    //         | [] ->
1796    //             (t', work_list)
1797    //         | work ->
1798    //             (t', work :: work_list) )
1799    //   in
1800    //   Printf.eprintf "set1=%d\n%!" (List.length set1);
1801    //   if List.is_empty set1 then List.rev other_sets
1802    //   else set1 :: List.rev other_sets
1803
1804    fn work_for_next_update(&self, data_count: u64) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
1805        let delay = self.delay + 1;
1806        let current_tree_space = self.trees[0].available_space() as usize;
1807
1808        let mut set1 = work(&self.trees[1..], delay, self.max_base_jobs);
1809        let count = data_count.min(self.max_base_jobs) as usize;
1810
1811        if current_tree_space < count {
1812            let mut set2 = work(&self.trees, delay, self.max_base_jobs);
1813            set2.truncate((count - current_tree_space) * 2);
1814
1815            [set1, set2].into_iter().filter(|v| !v.is_empty()).collect()
1816        } else {
1817            set1.truncate(2 * count);
1818
1819            if set1.is_empty() {
1820                vec![]
1821            } else {
1822                vec![set1]
1823            }
1824        }
1825    }
1826
1827    fn free_space_on_current_tree(&self) -> u64 {
1828        self.trees[0].available_space()
1829    }
1830
1831    fn add_merge_jobs(
1832        &mut self,
1833        completed_jobs: &[MergeJob],
1834    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
1835        fn take<T>(slice: &[T], n: usize) -> &[T] {
1836            slice.get(..n).unwrap_or(slice)
1837        }
1838
1839        fn drop<T>(slice: &[T], n: usize) -> &[T] {
1840            slice.get(n..).unwrap_or(&[])
1841        }
1842
1843        if completed_jobs.is_empty() {
1844            return Ok(None);
1845        }
1846
1847        let delay = self.delay + 1;
1848        let udelay = delay as usize;
1849        let depth = ceil_log2(self.max_base_jobs);
1850
1851        let completed_jobs_len = completed_jobs.len();
1852        let merge_jobs: Vec<_> = completed_jobs.iter().cloned().map(Job::Merge).collect();
1853        let jobs_required = self.work_for_tree(WorkForTree::Current);
1854
1855        assert!(
1856            merge_jobs.len() <= jobs_required.len(),
1857            "More work than required"
1858        );
1859
1860        let curr_tree = &self.trees[0];
1861        let to_be_updated_trees = &self.trees[1..];
1862
1863        // (index, (level, njobs))
1864        let mut stats = BTreeMap::<u64, (u64, usize)>::new();
1865
1866        let (mut updated_trees, result_opt) = {
1867            let mut jobs = merge_jobs.as_slice();
1868
1869            let mut updated_trees = Vec::with_capacity(to_be_updated_trees.len());
1870            let mut scan_result = None;
1871
1872            for (i, tree) in to_be_updated_trees.iter().enumerate() {
1873                // Every nth (n=delay) tree
1874                if (i % udelay == udelay - 1) && !jobs.is_empty() {
1875                    let nrequired = tree.required_job_count() as usize;
1876                    let completed_jobs = take(jobs, nrequired);
1877                    let i = i as u64;
1878
1879                    let level = depth - (i / delay);
1880                    let old = stats.insert(i, (level, completed_jobs.len()));
1881                    assert!(old.is_none());
1882
1883                    let (tree, result) = tree.update(
1884                        completed_jobs,
1885                        depth - (i / delay),
1886                        self.curr_job_seq_no.clone(),
1887                        WeightLens::Merge,
1888                    )?;
1889
1890                    updated_trees.push(tree);
1891                    scan_result = result;
1892                    jobs = drop(jobs, nrequired);
1893                } else {
1894                    updated_trees.push(tree.clone());
1895                }
1896            }
1897
1898            (updated_trees, scan_result)
1899        };
1900
1901        for (index, (level, njobs)) in stats.iter().rev() {
1902            let index = self.trees.len() - *index as usize - 2;
1903
1904            if result_opt.is_some() && index == 0 {
1905                println!(
1906                    "- tree[{:>02}] level={} {:>3} completed jobs, a proof is emitted, tree is removed",
1907                    index,
1908                    level,
1909                    njobs
1910                );
1911            } else {
1912                println!(
1913                    "- tree[{:>02}] level={} {:>3} completed jobs (DONE)",
1914                    index, level, njobs
1915                );
1916                println!(
1917                    "           level={} {:>3} new merge jobs (TODO)",
1918                    level - 1,
1919                    njobs / 2,
1920                );
1921            }
1922        }
1923
1924        let (mut updated_trees, result_opt) = {
1925            let (updated_trees, result_opt) = match result_opt {
1926                Some(scan_result) if !updated_trees.is_empty() => {
1927                    let last = updated_trees.pop().unwrap();
1928                    let tree_data = last.base_jobs();
1929                    (updated_trees, Some((scan_result, tree_data)))
1930                }
1931                _ => (updated_trees, None),
1932            };
1933
1934            // TODO: Not sure if priority is same as OCaml here
1935            if result_opt.is_some()
1936                || (updated_trees.len() + 1) < self.max_trees() as usize
1937                    && (completed_jobs_len == jobs_required.len())
1938            {
1939                let updated_trees = updated_trees
1940                    .into_iter()
1941                    .map(|tree| tree.reset_weights(ResetKind::Merge))
1942                    .collect();
1943                (updated_trees, result_opt)
1944            } else {
1945                (updated_trees, result_opt)
1946            }
1947        };
1948
1949        updated_trees.insert(0, curr_tree.clone());
1950
1951        self.trees = updated_trees;
1952
1953        Ok(result_opt)
1954    }
1955
1956    fn add_data(
1957        &mut self,
1958        data: Vec<BaseJob>,
1959        base_kind: impl Fn(&BaseJob) -> usize,
1960    ) -> Result<(), ()> {
1961        if data.is_empty() {
1962            return Ok(());
1963        }
1964
1965        let data_len = data.len();
1966        let depth = ceil_log2(self.max_base_jobs);
1967        let tree = &self.trees[0];
1968
1969        let base_jobs: Vec<_> = data.into_iter().map(Job::Base).collect();
1970        let available_space = tree.available_space() as usize;
1971
1972        assert!(
1973            data_len <= available_space,
1974            "Data count ({}) exceeded available space ({})",
1975            data_len,
1976            available_space
1977        );
1978
1979        // println!(
1980        //     "base_jobs={:?} available_space={:?} depth={:?}",
1981        //     base_jobs.len(),
1982        //     available_space,
1983        //     depth
1984        // );
1985
1986        let (tree, _) = tree
1987            .update(
1988                &base_jobs,
1989                depth,
1990                self.curr_job_seq_no.clone(),
1991                WeightLens::Base,
1992            )
1993            .expect("Error while adding a base job to the tree");
1994
1995        let bases = tree.base_jobs();
1996        let nbase = bases.len();
1997
1998        let updated_trees = if data_len == available_space {
1999            let new_tree = Tree::create_tree(depth);
2000            let tree = tree.reset_weights(ResetKind::Both);
2001            vec![new_tree, tree]
2002        } else {
2003            let tree = tree.reset_weights(ResetKind::Merge);
2004            vec![tree]
2005        };
2006
2007        println!(
2008            "- tree[{:>02}] level=7 {:>3} new base jobs (TODO), total_nbase_jobs={:?}",
2009            self.trees.len() - 1,
2010            data_len,
2011            nbase,
2012        );
2013
2014        let max_base_jobs = self.max_base_jobs as usize;
2015
2016        let mut folded = bases.iter().fold(
2017            Vec::<(usize, usize)>::with_capacity(max_base_jobs),
2018            |mut accum, b| {
2019                let kind = base_kind(b);
2020                match accum.last_mut() {
2021                    Some(last) if last.0 == kind => last.1 += 1,
2022                    _ => accum.push((kind, 1)),
2023                }
2024                accum
2025            },
2026        );
2027
2028        let total_folded: usize = folded.iter().map(|(_, n)| *n).sum();
2029
2030        if total_folded != max_base_jobs {
2031            folded.push((10, max_base_jobs - total_folded));
2032        }
2033
2034        let to_s = |n: usize, s: &str| {
2035            if n == 1 {
2036                s.to_string()
2037            } else {
2038                format!("{n} {s}")
2039            }
2040        };
2041
2042        let s = folded
2043            .iter()
2044            .map(|(kind, n)| match kind {
2045                0 => to_s(*n, "CMD"),
2046                1 => to_s(*n, "FT"),
2047                2 => to_s(*n, "CB"),
2048                10 => to_s(*n, "EMPTY"),
2049                _ => panic!(),
2050            })
2051            .join("|");
2052
2053        println!("           level=7 has the following jobs (in this order):");
2054        println!("           {s}");
2055
2056        // println!(
2057        //     "updated_trees={} self_trees={:?}",
2058        //     updated_trees.len(),
2059        //     self.trees.len()
2060        // );
2061
2062        // println!("WORK1={:?}", work(updated_trees.as_slice(), self.delay + 1, self.max_base_jobs));
2063        // println!("WORK2={:?}", work(self.trees.as_slice(), self.delay + 1, self.max_base_jobs));
2064
2065        // self.trees.append(&mut updated_trees);
2066
2067        // let tail = &self.trees[1..];
2068        // self.trees = updated_trees.into_iter().zip(tail.iter().cloned().collect()).collect();
2069
2070        let mut old = std::mem::replace(&mut self.trees, updated_trees);
2071        old.remove(0);
2072        self.trees.append(&mut old);
2073
2074        // println!("WORK3={:?}", work(self.trees.as_slice(), self.delay + 1, self.max_base_jobs));
2075
2076        Ok(())
2077    }
2078
2079    fn reset_seq_no(&mut self) {
2080        let last = self.trees.last().unwrap();
2081        let oldest_seq_no = match last.leaves().first() {
2082            Some(base::Base {
2083                job: base::Job::Full(base::Record { seq_no, .. }),
2084                ..
2085            }) => seq_no.clone(),
2086            _ => SequenceNumber::zero(),
2087        };
2088
2089        let new_seq = |seq: &SequenceNumber| (seq - &oldest_seq_no).incr();
2090
2091        let fun_merge = |m: &merge::Merge<MergeJob>| match &m.job {
2092            merge::Job::Full(merge::Record { seq_no, .. }) => m.with_seq_no(new_seq(seq_no)),
2093            _ => m.clone(),
2094        };
2095
2096        let fun_base = |m: &base::Base<BaseJob>| match &m.job {
2097            base::Job::Full(base::Record { seq_no, .. }) => m.with_seq_no(new_seq(seq_no)),
2098            _ => m.clone(),
2099        };
2100
2101        let mut max_seq = SequenceNumber::zero();
2102        let mut updated_trees = Vec::with_capacity(self.trees.len());
2103
2104        for tree in &self.trees {
2105            use base::{Base, Job::Full, Record};
2106
2107            let tree = tree.map(fun_merge, fun_base);
2108            updated_trees.push(tree.clone());
2109
2110            let leaves = tree.leaves();
2111
2112            let last = match leaves.last() {
2113                Some(last) => last,
2114                None => continue,
2115            };
2116
2117            if let Base {
2118                job: Full(Record { seq_no, .. }),
2119                ..
2120            } = last
2121            {
2122                max_seq = max_seq.max(seq_no.clone());
2123            };
2124        }
2125
2126        self.curr_job_seq_no = max_seq;
2127        self.trees = updated_trees;
2128    }
2129
2130    fn incr_sequence_no(&mut self) {
2131        let next_seq_no = self.curr_job_seq_no.incr();
2132
2133        if next_seq_no.is_u64_max() {
2134            self.reset_seq_no();
2135        } else {
2136            self.curr_job_seq_no = next_seq_no;
2137        }
2138    }
2139
2140    fn update_helper(
2141        &mut self,
2142        data: Vec<BaseJob>,
2143        completed_jobs: Vec<MergeJob>,
2144        base_kind: impl Fn(&BaseJob) -> usize,
2145    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
2146        fn split<T>(slice: &[T], n: usize) -> (&[T], &[T]) {
2147            (
2148                slice.get(..n).unwrap_or(slice),
2149                slice.get(n..).unwrap_or(&[]),
2150            )
2151        }
2152
2153        let data_count = data.len() as u64;
2154
2155        assert!(
2156            data_count <= self.max_base_jobs,
2157            "Data count ({}) exceeded maximum ({})",
2158            data_count,
2159            self.max_base_jobs
2160        );
2161
2162        let required_jobs_count = self
2163            .work_for_next_update(data_count)
2164            .into_iter()
2165            .flatten()
2166            .count();
2167
2168        {
2169            let required = required_jobs_count.div_ceil(2);
2170            let got = completed_jobs.len().div_ceil(2);
2171
2172            // println!("required={:?} got={:?}", required, got);
2173
2174            let max_base_jobs = self.max_base_jobs as usize;
2175            assert!(
2176                !(got < required && data.len() > max_base_jobs - required + got),
2177                "Insufficient jobs (Data count {}): Required- {} got- {}",
2178                data_count,
2179                required,
2180                got
2181            )
2182        }
2183
2184        let delay = self.delay + 1;
2185
2186        // Increment the sequence number
2187        self.incr_sequence_no();
2188
2189        let latest_tree = &self.trees[0];
2190        let available_space = latest_tree.available_space();
2191
2192        // Possible that new base jobs is added to a new tree within an
2193        // update i.e., part of it is added to the first tree and the rest
2194        // of it to a new tree. This happens when the throughput is not max.
2195        // This also requires merge jobs to be done on two different set of trees*)
2196
2197        let (data1, data2) = split(&data, available_space as usize);
2198
2199        // println!(
2200        //     "delay={} available_space={} data1={} data2={}",
2201        //     delay,
2202        //     available_space,
2203        //     data1.len(),
2204        //     data2.len()
2205        // );
2206
2207        let required_jobs_for_current_tree =
2208            work(&self.trees[1..], delay, self.max_base_jobs).len();
2209        let (jobs1, jobs2) = split(&completed_jobs, required_jobs_for_current_tree);
2210
2211        // println!(
2212        //     "required_jobs_for_current_tree={} jobs1={} jobs2={}",
2213        //     required_jobs_for_current_tree,
2214        //     jobs1.len(),
2215        //     jobs2.len()
2216        // );
2217
2218        // TODO: For logs, consider when works are splitted on 2 trees
2219        println!("scan_state update:");
2220
2221        // update first set of jobs and data
2222        let result_opt = self.add_merge_jobs(jobs1)?;
2223        self.add_data(data1.to_vec(), &base_kind)?;
2224
2225        if !jobs2.is_empty() || !data2.is_empty() {
2226            println!("scan_state update: (2nd set of jobs, new transactions didn't fit in latest/current tree)");
2227        }
2228
2229        // update second set of jobs and data.
2230        // This will be empty if all the data fit in the current tree
2231        self.add_merge_jobs(jobs2)?;
2232        self.add_data(data2.to_vec(), base_kind)?;
2233
2234        // update the latest emitted value
2235        if result_opt.is_some() {
2236            self.acc.clone_from(&result_opt);
2237        };
2238
2239        assert!(
2240            self.trees.len() <= self.max_trees() as usize,
2241            "Tree list length ({}) exceeded maximum ({})",
2242            self.trees.len(),
2243            self.max_trees()
2244        );
2245
2246        Ok(result_opt)
2247    }
2248
2249    pub fn update(
2250        &mut self,
2251        data: Vec<BaseJob>,
2252        completed_jobs: Vec<MergeJob>,
2253        base_kind: impl Fn(&BaseJob) -> usize,
2254    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
2255        self.update_helper(data, completed_jobs, base_kind)
2256    }
2257
2258    pub fn all_jobs(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
2259        self.all_work()
2260    }
2261
2262    pub fn jobs_for_next_update(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
2263        self.work_for_next_update(self.max_base_jobs)
2264    }
2265
2266    pub fn jobs_for_slots(&self, slots: u64) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
2267        self.work_for_next_update(slots)
2268    }
2269
2270    pub fn free_space(&self) -> u64 {
2271        self.max_base_jobs
2272    }
2273
2274    pub fn last_emitted_value(&self) -> Option<&(MergeJob, Vec<BaseJob>)> {
2275        self.acc.as_ref()
2276    }
2277
2278    fn current_job_sequence_number(&self) -> SequenceNumber {
2279        self.curr_job_seq_no.clone()
2280    }
2281
2282    pub fn base_jobs_on_latest_tree(&self) -> impl Iterator<Item = BaseJob> {
2283        let depth = ceil_log2(self.max_base_jobs);
2284        let level = depth;
2285
2286        self.trees[0]
2287            .jobs_on_level(depth, level)
2288            .into_iter()
2289            .filter_map(|job| match job {
2290                AvailableJob::Base(base) => Some(base),
2291                AvailableJob::Merge { .. } => None,
2292            })
2293    }
2294
2295    // 0-based indexing, so 0 indicates next-to-latest tree
2296    pub fn base_jobs_on_earlier_tree(&self, index: usize) -> impl Iterator<Item = BaseJob> {
2297        let depth = ceil_log2(self.max_base_jobs);
2298        let level = depth;
2299
2300        let earlier_trees = &self.trees[1..];
2301
2302        let base_job = |job| match job {
2303            AvailableJob::Base(base) => Some(base),
2304            AvailableJob::Merge { .. } => None,
2305        };
2306
2307        match earlier_trees.get(index) {
2308            None => {
2309                // Use `Vec::new().into_iter().filter_map` to returns same concrete type
2310                // than the `Some(_)` branch
2311                Vec::new().into_iter().filter_map(base_job)
2312            }
2313            Some(tree) => tree
2314                .jobs_on_level(depth, level)
2315                .into_iter()
2316                .filter_map(base_job),
2317        }
2318    }
2319
2320    pub fn partition_if_overflowing(&self) -> SpacePartition {
2321        let cur_tree_space = self.free_space_on_current_tree();
2322
2323        // Check actual work count because it would be zero initially
2324
2325        let work_count = self.work_for_tree(WorkForTree::Current).len() as u64;
2326        let work_count_new_tree = self.work_for_tree(WorkForTree::Next).len() as u64;
2327
2328        SpacePartition {
2329            first: (cur_tree_space, work_count),
2330            second: {
2331                if cur_tree_space < self.max_base_jobs {
2332                    let slots = self.max_base_jobs - cur_tree_space;
2333                    Some((slots, work_count_new_tree.min(2 * slots)))
2334                } else {
2335                    None
2336                }
2337            },
2338        }
2339    }
2340
2341    pub fn next_on_new_tree(&self) -> bool {
2342        let curr_tree_space = self.free_space_on_current_tree();
2343        curr_tree_space == self.max_base_jobs
2344    }
2345
2346    pub fn pending_data(&self) -> Vec<Vec<BaseJob>> {
2347        self.trees.iter().rev().map(Tree::base_jobs).collect()
2348    }
2349
2350    // #[cfg(test)]
2351    fn job_count(&self) -> (f64, f64) {
2352        use JobStatus::{Done, Todo};
2353
2354        self.fold_chronological(
2355            (0.0, 0.0),
2356            |(ntodo, ndone), merge| {
2357                use merge::{
2358                    Job::{Empty, Full, Part},
2359                    Record,
2360                };
2361
2362                let (todo, done) = match &merge.job {
2363                    Part(_) => (0.5, 0.0),
2364                    Full(Record { state: Todo, .. }) => (1.0, 0.0),
2365                    Full(Record { state: Done, .. }) => (0.0, 1.0),
2366                    Empty => (0.0, 0.0),
2367                };
2368                (ntodo + todo, ndone + done)
2369            },
2370            |(ntodo, ndone), base| {
2371                use base::{
2372                    Job::{Empty, Full},
2373                    Record,
2374                };
2375
2376                let (todo, done) = match &base.job {
2377                    Empty => (0.0, 0.0),
2378                    Full(Record { state: Todo, .. }) => (1.0, 0.0),
2379                    Full(Record { state: Done, .. }) => (0.0, 1.0),
2380                };
2381
2382                (ntodo + todo, ndone + done)
2383            },
2384        )
2385    }
2386}
2387
2388fn work_to_do<'a, BaseJob, MergeJob, I>(
2389    trees: I,
2390    max_base_jobs: u64,
2391) -> Vec<AvailableJob<BaseJob, MergeJob>>
2392where
2393    I: Iterator<Item = &'a Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
2394    BaseJob: Debug + Clone + 'static,
2395    MergeJob: Debug + Clone + 'static,
2396{
2397    let depth = ceil_log2(max_base_jobs);
2398
2399    // let trees: Vec<_> = trees.collect();
2400
2401    // println!("work_to_do length={}", trees.len());
2402
2403    trees
2404        // .iter()
2405        .enumerate()
2406        .flat_map(|(i, tree)| {
2407            let level = depth - i as u64;
2408            tree.jobs_on_level(depth, level)
2409        })
2410        .collect()
2411}
2412
2413fn work<'a, BaseJob, MergeJob, I>(
2414    trees: I,
2415    delay: u64,
2416    max_base_jobs: u64,
2417) -> Vec<AvailableJob<BaseJob, MergeJob>>
2418where
2419    I: IntoIterator<Item = &'a Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
2420    BaseJob: Debug + Clone + 'static,
2421    MergeJob: Debug + Clone + 'static,
2422{
2423    let depth = ceil_log2(max_base_jobs) as usize;
2424    let delay = delay as usize;
2425
2426    // println!("WORK_DELAY={}", delay);
2427
2428    let work_trees = trees
2429        .into_iter()
2430        .enumerate()
2431        .filter_map(|(i, t)| {
2432            if i % delay == delay - 1 {
2433                Some(t)
2434            } else {
2435                None
2436            }
2437        })
2438        .take(depth + 1);
2439
2440    work_to_do(work_trees, max_base_jobs)
2441}
2442
2443fn take<T>(slice: &[T], n: usize) -> &[T] {
2444    slice.get(..n).unwrap_or(slice)
2445}
2446
2447fn take_at<T>(slice: &[T], skip: usize, n: usize) -> &[T] {
2448    slice.get(skip..).map(|s| take(s, n)).unwrap_or(&[])
2449}
2450
2451pub const fn ceil_log2(n: u64) -> u64 {
2452    // let ceil_log2 i =
2453    //   if i <= 0
2454    //   then raise_s (Sexp.message "[Int.ceil_log2] got invalid input" [ "", sexp_of_int i ]);
2455    //   if i = 1 then 0 else num_bits - clz (i - 1)
2456
2457    assert!(n > 0);
2458    if n == 1 {
2459        0
2460    } else {
2461        u64::BITS as u64 - (n - 1).leading_zeros() as u64
2462    }
2463}
2464
2465fn flatten<T>(v: Vec<Vec<T>>) -> Vec<T> {
2466    v.into_iter().flatten().collect()
2467}
2468
2469// #[cfg(test)]
2470fn assert_job_count<B, M>(
2471    s1: &ParallelScan<B, M>,
2472    s2: &ParallelScan<B, M>,
2473    completed_job_count: f64,
2474    base_job_count: f64,
2475    value_emitted: bool,
2476) where
2477    B: Debug + Clone + 'static,
2478    M: Debug + Clone + 'static,
2479{
2480    // println!("s1={:#?}", s1);
2481    // println!("s2={:?}", s2);
2482
2483    let (todo_before, done_before) = s1.job_count();
2484    let (todo_after, done_after) = s2.job_count();
2485
2486    // println!(
2487    //     "before todo={:?} done={:?}",
2488    //     s1.job_count().0,
2489    //     s1.job_count().1
2490    // );
2491    // println!(
2492    //     "after  todo={:?} done={:?}",
2493    //     s2.job_count().0,
2494    //     s2.job_count().1
2495    // );
2496
2497    // ordered list of jobs that is actually called when distributing work
2498    let all_jobs = flatten(s2.all_jobs());
2499
2500    // list of jobs
2501
2502    let all_jobs_expected_count = s2
2503        .trees
2504        .iter()
2505        .fold(Vec::with_capacity(s2.trees.len()), |mut acc, tree| {
2506            let mut records = tree.jobs_records();
2507            acc.append(&mut records);
2508            acc
2509        })
2510        .into_iter()
2511        .filter(|job| {
2512            matches!(
2513                job,
2514                Job::Base(base::Record {
2515                    state: JobStatus::Todo,
2516                    ..
2517                }) | Job::Merge(merge::Record {
2518                    state: JobStatus::Todo,
2519                    ..
2520                })
2521            )
2522        })
2523        .count();
2524
2525    // println!(
2526    //     "all_jobs={} all_jobs_expected={}",
2527    //     all_jobs.len(),
2528    //     all_jobs_expected_count
2529    // );
2530
2531    assert_eq!(all_jobs.len(), all_jobs_expected_count);
2532
2533    let expected_todo_after = {
2534        let new_jobs = if value_emitted {
2535            (completed_job_count - 1.0) / 2.0
2536        } else {
2537            completed_job_count / 2.0
2538        };
2539        todo_before + base_job_count - completed_job_count + new_jobs
2540    };
2541
2542    let expected_done_after = {
2543        let jobs_from_delete_tree = if value_emitted {
2544            ((2 * s1.max_base_jobs) - 1) as f64
2545        } else {
2546            0.0
2547        };
2548        done_before + completed_job_count - jobs_from_delete_tree
2549    };
2550
2551    assert_eq!(todo_after, expected_todo_after);
2552    assert_eq!(done_after, expected_done_after);
2553}
2554
2555fn test_update<B, M>(
2556    s1: &ParallelScan<B, M>,
2557    data: Vec<B>,
2558    completed_jobs: Vec<M>,
2559) -> (Option<(M, Vec<B>)>, ParallelScan<B, M>)
2560where
2561    B: Debug + Clone + 'static,
2562    M: Debug + Clone + 'static,
2563{
2564    let mut s2 = s1.clone();
2565    let result_opt = s2
2566        .update(data.clone(), completed_jobs.clone(), |_| 0)
2567        .unwrap();
2568
2569    assert_job_count(
2570        s1,
2571        &s2,
2572        completed_jobs.len() as f64,
2573        data.len() as f64,
2574        result_opt.is_some(),
2575    );
2576    (result_opt, s2)
2577}
2578
2579fn int_to_string(u: &usize) -> String {
2580    u.to_string()
2581}
2582
2583// fn sint_to_string(i: &i64) -> String {
2584//     i.to_string()
2585// }
2586
2587fn sint_to_string(buffer: &mut Vec<u8>, i: &i64) {
2588    write!(buffer, "{}", i).unwrap();
2589}
2590
2591fn hash(state: &ParallelScan<i64, i64>) -> String {
2592    hex::encode(state.hash(sint_to_string, sint_to_string))
2593}
2594
2595#[cfg(test)]
2596mod tests {
2597    use std::{
2598        array,
2599        sync::{
2600            atomic::{AtomicBool, Ordering::Relaxed},
2601            mpsc::{sync_channel, Receiver, SyncSender},
2602        },
2603    };
2604
2605    use mina_core::thread;
2606    use rand::Rng;
2607
2608    use super::*;
2609
2610    #[test]
2611    fn test_ceil_log2() {
2612        for a in 1..50u64 {
2613            let v = (a as f32).log2().ceil() as u64;
2614            let w = ceil_log2(a);
2615            // println!("{} {} {}", a, v, w);
2616            assert_eq!(v, w);
2617        }
2618    }
2619
2620    // Make sure that sha256 produces same result when data is splitted or not
2621    #[test]
2622    fn test_sha256() {
2623        let array: [u8; 2 * 1024] = array::from_fn(|i| (i % 256) as u8);
2624        let mut slice = &array[..];
2625
2626        let mut sha256 = sha2::Sha256::new();
2627        for byte in slice.iter().copied() {
2628            sha256.update(&[byte][..]);
2629        }
2630        let first = sha256.finalize();
2631
2632        let mut sha256 = sha2::Sha256::new();
2633        let mut n = 1;
2634        while !slice.is_empty() {
2635            sha256.update(slice.get(..n).unwrap_or(slice));
2636            slice = slice.get(n..).unwrap_or(&[]);
2637
2638            n += 2;
2639        }
2640        let second = sha256.finalize();
2641
2642        assert_eq!(first, second);
2643    }
2644
2645    #[test]
2646    fn test_range_at_depth() {
2647        let ranges: Vec<_> = (0..10u64).map(btree::range_at_depth).collect();
2648
2649        assert_eq!(
2650            ranges,
2651            [
2652                0..1,
2653                1..3,
2654                3..7,
2655                7..15,
2656                15..31,
2657                31..63,
2658                63..127,
2659                127..255,
2660                255..511,
2661                511..1023,
2662            ]
2663        );
2664    }
2665
2666    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1525>
2667    #[test]
2668    fn always_max_base_jobs() {
2669        const MAX_BASE_JOS: u64 = 512;
2670
2671        // let int_to_string = |i: &usize| i.to_string();
2672
2673        // let state = ParallelScan::<usize, usize>::empty(8, 3);
2674        // println!("STATE len={:?} ={:#?}", state.trees.len(), state);
2675        // println!("hash={:?}", hash(&state));
2676
2677        let mut state = ParallelScan::<usize, usize>::empty(MAX_BASE_JOS, 3);
2678        let mut expected_result: Vec<Vec<usize>> = vec![];
2679
2680        // println!("hash={:?}", hex::encode(state.hash(int_to_string, int_to_string)));
2681        // println!("hash={:?}", state.hash(int_to_string, int_to_string));
2682
2683        for i in 0..100 {
2684            println!("####### LOOP {:?} #########", i);
2685
2686            let data: Vec<_> = (0..MAX_BASE_JOS as usize).map(|j| i + j).collect();
2687
2688            expected_result.push(data.clone());
2689
2690            let work: Vec<_> = state
2691                .work_for_next_update(data.len() as u64)
2692                .into_iter()
2693                .flatten()
2694                .collect();
2695
2696            let new_merges: Vec<_> = work
2697                .iter()
2698                .map(|job| match job {
2699                    AvailableJob::Base(i) => *i,
2700                    AvailableJob::Merge { left, right } => *left + *right,
2701                })
2702                .collect();
2703
2704            println!("work={:?} new_merges={:?}", work.len(), new_merges.len());
2705            // println!("hash_s1={:?}", hash(&state));
2706
2707            // let mut s2 = state.clone();
2708            // let result_opt = s2.update(data.clone(), new_merges.clone()).unwrap();
2709            // println!("hash_s2={:?}", hash(&s2));
2710
2711            let (result_opt, s) = test_update(&state, data, new_merges);
2712
2713            // assert!(result_opt.is_none());
2714
2715            let (expected_result_, remaining_expected_results) = {
2716                match result_opt {
2717                    None => ((0, vec![]), expected_result.clone()),
2718                    Some(ref r) => {
2719                        println!("RESULT_OPT.0={} len={}", r.0, r.1.len());
2720                        // println!("expected_result={:?}", expected_result);
2721                        // Printf.eprintf "RESULT_OPT.0=%d len=%d\n%!" a (List.length l);
2722                        if expected_result.is_empty() {
2723                            ((0, vec![]), vec![])
2724                        } else {
2725                            let first = expected_result[0].clone();
2726                            let sum: usize = first.iter().sum();
2727
2728                            ((sum, first), expected_result[1..].to_vec())
2729                        }
2730                    }
2731                }
2732            };
2733
2734            assert_eq!(
2735                result_opt.as_ref().unwrap_or(&expected_result_),
2736                &expected_result_
2737            );
2738
2739            expected_result = remaining_expected_results;
2740            state = s;
2741        }
2742    }
2743
2744    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1562>
2745    #[test]
2746    fn random_base_jobs() {
2747        const MAX_BASE_JOS: usize = 512;
2748
2749        let mut state = ParallelScan::<usize, usize>::empty(MAX_BASE_JOS as u64, 3);
2750        let mut rng = rand::thread_rng();
2751        let expected_result = (MAX_BASE_JOS, vec![1usize; MAX_BASE_JOS]);
2752
2753        for _ in 0..1_000 {
2754            let mut data = vec![1; rng.gen_range(0..=30)];
2755            data.truncate(MAX_BASE_JOS);
2756            let data_len = data.len();
2757
2758            println!("list_length={}", data_len);
2759
2760            let work: Vec<_> = state
2761                .work_for_next_update(data_len as u64)
2762                .into_iter()
2763                .flatten()
2764                .take(data_len * 2)
2765                .collect();
2766            let new_merges: Vec<_> = work
2767                .iter()
2768                .map(|job| match job {
2769                    AvailableJob::Base(i) => *i,
2770                    AvailableJob::Merge { left, right } => left + right,
2771                })
2772                .collect();
2773
2774            let (result_opt, s) = test_update(&state, data, new_merges);
2775
2776            assert_eq!(
2777                result_opt.as_ref().unwrap_or(&expected_result),
2778                &expected_result
2779            );
2780
2781            state = s;
2782        }
2783    }
2784
2785    fn gen<FunDone, FunAcc>(fun_job_done: FunDone, fun_acc: FunAcc) -> ParallelScan<i64, i64>
2786    where
2787        FunDone: Fn(&AvailableJob<i64, i64>) -> i64,
2788        FunAcc: Fn(Option<(i64, Vec<i64>)>, (i64, Vec<i64>)) -> Option<(i64, Vec<i64>)>,
2789    {
2790        let mut rng = rand::thread_rng();
2791
2792        let depth = rng.gen_range(2..5);
2793        let delay = rng.gen_range(0..=3);
2794
2795        let max_base_jobs = 2u64.pow(depth);
2796
2797        let mut s = ParallelScan::<i64, i64>::empty(max_base_jobs, delay);
2798
2799        let ndatas = rng.gen_range(2..=20);
2800        let datas: Vec<Vec<i64>> = (1..ndatas)
2801            .map(|_| {
2802                std::iter::repeat_with(|| rng.gen())
2803                    .take(max_base_jobs as usize)
2804                    .collect()
2805            })
2806            .collect();
2807
2808        // let datas = vec![
2809        //     // vec![-58823712978749242i64 as i64, 25103, 33363641, -1611555993190i64 as i64]
2810        //     // std::iter::repeat_with(|| rng.gen_range(0..1000))
2811        //     std::iter::repeat_with(|| rng.gen())
2812        //     // std::iter::repeat_with(|| 1)
2813        //         .take(max_base_jobs as usize)
2814        //         .collect::<Vec<_>>()
2815        // ];
2816
2817        for data in datas {
2818            println!("ndata={}", data.len());
2819
2820            let jobs = flatten(s.work_for_next_update(data.len() as u64));
2821
2822            let jobs_done: Vec<i64> = jobs.iter().map(&fun_job_done).collect();
2823
2824            println!("jobs_donea={}", jobs_done.len());
2825
2826            let old_tuple = s.acc.clone();
2827
2828            let res_opt = s.update(data, jobs_done, |_| 0).unwrap();
2829
2830            if let Some(res) = res_opt {
2831                let tuple = if old_tuple.is_some() {
2832                    old_tuple
2833                } else {
2834                    s.acc.clone()
2835                };
2836
2837                s.acc = fun_acc(tuple, res);
2838            }
2839            println!("s.acc.is_some={:?}", s.acc.is_some());
2840        }
2841
2842        s
2843    }
2844
2845    fn fun_merge_up(
2846        state: Option<(i64, Vec<i64>)>,
2847        mut x: (i64, Vec<i64>),
2848    ) -> Option<(i64, Vec<i64>)> {
2849        let mut acc = state?;
2850        acc.1.append(&mut x.1);
2851        Some((acc.0.wrapping_add(x.0), acc.1))
2852    }
2853
2854    fn job_done(job: &AvailableJob<i64, i64>) -> i64 {
2855        match job {
2856            AvailableJob::Base(x) => *x,
2857            // AvailableJob::Merge { left, right } => left + right,
2858            AvailableJob::Merge { left, right } => {
2859                // let left = *left as i64;
2860                // let right = *right as i64;
2861                left.wrapping_add(*right)
2862                // left + right
2863            }
2864        }
2865    }
2866
2867    /// scan (+) over ints
2868    ///
2869    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1677>
2870    #[test]
2871    fn split_on_if_enqueuing_onto_the_next_queue() {
2872        let mut rng = rand::thread_rng();
2873
2874        let p = 4;
2875        let max_base_jobs = 2u64.pow(p);
2876
2877        for _ in 0..100000 {
2878            let state = ParallelScan::<i64, i64>::empty(max_base_jobs, 1);
2879            println!("hash_state={:?}", hash(&state));
2880            let i = rng.gen_range(0..max_base_jobs);
2881
2882            let data: Vec<i64> = (0..i as i64).collect();
2883
2884            let partition = state.partition_if_overflowing();
2885            let jobs = flatten(state.work_for_next_update(data.len() as u64));
2886
2887            let jobs_done: Vec<i64> = jobs.iter().map(job_done).collect();
2888
2889            let tree_count_before = state.trees.len();
2890
2891            let (_, s) = test_update(&state, data, jobs_done);
2892
2893            println!("second={:?}", partition.second.is_some());
2894
2895            match partition.second {
2896                None => {
2897                    let tree_count_after = s.trees.len();
2898                    let expected_tree_count = if partition.first.0 == i {
2899                        tree_count_before + 1
2900                    } else {
2901                        tree_count_before
2902                    };
2903                    assert_eq!(tree_count_after, expected_tree_count);
2904                }
2905                Some(_) => {
2906                    let tree_count_after = s.trees.len();
2907                    let expected_tree_count = if i > partition.first.0 {
2908                        tree_count_before + 1
2909                    } else {
2910                        tree_count_before
2911                    };
2912                    assert_eq!(tree_count_after, expected_tree_count);
2913                }
2914            }
2915        }
2916    }
2917
2918    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1722>
2919    #[test]
2920    fn sequence_number_reset() {
2921        let p = 3;
2922        let max_base_jobs = 2u64.pow(p);
2923
2924        let jobs = |state: &ParallelScan<i64, i64>| -> Vec<Vec<Job<base::Record<i64>, merge::Record<i64>>>> {
2925            state.trees.iter().map(|t| t.jobs_records()).rev().collect()
2926        };
2927
2928        let verify_sequence_number = |state: &ParallelScan<i64, i64>| {
2929            let mut state = state.clone();
2930            state.reset_seq_no();
2931
2932            let jobs_list = jobs(&state);
2933
2934            let depth = ceil_log2(max_base_jobs + 1);
2935
2936            for (i, jobs) in jobs_list.iter().enumerate() {
2937                // each tree has jobs up till a level below the older tree
2938                //  and have the following sequence numbers after reset
2939                //             4
2940                //         3       3
2941                //       2   2   2   2
2942                //      1 1 1 1 1 1 1 1
2943
2944                let cur_levels = depth - i as u64;
2945
2946                let seq_sum = (0..cur_levels).fold(0, |acc, j| {
2947                    let j = j + i as u64;
2948                    acc + (2u64.pow(j as u32) * (depth - j))
2949                });
2950
2951                let offset = i as u64;
2952
2953                let sum_of_all_seq_numbers: u64 = jobs
2954                    .iter()
2955                    .map(|job| match job {
2956                        Job::Base(base::Record { seq_no, .. }) => seq_no.0 - offset,
2957                        Job::Merge(merge::Record { seq_no, .. }) => seq_no.0 - offset,
2958                    })
2959                    .sum();
2960
2961                dbg!(sum_of_all_seq_numbers, seq_sum);
2962                assert_eq!(sum_of_all_seq_numbers, seq_sum);
2963            }
2964        };
2965
2966        let mut state = ParallelScan::<i64, i64>::empty(max_base_jobs, 0);
2967        let mut counter = 0;
2968
2969        for _ in 0..50 {
2970            let jobs = flatten(state.jobs_for_next_update());
2971            let jobs_done: Vec<_> = jobs.iter().map(job_done).collect();
2972            let data: Vec<i64> = (0..max_base_jobs as i64).collect();
2973
2974            let (res_opt, s) = test_update(&state, data, jobs_done);
2975
2976            state = s;
2977
2978            if res_opt.is_some() {
2979                if counter > p {
2980                    verify_sequence_number(&state);
2981                } else {
2982                    counter += 1;
2983                }
2984            };
2985        }
2986
2987        assert_eq!(
2988            hash(&state),
2989            "931a0dc0a488289000c195ae361138cc713deddc179b5d22bfa6344508d0cfb5"
2990        );
2991    }
2992
2993    fn step_on_free_space<F, FAcc, B, M>(
2994        state: &mut ParallelScan<B, M>,
2995        w: &SyncSender<Option<(M, Vec<B>)>>,
2996        mut ds: Vec<B>,
2997        f: F,
2998        f_acc: FAcc,
2999    ) where
3000        F: Fn(&AvailableJob<B, M>) -> M,
3001        FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3002        B: Debug + Clone + 'static,
3003        M: Debug + Clone + 'static,
3004    {
3005        loop {
3006            let data = take(&ds, state.max_base_jobs as usize);
3007
3008            let jobs = flatten(state.work_for_next_update(data.len() as u64));
3009            let jobs_done: Vec<_> = jobs.iter().map(&f).collect();
3010
3011            let old_tuple = state.acc.clone();
3012
3013            let (res_opt, mut s) = test_update(state, data.to_vec(), jobs_done);
3014
3015            let s = match res_opt {
3016                Some(x) => {
3017                    let tuple = if old_tuple.is_some() {
3018                        f_acc(old_tuple, x)
3019                    } else {
3020                        s.acc.clone()
3021                    };
3022                    s.acc = tuple;
3023                    s
3024                }
3025                None => s,
3026            };
3027
3028            w.send(s.acc.clone()).unwrap();
3029
3030            *state = s;
3031
3032            let rem_ds = ds.get(state.max_base_jobs as usize..).unwrap_or(&[]);
3033
3034            if rem_ds.is_empty() {
3035                return;
3036            } else {
3037                ds = rem_ds.to_vec();
3038            }
3039        }
3040    }
3041
3042    fn do_steps<F, FAcc, B, M>(
3043        state: &mut ParallelScan<B, M>,
3044        recv: &Receiver<B>,
3045        f: F,
3046        f_acc: FAcc,
3047        w: SyncSender<Option<(M, Vec<B>)>>,
3048    ) where
3049        F: Fn(&AvailableJob<B, M>) -> M,
3050        FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3051        B: Debug + Clone + 'static,
3052        M: Debug + Clone + 'static,
3053    {
3054        let data = recv.recv().unwrap();
3055        step_on_free_space(state, &w, vec![data], &f, &f_acc);
3056    }
3057
3058    fn scan<F, FAcc, B, M>(
3059        s: &mut ParallelScan<B, M>,
3060        data: &Receiver<B>,
3061        f: F,
3062        f_acc: FAcc,
3063    ) -> Receiver<Option<(M, Vec<B>)>>
3064    where
3065        F: Fn(&AvailableJob<B, M>) -> M,
3066        FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3067        B: Debug + Clone + 'static,
3068        M: Debug + Clone + 'static,
3069    {
3070        let (send, rec) = std::sync::mpsc::sync_channel::<Option<(M, Vec<B>)>>(1);
3071        do_steps(s, data, f, f_acc, send);
3072        rec
3073    }
3074
3075    fn step_repeatedly<F, FAcc, B, M>(
3076        state: &mut ParallelScan<B, M>,
3077        data: &Receiver<B>,
3078        f: F,
3079        f_acc: FAcc,
3080    ) -> Receiver<Option<(M, Vec<B>)>>
3081    where
3082        F: Fn(&AvailableJob<B, M>) -> M,
3083        FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3084        B: Debug + Clone + 'static,
3085        M: Debug + Clone + 'static,
3086    {
3087        let (send, rec) = std::sync::mpsc::sync_channel::<Option<(M, Vec<B>)>>(1);
3088        do_steps(state, data, f, f_acc, send);
3089        rec
3090    }
3091
3092    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1803>
3093    #[test]
3094    fn scan_can_be_initialized_from_intermediate_state() {
3095        for _ in 0..10 {
3096            let mut state = gen(job_done, fun_merge_up);
3097
3098            println!("state={:#?}", state);
3099
3100            let do_one_next = Arc::new(AtomicBool::new(false));
3101
3102            let do_one_next_clone = Arc::clone(&do_one_next);
3103            let (send, recv) = sync_channel(1);
3104
3105            thread::spawn(move || loop {
3106                let v = if do_one_next_clone.load(Relaxed) {
3107                    do_one_next_clone.store(false, Relaxed);
3108                    1i64
3109                } else {
3110                    0i64
3111                };
3112                if send.send(v).is_err() {
3113                    return;
3114                }
3115            });
3116
3117            let one_then_zeros = recv;
3118
3119            let parallelism = state.max_base_jobs * ceil_log2(state.max_base_jobs);
3120            let old_acc = state.acc.as_ref().cloned().unwrap_or((0, vec![]));
3121
3122            let fill_some_zero =
3123                |state: &mut ParallelScan<i64, i64>, v: i64, r: &Receiver<i64>| -> i64 {
3124                    (0..parallelism * parallelism).fold(v, |acc, _| {
3125                        let pipe = step_repeatedly(state, r, job_done, fun_merge_up);
3126
3127                        match pipe.recv() {
3128                            Ok(Some((v, _))) => v,
3129                            Ok(None) => acc,
3130                            Err(_) => acc,
3131                        }
3132                    })
3133                };
3134
3135            let v = fill_some_zero(&mut state, 0, &one_then_zeros);
3136
3137            do_one_next.store(true, Relaxed);
3138
3139            let acc = { state.acc.clone().unwrap() };
3140
3141            assert_ne!(acc.0, old_acc.0);
3142
3143            fill_some_zero(&mut state, v, &one_then_zeros);
3144
3145            let acc_plus_one = { state.acc.unwrap() };
3146            assert_eq!(acc_plus_one.0, acc.0.wrapping_add(1));
3147        }
3148    }
3149
3150    /// scan (+) over ints, map from string
3151    ///
3152    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1879>
3153    #[test]
3154    fn scan_behaves_like_a_fold_long_term() {
3155        fn fun_merge_up(
3156            tuple: Option<(i64, Vec<String>)>,
3157            mut x: (i64, Vec<String>),
3158        ) -> Option<(i64, Vec<String>)> {
3159            let mut acc = tuple?;
3160            acc.1.append(&mut x.1);
3161
3162            Some((acc.0.wrapping_add(x.0), acc.1))
3163        }
3164
3165        fn job_done(job: &AvailableJob<String, i64>) -> i64 {
3166            match job {
3167                AvailableJob::Base(x) => x.parse().unwrap(),
3168                AvailableJob::Merge { left, right } => left.wrapping_add(*right),
3169            }
3170        }
3171
3172        let (send, recv) = sync_channel(1);
3173
3174        let depth = 7;
3175        let count: i64 = 1000;
3176
3177        thread::spawn(move || {
3178            let mut count = count;
3179            let x = count;
3180            loop {
3181                let next = if count <= 0 {
3182                    "0".to_string()
3183                } else {
3184                    (x - count).to_string()
3185                };
3186
3187                count -= 1;
3188
3189                if send.send(next).is_err() {
3190                    return;
3191                }
3192            }
3193        });
3194
3195        let mut s = ParallelScan::<String, i64>::empty(2u64.pow(depth as u32), 1);
3196
3197        let after_3n = (0..4 * count).fold(0i64, |acc, _| {
3198            let result = scan(&mut s, &recv, job_done, fun_merge_up);
3199            match result.recv() {
3200                Ok(Some((v, _))) => v,
3201                Ok(None) => acc,
3202                Err(_) => acc,
3203            }
3204        });
3205
3206        let expected = (0..count).fold(0i64, |a, b| a.wrapping_add(b));
3207
3208        assert_eq!(after_3n, expected);
3209    }
3210
3211    /// scan performs operation in correct order with \
3212    /// non-commutative semigroup
3213    ///
3214    /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/parallel_scan/parallel_scan.ml#L1917>
3215    #[test]
3216    fn scan_concat_over_strings() {
3217        fn fun_merge_up(
3218            tuple: Option<(String, Vec<String>)>,
3219            mut x: (String, Vec<String>),
3220        ) -> Option<(String, Vec<String>)> {
3221            let mut acc = tuple?;
3222            acc.1.append(&mut x.1);
3223
3224            Some((format!("{}{}", acc.0, x.0), acc.1))
3225        }
3226
3227        fn job_done(job: &AvailableJob<String, String>) -> String {
3228            match job {
3229                AvailableJob::Base(x) => x.clone(),
3230                AvailableJob::Merge { left, right } => {
3231                    format!("{}{}", left, right)
3232                }
3233            }
3234        }
3235
3236        let (send, recv) = sync_channel(1);
3237
3238        let depth = 7;
3239        let count: i64 = 100;
3240
3241        thread::spawn(move || {
3242            let mut count = count;
3243            let x = count;
3244            loop {
3245                let next = if count <= 0 {
3246                    "".to_string()
3247                } else {
3248                    let n = (x - count).to_string();
3249                    format!("{},", n)
3250                };
3251
3252                count -= 1;
3253
3254                if send.send(next).is_err() {
3255                    return;
3256                }
3257            }
3258        });
3259
3260        let mut s = ParallelScan::<String, String>::empty(2u64.pow(depth as u32), 1);
3261
3262        let after_3n = (0..42 * count).fold(String::new(), |acc, _| {
3263            let result = scan(&mut s, &recv, job_done, fun_merge_up);
3264            match result.recv() {
3265                Ok(Some((v, _))) => v,
3266                Ok(None) => acc,
3267                Err(_) => acc,
3268            }
3269        });
3270
3271        let expected = (0..count)
3272            .map(|i| format!("{},", i))
3273            .fold(String::new(), |a, b| format!("{}{}", a, b));
3274
3275        assert_eq!(after_3n, expected);
3276    }
3277}