mina_tree/scan_state/
scan_state.rs

1use std::{collections::HashSet, sync::Arc};
2
3use blake2::{
4    digest::{generic_array::GenericArray, typenum::U32},
5    Digest,
6};
7use mina_curves::pasta::Fp;
8use mina_p2p_messages::{
9    binprot,
10    v2::{
11        MinaStateProtocolStateValueStableV2,
12        TransactionSnarkScanStateLedgerProofWithSokMessageStableV2,
13        TransactionSnarkScanStateTransactionWithWitnessStableV2,
14    },
15};
16use mina_signer::CompressedPubKey;
17use openmina_core::{constants::ConstraintConstants, snark::SnarkJobId};
18use sha2::Sha256;
19
20use crate::{
21    scan_state::{
22        parallel_scan::{base, merge, JobStatus},
23        pending_coinbase,
24        scan_state::transaction_snark::{
25            LedgerProofWithSokMessage, SokMessage, Statement, TransactionWithWitness,
26        },
27        transaction_logic::{
28            apply_transaction_first_pass, apply_transaction_second_pass,
29            local_state::LocalStateEnv,
30            protocol_state::GlobalState,
31            transaction_partially_applied::{
32                TransactionPartiallyApplied, ZkappCommandPartiallyApplied,
33            },
34            TransactionStatus,
35        },
36    },
37    sparse_ledger::SparseLedger,
38    staged_ledger::hash::AuxHash,
39    verifier::Verifier,
40    zkapps::non_snark::LedgerNonSnark,
41};
42
43use self::transaction_snark::{InitStack, LedgerProof, OneOrTwo, Registers};
44
45use super::{
46    currency::{Fee, Slot},
47    parallel_scan::ParallelScan,
48    snark_work,
49    transaction_logic::{
50        local_state::LocalState,
51        protocol_state::{protocol_state_view, ProtocolStateView},
52        transaction_applied::TransactionApplied,
53        transaction_witness::TransactionWitness,
54        Transaction, WithStatus,
55    },
56};
57// use super::parallel_scan::AvailableJob;
58
59pub use super::parallel_scan::{
60    base::Job as JobValueBase, merge::Job as JobValueMerge,
61    AvailableJob as ParallelScanAvailableJob, JobValue, JobValueWithIndex, SpacePartition,
62};
63
64// type LedgerProof = LedgerProofProdStableV2;
65// type LedgerProofWithSokMessage = TransactionSnarkScanStateLedgerProofWithSokMessageStableV2;
66// type TransactionWithWitness = TransactionSnarkScanStateTransactionWithWitnessStableV2;
67
/// An available parallel-scan job expressed with the `mina_p2p_messages::v2`
/// message types (transaction-with-witness for base jobs, ledger proof with
/// SOK message for merge jobs).
pub type AvailableJobMessage = super::parallel_scan::AvailableJob<
    TransactionSnarkScanStateTransactionWithWitnessStableV2,
    TransactionSnarkScanStateLedgerProofWithSokMessageStableV2,
>;
/// An available parallel-scan job expressed with this crate's own
/// [`transaction_snark`] types, shared behind `Arc`.
pub type AvailableJob = super::parallel_scan::AvailableJob<
    Arc<transaction_snark::TransactionWithWitness>,
    Arc<transaction_snark::LedgerProofWithSokMessage>,
>;
76
/// Boolean flag stored next to the previous incomplete zkapp updates of a
/// [`ScanState`]; it is folded into [`ScanState::hash`] as the text `"true"`
/// or `"false"`.
// NOTE(review): presumably tells whether the border block's transactions
// continue in the next scan-state tree -- confirm against the OCaml source.
#[derive(Clone, Debug, PartialEq)]
pub struct BorderBlockContinuedInTheNextTree(pub(super) bool);
79
/// Scan state and any zkapp updates that were applied to the most recent
/// snarked ledger but are from the tree just before the tree corresponding to
/// the snarked ledger.
#[derive(Clone)]
pub struct ScanState {
    /// Parallel scan whose jobs carry transactions with witnesses and ledger
    /// proofs with their SOK message.
    pub scan_state: ParallelScan<
        Arc<transaction_snark::TransactionWithWitness>,
        Arc<transaction_snark::LedgerProofWithSokMessage>,
    >,
    /// Zkapp updates left over from the previous tree, together with the
    /// [`BorderBlockContinuedInTheNextTree`] flag.
    pub previous_incomplete_zkapp_updates: (
        Vec<Arc<transaction_snark::TransactionWithWitness>>,
        BorderBlockContinuedInTheNextTree,
    ),
}
94
95pub mod transaction_snark {
96    use std::sync::Arc;
97
98    use itertools::Itertools;
99    use mina_curves::pasta::Fp;
100    use mina_p2p_messages::{binprot, string::ByteString, v2::TransactionSnarkProofStableV2};
101    use mina_signer::CompressedPubKey;
102    use serde::{Deserialize, Serialize};
103
104    use crate::{
105        proofs::{
106            field::{field, Boolean},
107            witness::Witness,
108        },
109        scan_state::{
110            currency::{Amount, Signed, Slot},
111            fee_excess::FeeExcess,
112            pending_coinbase,
113            transaction_logic::{local_state::LocalState, transaction_applied::TransactionApplied},
114        },
115        sparse_ledger::SparseLedger,
116        staged_ledger::hash::OCamlString,
117        AppendToInputs as _, ToInputs,
118    };
119
120    use super::Fee;
121    use poseidon::hash::Inputs;
122
    /// Ledger hashes are represented as `Fp` field elements.
    pub type LedgerHash = Fp;
124
    /// State registers attached to each end (`source` / `target`) of a
    /// [`Statement`].
    ///
    /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/registers.ml>
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct Registers {
        /// Hash of the first pass ledger.
        pub first_pass_ledger: LedgerHash,
        /// Hash of the second pass ledger.
        pub second_pass_ledger: LedgerHash,
        /// Pending coinbase stack; compared with `Stack::connected` rather
        /// than plain equality in [`Registers::connected`].
        pub pending_coinbase_stack: pending_coinbase::Stack,
        /// Local state; its `ledger` component is what
        /// [`StatementLedgers`] reads.
        pub local_state: LocalState,
    }
133
134    impl ToInputs for Registers {
135        /// <https://github.com/MinaProtocol/mina/blob/4e0b324912017c3ff576704ee397ade3d9bda412/src/lib/mina_state/registers.ml#L30>
136        fn to_inputs(&self, inputs: &mut Inputs) {
137            let Self {
138                first_pass_ledger,
139                second_pass_ledger,
140                pending_coinbase_stack,
141                local_state,
142            } = self;
143
144            inputs.append(first_pass_ledger);
145            inputs.append(second_pass_ledger);
146            inputs.append(pending_coinbase_stack);
147            inputs.append(local_state);
148        }
149    }
150
151    impl Registers {
152        /// <https://github.com/MinaProtocol/mina/blob/2ee6e004ba8c6a0541056076aab22ea162f7eb3a/src/lib/transaction_snark/transaction_snark.ml#L350>
153        pub fn check_equal(&self, other: &Self) -> bool {
154            let Self {
155                first_pass_ledger,
156                second_pass_ledger,
157                pending_coinbase_stack,
158                local_state,
159            } = self;
160
161            first_pass_ledger == &other.first_pass_ledger
162                && second_pass_ledger == &other.second_pass_ledger
163                && local_state == &other.local_state
164                && pending_coinbase::Stack::connected(
165                    pending_coinbase_stack,
166                    &other.pending_coinbase_stack,
167                    None,
168                )
169        }
170
171        /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/registers.ml#L55>
172        pub fn connected(r1: &Self, r2: &Self) -> bool {
173            let Self {
174                first_pass_ledger,
175                second_pass_ledger,
176                pending_coinbase_stack,
177                local_state,
178            } = r1;
179
180            first_pass_ledger == &r2.first_pass_ledger
181                && second_pass_ledger == &r2.second_pass_ledger
182                && local_state == &r2.local_state
183                && pending_coinbase::Stack::connected(
184                    pending_coinbase_stack,
185                    &r2.pending_coinbase_stack,
186                    None,
187                )
188        }
189    }
190
    /// Raw bytes of a SOK message digest (see [`SokMessage::digest`]).
    ///
    /// Dereferences to the inner `Vec<u8>`. The `Default` value and the value
    /// built by `from_ocaml_str` are both 32 bytes long.
    #[derive(Clone, PartialEq, Eq, derive_more::Deref)]
    pub struct SokDigest(pub Vec<u8>);
193
194    impl From<SokDigest> for ByteString {
195        fn from(value: SokDigest) -> Self {
196            value.0.into()
197        }
198    }
199
200    impl From<&SokDigest> for ByteString {
201        fn from(value: &SokDigest) -> Self {
202            value.0.clone().into()
203        }
204    }
205
206    impl OCamlString for SokDigest {
207        fn to_ocaml_str(&self) -> String {
208            crate::staged_ledger::hash::to_ocaml_str(&self.0)
209        }
210
211        fn from_ocaml_str(s: &str) -> Self {
212            let bytes: [u8; 32] = crate::staged_ledger::hash::from_ocaml_str(s);
213            Self(bytes.to_vec())
214        }
215    }
216
217    impl std::fmt::Debug for SokDigest {
218        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219            f.write_fmt(format_args!("SokDigest({})", self.to_ocaml_str()))
220        }
221    }
222
223    impl Default for SokDigest {
224        /// <https://github.com/MinaProtocol/mina/blob/3a78f0e0c1343d14e2729c8b00205baa2ec70c93/src/lib/mina_base/sok_message.ml#L76>
225        fn default() -> Self {
226            Self(vec![0; 32])
227        }
228    }
229
    /// The ledger hashes of a [`Statement`] that the merge rules look at,
    /// flattened into one struct (see [`StatementLedgers::of_statement`]).
    pub struct StatementLedgers {
        // `source.first_pass_ledger` / `target.first_pass_ledger`
        first_pass_ledger_source: LedgerHash,
        first_pass_ledger_target: LedgerHash,
        // `source.second_pass_ledger` / `target.second_pass_ledger`
        second_pass_ledger_source: LedgerHash,
        second_pass_ledger_target: LedgerHash,
        // Copied verbatim from the statement.
        connecting_ledger_left: LedgerHash,
        connecting_ledger_right: LedgerHash,
        // `source.local_state.ledger` / `target.local_state.ledger`
        local_state_ledger_source: Fp,
        local_state_ledger_target: Fp,
    }
240
241    impl StatementLedgers {
242        /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/snarked_ledger_state.ml#L530>
243        pub fn of_statement<T>(s: &Statement<T>) -> Self {
244            Self {
245                first_pass_ledger_source: s.source.first_pass_ledger,
246                first_pass_ledger_target: s.target.first_pass_ledger,
247                second_pass_ledger_source: s.source.second_pass_ledger,
248                second_pass_ledger_target: s.target.second_pass_ledger,
249                connecting_ledger_left: s.connecting_ledger_left,
250                connecting_ledger_right: s.connecting_ledger_right,
251                local_state_ledger_source: s.source.local_state.ledger,
252                local_state_ledger_target: s.target.local_state.ledger,
253            }
254        }
255    }
256
257    /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/snarked_ledger_state.ml#L546>
258    fn validate_ledgers_at_merge(
259        s1: &StatementLedgers,
260        s2: &StatementLedgers,
261    ) -> Result<bool, String> {
262        // Check ledgers are valid based on the rules described in
263        // <https://github.com/MinaProtocol/mina/discussions/12000>
264        let is_same_block_at_shared_boundary = {
265            // First statement ends and the second statement starts in the
266            // same block. It could be within a single scan state tree
267            // or across two scan state trees
268            s1.connecting_ledger_right == s2.connecting_ledger_left
269        };
270
271        // Rule 1
272        let l1 = if is_same_block_at_shared_boundary {
273            // first pass ledger continues
274            &s2.first_pass_ledger_source
275        } else {
276            // s1's first pass ledger stops at the end of a block's transactions,
277            // check that it is equal to the start of the block's second pass ledger
278            &s1.connecting_ledger_right
279        };
280        let rule1 = "First pass ledger continues or first pass ledger connects to the \
281             same block's start of the second pass ledger";
282        let res1 = &s1.first_pass_ledger_target == l1;
283
284        // Rule 2
285        // s1's second pass ledger ends at say, block B1. s2 is in the next block, say B2
286        let l2 = if is_same_block_at_shared_boundary {
287            // second pass ledger continues
288            &s1.second_pass_ledger_target
289        } else {
290            // s2's second pass ledger starts where B2's first pass ledger ends
291            &s2.connecting_ledger_left
292        };
293        let rule2 = "Second pass ledger continues or second pass ledger of the statement on \
294             the right connects to the same block's end of first pass ledger";
295        let res2 = &s2.second_pass_ledger_source == l2;
296
297        // Rule 3
298        let l3 = if is_same_block_at_shared_boundary {
299            // no-op
300            &s1.second_pass_ledger_target
301        } else {
302            // s2's first pass ledger starts where B1's second pass ledger ends
303            &s2.first_pass_ledger_source
304        };
305        let rule3 = "First pass ledger of the statement on the right connects to the second \
306             pass ledger of the statement on the left";
307        let res3 = &s1.second_pass_ledger_target == l3;
308
309        let rule4 = "local state ledgers are equal or transition correctly from first pass \
310             to second pass";
311        let res4 = {
312            let local_state_ledger_equal =
313                s2.local_state_ledger_source == s1.local_state_ledger_target;
314
315            let local_state_ledger_transitions = s2.local_state_ledger_source
316                == s2.second_pass_ledger_source
317                && s1.local_state_ledger_target == s1.first_pass_ledger_target;
318
319            local_state_ledger_equal || local_state_ledger_transitions
320        };
321
322        let faileds = [(res1, rule1), (res2, rule2), (res3, rule3), (res4, rule4)]
323            .iter()
324            .filter_map(|(v, s)| if *v { None } else { Some(*s) })
325            .collect::<Vec<_>>();
326
327        if !faileds.is_empty() {
328            return Err(format!("Constraints failed: {}", faileds.iter().join(",")));
329        }
330
331        Ok(res1 && res2 && res3 && res4)
332    }
333
    /// Out-of-circuit ledger check used by `Statement::merge`; forwards to
    /// [`validate_ledgers_at_merge`] unchanged.
    fn valid_ledgers_at_merge_unchecked(
        s1: &StatementLedgers,
        s2: &StatementLedgers,
    ) -> Result<bool, String> {
        validate_ledgers_at_merge(s1, s2)
    }
340
    /// Witness-producing counterpart of `validate_ledgers_at_merge`: the same
    /// four rules, evaluated through `field::equal` / `Boolean` operations
    /// that all receive the witness `w`.
    ///
    /// Returns the conjunction of the four rules as a `Boolean`; unlike the
    /// unchecked version it does not report which rule failed.
    // TODO: Dedup with `validate_ledgers_at_merge`
    // NOTE(review): the order of the calls taking `w` presumably has to match
    // the OCaml implementation exactly -- do not reorder statements here.
    pub fn validate_ledgers_at_merge_checked(
        s1: &StatementLedgers,
        s2: &StatementLedgers,
        w: &mut Witness<Fp>,
    ) -> Boolean {
        let is_same_block_at_shared_boundary =
            field::equal(s1.connecting_ledger_right, s2.connecting_ledger_left, w);
        // Rule 1: first pass ledger continues, or connects to the same
        // block's start of the second pass ledger.
        let l1 = w.exists_no_check(match is_same_block_at_shared_boundary {
            Boolean::True => s2.first_pass_ledger_source,
            Boolean::False => s1.connecting_ledger_right,
        });
        let res1 = field::equal(s1.first_pass_ledger_target, l1, w);
        // Rule 2: second pass ledger continues, or the right statement's
        // second pass ledger connects to the same block's end of the first
        // pass ledger.
        let l2 = w.exists_no_check(match is_same_block_at_shared_boundary {
            Boolean::True => s1.second_pass_ledger_target,
            Boolean::False => s2.connecting_ledger_left,
        });
        let res2 = field::equal(s2.second_pass_ledger_source, l2, w);
        // Rule 3: the right statement's first pass ledger connects to the
        // left statement's second pass ledger (trivially true in the same
        // block).
        let l3 = w.exists_no_check(match is_same_block_at_shared_boundary {
            Boolean::True => s1.second_pass_ledger_target,
            Boolean::False => s2.first_pass_ledger_source,
        });
        let res3 = field::equal(s1.second_pass_ledger_target, l3, w);
        // Rule 4: local state ledgers are equal or transition from first
        // pass to second pass.
        let res4 = {
            let local_state_ledger_equal = field::equal(
                s2.local_state_ledger_source,
                s1.local_state_ledger_target,
                w,
            );

            // We decompose this way because of OCaml evaluation order
            let b = field::equal(s1.local_state_ledger_target, s1.first_pass_ledger_target, w);
            let a = field::equal(
                s2.local_state_ledger_source,
                s2.second_pass_ledger_source,
                w,
            );
            let local_state_ledger_transitions = Boolean::all(&[a, b], w);

            local_state_ledger_equal.or(&local_state_ledger_transitions, w)
        };
        // NOTES: No accumulate_failures here
        Boolean::all(&[res1, res2, res3, res4], w)
    }
385
    /// Statement of a transaction snark: the registers before (`source`) and
    /// after (`target`), plus the bookkeeping needed to merge statements.
    ///
    /// `D` is the digest slot: `SokDigest` when the statement carries its SOK
    /// digest, `()` when it does not.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct Statement<D> {
        /// Registers at the start of the statement.
        pub source: Registers,
        /// Registers at the end of the statement.
        pub target: Registers,
        /// Connecting ledger on the left side; kept from the left statement
        /// when two statements are merged.
        pub connecting_ledger_left: LedgerHash,
        /// Connecting ledger on the right side; kept from the right statement
        /// when two statements are merged.
        pub connecting_ledger_right: LedgerHash,
        /// Signed supply increase; added up on merge.
        pub supply_increase: Signed<Amount>,
        /// Fee excess; combined with `FeeExcess::combine` on merge.
        pub fee_excess: FeeExcess,
        /// SOK digest, or `()` when absent.
        pub sok_digest: D,
    }
396
397    impl ToInputs for Statement<SokDigest> {
398        /// <https://github.com/MinaProtocol/mina/blob/4e0b324912017c3ff576704ee397ade3d9bda412/src/lib/mina_state/snarked_ledger_state.ml#L263>
399        fn to_inputs(&self, inputs: &mut Inputs) {
400            let Self {
401                source,
402                target,
403                connecting_ledger_left,
404                connecting_ledger_right,
405                supply_increase,
406                fee_excess,
407                sok_digest,
408            } = self;
409
410            inputs.append_bytes(sok_digest);
411
412            inputs.append(source);
413            inputs.append(target);
414            inputs.append(connecting_ledger_left);
415            inputs.append(connecting_ledger_right);
416            inputs.append(supply_increase);
417            inputs.append(fee_excess);
418        }
419    }
420
421    impl Statement<SokDigest> {
422        pub fn without_digest(self) -> Statement<()> {
423            let Self {
424                source,
425                target,
426                connecting_ledger_left,
427                connecting_ledger_right,
428                supply_increase,
429                fee_excess,
430                sok_digest: _,
431            } = self;
432
433            Statement::<()> {
434                source,
435                target,
436                connecting_ledger_left,
437                connecting_ledger_right,
438                supply_increase,
439                fee_excess,
440                sok_digest: (),
441            }
442        }
443
444        pub fn with_digest(self, sok_digest: SokDigest) -> Self {
445            Self { sok_digest, ..self }
446        }
447    }
448
449    impl Statement<()> {
450        pub fn with_digest(self, sok_digest: SokDigest) -> Statement<SokDigest> {
451            let Self {
452                source,
453                target,
454                connecting_ledger_left,
455                connecting_ledger_right,
456                supply_increase,
457                fee_excess,
458                sok_digest: _,
459            } = self;
460
461            Statement::<SokDigest> {
462                source,
463                target,
464                connecting_ledger_left,
465                connecting_ledger_right,
466                supply_increase,
467                fee_excess,
468                sok_digest,
469            }
470        }
471
472        /// <https://github.com/MinaProtocol/mina/blob/436023ba41c43a50458a551b7ef7a9ae61670b25/src/lib/mina_state/snarked_ledger_state.ml#L631>
473        pub fn merge(&self, s2: &Statement<()>) -> Result<Self, String> {
474            let or_error_of_bool = |b: bool, error: &str| {
475                if b {
476                    Ok(())
477                } else {
478                    Err(format!(
479                        "Error merging statements left: {:#?} right {:#?}: {}",
480                        self, s2, error
481                    ))
482                }
483            };
484
485            // check ledgers are connected
486            let s1_ledger = StatementLedgers::of_statement(self);
487            let s2_ledger = StatementLedgers::of_statement(s2);
488
489            valid_ledgers_at_merge_unchecked(&s1_ledger, &s2_ledger)?;
490
491            // Check pending coinbase stack is connected
492            or_error_of_bool(
493                pending_coinbase::Stack::connected(
494                    &self.target.pending_coinbase_stack,
495                    &s2.source.pending_coinbase_stack,
496                    None,
497                ),
498                "Pending coinbase stacks are not connected",
499            )?;
500
501            // Check local states sans ledger are equal. Local state ledgers are checked
502            // in [valid_ledgers_at_merge_uncheckeds]
503            or_error_of_bool(
504                self.target
505                    .local_state
506                    .equal_without_ledger(&s2.source.local_state),
507                "Local states are not connected",
508            )?;
509
510            let connecting_ledger_left = self.connecting_ledger_left;
511            let connecting_ledger_right = s2.connecting_ledger_right;
512
513            let fee_excess = FeeExcess::combine(&self.fee_excess, &s2.fee_excess)?;
514            let supply_increase = self
515                .supply_increase
516                .add(&s2.supply_increase)
517                .ok_or_else(|| "Error adding supply_increase".to_string())?;
518
519            // assert!(self.target.check_equal(&s2.source));
520
521            Ok(Self {
522                source: self.source.clone(),
523                target: s2.target.clone(),
524                supply_increase,
525                fee_excess,
526                sok_digest: (),
527                connecting_ledger_left,
528                connecting_ledger_right,
529            })
530        }
531    }
532
533    pub mod work {
534        use ark_ff::fields::arithmetic::InvalidBigInt;
535
536        use super::*;
537
538        pub type Statement = OneOrTwo<super::Statement<()>>;
539
540        #[derive(Debug, Clone, PartialEq)]
541        pub struct Work {
542            pub fee: Fee,
543            pub proofs: OneOrTwo<LedgerProof>,
544            pub prover: CompressedPubKey,
545        }
546
547        pub type Unchecked = Work;
548
549        pub type Checked = Work;
550
551        impl TryFrom<&openmina_core::snark::Snark> for Work {
552            type Error = InvalidBigInt;
553
554            fn try_from(value: &openmina_core::snark::Snark) -> Result<Self, Self::Error> {
555                Ok(Self {
556                    prover: (&value.snarker).try_into()?,
557                    fee: (&value.fee).into(),
558                    proofs: (&*value.proofs).try_into()?,
559                })
560            }
561        }
562
563        impl Work {
564            pub fn statement(&self) -> Statement {
565                self.proofs.map(|p| {
566                    let statement = p.statement();
567                    super::Statement::<()> {
568                        source: statement.source,
569                        target: statement.target,
570                        supply_increase: statement.supply_increase,
571                        fee_excess: statement.fee_excess,
572                        sok_digest: (),
573                        connecting_ledger_left: statement.connecting_ledger_left,
574                        connecting_ledger_right: statement.connecting_ledger_right,
575                    }
576                })
577            }
578        }
579
580        impl Checked {
581            /// <https://github.com/MinaProtocol/mina/blob/05c2f73d0f6e4f1341286843814ce02dcb3919e0/src/lib/transaction_snark_work/transaction_snark_work.ml#L121>
582            pub fn forget(self) -> Unchecked {
583                self
584            }
585        }
586    }
587
    /// Initial pending coinbase stack attached to a
    /// [`TransactionWithWitness`].
    ///
    /// Counterpart of the message type
    /// `TransactionSnarkPendingCoinbaseStackStateInitStackStableV1`.
    #[derive(Debug, Clone, PartialEq)]
    pub enum InitStack {
        /// Carries a pending coinbase stack.
        Base(pending_coinbase::Stack),
        /// Carries no stack.
        Merge,
    }
594
    /// An applied transaction together with the data kept alongside it in the
    /// scan state: its statement, the sparse ledger witnesses of both passes
    /// and the block it belongs to.
    #[derive(Debug, Clone, PartialEq)]
    pub struct TransactionWithWitness {
        /// The transaction as it was applied.
        pub transaction_with_info: TransactionApplied,
        pub state_hash: (Fp, Fp), // (StateHash, StateBodyHash)
        // pub state_hash: (StateHash, MinaBaseStateBodyHashStableV1),
        /// Digest-less statement of this transaction.
        pub statement: Statement<()>,
        pub init_stack: InitStack,
        /// Sparse ledger witness for the first pass.
        pub first_pass_ledger_witness: SparseLedger,
        /// Sparse ledger witness for the second pass.
        pub second_pass_ledger_witness: SparseLedger,
        /// Global slot of the block.
        pub block_global_slot: Slot,
    }
606
    /// A statement paired with its (shared, reference-counted) proof.
    ///
    /// `D` is the digest slot of the statement, see [`Statement`].
    #[derive(Debug, Clone, PartialEq)]
    pub struct TransactionSnark<D> {
        pub statement: Statement<D>,
        pub proof: Arc<TransactionSnarkProofStableV2>,
    }
612
613    #[derive(Debug, Clone, PartialEq)]
614    pub struct LedgerProof(pub TransactionSnark<SokDigest>);
615
616    impl LedgerProof {
617        pub fn create(
618            statement: Statement<()>,
619            sok_digest: SokDigest,
620            proof: Arc<TransactionSnarkProofStableV2>,
621        ) -> Self {
622            let statement = Statement::<SokDigest> {
623                source: statement.source,
624                target: statement.target,
625                supply_increase: statement.supply_increase,
626                fee_excess: statement.fee_excess,
627                sok_digest,
628                connecting_ledger_left: statement.connecting_ledger_left,
629                connecting_ledger_right: statement.connecting_ledger_right,
630            };
631
632            Self(TransactionSnark { statement, proof })
633        }
634
635        pub fn statement(&self) -> Statement<()> {
636            let Statement {
637                source,
638                target,
639                connecting_ledger_left,
640                connecting_ledger_right,
641                supply_increase,
642                fee_excess,
643                sok_digest: _,
644            } = &self.0.statement;
645
646            Statement::<()> {
647                source: source.clone(),
648                target: target.clone(),
649                supply_increase: *supply_increase,
650                fee_excess: fee_excess.clone(),
651                sok_digest: (),
652                connecting_ledger_left: *connecting_ledger_left,
653                connecting_ledger_right: *connecting_ledger_right,
654            }
655        }
656
657        pub fn statement_ref(&self) -> &Statement<SokDigest> {
658            &self.0.statement
659        }
660    }
661
662    #[derive(Debug, Clone, PartialEq)]
663    pub struct SokMessage {
664        pub fee: Fee,
665        pub prover: CompressedPubKey,
666    }
667
668    impl SokMessage {
669        pub fn create(fee: Fee, prover: CompressedPubKey) -> Self {
670            Self { fee, prover }
671        }
672
673        pub fn digest(&self) -> SokDigest {
674            use binprot::BinProtWrite;
675
676            let mut bytes = Vec::with_capacity(10000);
677            let binprot: mina_p2p_messages::v2::MinaBaseSokMessageStableV1 = self.into();
678            binprot.binprot_write(&mut bytes).unwrap();
679
680            use blake2::{
681                digest::{Update, VariableOutput},
682                Blake2bVar,
683            };
684            let mut hasher = Blake2bVar::new(32).expect("Invalid Blake2bVar output size");
685            hasher.update(bytes.as_slice());
686            let digest = hasher.finalize_boxed();
687
688            SokDigest(digest.into())
689        }
690    }
691
    /// A ledger proof together with the SOK message (fee and prover) that
    /// goes with it.
    #[derive(Debug, Clone, PartialEq)]
    pub struct LedgerProofWithSokMessage {
        pub proof: LedgerProof,
        pub sok_message: SokMessage,
    }
697
    /// A container holding exactly one or exactly two values of type `T`.
    #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
    pub enum OneOrTwo<T> {
        /// A single value.
        One(T),
        /// Two values, stored as a pair; the first component comes first in
        /// every traversal.
        Two((T, T)),
    }
703
704    impl<T> OneOrTwo<T> {
705        pub fn len(&self) -> usize {
706            match self {
707                OneOrTwo::One(_) => 1,
708                OneOrTwo::Two(_) => 2,
709            }
710        }
711
712        pub fn iter(&self) -> OneOrTwoIter<'_, T> {
713            let array = match self {
714                OneOrTwo::One(a) => [Some(a), None],
715                OneOrTwo::Two((a, b)) => [Some(a), Some(b)],
716            };
717
718            OneOrTwoIter {
719                inner: array,
720                index: 0,
721            }
722        }
723
724        #[allow(clippy::should_implement_trait)]
725        pub fn into_iter(self) -> OneOrTwoIntoIter<T> {
726            let array = match self {
727                OneOrTwo::One(a) => [Some(a), None],
728                OneOrTwo::Two((a, b)) => [Some(a), Some(b)],
729            };
730
731            OneOrTwoIntoIter {
732                inner: array,
733                index: 0,
734            }
735        }
736
737        pub fn map<F, R>(&self, fun: F) -> OneOrTwo<R>
738        where
739            F: Fn(&T) -> R,
740        {
741            match self {
742                OneOrTwo::One(one) => OneOrTwo::One(fun(one)),
743                OneOrTwo::Two((a, b)) => OneOrTwo::Two((fun(a), fun(b))),
744            }
745        }
746
747        pub fn into_map<F, R>(self, fun: F) -> OneOrTwo<R>
748        where
749            F: Fn(T) -> R,
750        {
751            match self {
752                OneOrTwo::One(one) => OneOrTwo::One(fun(one)),
753                OneOrTwo::Two((a, b)) => OneOrTwo::Two((fun(a), fun(b))),
754            }
755        }
756
757        pub fn into_map_err<F, R, E>(self, fun: F) -> Result<OneOrTwo<R>, E>
758        where
759            F: Fn(T) -> Result<R, E>,
760        {
761            match self {
762                OneOrTwo::One(one) => Ok(OneOrTwo::One(fun(one)?)),
763                OneOrTwo::Two((a, b)) => Ok(OneOrTwo::Two((fun(a)?, fun(b)?))),
764            }
765        }
766
767        pub fn into_map_some<F, R>(self, fun: F) -> Option<OneOrTwo<R>>
768        where
769            F: Fn(T) -> Option<R>,
770        {
771            match self {
772                OneOrTwo::One(one) => Some(OneOrTwo::One(fun(one)?)),
773                OneOrTwo::Two((a, b)) => {
774                    let a = fun(a)?;
775                    match fun(b) {
776                        Some(b) => Some(OneOrTwo::Two((a, b))),
777                        None => Some(OneOrTwo::One(a)),
778                    }
779                }
780            }
781        }
782
783        /// <https://github.com/MinaProtocol/mina/blob/05c2f73d0f6e4f1341286843814ce02dcb3919e0/src/lib/one_or_two/one_or_two.ml#L54>
784        pub fn zip<B>(a: OneOrTwo<T>, b: OneOrTwo<B>) -> Result<OneOrTwo<(T, B)>, String> {
785            use OneOrTwo::*;
786
787            match (a, b) {
788                (One(a), One(b)) => Ok(One((a, b))),
789                (Two((a1, a2)), Two((b1, b2))) => Ok(Two(((a1, b1), (a2, b2)))),
790                (One(_), Two(_)) | (Two(_), One(_)) => Err("One_or_two.zip mismatched".to_string()),
791            }
792        }
793
794        pub fn fold<A, F>(&self, init: A, fun: F) -> A
795        where
796            F: Fn(A, &T) -> A,
797        {
798            match self {
799                OneOrTwo::One(a) => fun(init, a),
800                OneOrTwo::Two((a, b)) => fun(fun(init, a), b),
801            }
802        }
803    }
804
805    pub struct OneOrTwoIter<'a, T> {
806        inner: [Option<&'a T>; 2],
807        index: usize,
808    }
809
810    impl<'a, T> Iterator for OneOrTwoIter<'a, T> {
811        type Item = &'a T;
812
813        fn next(&mut self) -> Option<Self::Item> {
814            let value = self.inner.get(self.index)?.as_ref()?;
815            self.index += 1;
816
817            Some(value)
818        }
819    }
820
821    pub struct OneOrTwoIntoIter<T> {
822        inner: [Option<T>; 2],
823        index: usize,
824    }
825
826    impl<T> Iterator for OneOrTwoIntoIter<T> {
827        type Item = T;
828
829        fn next(&mut self) -> Option<Self::Item> {
830            let value = self.inner.get_mut(self.index)?.take()?;
831            self.index += 1;
832
833            Some(value)
834        }
835    }
836}
837
838fn sha256_digest(bytes: &[u8]) -> GenericArray<u8, U32> {
839    let mut sha: Sha256 = Sha256::new();
840    sha.update(bytes);
841    sha.finalize()
842}
843
844impl ScanState {
845    pub fn hash(&self) -> AuxHash {
846        use binprot::BinProtWrite;
847
848        let Self {
849            scan_state,
850            previous_incomplete_zkapp_updates,
851        } = self;
852
853        let state_hash = scan_state.hash(
854            |buffer, proof| {
855                // #[cfg(test)]
856                // {
857                //     let a: mina_p2p_messages::v2::TransactionSnarkScanStateLedgerProofWithSokMessageStableV2 = proof.as_ref().into();
858                //     let b: LedgerProofWithSokMessage = (&a).into();
859                //     assert_eq!(&b, proof.as_ref());
860                // }
861
862                proof.binprot_write(buffer).unwrap();
863            },
864            |buffer, transaction| {
865                // #[cfg(test)]
866                // {
867                //     let a: mina_p2p_messages::v2::TransactionSnarkScanStateTransactionWithWitnessStableV2 = transaction.as_ref().into();
868                //     let b: TransactionWithWitness = (&a).into();
869                //     assert_eq!(&b, transaction.as_ref());
870                // }
871
872                transaction.binprot_write(buffer).unwrap();
873            },
874        );
875
876        let (
877            previous_incomplete_zkapp_updates,
878            BorderBlockContinuedInTheNextTree(continue_in_next_tree),
879        ) = previous_incomplete_zkapp_updates;
880
881        let incomplete_updates = previous_incomplete_zkapp_updates.iter().fold(
882            Vec::with_capacity(1024 * 32),
883            |mut accum, tx| {
884                tx.binprot_write(&mut accum).unwrap();
885                accum
886            },
887        );
888        let incomplete_updates = sha256_digest(&incomplete_updates);
889
890        let continue_in_next_tree = match continue_in_next_tree {
891            true => "true",
892            false => "false",
893        };
894        let continue_in_next_tree = sha256_digest(continue_in_next_tree.as_bytes());
895
896        let mut bytes = Vec::with_capacity(2048);
897        bytes.extend_from_slice(&state_hash);
898        bytes.extend_from_slice(&incomplete_updates);
899        bytes.extend_from_slice(&continue_in_next_tree);
900        let digest = sha256_digest(&bytes);
901
902        AuxHash(digest.into())
903    }
904}
905
906/// <https://github.com/MinaProtocol/mina/blob/e5183ca1dde1c085b4c5d37d1d9987e24c294c32/src/lib/transaction_snark_scan_state/transaction_snark_scan_state.ml#L175>
907fn create_expected_statement<F>(
908    constraint_constants: &ConstraintConstants,
909    get_state: F,
910    connecting_merkle_root: Fp,
911    TransactionWithWitness {
912        transaction_with_info,
913        state_hash,
914        statement,
915        init_stack,
916        first_pass_ledger_witness,
917        second_pass_ledger_witness,
918        block_global_slot,
919    }: &TransactionWithWitness,
920) -> Result<Statement<()>, String>
921where
922    F: Fn(Fp) -> MinaStateProtocolStateValueStableV2,
923{
924    // TODO: Don't clone here
925    let source_first_pass_merkle_root = first_pass_ledger_witness.clone().merkle_root();
926    let source_second_pass_merkle_root = second_pass_ledger_witness.clone().merkle_root();
927
928    let WithStatus {
929        data: transaction, ..
930    } = transaction_with_info.transaction();
931
932    let protocol_state = get_state(state_hash.0);
933    let state_view = protocol_state_view(&protocol_state).map_err(|e| format!("{:?}", e))?;
934
935    let empty_local_state = LocalState::empty();
936
937    let coinbase = match &transaction {
938        Transaction::Coinbase(coinbase) => Some(coinbase.clone()),
939        _ => None,
940    };
941    // Keep the error, in OCaml it is throwned later
942    let fee_excess_with_err = transaction.fee_excess();
943
944    let (target_first_pass_merkle_root, target_second_pass_merkle_root, supply_increase) = {
945        let mut first_pass_ledger_witness = first_pass_ledger_witness.copy_content();
946        let partially_applied_transaction = apply_transaction_first_pass(
947            constraint_constants,
948            *block_global_slot,
949            &state_view,
950            &mut first_pass_ledger_witness,
951            &transaction,
952        )?;
953
954        let mut second_pass_ledger_witness = second_pass_ledger_witness.copy_content();
955        let applied_transaction = apply_transaction_second_pass(
956            constraint_constants,
957            &mut second_pass_ledger_witness,
958            partially_applied_transaction,
959        )?;
960
961        let target_first_pass_merkle_root = first_pass_ledger_witness.merkle_root();
962        let target_second_pass_merkle_root = second_pass_ledger_witness.merkle_root();
963
964        // TODO: `supply_increase` has no parameter
965        let supply_increase = applied_transaction.supply_increase(constraint_constants)?;
966
967        (
968            target_first_pass_merkle_root,
969            target_second_pass_merkle_root,
970            supply_increase,
971        )
972    };
973
974    let pending_coinbase_before = match init_stack {
975        transaction_snark::InitStack::Base(source) => source,
976        transaction_snark::InitStack::Merge => {
977            return Err(
978                "Invalid init stack in Pending coinbase stack state . Expected Base found Merge"
979                    .to_string(),
980            );
981        }
982    };
983
984    let pending_coinbase_after = {
985        let state_body_hash = state_hash.1;
986
987        let pending_coinbase_with_state =
988            pending_coinbase_before.push_state(state_body_hash, *block_global_slot);
989
990        match coinbase {
991            Some(cb) => pending_coinbase_with_state.push_coinbase(cb),
992            None => pending_coinbase_with_state,
993        }
994    };
995
996    let fee_excess = fee_excess_with_err?;
997
998    Ok(Statement {
999        source: Registers {
1000            first_pass_ledger: source_first_pass_merkle_root,
1001            second_pass_ledger: source_second_pass_merkle_root,
1002            pending_coinbase_stack: statement.source.pending_coinbase_stack.clone(),
1003            local_state: empty_local_state.clone(),
1004        },
1005        target: Registers {
1006            first_pass_ledger: target_first_pass_merkle_root,
1007            second_pass_ledger: target_second_pass_merkle_root,
1008            pending_coinbase_stack: pending_coinbase_after,
1009            local_state: empty_local_state,
1010        },
1011        connecting_ledger_left: connecting_merkle_root,
1012        connecting_ledger_right: connecting_merkle_root,
1013        supply_increase,
1014        fee_excess,
1015        sok_digest: (),
1016    })
1017}
1018
1019fn completed_work_to_scanable_work(
1020    job: AvailableJob,
1021    (fee, current_proof, prover): (Fee, LedgerProof, CompressedPubKey),
1022) -> Result<Arc<LedgerProofWithSokMessage>, String> {
1023    use super::parallel_scan::AvailableJob::{Base, Merge};
1024
1025    let sok_digest = current_proof.0.statement.sok_digest;
1026
1027    let proof = &current_proof.0.proof;
1028
1029    match job {
1030        Base(t) => {
1031            let TransactionWithWitness { statement, .. } = t.as_ref();
1032            let ledger_proof = LedgerProof::create(statement.clone(), sok_digest, proof.clone());
1033            let sok_message = SokMessage::create(fee, prover);
1034
1035            Ok(Arc::new(LedgerProofWithSokMessage {
1036                proof: ledger_proof,
1037                sok_message,
1038            }))
1039        }
1040        Merge {
1041            left: proof1,
1042            right: proof2,
1043        } => {
1044            let s1 = proof1.proof.statement();
1045            let s2 = proof2.proof.statement();
1046
1047            let statement = s1.merge(&s2)?;
1048
1049            let ledger_proof = LedgerProof::create(statement, sok_digest, proof.clone());
1050            let sok_message = SokMessage::create(fee, prover);
1051
1052            Ok(Arc::new(LedgerProofWithSokMessage {
1053                proof: ledger_proof,
1054                sok_message,
1055            }))
1056        }
1057    }
1058}
1059
1060fn total_proofs(works: &[transaction_snark::work::Work]) -> usize {
1061    works.iter().map(|work| work.proofs.len()).sum()
1062}
1063
/// Selects how `ScanState::scan_statement` obtains the expected statement of each
/// pending base job.
pub enum StatementCheck<F: Fn(Fp) -> MinaStateProtocolStateValueStableV2> {
    /// The statement stored with the transaction is taken as the expected one;
    /// nothing is recomputed.
    Partial,
    /// The expected statement is recomputed with `create_expected_statement`; the
    /// wrapped function is passed as its `get_state` argument, where it is called
    /// with the first component of the transaction's `state_hash`.
    Full(F),
}
1068
1069impl ScanState {
1070    pub fn scan_statement<F>(
1071        &self,
1072        constraint_constants: &ConstraintConstants,
1073        statement_check: StatementCheck<F>,
1074        verifier: &Verifier,
1075    ) -> Result<Statement<()>, String>
1076    where
1077        F: Fn(Fp) -> MinaStateProtocolStateValueStableV2,
1078    {
1079        struct Acc(Option<(Statement<()>, Vec<Arc<LedgerProofWithSokMessage>>)>);
1080
1081        let merge_acc = |mut proofs: Vec<Arc<LedgerProofWithSokMessage>>,
1082                         acc: Acc,
1083                         s2: &Statement<()>|
1084         -> Result<Acc, String> {
1085            match acc.0 {
1086                None => Ok(Acc(Some((s2.clone(), proofs)))),
1087                Some((s1, mut ps)) => {
1088                    let merged_statement = s1.merge(s2)?;
1089                    proofs.append(&mut ps);
1090                    Ok(Acc(Some((merged_statement, proofs))))
1091                }
1092            }
1093        };
1094
1095        let merge_pc = |acc: Option<Statement<()>>,
1096                        s2: &Statement<()>|
1097         -> Result<Option<Statement<()>>, String> {
1098            match acc {
1099                None => Ok(Some(s2.clone())),
1100                Some(s1) => {
1101                    if !pending_coinbase::Stack::connected(
1102                        &s1.target.pending_coinbase_stack,
1103                        &s2.source.pending_coinbase_stack,
1104                        Some(&s1.source.pending_coinbase_stack),
1105                    ) {
1106                        return Err(format!(
1107                            "Base merge proof: invalid pending coinbase \
1108                             transition s1: {:?} s2: {:?}",
1109                            s1, s2
1110                        ));
1111                    }
1112                    Ok(Some(s2.clone()))
1113                }
1114            }
1115        };
1116
1117        let fold_step_a = |(acc_statement, acc_pc): (Acc, Option<Statement<()>>),
1118                           job: &merge::Job<Arc<LedgerProofWithSokMessage>>|
1119         -> Result<(Acc, Option<Statement<()>>), String> {
1120            use merge::{
1121                Job::{Empty, Full, Part},
1122                Record,
1123            };
1124            use JobStatus::Done;
1125
1126            match job {
1127                Part(ref ledger) => {
1128                    let LedgerProofWithSokMessage { proof, .. } = ledger.as_ref();
1129                    let statement = proof.statement();
1130                    let acc_stmt = merge_acc(vec![ledger.clone()], acc_statement, &statement)?;
1131                    Ok((acc_stmt, acc_pc))
1132                }
1133                Empty | Full(Record { state: Done, .. }) => Ok((acc_statement, acc_pc)),
1134                Full(Record { left, right, .. }) => {
1135                    let LedgerProofWithSokMessage { proof: proof1, .. } = left.as_ref();
1136                    let LedgerProofWithSokMessage { proof: proof2, .. } = right.as_ref();
1137
1138                    let stmt1 = proof1.statement();
1139                    let stmt2 = proof2.statement();
1140                    let merged_statement = stmt1.merge(&stmt2)?;
1141
1142                    let acc_stmt = merge_acc(
1143                        vec![left.clone(), right.clone()],
1144                        acc_statement,
1145                        &merged_statement,
1146                    )?;
1147
1148                    Ok((acc_stmt, acc_pc))
1149                }
1150            }
1151        };
1152
1153        let check_base = |(acc_statement, acc_pc), transaction: &TransactionWithWitness| {
1154            use StatementCheck::{Full, Partial};
1155
1156            let expected_statement = match &statement_check {
1157                Full(get_state) => create_expected_statement(
1158                    constraint_constants,
1159                    get_state,
1160                    transaction.statement.connecting_ledger_left,
1161                    transaction,
1162                )?,
1163                Partial => transaction.statement.clone(),
1164            };
1165
1166            if transaction.statement == expected_statement {
1167                let acc_stmt = merge_acc(Vec::new(), acc_statement, &transaction.statement)?;
1168                let acc_pc = merge_pc(acc_pc, &transaction.statement)?;
1169
1170                Ok((acc_stmt, acc_pc))
1171            } else {
1172                Err(format!(
1173                    "Bad base statement expected: {:#?} got: {:#?}",
1174                    transaction.statement, expected_statement
1175                ))
1176            }
1177        };
1178
1179        let fold_step_d = |(acc_statement, acc_pc): (Acc, Option<Statement<()>>),
1180                           job: &base::Job<Arc<TransactionWithWitness>>|
1181         -> Result<(Acc, Option<Statement<()>>), String> {
1182            use base::{
1183                Job::{Empty, Full},
1184                Record,
1185            };
1186            use JobStatus::Done;
1187
1188            match job {
1189                Empty => Ok((acc_statement, acc_pc)),
1190                Full(Record {
1191                    state: Done,
1192                    job: transaction,
1193                    ..
1194                }) => {
1195                    let acc_pc = merge_pc(acc_pc, &transaction.statement)?;
1196                    Ok((acc_statement, acc_pc))
1197                }
1198                Full(Record {
1199                    job: transaction, ..
1200                }) => check_base((acc_statement, acc_pc), transaction),
1201            }
1202        };
1203
1204        let res = self.scan_state.fold_chronological_until_err(
1205            (Acc(None), None),
1206            |acc, merge::Merge { weight: _, job }| fold_step_a(acc, job),
1207            |acc, base::Base { weight: _, job }| fold_step_d(acc, job),
1208            |v| v,
1209        )?;
1210
1211        match res {
1212            (Acc(None), _) => Err("Empty".to_string()),
1213            (Acc(Some((res, proofs))), _) => match verifier.verify(proofs.as_slice()) {
1214                Ok(Ok(())) => Ok(res),
1215                Ok(Err(e)) => Err(format!("Verifier issue {:?}", e)),
1216                Err(e) => Err(e),
1217            },
1218        }
1219    }
1220
1221    pub fn check_invariants<F>(
1222        &self,
1223        constraint_constants: &ConstraintConstants,
1224        statement_check: StatementCheck<F>,
1225        verifier: &Verifier,
1226        _error_prefix: &'static str,
1227        _last_proof_statement: Option<Statement<()>>,
1228        _registers_end: Registers,
1229    ) -> Result<(), String>
1230    where
1231        F: Fn(Fp) -> MinaStateProtocolStateValueStableV2,
1232    {
1233        // TODO: OCaml does much more than this (pretty printing error)
1234        match self.scan_statement(constraint_constants, statement_check, verifier) {
1235            Ok(_) => Ok(()),
1236            Err(s) => Err(s),
1237        }
1238    }
1239
1240    pub fn statement_of_job(job: &AvailableJob) -> Option<Statement<()>> {
1241        use super::parallel_scan::AvailableJob::{Base, Merge};
1242
1243        match job {
1244            Base(t) => {
1245                let TransactionWithWitness { statement, .. } = t.as_ref();
1246                Some(statement.clone())
1247            }
1248            Merge { left, right } => {
1249                let LedgerProofWithSokMessage { proof: p1, .. } = left.as_ref();
1250                let LedgerProofWithSokMessage { proof: p2, .. } = right.as_ref();
1251
1252                p1.statement().merge(&p2.statement()).ok()
1253            }
1254        }
1255    }
1256
1257    fn create(work_delay: u64, transaction_capacity_log_2: u64) -> Self {
1258        let k = 2u64.pow(transaction_capacity_log_2 as u32);
1259
1260        Self {
1261            scan_state: ParallelScan::empty(k, work_delay),
1262            previous_incomplete_zkapp_updates: (
1263                Vec::with_capacity(1024),
1264                BorderBlockContinuedInTheNextTree(false),
1265            ),
1266        }
1267    }
1268
1269    pub fn empty(constraint_constants: &ConstraintConstants) -> Self {
1270        let work_delay = constraint_constants.work_delay;
1271        let transaction_capacity_log_2 = constraint_constants.transaction_capacity_log_2;
1272
1273        Self::create(work_delay, transaction_capacity_log_2)
1274    }
1275
1276    fn extract_txn_and_global_slot(
1277        txn_with_witness: &TransactionWithWitness,
1278    ) -> (WithStatus<Transaction>, Fp, Slot) {
1279        let txn = txn_with_witness.transaction_with_info.transaction();
1280
1281        let state_hash = txn_with_witness.state_hash.0;
1282        let global_slot = txn_with_witness.block_global_slot;
1283        (txn, state_hash, global_slot)
1284    }
1285
1286    fn latest_ledger_proof_impl(
1287        &self,
1288    ) -> Option<(
1289        &LedgerProofWithSokMessage,
1290        Vec<TransactionsOrdered<Arc<TransactionWithWitness>>>,
1291    )> {
1292        let (proof, txns_with_witnesses) = self.scan_state.last_emitted_value()?;
1293
1294        let (previous_incomplete, BorderBlockContinuedInTheNextTree(continued_in_next_tree)) =
1295            self.previous_incomplete_zkapp_updates.clone();
1296
1297        let txns = {
1298            if continued_in_next_tree {
1299                TransactionsOrdered::first_and_second_pass_transactions_per_tree(
1300                    previous_incomplete,
1301                    txns_with_witnesses.clone(),
1302                )
1303            } else {
1304                let mut txns = TransactionsOrdered::first_and_second_pass_transactions_per_tree(
1305                    vec![],
1306                    txns_with_witnesses.clone(),
1307                );
1308
1309                if previous_incomplete.is_empty() {
1310                    txns
1311                } else {
1312                    txns.insert(
1313                        0,
1314                        TransactionsOrdered {
1315                            first_pass: vec![],
1316                            second_pass: vec![],
1317                            previous_incomplete,
1318                            current_incomplete: vec![],
1319                        },
1320                    );
1321                    txns
1322                }
1323            }
1324        };
1325
1326        Some((proof, txns))
1327    }
1328
1329    pub fn latest_ledger_proof(
1330        &self,
1331    ) -> Option<(
1332        &LedgerProofWithSokMessage,
1333        Vec<TransactionsOrdered<(WithStatus<Transaction>, Fp, Slot)>>,
1334    )> {
1335        self.latest_ledger_proof_impl().map(|(p, txns)| {
1336            let txns = txns
1337                .into_iter()
1338                .map(|ordered| ordered.map(|t| Self::extract_txn_and_global_slot(t.as_ref())))
1339                .collect::<Vec<_>>();
1340
1341            (p, txns)
1342        })
1343    }
1344
1345    fn incomplete_txns_from_recent_proof_tree(
1346        &self,
1347    ) -> Option<(
1348        LedgerProofWithSokMessage,
1349        (
1350            Vec<Arc<TransactionWithWitness>>,
1351            BorderBlockContinuedInTheNextTree,
1352        ),
1353    )> {
1354        let (proof, txns_per_block) = self.latest_ledger_proof_impl()?;
1355
1356        let txns = match txns_per_block.last() {
1357            None => (vec![], BorderBlockContinuedInTheNextTree(false)),
1358            Some(txns_in_last_block) => {
1359                // First pass ledger is considered as the snarked ledger, so any
1360                // account update whether completed in the same tree or not
1361                // should be included in the next tree
1362
1363                if !txns_in_last_block.second_pass.is_empty() {
1364                    (
1365                        txns_in_last_block.second_pass.clone(),
1366                        BorderBlockContinuedInTheNextTree(false),
1367                    )
1368                } else {
1369                    (
1370                        txns_in_last_block.current_incomplete.clone(),
1371                        BorderBlockContinuedInTheNextTree(true),
1372                    )
1373                }
1374            }
1375        };
1376
1377        Some((proof.clone(), txns))
1378    }
1379
1380    fn staged_transactions(&self) -> Vec<TransactionsOrdered<Arc<TransactionWithWitness>>> {
1381        let (previous_incomplete, BorderBlockContinuedInTheNextTree(continued_in_next_tree)) =
1382            match self.incomplete_txns_from_recent_proof_tree() {
1383                Some((_proof, v)) => v,
1384                None => (vec![], BorderBlockContinuedInTheNextTree(false)),
1385            };
1386
1387        let txns = {
1388            if continued_in_next_tree {
1389                TransactionsOrdered::first_and_second_pass_transactions_per_forest(
1390                    self.scan_state.pending_data(),
1391                    previous_incomplete,
1392                )
1393            } else {
1394                let mut txns = TransactionsOrdered::first_and_second_pass_transactions_per_forest(
1395                    self.scan_state.pending_data(),
1396                    vec![],
1397                );
1398
1399                if previous_incomplete.is_empty() {
1400                    txns
1401                } else {
1402                    txns.insert(
1403                        0,
1404                        vec![TransactionsOrdered {
1405                            first_pass: vec![],
1406                            second_pass: vec![],
1407                            previous_incomplete,
1408                            current_incomplete: vec![],
1409                        }],
1410                    );
1411                    txns
1412                }
1413            }
1414        };
1415
1416        txns.into_iter().flatten().collect::<Vec<_>>()
1417    }
1418
1419    /// All the transactions in the order in which they were applied along with
1420    /// the parent protocol state of the blocks that contained them
1421    fn staged_transactions_with_state_hash(
1422        &self,
1423    ) -> Vec<TransactionsOrdered<(WithStatus<Transaction>, Fp, Slot)>> {
1424        self.staged_transactions()
1425            .into_iter()
1426            .map(|ordered| ordered.map(|t| Self::extract_txn_and_global_slot(t.as_ref())))
1427            .collect::<Vec<_>>()
1428    }
1429
1430    fn apply_ordered_txns_stepwise<L, F, ApplyFirst, ApplySecond, ApplyFirstSparse>(
1431        stop_at_first_pass: Option<bool>,
1432        ordered_txns: Vec<TransactionsOrdered<Arc<TransactionWithWitness>>>,
1433        ledger: &mut L,
1434        get_protocol_state: F,
1435        apply_first_pass: ApplyFirst,
1436        apply_second_pass: &ApplySecond,
1437        apply_first_pass_sparse_ledger: ApplyFirstSparse,
1438    ) -> Result<Pass, String>
1439    where
1440        L: LedgerNonSnark,
1441        F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1442        ApplyFirst: Fn(
1443            Slot,
1444            &ProtocolStateView,
1445            &mut L,
1446            &Transaction,
1447        ) -> Result<TransactionPartiallyApplied<L>, String>,
1448        ApplySecond:
1449            Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
1450        ApplyFirstSparse: Fn(
1451            Slot,
1452            &ProtocolStateView,
1453            &mut SparseLedger,
1454            &Transaction,
1455        ) -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
1456    {
1457        let mut ledger_mut = ledger.clone();
1458        let stop_at_first_pass = stop_at_first_pass.unwrap_or(false);
1459
1460        #[derive(Clone)]
1461        enum PreviousIncompleteTxns<L: LedgerNonSnark> {
1462            Unapplied(Vec<Arc<TransactionWithWitness>>),
1463            PartiallyApplied(Vec<(TransactionStatus, TransactionPartiallyApplied<L>)>),
1464        }
1465
1466        fn apply<L, F, Apply>(
1467            apply: Apply,
1468            ledger: &mut L,
1469            tx: &Transaction,
1470            state_hash: Fp,
1471            block_global_slot: Slot,
1472            get_protocol_state: F,
1473        ) -> Result<TransactionPartiallyApplied<L>, String>
1474        where
1475            L: LedgerNonSnark,
1476            F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1477            Apply: Fn(
1478                Slot,
1479                &ProtocolStateView,
1480                &mut L,
1481                &Transaction,
1482            ) -> Result<TransactionPartiallyApplied<L>, String>,
1483        {
1484            match get_protocol_state(state_hash) {
1485                Ok(state) => {
1486                    let txn_state_view =
1487                        protocol_state_view(&state).map_err(|e| format!("{:?}", e))?;
1488                    apply(block_global_slot, &txn_state_view, ledger, tx)
1489                }
1490                Err(e) => Err(format!(
1491                    "Coudln't find protocol state with hash {:?}: {}",
1492                    state_hash, e
1493                )),
1494            }
1495        }
1496
1497        // let apply = |apply: ApplyFirst, ledger: &mut L, tx: &Transaction, state_hash: Fp, block_global_slot: Slot| {
1498        //     match get_protocol_state(state_hash) {
1499        //         Ok(state) => {
1500        //             let txn_state_view = protocol_state_view(&state);
1501        //             apply(block_global_slot, &txn_state_view, ledger, tx)
1502        //         },
1503        //         Err(e) => {
1504        //             Err(format!("Coudln't find protocol state with hash {:?}: {}", state_hash, e))
1505        //         },
1506        //     }
1507        // };
1508
1509        type Acc<L> = Vec<(TransactionStatus, TransactionPartiallyApplied<L>)>;
1510
1511        let apply_txns_first_pass = |mut acc: Acc<L>,
1512                                     txns: Vec<Arc<TransactionWithWitness>>|
1513         -> Result<(Pass, Acc<L>), String> {
1514            let mut ledger = ledger.clone();
1515
1516            for txn in txns {
1517                let (transaction, state_hash, block_global_slot) =
1518                    Self::extract_txn_and_global_slot(txn.as_ref());
1519                let expected_status = transaction.status;
1520
1521                let partially_applied_txn = apply(
1522                    &apply_first_pass,
1523                    &mut ledger,
1524                    &transaction.data,
1525                    state_hash,
1526                    block_global_slot,
1527                    &get_protocol_state,
1528                )?;
1529
1530                acc.push((expected_status, partially_applied_txn));
1531            }
1532
1533            Ok((Pass::FirstPassLedgerHash(ledger.merkle_root()), acc))
1534        };
1535
1536        fn apply_txns_second_pass<L, ApplySecond>(
1537            partially_applied_txns: Acc<L>,
1538            mut ledger: L,
1539            apply_second_pass: ApplySecond,
1540        ) -> Result<(), String>
1541        where
1542            L: LedgerNonSnark,
1543            ApplySecond:
1544                Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
1545        {
1546            for (expected_status, partially_applied_txn) in partially_applied_txns {
1547                let res = apply_second_pass(&mut ledger, partially_applied_txn)?;
1548                let status = res.transaction_status();
1549
1550                if &expected_status != status {
1551                    return Err(format!(
1552                        "Transaction produced unxpected application status.\
1553                                        Expected {:#?}\
1554                                        Got: {:#?}\
1555                                        Transaction: {:#?}",
1556                        expected_status, status, "TODO"
1557                    ));
1558                    // Transaction: {:#?}", expected_status, status, partially_applied_txn));
1559                }
1560            }
1561
1562            Ok(())
1563        }
1564
1565        fn apply_previous_incomplete_txns<R, L, F, ApplyFirstSparse, ApplySecondPass>(
1566            txns: PreviousIncompleteTxns<L>,
1567            // k: Box<dyn FnOnce() -> Result<Pass, String>>,
1568            ledger: L,
1569            get_protocol_state: F,
1570            apply_first_pass_sparse_ledger: ApplyFirstSparse,
1571            apply_txns_second_pass: ApplySecondPass,
1572        ) -> Result<R, String>
1573        where
1574            L: LedgerNonSnark,
1575            F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1576            ApplySecondPass: Fn(Acc<L>) -> Result<R, String>,
1577            ApplyFirstSparse: Fn(
1578                Slot,
1579                &ProtocolStateView,
1580                &mut SparseLedger,
1581                &Transaction,
1582            )
1583                -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
1584        {
1585            // Note: Previous incomplete transactions refer to the block's transactions
1586            // from previous scan state tree that were split between the two trees.
1587            // The set in the previous tree have gone through the first pass. For the
1588            // second pass that is to happen after the rest of the set goes through the
1589            // first pass, we need partially applied state - result of previous tree's
1590            // transactions' first pass. To generate the partial state, we do a a first
1591            // pass application of previous tree's transaction on a sparse ledger created
1592            // from witnesses stored in the scan state and then use it to apply to the
1593            // ledger here
1594
1595            let inject_ledger_info =
1596                |partially_applied_txn: TransactionPartiallyApplied<SparseLedger>| {
1597                    use TransactionPartiallyApplied as P;
1598
1599                    match partially_applied_txn {
1600                        P::ZkappCommand(zkapp) => {
1601                            let original_first_pass_account_states = zkapp
1602                                .original_first_pass_account_states
1603                                .into_iter()
1604                                .map(|(id, loc_opt)| match loc_opt {
1605                                    None => Ok((id, None)),
1606                                    Some((_sparse_ledger_loc, account)) => {
1607                                        match ledger.location_of_account(&id) {
1608                                            Some(loc) => Ok((id, Some((loc, account)))),
1609                                            None => Err(
1610                                                "Original accounts states from partially applied \
1611                                                         transactions don't exist in the ledger",
1612                                            ),
1613                                        }
1614                                    }
1615                                })
1616                                .collect::<Result<Vec<_>, &'static str>>()
1617                                .unwrap(); // TODO
1618
1619                            let global_state = GlobalState {
1620                                first_pass_ledger: ledger.clone(),
1621                                second_pass_ledger: ledger.clone(),
1622                                fee_excess: zkapp.global_state.fee_excess,
1623                                supply_increase: zkapp.global_state.supply_increase,
1624                                protocol_state: zkapp.global_state.protocol_state,
1625                                block_global_slot: zkapp.global_state.block_global_slot,
1626                            };
1627
1628                            let local_state = LocalStateEnv::<L> {
1629                                stack_frame: zkapp.local_state.stack_frame,
1630                                call_stack: zkapp.local_state.call_stack,
1631                                transaction_commitment: zkapp.local_state.transaction_commitment,
1632                                full_transaction_commitment: zkapp
1633                                    .local_state
1634                                    .full_transaction_commitment,
1635                                excess: zkapp.local_state.excess,
1636                                supply_increase: zkapp.local_state.supply_increase,
1637                                ledger: ledger.clone(),
1638                                success: zkapp.local_state.success,
1639                                account_update_index: zkapp.local_state.account_update_index,
1640                                failure_status_tbl: zkapp.local_state.failure_status_tbl,
1641                                will_succeed: zkapp.local_state.will_succeed,
1642                            };
1643
1644                            TransactionPartiallyApplied::ZkappCommand(Box::new(
1645                                ZkappCommandPartiallyApplied {
1646                                    command: zkapp.command,
1647                                    previous_hash: zkapp.previous_hash,
1648                                    original_first_pass_account_states,
1649                                    constraint_constants: zkapp.constraint_constants,
1650                                    state_view: zkapp.state_view,
1651                                    global_state,
1652                                    local_state,
1653                                },
1654                            ))
1655                        }
1656                        P::SignedCommand(c) => P::SignedCommand(c),
1657                        P::FeeTransfer(ft) => P::FeeTransfer(ft),
1658                        P::Coinbase(cb) => P::Coinbase(cb),
1659                    }
1660                };
1661
1662            let apply_txns_to_witnesses_first_pass = |txns: Vec<Arc<TransactionWithWitness>>| {
1663                let acc = txns
1664                    .into_iter()
1665                    .map(|txn| {
1666                        let mut first_pass_ledger_witness =
1667                            txn.first_pass_ledger_witness.copy_content();
1668
1669                        let (transaction, state_hash, block_global_slot) =
1670                            ScanState::extract_txn_and_global_slot(txn.as_ref());
1671                        let expected_status = transaction.status.clone();
1672
1673                        let partially_applied_txn = apply(
1674                            &apply_first_pass_sparse_ledger,
1675                            &mut first_pass_ledger_witness,
1676                            &transaction.data,
1677                            state_hash,
1678                            block_global_slot,
1679                            &get_protocol_state,
1680                        )?;
1681
1682                        let partially_applied_txn = inject_ledger_info(partially_applied_txn);
1683
1684                        Ok((expected_status, partially_applied_txn))
1685                    })
1686                    .collect::<Result<Vec<_>, String>>()?;
1687
1688                Ok::<Acc<L>, String>(acc)
1689            };
1690
1691            use PreviousIncompleteTxns::{PartiallyApplied, Unapplied};
1692
1693            match txns {
1694                Unapplied(txns) => {
1695                    let partially_applied_txns = apply_txns_to_witnesses_first_pass(txns)?;
1696                    apply_txns_second_pass(partially_applied_txns)
1697                }
1698                PartiallyApplied(partially_applied_txns) => {
1699                    apply_txns_second_pass(partially_applied_txns)
1700                }
1701            }
1702        }
1703
        /// Drives the two-pass application of `ordered_txns`, one
        /// `TransactionsOrdered` set at a time.
        ///
        /// Returns the `Pass` produced by the most recent call to
        /// `apply_txns_first_pass`, or the `first_pass_ledger_hash` argument
        /// unchanged when `ordered_txns` is empty.
        ///
        /// * `previous_incomplete` - transactions carried over into the first set.
        ///   Before every use they are filtered down to zkApp commands only.
        /// * `stop_at_first_pass` - when `true`, the last set only gets its first
        ///   pass applied and the loop is left without a second pass for that set.
        /// * `apply_previous_incomplete_txns`, `apply_txns_first_pass`,
        ///   `apply_txns_second_pass` - callbacks doing the actual work; the first
        ///   `Err` returned by any of them is propagated to the caller.
        fn apply_txns<'a, L>(
            mut previous_incomplete: PreviousIncompleteTxns<L>,
            ordered_txns: Vec<TransactionsOrdered<Arc<TransactionWithWitness>>>,
            mut first_pass_ledger_hash: Pass,
            stop_at_first_pass: bool,
            apply_previous_incomplete_txns: &'a impl Fn(PreviousIncompleteTxns<L>) -> Result<(), String>,
            apply_txns_first_pass: &'a impl Fn(
                Acc<L>,
                Vec<Arc<TransactionWithWitness>>,
            ) -> Result<(Pass, Acc<L>), String>,
            apply_txns_second_pass: &'a impl Fn(Acc<L>) -> Result<(), String>,
        ) -> Result<Pass, String>
        where
            L: LedgerNonSnark,
        {
            use PreviousIncompleteTxns::{PartiallyApplied, Unapplied};

            // Peekable so that the loop below can tell when it is on the last set.
            let mut ordered_txns = ordered_txns.into_iter().peekable();

            let update_previous_incomplete = |previous_incomplete: PreviousIncompleteTxns<L>| {
                // Filter out any non-zkapp transactions for second pass application;
                // the variant (`Unapplied` / `PartiallyApplied`) is preserved.
                match previous_incomplete {
                    Unapplied(txns) => Unapplied(
                        txns.into_iter()
                            .filter(|txn| {
                                use crate::scan_state::transaction_logic::transaction_applied::{
                                    CommandApplied::ZkappCommand, Varying::Command,
                                };

                                matches!(
                                    &txn.transaction_with_info.varying,
                                    Command(ZkappCommand(_))
                                )
                            })
                            .collect(),
                    ),
                    PartiallyApplied(txns) => PartiallyApplied(
                        txns.into_iter()
                            .filter(|(_, txn)| {
                                matches!(&txn, TransactionPartiallyApplied::ZkappCommand(_))
                            })
                            .collect(),
                    ),
                }
            };

            // `while let` (rather than `for`) because the body peeks at the iterator.
            while let Some(txns_per_block) = ordered_txns.next() {
                let is_last = ordered_txns.peek().is_none();

                previous_incomplete = update_previous_incomplete(previous_incomplete);

                if is_last && stop_at_first_pass {
                    // Last block; don't apply second pass.
                    // This is for snarked ledgers which are first pass ledgers

                    let (res_first_pass_ledger_hash, _) =
                        apply_txns_first_pass(Vec::with_capacity(256), txns_per_block.first_pass)?;

                    first_pass_ledger_hash = res_first_pass_ledger_hash;

                    // Skip previous_incomplete: If there are previous_incomplete txns
                    // then there'd be at least two sets of txns_per_block and the
                    // previous_incomplete txns will be applied when processing the first
                    // set. The subsequent sets shouldn't have any previous-incomplete.
                    previous_incomplete = Unapplied(vec![]);
                    break;
                }

                // Read before `txns_per_block.first_pass` is moved out below.
                let current_incomplete_is_empty = txns_per_block.current_incomplete.is_empty();

                // Apply first pass of a blocks transactions either new or
                // continued from previous tree
                let (res_first_pass_ledger_hash, partially_applied_txns) =
                    apply_txns_first_pass(Vec::with_capacity(256), txns_per_block.first_pass)?;

                first_pass_ledger_hash = res_first_pass_ledger_hash;

                // Computed before `previous_incomplete` is consumed by the callback.
                let previous_not_empty = match &previous_incomplete {
                    Unapplied(txns) => !txns.is_empty(),
                    PartiallyApplied(txns) => !txns.is_empty(),
                };

                // Apply second pass of previous tree's transactions, if any
                apply_previous_incomplete_txns(previous_incomplete)?;

                let continue_previous_tree_s_txns = {
                    // If this is a continuation from previous tree for
                    // the same block (incomplete txns in both sets) then
                    // do second pass now

                    previous_not_empty && !current_incomplete_is_empty
                };

                let do_second_pass = {
                    // if transactions completed in the same tree; do second pass now
                    (!txns_per_block.second_pass.is_empty()) || continue_previous_tree_s_txns
                };

                if do_second_pass {
                    apply_txns_second_pass(partially_applied_txns)?;
                    previous_incomplete = Unapplied(vec![]);
                } else {
                    // Transactions not completed in this tree, so second pass after
                    // first pass of remaining transactions for the same block
                    // in the next tree
                    previous_incomplete = PartiallyApplied(partially_applied_txns);
                }
            }

            // Whatever is still pending after the last set gets filtered and
            // applied here (an empty `Unapplied` list when the loop ended via `break`
            // or after a second pass).
            previous_incomplete = update_previous_incomplete(previous_incomplete);

            apply_previous_incomplete_txns(previous_incomplete)?;

            Ok(first_pass_ledger_hash)
        }
1819
1820        let previous_incomplete = match ordered_txns.first() {
1821            None => PreviousIncompleteTxns::<L>::Unapplied(vec![]),
1822            Some(first_block) => {
1823                PreviousIncompleteTxns::Unapplied(first_block.previous_incomplete.clone())
1824            }
1825        };
1826
1827        // Assuming this function is called on snarked ledger and snarked
1828        // ledger is the first pass ledger
1829        let first_pass_ledger_hash = Pass::FirstPassLedgerHash(ledger_mut.merkle_root());
1830
1831        apply_txns(
1832            previous_incomplete,
1833            ordered_txns,
1834            first_pass_ledger_hash,
1835            stop_at_first_pass,
1836            &|txns| {
1837                apply_previous_incomplete_txns(
1838                    txns,
1839                    ledger.clone(),
1840                    &get_protocol_state,
1841                    &apply_first_pass_sparse_ledger,
1842                    |partially_applied_txns| {
1843                        apply_txns_second_pass(
1844                            partially_applied_txns,
1845                            ledger.clone(),
1846                            apply_second_pass,
1847                        )
1848                    },
1849                )
1850            },
1851            &apply_txns_first_pass,
1852            &|partially_applied_txns| {
1853                apply_txns_second_pass(partially_applied_txns, ledger.clone(), apply_second_pass)
1854            }, // &apply_txns_second_pass,
1855        )
1856    }
1857
    /// Forwards every argument to [`Self::apply_ordered_txns_stepwise`] and
    /// returns its result unchanged.
    ///
    /// The only adaptation is that `apply_second_pass` is handed over by
    /// reference; all other arguments are moved through as-is.
    fn apply_ordered_txns_sync<L, F, ApplyFirst, ApplySecond, ApplyFirstSparse>(
        stop_at_first_pass: Option<bool>,
        ordered_txns: Vec<TransactionsOrdered<Arc<TransactionWithWitness>>>,
        ledger: &mut L,
        get_protocol_state: F,
        apply_first_pass: ApplyFirst,
        apply_second_pass: ApplySecond,
        apply_first_pass_sparse_ledger: ApplyFirstSparse,
    ) -> Result<Pass, String>
    where
        L: LedgerNonSnark,
        F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
        ApplyFirst: Fn(
            Slot,
            &ProtocolStateView,
            &mut L,
            &Transaction,
        ) -> Result<TransactionPartiallyApplied<L>, String>,
        ApplySecond:
            Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
        ApplyFirstSparse: Fn(
            Slot,
            &ProtocolStateView,
            &mut SparseLedger,
            &Transaction,
        ) -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
    {
        Self::apply_ordered_txns_stepwise(
            stop_at_first_pass,
            ordered_txns,
            ledger,
            get_protocol_state,
            apply_first_pass,
            &apply_second_pass,
            apply_first_pass_sparse_ledger,
        )
    }
1895
1896    pub fn get_snarked_ledger_sync<L, F, ApplyFirst, ApplySecond, ApplyFirstSparse>(
1897        &self,
1898        ledger: &mut L,
1899        get_protocol_state: F,
1900        apply_first_pass: ApplyFirst,
1901        apply_second_pass: ApplySecond,
1902        apply_first_pass_sparse_ledger: ApplyFirstSparse,
1903    ) -> Result<Pass, String>
1904    where
1905        L: LedgerNonSnark,
1906        F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1907        ApplyFirst: Fn(
1908            Slot,
1909            &ProtocolStateView,
1910            &mut L,
1911            &Transaction,
1912        ) -> Result<TransactionPartiallyApplied<L>, String>,
1913        ApplySecond:
1914            Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
1915        ApplyFirstSparse: Fn(
1916            Slot,
1917            &ProtocolStateView,
1918            &mut SparseLedger,
1919            &Transaction,
1920        ) -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
1921    {
1922        match self.latest_ledger_proof_impl() {
1923            None => Err("No transactions found".to_string()),
1924            Some((_, txns_per_block)) => Self::apply_ordered_txns_sync(
1925                Some(true),
1926                txns_per_block,
1927                ledger,
1928                get_protocol_state,
1929                apply_first_pass,
1930                apply_second_pass,
1931                apply_first_pass_sparse_ledger,
1932            ),
1933        }
1934    }
1935
1936    pub fn get_staged_ledger_sync<L, F, ApplyFirst, ApplySecond, ApplyFirstSparse>(
1937        &self,
1938        ledger: &mut L,
1939        get_protocol_state: F,
1940        apply_first_pass: ApplyFirst,
1941        apply_second_pass: ApplySecond,
1942        apply_first_pass_sparse_ledger: ApplyFirstSparse,
1943    ) -> Result<Pass, String>
1944    where
1945        L: LedgerNonSnark,
1946        F: Fn(Fp) -> Result<MinaStateProtocolStateValueStableV2, String>,
1947        ApplyFirst: Fn(
1948            Slot,
1949            &ProtocolStateView,
1950            &mut L,
1951            &Transaction,
1952        ) -> Result<TransactionPartiallyApplied<L>, String>,
1953        ApplySecond:
1954            Fn(&mut L, TransactionPartiallyApplied<L>) -> Result<TransactionApplied, String>,
1955        ApplyFirstSparse: Fn(
1956            Slot,
1957            &ProtocolStateView,
1958            &mut SparseLedger,
1959            &Transaction,
1960        ) -> Result<TransactionPartiallyApplied<SparseLedger>, String>,
1961    {
1962        let staged_transactions_with_state_hash = self.staged_transactions();
1963        Self::apply_ordered_txns_sync(
1964            None,
1965            staged_transactions_with_state_hash,
1966            ledger,
1967            get_protocol_state,
1968            apply_first_pass,
1969            apply_second_pass,
1970            apply_first_pass_sparse_ledger,
1971        )
1972    }
1973
    /// Returns the value reported by the inner `scan_state`'s `free_space()`.
    pub fn free_space(&self) -> u64 {
        self.scan_state.free_space()
    }
1977
    /// Returns the jobs reported by the inner `scan_state`'s `all_jobs()`, as a
    /// list of job lists.
    // NOTE(review): presumably one inner list per tree - confirm in `parallel_scan`.
    fn all_jobs(&self) -> Vec<Vec<AvailableJob>> {
        self.scan_state.all_jobs()
    }
1981
    /// Returns the flag reported by the inner `scan_state`'s `next_on_new_tree()`.
    pub fn next_on_new_tree(&self) -> bool {
        self.scan_state.next_on_new_tree()
    }
1985
    /// Returns the iterator produced by the inner `scan_state`'s
    /// `base_jobs_on_latest_tree()`.
    pub fn base_jobs_on_latest_tree(&self) -> impl Iterator<Item = Arc<TransactionWithWitness>> {
        self.scan_state.base_jobs_on_latest_tree()
    }
1989
    /// Returns the iterator produced by the inner `scan_state`'s
    /// `base_jobs_on_earlier_tree(index)`.
    // NOTE(review): how `index` selects a tree is defined in `parallel_scan` - confirm there.
    pub fn base_jobs_on_earlier_tree(
        &self,
        index: usize,
    ) -> impl Iterator<Item = Arc<TransactionWithWitness>> {
        self.scan_state.base_jobs_on_earlier_tree(index)
    }
1996
1997    pub fn partition_if_overflowing(&self) -> SpacePartition {
1998        let bundle_count = |work_count: u64| (work_count + 1) / 2;
1999
2000        // slots: current tree space
2001        // job_count: work count on current tree
2002        let SpacePartition {
2003            first: (slots, job_count),
2004            second,
2005        } = self.scan_state.partition_if_overflowing();
2006
2007        SpacePartition {
2008            first: (slots, bundle_count(job_count)),
2009            second: second.map(|(slots, job_count)| (slots, bundle_count(job_count))),
2010        }
2011    }
2012
2013    fn extract_from_job(job: AvailableJob) -> Extracted {
2014        use super::parallel_scan::AvailableJob::{Base, Merge};
2015
2016        match job {
2017            Base(d) => Extracted::First {
2018                transaction_with_info: Box::new(d.transaction_with_info.to_owned()),
2019                statement: Box::new(d.statement.to_owned()),
2020                state_hash: Box::new(d.state_hash),
2021                first_pass_ledger_witness: d.first_pass_ledger_witness.to_owned(),
2022                second_pass_ledger_witness: d.second_pass_ledger_witness.to_owned(),
2023                init_stack: Box::new(d.init_stack.to_owned()),
2024                block_global_slot: d.block_global_slot,
2025            },
2026            Merge { left, right } => {
2027                let LedgerProofWithSokMessage { proof: p1, .. } = left.as_ref();
2028                let LedgerProofWithSokMessage { proof: p2, .. } = right.as_ref();
2029                Extracted::Second(Box::new((p1.clone(), p2.clone())))
2030            }
2031        }
2032    }
2033
2034    pub fn all_work_statements_exn(&self) -> Vec<transaction_snark::work::Statement> {
2035        let work_seqs = self.all_jobs();
2036
2037        let s = |job: &AvailableJob| Self::statement_of_job(job).unwrap();
2038
2039        work_seqs
2040            .iter()
2041            .flat_map(|work_seq| group_list(work_seq, s))
2042            .collect()
2043    }
2044
2045    fn required_work_pairs(&self, slots: u64) -> Vec<OneOrTwo<AvailableJob>> {
2046        let work_list = self.scan_state.jobs_for_slots(slots);
2047        work_list
2048            .iter()
2049            .flat_map(|works| group_list(works, |job| job.clone()))
2050            .collect()
2051    }
2052
2053    pub fn k_work_pairs_for_new_diff(&self, k: u64) -> Vec<OneOrTwo<AvailableJob>> {
2054        let work_list = self.scan_state.jobs_for_next_update();
2055        work_list
2056            .iter()
2057            .flat_map(|works| group_list(works, |job| job.clone()))
2058            .take(k as usize)
2059            .collect()
2060    }
2061
2062    // Always the same pairing of jobs
2063    pub fn work_statements_for_new_diff(&self) -> Vec<transaction_snark::work::Statement> {
2064        let work_list = self.scan_state.jobs_for_next_update();
2065
2066        let s = |job: &AvailableJob| Self::statement_of_job(job).unwrap();
2067
2068        work_list
2069            .iter()
2070            .flat_map(|works| group_list(works, s))
2071            .collect()
2072    }
2073
2074    pub fn all_job_pairs_iter(&self) -> impl Iterator<Item = OneOrTwo<AvailableJob>> {
2075        self.all_jobs().into_iter().flat_map(|jobs| {
2076            let mut iter = jobs.into_iter();
2077            std::iter::from_fn(move || {
2078                let one = iter.next()?;
2079                Some(match iter.next() {
2080                    None => OneOrTwo::One(one),
2081                    Some(two) => OneOrTwo::Two((one, two)),
2082                })
2083            })
2084        })
2085    }
2086
2087    pub fn all_job_pairs_iter2(&self) -> impl Iterator<Item = OneOrTwo<AvailableJob>> {
2088        self.all_jobs().into_iter().flat_map(|jobs| {
2089            let mut iter = jobs.into_iter();
2090            std::iter::from_fn(move || {
2091                let one = iter.next()?;
2092                Some(OneOrTwo::One(one))
2093                // Some(match iter.next() {
2094                //     None => OneOrTwo::One(one),
2095                //     Some(two) => OneOrTwo::Two((one, two)),
2096                // })
2097            })
2098        })
2099    }
2100
2101    pub fn all_work_pairs<F>(
2102        &self,
2103        get_state: F,
2104    ) -> Result<Vec<OneOrTwo<snark_work::spec::Work>>, String>
2105    where
2106        F: Fn(&Fp) -> &MinaStateProtocolStateValueStableV2,
2107    {
2108        let single_spec = |job: AvailableJob| match Self::extract_from_job(job) {
2109            Extracted::First {
2110                transaction_with_info,
2111                statement,
2112                state_hash,
2113                first_pass_ledger_witness,
2114                second_pass_ledger_witness,
2115                init_stack,
2116                block_global_slot,
2117            } => {
2118                let witness = {
2119                    let WithStatus {
2120                        data: transaction,
2121                        status,
2122                    } = transaction_with_info.transaction();
2123
2124                    let protocol_state_body = {
2125                        let state = get_state(&state_hash.0);
2126                        state.body.clone()
2127                    };
2128
2129                    let init_stack = match *init_stack {
2130                        InitStack::Base(x) => x,
2131                        InitStack::Merge => return Err("init_stack was Merge".to_string()),
2132                    };
2133
2134                    TransactionWitness {
2135                        transaction,
2136                        protocol_state_body,
2137                        init_stack,
2138                        status,
2139                        first_pass_ledger: first_pass_ledger_witness,
2140                        second_pass_ledger: second_pass_ledger_witness,
2141                        block_global_slot,
2142                    }
2143                };
2144
2145                Ok(snark_work::spec::Work::Transition((statement, witness)))
2146            }
2147            Extracted::Second(s) => {
2148                let merged = s.0.statement().merge(&s.1.statement())?;
2149                Ok(snark_work::spec::Work::Merge(Box::new((merged, s))))
2150            }
2151        };
2152
2153        self.all_job_pairs_iter()
2154            .map(|group| group.into_map_err(single_spec))
2155            .collect()
2156    }
2157
    /// Builds the snark work specification for every job produced by
    /// [`Self::all_job_pairs_iter2`], i.e. one `OneOrTwo::One` entry per job.
    ///
    /// Groups for which `single_spec` yields `None` are dropped from the
    /// result: this happens when `get_state` returns `None` for a base job's
    /// state hash, or when a base job carries `InitStack::Merge`.
    ///
    /// # Panics
    ///
    /// Panics when the statements of the two proofs of a merge job cannot be
    /// merged (the result is unwrapped).
    pub fn all_work_pairs2<F>(&self, get_state: F) -> Vec<OneOrTwo<snark_work::spec::Work>>
    where
        F: Fn(&Fp) -> Option<MinaStateProtocolStateValueStableV2>,
    {
        let single_spec = |job: AvailableJob| match Self::extract_from_job(job) {
            Extracted::First {
                transaction_with_info,
                statement,
                state_hash,
                first_pass_ledger_witness,
                second_pass_ledger_witness,
                init_stack,
                block_global_slot,
            } => {
                let witness = {
                    let WithStatus {
                        data: transaction,
                        status,
                    } = transaction_with_info.transaction();

                    let protocol_state_body = {
                        // Missing protocol state: skip this job (`?` yields `None`).
                        let state = get_state(&state_hash.0)?;
                        state.body.clone()
                    };

                    let init_stack = match *init_stack {
                        InitStack::Base(x) => x,
                        // A base job is not expected to carry a merge init stack;
                        // such jobs are skipped rather than reported.
                        InitStack::Merge => return None,
                    };

                    TransactionWitness {
                        transaction,
                        protocol_state_body,
                        init_stack,
                        status,
                        first_pass_ledger: first_pass_ledger_witness,
                        second_pass_ledger: second_pass_ledger_witness,
                        block_global_slot,
                    }
                };

                Some(snark_work::spec::Work::Transition((statement, witness)))
            }
            Extracted::Second(s) => {
                // NOTE(review): unlike the other failure modes above, a failed
                // merge panics here instead of yielding `None` - confirm this
                // is intended.
                let merged = s.0.statement().merge(&s.1.statement()).unwrap();
                Some(snark_work::spec::Work::Merge(Box::new((merged, s))))
            }
        };

        self.all_job_pairs_iter2()
            .filter_map(|group| group.into_map_some(single_spec))
            .collect()
    }
2211
    /// Adds `transactions` and the completed `work` to the scan state.
    ///
    /// Steps, in order:
    /// 1. Prints the number of added transactions per kind to stdout.
    /// 2. Records the incomplete transactions of the most recent proof tree.
    /// 3. Pairs each proof in `work` with a job from `jobs_for_next_update`.
    /// 4. Updates the inner scan state with the transactions and paired work.
    /// 5. If that update emitted a proof, checks that its statement connects
    ///    to the previous proof's statement and stores the incomplete zkApp
    ///    updates recorded in step 2.
    ///
    /// Returns `Ok(None)` when no proof was emitted; otherwise the emitted
    /// ledger proof together with the ordered transactions from
    /// `latest_ledger_proof`.
    ///
    /// # Errors
    ///
    /// Returns an error when converting the completed work fails, or when the
    /// new final statement does not connect to the previous proof's statement.
    ///
    /// # Panics
    ///
    /// Panics when the inner scan state's `update` returns an error, or when
    /// `latest_ledger_proof` returns `None` after a proof was emitted.
    pub fn fill_work_and_enqueue_transactions(
        &mut self,
        transactions: Vec<Arc<TransactionWithWitness>>,
        work: Vec<transaction_snark::work::Unchecked>,
    ) -> Result<
        Option<(
            LedgerProof,
            Vec<TransactionsOrdered<(WithStatus<Transaction>, Fp, Slot)>>,
        )>,
        String,
    > {
        // NOTE(review): diagnostic output goes straight to stdout via
        // `println!`; consider routing it through the project's logging.
        {
            use crate::scan_state::transaction_logic::transaction_applied::Varying::*;

            println!("{} transactions added to scan state:", transactions.len());
            println!(
                "- num_fee_transfer={:?}",
                transactions
                    .iter()
                    .filter(|tx| matches!(tx.transaction_with_info.varying, FeeTransfer(_)))
                    .count()
            );

            println!(
                "- num_coinbase={:?}",
                transactions
                    .iter()
                    .filter(|tx| matches!(tx.transaction_with_info.varying, Coinbase(_)))
                    .count()
            );

            println!(
                "- num_user_command={:?}",
                transactions
                    .iter()
                    .filter(|tx| matches!(tx.transaction_with_info.varying, Command(_)))
                    .count()
            );
        }

        // Pairs every submitted proof with the job it completes: jobs come from
        // `jobs_for_next_update` (flattened, limited to the total number of
        // proofs), proofs come from `works` in order.
        let fill_in_transaction_snark_work = |works: Vec<transaction_snark::work::Work>| -> Result<
            Vec<Arc<LedgerProofWithSokMessage>>,
            String,
        > {
            let next_jobs = self
                .scan_state
                .jobs_for_next_update()
                .into_iter()
                .flatten()
                .take(total_proofs(&works));

            // One `(fee, proof, prover)` triple per proof; a work item holding
            // two proofs contributes two triples sharing the same fee and prover.
            let works = works.into_iter().flat_map(
                |transaction_snark::work::Work {
                     fee,
                     proofs,
                     prover,
                 }| {
                    proofs
                        .into_map(|proof| (fee, proof, prover.clone()))
                        .into_iter()
                },
            );

            // `zip` stops at the shorter side; the first conversion error aborts
            // the whole collection.
            next_jobs
                .zip(works)
                .map(|(job, work)| completed_work_to_scanable_work(job, work))
                .collect()
        };

        // get incomplete transactions from previous proof which will be completed in
        // the new proof, if there's one
        let old_proof_and_incomplete_zkapp_updates = self.incomplete_txns_from_recent_proof_tree();
        let work_list = fill_in_transaction_snark_work(work)?;

        // NOTE(review): an error from `update` panics here although this
        // function returns `Result` - confirm whether it should be propagated.
        let proof_opt = self
            .scan_state
            .update(transactions, work_list, |base| {
                // TODO: This is for logs only, make it cleaner
                match base.transaction_with_info.varying {
                    super::transaction_logic::transaction_applied::Varying::Command(_) => 0,
                    super::transaction_logic::transaction_applied::Varying::FeeTransfer(_) => 1,
                    super::transaction_logic::transaction_applied::Varying::Coinbase(_) => 2,
                }
            })
            .unwrap();

        match proof_opt {
            None => Ok(None),
            Some((pwsm, _txns_with_witnesses)) => {
                let LedgerProofWithSokMessage { proof, .. } = pwsm.as_ref();
                let curr_stmt = proof.statement();

                // Without an older proof the current statement stands in for the
                // previous one, which makes the connection check below succeed.
                let (prev_stmt, incomplete_zkapp_updates_from_old_proof) =
                    match old_proof_and_incomplete_zkapp_updates {
                        None => (
                            curr_stmt.clone(),
                            (vec![], BorderBlockContinuedInTheNextTree(false)),
                        ),
                        Some((proof_with_sok, incomplete_zkapp_updates_from_old_proof)) => {
                            let proof = &proof_with_sok.proof;
                            (proof.statement(), incomplete_zkapp_updates_from_old_proof)
                        }
                    };

                // prev_target is connected to curr_source- Order of the arguments is
                // important here
                let stmts_connect = if prev_stmt == curr_stmt {
                    Ok(())
                } else {
                    prev_stmt.merge(&curr_stmt).map(|_| ())
                };

                match stmts_connect {
                    Ok(()) => {
                        self.previous_incomplete_zkapp_updates =
                            incomplete_zkapp_updates_from_old_proof;

                        // This branch only runs when a proof was emitted, so
                        // unwrapping `latest_ledger_proof` is expected to be safe.
                        // `latest_ledger_proof` generates the ordered transactions
                        // appropriately.
                        let (proof_with_sok, txns) = self.latest_ledger_proof().unwrap();

                        Ok(Some((proof_with_sok.proof.clone(), txns)))
                    }
                    Err(e) => Err(format!(
                        "The new final statement does not connect to the previous \
                                     proof's statement: {:?}",
                        e
                    )),
                }
            }
        }
    }
2346
2347    pub fn required_state_hashes(&self) -> HashSet<Fp> {
2348        self.staged_transactions()
2349            .into_iter()
2350            .fold(HashSet::with_capacity(256), |accum, txns| {
2351                txns.fold(accum, |mut accum, txn| {
2352                    accum.insert(txn.state_hash.0);
2353                    accum
2354                })
2355            })
2356    }
2357
    /// Not implemented yet: calling this method always panics via `todo!()`.
    ///
    /// The `()` parameter is a placeholder until the real type of
    /// `protocol_states` is known.
    fn check_required_protocol_states(&self, _protocol_states: ()) {
        todo!() // Not sure what is the type of `protocol_states` here
    }
2361
    /// Iterates over the scan state tree by tree.
    ///
    /// For each tree, the inner iterator yields all of its values
    /// (from root to leaves), as produced by the tree's own `view()`.
    pub fn view(&self) -> impl Iterator<Item = impl Iterator<Item = JobValueWithIndex<'_>>> {
        self.scan_state.trees.iter().map(|tree| tree.view())
    }
2367}
2368
2369pub fn group_list<'a, F, T, R>(slice: &'a [T], fun: F) -> impl Iterator<Item = OneOrTwo<R>> + 'a
2370where
2371    F: Fn(&'a T) -> R + 'a,
2372{
2373    slice.chunks(2).map(move |subslice| match subslice {
2374        [a, b] => OneOrTwo::Two((fun(a), fun(b))),
2375        [a] => OneOrTwo::One(fun(a)),
2376        _ => panic!(),
2377    })
2378}
2379
/// Data extracted for a job, in one of two shapes.
///
/// NOTE(review): presumably `First` corresponds to a base (single transaction)
/// job and `Second` to a merge of two proofs; confirm against the code that
/// builds and consumes this enum.
pub enum Extracted {
    /// An applied transaction together with its statement, state hash pair,
    /// ledger witnesses for both passes, init stack and block global slot.
    First {
        transaction_with_info: Box<TransactionApplied>,
        statement: Box<Statement<()>>,
        state_hash: Box<(Fp, Fp)>,
        first_pass_ledger_witness: SparseLedger,
        second_pass_ledger_witness: SparseLedger,
        init_stack: Box<InitStack>,
        block_global_slot: Slot,
    },
    /// A boxed pair of ledger proofs.
    Second(Box<(LedgerProof, LedgerProof)>),
}
2392
/// Transactions split into four buckets.
///
/// The buckets are plain vectors; [`TransactionsOrdered::fold`] defines the
/// order in which they are visited as a whole.
#[derive(Clone, Debug)]
pub struct TransactionsOrdered<T> {
    pub first_pass: Vec<T>,
    pub second_pass: Vec<T>,
    pub previous_incomplete: Vec<T>,
    pub current_incomplete: Vec<T>,
}

impl<T> TransactionsOrdered<T> {
    /// Converts every element with `fun`, keeping each element in its bucket.
    ///
    /// `fun` is invoked bucket by bucket in the order `first_pass`,
    /// `second_pass`, `previous_incomplete`, `current_incomplete`, and in
    /// element order within each bucket. This matters for stateful closures.
    fn map<B>(self, mut fun: impl FnMut(T) -> B) -> TransactionsOrdered<B> {
        let first_pass: Vec<B> = self.first_pass.into_iter().map(&mut fun).collect();
        let second_pass: Vec<B> = self.second_pass.into_iter().map(&mut fun).collect();
        let previous_incomplete: Vec<B> =
            self.previous_incomplete.into_iter().map(&mut fun).collect();
        let current_incomplete: Vec<B> =
            self.current_incomplete.into_iter().map(&mut fun).collect();

        TransactionsOrdered {
            first_pass,
            second_pass,
            previous_incomplete,
            current_incomplete,
        }
    }

    /// Folds over all elements, threading `init` through `fun`.
    ///
    /// Elements are visited in the order `first_pass`, `previous_incomplete`,
    /// `second_pass`, `current_incomplete`. Note that this differs from the
    /// field declaration order.
    ///
    /// `fun` may be any `FnMut`, like the closure taken by `Iterator::fold`;
    /// every `Fn` closure is still accepted.
    fn fold<A>(&self, init: A, fun: impl FnMut(A, &T) -> A) -> A {
        self.first_pass
            .iter()
            .chain(&self.previous_incomplete)
            .chain(&self.second_pass)
            .chain(&self.current_incomplete)
            .fold(init, fun)
    }
}
2434
2435impl TransactionsOrdered<Arc<TransactionWithWitness>> {
2436    fn first_and_second_pass_transactions_per_tree(
2437        previous_incomplete: Vec<Arc<TransactionWithWitness>>,
2438        txns_per_tree: Vec<Arc<TransactionWithWitness>>,
2439    ) -> Vec<Self> {
2440        let txns_per_tree_len = txns_per_tree.len();
2441
2442        let complete_and_incomplete_transactions = |txs: Vec<Arc<TransactionWithWitness>>| -> Option<
2443            TransactionsOrdered<Arc<TransactionWithWitness>>,
2444        > {
2445            let target_first_pass_ledger = txs.first()?.statement.source.first_pass_ledger;
2446            let first_state_hash = txs.first()?.state_hash.0;
2447
2448            let first_pass_txns = Vec::with_capacity(txns_per_tree_len);
2449            let second_pass_txns = Vec::with_capacity(txns_per_tree_len);
2450
2451            let (first_pass_txns, second_pass_txns, target_first_pass_ledger) =
2452                txs.into_iter().fold(
2453                    (first_pass_txns, second_pass_txns, target_first_pass_ledger),
2454                    |(mut first_pass_txns, mut second_pass_txns, _old_root), txn_with_witness| {
2455                        let txn = txn_with_witness.transaction_with_info.transaction();
2456                        let target_first_pass_ledger =
2457                            txn_with_witness.statement.target.first_pass_ledger;
2458
2459                        use crate::scan_state::transaction_logic::UserCommand::*;
2460                        use Transaction::*;
2461
2462                        match txn.data {
2463                            Coinbase(_) | FeeTransfer(_) | Command(SignedCommand(_)) => {
2464                                first_pass_txns.push(txn_with_witness);
2465                            }
2466                            Command(ZkAppCommand(_)) => {
2467                                first_pass_txns.push(txn_with_witness.clone());
2468                                second_pass_txns.push(txn_with_witness);
2469                            }
2470                        }
2471
2472                        (first_pass_txns, second_pass_txns, target_first_pass_ledger)
2473                    },
2474                );
2475
2476            let (second_pass_txns, incomplete_txns) = match second_pass_txns.first() {
2477                None => (vec![], vec![]),
2478                Some(txn_with_witness) => {
2479                    if txn_with_witness.statement.source.second_pass_ledger
2480                        == target_first_pass_ledger
2481                    {
2482                        // second pass completed in the same tree
2483                        (second_pass_txns, vec![])
2484                    } else {
2485                        (vec![], second_pass_txns)
2486                    }
2487                }
2488            };
2489
2490            let previous_incomplete = match previous_incomplete.first() {
2491                None => vec![],
2492                Some(tx) => {
2493                    if tx.state_hash.0 == first_state_hash {
2494                        // same block
2495                        previous_incomplete.clone()
2496                    } else {
2497                        vec![]
2498                    }
2499                }
2500            };
2501
2502            Some(Self {
2503                first_pass: first_pass_txns,
2504                second_pass: second_pass_txns,
2505                current_incomplete: incomplete_txns,
2506                previous_incomplete,
2507            })
2508        };
2509
2510        let txns_by_block = |txns_per_tree: Vec<Arc<TransactionWithWitness>>| {
2511            let mut global = Vec::with_capacity(txns_per_tree.len());
2512            let txns_per_tree_len = txns_per_tree.len();
2513
2514            let make_current =
2515                || Vec::<Arc<TransactionWithWitness>>::with_capacity(txns_per_tree_len);
2516            let mut current = make_current();
2517
2518            for next in txns_per_tree {
2519                if current
2520                    .last()
2521                    .map(|last| last.state_hash.0 != next.state_hash.0)
2522                    .unwrap_or(false)
2523                {
2524                    global.push(current);
2525                    current = make_current();
2526                }
2527
2528                current.push(next);
2529            }
2530
2531            if !current.is_empty() {
2532                global.push(current);
2533            }
2534
2535            global
2536        };
2537
2538        txns_by_block(txns_per_tree)
2539            .into_iter()
2540            .filter_map(complete_and_incomplete_transactions)
2541            .collect()
2542    }
2543
2544    fn first_and_second_pass_transactions_per_forest(
2545        scan_state_txns: Vec<Vec<Arc<TransactionWithWitness>>>,
2546        previous_incomplete: Vec<Arc<TransactionWithWitness>>,
2547    ) -> Vec<Vec<Self>> {
2548        scan_state_txns
2549            .into_iter()
2550            .map(|txns_per_tree| {
2551                Self::first_and_second_pass_transactions_per_tree(
2552                    previous_incomplete.clone(),
2553                    txns_per_tree,
2554                )
2555            })
2556            .collect()
2557    }
2558}
2559
/// Tag wrapping a ledger hash together with the pass it refers to.
///
/// Only one variant exists at the moment.
#[derive(Clone, Debug)]
pub enum Pass {
    /// Carries an `Fp` value; presumably the first-pass ledger hash, as the
    /// variant name suggests (TODO confirm against the users of this enum).
    FirstPassLedgerHash(Fp),
}
2564
2565impl From<&OneOrTwo<AvailableJobMessage>> for SnarkJobId {
2566    fn from(value: &OneOrTwo<AvailableJobMessage>) -> Self {
2567        let (first, second) = match value {
2568            OneOrTwo::One(j) => (j, j),
2569            OneOrTwo::Two((j1, j2)) => (j1, j2),
2570        };
2571
2572        let source = match first {
2573            AvailableJobMessage::Base(base) => &base.statement.0.source,
2574            AvailableJobMessage::Merge { left, .. } => &left.0 .0.statement.source,
2575        };
2576        let target = match second {
2577            AvailableJobMessage::Base(base) => &base.statement.0.target,
2578            AvailableJobMessage::Merge { right, .. } => &right.0 .0.statement.target,
2579        };
2580
2581        (source, target).into()
2582    }
2583}
2584
2585impl From<&OneOrTwo<Statement<()>>> for SnarkJobId {
2586    fn from(value: &OneOrTwo<Statement<()>>) -> Self {
2587        let (source, target): (
2588            mina_p2p_messages::v2::MinaStateBlockchainStateValueStableV2LedgerProofStatementSource,
2589            mina_p2p_messages::v2::MinaStateBlockchainStateValueStableV2LedgerProofStatementSource,
2590        ) = match value {
2591            OneOrTwo::One(stmt) => ((&stmt.source).into(), (&stmt.target).into()),
2592            OneOrTwo::Two((stmt1, stmt2)) => ((&stmt1.source).into(), (&stmt2.target).into()),
2593        };
2594        (&source, &target).into()
2595    }
2596}