1use std::{
2 collections::{BTreeMap, VecDeque},
3 fmt::Debug,
4 io::Write,
5 ops::ControlFlow,
6 sync::Arc,
7};
8
9use itertools::Itertools;
10use serde::{Deserialize, Serialize};
11use sha2::{
12 digest::{generic_array::GenericArray, typenum::U32},
13 Digest, Sha256,
14};
15use ControlFlow::{Break, Continue};
16
17use super::scan_state::transaction_snark::{LedgerProofWithSokMessage, TransactionWithWitness};
18
/// Monotonically increasing number stamped on each job when it is added,
/// used to order work across trees.
#[derive(Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct SequenceNumber(u64);
23
24impl Debug for SequenceNumber {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 f.write_fmt(format_args!("{}", self.0))
27 }
28}
29
30impl SequenceNumber {
31 pub(super) fn new(number: u64) -> Self {
32 Self(number)
33 }
34
35 fn zero() -> Self {
36 Self(0)
37 }
38
39 pub fn incr(&self) -> Self {
40 Self(self.0 + 1)
41 }
42
43 fn is_u64_max(&self) -> bool {
44 self.0 == u64::MAX
45 }
46
47 pub fn as_u64(&self) -> u64 {
48 self.0
49 }
50}
51
52impl std::ops::Sub for &'_ SequenceNumber {
53 type Output = SequenceNumber;
54
55 fn sub(self, rhs: &'_ SequenceNumber) -> Self::Output {
56 SequenceNumber(self.0 - rhs.0)
57 }
58}
59
/// Lifecycle of a recorded job: waiting to be worked on (`Todo`) or
/// completed (`Done`).
#[derive(Clone, Debug)]
pub enum JobStatus {
    Todo,
    Done,
}
67
68impl JobStatus {
69 pub fn is_done(&self) -> bool {
70 matches!(self, Self::Done)
71 }
72
73 fn as_str(&self) -> &'static str {
74 match self {
75 JobStatus::Todo => "Todo",
76 JobStatus::Done => "Done",
77 }
78 }
79}
80
81#[derive(Clone)]
85pub struct Weight {
86 pub(super) base: u64,
87 pub(super) merge: u64,
88}
89
90impl Debug for Weight {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 write!(f, "{{ base: {} merge: {} }}", self.base, self.merge)
93 }
94}
95
96impl Weight {
97 fn zero() -> Self {
98 Self { base: 0, merge: 0 }
99 }
100}
101
/// Minimal functional lens: read or (immutably) overwrite one field of a
/// `Target`.
trait Lens {
    type Value;
    type Target;
    fn get<'a>(&self, target: &'a Self::Target) -> &'a Self::Value;
    // Returns a copy of `target` with the focused field set to `value`.
    fn set(&self, target: &Self::Target, value: Self::Value) -> Self::Target;
}
108
/// Selects which counter of a [`Weight`] a lens operation acts on.
enum WeightLens {
    Base,
    Merge,
}
113
114impl Lens for WeightLens {
115 type Value = u64;
116 type Target = Weight;
117
118 fn get<'a>(&self, target: &'a Self::Target) -> &'a Self::Value {
119 match self {
120 WeightLens::Base => &target.base,
121 WeightLens::Merge => &target.merge,
122 }
123 }
124
125 fn set(&self, target: &Self::Target, value: Self::Value) -> Self::Target {
126 match self {
127 WeightLens::Base => Self::Target {
128 base: value,
129 merge: target.merge,
130 },
131 WeightLens::Merge => Self::Target {
132 base: target.base,
133 merge: value,
134 },
135 }
136 }
137}
138
/// Distinguishes the tree currently being filled from the next one.
// NOTE(review): not referenced in this part of the file — presumably used
// elsewhere in the module; confirm before removing.
#[derive(Debug)]
enum WorkForTree {
    Current,
    Next,
}
144
145pub mod base {
147 use super::*;
148
149 #[derive(Clone)]
150 pub struct Record<BaseJob> {
151 pub job: BaseJob,
152 pub seq_no: SequenceNumber,
153 pub state: JobStatus,
154 }
155
156 impl<BaseJob> Debug for Record<BaseJob> {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 let Self {
159 job: _,
160 seq_no,
161 state,
162 } = self;
163 f.write_fmt(format_args!("seq_no: {:?}, state: {:?}", seq_no, state))
164 }
165 }
166
167 #[derive(Clone)]
168 pub enum Job<BaseJob> {
169 Empty,
170 Full(Record<BaseJob>),
171 }
172
173 impl<BaseJob> Debug for Job<BaseJob> {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 match self {
176 Self::Empty => write!(f, "Empty"),
177 Self::Full(arg0) => f.write_fmt(format_args!("Full {{ {:?} }}", arg0)),
178 }
179 }
180 }
181
182 #[derive(Clone, Debug)]
183 pub struct Base<BaseJob> {
184 pub weight: Weight,
185 pub job: Job<BaseJob>,
186 }
187
188 impl<BaseJob: Clone> Record<BaseJob> {
189 pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
190 Self {
191 job: fun(&self.job),
192 seq_no: self.seq_no.clone(),
193 state: self.state.clone(),
194 }
195 }
196
197 pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
198 Self {
199 seq_no: no,
200 state: self.state.clone(),
201 job: self.job.clone(),
202 }
203 }
204 }
205
206 impl<BaseJob: Clone> Job<BaseJob> {
207 pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
208 match self {
209 Job::Empty => Self::Empty,
210 Job::Full(r) => Job::Full(r.map(fun)),
211 }
212 }
213 }
214
215 impl<BaseJob: Clone> Base<BaseJob> {
216 pub fn map<F: Fn(&BaseJob) -> BaseJob>(&self, fun: F) -> Self {
217 Self {
218 weight: self.weight.clone(),
219 job: self.job.map(fun),
220 }
221 }
222
223 pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
224 Self {
225 weight: self.weight.clone(),
226 job: match &self.job {
227 Job::Full(record) => Job::Full(record.with_seq_no(no)),
228 x => x.clone(),
229 },
230 }
231 }
232 }
233}
234
235pub mod merge {
237 use super::*;
238
239 #[derive(Clone)]
240 pub struct Record<MergeJob> {
241 pub left: MergeJob,
242 pub right: MergeJob,
243 pub seq_no: SequenceNumber,
244 pub state: JobStatus,
245 }
246
247 impl<MergeJob> Debug for Record<MergeJob> {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 let Self {
250 left: _,
251 right: _,
252 seq_no,
253 state,
254 } = self;
255 f.write_fmt(format_args!("seq_no: {:?}, state: {:?}", seq_no, state))
256 }
257 }
258
259 impl<MergeJob: Clone> Record<MergeJob> {
260 pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
261 Self {
262 seq_no: no,
263 left: self.left.clone(),
264 right: self.right.clone(),
265 state: self.state.clone(),
266 }
267 }
268 }
269
270 #[derive(Clone)]
271 pub enum Job<MergeJob> {
272 Empty,
273 Part(MergeJob), Full(Record<MergeJob>),
275 }
276
277 impl<MergeJob> Debug for Job<MergeJob> {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 match self {
280 Self::Empty => write!(f, "Empty"),
281 Self::Part(_) => write!(f, "Part(merge)"),
282 Self::Full(arg0) => f.write_fmt(format_args!("Full {{ {:?} }}", arg0)),
283 }
284 }
285 }
286
287 #[derive(Clone, Debug)]
288 pub struct Merge<MergeJob> {
289 pub weight: (Weight, Weight),
290 pub job: Job<MergeJob>,
291 }
292
293 impl<MergeJob> Record<MergeJob> {
294 pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
295 Self {
296 left: fun(&self.left),
297 right: fun(&self.right),
298 seq_no: self.seq_no.clone(),
299 state: self.state.clone(),
300 }
301 }
302 }
303
304 impl<MergeJob> Job<MergeJob> {
305 pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
306 match self {
307 Job::Empty => Self::Empty,
308 Job::Part(j) => Job::Part(fun(j)),
309 Job::Full(r) => Job::Full(r.map(fun)),
310 }
311 }
312 }
313
314 impl<MergeJob: Clone> Merge<MergeJob> {
315 pub fn map<F: Fn(&MergeJob) -> MergeJob>(&self, fun: F) -> Self {
316 Self {
317 weight: self.weight.clone(),
318 job: self.job.map(fun),
319 }
320 }
321
322 pub fn with_seq_no(&self, no: SequenceNumber) -> Self {
323 Self {
324 weight: self.weight.clone(),
325 job: match &self.job {
326 Job::Full(record) => Job::Full(record.with_seq_no(no)),
327 x => x.clone(),
328 },
329 }
330 }
331 }
332}
333
/// A unit of work ready to be handed to a worker: either proving a single
/// base job or merging two child proofs.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum AvailableJob<BaseJob, MergeJob> {
    Base(BaseJob),
    Merge { left: MergeJob, right: MergeJob },
}
341
/// Internal tag distinguishing completed base work from completed merge
/// work when feeding results back into a tree.
#[derive(Clone, Debug)]
enum Job<BaseJob, MergeJob> {
    Base(BaseJob),
    Merge(MergeJob),
}
348
/// Slot availability split between the current tree and (optionally) the
/// next one; each pair holds a count of slots and a count of jobs.
#[derive(Clone)]
pub struct SpacePartition {
    // (space available, jobs to be completed) for the current tree.
    pub first: (u64, u64),
    // Same pair for the next tree, when present.
    pub second: Option<(u64, u64)>,
}
360
361impl Debug for SpacePartition {
362 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363 struct Detail {
364 space_available: u64,
365 njobs_to_be_completed: u64,
366 }
367
368 impl Debug for Detail {
369 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
370 f.write_fmt(format_args!(
371 "space_available: {}, njobs_to_be_completed={}",
372 self.space_available, self.njobs_to_be_completed
373 ))
374 }
375 }
376
377 f.debug_struct("SpacePartition")
378 .field(
379 "first(current_tree)",
380 &Detail {
381 space_available: self.first.0,
382 njobs_to_be_completed: self.first.1,
383 },
384 )
385 .field(
386 "second(next_tree)",
387 &self.second.map(|second| Detail {
388 space_available: second.0,
389 njobs_to_be_completed: second.1,
390 }),
391 )
392 .finish()
393 }
394}
395
/// Object-safe helper exposing a plain reference to the underlying value.
trait WithVTable<T>: Debug {
    fn by_ref(&self) -> &T;
}
399
// Blanket implementation: any debuggable value can hand out a reference
// to itself.
impl<T: Debug> WithVTable<T> for T {
    fn by_ref(&self) -> &Self {
        self
    }
}
405
/// A node of the scan-state tree: leaves hold base work, inner nodes hold
/// merge work.
#[derive(Clone, Debug)]
pub enum Value<B, M> {
    Leaf(B),
    Node(M),
}
411
/// A complete binary tree stored as a flat vector in breadth-first order
/// (see the `btree` module for the index arithmetic).
#[derive(Clone)]
pub struct Tree<B, M> {
    pub(super) values: Vec<Value<B, M>>,
}
417
// Pretty-prints the tree one depth per line, grouping nodes by depth.
impl<B, M> Debug for Tree<base::Base<B>, merge::Merge<M>>
where
    B: Debug,
    M: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Borrowing wrapper so leaves and inner nodes can share one Vec per
        // depth while keeping their own Debug output.
        enum BaseOrMerge<'a, B, M> {
            Base(&'a base::Base<B>),
            Merge(&'a merge::Merge<M>),
        }

        impl<B, M> Debug for BaseOrMerge<'_, B, M>
        where
            B: Debug,
            M: Debug,
        {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match self {
                    Self::Base(arg0) => write!(f, "{:?}", arg0),
                    Self::Merge(arg0) => write!(f, "{:?}", arg0),
                }
            }
        }

        // Bucket the breadth-first vector by depth, preserving in-level order.
        let mut by_depth = BTreeMap::<usize, Vec<_>>::default();

        for (index, v) in self.values.iter().enumerate() {
            let vec = by_depth.entry(btree::depth_at(index) as usize).or_default();
            let v = match v {
                Value::Leaf(b) => BaseOrMerge::Base(b),
                Value::Node(m) => BaseOrMerge::Merge(m),
            };
            vec.push(v);
        }

        // BTreeMap iteration is depth-ordered, shallowest first.
        for (depth, values) in by_depth.iter() {
            writeln!(f, "depth={} {:#?}", depth, values)?;
        }

        Ok(())
    }
}
460
/// Index arithmetic for a complete binary tree stored as a flat,
/// breadth-first `Vec` (root at index 0).
mod btree {
    /// Depth of the node at `index` (root = 0), i.e. `floor(log2(index + 1))`.
    ///
    /// Computed with exact integer bit arithmetic. The previous
    /// implementation went through `f32::log2`; an `f32` has a 24-bit
    /// mantissa, so for `index + 1 >= 2^24` the cast rounds and the result
    /// could be one too large near power-of-two boundaries (e.g.
    /// `depth_at(2^25 - 2)` returned 25 instead of 24).
    pub fn depth_at(index: usize) -> u64 {
        let n = index as u64 + 1;
        // n >= 1, so leading_zeros() <= 63 and the subtraction cannot wrap.
        u64::from(63 - n.leading_zeros())
    }

    /// Index of the left child of `index`.
    pub fn child_left(index: usize) -> usize {
        (index * 2) + 1
    }

    /// Index of the right child of `index`.
    pub fn child_right(index: usize) -> usize {
        (index * 2) + 2
    }

    /// Index of the parent of `index`; `None` for the root.
    pub fn parent(index: usize) -> Option<usize> {
        Some(index.checked_sub(1)? / 2)
    }

    /// Half-open range of indexes forming the level at `depth`.
    pub fn range_at_depth(depth: u64) -> std::ops::Range<usize> {
        if depth == 0 {
            return 0..1;
        }

        let start = (1 << depth) - 1;
        let end = (1 << (depth + 1)) - 1;

        start..end
    }
}
493
494impl<B, M> Tree<B, M>
495where
496 B: Debug + 'static,
497 M: Debug + 'static,
498{
499 fn map_depth<FunMerge, FunBase>(&self, fun_merge: &FunMerge, fun_base: &FunBase) -> Self
501 where
502 FunMerge: for<'a> Fn(u64, &'a M) -> M,
503 FunBase: for<'a> Fn(&'a B) -> B,
504 {
505 let values = self
506 .values
507 .iter()
508 .enumerate()
509 .map(|(index, value)| match value {
510 Value::Leaf(base) => Value::Leaf(fun_base(base)),
511 Value::Node(merge) => Value::Node(fun_merge(btree::depth_at(index), merge)),
512 })
513 .collect();
514
515 Self { values }
516 }
517
    /// Groups references to the tree's nodes by depth (root = 0), keeping
    /// the left-to-right order within each depth.
    ///
    /// Every depth holds nodes of a single kind, so each entry is either a
    /// `Leaf` of base references or a `Node` of merge references; mixing
    /// kinds at one depth would be a broken invariant and panics.
    pub(super) fn values_by_depth(&self) -> BTreeMap<u64, Value<Vec<&B>, Vec<&M>>> {
        let mut values: BTreeMap<u64, Value<Vec<&B>, Vec<&M>>> = BTreeMap::default();

        for (index, value) in self.values.iter().enumerate() {
            values
                .entry(btree::depth_at(index))
                .and_modify(|for_depth: &mut Value<_, _>| match for_depth {
                    Value::Leaf(vec) => {
                        let Value::Leaf(leaf) = value else {
                            panic!("invalid")
                        };
                        vec.push(leaf);
                    }
                    Value::Node(vec) => {
                        let Value::Node(node) = value else {
                            panic!("invalid")
                        };
                        vec.push(node);
                    }
                })
                .or_insert({
                    // First node seen at a depth decides the entry's kind.
                    match value {
                        Value::Leaf(leaf) => {
                            let mut vec = Vec::with_capacity(255);
                            vec.push(leaf);
                            Value::Leaf(vec)
                        }
                        Value::Node(node) => {
                            let mut vec = Vec::with_capacity(255);
                            vec.push(node);
                            Value::Node(vec)
                        }
                    }
                });
        }

        values
    }
557
558 fn map<FunMerge, FunBase>(&self, fun_merge: FunMerge, fun_base: FunBase) -> Self
559 where
560 FunMerge: Fn(&M) -> M,
561 FunBase: Fn(&B) -> B,
562 {
563 self.map_depth(&|_, m| fun_merge(m), &fun_base)
564 }
565
566 fn fold_depth_until_prime<Accum, Final, FunMerge, FunBase>(
568 &self,
569 fun_merge: &FunMerge,
570 fun_base: &FunBase,
571 init: Accum,
572 ) -> ControlFlow<Final, Accum>
573 where
574 FunMerge: Fn(u64, Accum, &M) -> ControlFlow<Final, Accum>,
575 FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
576 {
577 let mut accum = init;
578
579 for (index, value) in self.values.iter().enumerate() {
580 accum = match value {
581 Value::Leaf(base) => fun_base(accum, base)?,
582 Value::Node(merge) => fun_merge(btree::depth_at(index), accum, merge)?,
583 };
584 }
585
586 Continue(accum)
587 }
588
589 fn fold_depth_until_prime_err<Accum, FunMerge, FunBase>(
591 &self,
592 fun_merge: &FunMerge,
593 fun_base: &FunBase,
594 init: Accum,
595 ) -> Result<Accum, String>
596 where
597 FunMerge: Fn(u64, Accum, &M) -> Result<Accum, String>,
598 FunBase: Fn(Accum, &B) -> Result<Accum, String>,
599 {
600 let mut accum = init;
601
602 for (index, value) in self.values.iter().enumerate() {
603 accum = match value {
604 Value::Leaf(base) => fun_base(accum, base)?,
605 Value::Node(merge) => fun_merge(btree::depth_at(index), accum, merge)?,
606 };
607 }
608
609 Ok(accum)
610 }
611
612 fn fold_depth_until<Accum, Final, FunFinish, FunMerge, FunBase>(
613 &self,
614 fun_merge: FunMerge,
615 fun_base: FunBase,
616 fun_finish: FunFinish,
617 init: Accum,
618 ) -> Final
619 where
620 FunMerge: Fn(u64, Accum, &M) -> ControlFlow<Final, Accum>,
621 FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
622 FunFinish: Fn(Accum) -> Final,
623 {
624 match self.fold_depth_until_prime(&fun_merge, &fun_base, init) {
625 Continue(accum) => fun_finish(accum),
626 Break(value) => value,
627 }
628 }
629
630 fn fold_depth<Accum, FunMerge, FunBase>(
631 &self,
632 fun_merge: FunMerge,
633 fun_base: FunBase,
634 init: Accum,
635 ) -> Accum
636 where
637 FunMerge: Fn(u64, Accum, &M) -> Accum,
638 FunBase: Fn(Accum, &B) -> Accum,
639 {
640 self.fold_depth_until(
641 |i, accum, a| Continue(fun_merge(i, accum, a)),
642 |accum, d| Continue(fun_base(accum, d)),
643 |x| x,
644 init,
645 )
646 }
647
648 fn fold<Accum, FunMerge, FunBase>(
649 &self,
650 fun_merge: FunMerge,
651 fun_base: FunBase,
652 init: Accum,
653 ) -> Accum
654 where
655 FunMerge: Fn(Accum, &M) -> Accum,
656 FunBase: Fn(Accum, &B) -> Accum,
657 {
658 self.fold_depth(|_, accum, a| fun_merge(accum, a), fun_base, init)
659 }
660
661 fn fold_until<Accum, Final, FunFinish, FunMerge, FunBase>(
662 &self,
663 fun_merge: FunMerge,
664 fun_base: FunBase,
665 fun_finish: FunFinish,
666 init: Accum,
667 ) -> Final
668 where
669 FunMerge: Fn(Accum, &M) -> ControlFlow<Final, Accum>,
670 FunBase: Fn(Accum, &B) -> ControlFlow<Final, Accum>,
671 FunFinish: Fn(Accum) -> Final,
672 {
673 self.fold_depth_until(
674 |_, accum, a| fun_merge(accum, a),
675 fun_base,
676 fun_finish,
677 init,
678 )
679 }
680
    /// Walks the tree top-down, handing each node the slice of `jobs` its
    /// subtree is responsible for (split at each merge node according to
    /// the node's weights), and rebuilds the tree from the per-node
    /// results.
    ///
    /// Nodes deeper than `update_level` are copied unchanged. `R` is an
    /// optional result a merge node may produce; only the first one seen
    /// (the root's, given BFS order) is kept.
    fn update_split<Data, FunJobs, FunWeight, FunMerge, FunBase, Weight, R>(
        &self,
        fun_merge: &FunMerge,
        fun_base: &FunBase,
        weight_merge: &FunWeight,
        jobs: &[Data],
        update_level: u64,
        jobs_split: &FunJobs,
    ) -> Result<(Self, Option<R>), ()>
    where
        FunMerge: Fn(&[Data], u64, &M) -> Result<(M, Option<R>), ()>,
        FunBase: Fn(&[Data], B) -> Result<B, ()>,
        FunWeight: Fn(&M) -> (Weight, Weight),
        FunJobs: Fn((Weight, Weight), &[Data]) -> (&[Data], &[Data]),
        Data: Clone,
        M: Clone,
        B: Clone,
    {
        let mut values = Vec::with_capacity(self.values.len());
        let mut scan_result = None;

        // Nodes are stored in breadth-first order, so a FIFO of job slices
        // lines up with the traversal: each merge node pops its own slice
        // and pushes the two slices destined for its children.
        let mut jobs_fifo = VecDeque::with_capacity(self.values.len());

        jobs_fifo.push_back(jobs);

        for (index, value) in self.values.iter().enumerate() {
            let depth = btree::depth_at(index);

            if depth > update_level {
                // Below the update level: untouched by this update.
                values.push(value.clone());
                continue;
            }

            let jobs_for_this = jobs_fifo.pop_front().unwrap();

            let value = match value {
                Value::Leaf(base) => Value::Leaf(fun_base(jobs_for_this, base.clone())?),
                Value::Node(merge) => {
                    let (jobs_left, jobs_right) = jobs_split(weight_merge(merge), jobs_for_this);
                    jobs_fifo.push_back(jobs_left);
                    jobs_fifo.push_back(jobs_right);

                    let (value, result) = fun_merge(jobs_for_this, depth, merge)?;

                    // Keep only the first (root) merge result.
                    if scan_result.is_none() {
                        scan_result = Some(result);
                    }

                    Value::Node(value)
                }
            };

            values.push(value);
        }

        // NOTE(review): asserts the FIFO never reallocated, i.e. queued
        // slices never exceeded the node count. This relies on
        // `VecDeque::with_capacity` allocating exactly the requested
        // capacity — confirm that holds for the toolchain in use.
        assert_eq!(jobs_fifo.capacity(), self.values.len());

        Ok(((Self { values }), scan_result.flatten()))
    }
743
    /// Rebuilds the tree bottom-up, threading an accumulator from children
    /// to parent: each leaf produces a `Data`, each merge node consumes its
    /// two children's `Data` and produces its own. Returns the new tree and
    /// the root's accumulated `Data`.
    fn update_accumulate<Data, FunMerge, FunBase>(
        &self,
        fun_merge: &FunMerge,
        fun_base: &FunBase,
    ) -> (Self, Data)
    where
        FunMerge: Fn((Data, Data), &M) -> (M, Data),
        FunBase: Fn(&B) -> (B, Data),
        Data: Clone,
    {
        // One scratch slot per node, filled when the node is visited.
        let mut datas = vec![None; self.values.len()];

        // Moves both children's data out of the scratch table; `None` if
        // either child has not produced data (must not happen here since
        // children are visited first).
        let childs_of = |data: &mut [Option<Data>], index: usize| -> Option<(Data, Data)> {
            let left = data
                .get_mut(btree::child_left(index))
                .and_then(Option::take)?;
            let right = data
                .get_mut(btree::child_right(index))
                .and_then(Option::take)?;

            Some((left, right))
        };

        // Iterate in reverse breadth-first order so every child is
        // processed before its parent.
        let mut values: Vec<_> = self
            .values
            .iter()
            .enumerate()
            .rev()
            .map(|(index, value)| match value {
                Value::Leaf(base) => {
                    let (new_base, count_list) = fun_base(base);
                    datas[index] = Some(count_list);
                    Value::Leaf(new_base)
                }
                Value::Node(merge) => {
                    let (left, right) = childs_of(datas.as_mut(), index).unwrap();
                    let (value, count_list) = fun_merge((left, right), merge);
                    datas[index] = Some(count_list);
                    Value::Node(value)
                }
            })
            .collect();

        // The map above produced nodes in reverse order; restore BFS order.
        values.reverse();

        (Self { values }, datas[0].take().unwrap())
    }
791
792 fn iter(&self) -> impl Iterator<Item = (u64, &Value<B, M>)> {
793 self.values
794 .iter()
795 .enumerate()
796 .map(|(index, value)| (btree::depth_at(index), value))
797 }
798}
799
800impl Tree<base::Base<Arc<TransactionWithWitness>>, merge::Merge<Arc<LedgerProofWithSokMessage>>> {
801 pub fn view(&self) -> impl Iterator<Item = JobValueWithIndex<'_>> {
802 self.values
803 .iter()
804 .enumerate()
805 .map(|(index, value)| JobValueWithIndex {
806 index,
807 job: match value {
808 Value::Leaf(base) => Value::Leaf(&base.job),
809 Value::Node(merge) => Value::Node(&merge.job),
810 },
811 })
812 }
813}
814
/// Borrowed view of a single node's job slot in the transaction-snark
/// scan state.
pub type JobValue<'a> = super::parallel_scan::Value<
    &'a base::Job<Arc<TransactionWithWitness>>,
    &'a merge::Job<Arc<LedgerProofWithSokMessage>>,
>;
819
/// A job slot paired with its breadth-first index in the tree.
pub struct JobValueWithIndex<'a> {
    // Position in the flattened tree; private, exposed via `index()`.
    index: usize,
    pub job: JobValue<'a>,
}
824
825impl JobValueWithIndex<'_> {
826 pub fn index(&self) -> usize {
827 self.index
828 }
829
830 pub fn depth(&self) -> usize {
831 btree::depth_at(self.index) as usize
832 }
833
834 pub fn parent(&self) -> Option<usize> {
835 btree::parent(self.index)
836 }
837
838 pub fn child_left(&self) -> usize {
839 btree::child_left(self.index)
840 }
841
842 pub fn child_right(&self) -> usize {
843 btree::child_right(self.index)
844 }
845
846 pub fn bundle_sibling(&self) -> Option<(usize, bool)> {
848 let parent = self.parent()?;
849 let try_sibling =
850 |parent, index| btree::parent(index).filter(|p| *p == parent).map(|_| index);
851 try_sibling(parent, self.index - 1)
852 .map(|i| (i, true))
853 .or_else(|| try_sibling(parent, self.index + 1).map(|i| (i, false)))
854 }
855}
856
/// The parallel scan state: a sequence of trees of base/merge work.
#[derive(Clone, Debug)]
pub struct ParallelScan<BaseJob, MergeJob> {
    pub trees: Vec<Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
    // Last emitted merge result together with the base jobs it covers.
    pub(super) acc: Option<(MergeJob, Vec<BaseJob>)>,
    // Sequence number to stamp on the next batch of recorded jobs.
    pub curr_job_seq_no: SequenceNumber,
    // Maximum number of base jobs a single tree can hold.
    pub(super) max_base_jobs: u64,
    // Delay parameter for when merge work becomes available; exact
    // semantics defined by callers elsewhere in the module.
    pub(super) delay: u64,
}
868
/// Which weight counters `reset_weights` should recompute.
pub(super) enum ResetKind {
    Base,
    Merge,
    Both,
}
874
875impl<BaseJob, MergeJob> Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>
876where
877 BaseJob: Clone + Debug + 'static,
878 MergeJob: Clone + Debug + 'static,
879{
    /// Applies `completed_jobs` to the nodes at `update_level` (recording
    /// new work and marking consumed work `Done`) and adjusts the weights
    /// of the nodes above so they reflect the remaining capacity below.
    ///
    /// `lens` selects which weight component (`base` or `merge`) this
    /// update spends. Returns the updated tree plus, when the root merge
    /// completes, the resulting `MergeJob`.
    fn update(
        &self,
        completed_jobs: &[Job<BaseJob, MergeJob>],
        update_level: u64,
        sequence_no: SequenceNumber,
        lens: WeightLens,
    ) -> Result<(Self, Option<MergeJob>), ()> {
        // How a merge node at `current_level` absorbs the jobs routed to it.
        let add_merges = |jobs: &[Job<BaseJob, MergeJob>],
                          current_level: u64,
                          merge_job: &merge::Merge<MergeJob>|
         -> Result<(merge::Merge<MergeJob>, Option<MergeJob>), ()> {
            use merge::{
                Job::{Empty, Full, Part},
                Record,
            };
            use Job::{Base, Merge};

            let weight = &merge_job.weight;
            let m = &merge_job.job;

            // (left, right): remaining capacity of the two subtrees, as
            // seen through `lens`.
            let (w1, w2) = &weight;
            let (left, right) = (*lens.get(w1), *lens.get(w2));

            if update_level > 0 && current_level == update_level - 1 {
                // Directly above the update level: results coming up from
                // below become new (Part/Full) merge work here.
                let (new_weight, new_m) = match (jobs, m) {
                    ([], m) => (weight.clone(), m.clone()),
                    ([Merge(a), Merge(b)], Empty) => {
                        let w1 = lens.set(w1, left - 1);
                        let w2 = lens.set(w2, right - 1);

                        (
                            (w1, w2),
                            Full(Record {
                                left: a.clone(),
                                right: b.clone(),
                                seq_no: sequence_no.clone(),
                                state: JobStatus::Todo,
                            }),
                        )
                    }
                    ([Merge(a)], Empty) => {
                        // Only the left half arrived; park it as `Part`.
                        let w1 = lens.set(w1, left - 1);
                        let w2 = lens.set(w2, right);

                        ((w1, w2), Part(a.clone()))
                    }
                    ([Merge(b)], Part(a)) => {
                        // Right half arrived; combine with the parked left.
                        let w1 = lens.set(w1, left);
                        let w2 = lens.set(w2, right - 1);

                        (
                            (w1, w2),
                            Full(Record {
                                left: a.clone(),
                                right: b.clone(),
                                seq_no: sequence_no.clone(),
                                state: JobStatus::Todo,
                            }),
                        )
                    }
                    ([Base(_)], Empty) => {
                        // A base job below consumes capacity but creates no
                        // merge work yet; spend the left side first.
                        let weight = if left == 0 {
                            let w1 = lens.set(w1, left);
                            let w2 = lens.set(w2, right - 1);
                            (w1, w2)
                        } else {
                            let w1 = lens.set(w1, left - 1);
                            let w2 = lens.set(w2, right);
                            (w1, w2)
                        };

                        (weight, Empty)
                    }
                    ([Base(_), Base(_)], Empty) => {
                        let w1 = lens.set(w1, left - 1);
                        let w2 = lens.set(w2, right - 1);

                        ((w1, w2), Empty)
                    }
                    (xs, m) => {
                        panic!(
                            "Got {} jobs when updating level {} and when one of the merge \
                             nodes at level {} is {:?}",
                            xs.len(),
                            update_level,
                            current_level,
                            m
                        );
                    }
                };

                Ok((
                    merge::Merge {
                        weight: new_weight,
                        job: new_m,
                    },
                    None::<MergeJob>,
                ))
            } else if current_level == update_level {
                // At the update level: a completed merge marks the `Todo`
                // record `Done`; at the root it also yields the result.
                match (jobs, m) {
                    (
                        [Merge(a)],
                        Full(
                            x @ Record {
                                state: JobStatus::Todo,
                                ..
                            },
                        ),
                    ) => {
                        let mut x = x.clone();
                        x.state = JobStatus::Done;
                        let new_job = Full(x);

                        let (scan_result, weight) = if current_level == 0 {
                            // Root completed: emit the result, zero weights.
                            let w1 = lens.set(w1, 0);
                            let w2 = lens.set(w2, 0);

                            (Some(a.clone()), (w1, w2))
                        } else {
                            (None, weight.clone())
                        };

                        Ok((
                            merge::Merge {
                                weight,
                                job: new_job,
                            },
                            scan_result,
                        ))
                    }
                    ([], m) => Ok((
                        merge::Merge {
                            weight: weight.clone(),
                            job: m.clone(),
                        },
                        None,
                    )),
                    (xs, m) => {
                        panic!(
                            "Got {} jobs when updating level {} and when one of the merge \
                             nodes at level {} is {:?}",
                            xs.len(),
                            update_level,
                            current_level,
                            m
                        );
                    }
                }
            } else if update_level > 0 && (current_level < update_level - 1) {
                // Strictly above: only account for capacity consumed
                // further down (jobs pass through, filling the left side
                // first).
                match jobs {
                    [] => Ok((
                        merge::Merge {
                            weight: weight.clone(),
                            job: m.clone(),
                        },
                        None,
                    )),
                    _ => {
                        let jobs_length = jobs.len() as u64;
                        let jobs_sent_left = jobs_length.min(left);
                        let jobs_sent_right = (jobs_length - jobs_sent_left).min(right);

                        let w1 = lens.set(w1, left - jobs_sent_left);
                        let w2 = lens.set(w2, right - jobs_sent_right);
                        let weight = (w1, w2);

                        Ok((
                            merge::Merge {
                                weight,
                                job: m.clone(),
                            },
                            None,
                        ))
                    }
                }
            } else {
                // Below the update level: untouched.
                Ok((
                    merge::Merge {
                        weight: weight.clone(),
                        job: m.clone(),
                    },
                    None,
                ))
            }
        };

        // How a leaf absorbs jobs: a `Base` job fills an empty slot as
        // `Todo`; a completed `Merge` of this leaf marks it `Done`.
        let add_bases = |jobs: &[Job<BaseJob, MergeJob>], base: base::Base<BaseJob>| {
            use base::Job::{Empty, Full};
            use Job::{Base, Merge};

            let w = base.weight;
            let d = base.job;

            let weight = lens.get(&w);

            match (jobs, d) {
                ([], d) => Ok(base::Base { weight: w, job: d }),
                ([Base(d)], Empty) => {
                    let w = lens.set(&w, weight - 1);

                    Ok(base::Base {
                        weight: w,
                        job: Full(base::Record {
                            job: d.clone(),
                            seq_no: sequence_no.clone(),
                            state: JobStatus::Todo,
                        }),
                    })
                }
                ([Merge(_)], Full(mut b)) => {
                    b.state = JobStatus::Done;

                    Ok(base::Base {
                        weight: w,
                        job: Full(b),
                    })
                }
                (xs, d) => {
                    panic!(
                        "Got {} jobs when updating level {} and when one of the base nodes \
                         is {:?}",
                        xs.len(),
                        update_level,
                        d
                    );
                }
            }
        };

        self.update_split(
            &add_merges,
            &add_bases,
            &|merge| merge.weight.clone(),
            completed_jobs,
            update_level,
            &|(w1, w2), a: &[Job<BaseJob, MergeJob>]| {
                // Route the first `left`-weight jobs to the left subtree
                // and the next `right`-weight jobs to the right subtree.
                let l = *lens.get(&w1) as usize;
                let r = *lens.get(&w2) as usize;

                (take(a, l), take_at(a, l, r))
            },
        )
    }
1134
    /// Recomputes every node's weight from scratch, bottom-up.
    ///
    /// Weights cache "how much work remains below this node": `base`
    /// counts empty leaf slots, `merge` counts pending merge records.
    /// `reset_kind` selects which component(s) to recompute.
    fn reset_weights(&self, reset_kind: ResetKind) -> Self {
        let fun_base = |base: &base::Base<BaseJob>| {
            let set_one = |lens: WeightLens, weight: &Weight| lens.set(weight, 1);
            let set_zero = |lens: WeightLens, weight: &Weight| lens.set(weight, 0);

            use base::{
                Job::{Empty, Full},
                Record,
            };
            use JobStatus::Todo;

            // A leaf contributes one pending merge iff it holds a Todo job.
            let update_merge_weight = |weight: &Weight| {
                match &base.job {
                    Full(Record { state: Todo, .. }) => set_one(WeightLens::Merge, weight),
                    _ => set_zero(WeightLens::Merge, weight),
                }
            };

            // A leaf has one unit of base capacity iff its slot is empty.
            let update_base_weight = |weight: &Weight| {
                match &base.job {
                    Empty => set_one(WeightLens::Base, weight),
                    Full(_) => set_zero(WeightLens::Base, weight),
                }
            };

            let weight = &base.weight;
            // Leaves have no right subtree; the second element is a dummy
            // so leaf and merge accumulators share the same shape.
            let (new_weight, dummy_right_for_base_nodes) = match reset_kind {
                ResetKind::Merge => (
                    update_merge_weight(weight),
                    set_zero(WeightLens::Merge, weight),
                ),
                ResetKind::Base => (
                    update_base_weight(weight),
                    set_zero(WeightLens::Base, weight),
                ),
                ResetKind::Both => {
                    let w = update_base_weight(weight);
                    (update_merge_weight(&w), Weight::zero())
                }
            };

            let base = base::Base {
                weight: new_weight.clone(),
                job: base.job.clone(),
            };

            (base, (new_weight, dummy_right_for_base_nodes))
        };

        let fun_merge = |lst: ((Weight, Weight), (Weight, Weight)),
                         merge: &merge::Merge<MergeJob>| {
            // lst = (left child's weight pair, right child's weight pair).
            let ((w1, w2), (w3, w4)) = &lst;

            // Each side's new weight is the sum of the matching child's
            // pair, through `lens`.
            let reset = |lens: WeightLens, w: &Weight, ww: &Weight| {
                (
                    lens.set(w, lens.get(w1) + lens.get(w2)),
                    lens.set(ww, lens.get(w3) + lens.get(w4)),
                )
            };

            use merge::{Job::Full, Record};
            use JobStatus::Todo;

            let ww = match reset_kind {
                ResetKind::Merge => {
                    let lens = WeightLens::Merge;
                    // A node holding its own Todo merge counts as exactly
                    // one pending merge, regardless of what is below it.
                    match (&merge.weight, &merge.job) {
                        ((w1, w2), Full(Record { state: Todo, .. })) => {
                            (lens.set(w1, 1), lens.set(w2, 0))
                        }
                        ((w1, w2), _) => reset(lens, w1, w2),
                    }
                }
                ResetKind::Base => {
                    let w = &merge.weight;
                    reset(WeightLens::Base, &w.0, &w.1)
                }
                ResetKind::Both => {
                    let w = &merge.weight;
                    let w = reset(WeightLens::Base, &w.0, &w.1);
                    reset(WeightLens::Merge, &w.0, &w.1)
                }
            };

            let merge = merge::Merge {
                weight: ww.clone(),
                job: merge.job.clone(),
            };

            (merge, ww)
        };

        let (result, _) = self.update_accumulate(&fun_merge, &fun_base);
        result
    }
1239
1240 fn jobs_on_level(&self, depth: u64, level: u64) -> Vec<AvailableJob<BaseJob, MergeJob>> {
1241 use JobStatus::Todo;
1242
1243 self.fold_depth(
1271 |i, mut acc, a| {
1272 use merge::{Job::Full, Record};
1273
1274 if let (
1275 true,
1276 Full(Record {
1277 left,
1278 right,
1279 state: Todo,
1280 ..
1281 }),
1282 ) = (i == level, &a.job)
1283 {
1284 acc.push(AvailableJob::Merge {
1285 left: left.clone(),
1286 right: right.clone(),
1287 });
1288 };
1289 acc
1290 },
1291 |mut acc, d| {
1292 use base::{Job::Full, Record};
1293
1294 if let (
1295 true,
1296 Full(Record {
1297 job, state: Todo, ..
1298 }),
1299 ) = (level == depth, &d.job)
1300 {
1301 acc.push(AvailableJob::Base(job.clone()));
1302 }
1303 acc
1304 },
1305 Vec::with_capacity(256),
1306 )
1307 }
1308
1309 fn to_hashable_jobs(&self) -> Vec<Job<base::Base<BaseJob>, merge::Merge<MergeJob>>> {
1310 use JobStatus::Done;
1311
1312 self.fold(
1313 |mut acc, a| {
1314 match &a.job {
1315 merge::Job::Full(merge::Record { state: Done, .. }) => {}
1316 _ => {
1317 acc.push(Job::Merge(a.clone()));
1318 }
1319 }
1320 acc
1321 },
1322 |mut acc, d| {
1323 match &d.job {
1324 base::Job::Full(base::Record { state: Done, .. }) => {}
1325 _ => {
1326 acc.push(Job::Base(d.clone()));
1327 }
1328 }
1329 acc
1330 },
1331 Vec::with_capacity(256),
1332 )
1333 }
1334
1335 fn jobs_records(&self) -> Vec<Job<base::Record<BaseJob>, merge::Record<MergeJob>>> {
1336 self.fold(
1337 |mut acc, a: &merge::Merge<MergeJob>| {
1338 if let merge::Job::Full(x) = &a.job {
1339 acc.push(Job::Merge(x.clone()));
1340 }
1341 acc
1342 },
1343 |mut acc, d: &base::Base<BaseJob>| {
1344 if let base::Job::Full(j) = &d.job {
1345 acc.push(Job::Base(j.clone()));
1346 }
1347 acc
1348 },
1349 Vec::with_capacity(256),
1350 )
1351 }
1352
1353 fn base_jobs(&self) -> Vec<BaseJob> {
1354 self.fold_depth(
1355 |_, acc, _| acc,
1356 |mut acc, d| {
1357 if let base::Job::Full(base::Record { job, .. }) = &d.job {
1358 acc.push(job.clone());
1359 };
1360 acc
1361 },
1362 Vec::with_capacity(256),
1363 )
1364 }
1365
1366 fn todo_job_count(&self) -> (u64, u64) {
1368 use JobStatus::Todo;
1369
1370 self.fold_depth(
1371 |_, (b, m), j| match &j.job {
1372 merge::Job::Full(merge::Record { state: Todo, .. }) => (b, m + 1),
1373 _ => (b, m),
1374 },
1375 |(b, m), d| match &d.job {
1376 base::Job::Full(base::Record { state: Todo, .. }) => (b + 1, m),
1377 _ => (b, m),
1378 },
1379 (0, 0),
1380 )
1381 }
1382
1383 fn leaves(&self) -> Vec<base::Base<BaseJob>> {
1384 self.fold_depth(
1385 |_, acc, _| acc,
1386 |mut acc, d| {
1387 if let base::Job::Full(_) = &d.job {
1388 acc.push(d.clone());
1389 };
1390 acc
1391 },
1392 Vec::with_capacity(256),
1393 )
1394 }
1395
1396 fn required_job_count(&self) -> u64 {
1397 match &self.values[0] {
1398 Value::Node(value) => {
1399 let (w1, w2) = &value.weight;
1400 w1.merge + w2.merge
1401 }
1402 Value::Leaf(base) => base.weight.merge,
1403 }
1404 }
1405
1406 fn available_space(&self) -> u64 {
1407 match &self.values[0] {
1408 Value::Node(value) => {
1409 let (w1, w2) = &value.weight;
1410 w1.base + w2.base
1411 }
1412 Value::Leaf(base) => base.weight.base,
1413 }
1414 }
1415
    /// Builds a complete tree with leaves at `depth`, filling every merge
    /// node with `merge_job` and every leaf with `base_job`.
    ///
    /// `level == -1` builds a zero-weight tree; otherwise weights are
    /// initialised so a merge node at depth `d` advertises
    /// `2^level / 2^(d+1)` base slots per side.
    fn create_tree_for_level(
        level: i64,
        depth: u64,
        merge_job: merge::Job<MergeJob>,
        base_job: base::Job<BaseJob>,
    ) -> Self {
        let base_weight = if level == -1 {
            Weight::zero()
        } else {
            Weight { base: 1, merge: 0 }
        };

        let make_base = || base::Base {
            weight: base_weight.clone(),
            job: base_job.clone(),
        };

        let make_merge = |d: u64| {
            let weight = if level == -1 {
                (Weight::zero(), Weight::zero())
            } else {
                // Half the subtree's leaf capacity on each side.
                let x = 2u64.pow(level as u32) / 2u64.pow(d as u32 + 1);
                (Weight { base: x, merge: 0 }, Weight { base: x, merge: 0 })
            };
            merge::Merge {
                weight,
                job: merge_job.clone(),
            }
        };

        // A complete binary tree with leaves at `depth` has
        // 2^(depth + 1) - 1 nodes, stored in breadth-first order.
        let nnodes = 2u64.pow((depth + 1) as u32) - 1;

        let values: Vec<_> = (0..nnodes)
            .map(|index| {
                let node_depth = btree::depth_at(index as usize);

                if node_depth == depth {
                    Value::Leaf(make_base())
                } else {
                    Value::Node(make_merge(node_depth))
                }
            })
            .collect();

        Self { values }
    }
1468
1469 fn create_tree(depth: u64) -> Self {
1470 let level: i64 = depth.try_into().unwrap();
1471 Self::create_tree_for_level(level, depth, merge::Job::Empty, base::Job::Empty)
1472 }
1473}
1474
1475impl<BaseJob, MergeJob> ParallelScan<BaseJob, MergeJob>
1476where
1477 BaseJob: Debug + Clone + 'static,
1478 MergeJob: Debug + Clone + 'static,
1479{
    /// Returns a copy of the state where every merge job already `Done` is
    /// replaced with `Empty` (weights preserved). Used by `hash` so that
    /// completed-but-not-yet-pruned merge payloads do not affect the digest.
    fn with_leaner_trees(&self) -> Self {
        use JobStatus::Done;

        let trees = self
            .trees
            .iter()
            .map(|tree| {
                tree.map(
                    |merge_node| match &merge_node.job {
                        merge::Job::Full(merge::Record { state: Done, .. }) => merge::Merge {
                            weight: merge_node.weight.clone(),
                            job: merge::Job::Empty,
                        },
                        _ => merge_node.clone(),
                    },
                    // Base jobs are kept as-is.
                    |b| b.clone(),
                )
            })
            .collect();

        Self {
            trees,
            acc: self.acc.clone(),
            curr_job_seq_no: self.curr_job_seq_no.clone(),
            max_base_jobs: self.max_base_jobs,
            delay: self.delay,
        }
    }
1508
1509 pub fn empty(max_base_jobs: u64, delay: u64) -> Self {
1510 let depth = ceil_log2(max_base_jobs);
1511 let first_tree = Tree::create_tree(depth);
1514
1515 let mut trees = Vec::with_capacity(32);
1516 trees.push(first_tree);
1517
1518 Self {
1519 trees,
1520 acc: None,
1521 curr_job_seq_no: SequenceNumber(0),
1522 max_base_jobs,
1523 delay,
1524 }
1525 }
1526
    /// Produces a new state with `f1` applied to every merge job and `f2`
    /// to every base job — both in the trees and in the emitted accumulator
    /// `acc`.
    fn map<F1, F2>(&self, f1: F1, f2: F2) -> Self
    where
        F1: Fn(&MergeJob) -> MergeJob,
        F2: Fn(&BaseJob) -> BaseJob,
    {
        let trees = self
            .trees
            .iter()
            .map(|tree| tree.map_depth(&|_, m| m.map(&f1), &|a| a.map(&f2)))
            .collect();

        let acc = self
            .acc
            .as_ref()
            .map(|(m, bs)| (f1(m), bs.iter().map(&f2).collect()));

        Self {
            trees,
            acc,
            curr_job_seq_no: self.curr_job_seq_no.clone(),
            max_base_jobs: self.max_base_jobs,
            delay: self.delay,
        }
    }
1551
    /// Computes a SHA-256 digest of the entire scan state.
    ///
    /// `fun_merge`/`fun_base` serialize a merge/base job into the scratch
    /// buffer. Merge jobs that are already `Done` are hashed as `Empty`
    /// (see `with_leaner_trees`), so pruned and unpruned states agree.
    pub fn hash<FunMerge, FunBase>(
        &self,
        fun_merge: FunMerge,
        fun_base: FunBase,
    ) -> GenericArray<u8, U32>
    where
        FunMerge: Fn(&mut Vec<u8>, &MergeJob),
        FunBase: Fn(&mut Vec<u8>, &BaseJob),
    {
        // Scratch buffer reused for every job; sized so it should never
        // reallocate (asserted after each job below).
        const BUFFER_CAPACITY: usize = 128 * 1024;

        let Self {
            trees,
            acc,
            curr_job_seq_no,
            max_base_jobs,
            delay,
        } = self.with_leaner_trees();

        let mut sha: Sha256 = Sha256::new();
        let mut buffer = Vec::with_capacity(BUFFER_CAPACITY);
        let buffer = &mut buffer;

        // Weights are serialized as decimal digits with no separator.
        let add_weight =
            |buffer: &mut Vec<u8>, w: &Weight| write!(buffer, "{}{}", w.base, w.merge).unwrap();

        let add_two_weights = |buffer: &mut Vec<u8>, (w1, w2): &(Weight, Weight)| {
            add_weight(buffer, w1);
            add_weight(buffer, w2);
        };

        // Serialize each job into the buffer, then feed it to the hasher.
        for job in trees.iter().flat_map(Tree::to_hashable_jobs) {
            buffer.clear();

            match &job {
                Job::Base(base) => match &base.job {
                    base::Job::Empty => {
                        add_weight(buffer, &base.weight);
                        write!(buffer, "Empty").unwrap();
                    }
                    base::Job::Full(base::Record { job, seq_no, state }) => {
                        add_weight(buffer, &base.weight);
                        write!(buffer, "Full{}{}", seq_no.0, state.as_str()).unwrap();

                        fun_base(buffer, job);
                    }
                },
                Job::Merge(merge) => match &merge.job {
                    merge::Job::Empty => {
                        add_two_weights(buffer, &merge.weight);
                        write!(buffer, "Empty").unwrap();
                    }
                    merge::Job::Part(job) => {
                        add_two_weights(buffer, &merge.weight);
                        write!(buffer, "Part").unwrap();

                        fun_merge(buffer, job);
                    }
                    merge::Job::Full(merge::Record {
                        left,
                        right,
                        seq_no,
                        state,
                    }) => {
                        add_two_weights(buffer, &merge.weight);
                        write!(buffer, "Full{}{}", seq_no.0, state.as_str()).unwrap();

                        fun_merge(buffer, left);
                        fun_merge(buffer, right);
                    }
                },
            }

            sha.update(buffer.as_slice());

            // Invariant: no single job serialization may grow the buffer —
            // a reallocation here would indicate an oversized job payload.
            assert_eq!(buffer.capacity(), BUFFER_CAPACITY);
        }

        // Mix in the emitted accumulator (proof + its transactions), or a
        // literal "None" marker when nothing has been emitted yet.
        match &acc {
            Some((a, d_lst)) => {
                buffer.clear();

                fun_merge(buffer, a);
                for j in d_lst {
                    fun_base(buffer, j);
                }

                sha.update(&buffer);
            }
            None => {
                sha.update("None");
            }
        };

        // Finally mix in the scalar configuration/state fields.
        buffer.clear();
        write!(buffer, "{}{}{}", curr_job_seq_no.0, max_base_jobs, delay).unwrap();
        sha.update(&buffer);

        sha.finalize()
    }
1654
1655 pub fn fold_chronological_until<Accum, Final, FunMerge, FunBase, FunFinish>(
1656 &self,
1657 init: Accum,
1658 fun_merge: FunMerge,
1659 fun_base: FunBase,
1660 fun_finish: FunFinish,
1661 ) -> Final
1662 where
1663 FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> ControlFlow<Final, Accum>,
1664 FunBase: Fn(Accum, &base::Base<BaseJob>) -> ControlFlow<Final, Accum>,
1665 FunFinish: Fn(Accum) -> Final,
1666 {
1667 let mut accum = init;
1668
1669 for tree in self.trees.iter().rev() {
1670 match tree.fold_depth_until_prime(&|_, acc, m| fun_merge(acc, m), &fun_base, accum) {
1671 Continue(acc) => accum = acc,
1672 Break(v) => return v,
1673 }
1674 }
1675
1676 fun_finish(accum)
1677 }
1678
1679 pub fn fold_chronological_until_err<Accum, FunMerge, FunBase, FunFinish>(
1680 &self,
1681 init: Accum,
1682 fun_merge: FunMerge,
1683 fun_base: FunBase,
1684 fun_finish: FunFinish,
1685 ) -> Result<Accum, String>
1686 where
1687 FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> Result<Accum, String>,
1688 FunBase: Fn(Accum, &base::Base<BaseJob>) -> Result<Accum, String>,
1689 FunFinish: Fn(Accum) -> Accum,
1690 {
1691 let mut accum = init;
1692
1693 for tree in self.trees.iter().rev() {
1694 match tree.fold_depth_until_prime_err(&|_, acc, m| fun_merge(acc, m), &fun_base, accum)
1695 {
1696 Ok(acc) => accum = acc,
1697 Err(e) => return Err(e),
1698 }
1699 }
1700
1701 Ok(fun_finish(accum))
1702 }
1703
1704 fn fold_chronological<Accum, FunMerge, FunBase>(
1705 &self,
1706 init: Accum,
1707 fun_merge: FunMerge,
1708 fun_base: FunBase,
1709 ) -> Accum
1710 where
1711 FunMerge: Fn(Accum, &merge::Merge<MergeJob>) -> Accum,
1712 FunBase: Fn(Accum, &base::Base<BaseJob>) -> Accum,
1713 {
1714 self.fold_chronological_until(
1715 init,
1716 |acc, a| Continue(fun_merge(acc, a)),
1717 |acc, d| Continue(fun_base(acc, d)),
1718 |v| v,
1719 )
1720 }
1721
    /// Upper bound on how many trees the state may hold at once:
    /// one slot per (level, delay round) pair, plus one extra.
    fn max_trees(&self) -> u64 {
        ((ceil_log2(self.max_base_jobs) + 1) * (self.delay + 1)) + 1
    }
1725
1726 fn work_for_tree(&self, data_tree: WorkForTree) -> Vec<AvailableJob<BaseJob, MergeJob>> {
1727 let delay = self.delay + 1;
1728
1729 let trees = match data_tree {
1731 WorkForTree::Current => &self.trees[1..],
1732 WorkForTree::Next => &self.trees,
1733 };
1734
1735 work(trees, delay, self.max_base_jobs)
1738 }
1739
    /// All available jobs, grouped per (virtual) update round.
    ///
    /// The first group is the currently available work; the following
    /// groups are obtained by simulating the insertion of up to
    /// `delay + 1` fresh trees, which shifts which trees become eligible.
    fn all_work(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        let depth = ceil_log2(self.max_base_jobs);
        let set1 = self.work_for_tree(WorkForTree::Current);
        // Work on a scratch clone: its tree list is mutated below.
        let mut this = self.clone();
        this.trees.reserve(self.delay as usize + 1);

        let mut other_set = Vec::with_capacity(256);
        other_set.push(set1);

        for _ in 0..self.delay + 1 {
            // Pretend a new empty tree became the current one.
            this.trees.insert(0, Tree::create_tree(depth));
            let work = this.work_for_tree(WorkForTree::Current);

            if !work.is_empty() {
                other_set.push(work);
            }
        }

        other_set
    }
1777
    /// Jobs that must be completed before `data_count` new base jobs can be
    /// enqueued.
    ///
    /// Each base job requires at most two completed jobs (hence the `* 2`
    /// truncations). When the new data does not fit on the current tree, a
    /// second job set — for the overflow onto the next tree — is included.
    fn work_for_next_update(&self, data_count: u64) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        let delay = self.delay + 1;
        let current_tree_space = self.trees[0].available_space() as usize;

        let mut set1 = work(&self.trees[1..], delay, self.max_base_jobs);
        let count = data_count.min(self.max_base_jobs) as usize;

        if current_tree_space < count {
            // Overflow: also require work from the next tree's perspective,
            // proportional to the slots spilling over.
            let mut set2 = work(&self.trees, delay, self.max_base_jobs);
            set2.truncate((count - current_tree_space) * 2);

            [set1, set2].into_iter().filter(|v| !v.is_empty()).collect()
        } else {
            set1.truncate(2 * count);

            if set1.is_empty() {
                vec![]
            } else {
                vec![set1]
            }
        }
    }
1826
    /// Number of base-job slots still free on the newest tree.
    fn free_space_on_current_tree(&self) -> u64 {
        self.trees[0].available_space()
    }
1830
    /// Feeds `completed_jobs` (merge results) into the trees, returning the
    /// emitted (proof, transactions) pair if the oldest tree completes.
    ///
    /// Only every `delay + 1`-th tree (counting from the one after the
    /// current tree) receives work in a given round; jobs are consumed in
    /// order, each eligible tree taking exactly as many as it requires.
    fn add_merge_jobs(
        &mut self,
        completed_jobs: &[MergeJob],
    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
        // Panic-free prefix: first `n` items, or the whole slice if shorter.
        fn take<T>(slice: &[T], n: usize) -> &[T] {
            slice.get(..n).unwrap_or(slice)
        }

        // Panic-free suffix: everything after the first `n` items.
        fn drop<T>(slice: &[T], n: usize) -> &[T] {
            slice.get(n..).unwrap_or(&[])
        }

        if completed_jobs.is_empty() {
            return Ok(None);
        }

        let delay = self.delay + 1;
        let udelay = delay as usize;
        let depth = ceil_log2(self.max_base_jobs);

        let completed_jobs_len = completed_jobs.len();
        let merge_jobs: Vec<_> = completed_jobs.iter().cloned().map(Job::Merge).collect();
        let jobs_required = self.work_for_tree(WorkForTree::Current);

        assert!(
            merge_jobs.len() <= jobs_required.len(),
            "More work than required"
        );

        // The newest tree never receives merge work; it is re-attached at
        // the front after the update (see the `insert(0, ..)` below).
        let curr_tree = &self.trees[0];
        let to_be_updated_trees = &self.trees[1..];

        // tree index -> (level updated, jobs applied); for debug output only.
        let mut stats = BTreeMap::<u64, (u64, usize)>::new();

        let (mut updated_trees, result_opt) = {
            let mut jobs = merge_jobs.as_slice();

            let mut updated_trees = Vec::with_capacity(to_be_updated_trees.len());
            let mut scan_result = None;

            for (i, tree) in to_be_updated_trees.iter().enumerate() {
                // Only every `delay`-th tree is eligible this round.
                if (i % udelay == udelay - 1) && !jobs.is_empty() {
                    let nrequired = tree.required_job_count() as usize;
                    let completed_jobs = take(jobs, nrequired);
                    let i = i as u64;

                    // Older trees are updated at higher (closer-to-root) levels.
                    let level = depth - (i / delay);
                    let old = stats.insert(i, (level, completed_jobs.len()));
                    assert!(old.is_none());

                    let (tree, result) = tree.update(
                        completed_jobs,
                        depth - (i / delay),
                        self.curr_job_seq_no.clone(),
                        WeightLens::Merge,
                    )?;

                    updated_trees.push(tree);
                    // Only the oldest tree can actually produce a result.
                    scan_result = result;
                    jobs = drop(jobs, nrequired);
                } else {
                    updated_trees.push(tree.clone());
                }
            }

            (updated_trees, scan_result)
        };

        // Debug report of which trees were touched.
        for (index, (level, njobs)) in stats.iter().rev() {
            let index = self.trees.len() - *index as usize - 2;

            if result_opt.is_some() && index == 0 {
                println!(
                    "- tree[{:>02}] level={} {:>3} completed jobs, a proof is emitted, tree is removed",
                    index,
                    level,
                    njobs
                );
            } else {
                println!(
                    "- tree[{:>02}] level={} {:>3} completed jobs (DONE)",
                    index, level, njobs
                );
                println!(
                    "      level={} {:>3} new merge jobs (TODO)",
                    level - 1,
                    njobs / 2,
                );
            }
        }

        let (mut updated_trees, result_opt) = {
            // If a proof was emitted, the oldest tree is finished: pop it and
            // pair the proof with the base jobs it contained.
            let (updated_trees, result_opt) = match result_opt {
                Some(scan_result) if !updated_trees.is_empty() => {
                    let last = updated_trees.pop().unwrap();
                    let tree_data = last.base_jobs();
                    (updated_trees, Some((scan_result, tree_data)))
                }
                _ => (updated_trees, None),
            };

            // NOTE(review): `||` binds looser than `&&`, so this reads as
            // `emitted || (have_room && all_required_done)` — confirm that
            // precedence is intentional.
            if result_opt.is_some()
                || (updated_trees.len() + 1) < self.max_trees() as usize
                    && (completed_jobs_len == jobs_required.len())
            {
                let updated_trees = updated_trees
                    .into_iter()
                    .map(|tree| tree.reset_weights(ResetKind::Merge))
                    .collect();
                (updated_trees, result_opt)
            } else {
                (updated_trees, result_opt)
            }
        };

        updated_trees.insert(0, curr_tree.clone());

        self.trees = updated_trees;

        Ok(result_opt)
    }
1955
    /// Enqueues `data` as new base jobs on the current (newest) tree.
    ///
    /// If the data exactly fills the tree, a fresh empty tree is pushed in
    /// front. `base_kind` maps a base job to a numeric kind used only for
    /// the debug summary printed below.
    fn add_data(
        &mut self,
        data: Vec<BaseJob>,
        base_kind: impl Fn(&BaseJob) -> usize,
    ) -> Result<(), ()> {
        if data.is_empty() {
            return Ok(());
        }

        let data_len = data.len();
        let depth = ceil_log2(self.max_base_jobs);
        let tree = &self.trees[0];

        let base_jobs: Vec<_> = data.into_iter().map(Job::Base).collect();
        let available_space = tree.available_space() as usize;

        assert!(
            data_len <= available_space,
            "Data count ({}) exceeded available space ({})",
            data_len,
            available_space
        );

        let (tree, _) = tree
            .update(
                &base_jobs,
                depth,
                self.curr_job_seq_no.clone(),
                WeightLens::Base,
            )
            .expect("Error while adding a base job to the tree");

        let bases = tree.base_jobs();
        let nbase = bases.len();

        // Tree filled up: start a new empty tree in front and fully reset
        // the filled tree's weights; otherwise only reset the merge weights.
        let updated_trees = if data_len == available_space {
            let new_tree = Tree::create_tree(depth);
            let tree = tree.reset_weights(ResetKind::Both);
            vec![new_tree, tree]
        } else {
            let tree = tree.reset_weights(ResetKind::Merge);
            vec![tree]
        };

        // NOTE(review): "level=7" in these debug prints appears hard-coded
        // for a particular tree depth — confirm before relying on it.
        println!(
            "- tree[{:>02}] level=7 {:>3} new base jobs (TODO), total_nbase_jobs={:?}",
            self.trees.len() - 1,
            data_len,
            nbase,
        );

        let max_base_jobs = self.max_base_jobs as usize;

        // Run-length encode the leaf kinds for the debug summary.
        let mut folded = bases.iter().fold(
            Vec::<(usize, usize)>::with_capacity(max_base_jobs),
            |mut accum, b| {
                let kind = base_kind(b);
                match accum.last_mut() {
                    Some(last) if last.0 == kind => last.1 += 1,
                    _ => accum.push((kind, 1)),
                }
                accum
            },
        );

        let total_folded: usize = folded.iter().map(|(_, n)| *n).sum();

        // Pad the summary with a pseudo-kind (10 = EMPTY) for unused slots.
        if total_folded != max_base_jobs {
            folded.push((10, max_base_jobs - total_folded));
        }

        let to_s = |n: usize, s: &str| {
            if n == 1 {
                s.to_string()
            } else {
                format!("{n} {s}")
            }
        };

        let s = folded
            .iter()
            .map(|(kind, n)| match kind {
                0 => to_s(*n, "CMD"),
                1 => to_s(*n, "FT"),
                2 => to_s(*n, "CB"),
                10 => to_s(*n, "EMPTY"),
                _ => panic!(),
            })
            .join("|");

        println!("      level=7 has the following jobs (in this order):");
        println!("      {s}");

        // Replace the old current tree with the updated one(s), keeping all
        // older trees behind them.
        let mut old = std::mem::replace(&mut self.trees, updated_trees);
        old.remove(0);
        self.trees.append(&mut old);

        Ok(())
    }
2078
    /// Renumbers every job's sequence number relative to the oldest one
    /// still present, so the counter can keep growing without overflowing.
    ///
    /// Called by `incr_sequence_no` when the counter is about to reach
    /// `u64::MAX`. Afterwards `curr_job_seq_no` is the largest renumbered
    /// sequence number found on any leaf.
    fn reset_seq_no(&mut self) {
        // Oldest sequence number = the first leaf of the oldest tree
        // (trees are stored newest-first).
        let last = self.trees.last().unwrap();
        let oldest_seq_no = match last.leaves().first() {
            Some(base::Base {
                job: base::Job::Full(base::Record { seq_no, .. }),
                ..
            }) => seq_no.clone(),
            _ => SequenceNumber::zero(),
        };

        // Shift so the oldest job gets sequence number 1.
        let new_seq = |seq: &SequenceNumber| (seq - &oldest_seq_no).incr();

        let fun_merge = |m: &merge::Merge<MergeJob>| match &m.job {
            merge::Job::Full(merge::Record { seq_no, .. }) => m.with_seq_no(new_seq(seq_no)),
            _ => m.clone(),
        };

        let fun_base = |m: &base::Base<BaseJob>| match &m.job {
            base::Job::Full(base::Record { seq_no, .. }) => m.with_seq_no(new_seq(seq_no)),
            _ => m.clone(),
        };

        let mut max_seq = SequenceNumber::zero();
        let mut updated_trees = Vec::with_capacity(self.trees.len());

        for tree in &self.trees {
            use base::{Base, Job::Full, Record};

            let tree = tree.map(fun_merge, fun_base);
            updated_trees.push(tree.clone());

            let leaves = tree.leaves();

            let last = match leaves.last() {
                Some(last) => last,
                None => continue,
            };

            // Track the largest renumbered sequence number seen on a leaf.
            if let Base {
                job: Full(Record { seq_no, .. }),
                ..
            } = last
            {
                max_seq = max_seq.max(seq_no.clone());
            };
        }

        self.curr_job_seq_no = max_seq;
        self.trees = updated_trees;
    }
2129
2130 fn incr_sequence_no(&mut self) {
2131 let next_seq_no = self.curr_job_seq_no.incr();
2132
2133 if next_seq_no.is_u64_max() {
2134 self.reset_seq_no();
2135 } else {
2136 self.curr_job_seq_no = next_seq_no;
2137 }
2138 }
2139
    /// Core of `update`: marks `completed_jobs` as done and enqueues `data`
    /// as new base jobs, splitting both across the current tree and — on
    /// overflow — the next tree.
    ///
    /// Returns the emitted (proof, transactions) pair if the oldest tree
    /// completed during this update.
    fn update_helper(
        &mut self,
        data: Vec<BaseJob>,
        completed_jobs: Vec<MergeJob>,
        base_kind: impl Fn(&BaseJob) -> usize,
    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
        // Panic-free split at `n`: (`prefix`, `suffix`).
        fn split<T>(slice: &[T], n: usize) -> (&[T], &[T]) {
            (
                slice.get(..n).unwrap_or(slice),
                slice.get(n..).unwrap_or(&[]),
            )
        }

        let data_count = data.len() as u64;

        assert!(
            data_count <= self.max_base_jobs,
            "Data count ({}) exceeded maximum ({})",
            data_count,
            self.max_base_jobs
        );

        let required_jobs_count = self
            .work_for_next_update(data_count)
            .into_iter()
            .flatten()
            .count();

        {
            // Two completed jobs pay for one new base-job slot, so compare
            // in units of job pairs.
            let required = required_jobs_count.div_ceil(2);
            let got = completed_jobs.len().div_ceil(2);

            let max_base_jobs = self.max_base_jobs as usize;
            assert!(
                !(got < required && data.len() > max_base_jobs - required + got),
                "Insufficient jobs (Data count {}): Required- {} got- {}",
                data_count,
                required,
                got
            )
        }

        let delay = self.delay + 1;

        self.incr_sequence_no();

        let latest_tree = &self.trees[0];
        let available_space = latest_tree.available_space();

        // data1 fills the current tree; data2 spills onto the next one.
        let (data1, data2) = split(&data, available_space as usize);

        // jobs1 services the current tree; jobs2 the (virtual) next tree.
        let required_jobs_for_current_tree =
            work(&self.trees[1..], delay, self.max_base_jobs).len();
        let (jobs1, jobs2) = split(&completed_jobs, required_jobs_for_current_tree);

        println!("scan_state update:");

        // Only the first pass can emit a proof (it touches the oldest tree).
        let result_opt = self.add_merge_jobs(jobs1)?;
        self.add_data(data1.to_vec(), &base_kind)?;

        if !jobs2.is_empty() || !data2.is_empty() {
            println!("scan_state update: (2nd set of jobs, new transactions didn't fit in latest/current tree)");
        }

        self.add_merge_jobs(jobs2)?;
        self.add_data(data2.to_vec(), base_kind)?;

        // Remember the latest emitted value; keep the previous one otherwise.
        if result_opt.is_some() {
            self.acc.clone_from(&result_opt);
        };

        assert!(
            self.trees.len() <= self.max_trees() as usize,
            "Tree list length ({}) exceeded maximum ({})",
            self.trees.len(),
            self.max_trees()
        );

        Ok(result_opt)
    }
2248
    /// Adds new base jobs and marks `completed_jobs` as done, returning the
    /// emitted (proof, transactions) pair if a tree was completed.
    /// See `update_helper` for the details.
    pub fn update(
        &mut self,
        data: Vec<BaseJob>,
        completed_jobs: Vec<MergeJob>,
        base_kind: impl Fn(&BaseJob) -> usize,
    ) -> Result<Option<(MergeJob, Vec<BaseJob>)>, ()> {
        self.update_helper(data, completed_jobs, base_kind)
    }
2257
    /// Every available job, grouped per update round (see `all_work`).
    pub fn all_jobs(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        self.all_work()
    }
2261
    /// Jobs required before a full batch (`max_base_jobs`) of new base jobs
    /// can be added.
    pub fn jobs_for_next_update(&self) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        self.work_for_next_update(self.max_base_jobs)
    }
2265
    /// Jobs required before `slots` new base jobs can be added.
    pub fn jobs_for_slots(&self, slots: u64) -> Vec<Vec<AvailableJob<BaseJob, MergeJob>>> {
        self.work_for_next_update(slots)
    }
2269
    /// Number of base-job slots per tree.
    ///
    /// NOTE(review): despite the name, this returns the per-tree capacity,
    /// not the remaining space — confirm callers expect `max_base_jobs`.
    pub fn free_space(&self) -> u64 {
        self.max_base_jobs
    }
2273
    /// The most recently emitted (proof, transactions) pair, if any.
    pub fn last_emitted_value(&self) -> Option<&(MergeJob, Vec<BaseJob>)> {
        self.acc.as_ref()
    }
2277
    /// The current job sequence number.
    fn current_job_sequence_number(&self) -> SequenceNumber {
        self.curr_job_seq_no.clone()
    }
2281
    /// Iterator over the base jobs stored on the newest tree
    /// (the leaves live at level `depth`).
    pub fn base_jobs_on_latest_tree(&self) -> impl Iterator<Item = BaseJob> {
        let depth = ceil_log2(self.max_base_jobs);
        let level = depth;

        self.trees[0]
            .jobs_on_level(depth, level)
            .into_iter()
            .filter_map(|job| match job {
                AvailableJob::Base(base) => Some(base),
                AvailableJob::Merge { .. } => None,
            })
    }
2294
    /// Iterator over the base jobs on the `index`-th tree *before* the
    /// current one (0 = the next-oldest). Empty when `index` is out of range.
    pub fn base_jobs_on_earlier_tree(&self, index: usize) -> impl Iterator<Item = BaseJob> {
        let depth = ceil_log2(self.max_base_jobs);
        let level = depth;

        let earlier_trees = &self.trees[1..];

        let base_job = |job| match job {
            AvailableJob::Base(base) => Some(base),
            AvailableJob::Merge { .. } => None,
        };

        match earlier_trees.get(index) {
            None => {
                // Same concrete iterator type as the `Some` arm (filter_map
                // over a Vec iterator) so both arms unify for `impl Iterator`.
                Vec::new().into_iter().filter_map(base_job)
            }
            Some(tree) => tree
                .jobs_on_level(depth, level)
                .into_iter()
                .filter_map(base_job),
        }
    }
2319
    /// Describes how the next batch of base jobs would be split between the
    /// current tree and — if the batch overflows it — the next tree.
    ///
    /// `first` is (free slots on the current tree, jobs needed for it);
    /// `second` is `Some((overflow slots, jobs needed, capped at 2 per
    /// slot))` only when the current tree cannot hold a full batch.
    pub fn partition_if_overflowing(&self) -> SpacePartition {
        let cur_tree_space = self.free_space_on_current_tree();

        let work_count = self.work_for_tree(WorkForTree::Current).len() as u64;
        let work_count_new_tree = self.work_for_tree(WorkForTree::Next).len() as u64;

        SpacePartition {
            first: (cur_tree_space, work_count),
            second: {
                if cur_tree_space < self.max_base_jobs {
                    let slots = self.max_base_jobs - cur_tree_space;
                    Some((slots, work_count_new_tree.min(2 * slots)))
                } else {
                    None
                }
            },
        }
    }
2340
2341 pub fn next_on_new_tree(&self) -> bool {
2342 let curr_tree_space = self.free_space_on_current_tree();
2343 curr_tree_space == self.max_base_jobs
2344 }
2345
2346 pub fn pending_data(&self) -> Vec<Vec<BaseJob>> {
2347 self.trees.iter().rev().map(Tree::base_jobs).collect()
2348 }
2349
    /// Returns `(todo, done)` job counts over the whole state.
    ///
    /// Counts are `f64` because a partially-filled merge (`Part`) counts as
    /// half a todo job.
    fn job_count(&self) -> (f64, f64) {
        use JobStatus::{Done, Todo};

        self.fold_chronological(
            (0.0, 0.0),
            |(ntodo, ndone), merge| {
                use merge::{
                    Job::{Empty, Full, Part},
                    Record,
                };

                let (todo, done) = match &merge.job {
                    Part(_) => (0.5, 0.0),
                    Full(Record { state: Todo, .. }) => (1.0, 0.0),
                    Full(Record { state: Done, .. }) => (0.0, 1.0),
                    Empty => (0.0, 0.0),
                };
                (ntodo + todo, ndone + done)
            },
            |(ntodo, ndone), base| {
                use base::{
                    Job::{Empty, Full},
                    Record,
                };

                let (todo, done) = match &base.job {
                    Empty => (0.0, 0.0),
                    Full(Record { state: Todo, .. }) => (1.0, 0.0),
                    Full(Record { state: Done, .. }) => (0.0, 1.0),
                };

                (ntodo + todo, ndone + done)
            },
        )
    }
2386}
2387
2388fn work_to_do<'a, BaseJob, MergeJob, I>(
2389 trees: I,
2390 max_base_jobs: u64,
2391) -> Vec<AvailableJob<BaseJob, MergeJob>>
2392where
2393 I: Iterator<Item = &'a Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
2394 BaseJob: Debug + Clone + 'static,
2395 MergeJob: Debug + Clone + 'static,
2396{
2397 let depth = ceil_log2(max_base_jobs);
2398
2399 trees
2404 .enumerate()
2406 .flat_map(|(i, tree)| {
2407 let level = depth - i as u64;
2408 tree.jobs_on_level(depth, level)
2409 })
2410 .collect()
2411}
2412
2413fn work<'a, BaseJob, MergeJob, I>(
2414 trees: I,
2415 delay: u64,
2416 max_base_jobs: u64,
2417) -> Vec<AvailableJob<BaseJob, MergeJob>>
2418where
2419 I: IntoIterator<Item = &'a Tree<base::Base<BaseJob>, merge::Merge<MergeJob>>>,
2420 BaseJob: Debug + Clone + 'static,
2421 MergeJob: Debug + Clone + 'static,
2422{
2423 let depth = ceil_log2(max_base_jobs) as usize;
2424 let delay = delay as usize;
2425
2426 let work_trees = trees
2429 .into_iter()
2430 .enumerate()
2431 .filter_map(|(i, t)| {
2432 if i % delay == delay - 1 {
2433 Some(t)
2434 } else {
2435 None
2436 }
2437 })
2438 .take(depth + 1);
2439
2440 work_to_do(work_trees, max_base_jobs)
2441}
2442
/// Returns the first `n` items of `slice`, or the whole slice when it has
/// fewer than `n` items. Never panics.
fn take<T>(slice: &[T], n: usize) -> &[T] {
    match slice.get(..n) {
        Some(head) => head,
        None => slice,
    }
}
2446
/// Returns up to `n` items of `slice` starting at `skip`; empty when `skip`
/// is past the end. Never panics.
fn take_at<T>(slice: &[T], skip: usize, n: usize) -> &[T] {
    match slice.get(skip..) {
        Some(rest) => rest.get(..n).unwrap_or(rest),
        None => &[],
    }
}
2450
/// Ceiling of the base-2 logarithm of `n`.
///
/// # Panics
/// Panics when `n == 0`.
pub const fn ceil_log2(n: u64) -> u64 {
    assert!(n > 0);
    match n {
        1 => 0,
        // ceil(log2(n)) = number of bits needed to represent n - 1.
        _ => (u64::BITS - (n - 1).leading_zeros()) as u64,
    }
}
2464
/// Concatenates the inner vectors into a single `Vec`, preserving order.
fn flatten<T>(v: Vec<Vec<T>>) -> Vec<T> {
    let mut out = Vec::new();
    for inner in v {
        out.extend(inner);
    }
    out
}
2468
/// Test helper: validates the todo/done bookkeeping of `s2` (state after an
/// update) against `s1` (state before), given how many completed jobs and
/// new base jobs the update consumed and whether it emitted a proof.
///
/// Counts are `f64` (a `Part` merge counts as 0.5); the values involved are
/// small integers and halves, so `assert_eq!` on them is exact.
fn assert_job_count<B, M>(
    s1: &ParallelScan<B, M>,
    s2: &ParallelScan<B, M>,
    completed_job_count: f64,
    base_job_count: f64,
    value_emitted: bool,
) where
    B: Debug + Clone + 'static,
    M: Debug + Clone + 'static,
{
    let (todo_before, done_before) = s1.job_count();
    let (todo_after, done_after) = s2.job_count();

    let all_jobs = flatten(s2.all_jobs());

    // Every job reported as available must correspond to a Todo record.
    let all_jobs_expected_count = s2
        .trees
        .iter()
        .fold(Vec::with_capacity(s2.trees.len()), |mut acc, tree| {
            let mut records = tree.jobs_records();
            acc.append(&mut records);
            acc
        })
        .into_iter()
        .filter(|job| {
            matches!(
                job,
                Job::Base(base::Record {
                    state: JobStatus::Todo,
                    ..
                }) | Job::Merge(merge::Record {
                    state: JobStatus::Todo,
                    ..
                })
            )
        })
        .count();

    assert_eq!(all_jobs.len(), all_jobs_expected_count);

    let expected_todo_after = {
        // Each pair of completed jobs spawns one new merge; the job that
        // produced the emitted proof spawns none.
        let new_jobs = if value_emitted {
            (completed_job_count - 1.0) / 2.0
        } else {
            completed_job_count / 2.0
        };
        todo_before + base_job_count - completed_job_count + new_jobs
    };

    let expected_done_after = {
        // An emitted proof removes a full tree: 2 * max_base_jobs - 1 done jobs.
        let jobs_from_delete_tree = if value_emitted {
            ((2 * s1.max_base_jobs) - 1) as f64
        } else {
            0.0
        };
        done_before + completed_job_count - jobs_from_delete_tree
    };

    assert_eq!(todo_after, expected_todo_after);
    assert_eq!(done_after, expected_done_after);
}
2554
/// Test helper: applies `update` to a copy of `s1`, validates the job
/// bookkeeping with `assert_job_count`, and returns the emitted value plus
/// the updated state.
fn test_update<B, M>(
    s1: &ParallelScan<B, M>,
    data: Vec<B>,
    completed_jobs: Vec<M>,
) -> (Option<(M, Vec<B>)>, ParallelScan<B, M>)
where
    B: Debug + Clone + 'static,
    M: Debug + Clone + 'static,
{
    let mut s2 = s1.clone();
    let result_opt = s2
        .update(data.clone(), completed_jobs.clone(), |_| 0)
        .unwrap();

    assert_job_count(
        s1,
        &s2,
        completed_jobs.len() as f64,
        data.len() as f64,
        result_opt.is_some(),
    );
    (result_opt, s2)
}
2578
/// Decimal representation of `u` (test serialization helper).
fn int_to_string(u: &usize) -> String {
    format!("{u}")
}
2582
/// Appends the decimal representation of `i` to `buffer`
/// (test serialization helper).
fn sint_to_string(buffer: &mut Vec<u8>, i: &i64) {
    buffer.extend_from_slice(i.to_string().as_bytes());
}
2590
/// Hex-encoded SHA-256 digest of an `i64` scan state (test helper).
fn hash(state: &ParallelScan<i64, i64>) -> String {
    hex::encode(state.hash(sint_to_string, sint_to_string))
}
2594
2595#[cfg(test)]
2596mod tests {
2597 use std::{
2598 array,
2599 sync::{
2600 atomic::{AtomicBool, Ordering::Relaxed},
2601 mpsc::{sync_channel, Receiver, SyncSender},
2602 },
2603 };
2604
2605 use mina_core::thread;
2606 use rand::Rng;
2607
2608 use super::*;
2609
2610 #[test]
2611 fn test_ceil_log2() {
2612 for a in 1..50u64 {
2613 let v = (a as f32).log2().ceil() as u64;
2614 let w = ceil_log2(a);
2615 assert_eq!(v, w);
2617 }
2618 }
2619
2620 #[test]
2622 fn test_sha256() {
2623 let array: [u8; 2 * 1024] = array::from_fn(|i| (i % 256) as u8);
2624 let mut slice = &array[..];
2625
2626 let mut sha256 = sha2::Sha256::new();
2627 for byte in slice.iter().copied() {
2628 sha256.update(&[byte][..]);
2629 }
2630 let first = sha256.finalize();
2631
2632 let mut sha256 = sha2::Sha256::new();
2633 let mut n = 1;
2634 while !slice.is_empty() {
2635 sha256.update(slice.get(..n).unwrap_or(slice));
2636 slice = slice.get(n..).unwrap_or(&[]);
2637
2638 n += 2;
2639 }
2640 let second = sha256.finalize();
2641
2642 assert_eq!(first, second);
2643 }
2644
2645 #[test]
2646 fn test_range_at_depth() {
2647 let ranges: Vec<_> = (0..10u64).map(btree::range_at_depth).collect();
2648
2649 assert_eq!(
2650 ranges,
2651 [
2652 0..1,
2653 1..3,
2654 3..7,
2655 7..15,
2656 15..31,
2657 31..63,
2658 63..127,
2659 127..255,
2660 255..511,
2661 511..1023,
2662 ]
2663 );
2664 }
2665
2666 #[test]
2668 fn always_max_base_jobs() {
2669 const MAX_BASE_JOS: u64 = 512;
2670
2671 let mut state = ParallelScan::<usize, usize>::empty(MAX_BASE_JOS, 3);
2678 let mut expected_result: Vec<Vec<usize>> = vec![];
2679
2680 for i in 0..100 {
2684 println!("####### LOOP {:?} #########", i);
2685
2686 let data: Vec<_> = (0..MAX_BASE_JOS as usize).map(|j| i + j).collect();
2687
2688 expected_result.push(data.clone());
2689
2690 let work: Vec<_> = state
2691 .work_for_next_update(data.len() as u64)
2692 .into_iter()
2693 .flatten()
2694 .collect();
2695
2696 let new_merges: Vec<_> = work
2697 .iter()
2698 .map(|job| match job {
2699 AvailableJob::Base(i) => *i,
2700 AvailableJob::Merge { left, right } => *left + *right,
2701 })
2702 .collect();
2703
2704 println!("work={:?} new_merges={:?}", work.len(), new_merges.len());
2705 let (result_opt, s) = test_update(&state, data, new_merges);
2712
2713 let (expected_result_, remaining_expected_results) = {
2716 match result_opt {
2717 None => ((0, vec![]), expected_result.clone()),
2718 Some(ref r) => {
2719 println!("RESULT_OPT.0={} len={}", r.0, r.1.len());
2720 if expected_result.is_empty() {
2723 ((0, vec![]), vec![])
2724 } else {
2725 let first = expected_result[0].clone();
2726 let sum: usize = first.iter().sum();
2727
2728 ((sum, first), expected_result[1..].to_vec())
2729 }
2730 }
2731 }
2732 };
2733
2734 assert_eq!(
2735 result_opt.as_ref().unwrap_or(&expected_result_),
2736 &expected_result_
2737 );
2738
2739 expected_result = remaining_expected_results;
2740 state = s;
2741 }
2742 }
2743
2744 #[test]
2746 fn random_base_jobs() {
2747 const MAX_BASE_JOS: usize = 512;
2748
2749 let mut state = ParallelScan::<usize, usize>::empty(MAX_BASE_JOS as u64, 3);
2750 let mut rng = rand::thread_rng();
2751 let expected_result = (MAX_BASE_JOS, vec![1usize; MAX_BASE_JOS]);
2752
2753 for _ in 0..1_000 {
2754 let mut data = vec![1; rng.gen_range(0..=30)];
2755 data.truncate(MAX_BASE_JOS);
2756 let data_len = data.len();
2757
2758 println!("list_length={}", data_len);
2759
2760 let work: Vec<_> = state
2761 .work_for_next_update(data_len as u64)
2762 .into_iter()
2763 .flatten()
2764 .take(data_len * 2)
2765 .collect();
2766 let new_merges: Vec<_> = work
2767 .iter()
2768 .map(|job| match job {
2769 AvailableJob::Base(i) => *i,
2770 AvailableJob::Merge { left, right } => left + right,
2771 })
2772 .collect();
2773
2774 let (result_opt, s) = test_update(&state, data, new_merges);
2775
2776 assert_eq!(
2777 result_opt.as_ref().unwrap_or(&expected_result),
2778 &expected_result
2779 );
2780
2781 state = s;
2782 }
2783 }
2784
2785 fn gen<FunDone, FunAcc>(fun_job_done: FunDone, fun_acc: FunAcc) -> ParallelScan<i64, i64>
2786 where
2787 FunDone: Fn(&AvailableJob<i64, i64>) -> i64,
2788 FunAcc: Fn(Option<(i64, Vec<i64>)>, (i64, Vec<i64>)) -> Option<(i64, Vec<i64>)>,
2789 {
2790 let mut rng = rand::thread_rng();
2791
2792 let depth = rng.gen_range(2..5);
2793 let delay = rng.gen_range(0..=3);
2794
2795 let max_base_jobs = 2u64.pow(depth);
2796
2797 let mut s = ParallelScan::<i64, i64>::empty(max_base_jobs, delay);
2798
2799 let ndatas = rng.gen_range(2..=20);
2800 let datas: Vec<Vec<i64>> = (1..ndatas)
2801 .map(|_| {
2802 std::iter::repeat_with(|| rng.gen())
2803 .take(max_base_jobs as usize)
2804 .collect()
2805 })
2806 .collect();
2807
2808 for data in datas {
2818 println!("ndata={}", data.len());
2819
2820 let jobs = flatten(s.work_for_next_update(data.len() as u64));
2821
2822 let jobs_done: Vec<i64> = jobs.iter().map(&fun_job_done).collect();
2823
2824 println!("jobs_donea={}", jobs_done.len());
2825
2826 let old_tuple = s.acc.clone();
2827
2828 let res_opt = s.update(data, jobs_done, |_| 0).unwrap();
2829
2830 if let Some(res) = res_opt {
2831 let tuple = if old_tuple.is_some() {
2832 old_tuple
2833 } else {
2834 s.acc.clone()
2835 };
2836
2837 s.acc = fun_acc(tuple, res);
2838 }
2839 println!("s.acc.is_some={:?}", s.acc.is_some());
2840 }
2841
2842 s
2843 }
2844
2845 fn fun_merge_up(
2846 state: Option<(i64, Vec<i64>)>,
2847 mut x: (i64, Vec<i64>),
2848 ) -> Option<(i64, Vec<i64>)> {
2849 let mut acc = state?;
2850 acc.1.append(&mut x.1);
2851 Some((acc.0.wrapping_add(x.0), acc.1))
2852 }
2853
2854 fn job_done(job: &AvailableJob<i64, i64>) -> i64 {
2855 match job {
2856 AvailableJob::Base(x) => *x,
2857 AvailableJob::Merge { left, right } => {
2859 left.wrapping_add(*right)
2862 }
2864 }
2865 }
2866
2867 #[test]
2871 fn split_on_if_enqueuing_onto_the_next_queue() {
2872 let mut rng = rand::thread_rng();
2873
2874 let p = 4;
2875 let max_base_jobs = 2u64.pow(p);
2876
2877 for _ in 0..100000 {
2878 let state = ParallelScan::<i64, i64>::empty(max_base_jobs, 1);
2879 println!("hash_state={:?}", hash(&state));
2880 let i = rng.gen_range(0..max_base_jobs);
2881
2882 let data: Vec<i64> = (0..i as i64).collect();
2883
2884 let partition = state.partition_if_overflowing();
2885 let jobs = flatten(state.work_for_next_update(data.len() as u64));
2886
2887 let jobs_done: Vec<i64> = jobs.iter().map(job_done).collect();
2888
2889 let tree_count_before = state.trees.len();
2890
2891 let (_, s) = test_update(&state, data, jobs_done);
2892
2893 println!("second={:?}", partition.second.is_some());
2894
2895 match partition.second {
2896 None => {
2897 let tree_count_after = s.trees.len();
2898 let expected_tree_count = if partition.first.0 == i {
2899 tree_count_before + 1
2900 } else {
2901 tree_count_before
2902 };
2903 assert_eq!(tree_count_after, expected_tree_count);
2904 }
2905 Some(_) => {
2906 let tree_count_after = s.trees.len();
2907 let expected_tree_count = if i > partition.first.0 {
2908 tree_count_before + 1
2909 } else {
2910 tree_count_before
2911 };
2912 assert_eq!(tree_count_after, expected_tree_count);
2913 }
2914 }
2915 }
2916 }
2917
    /// Verifies `reset_seq_no`: after a reset, the per-tree sum of job
    /// sequence numbers must match a closed-form expected sum, and the final
    /// state must hash to a pinned regression value.
    #[test]
    fn sequence_number_reset() {
        let p = 3;
        let max_base_jobs = 2u64.pow(p);

        // All job records of every tree, in reverse tree order.
        let jobs = |state: &ParallelScan<i64, i64>| -> Vec<Vec<Job<base::Record<i64>, merge::Record<i64>>>> {
            state.trees.iter().map(|t| t.jobs_records()).rev().collect()
        };

        let verify_sequence_number = |state: &ParallelScan<i64, i64>| {
            // Work on a clone so the caller's state is left untouched.
            let mut state = state.clone();
            state.reset_seq_no();

            let jobs_list = jobs(&state);

            let depth = ceil_log2(max_base_jobs + 1);

            for (i, jobs) in jobs_list.iter().enumerate() {
                // NOTE(review): tree `i` appears to carry jobs only in its
                // lower `depth - i` levels — the formula below relies on that.
                let cur_levels = depth - i as u64;

                // Closed-form expected sum: level `j` contributes 2^j jobs,
                // each with (offset-adjusted) sequence number `depth - j`.
                let seq_sum = (0..cur_levels).fold(0, |acc, j| {
                    let j = j + i as u64;
                    acc + (2u64.pow(j as u32) * (depth - j))
                });

                // Jobs in tree `i` are numbered `i` higher; subtracting the
                // offset lets every tree be checked against the same formula.
                let offset = i as u64;

                let sum_of_all_seq_numbers: u64 = jobs
                    .iter()
                    .map(|job| match job {
                        Job::Base(base::Record { seq_no, .. }) => seq_no.0 - offset,
                        Job::Merge(merge::Record { seq_no, .. }) => seq_no.0 - offset,
                    })
                    .sum();

                dbg!(sum_of_all_seq_numbers, seq_sum);
                assert_eq!(sum_of_all_seq_numbers, seq_sum);
            }
        };

        let mut state = ParallelScan::<i64, i64>::empty(max_base_jobs, 0);
        let mut counter = 0;

        for _ in 0..50 {
            let jobs = flatten(state.jobs_for_next_update());
            let jobs_done: Vec<_> = jobs.iter().map(job_done).collect();
            let data: Vec<i64> = (0..max_base_jobs as i64).collect();

            let (res_opt, s) = test_update(&state, data, jobs_done);

            state = s;

            // Only start verifying after the first `p + 1` emissions, i.e.
            // once the scan state has been saturated with full trees.
            if res_opt.is_some() {
                if counter > p {
                    verify_sequence_number(&state);
                } else {
                    counter += 1;
                }
            };
        }

        // Regression pin: the final state must hash to this known value.
        assert_eq!(
            hash(&state),
            "931a0dc0a488289000c195ae361138cc713deddc179b5d22bfa6344508d0cfb5"
        );
    }
2992
2993 fn step_on_free_space<F, FAcc, B, M>(
2994 state: &mut ParallelScan<B, M>,
2995 w: &SyncSender<Option<(M, Vec<B>)>>,
2996 mut ds: Vec<B>,
2997 f: F,
2998 f_acc: FAcc,
2999 ) where
3000 F: Fn(&AvailableJob<B, M>) -> M,
3001 FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3002 B: Debug + Clone + 'static,
3003 M: Debug + Clone + 'static,
3004 {
3005 loop {
3006 let data = take(&ds, state.max_base_jobs as usize);
3007
3008 let jobs = flatten(state.work_for_next_update(data.len() as u64));
3009 let jobs_done: Vec<_> = jobs.iter().map(&f).collect();
3010
3011 let old_tuple = state.acc.clone();
3012
3013 let (res_opt, mut s) = test_update(state, data.to_vec(), jobs_done);
3014
3015 let s = match res_opt {
3016 Some(x) => {
3017 let tuple = if old_tuple.is_some() {
3018 f_acc(old_tuple, x)
3019 } else {
3020 s.acc.clone()
3021 };
3022 s.acc = tuple;
3023 s
3024 }
3025 None => s,
3026 };
3027
3028 w.send(s.acc.clone()).unwrap();
3029
3030 *state = s;
3031
3032 let rem_ds = ds.get(state.max_base_jobs as usize..).unwrap_or(&[]);
3033
3034 if rem_ds.is_empty() {
3035 return;
3036 } else {
3037 ds = rem_ds.to_vec();
3038 }
3039 }
3040 }
3041
3042 fn do_steps<F, FAcc, B, M>(
3043 state: &mut ParallelScan<B, M>,
3044 recv: &Receiver<B>,
3045 f: F,
3046 f_acc: FAcc,
3047 w: SyncSender<Option<(M, Vec<B>)>>,
3048 ) where
3049 F: Fn(&AvailableJob<B, M>) -> M,
3050 FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3051 B: Debug + Clone + 'static,
3052 M: Debug + Clone + 'static,
3053 {
3054 let data = recv.recv().unwrap();
3055 step_on_free_space(state, &w, vec![data], &f, &f_acc);
3056 }
3057
3058 fn scan<F, FAcc, B, M>(
3059 s: &mut ParallelScan<B, M>,
3060 data: &Receiver<B>,
3061 f: F,
3062 f_acc: FAcc,
3063 ) -> Receiver<Option<(M, Vec<B>)>>
3064 where
3065 F: Fn(&AvailableJob<B, M>) -> M,
3066 FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3067 B: Debug + Clone + 'static,
3068 M: Debug + Clone + 'static,
3069 {
3070 let (send, rec) = std::sync::mpsc::sync_channel::<Option<(M, Vec<B>)>>(1);
3071 do_steps(s, data, f, f_acc, send);
3072 rec
3073 }
3074
3075 fn step_repeatedly<F, FAcc, B, M>(
3076 state: &mut ParallelScan<B, M>,
3077 data: &Receiver<B>,
3078 f: F,
3079 f_acc: FAcc,
3080 ) -> Receiver<Option<(M, Vec<B>)>>
3081 where
3082 F: Fn(&AvailableJob<B, M>) -> M,
3083 FAcc: Fn(Option<(M, Vec<B>)>, (M, Vec<B>)) -> Option<(M, Vec<B>)>,
3084 B: Debug + Clone + 'static,
3085 M: Debug + Clone + 'static,
3086 {
3087 let (send, rec) = std::sync::mpsc::sync_channel::<Option<(M, Vec<B>)>>(1);
3088 do_steps(state, data, f, f_acc, send);
3089 rec
3090 }
3091
    /// Starts from an intermediate state produced by `gen`, then streams
    /// zeros through the scan; a single injected `1` must advance the
    /// accumulated sum by exactly one.
    #[test]
    fn scan_can_be_initialized_from_intermediate_state() {
        for _ in 0..10 {
            let mut state = gen(job_done, fun_merge_up);

            println!("state={:#?}", state);

            // When raised, the feeder thread sends a single `1`; otherwise
            // it sends `0`, which leaves the sum unchanged.
            let do_one_next = Arc::new(AtomicBool::new(false));

            let do_one_next_clone = Arc::clone(&do_one_next);
            let (send, recv) = sync_channel(1);

            // Feeder thread: emits `1` once after the flag is raised, `0`
            // otherwise; exits when the receiver is dropped.
            // NOTE(review): the flag uses Relaxed ordering — fine here since
            // only the eventual delivery of the `1` matters, not its timing.
            thread::spawn(move || loop {
                let v = if do_one_next_clone.load(Relaxed) {
                    do_one_next_clone.store(false, Relaxed);
                    1i64
                } else {
                    0i64
                };
                if send.send(v).is_err() {
                    return;
                }
            });

            let one_then_zeros = recv;

            let parallelism = state.max_base_jobs * ceil_log2(state.max_base_jobs);
            let old_acc = state.acc.as_ref().cloned().unwrap_or((0, vec![]));

            // Pumps parallelism^2 data through the scan (one datum per
            // iteration), keeping the last emitted value; falls back to the
            // running `acc` when a step emits nothing.
            let fill_some_zero =
                |state: &mut ParallelScan<i64, i64>, v: i64, r: &Receiver<i64>| -> i64 {
                    (0..parallelism * parallelism).fold(v, |acc, _| {
                        let pipe = step_repeatedly(state, r, job_done, fun_merge_up);

                        match pipe.recv() {
                            Ok(Some((v, _))) => v,
                            Ok(None) => acc,
                            Err(_) => acc,
                        }
                    })
                };

            let v = fill_some_zero(&mut state, 0, &one_then_zeros);

            // Arm the feeder so the next batch contains exactly one `1`.
            do_one_next.store(true, Relaxed);

            let acc = { state.acc.clone().unwrap() };

            // Feeding data must have moved the accumulator away from the
            // value `gen` left behind.
            assert_ne!(acc.0, old_acc.0);

            fill_some_zero(&mut state, v, &one_then_zeros);

            // The lone `1` shows up as exactly +1 on the accumulated sum.
            let acc_plus_one = { state.acc.unwrap() };
            assert_eq!(acc_plus_one.0, acc.0.wrapping_add(1));
        }
    }
3149
3150 #[test]
3154 fn scan_behaves_like_a_fold_long_term() {
3155 fn fun_merge_up(
3156 tuple: Option<(i64, Vec<String>)>,
3157 mut x: (i64, Vec<String>),
3158 ) -> Option<(i64, Vec<String>)> {
3159 let mut acc = tuple?;
3160 acc.1.append(&mut x.1);
3161
3162 Some((acc.0.wrapping_add(x.0), acc.1))
3163 }
3164
3165 fn job_done(job: &AvailableJob<String, i64>) -> i64 {
3166 match job {
3167 AvailableJob::Base(x) => x.parse().unwrap(),
3168 AvailableJob::Merge { left, right } => left.wrapping_add(*right),
3169 }
3170 }
3171
3172 let (send, recv) = sync_channel(1);
3173
3174 let depth = 7;
3175 let count: i64 = 1000;
3176
3177 thread::spawn(move || {
3178 let mut count = count;
3179 let x = count;
3180 loop {
3181 let next = if count <= 0 {
3182 "0".to_string()
3183 } else {
3184 (x - count).to_string()
3185 };
3186
3187 count -= 1;
3188
3189 if send.send(next).is_err() {
3190 return;
3191 }
3192 }
3193 });
3194
3195 let mut s = ParallelScan::<String, i64>::empty(2u64.pow(depth as u32), 1);
3196
3197 let after_3n = (0..4 * count).fold(0i64, |acc, _| {
3198 let result = scan(&mut s, &recv, job_done, fun_merge_up);
3199 match result.recv() {
3200 Ok(Some((v, _))) => v,
3201 Ok(None) => acc,
3202 Err(_) => acc,
3203 }
3204 });
3205
3206 let expected = (0..count).fold(0i64, |a, b| a.wrapping_add(b));
3207
3208 assert_eq!(after_3n, expected);
3209 }
3210
3211 #[test]
3216 fn scan_concat_over_strings() {
3217 fn fun_merge_up(
3218 tuple: Option<(String, Vec<String>)>,
3219 mut x: (String, Vec<String>),
3220 ) -> Option<(String, Vec<String>)> {
3221 let mut acc = tuple?;
3222 acc.1.append(&mut x.1);
3223
3224 Some((format!("{}{}", acc.0, x.0), acc.1))
3225 }
3226
3227 fn job_done(job: &AvailableJob<String, String>) -> String {
3228 match job {
3229 AvailableJob::Base(x) => x.clone(),
3230 AvailableJob::Merge { left, right } => {
3231 format!("{}{}", left, right)
3232 }
3233 }
3234 }
3235
3236 let (send, recv) = sync_channel(1);
3237
3238 let depth = 7;
3239 let count: i64 = 100;
3240
3241 thread::spawn(move || {
3242 let mut count = count;
3243 let x = count;
3244 loop {
3245 let next = if count <= 0 {
3246 "".to_string()
3247 } else {
3248 let n = (x - count).to_string();
3249 format!("{},", n)
3250 };
3251
3252 count -= 1;
3253
3254 if send.send(next).is_err() {
3255 return;
3256 }
3257 }
3258 });
3259
3260 let mut s = ParallelScan::<String, String>::empty(2u64.pow(depth as u32), 1);
3261
3262 let after_3n = (0..42 * count).fold(String::new(), |acc, _| {
3263 let result = scan(&mut s, &recv, job_done, fun_merge_up);
3264 match result.recv() {
3265 Ok(Some((v, _))) => v,
3266 Ok(None) => acc,
3267 Err(_) => acc,
3268 }
3269 });
3270
3271 let expected = (0..count)
3272 .map(|i| format!("{},", i))
3273 .fold(String::new(), |a, b| format!("{}{}", a, b));
3274
3275 assert_eq!(after_3n, expected);
3276 }
3277}