Skip to main content

mina_tree/scan_state/
scan_state.rs

1use std::{collections::HashSet, sync::Arc};
2
3use blake2::{
4    digest::{generic_array::GenericArray, typenum::U32},
5    Digest,
6};
7use mina_core::{constants::ConstraintConstants, snark::SnarkJobId};
8use mina_curves::pasta::Fp;
9use mina_p2p_messages::{
10    binprot,
11    v2::{
12        MinaStateProtocolStateValueStableV2,
13        TransactionSnarkScanStateLedgerProofWithSokMessageStableV2,
14        TransactionSnarkScanStateTransactionWithWitnessStableV2,
15    },
16};
17use mina_signer::CompressedPubKey;
18use sha2::Sha256;
19
20use crate::{
21    scan_state::{
22        parallel_scan::{base, merge, JobStatus},
23        pending_coinbase,
24        scan_state::transaction_snark::{
25            LedgerProofWithSokMessage, SokMessage, Statement, TransactionWithWitness,
26        },
27        transaction_logic::{
28            apply_transaction_first_pass, apply_transaction_second_pass,
29            local_state::LocalStateEnv,
30            protocol_state::GlobalState,
31            transaction_partially_applied::{
32                TransactionPartiallyApplied, ZkappCommandPartiallyApplied,
33            },
34            TransactionStatus,
35        },
36    },
37    sparse_ledger::SparseLedger,
38    staged_ledger::hash::AuxHash,
39    verifier::Verifier,
40    zkapps::non_snark::LedgerNonSnark,
41};
42
43use self::transaction_snark::{InitStack, LedgerProof, OneOrTwo, Registers};
44
45use super::{
46    currency::{Fee, Slot},
47    parallel_scan::ParallelScan,
48    snark_work,
49    transaction_logic::{
50        local_state::LocalState,
51        protocol_state::{protocol_state_view, ProtocolStateView},
52        transaction_applied::TransactionApplied,
53        transaction_witness::TransactionWitness,
54        Transaction, WithStatus,
55    },
56};
57// use super::parallel_scan::AvailableJob;
58
59pub use super::parallel_scan::{
60    base::Job as JobValueBase, merge::Job as JobValueMerge,
61    AvailableJob as ParallelScanAvailableJob, JobValue, JobValueWithIndex, SpacePartition,
62};
63
64// type LedgerProof = LedgerProofProdStableV2;
65// type LedgerProofWithSokMessage = TransactionSnarkScanStateLedgerProofWithSokMessageStableV2;
66// type TransactionWithWitness = TransactionSnarkScanStateTransactionWithWitnessStableV2;
67
68pub type AvailableJobMessage = super::parallel_scan::AvailableJob<
69    TransactionSnarkScanStateTransactionWithWitnessStableV2,
70    TransactionSnarkScanStateLedgerProofWithSokMessageStableV2,
71>;
72pub type AvailableJob = super::parallel_scan::AvailableJob<
73    Arc<transaction_snark::TransactionWithWitness>,
74    Arc<transaction_snark::LedgerProofWithSokMessage>,
75>;
76
77#[derive(Clone, Debug, PartialEq)]
78pub struct BorderBlockContinuedInTheNextTree(pub(super) bool);
79
80/// Scan state and any zkapp updates that were applied to the to the most recent
81/// snarked ledger but are from the tree just before the tree corresponding to
82/// the snarked ledger*)
83#[derive(Clone)]
84pub struct ScanState {
85    pub scan_state: ParallelScan<
86        Arc<transaction_snark::TransactionWithWitness>,
87        Arc<transaction_snark::LedgerProofWithSokMessage>,
88    >,
89    pub previous_incomplete_zkapp_updates: (
90        Vec<Arc<transaction_snark::TransactionWithWitness>>,
91        BorderBlockContinuedInTheNextTree,
92    ),
93}
94
95pub mod transaction_snark {
96    use std::sync::Arc;
97
98    use itertools::Itertools;
99    use mina_curves::pasta::Fp;
100    use mina_p2p_messages::{binprot, string::ByteString, v2::TransactionSnarkProofStableV2};
101    use mina_signer::CompressedPubKey;
102    use serde::{Deserialize, Serialize};
103
104    use crate::{
105        proofs::{
106            field::{field, Boolean},
107            witness::Witness,
108        },
109        scan_state::{
110            currency::{Amount, Signed, Slot},
111            fee_excess::FeeExcess,
112            pending_coinbase,
113            transaction_logic::{local_state::LocalState, transaction_applied::TransactionApplied},
114        },
115        sparse_ledger::SparseLedger,
116        staged_ledger::hash::OCamlString,
117        AppendToInputs as _, ToInputs,
118    };
119
120    use super::Fee;
121    use poseidon::hash::Inputs;
122
123    pub type LedgerHash = Fp;
124
125    /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/registers.ml>
126    #[derive(Debug, Clone, PartialEq, Eq)]
127    pub struct Registers {
128        pub first_pass_ledger: LedgerHash,
129        pub second_pass_ledger: LedgerHash,
130        pub pending_coinbase_stack: pending_coinbase::Stack,
131        pub local_state: LocalState,
132    }
133
134    impl ToInputs for Registers {
135        /// <https://github.com/MinaProtocol/mina/blob/4e0b324912017c3ff576704ee397ade3d9bda412/src/lib/mina_state/registers.ml#L30>
136        fn to_inputs(&self, inputs: &mut Inputs) {
137            let Self {
138                first_pass_ledger,
139                second_pass_ledger,
140                pending_coinbase_stack,
141                local_state,
142            } = self;
143
144            inputs.append(first_pass_ledger);
145            inputs.append(second_pass_ledger);
146            inputs.append(pending_coinbase_stack);
147            inputs.append(local_state);
148        }
149    }
150
151    impl Registers {
152        /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/transaction_snark/transaction_snark.ml#L350>
153        pub fn check_equal(&self, other: &Self) -> bool {
154            let Self {
155                first_pass_ledger,
156                second_pass_ledger,
157                pending_coinbase_stack,
158                local_state,
159            } = self;
160
161            first_pass_ledger == &other.first_pass_ledger
162                && second_pass_ledger == &other.second_pass_ledger
163                && local_state == &other.local_state
164                && pending_coinbase::Stack::connected(
165                    pending_coinbase_stack,
166                    &other.pending_coinbase_stack,
167                    None,
168                )
169        }
170
171        /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/registers.ml#L55>
172        pub fn connected(r1: &Self, r2: &Self) -> bool {
173            let Self {
174                first_pass_ledger,
175                second_pass_ledger,
176                pending_coinbase_stack,
177                local_state,
178            } = r1;
179
180            first_pass_ledger == &r2.first_pass_ledger
181                && second_pass_ledger == &r2.second_pass_ledger
182                && local_state == &r2.local_state
183                && pending_coinbase::Stack::connected(
184                    pending_coinbase_stack,
185                    &r2.pending_coinbase_stack,
186                    None,
187                )
188        }
189    }
190
191    #[derive(Clone, PartialEq, Eq, derive_more::Deref)]
192    pub struct SokDigest(pub Vec<u8>);
193
194    impl From<SokDigest> for ByteString {
195        fn from(value: SokDigest) -> Self {
196            value.0.into()
197        }
198    }
199
200    impl From<&SokDigest> for ByteString {
201        fn from(value: &SokDigest) -> Self {
202            value.0.clone().into()
203        }
204    }
205
206    impl OCamlString for SokDigest {
207        fn to_ocaml_str(&self) -> String {
208            crate::staged_ledger::hash::to_ocaml_str(&self.0)
209        }
210
211        fn from_ocaml_str(s: &str) -> Self {
212            let bytes: [u8; 32] = crate::staged_ledger::hash::from_ocaml_str(s);
213            Self(bytes.to_vec())
214        }
215    }
216
217    impl std::fmt::Debug for SokDigest {
218        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219            f.write_fmt(format_args!("SokDigest({})", self.to_ocaml_str()))
220        }
221    }
222
223    impl Default for SokDigest {
224        /// <https://github.com/MinaProtocol/mina/blob/3a78f0e0c1343d14e2729c8b00205baa2ec70c93/src/lib/mina_base/sok_message.ml#L76>
225        fn default() -> Self {
226            Self(vec![0; 32])
227        }
228    }
229
230    pub struct StatementLedgers {
231        first_pass_ledger_source: LedgerHash,
232        first_pass_ledger_target: LedgerHash,
233        second_pass_ledger_source: LedgerHash,
234        second_pass_ledger_target: LedgerHash,
235        connecting_ledger_left: LedgerHash,
236        connecting_ledger_right: LedgerHash,
237        local_state_ledger_source: Fp,
238        local_state_ledger_target: Fp,
239    }
240
241    impl StatementLedgers {
242        /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/snarked_ledger_state.ml#L530>
243        pub fn of_statement<T>(s: &Statement<T>) -> Self {
244            Self {
245                first_pass_ledger_source: s.source.first_pass_ledger,
246                first_pass_ledger_target: s.target.first_pass_ledger,
247                second_pass_ledger_source: s.source.second_pass_ledger,
248                second_pass_ledger_target: s.target.second_pass_ledger,
249                connecting_ledger_left: s.connecting_ledger_left,
250                connecting_ledger_right: s.connecting_ledger_right,
251                local_state_ledger_source: s.source.local_state.ledger,
252                local_state_ledger_target: s.target.local_state.ledger,
253            }
254        }
255    }
256
257    /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/snarked_ledger_state.ml#L546>
258    fn validate_ledgers_at_merge(
259        s1: &StatementLedgers,
260        s2: &StatementLedgers,
261    ) -> Result<bool, String> {
262        // Check ledgers are valid based on the rules described in
263        // <https://github.com/MinaProtocol/mina/discussions/12000>
264        let is_same_block_at_shared_boundary = {
265            // First statement ends and the second statement starts in the
266            // same block. It could be within a single scan state tree
267            // or across two scan state trees
268            s1.connecting_ledger_right == s2.connecting_ledger_left
269        };
270
271        // Rule 1
272        let l1 = if is_same_block_at_shared_boundary {
273            // first pass ledger continues
274            &s2.first_pass_ledger_source
275        } else {
276            // s1's first pass ledger stops at the end of a block's transactions,
277            // check that it is equal to the start of the block's second pass ledger
278            &s1.connecting_ledger_right
279        };
280        let rule1 = "First pass ledger continues or first pass ledger connects to the \
281             same block's start of the second pass ledger";
282        let res1 = &s1.first_pass_ledger_target == l1;
283
284        // Rule 2
285        // s1's second pass ledger ends at say, block B1. s2 is in the next block, say B2
286        let l2 = if is_same_block_at_shared_boundary {
287            // second pass ledger continues
288            &s1.second_pass_ledger_target
289        } else {
290            // s2's second pass ledger starts where B2's first pass ledger ends
291            &s2.connecting_ledger_left
292        };
293        let rule2 = "Second pass ledger continues or second pass ledger of the statement on \
294             the right connects to the same block's end of first pass ledger";
295        let res2 = &s2.second_pass_ledger_source == l2;
296
297        // Rule 3
298        let l3 = if is_same_block_at_shared_boundary {
299            // no-op
300            &s1.second_pass_ledger_target
301        } else {
302            // s2's first pass ledger starts where B1's second pass ledger ends
303            &s2.first_pass_ledger_source
304        };
305        let rule3 = "First pass ledger of the statement on the right connects to the second \
306             pass ledger of the statement on the left";
307        let res3 = &s1.second_pass_ledger_target == l3;
308
309        let rule4 = "local state ledgers are equal or transition correctly from first pass \
310             to second pass";
311        let res4 = {
312            let local_state_ledger_equal =
313                s2.local_state_ledger_source == s1.local_state_ledger_target;
314
315            let local_state_ledger_transitions = s2.local_state_ledger_source
316                == s2.second_pass_ledger_source
317                && s1.local_state_ledger_target == s1.first_pass_ledger_target;
318
319            local_state_ledger_equal || local_state_ledger_transitions
320        };
321
322        let faileds = [(res1, rule1), (res2, rule2), (res3, rule3), (res4, rule4)]
323            .iter()
324            .filter_map(|(v, s)| if *v { None } else { Some(*s) })
325            .collect::<Vec<_>>();
326
327        if !faileds.is_empty() {
328            return Err(format!("Constraints failed: {}", faileds.iter().join(",")));
329        }
330
331        Ok(res1 && res2 && res3 && res4)
332    }
333
334    fn valid_ledgers_at_merge_unchecked(
335        s1: &StatementLedgers,
336        s2: &StatementLedgers,
337    ) -> Result<bool, String> {
338        validate_ledgers_at_merge(s1, s2)
339    }
340
341    // TODO: Dedup with `validate_ledgers_at_merge_checked`
342    pub fn validate_ledgers_at_merge_checked(
343        s1: &StatementLedgers,
344        s2: &StatementLedgers,
345        w: &mut Witness<Fp>,
346    ) -> Boolean {
347        let is_same_block_at_shared_boundary =
348            field::equal(s1.connecting_ledger_right, s2.connecting_ledger_left, w);
349        let l1 = w.exists_no_check(match is_same_block_at_shared_boundary {
350            Boolean::True => s2.first_pass_ledger_source,
351            Boolean::False => s1.connecting_ledger_right,
352        });
353        let res1 = field::equal(s1.first_pass_ledger_target, l1, w);
354        let l2 = w.exists_no_check(match is_same_block_at_shared_boundary {
355            Boolean::True => s1.second_pass_ledger_target,
356            Boolean::False => s2.connecting_ledger_left,
357        });
358        let res2 = field::equal(s2.second_pass_ledger_source, l2, w);
359        let l3 = w.exists_no_check(match is_same_block_at_shared_boundary {
360            Boolean::True => s1.second_pass_ledger_target,
361            Boolean::False => s2.first_pass_ledger_source,
362        });
363        let res3 = field::equal(s1.second_pass_ledger_target, l3, w);
364        let res4 = {
365            let local_state_ledger_equal = field::equal(
366                s2.local_state_ledger_source,
367                s1.local_state_ledger_target,
368                w,
369            );
370
371            // We decompose this way because of OCaml evaluation order
372            let b = field::equal(s1.local_state_ledger_target, s1.first_pass_ledger_target, w);
373            let a = field::equal(
374                s2.local_state_ledger_source,
375                s2.second_pass_ledger_source,
376                w,
377            );
378            let local_state_ledger_transitions = Boolean::all(&[a, b], w);
379
380            local_state_ledger_equal.or(&local_state_ledger_transitions, w)
381        };
382        // NOTES: No accumulate_failures here
383        Boolean::all(&[res1, res2, res3, res4], w)
384    }
385
386    #[derive(Debug, Clone, PartialEq, Eq)]
387    pub struct Statement<D> {
388        pub source: Registers,
389        pub target: Registers,
390        pub connecting_ledger_left: LedgerHash,
391        pub connecting_ledger_right: LedgerHash,
392        pub supply_increase: Signed<Amount>,
393        pub fee_excess: FeeExcess,
394        pub sok_digest: D,
395    }
396
397    impl ToInputs for Statement<SokDigest> {
398        /// <https://github.com/MinaProtocol/mina/blob/4e0b324912017c3ff576704ee397ade3d9bda412/src/lib/mina_state/snarked_ledger_state.ml#L263>
399        fn to_inputs(&self, inputs: &mut Inputs) {
400            let Self {
401                source,
402                target,
403                connecting_ledger_left,
404                connecting_ledger_right,
405                supply_increase,
406                fee_excess,
407                sok_digest,
408            } = self;
409
410            inputs.append_bytes(sok_digest);
411
412            inputs.append(source);
413            inputs.append(target);
414            inputs.append(connecting_ledger_left);
415            inputs.append(connecting_ledger_right);
416            inputs.append(supply_increase);
417            inputs.append(fee_excess);
418        }
419    }
420
421    impl Statement<SokDigest> {
422        pub fn without_digest(self) -> Statement<()> {
423            let Self {
424                source,
425                target,
426                connecting_ledger_left,
427                connecting_ledger_right,
428                supply_increase,
429                fee_excess,
430                sok_digest: _,
431            } = self;
432
433            Statement::<()> {
434                source,
435                target,
436                connecting_ledger_left,
437                connecting_ledger_right,
438                supply_increase,
439                fee_excess,
440                sok_digest: (),
441            }
442        }
443
444        pub fn with_digest(self, sok_digest: SokDigest) -> Self {
445            Self { sok_digest, ..self }
446        }
447    }
448
449    impl Statement<()> {
450        pub fn with_digest(self, sok_digest: SokDigest) -> Statement<SokDigest> {
451            let Self {
452                source,
453                target,
454                connecting_ledger_left,
455                connecting_ledger_right,
456                supply_increase,
457                fee_excess,
458                sok_digest: _,
459            } = self;
460
461            Statement::<SokDigest> {
462                source,
463                target,
464                connecting_ledger_left,
465                connecting_ledger_right,
466                supply_increase,
467                fee_excess,
468                sok_digest,
469            }
470        }
471
472        /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/snarked_ledger_state.ml#L631>
473        pub fn merge(&self, s2: &Statement<()>) -> Result<Self, String> {
474            let or_error_of_bool = |b: bool, error: &str| {
475                if b {
476                    Ok(())
477                } else {
478                    Err(format!(
479                        "Error merging statements left: {:#?} right {:#?}: {}",
480                        self, s2, error
481                    ))
482                }
483            };
484
485            // check ledgers are connected
486            let s1_ledger = StatementLedgers::of_statement(self);
487            let s2_ledger = StatementLedgers::of_statement(s2);
488
489            valid_ledgers_at_merge_unchecked(&s1_ledger, &s2_ledger)?;
490
491            // Check pending coinbase stack is connected
492            or_error_of_bool(
493                pending_coinbase::Stack::connected(
494                    &self.target.pending_coinbase_stack,
495                    &s2.source.pending_coinbase_stack,
496                    None,
497                ),
498                "Pending coinbase stacks are not connected",
499            )?;
500
501            // Check local states sans ledger are equal. Local state ledgers are checked
502            // in [valid_ledgers_at_merge_uncheckeds]
503            or_error_of_bool(
504                self.target
505                    .local_state
506                    .equal_without_ledger(&s2.source.local_state),
507                "Local states are not connected",
508            )?;
509
510            let connecting_ledger_left = self.connecting_ledger_left;
511            let connecting_ledger_right = s2.connecting_ledger_right;
512
513            let fee_excess = FeeExcess::combine(&self.fee_excess, &s2.fee_excess)?;
514            let supply_increase = self
515                .supply_increase
516                .add(&s2.supply_increase)
517                .ok_or_else(|| "Error adding supply_increase".to_string())?;
518
519            // assert!(self.target.check_equal(&s2.source));
520
521            Ok(Self {
522                source: self.source.clone(),
523                target: s2.target.clone(),
524                supply_increase,
525                fee_excess,
526                sok_digest: (),
527                connecting_ledger_left,
528                connecting_ledger_right,
529            })
530        }
531    }
532
533    pub mod work {
534        use mina_p2p_messages::bigint::InvalidBigInt;
535
536        use super::*;
537
538        pub type Statement = OneOrTwo<super::Statement<()>>;
539
540        #[derive(Debug, Clone, PartialEq)]
541        pub struct Work {
542            pub fee: Fee,
543            pub proofs: OneOrTwo<LedgerProof>,
544            pub prover: CompressedPubKey,
545        }
546
547        pub type Unchecked = Work;
548
549        pub type Checked = Work;
550
551        impl TryFrom<&mina_core::snark::Snark> for Work {
552            type Error = InvalidBigInt;
553
554            fn try_from(value: &mina_core::snark::Snark) -> Result<Self, Self::Error> {
555                Ok(Self {
556                    prover: (&value.snarker).try_into()?,
557                    fee: (&value.fee).into(),
558                    proofs: (&*value.proofs).try_into()?,
559                })
560            }
561        }
562
563        impl Work {
564            pub fn statement(&self) -> Statement {
565                self.proofs.map(|p| {
566                    let statement = p.statement();
567                    super::Statement::<()> {
568                        source: statement.source,
569                        target: statement.target,
570                        supply_increase: statement.supply_increase,
571                        fee_excess: statement.fee_excess,
572                        sok_digest: (),
573                        connecting_ledger_left: statement.connecting_ledger_left,
574                        connecting_ledger_right: statement.connecting_ledger_right,
575                    }
576                })
577            }
578        }
579
580        impl Checked {
581            /// <https://github.com/MinaProtocol/mina/blob/05c2f73d0f6e4f1341286843814ce02dcb3919e0/src/lib/transaction_snark_work/transaction_snark_work.ml#L121>
582            pub fn forget(self) -> Unchecked {
583                self
584            }
585        }
586    }
587
588    // TransactionSnarkPendingCoinbaseStackStateInitStackStableV1
589    #[derive(Debug, Clone, PartialEq)]
590    pub enum InitStack {
591        Base(pending_coinbase::Stack),
592        Merge,
593    }
594
595    #[derive(Debug, Clone, PartialEq)]
596    pub struct TransactionWithWitness {
597        pub transaction_with_info: TransactionApplied,
598        pub state_hash: (Fp, Fp), // (StateHash, StateBodyHash)
599        // pub state_hash: (StateHash, MinaBaseStateBodyHashStableV1),
600        pub statement: Statement<()>,
601        pub init_stack: InitStack,
602        pub first_pass_ledger_witness: SparseLedger,
603        pub second_pass_ledger_witness: SparseLedger,
604        pub block_global_slot: Slot,
605    }
606
607    #[derive(Debug, Clone, PartialEq)]
608    pub struct TransactionSnark<D> {
609        pub statement: Statement<D>,
610        pub proof: Arc<TransactionSnarkProofStableV2>,
611    }
612
613    #[derive(Debug, Clone, PartialEq)]
614    pub struct LedgerProof(pub TransactionSnark<SokDigest>);
615
616    impl LedgerProof {
617        pub fn create(
618            statement: Statement<()>,
619            sok_digest: SokDigest,
620            proof: Arc<TransactionSnarkProofStableV2>,
621        ) -> Self {
622            let statement = Statement::<SokDigest> {
623                source: statement.source,
624                target: statement.target,
625                supply_increase: statement.supply_increase,
626                fee_excess: statement.fee_excess,
627                sok_digest,
628                connecting_ledger_left: statement.connecting_ledger_left,
629                connecting_ledger_right: statement.connecting_ledger_right,
630            };
631
632            Self(TransactionSnark { statement, proof })
633        }
634
635        pub fn statement(&self) -> Statement<()> {
636            let Statement {
637                source,
638                target,
639                connecting_ledger_left,
640                connecting_ledger_right,
641                supply_increase,
642                fee_excess,
643                sok_digest: _,
644            } = &self.0.statement;
645
646            Statement::<()> {
647                source: source.clone(),
648                target: target.clone(),
649                supply_increase: *supply_increase,
650                fee_excess: fee_excess.clone(),
651                sok_digest: (),
652                connecting_ledger_left: *connecting_ledger_left,
653                connecting_ledger_right: *connecting_ledger_right,
654            }
655        }
656
657        pub fn statement_ref(&self) -> &Statement<SokDigest> {
658            &self.0.statement
659        }
660    }
661
662    #[derive(Debug, Clone, PartialEq)]
663    pub struct SokMessage {
664        pub fee: Fee,
665        pub prover: CompressedPubKey,
666    }
667
668    impl SokMessage {
669        pub fn create(fee: Fee, prover: CompressedPubKey) -> Self {
670            Self { fee, prover }
671        }
672
673        pub fn digest(&self) -> SokDigest {
674            use binprot::BinProtWrite;
675
676            let mut bytes = Vec::with_capacity(10000);
677            let binprot: mina_p2p_messages::v2::MinaBaseSokMessageStableV1 = self.into();
678            binprot.binprot_write(&mut bytes).unwrap();
679
680            use blake2::{
681                digest::{Update, VariableOutput},
682                Blake2bVar,
683            };
684            let mut hasher = Blake2bVar::new(32).expect("Invalid Blake2bVar output size");
685            hasher.update(bytes.as_slice());
686            let digest = hasher.finalize_boxed();
687
688            SokDigest(digest.into())
689        }
690    }
691
692    #[derive(Debug, Clone, PartialEq)]
693    pub struct LedgerProofWithSokMessage {
694        pub proof: LedgerProof,
695        pub sok_message: SokMessage,
696    }
697
698    #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
699    #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
700    pub enum OneOrTwo<T> {
701        One(T),
702        Two((T, T)),
703    }
704
705    impl<T> OneOrTwo<T> {
706        pub fn len(&self) -> usize {
707            match self {
708                OneOrTwo::One(_) => 1,
709                OneOrTwo::Two(_) => 2,
710            }
711        }
712
713        pub fn iter(&self) -> OneOrTwoIter<'_, T> {
714            let array = match self {
715                OneOrTwo::One(a) => [Some(a), None],
716                OneOrTwo::Two((a, b)) => [Some(a), Some(b)],
717            };
718
719            OneOrTwoIter {
720                inner: array,
721                index: 0,
722            }
723        }
724
725        #[allow(clippy::should_implement_trait)]
726        pub fn into_iter(self) -> OneOrTwoIntoIter<T> {
727            let array = match self {
728                OneOrTwo::One(a) => [Some(a), None],
729                OneOrTwo::Two((a, b)) => [Some(a), Some(b)],
730            };
731
732            OneOrTwoIntoIter {
733                inner: array,
734                index: 0,
735            }
736        }
737
738        pub fn map<F, R>(&self, fun: F) -> OneOrTwo<R>
739        where
740            F: Fn(&T) -> R,
741        {
742            match self {
743                OneOrTwo::One(one) => OneOrTwo::One(fun(one)),
744                OneOrTwo::Two((a, b)) => OneOrTwo::Two((fun(a), fun(b))),
745            }
746        }
747
748        pub fn into_map<F, R>(self, fun: F) -> OneOrTwo<R>
749        where
750            F: Fn(T) -> R,
751        {
752            match self {
753                OneOrTwo::One(one) => OneOrTwo::One(fun(one)),
754                OneOrTwo::Two((a, b)) => OneOrTwo::Two((fun(a), fun(b))),
755            }
756        }
757
758        pub fn into_map_err<F, R, E>(self, fun: F) -> Result<OneOrTwo<R>, E>
759        where
760            F: Fn(T) -> Result<R, E>,
761        {
762            match self {
763                OneOrTwo::One(one) => Ok(OneOrTwo::One(fun(one)?)),
764                OneOrTwo::Two((a, b)) => Ok(OneOrTwo::Two((fun(a)?, fun(b)?))),
765            }
766        }
767
768        pub fn into_map_some<F, R>(self, fun: F) -> Option<OneOrTwo<R>>
769        where
770            F: Fn(T) -> Option<R>,
771        {
772            match self {
773                OneOrTwo::One(one) => Some(OneOrTwo::One(fun(one)?)),
774                OneOrTwo::Two((a, b)) => {
775                    let a = fun(a)?;
776                    match fun(b) {
777                        Some(b) => Some(OneOrTwo::Two((a, b))),
778                        None => Some(OneOrTwo::One(a)),
779                    }
780                }
781            }
782        }
783
784        /// <https://github.com/MinaProtocol/mina/blob/05c2f73d0f6e4f1341286843814ce02dcb3919e0/src/lib/one_or_two/one_or_two.ml#L54>
785        pub fn zip<B>(a: OneOrTwo<T>, b: OneOrTwo<B>) -> Result<OneOrTwo<(T, B)>, String> {
786            use OneOrTwo::*;
787
788            match (a, b) {
789                (One(a), One(b)) => Ok(One((a, b))),
790                (Two((a1, a2)), Two((b1, b2))) => Ok(Two(((a1, b1), (a2, b2)))),
791                (One(_), Two(_)) | (Two(_), One(_)) => Err("One_or_two.zip mismatched".to_string()),
792            }
793        }
794
795        pub fn fold<A, F>(&self, init: A, fun: F) -> A
796        where
797            F: Fn(A, &T) -> A,
798        {
799            match self {
800                OneOrTwo::One(a) => fun(init, a),
801                OneOrTwo::Two((a, b)) => fun(fun(init, a), b),
802            }
803        }
804    }
805
806    pub struct OneOrTwoIter<'a, T> {
807        inner: [Option<&'a T>; 2],
808        index: usize,
809    }
810
811    impl<'a, T> Iterator for OneOrTwoIter<'a, T> {
812        type Item = &'a T;
813
814        fn next(&mut self) -> Option<Self::Item> {
815            let value = self.inner.get(self.index)?.as_ref()?;
816            self.index += 1;
817
818            Some(value)
819        }
820    }
821
822    pub struct OneOrTwoIntoIter<T> {
823        inner: [Option<T>; 2],
824        index: usize,
825    }
826
827    impl<T> Iterator for OneOrTwoIntoIter<T> {
828        type Item = T;
829
830        fn next(&mut self) -> Option<Self::Item> {
831            let value = self.inner.get_mut(self.index)?.take()?;
832            self.index += 1;
833
834            Some(value)
835        }
836    }
837}
838
839fn sha256_digest(bytes: &[u8]) -> GenericArray<u8, U32> {
840    let mut sha: Sha256 = Sha256::new();
841    sha.update(bytes);
842    sha.finalize()
843}
844
845impl ScanState {
846    pub fn hash(&self) -> AuxHash {
847        use binprot::BinProtWrite;
848
849        let Self {
850            scan_state,
851            previous_incomplete_zkapp_updates,
852        } = self;
853
854        let state_hash = scan_state.hash(
855            |buffer, proof| {
856                // #[cfg(test)]
857                // {
858                //     let a: mina_p2p_messages::v2::TransactionSnarkScanStateLedgerProofWithSokMessageStableV2 = proof.as_ref().into();
859                //     let b: LedgerProofWithSokMessage = (&a).into();
860                //     assert_eq!(&b, proof.as_ref());
861                // }
862
863                proof.binprot_write(buffer).unwrap();
864            },
865            |buffer, transaction| {
866                // #[cfg(test)]
867                // {
868                //     let a: mina_p2p_messages::v2::TransactionSnarkScanStateTransactionWithWitnessStableV2 = transaction.as_ref().into();
869                //     let b: TransactionWithWitness = (&a).into();
870                //     assert_eq!(&b, transaction.as_ref());
871                // }
872
873                transaction.binprot_write(buffer).unwrap();
874            },
875        );
876
877        let (
878            previous_incomplete_zkapp_updates,
879            BorderBlockContinuedInTheNextTree(continue_in_next_tree),
880        ) = previous_incomplete_zkapp_updates;
881
882        let incomplete_updates = previous_incomplete_zkapp_updates.iter().fold(
883            Vec::with_capacity(1024 * 32),
884            |mut accum, tx| {
885                tx.binprot_write(&mut accum).unwrap();
886                accum
887            },
888        );
889        let incomplete_updates = sha256_digest(&incomplete_updates);
890
891        let continue_in_next_tree = match continue_in_next_tree {
892            true => "true",
893            false => "false",
894        };
895        let continue_in_next_tree = sha256_digest(continue_in_next_tree.as_bytes());
896
897        let mut bytes = Vec::with_capacity(2048);
898        bytes.extend_from_slice(&state_hash);
899        bytes.extend_from_slice(&incomplete_updates);
900        bytes.extend_from_slice(&continue_in_next_tree);
901        let digest = sha256_digest(&bytes);
902
903        AuxHash(digest.into())
904    }
905}
906
907/// <https://github.com/MinaProtocol/mina/blob/e5183ca1dde1c085b4c5d37d1d9987e24c294c32/src/lib/transaction_snark_scan_state/transaction_snark_scan_state.ml#L175>
908fn create_expected_statement<F>(
909    constraint_constants: &ConstraintConstants,
910    get_state: F,
911    connecting_merkle_root: Fp,
912    TransactionWithWitness {
913        transaction_with_info,
914        state_hash,
915        statement,
916        init_stack,
917        first_pass_ledger_witness,
918        second_pass_ledger_witness,
919        block_global_slot,
920    }: &TransactionWithWitness,
921) -> Result<Statement<()>, String>
922where
923    F: Fn(Fp) -> MinaStateProtocolStateValueStableV2,
924{
925    // TODO: Don't clone here
926    let source_first_pass_merkle_root = first_pass_ledger_witness.clone().merkle_root();
927    let source_second_pass_merkle_root = second_pass_ledger_witness.clone().merkle_root();
928
929    let WithStatus {
930        data: transaction, ..
931    } = transaction_with_info.transaction();
932
933    let protocol_state = get_state(state_hash.0);
934    let state_view = protocol_state_view(&protocol_state).map_err(|e| format!("{:?}", e))?;
935
936    let empty_local_state = LocalState::empty();
937
938    let coinbase = match &transaction {
939        Transaction::Coinbase(coinbase) => Some(coinbase.clone()),
940        _ => None,
941    };
942    // Keep the error, in OCaml it is throwned later
943    let fee_excess_with_err = transaction.fee_excess();
944
945    let (target_first_pass_merkle_root, target_second_pass_merkle_root, supply_increase) = {
946        let mut first_pass_ledger_witness = first_pass_ledger_witness.copy_content();
947        let partially_applied_transaction = apply_transaction_first_pass(
948            constraint_constants,
949            *block_global_slot,
950            &state_view,
951            &mut first_pass_ledger_witness,
952            &transaction,
953        )?;
954
955        let mut second_pass_ledger_witness = second_pass_ledger_witness.copy_content();
956        let applied_transaction = apply_transaction_second_pass(
957            constraint_constants,
958            &mut second_pass_ledger_witness,
959            partially_applied_transaction,
960        )?;
961
962        let target_first_pass_merkle_root = first_pass_ledger_witness.merkle_root();
963        let target_second_pass_merkle_root = second_pass_ledger_witness.merkle_root();
964
965        // TODO: `supply_increase` has no parameter
966        let supply_increase = applied_transaction.supply_increase(constraint_constants)?;
967
968        (
969            target_first_pass_merkle_root,
970            target_second_pass_merkle_root,
971            supply_increase,
972        )
973    };
974
975    let pending_coinbase_before = match init_stack {
976        transaction_snark::InitStack::Base(source) => source,
977        transaction_snark::InitStack::Merge => {
978            return Err(
979                "Invalid init stack in Pending coinbase stack state . Expected Base found Merge"
980                    .to_string(),
981            );
982        }
983    };
984
985    let pending_coinbase_after = {
986        let state_body_hash = state_hash.1;
987
988        let pending_coinbase_with_state =
989            pending_coinbase_before.push_state(state_body_hash, *block_global_slot);
990
991        match coinbase {
992            Some(cb) => pending_coinbase_with_state.push_coinbase(cb),
993            None => pending_coinbase_with_state,
994        }
995    };
996
997    let fee_excess = fee_excess_with_err?;
998
999    Ok(Statement {
1000        source: Registers {
1001            first_pass_ledger: source_first_pass_merkle_root,
1002            second_pass_ledger: source_second_pass_merkle_root,
1003            pending_coinbase_stack: statement.source.pending_coinbase_stack.clone(),
1004            local_state: empty_local_state.clone(),
1005        },
1006        target: Registers {
1007            first_pass_ledger: target_first_pass_merkle_root,
1008            second_pass_ledger: target_second_pass_merkle_root,
1009            pending_coinbase_stack: pending_coinbase_after,
1010            local_state: empty_local_state,
1011        },
1012        connecting_ledger_left: connecting_merkle_root,
1013        connecting_ledger_right: connecting_merkle_root,
1014        supply_increase,
1015        fee_excess,
1016        sok_digest: (),
1017    })
1018}
1019
1020fn completed_work_to_scanable_work(
1021    job: AvailableJob,
1022    (fee, current_proof, prover): (Fee, LedgerProof, CompressedPubKey),
1023) -> Result<Arc<LedgerProofWithSokMessage>, String> {
1024    use super::parallel_scan::AvailableJob::{Base, Merge};
1025
1026    let sok_digest = current_proof.0.statement.sok_digest;
1027
1028    let proof = &current_proof.0.proof;
1029
1030    match job {
1031        Base(t) => {
1032            let TransactionWithWitness { statement, .. } = t.as_ref();
1033            let ledger_proof = LedgerProof::create(statement.clone(), sok_digest, proof.clone());
1034            let sok_message = SokMessage::create(fee, prover);
1035
1036            Ok(Arc::new(LedgerProofWithSokMessage {
1037                proof: ledger_proof,
1038                sok_message,
1039            }))
1040        }
1041        Merge {
1042            left: proof1,
1043            right: proof2,
1044        } => {
1045            let s1 = proof1.proof.statement();
1046            let s2 = proof2.proof.statement();
1047
1048            let statement = s1.merge(&s2)?;
1049
1050            let ledger_proof = LedgerProof::create(statement, sok_digest, proof.clone());
1051            let sok_message = SokMessage::create(fee, prover);
1052
1053            Ok(Arc::new(LedgerProofWithSokMessage {
1054                proof: ledger_proof,
1055                sok_message,
1056            }))
1057        }
1058    }
1059}
1060
1061fn total_proofs(works: &[transaction_snark::work::Work]) -> usize {
1062    works.iter().map(|work| work.proofs.len()).sum()
1063}
1064
1065pub enum StatementCheck<F: Fn(Fp) -> MinaStateProtocolStateValueStableV2> {
1066    Partial,
1067    Full(F),
1068}
1069
1070impl ScanState {
1071    pub fn scan_statement<F>(
1072        &self,
1073        constraint_constants: &ConstraintConstants,
1074        statement_check: StatementCheck<F>,
1075        verifier: &Verifier,
1076    ) -> Result<Statement<()>, String>
1077    where
1078        F: Fn(Fp) -> MinaStateProtocolStateValueStableV2,
1079    {
1080        struct Acc(Option<(Statement<()>, Vec<Arc<LedgerProofWithSokMessage>>)>);
1081
1082        let merge_acc = |mut proofs: Vec<Arc<LedgerProofWithSokMessage>>,
1083                         acc: Acc,
1084                         s2: &Statement<()>|
1085         -> Result<Acc, String> {
1086            match acc.0 {
1087                None => Ok(Acc(Some((s2.clone(), proofs)))),
1088                Some((s1, mut ps)) => {
1089                    let merged_statement = s1.merge(s2)?;
1090                    proofs.append(&mut ps);
1091                    Ok(Acc(Some((merged_statement, proofs))))
1092                }
1093            }
1094        };
1095
1096        let merge_pc = |acc: Option<Statement<()>>,
1097                        s2: &Statement<()>|
1098         -> Result<Option<Statement<()>>, String> {
1099            match acc {
1100                None => Ok(Some(s2.clone())),
1101                Some(s1) => {
1102                    if !pending_coinbase::Stack::connected(
1103                        &s1.target.pending_coinbase_stack,
1104                        &s2.source.pending_coinbase_stack,
1105                        Some(&s1.source.pending_coinbase_stack),
1106                    ) {
1107                        return Err(format!(
1108                            "Base merge proof: invalid pending coinbase \
1109                             transition s1: {:?} s2: {:?}",
1110                            s1, s2
1111                        ));
1112                    }
1113                    Ok(Some(s2.clone()))
1114                }
1115            }
1116        };
1117
1118        let fold_step_a = |(acc_statement, acc_pc): (Acc, Option<Statement<()>>),
1119                           job: &merge::Job<Arc<LedgerProofWithSokMessage>>|
1120         -> Result<(Acc, Option<Statement<()>>), String> {
1121            use merge::{
1122                Job::{Empty, Full, Part},
1123                Record,
1124            };
1125            use JobStatus::Done;
1126
1127            match job {
1128                Part(ref ledger) => {
1129                    let LedgerProofWithSokMessage { proof, .. } = ledger.as_ref();
1130                    let statement = proof.statement();
1131                    let acc_stmt = merge_acc(vec![ledger.clone()], acc_statement, &statement)?;
1132                    Ok((acc_stmt, acc_pc))
1133                }
1134                Empty | Full(Record { state: Done, .. }) => Ok((acc_statement, acc_pc)),
1135                Full(Record { left, right, .. }) => {
1136                    let LedgerProofWithSokMessage { proof: proof1, .. } = left.as_ref();
1137                    let LedgerProofWithSokMessage { proof: proof2, .. } = right.as_ref();
1138
1139                    let stmt1 = proof1.statement();
1140                    let stmt2 = proof2.statement();
1141                    let merged_statement = stmt1.merge(&stmt2)?;
1142
1143                    let acc_stmt = merge_acc(
1144                        vec![left.clone(), right.clone()],
1145                        acc_statement,
1146                        &merged_statement,
1147                    )?;
1148
1149                    Ok((acc_stmt, acc_pc))
1150                }
1151            }
1152        };
1153
1154        let check_base = |(acc_statement, acc_pc), transaction: &TransactionWithWitness| {
1155            use StatementCheck::{Full, Partial};
1156
1157            let expected_statement = match &statement_check {
1158                Full(get_state) => create_expected_statement(
1159                    constraint_constants,
1160                    get_state,
1161                    transaction.statement.connecting_ledger_left,
1162                    transaction,
1163                )?,
1164                Partial => transaction.statement.clone(),
1165            };
1166
1167            if transaction.statement == expected_statement {
1168                let acc_stmt = merge_acc(Vec::new(), acc_statement, &transaction.statement)?;
1169                let acc_pc = merge_pc(acc_pc, &transaction.statement)?;
1170
1171                Ok((acc_stmt, acc_pc))
1172            } else {
1173                Err(format!(
1174                    "Bad base statement expected: {:#?} got: {:#?}",
1175                    transaction.statement, expected_statement
1176                ))
1177            }
1178        };
1179
1180        let fold_step_d = |(acc_statement, acc_pc): (Acc, Option<Statement<()>>),
1181                           job: &base::Job<Arc<TransactionWithWitness>>|
1182         -> Result<(Acc, Option<Statement<()>>), String> {
1183            use base::{
1184                Job::{Empty, Full},
1185                Record,
1186            };
1187            use JobStatus::Done;
1188
1189            match job {
1190                Empty => Ok((acc_statement, acc_pc)),
1191                Full(Record {
1192                    state: Done,
1193                    job: transaction,
1194                    ..
1195                }) => {
1196                    let acc_pc = merge_pc(acc_pc, &transaction.statement)?;
1197                    Ok((acc_statement, acc_pc))
1198                }
1199                Full(Record {
1200                    job: transaction, ..
1201                }) => check_base((acc_statement, acc_pc), transaction),
1202            }
1203        };
1204
1205        let res = self.scan_state.fold_chronological_until_err(
1206            (Acc(None), None),
1207            |acc, merge::Merge { weight: _, job }| fold_step_a(acc, job),
1208            |acc, base::Base { weight: _, job }| fold_step_d(acc, job),
1209            |v| v,
1210        )?;
1211
1212        match res {
1213            (Acc(None), _) => Err("Empty".to_string()),
1214            (Acc(Some((res, proofs))), _) => match verifier.verify(proofs.as_slice()) {
1215                Ok(Ok(())) => Ok(res),
1216                Ok(Err(e)) => Err(format!("Verifier issue {:?}", e)),
1217                Err(e) => Err(e),
1218            },
1219        }
1220    }
1221
1222    pub fn check_invariants<F>(
1223        &self,
1224        constraint_constants: &ConstraintConstants,
1225        statement_check: StatementCheck<F>,
1226        verifier: &Verifier,
1227        _error_prefix: &'static str,
1228        _last_proof_statement: Option<Statement<()>>,
1229        _registers_end: Registers,
1230    ) -> Result<(), String>
1231    where
1232        F: Fn(Fp) -> MinaStateProtocolStateValueStableV2,
1233    {
1234        // TODO: OCaml does much more than this (pretty printing error)
1235        match self.scan_statement(constraint_constants, statement_check, verifier) {
1236            Ok(_) => Ok(()),
1237            Err(s) => Err(s),
1238        }
1239    }
1240
1241    pub fn statement_of_job(job: &AvailableJob) -> Option<Statement<()>> {
1242        use super::parallel_scan::AvailableJob::{Base, Merge};
1243
1244        match job {
1245            Base(t) => {
1246                let TransactionWithWitness { statement, .. } = t.as_ref();
1247                Some(statement.clone())
1248            }
1249            Merge { left, right } => {
1250                let LedgerProofWithSokMessage { proof: p1, .. } = left.as_ref();
1251                let LedgerProofWithSokMessage { proof: p2, .. } = right.as_ref();
1252
1253                p1.statement().merge(&p2.statement()).ok()
1254            }
1255        }
1256    }
1257
1258    fn create(work_delay: u64, transaction_capacity_log_2: u64) -> Self {
1259        let k = 2u64.pow(transaction_capacity_log_2 as u32);
1260
1261        Self {
1262            scan_state: ParallelScan::empty(k, work_delay),
1263            previous_incomplete_zkapp_updates: (
1264                Vec::with_capacity(1024),
1265                BorderBlockContinuedInTheNextTree(false),
1266            ),
1267        }
1268    }
1269
1270    pub fn empty(constraint_constants: &ConstraintConstants) -> Self {
1271        let work_delay = constraint_constants.work_delay;
1272        let transaction_capacity_log_2 = constraint_constants.transaction_capacity_log_2;
1273
1274        Self::create(work_delay, transaction_capacity_log_2)
1275    }
1276
1277    fn extract_txn_and_global_slot(
1278        txn_with_witness: &TransactionWithWitness,
1279    ) -> (WithStatus<Transaction>, Fp, Slot) {
1280        let txn = txn_with_witness.transaction_with_info.transaction();
1281
1282        let state_hash = txn_with_witness.state_hash.0;
1283        let global_slot = txn_with_witness.block_global_slot;
1284        (txn, state_hash, global_slot)
1285    }
1286
1287    fn latest_ledger_proof_impl(
1288        &self,
1289    ) -> Option<(
1290        &LedgerProofWithSokMessage,
1291        Vec<TransactionsOrdered<Arc<TransactionWithWitness>>>,
1292    )> {
1293        let (proof, txns_with_witnesses) = self.scan_state.last_emitted_value()?;
1294
1295        let (previous_incomplete, BorderBlockContinuedInTheNextTree(continued_in_next_tree)) =
1296            self.previous_incomplete_zkapp_updates.clone();
1297
1298        let txns = {
1299            if continued_in_next_tree {
1300                TransactionsOrdered::first_and_second_pass_transactions_per_tree(
1301                    previous_incomplete,
1302                    txns_with_witnesses.clone(),
1303                )
1304            } else {
1305                let mut txns = TransactionsOrdered::first_and_second_pass_transactions_per_tree(
1306                    vec![],
1307                    txns_with_witnesses.clone(),
1308                );
1309
1310                if previous_incomplete.is_empty() {
1311                    txns
1312                } else {
1313                    txns.insert(
1314                        0,
1315                        TransactionsOrdered {
1316                            first_pass: vec![],
1317                            second_pass: vec![],
1318                            previous_incomplete,
1319                            current_incomplete: vec![],
1320                        },
1321                    );
1322                    txns
1323                }
1324            }
1325        };
1326
1327        Some((proof, txns))
1328    }
1329
1330    pub fn latest_ledger_proof(
1331        &self,
1332    ) -> Option<(
1333        &LedgerProofWithSokMessage,
1334        Vec<TransactionsOrdered<(WithStatus<Transaction>, Fp, Slot)>>,
1335    )> {
1336        self.latest_ledger_proof_impl().map(|(p, txns)| {
1337            let txns = txns
1338                .into_iter()
1339                .map(|ordered| ordered.map(|t| Self::extract_txn_and_global_slot(t.as_ref())))
1340                .collect::<Vec<_>>();
1341
1342            (p, txns)
1343        })
1344    }
1345
1346    fn incomplete_txns_from_recent_proof_tree(
1347        &self,
1348    ) -> Option<(
1349        LedgerProofWithSokMessage,
1350        (
1351            Vec<Arc<TransactionWithWitness>>,
1352            BorderBlockContinuedInTheNextTree,
1353        ),
1354    )> {
1355        let (proof, txns_per_block) = self.latest_ledger_proof_impl()?;
1356
1357        let txns = match txns_per_block.last() {
1358            None => (vec![], BorderBlockContinuedInTheNextTree(false)),
1359            Some(txns_in_last_block) => {
1360                // First pass ledger is considered as the snarked ledger, so any
1361                // account update whether completed in the same tree or not
1362                // should be included in the next tree
1363
1364                if !txns_in_last_block.second_pass.is_empty() {
1365                    (
1366                        txns_in_last_block.second_pass.clone(),
1367                        BorderBlockContinuedInTheNextTree(false),
1368                    )
1369                } else {
1370                    (
1371                        txns_in_last_block.current_incomplete.clone(),
1372                        BorderBlockContinuedInTheNextTree(true),
1373                    )
1374                }
1375            }
1376        };
1377
1378        Some((proof.clone(), txns))
1379    }
1380
1381    fn staged_transactions(&self) -> Vec<TransactionsOrdered<Arc<TransactionWithWitness>>> {
1382        let (previous_incomplete, BorderBlockContinuedInTheNextTree(continued_in_next_tree)) =
1383            match self.incomplete_txns_from_recent_proof_tree() {
1384                Some((_proof, v)) => v,
1385                None => (vec![], BorderBlockContinuedInTheNextTree(false)),
1386            };
1387
1388        let txns = {
1389            if continued_in_next_tree {
1390                TransactionsOrdered::first_and_second_pass_transactions_per_forest(
1391                    self.scan_state.pending_data(),
1392                    previous_incomplete,
1393                )
1394            } else {
1395                let mut txns = TransactionsOrdered::first_and_second_pass_transactions_per_forest(
1396                    self.scan_state.pending_data(),
1397                    vec![],
1398                );
1399
1400                if previous_incomplete.is_empty() {
1401                    txns
1402                } else {
1403                    txns.insert(
1404                        0,
1405                        vec![TransactionsOrdered {
1406                            first_pass: vec![],
1407                            second_pass: vec![],
1408                            previous_incomplete,
1409                            current_incomplete: vec![],
1410                        }],
1411                    );
1412                    txns
1413                }
1414            }
1415        };
1416
1417        txns.into_iter().flatten().collect::<Vec<_>>()
1418    }
1419
1420    /// All the transactions in the order in which they were applied along with
1421    /// the parent protocol state of the blocks that contained them
1422    fn staged_transactions_with_state_hash(
1423        &self,
1424    ) -> Vec<TransactionsOrdered<(WithStatus<Transaction>, Fp, Slot)>> {
1425        self.staged_transactions()
1426            .into_iter()
1427            .map(|ordered| ordered.map(|t| Self::extract_txn_and_global_slot(t.as_ref())))
1428            .collect::<Vec<_>>()
1429    }
1430
1431    fn apply_ordered_txns_stepwise<L, F, ApplyFirst, ApplySecond, ApplyFirstSparse>(
1432        stop_at_first_pass: Option<bool>,
1433        ordered_txns: Vec<TransactionsOrdered<Arc<TransactionWithWitness>>>,
1434        ledger: &mut L,
1435        get_protocol_state: F,
1436        apply_first_pass: ApplyFirst,
1437        apply_second_pass: &ApplySecond,
1438        apply_first_pass_sparse_ledger: ApplyFirstSparse,
1439    ) -> Result<Pass, String>
1440    where
1441        L: LedgerNonSnark,
1442        F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1443        ApplyFirst: Fn(
1444            Slot,
1445            &ProtocolStateView,
1446            &mut L,
1447            &Transaction,
1448        ) -> Result<TransactionPartiallyApplied<L>, String>,
1449        ApplySecond:
1450            Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
1451        ApplyFirstSparse: Fn(
1452            Slot,
1453            &ProtocolStateView,
1454            &mut SparseLedger,
1455            &Transaction,
1456        ) -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
1457    {
1458        let mut ledger_mut = ledger.clone();
1459        let stop_at_first_pass = stop_at_first_pass.unwrap_or(false);
1460
1461        #[derive(Clone)]
1462        enum PreviousIncompleteTxns<L: LedgerNonSnark> {
1463            Unapplied(Vec<Arc<TransactionWithWitness>>),
1464            PartiallyApplied(Vec<(TransactionStatus, TransactionPartiallyApplied<L>)>),
1465        }
1466
1467        fn apply<L, F, Apply>(
1468            apply: Apply,
1469            ledger: &mut L,
1470            tx: &Transaction,
1471            state_hash: Fp,
1472            block_global_slot: Slot,
1473            get_protocol_state: F,
1474        ) -> Result<TransactionPartiallyApplied<L>, String>
1475        where
1476            L: LedgerNonSnark,
1477            F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1478            Apply: Fn(
1479                Slot,
1480                &ProtocolStateView,
1481                &mut L,
1482                &Transaction,
1483            ) -> Result<TransactionPartiallyApplied<L>, String>,
1484        {
1485            match get_protocol_state(state_hash) {
1486                Ok(state) => {
1487                    let txn_state_view =
1488                        protocol_state_view(&state).map_err(|e| format!("{:?}", e))?;
1489                    apply(block_global_slot, &txn_state_view, ledger, tx)
1490                }
1491                Err(e) => Err(format!(
1492                    "Coudln't find protocol state with hash {:?}: {}",
1493                    state_hash, e
1494                )),
1495            }
1496        }
1497
1498        // let apply = |apply: ApplyFirst, ledger: &mut L, tx: &Transaction, state_hash: Fp, block_global_slot: Slot| {
1499        //     match get_protocol_state(state_hash) {
1500        //         Ok(state) => {
1501        //             let txn_state_view = protocol_state_view(&state);
1502        //             apply(block_global_slot, &txn_state_view, ledger, tx)
1503        //         },
1504        //         Err(e) => {
1505        //             Err(format!("Coudln't find protocol state with hash {:?}: {}", state_hash, e))
1506        //         },
1507        //     }
1508        // };
1509
1510        type Acc<L> = Vec<(TransactionStatus, TransactionPartiallyApplied<L>)>;
1511
1512        let apply_txns_first_pass = |mut acc: Acc<L>,
1513                                     txns: Vec<Arc<TransactionWithWitness>>|
1514         -> Result<(Pass, Acc<L>), String> {
1515            let mut ledger = ledger.clone();
1516
1517            for txn in txns {
1518                let (transaction, state_hash, block_global_slot) =
1519                    Self::extract_txn_and_global_slot(txn.as_ref());
1520                let expected_status = transaction.status;
1521
1522                let partially_applied_txn = apply(
1523                    &apply_first_pass,
1524                    &mut ledger,
1525                    &transaction.data,
1526                    state_hash,
1527                    block_global_slot,
1528                    &get_protocol_state,
1529                )?;
1530
1531                acc.push((expected_status, partially_applied_txn));
1532            }
1533
1534            Ok((Pass::FirstPassLedgerHash(ledger.merkle_root()), acc))
1535        };
1536
1537        fn apply_txns_second_pass<L, ApplySecond>(
1538            partially_applied_txns: Acc<L>,
1539            mut ledger: L,
1540            apply_second_pass: ApplySecond,
1541        ) -> Result<(), String>
1542        where
1543            L: LedgerNonSnark,
1544            ApplySecond:
1545                Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
1546        {
1547            for (expected_status, partially_applied_txn) in partially_applied_txns {
1548                let res = apply_second_pass(&mut ledger, partially_applied_txn)?;
1549                let status = res.transaction_status();
1550
1551                if &expected_status != status {
1552                    return Err(format!(
1553                        "Transaction produced unxpected application status.\
1554                                        Expected {:#?}\
1555                                        Got: {:#?}\
1556                                        Transaction: {:#?}",
1557                        expected_status, status, "TODO"
1558                    ));
1559                    // Transaction: {:#?}", expected_status, status, partially_applied_txn));
1560                }
1561            }
1562
1563            Ok(())
1564        }
1565
1566        fn apply_previous_incomplete_txns<R, L, F, ApplyFirstSparse, ApplySecondPass>(
1567            txns: PreviousIncompleteTxns<L>,
1568            // k: Box<dyn FnOnce() -> Result<Pass, String>>,
1569            ledger: L,
1570            get_protocol_state: F,
1571            apply_first_pass_sparse_ledger: ApplyFirstSparse,
1572            apply_txns_second_pass: ApplySecondPass,
1573        ) -> Result<R, String>
1574        where
1575            L: LedgerNonSnark,
1576            F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1577            ApplySecondPass: Fn(Acc<L>) -> Result<R, String>,
1578            ApplyFirstSparse: Fn(
1579                Slot,
1580                &ProtocolStateView,
1581                &mut SparseLedger,
1582                &Transaction,
1583            )
1584                -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
1585        {
1586            // Note: Previous incomplete transactions refer to the block's transactions
1587            // from previous scan state tree that were split between the two trees.
1588            // The set in the previous tree have gone through the first pass. For the
1589            // second pass that is to happen after the rest of the set goes through the
1590            // first pass, we need partially applied state - result of previous tree's
1591            // transactions' first pass. To generate the partial state, we do a a first
1592            // pass application of previous tree's transaction on a sparse ledger created
1593            // from witnesses stored in the scan state and then use it to apply to the
1594            // ledger here
1595
1596            let inject_ledger_info =
1597                |partially_applied_txn: TransactionPartiallyApplied<SparseLedger>| {
1598                    use TransactionPartiallyApplied as P;
1599
1600                    match partially_applied_txn {
1601                        P::ZkappCommand(zkapp) => {
1602                            let original_first_pass_account_states = zkapp
1603                                .original_first_pass_account_states
1604                                .into_iter()
1605                                .map(|(id, loc_opt)| match loc_opt {
1606                                    None => Ok((id, None)),
1607                                    Some((_sparse_ledger_loc, account)) => {
1608                                        match ledger.location_of_account(&id) {
1609                                            Some(loc) => Ok((id, Some((loc, account)))),
1610                                            None => Err(
1611                                                "Original accounts states from partially applied \
1612                                                         transactions don't exist in the ledger",
1613                                            ),
1614                                        }
1615                                    }
1616                                })
1617                                .collect::<Result<Vec<_>, &'static str>>()
1618                                .unwrap(); // TODO
1619
1620                            let global_state = GlobalState {
1621                                first_pass_ledger: ledger.clone(),
1622                                second_pass_ledger: ledger.clone(),
1623                                fee_excess: zkapp.global_state.fee_excess,
1624                                supply_increase: zkapp.global_state.supply_increase,
1625                                protocol_state: zkapp.global_state.protocol_state,
1626                                block_global_slot: zkapp.global_state.block_global_slot,
1627                            };
1628
1629                            let local_state = LocalStateEnv::<L> {
1630                                stack_frame: zkapp.local_state.stack_frame,
1631                                call_stack: zkapp.local_state.call_stack,
1632                                transaction_commitment: zkapp.local_state.transaction_commitment,
1633                                full_transaction_commitment: zkapp
1634                                    .local_state
1635                                    .full_transaction_commitment,
1636                                excess: zkapp.local_state.excess,
1637                                supply_increase: zkapp.local_state.supply_increase,
1638                                ledger: ledger.clone(),
1639                                success: zkapp.local_state.success,
1640                                account_update_index: zkapp.local_state.account_update_index,
1641                                failure_status_tbl: zkapp.local_state.failure_status_tbl,
1642                                will_succeed: zkapp.local_state.will_succeed,
1643                            };
1644
1645                            TransactionPartiallyApplied::ZkappCommand(Box::new(
1646                                ZkappCommandPartiallyApplied {
1647                                    command: zkapp.command,
1648                                    previous_hash: zkapp.previous_hash,
1649                                    original_first_pass_account_states,
1650                                    constraint_constants: zkapp.constraint_constants,
1651                                    state_view: zkapp.state_view,
1652                                    global_state,
1653                                    local_state,
1654                                },
1655                            ))
1656                        }
1657                        P::SignedCommand(c) => P::SignedCommand(c),
1658                        P::FeeTransfer(ft) => P::FeeTransfer(ft),
1659                        P::Coinbase(cb) => P::Coinbase(cb),
1660                    }
1661                };
1662
1663            let apply_txns_to_witnesses_first_pass = |txns: Vec<Arc<TransactionWithWitness>>| {
1664                let acc = txns
1665                    .into_iter()
1666                    .map(|txn| {
1667                        let mut first_pass_ledger_witness =
1668                            txn.first_pass_ledger_witness.copy_content();
1669
1670                        let (transaction, state_hash, block_global_slot) =
1671                            ScanState::extract_txn_and_global_slot(txn.as_ref());
1672                        let expected_status = transaction.status.clone();
1673
1674                        let partially_applied_txn = apply(
1675                            &apply_first_pass_sparse_ledger,
1676                            &mut first_pass_ledger_witness,
1677                            &transaction.data,
1678                            state_hash,
1679                            block_global_slot,
1680                            &get_protocol_state,
1681                        )?;
1682
1683                        let partially_applied_txn = inject_ledger_info(partially_applied_txn);
1684
1685                        Ok((expected_status, partially_applied_txn))
1686                    })
1687                    .collect::<Result<Vec<_>, String>>()?;
1688
1689                Ok::<Acc<L>, String>(acc)
1690            };
1691
1692            use PreviousIncompleteTxns::{PartiallyApplied, Unapplied};
1693
1694            match txns {
1695                Unapplied(txns) => {
1696                    let partially_applied_txns = apply_txns_to_witnesses_first_pass(txns)?;
1697                    apply_txns_second_pass(partially_applied_txns)
1698                }
1699                PartiallyApplied(partially_applied_txns) => {
1700                    apply_txns_second_pass(partially_applied_txns)
1701                }
1702            }
1703        }
1704
1705        fn apply_txns<'a, L>(
1706            mut previous_incomplete: PreviousIncompleteTxns<L>,
1707            ordered_txns: Vec<TransactionsOrdered<Arc<TransactionWithWitness>>>,
1708            mut first_pass_ledger_hash: Pass,
1709            stop_at_first_pass: bool,
1710            apply_previous_incomplete_txns: &'a impl Fn(PreviousIncompleteTxns<L>) -> Result<(), String>,
1711            apply_txns_first_pass: &'a impl Fn(
1712                Acc<L>,
1713                Vec<Arc<TransactionWithWitness>>,
1714            ) -> Result<(Pass, Acc<L>), String>,
1715            apply_txns_second_pass: &'a impl Fn(Acc<L>) -> Result<(), String>,
1716        ) -> Result<Pass, String>
1717        where
1718            L: LedgerNonSnark,
1719        {
1720            use PreviousIncompleteTxns::{PartiallyApplied, Unapplied};
1721
1722            let mut ordered_txns = ordered_txns.into_iter().peekable();
1723
1724            let update_previous_incomplete = |previous_incomplete: PreviousIncompleteTxns<L>| {
1725                // filter out any non-zkapp transactions for second pass application
1726                match previous_incomplete {
1727                    Unapplied(txns) => Unapplied(
1728                        txns.into_iter()
1729                            .filter(|txn| {
1730                                use crate::scan_state::transaction_logic::transaction_applied::{
1731                                    CommandApplied::ZkappCommand, Varying::Command,
1732                                };
1733
1734                                matches!(
1735                                    &txn.transaction_with_info.varying,
1736                                    Command(ZkappCommand(_))
1737                                )
1738                            })
1739                            .collect(),
1740                    ),
1741                    PartiallyApplied(txns) => PartiallyApplied(
1742                        txns.into_iter()
1743                            .filter(|(_, txn)| {
1744                                matches!(&txn, TransactionPartiallyApplied::ZkappCommand(_))
1745                            })
1746                            .collect(),
1747                    ),
1748                }
1749            };
1750
1751            while let Some(txns_per_block) = ordered_txns.next() {
1752                let is_last = ordered_txns.peek().is_none();
1753
1754                previous_incomplete = update_previous_incomplete(previous_incomplete);
1755
1756                if is_last && stop_at_first_pass {
1757                    // Last block; don't apply second pass.
1758                    // This is for snarked ledgers which are first pass ledgers
1759
1760                    let (res_first_pass_ledger_hash, _) =
1761                        apply_txns_first_pass(Vec::with_capacity(256), txns_per_block.first_pass)?;
1762
1763                    first_pass_ledger_hash = res_first_pass_ledger_hash;
1764
1765                    // Skip previous_incomplete: If there are previous_incomplete txns
1766                    // then there’d be at least two sets of txns_per_block and the
1767                    // previous_incomplete txns will be applied when processing the first
1768                    // set. The subsequent sets shouldn’t have any previous-incomplete.
1769                    previous_incomplete = Unapplied(vec![]);
1770                    break;
1771                }
1772
1773                let current_incomplete_is_empty = txns_per_block.current_incomplete.is_empty();
1774
1775                // Apply first pass of a blocks transactions either new or
1776                // continued from previous tree
1777                let (res_first_pass_ledger_hash, partially_applied_txns) =
1778                    apply_txns_first_pass(Vec::with_capacity(256), txns_per_block.first_pass)?;
1779
1780                first_pass_ledger_hash = res_first_pass_ledger_hash;
1781
1782                let previous_not_empty = match &previous_incomplete {
1783                    Unapplied(txns) => !txns.is_empty(),
1784                    PartiallyApplied(txns) => !txns.is_empty(),
1785                };
1786
1787                // Apply second pass of previous tree's transactions, if any
1788                apply_previous_incomplete_txns(previous_incomplete)?;
1789
1790                let continue_previous_tree_s_txns = {
1791                    // If this is a continuation from previous tree for
1792                    // the same block (incomplete txns in both sets) then
1793                    // do second pass now
1794
1795                    previous_not_empty && !current_incomplete_is_empty
1796                };
1797
1798                let do_second_pass = {
1799                    // if transactions completed in the same tree; do second pass now
1800                    (!txns_per_block.second_pass.is_empty()) || continue_previous_tree_s_txns
1801                };
1802
1803                if do_second_pass {
1804                    apply_txns_second_pass(partially_applied_txns)?;
1805                    previous_incomplete = Unapplied(vec![]);
1806                } else {
1807                    // Transactions not completed in this tree, so second pass after
1808                    // first pass of remaining transactions for the same block
1809                    // in the next tree
1810                    previous_incomplete = PartiallyApplied(partially_applied_txns);
1811                }
1812            }
1813
1814            previous_incomplete = update_previous_incomplete(previous_incomplete);
1815
1816            apply_previous_incomplete_txns(previous_incomplete)?;
1817
1818            Ok(first_pass_ledger_hash)
1819        }
1820
1821        let previous_incomplete = match ordered_txns.first() {
1822            None => PreviousIncompleteTxns::<L>::Unapplied(vec![]),
1823            Some(first_block) => {
1824                PreviousIncompleteTxns::Unapplied(first_block.previous_incomplete.clone())
1825            }
1826        };
1827
1828        // Assuming this function is called on snarked ledger and snarked
1829        // ledger is the first pass ledger
1830        let first_pass_ledger_hash = Pass::FirstPassLedgerHash(ledger_mut.merkle_root());
1831
1832        apply_txns(
1833            previous_incomplete,
1834            ordered_txns,
1835            first_pass_ledger_hash,
1836            stop_at_first_pass,
1837            &|txns| {
1838                apply_previous_incomplete_txns(
1839                    txns,
1840                    ledger.clone(),
1841                    &get_protocol_state,
1842                    &apply_first_pass_sparse_ledger,
1843                    |partially_applied_txns| {
1844                        apply_txns_second_pass(
1845                            partially_applied_txns,
1846                            ledger.clone(),
1847                            apply_second_pass,
1848                        )
1849                    },
1850                )
1851            },
1852            &apply_txns_first_pass,
1853            &|partially_applied_txns| {
1854                apply_txns_second_pass(partially_applied_txns, ledger.clone(), apply_second_pass)
1855            }, // &apply_txns_second_pass,
1856        )
1857    }
1858
1859    fn apply_ordered_txns_sync<L, F, ApplyFirst, ApplySecond, ApplyFirstSparse>(
1860        stop_at_first_pass: Option<bool>,
1861        ordered_txns: Vec<TransactionsOrdered<Arc<TransactionWithWitness>>>,
1862        ledger: &mut L,
1863        get_protocol_state: F,
1864        apply_first_pass: ApplyFirst,
1865        apply_second_pass: ApplySecond,
1866        apply_first_pass_sparse_ledger: ApplyFirstSparse,
1867    ) -> Result<Pass, String>
1868    where
1869        L: LedgerNonSnark,
1870        F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1871        ApplyFirst: Fn(
1872            Slot,
1873            &ProtocolStateView,
1874            &mut L,
1875            &Transaction,
1876        ) -> Result<TransactionPartiallyApplied<L>, String>,
1877        ApplySecond:
1878            Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
1879        ApplyFirstSparse: Fn(
1880            Slot,
1881            &ProtocolStateView,
1882            &mut SparseLedger,
1883            &Transaction,
1884        ) -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
1885    {
1886        Self::apply_ordered_txns_stepwise(
1887            stop_at_first_pass,
1888            ordered_txns,
1889            ledger,
1890            get_protocol_state,
1891            apply_first_pass,
1892            &apply_second_pass,
1893            apply_first_pass_sparse_ledger,
1894        )
1895    }
1896
1897    pub fn get_snarked_ledger_sync<L, F, ApplyFirst, ApplySecond, ApplyFirstSparse>(
1898        &self,
1899        ledger: &mut L,
1900        get_protocol_state: F,
1901        apply_first_pass: ApplyFirst,
1902        apply_second_pass: ApplySecond,
1903        apply_first_pass_sparse_ledger: ApplyFirstSparse,
1904    ) -> Result<Pass, String>
1905    where
1906        L: LedgerNonSnark,
1907        F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1908        ApplyFirst: Fn(
1909            Slot,
1910            &ProtocolStateView,
1911            &mut L,
1912            &Transaction,
1913        ) -> Result<TransactionPartiallyApplied<L>, String>,
1914        ApplySecond:
1915            Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
1916        ApplyFirstSparse: Fn(
1917            Slot,
1918            &ProtocolStateView,
1919            &mut SparseLedger,
1920            &Transaction,
1921        ) -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
1922    {
1923        match self.latest_ledger_proof_impl() {
1924            None => Err("No transactions found".to_string()),
1925            Some((_, txns_per_block)) => Self::apply_ordered_txns_sync(
1926                Some(true),
1927                txns_per_block,
1928                ledger,
1929                get_protocol_state,
1930                apply_first_pass,
1931                apply_second_pass,
1932                apply_first_pass_sparse_ledger,
1933            ),
1934        }
1935    }
1936
1937    pub fn get_staged_ledger_sync<L, F, ApplyFirst, ApplySecond, ApplyFirstSparse>(
1938        &self,
1939        ledger: &mut L,
1940        get_protocol_state: F,
1941        apply_first_pass: ApplyFirst,
1942        apply_second_pass: ApplySecond,
1943        apply_first_pass_sparse_ledger: ApplyFirstSparse,
1944    ) -> Result<Pass, String>
1945    where
1946        L: LedgerNonSnark,
1947        F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1948        ApplyFirst: Fn(
1949            Slot,
1950            &ProtocolStateView,
1951            &mut L,
1952            &Transaction,
1953        ) -> Result<TransactionPartiallyApplied<L>, String>,
1954        ApplySecond:
1955            Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
1956        ApplyFirstSparse: Fn(
1957            Slot,
1958            &ProtocolStateView,
1959            &mut SparseLedger,
1960            &Transaction,
1961        ) -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
1962    {
1963        let staged_transactions_with_state_hash = self.staged_transactions();
1964        Self::apply_ordered_txns_sync(
1965            None,
1966            staged_transactions_with_state_hash,
1967            ledger,
1968            get_protocol_state,
1969            apply_first_pass,
1970            apply_second_pass,
1971            apply_first_pass_sparse_ledger,
1972        )
1973    }
1974
1975    pub fn free_space(&self) -> u64 {
1976        self.scan_state.free_space()
1977    }
1978
1979    fn all_jobs(&self) -> Vec<Vec<AvailableJob>> {
1980        self.scan_state.all_jobs()
1981    }
1982
1983    pub fn next_on_new_tree(&self) -> bool {
1984        self.scan_state.next_on_new_tree()
1985    }
1986
1987    pub fn base_jobs_on_latest_tree(&self) -> impl Iterator<Item = Arc<TransactionWithWitness>> {
1988        self.scan_state.base_jobs_on_latest_tree()
1989    }
1990
1991    pub fn base_jobs_on_earlier_tree(
1992        &self,
1993        index: usize,
1994    ) -> impl Iterator<Item = Arc<TransactionWithWitness>> {
1995        self.scan_state.base_jobs_on_earlier_tree(index)
1996    }
1997
1998    pub fn partition_if_overflowing(&self) -> SpacePartition {
1999        let bundle_count = |work_count: u64| work_count.div_ceil(2);
2000
2001        // slots: current tree space
2002        // job_count: work count on current tree
2003        let SpacePartition {
2004            first: (slots, job_count),
2005            second,
2006        } = self.scan_state.partition_if_overflowing();
2007
2008        SpacePartition {
2009            first: (slots, bundle_count(job_count)),
2010            second: second.map(|(slots, job_count)| (slots, bundle_count(job_count))),
2011        }
2012    }
2013
2014    fn extract_from_job(job: AvailableJob) -> Extracted {
2015        use super::parallel_scan::AvailableJob::{Base, Merge};
2016
2017        match job {
2018            Base(d) => Extracted::First {
2019                transaction_with_info: Box::new(d.transaction_with_info.to_owned()),
2020                statement: Box::new(d.statement.to_owned()),
2021                state_hash: Box::new(d.state_hash),
2022                first_pass_ledger_witness: d.first_pass_ledger_witness.to_owned(),
2023                second_pass_ledger_witness: d.second_pass_ledger_witness.to_owned(),
2024                init_stack: Box::new(d.init_stack.to_owned()),
2025                block_global_slot: d.block_global_slot,
2026            },
2027            Merge { left, right } => {
2028                let LedgerProofWithSokMessage { proof: p1, .. } = left.as_ref();
2029                let LedgerProofWithSokMessage { proof: p2, .. } = right.as_ref();
2030                Extracted::Second(Box::new((p1.clone(), p2.clone())))
2031            }
2032        }
2033    }
2034
2035    pub fn all_work_statements_exn(&self) -> Vec<transaction_snark::work::Statement> {
2036        let work_seqs = self.all_jobs();
2037
2038        let s = |job: &AvailableJob| Self::statement_of_job(job).unwrap();
2039
2040        work_seqs
2041            .iter()
2042            .flat_map(|work_seq| group_list(work_seq, s))
2043            .collect()
2044    }
2045
2046    fn required_work_pairs(&self, slots: u64) -> Vec<OneOrTwo<AvailableJob>> {
2047        let work_list = self.scan_state.jobs_for_slots(slots);
2048        work_list
2049            .iter()
2050            .flat_map(|works| group_list(works, |job| job.clone()))
2051            .collect()
2052    }
2053
2054    pub fn k_work_pairs_for_new_diff(&self, k: u64) -> Vec<OneOrTwo<AvailableJob>> {
2055        let work_list = self.scan_state.jobs_for_next_update();
2056        work_list
2057            .iter()
2058            .flat_map(|works| group_list(works, |job| job.clone()))
2059            .take(k as usize)
2060            .collect()
2061    }
2062
2063    // Always the same pairing of jobs
2064    pub fn work_statements_for_new_diff(&self) -> Vec<transaction_snark::work::Statement> {
2065        let work_list = self.scan_state.jobs_for_next_update();
2066
2067        let s = |job: &AvailableJob| Self::statement_of_job(job).unwrap();
2068
2069        work_list
2070            .iter()
2071            .flat_map(|works| group_list(works, s))
2072            .collect()
2073    }
2074
2075    pub fn all_job_pairs_iter(&self) -> impl Iterator<Item = OneOrTwo<AvailableJob>> {
2076        self.all_jobs().into_iter().flat_map(|jobs| {
2077            let mut iter = jobs.into_iter();
2078            std::iter::from_fn(move || {
2079                let one = iter.next()?;
2080                Some(match iter.next() {
2081                    None => OneOrTwo::One(one),
2082                    Some(two) => OneOrTwo::Two((one, two)),
2083                })
2084            })
2085        })
2086    }
2087
2088    pub fn all_job_pairs_iter2(&self) -> impl Iterator<Item = OneOrTwo<AvailableJob>> {
2089        self.all_jobs().into_iter().flat_map(|jobs| {
2090            let mut iter = jobs.into_iter();
2091            std::iter::from_fn(move || {
2092                let one = iter.next()?;
2093                Some(OneOrTwo::One(one))
2094                // Some(match iter.next() {
2095                //     None => OneOrTwo::One(one),
2096                //     Some(two) => OneOrTwo::Two((one, two)),
2097                // })
2098            })
2099        })
2100    }
2101
2102    pub fn all_work_pairs<F>(
2103        &self,
2104        get_state: F,
2105    ) -> Result<Vec<OneOrTwo<snark_work::spec::Work>>, String>
2106    where
2107        F: Fn(&Fp) -> &MinaStateProtocolStateValueStableV2,
2108    {
2109        let single_spec = |job: AvailableJob| match Self::extract_from_job(job) {
2110            Extracted::First {
2111                transaction_with_info,
2112                statement,
2113                state_hash,
2114                first_pass_ledger_witness,
2115                second_pass_ledger_witness,
2116                init_stack,
2117                block_global_slot,
2118            } => {
2119                let witness = {
2120                    let WithStatus {
2121                        data: transaction,
2122                        status,
2123                    } = transaction_with_info.transaction();
2124
2125                    let protocol_state_body = {
2126                        let state = get_state(&state_hash.0);
2127                        state.body.clone()
2128                    };
2129
2130                    let init_stack = match *init_stack {
2131                        InitStack::Base(x) => x,
2132                        InitStack::Merge => return Err("init_stack was Merge".to_string()),
2133                    };
2134
2135                    TransactionWitness {
2136                        transaction,
2137                        protocol_state_body,
2138                        init_stack,
2139                        status,
2140                        first_pass_ledger: first_pass_ledger_witness,
2141                        second_pass_ledger: second_pass_ledger_witness,
2142                        block_global_slot,
2143                    }
2144                };
2145
2146                Ok(snark_work::spec::Work::Transition((
2147                    statement,
2148                    Box::new(witness),
2149                )))
2150            }
2151            Extracted::Second(s) => {
2152                let merged = s.0.statement().merge(&s.1.statement())?;
2153                Ok(snark_work::spec::Work::Merge(Box::new((merged, s))))
2154            }
2155        };
2156
2157        self.all_job_pairs_iter()
2158            .map(|group| group.into_map_err(single_spec))
2159            .collect()
2160    }
2161
2162    pub fn all_work_pairs2<F>(&self, get_state: F) -> Vec<OneOrTwo<snark_work::spec::Work>>
2163    where
2164        F: Fn(&Fp) -> Option<MinaStateProtocolStateValueStableV2>,
2165    {
2166        let single_spec = |job: AvailableJob| match Self::extract_from_job(job) {
2167            Extracted::First {
2168                transaction_with_info,
2169                statement,
2170                state_hash,
2171                first_pass_ledger_witness,
2172                second_pass_ledger_witness,
2173                init_stack,
2174                block_global_slot,
2175            } => {
2176                let witness = {
2177                    let WithStatus {
2178                        data: transaction,
2179                        status,
2180                    } = transaction_with_info.transaction();
2181
2182                    let protocol_state_body = {
2183                        let state = get_state(&state_hash.0)?;
2184                        state.body.clone()
2185                    };
2186
2187                    let init_stack = match *init_stack {
2188                        InitStack::Base(x) => x,
2189                        InitStack::Merge => return None,
2190                    };
2191
2192                    TransactionWitness {
2193                        transaction,
2194                        protocol_state_body,
2195                        init_stack,
2196                        status,
2197                        first_pass_ledger: first_pass_ledger_witness,
2198                        second_pass_ledger: second_pass_ledger_witness,
2199                        block_global_slot,
2200                    }
2201                };
2202
2203                Some(snark_work::spec::Work::Transition((
2204                    statement,
2205                    Box::new(witness),
2206                )))
2207            }
2208            Extracted::Second(s) => {
2209                let merged = s.0.statement().merge(&s.1.statement()).unwrap();
2210                Some(snark_work::spec::Work::Merge(Box::new((merged, s))))
2211            }
2212        };
2213
2214        self.all_job_pairs_iter2()
2215            .filter_map(|group| group.into_map_some(single_spec))
2216            .collect()
2217    }
2218
2219    pub fn fill_work_and_enqueue_transactions(
2220        &mut self,
2221        transactions: Vec<Arc<TransactionWithWitness>>,
2222        work: Vec<transaction_snark::work::Unchecked>,
2223    ) -> Result<
2224        Option<(
2225            LedgerProof,
2226            Vec<TransactionsOrdered<(WithStatus<Transaction>, Fp, Slot)>>,
2227        )>,
2228        String,
2229    > {
2230        {
2231            use crate::scan_state::transaction_logic::transaction_applied::Varying::*;
2232
2233            println!("{} transactions added to scan state:", transactions.len());
2234            println!(
2235                "- num_fee_transfer={:?}",
2236                transactions
2237                    .iter()
2238                    .filter(|tx| matches!(tx.transaction_with_info.varying, FeeTransfer(_)))
2239                    .count()
2240            );
2241
2242            println!(
2243                "- num_coinbase={:?}",
2244                transactions
2245                    .iter()
2246                    .filter(|tx| matches!(tx.transaction_with_info.varying, Coinbase(_)))
2247                    .count()
2248            );
2249
2250            println!(
2251                "- num_user_command={:?}",
2252                transactions
2253                    .iter()
2254                    .filter(|tx| matches!(tx.transaction_with_info.varying, Command(_)))
2255                    .count()
2256            );
2257        }
2258
2259        let fill_in_transaction_snark_work = |works: Vec<transaction_snark::work::Work>| -> Result<
2260            Vec<Arc<LedgerProofWithSokMessage>>,
2261            String,
2262        > {
2263            let next_jobs = self
2264                .scan_state
2265                .jobs_for_next_update()
2266                .into_iter()
2267                .flatten()
2268                .take(total_proofs(&works));
2269
2270            let works = works.into_iter().flat_map(
2271                |transaction_snark::work::Work {
2272                     fee,
2273                     proofs,
2274                     prover,
2275                 }| {
2276                    proofs
2277                        .into_map(|proof| (fee, proof, prover.clone()))
2278                        .into_iter()
2279                },
2280            );
2281
2282            next_jobs
2283                .zip(works)
2284                .map(|(job, work)| completed_work_to_scanable_work(job, work))
2285                .collect()
2286        };
2287
2288        // get incomplete transactions from previous proof which will be completed in
2289        // the new proof, if there's one
2290        let old_proof_and_incomplete_zkapp_updates = self.incomplete_txns_from_recent_proof_tree();
2291        let work_list = fill_in_transaction_snark_work(work)?;
2292
2293        let proof_opt = self
2294            .scan_state
2295            .update(transactions, work_list, |base| {
2296                // TODO: This is for logs only, make it cleaner
2297                match base.transaction_with_info.varying {
2298                    super::transaction_logic::transaction_applied::Varying::Command(_) => 0,
2299                    super::transaction_logic::transaction_applied::Varying::FeeTransfer(_) => 1,
2300                    super::transaction_logic::transaction_applied::Varying::Coinbase(_) => 2,
2301                }
2302            })
2303            .unwrap();
2304
2305        match proof_opt {
2306            None => Ok(None),
2307            Some((pwsm, _txns_with_witnesses)) => {
2308                let LedgerProofWithSokMessage { proof, .. } = pwsm.as_ref();
2309                let curr_stmt = proof.statement();
2310
2311                let (prev_stmt, incomplete_zkapp_updates_from_old_proof) =
2312                    match old_proof_and_incomplete_zkapp_updates {
2313                        None => (
2314                            curr_stmt.clone(),
2315                            (vec![], BorderBlockContinuedInTheNextTree(false)),
2316                        ),
2317                        Some((proof_with_sok, incomplete_zkapp_updates_from_old_proof)) => {
2318                            let proof = &proof_with_sok.proof;
2319                            (proof.statement(), incomplete_zkapp_updates_from_old_proof)
2320                        }
2321                    };
2322
2323                // prev_target is connected to curr_source- Order of the arguments is
2324                // important here
2325                let stmts_connect = if prev_stmt == curr_stmt {
2326                    Ok(())
2327                } else {
2328                    prev_stmt.merge(&curr_stmt).map(|_| ())
2329                };
2330
2331                match stmts_connect {
2332                    Ok(()) => {
2333                        self.previous_incomplete_zkapp_updates =
2334                            incomplete_zkapp_updates_from_old_proof;
2335
2336                        // This block is for when there's a proof emitted so Option.
2337                        // value_exn is safe here
2338                        // [latest_ledger_proof] generates ordered transactions
2339                        // appropriately*)
2340                        let (proof_with_sok, txns) = self.latest_ledger_proof().unwrap();
2341
2342                        Ok(Some((proof_with_sok.proof.clone(), txns)))
2343                    }
2344                    Err(e) => Err(format!(
2345                        "The new final statement does not connect to the previous \
2346                                     proof's statement: {:?}",
2347                        e
2348                    )),
2349                }
2350            }
2351        }
2352    }
2353
2354    pub fn required_state_hashes(&self) -> HashSet<Fp> {
2355        self.staged_transactions()
2356            .into_iter()
2357            .fold(HashSet::with_capacity(256), |accum, txns| {
2358                txns.fold(accum, |mut accum, txn| {
2359                    accum.insert(txn.state_hash.0);
2360                    accum
2361                })
2362            })
2363    }
2364
2365    fn check_required_protocol_states(&self, _protocol_states: ()) {
2366        todo!() // Not sure what is the type of `protocol_states` here
2367    }
2368
2369    /// Iterates on the scan state by tree
2370    /// And for each tree we iterate on all its values (from root to leaves)
2371    pub fn view(&self) -> impl Iterator<Item = impl Iterator<Item = JobValueWithIndex<'_>>> {
2372        self.scan_state.trees.iter().map(|tree| tree.view())
2373    }
2374}
2375
2376pub fn group_list<'a, F, T, R>(slice: &'a [T], fun: F) -> impl Iterator<Item = OneOrTwo<R>> + 'a
2377where
2378    F: Fn(&'a T) -> R + 'a,
2379{
2380    slice.chunks(2).map(move |subslice| match subslice {
2381        [a, b] => OneOrTwo::Two((fun(a), fun(b))),
2382        [a] => OneOrTwo::One(fun(a)),
2383        _ => panic!(),
2384    })
2385}
2386
2387pub enum Extracted {
2388    First {
2389        transaction_with_info: Box<TransactionApplied>,
2390        statement: Box<Statement<()>>,
2391        state_hash: Box<(Fp, Fp)>,
2392        first_pass_ledger_witness: SparseLedger,
2393        second_pass_ledger_witness: SparseLedger,
2394        init_stack: Box<InitStack>,
2395        block_global_slot: Slot,
2396    },
2397    Second(Box<(LedgerProof, LedgerProof)>),
2398}
2399
2400#[derive(Clone, Debug)]
2401pub struct TransactionsOrdered<T> {
2402    pub first_pass: Vec<T>,
2403    pub second_pass: Vec<T>,
2404    pub previous_incomplete: Vec<T>,
2405    pub current_incomplete: Vec<T>,
2406}
2407
2408impl<T> TransactionsOrdered<T> {
2409    fn map<B>(self, mut fun: impl FnMut(T) -> B) -> TransactionsOrdered<B> {
2410        let Self {
2411            first_pass,
2412            second_pass,
2413            previous_incomplete,
2414            current_incomplete,
2415        } = self;
2416
2417        let mut conv = |v: Vec<T>| v.into_iter().map(&mut fun).collect::<Vec<B>>();
2418
2419        TransactionsOrdered::<B> {
2420            first_pass: conv(first_pass),
2421            second_pass: conv(second_pass),
2422            previous_incomplete: conv(previous_incomplete),
2423            current_incomplete: conv(current_incomplete),
2424        }
2425    }
2426
2427    fn fold<A>(&self, init: A, fun: impl Fn(A, &T) -> A) -> A {
2428        let Self {
2429            first_pass,
2430            second_pass,
2431            previous_incomplete,
2432            current_incomplete,
2433        } = self;
2434
2435        let init = first_pass.iter().fold(init, &fun);
2436        let init = previous_incomplete.iter().fold(init, &fun);
2437        let init = second_pass.iter().fold(init, &fun);
2438        current_incomplete.iter().fold(init, &fun)
2439    }
2440}
2441
2442impl TransactionsOrdered<Arc<TransactionWithWitness>> {
2443    fn first_and_second_pass_transactions_per_tree(
2444        previous_incomplete: Vec<Arc<TransactionWithWitness>>,
2445        txns_per_tree: Vec<Arc<TransactionWithWitness>>,
2446    ) -> Vec<Self> {
2447        let txns_per_tree_len = txns_per_tree.len();
2448
2449        let complete_and_incomplete_transactions = |txs: Vec<Arc<TransactionWithWitness>>| -> Option<
2450            TransactionsOrdered<Arc<TransactionWithWitness>>,
2451        > {
2452            let target_first_pass_ledger = txs.first()?.statement.source.first_pass_ledger;
2453            let first_state_hash = txs.first()?.state_hash.0;
2454
2455            let first_pass_txns = Vec::with_capacity(txns_per_tree_len);
2456            let second_pass_txns = Vec::with_capacity(txns_per_tree_len);
2457
2458            let (first_pass_txns, second_pass_txns, target_first_pass_ledger) =
2459                txs.into_iter().fold(
2460                    (first_pass_txns, second_pass_txns, target_first_pass_ledger),
2461                    |(mut first_pass_txns, mut second_pass_txns, _old_root), txn_with_witness| {
2462                        let txn = txn_with_witness.transaction_with_info.transaction();
2463                        let target_first_pass_ledger =
2464                            txn_with_witness.statement.target.first_pass_ledger;
2465
2466                        use crate::scan_state::transaction_logic::UserCommand::*;
2467                        use Transaction::*;
2468
2469                        match txn.data {
2470                            Coinbase(_) | FeeTransfer(_) | Command(SignedCommand(_)) => {
2471                                first_pass_txns.push(txn_with_witness);
2472                            }
2473                            Command(ZkAppCommand(_)) => {
2474                                first_pass_txns.push(txn_with_witness.clone());
2475                                second_pass_txns.push(txn_with_witness);
2476                            }
2477                        }
2478
2479                        (first_pass_txns, second_pass_txns, target_first_pass_ledger)
2480                    },
2481                );
2482
2483            let (second_pass_txns, incomplete_txns) = match second_pass_txns.first() {
2484                None => (vec![], vec![]),
2485                Some(txn_with_witness) => {
2486                    if txn_with_witness.statement.source.second_pass_ledger
2487                        == target_first_pass_ledger
2488                    {
2489                        // second pass completed in the same tree
2490                        (second_pass_txns, vec![])
2491                    } else {
2492                        (vec![], second_pass_txns)
2493                    }
2494                }
2495            };
2496
2497            let previous_incomplete = match previous_incomplete.first() {
2498                None => vec![],
2499                Some(tx) => {
2500                    if tx.state_hash.0 == first_state_hash {
2501                        // same block
2502                        previous_incomplete.clone()
2503                    } else {
2504                        vec![]
2505                    }
2506                }
2507            };
2508
2509            Some(Self {
2510                first_pass: first_pass_txns,
2511                second_pass: second_pass_txns,
2512                current_incomplete: incomplete_txns,
2513                previous_incomplete,
2514            })
2515        };
2516
2517        let txns_by_block = |txns_per_tree: Vec<Arc<TransactionWithWitness>>| {
2518            let mut global = Vec::with_capacity(txns_per_tree.len());
2519            let txns_per_tree_len = txns_per_tree.len();
2520
2521            let make_current =
2522                || Vec::<Arc<TransactionWithWitness>>::with_capacity(txns_per_tree_len);
2523            let mut current = make_current();
2524
2525            for next in txns_per_tree {
2526                if current
2527                    .last()
2528                    .map(|last| last.state_hash.0 != next.state_hash.0)
2529                    .unwrap_or(false)
2530                {
2531                    global.push(current);
2532                    current = make_current();
2533                }
2534
2535                current.push(next);
2536            }
2537
2538            if !current.is_empty() {
2539                global.push(current);
2540            }
2541
2542            global
2543        };
2544
2545        txns_by_block(txns_per_tree)
2546            .into_iter()
2547            .filter_map(complete_and_incomplete_transactions)
2548            .collect()
2549    }
2550
2551    fn first_and_second_pass_transactions_per_forest(
2552        scan_state_txns: Vec<Vec<Arc<TransactionWithWitness>>>,
2553        previous_incomplete: Vec<Arc<TransactionWithWitness>>,
2554    ) -> Vec<Vec<Self>> {
2555        scan_state_txns
2556            .into_iter()
2557            .map(|txns_per_tree| {
2558                Self::first_and_second_pass_transactions_per_tree(
2559                    previous_incomplete.clone(),
2560                    txns_per_tree,
2561                )
2562            })
2563            .collect()
2564    }
2565}
2566
2567#[derive(Clone, Debug)]
2568pub enum Pass {
2569    FirstPassLedgerHash(Fp),
2570}
2571
2572impl From<&OneOrTwo<AvailableJobMessage>> for SnarkJobId {
2573    fn from(value: &OneOrTwo<AvailableJobMessage>) -> Self {
2574        let (first, second) = match value {
2575            OneOrTwo::One(j) => (j, j),
2576            OneOrTwo::Two((j1, j2)) => (j1, j2),
2577        };
2578
2579        let source = match first {
2580            AvailableJobMessage::Base(base) => &base.statement.0.source,
2581            AvailableJobMessage::Merge { left, .. } => &left.0 .0.statement.source,
2582        };
2583        let target = match second {
2584            AvailableJobMessage::Base(base) => &base.statement.0.target,
2585            AvailableJobMessage::Merge { right, .. } => &right.0 .0.statement.target,
2586        };
2587
2588        (source, target).into()
2589    }
2590}
2591
2592impl From<&OneOrTwo<Statement<()>>> for SnarkJobId {
2593    fn from(value: &OneOrTwo<Statement<()>>) -> Self {
2594        let (source, target): (
2595            mina_p2p_messages::v2::MinaStateBlockchainStateValueStableV2LedgerProofStatementSource,
2596            mina_p2p_messages::v2::MinaStateBlockchainStateValueStableV2LedgerProofStatementSource,
2597        ) = match value {
2598            OneOrTwo::One(stmt) => ((&stmt.source).into(), (&stmt.target).into()),
2599            OneOrTwo::Two((stmt1, stmt2)) => ((&stmt1.source).into(), (&stmt2.target).into()),
2600        };
2601        (&source, &target).into()
2602    }
2603}