mina_node_testing/service/
mod.rs

1mod rpc_service;
2
3use std::{
4    collections::{BTreeMap, VecDeque},
5    sync::{Arc, Mutex as StdMutex},
6    time::Duration,
7};
8
9use ledger::{
10    dummy::dummy_transaction_proof,
11    proofs::transaction::ProofError,
12    scan_state::{
13        scan_state::transaction_snark::SokMessage,
14        transaction_logic::{verifiable, WithStatus},
15    },
16    Mask,
17};
18use mina_core::channels::Aborter;
19use mina_node_native::NodeService;
20use mina_p2p_messages::{
21    string::ByteString,
22    v2::{
23        CurrencyFeeStableV1, LedgerHash, LedgerProofProdStableV2, MinaBaseProofStableV2,
24        MinaStateSnarkedLedgerStateWithSokStableV2, NonZeroCurvePoint,
25        ProverExtendBlockchainInputStableV2,
26        SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single, StateHash,
27        TransactionSnarkStableV2, TransactionSnarkWorkTStableV2Proofs,
28    },
29};
30use node::{
31    account::AccountPublicKey,
32    block_producer::{vrf_evaluator::VrfEvaluatorInput, BlockProducerEvent},
33    core::{
34        channels::mpsc,
35        invariants::InvariantsState,
36        snark::{Snark, SnarkJobId},
37    },
38    event_source::Event,
39    external_snark_worker::SnarkWorkSpec,
40    external_snark_worker_effectful::{ExternalSnarkWorkerEvent, ExternalSnarkWorkerService},
41    ledger::write::BlockApplyResult,
42    p2p::{
43        connection::outgoing::P2pConnectionOutgoingInitOpts,
44        service_impl::{
45            webrtc::{Cmd, P2pServiceWebrtc, PeerState},
46            webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p,
47        },
48        webrtc, P2pCryptoService, PeerId,
49    },
50    recorder::Recorder,
51    service::{
52        BlockProducerService, BlockProducerVrfEvaluatorService, TransitionFrontierGenesisService,
53    },
54    snark::{
55        block_verify::{SnarkBlockVerifyId, SnarkBlockVerifyService, VerifiableBlockWithHash},
56        user_command_verify::SnarkUserCommandVerifyId,
57        user_command_verify_effectful::SnarkUserCommandVerifyService,
58        work_verify::{SnarkWorkVerifyId, SnarkWorkVerifyService},
59        BlockVerifier, SnarkEvent, TransactionVerifier, VerifierSRS,
60    },
61    snark_pool::SnarkPoolService,
62    stats::Stats,
63    transition_frontier::{archive::archive_service::ArchiveService, genesis::GenesisConfig},
64    ActionWithMeta, State,
65};
66use redux::Instant;
67
68use crate::{
69    cluster::{ClusterNodeId, ProofKind},
70    node::NonDeterministicEvent,
71};
72
/// Boxed, sendable callback that receives the current state, the testing service and the
/// action being processed. It is stored on [`NodeTestingService`] and invoked from
/// [`NodeTestingService::dyn_effects`].
pub type DynEffects = Box<dyn FnMut(&State, &NodeTestingService, &ActionWithMeta) + Send>;
74
/// Marker type that names the request-id kind used for pending events.
///
/// NOTE(review): within this file it is only mentioned by the commented-out
/// `PendingRequests<PendingEventIdType, Event>` field on `NodeTestingService` — confirm it
/// is still needed.
#[derive(Hash, Ord, PartialOrd, Eq, PartialEq)]
pub struct PendingEventIdType;
impl mina_core::requests::RequestIdType for PendingEventIdType {
    /// Returns the static name associated with this request-id kind.
    fn request_id_type() -> &'static str {
        "PendingEventId"
    }
}
82
/// Identifier attached to an event stored in the pending-events queue.
///
/// Values are produced by `PendingEvents::add` and are later used to look an event up or to
/// take it out of the queue. The default value wraps `0`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
pub struct PendingEventId(usize);
85
/// Queue of events received from the real service that have not yet been handed to the
/// state machine.
struct PendingEvents {
    /// Queued events in insertion order, each paired with the id it was stored under.
    events: VecDeque<(PendingEventId, Event)>,
    /// Id that will be used for the next event added to the queue.
    next_id: PendingEventId,
}
90
91impl PendingEventId {
92    fn copy_inc(&mut self) -> Self {
93        let copy = *self;
94        let _ = self.0.wrapping_add(1);
95        copy
96    }
97}
98
99impl PendingEvents {
100    fn new() -> Self {
101        PendingEvents {
102            events: VecDeque::new(),
103            next_id: Default::default(),
104        }
105    }
106
107    fn add(&mut self, event: Event) -> PendingEventId {
108        let id = self.next_id.copy_inc();
109        self.events.push_back((id, event));
110        id
111    }
112
113    fn get(&self, id: PendingEventId) -> Option<&Event> {
114        self.events
115            .iter()
116            .find_map(|(_id, event)| (*_id == id).then_some(event))
117    }
118
119    fn remove(&mut self, id: PendingEventId) -> Option<Event> {
120        if let Some(i) = self
121            .events
122            .iter()
123            .enumerate()
124            .find_map(|(i, (_id, _))| (*_id == id).then_some(i))
125        {
126            self.events.remove(i).map(|(_, event)| event)
127        } else {
128            None
129        }
130    }
131
132    fn iter(&self) -> impl Iterator<Item = (PendingEventId, &Event)> {
133        self.events.iter().map(|(id, event)| (*id, event))
134    }
135}
136
/// Service implementation used for nodes running inside the testing cluster.
///
/// It wraps a real [`NodeService`] and adds test-only state on top of it: a manually
/// advanced clock, a queue of events that are held back until explicitly taken, a
/// configurable proof mode and an optional custom effects callback.
pub struct NodeTestingService {
    /// The wrapped service; most trait implementations in this file forward to it.
    real: NodeService,
    /// Identifier of this node within the cluster.
    id: ClusterNodeId,
    /// Use webrtc p2p between Rust nodes.
    rust_to_rust_use_webrtc: bool,
    /// Selects how block/transaction proofs are produced and verified by this service.
    proof_kind: ProofKind,
    /// We are replaying this node so disable some non-deterministic services.
    is_replay: bool,
    /// Value returned by `redux::TimeService::monotonic_time`; moved forward only through
    /// `advance_time` in this file.
    monotonic_time: Instant,
    /// Events sent by the real service not yet received by state machine.
    pending_events: PendingEvents,
    //pending_events: PendingRequests<PendingEventIdType, Event>,
    /// Optional callback run by `dyn_effects`.
    dyn_effects: Option<DynEffects>,

    /// Sok digest stored by `ExternalSnarkWorkerService::start` and embedded into the dummy
    /// snark proofs built by `submit`.
    snarker_sok_digest: Option<ByteString>,

    /// Invariants state shared behind a mutex; handed out by
    /// `InvariantService::cluster_invariants_state`.
    cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
    /// Once dropped, it will cause all threads associated to shutdown.
    _shutdown: Aborter,
}
157
impl NodeTestingService {
    /// Wraps `real` into a testing service for cluster node `id`.
    ///
    /// Defaults: webrtc between Rust nodes disabled, `ProofKind::default()`, replay mode off,
    /// no custom effects, no sok digest, an empty pending-events queue, and the monotonic
    /// clock initialised to `Instant::now()`.
    pub fn new(
        real: NodeService,
        id: ClusterNodeId,
        cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
        _shutdown: Aborter,
    ) -> Self {
        Self {
            real,
            id,
            rust_to_rust_use_webrtc: false,
            proof_kind: ProofKind::default(),
            is_replay: false,
            monotonic_time: Instant::now(),
            pending_events: PendingEvents::new(),
            dyn_effects: None,
            snarker_sok_digest: None,
            cluster_invariants_state,
            _shutdown,
        }
    }

    /// Returns the cluster id of this node.
    pub fn node_id(&self) -> ClusterNodeId {
        self.id
    }

    /// Returns whether webrtc is used for p2p between Rust nodes.
    pub fn rust_to_rust_use_webrtc(&self) -> bool {
        self.rust_to_rust_use_webrtc
    }

    /// Enables webrtc for p2p between Rust nodes and returns `self` for chaining.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when built without the `p2p-webrtc` feature.
    pub fn set_rust_to_rust_use_webrtc(&mut self) -> &mut Self {
        if !cfg!(feature = "p2p-webrtc") {
            unreachable!();
        }
        self.rust_to_rust_use_webrtc = true;
        self
    }

    /// Returns the currently configured proof mode.
    pub fn proof_kind(&self) -> ProofKind {
        self.proof_kind
    }

    /// Sets the proof mode and returns `self` for chaining.
    pub fn set_proof_kind(&mut self, kind: ProofKind) -> &mut Self {
        self.proof_kind = kind;
        self
    }

    /// Marks this service as replaying and returns `self` for chaining.
    ///
    /// In replay mode, events for which `NonDeterministicEvent::should_drop_event` returns
    /// `true` are discarded when polled from the real service.
    pub fn set_replay(&mut self) -> &mut Self {
        self.is_replay = true;
        self
    }

    /// Moves the service's monotonic clock forward by `by_nanos` nanoseconds.
    pub fn advance_time(&mut self, by_nanos: u64) {
        self.monotonic_time += Duration::from_nanos(by_nanos);
    }

    /// Runs the custom effects callback, if one is set, for the given state and action.
    pub fn dyn_effects(&mut self, state: &State, action: &ActionWithMeta) {
        // The callback is moved out of `self` for the duration of the call so that `self`
        // can be passed to it by shared reference, then it is stored back.
        if let Some(mut dyn_effects) = self.dyn_effects.take() {
            (dyn_effects)(state, self, action);
            self.dyn_effects = Some(dyn_effects);
        }
    }

    /// Installs `effects` as the custom effects callback, replacing any previous one.
    pub fn set_dyn_effects(&mut self, effects: DynEffects) {
        self.dyn_effects = Some(effects);
    }

    /// Removes and returns the custom effects callback, if any.
    pub fn remove_dyn_effects(&mut self) -> Option<DynEffects> {
        self.dyn_effects.take()
    }

    /// Stores the sok digest used when building dummy snark work proofs.
    pub fn set_snarker_sok_digest(&mut self, digest: ByteString) {
        self.snarker_sok_digest = Some(digest);
    }

    /// Returns an iterator over all events currently held in the pending queue.
    ///
    /// Before returning, every RPC request that is immediately available is processed by the
    /// real service. When `poll` is `true`, every event that is immediately available from
    /// the real service is additionally moved into the pending queue.
    pub fn pending_events(&mut self, poll: bool) -> impl Iterator<Item = (PendingEventId, &Event)> {
        while let Ok(req) = self.real.rpc_receiver().try_recv() {
            self.real.process_rpc_request(req);
        }
        if poll {
            while let Some(event) = self.real.event_receiver().try_next() {
                // Drop non-deterministic events during replay. We
                // have those recorded as `ScenarioStep::NonDeterministicEvent`.
                if self.is_replay && NonDeterministicEvent::should_drop_event(&event) {
                    eprintln!("dropping non-deterministic event: {event:?}");
                    continue;
                }
                self.pending_events.add(event);
            }
        }
        self.pending_events.iter()
    }

    /// Waits for the next event from the real service, queues it as pending and returns its
    /// id together with a reference to it.
    ///
    /// While waiting, an incoming RPC request is processed and the event found in the
    /// receiver right afterwards is used. Returns `None` when waiting for events fails.
    ///
    /// # Panics
    ///
    /// Panics if the event receiver yields no event after an RPC request was processed or
    /// after `wait_for_events` reported success (both sites `unwrap` the result of
    /// `try_next`).
    pub async fn next_pending_event(&mut self) -> Option<(PendingEventId, &Event)> {
        let event = loop {
            let (event_receiver, rpc_receiver) = self.real.event_receiver_with_rpc_receiver();
            tokio::select! {
                Some(rpc) = rpc_receiver.recv() => {
                    self.real.process_rpc_request(rpc);
                    // NOTE(review): assumes processing an RPC request always enqueues an
                    // event; the replay filter below is not applied on this path — confirm.
                    break self.real.event_receiver().try_next().unwrap();
                }
                res = event_receiver.wait_for_events() => {
                    res.ok()?;
                    let event = self.real.event_receiver().try_next().unwrap();
                    // Drop non-deterministic events during replay. We
                    // have those recorded as `ScenarioStep::NonDeterministicEvent`.
                    if self.is_replay && NonDeterministicEvent::should_drop_event(&event) {
                        eprintln!("dropping non-deterministic event: {event:?}");
                        continue;
                    }
                    break event;
                }
            }
        };
        let id = self.pending_events.add(event);
        Some((id, self.pending_events.get(id).unwrap()))
    }

    /// Borrows the pending event stored under `id`, if any.
    pub fn get_pending_event(&self, id: PendingEventId) -> Option<&Event> {
        self.pending_events.get(id)
    }

    /// Removes the pending event stored under `id` from the queue and returns it, if any.
    pub fn take_pending_event(&mut self, id: PendingEventId) -> Option<Event> {
        self.pending_events.remove(id)
    }

    /// Looks up the mask for `ledger_hash` through the real service's ledger manager,
    /// discarding the second element of the tuple returned by `get_mask`.
    pub fn ledger(&self, ledger_hash: &LedgerHash) -> Option<Mask> {
        self.real
            .ledger_manager()
            .get_mask(ledger_hash)
            .map(|(mask, _)| mask)
    }
}
291
// Marker implementation: `redux::Service` is implemented here with an empty body.
impl redux::Service for NodeTestingService {}
293
/// Core node service: queues, stats and recorder come from the wrapped real service, while
/// the replay flag is the one owned by the testing service.
impl node::Service for NodeTestingService {
    /// Forwards to the real service.
    fn queues(&mut self) -> node::service::Queues {
        self.real.queues()
    }

    /// Forwards to the real service.
    fn stats(&mut self) -> Option<&mut Stats> {
        self.real.stats()
    }

    /// Forwards to the real service.
    fn recorder(&mut self) -> &mut Recorder {
        self.real.recorder()
    }

    /// Returns the testing service's own replay flag (set through `set_replay`).
    fn is_replay(&self) -> bool {
        self.is_replay
    }
}
311
/// P2p cryptography: every method forwards unchanged to the wrapped real service.
impl P2pCryptoService for NodeTestingService {
    /// Forwards to the real service.
    fn generate_random_nonce(&mut self) -> [u8; 24] {
        self.real.generate_random_nonce()
    }

    /// Forwards to the real service.
    fn ephemeral_sk(&mut self) -> [u8; 32] {
        self.real.ephemeral_sk()
    }

    /// Forwards to the real service.
    fn static_sk(&mut self) -> [u8; 32] {
        self.real.static_sk()
    }

    /// Forwards to the real service.
    fn sign_key(&mut self, key: &[u8; 32]) -> Vec<u8> {
        self.real.sign_key(key)
    }

    /// Forwards to the real service.
    fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
        self.real.sign_publication(publication)
    }

    /// Forwards to the real service.
    fn verify_publication(
        &mut self,
        pk: &libp2p_identity::PublicKey,
        publication: &[u8],
        sig: &[u8],
    ) -> bool {
        self.real.verify_publication(pk, publication, sig)
    }
}
342
/// Ledger access: exposes the ledger manager of the wrapped real service.
impl node::ledger::LedgerService for NodeTestingService {
    /// Returns the real service's ledger manager.
    fn ledger_manager(&self) -> &node::ledger::LedgerManager {
        self.real.ledger_manager()
    }
}
348
/// Time source for the state machine, backed by the testing service's own clock field.
impl redux::TimeService for NodeTestingService {
    /// Returns the stored `monotonic_time` value instead of reading a clock; in this file
    /// the value only moves forward through `NodeTestingService::advance_time`.
    fn monotonic_time(&mut self) -> Instant {
        self.monotonic_time
    }
}
354
/// Event source seen by the state machine.
impl node::event_source::EventSourceService for NodeTestingService {
    /// Always returns `None`. Events from the real service are instead collected into the
    /// pending queue (`pending_events` / `next_pending_event`) and taken out of it by id —
    /// presumably so the test driver decides when each event is delivered.
    fn next_event(&mut self) -> Option<Event> {
        None
    }
}
360
/// Genesis loading: forwarded to the wrapped real service.
impl TransitionFrontierGenesisService for NodeTestingService {
    /// Forwards `config` to the real service's `load_genesis`.
    fn load_genesis(&mut self, config: Arc<GenesisConfig>) {
        TransitionFrontierGenesisService::load_genesis(&mut self.real, config);
    }
}
366
/// Webrtc p2p service: every method forwards unchanged to the wrapped real service.
impl P2pServiceWebrtc for NodeTestingService {
    type Event = Event;

    /// Forwards to the real service.
    fn random_pick(
        &mut self,
        list: &[P2pConnectionOutgoingInitOpts],
    ) -> Option<P2pConnectionOutgoingInitOpts> {
        self.real.random_pick(list)
    }

    /// Returns the real service's event sender.
    fn event_sender(&self) -> &mpsc::UnboundedSender<Event> {
        P2pServiceWebrtc::event_sender(&self.real)
    }

    /// Returns the real service's command sender.
    fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<Cmd> {
        P2pServiceWebrtc::cmd_sender(&self.real)
    }

    /// Returns the real service's peer map.
    fn peers(&mut self) -> &mut BTreeMap<PeerId, PeerState> {
        P2pServiceWebrtc::peers(&mut self.real)
    }

    /// Forwards to the real service.
    fn outgoing_init(&mut self, peer_id: PeerId) {
        P2pServiceWebrtc::outgoing_init(&mut self.real, peer_id)
    }

    /// Forwards to the real service.
    fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
        P2pServiceWebrtc::incoming_init(&mut self.real, peer_id, offer)
    }

    /// Forwards to the real service.
    fn encrypt<T: node::p2p::identity::EncryptableType>(
        &mut self,
        other_pk: &node::p2p::identity::PublicKey,
        message: &T,
    ) -> Result<T::Encrypted, Box<dyn std::error::Error>> {
        self.real.encrypt(other_pk, message)
    }

    /// Forwards to the real service.
    fn decrypt<T: node::p2p::identity::EncryptableType>(
        &mut self,
        other_pub_key: &node::p2p::identity::PublicKey,
        encrypted: &T::Encrypted,
    ) -> Result<T, Box<dyn std::error::Error>> {
        self.real.decrypt(other_pub_key, encrypted)
    }

    /// Forwards to the real service.
    fn auth_encrypt_and_send(
        &mut self,
        peer_id: PeerId,
        other_pub_key: &node::p2p::identity::PublicKey,
        auth: webrtc::ConnectionAuth,
    ) {
        self.real
            .auth_encrypt_and_send(peer_id, other_pub_key, auth)
    }

    /// Forwards to the real service.
    fn auth_decrypt(
        &mut self,
        other_pub_key: &node::p2p::identity::PublicKey,
        auth: webrtc::ConnectionAuthEncrypted,
    ) -> Option<webrtc::ConnectionAuth> {
        self.real.auth_decrypt(other_pub_key, auth)
    }
}
431
/// Combined webrtc + libp2p service: forwards to the wrapped real service.
impl P2pServiceWebrtcWithLibp2p for NodeTestingService {
    /// Returns the real service's mio service; only compiled with the `p2p-libp2p` feature.
    #[cfg(feature = "p2p-libp2p")]
    fn mio(&mut self) -> &mut node::p2p::service_impl::mio::MioService {
        self.real.mio()
    }

    /// Returns the set of peer ids reported by the real service's `connections`.
    fn connections(&self) -> std::collections::BTreeSet<PeerId> {
        self.real.connections()
    }
}
442
443impl SnarkBlockVerifyService for NodeTestingService {
444    fn verify_init(
445        &mut self,
446        req_id: SnarkBlockVerifyId,
447        verifier_index: BlockVerifier,
448        verifier_srs: Arc<VerifierSRS>,
449        block: VerifiableBlockWithHash,
450    ) {
451        match self.proof_kind() {
452            ProofKind::Dummy | ProofKind::ConstraintsChecked => {
453                let _ = self
454                    .real
455                    .event_sender()
456                    .send(SnarkEvent::BlockVerify(req_id, Ok(())).into());
457            }
458            ProofKind::Full => SnarkBlockVerifyService::verify_init(
459                &mut self.real,
460                req_id,
461                verifier_index,
462                verifier_srs,
463                block,
464            ),
465        }
466    }
467}
468
/// User command verification: forwarded to the wrapped real service regardless of the
/// configured proof mode.
impl SnarkUserCommandVerifyService for NodeTestingService {
    /// Forwards the request to the real service's `verify_init`.
    fn verify_init(
        &mut self,
        req_id: SnarkUserCommandVerifyId,
        commands: Vec<WithStatus<verifiable::UserCommand>>,
    ) {
        SnarkUserCommandVerifyService::verify_init(&mut self.real, req_id, commands)
    }
}
478
479impl SnarkWorkVerifyService for NodeTestingService {
480    fn verify_init(
481        &mut self,
482        req_id: SnarkWorkVerifyId,
483        verifier_index: TransactionVerifier,
484        verifier_srs: Arc<VerifierSRS>,
485        work: Vec<Snark>,
486    ) {
487        match self.proof_kind() {
488            ProofKind::Dummy | ProofKind::ConstraintsChecked => {
489                let _ = self
490                    .real
491                    .event_sender()
492                    .send(SnarkEvent::WorkVerify(req_id, Ok(())).into());
493            }
494            ProofKind::Full => SnarkWorkVerifyService::verify_init(
495                &mut self.real,
496                req_id,
497                verifier_index,
498                verifier_srs,
499                work,
500            ),
501        }
502    }
503}
504
/// Snark pool helpers: forwarded to the wrapped real service.
impl SnarkPoolService for NodeTestingService {
    /// Forwards the job-id iterator and `n` to the real service's `random_choose`.
    fn random_choose<'a>(
        &mut self,
        iter: impl Iterator<Item = &'a SnarkJobId>,
        n: usize,
    ) -> Vec<SnarkJobId> {
        self.real.random_choose(iter, n)
    }
}
514
/// VRF evaluation: forwarded to the wrapped real service.
impl BlockProducerVrfEvaluatorService for NodeTestingService {
    /// Forwards `data` to the real service's `evaluate`.
    fn evaluate(&mut self, data: VrfEvaluatorInput) {
        BlockProducerVrfEvaluatorService::evaluate(&mut self.real, data)
    }
}
520
/// Archive integration: forwarded to the wrapped real service.
impl ArchiveService for NodeTestingService {
    /// Forwards `data` to the real service's `send_to_archive`.
    fn send_to_archive(&mut self, data: BlockApplyResult) {
        self.real.send_to_archive(data);
    }
}
526
527use std::cell::RefCell;
// Per-thread cache holding the block hash and proof of the most recently proven genesis
// block. It is read and written by `BlockProducerService::prove` in `ProofKind::Full` mode
// so that a genesis block with the same hash is not proven again on the same thread.
thread_local! {
    static GENESIS_PROOF: RefCell<Option<(StateHash, Arc<MinaBaseProofStableV2>)>> = const { RefCell::new(None)};
}
531
/// Block production, with proof generation gated on the configured proof mode.
impl BlockProducerService for NodeTestingService {
    /// Returns the block provers of the wrapped real service.
    fn provers(&self) -> ledger::proofs::provers::BlockProver {
        self.real.provers()
    }

    /// Produces a proof for the block `block_hash` and reports the outcome as a
    /// `BlockProducerEvent::BlockProve` event; failures to send the event are ignored.
    ///
    /// * `ProofKind::Dummy` — sends a dummy blockchain proof without running the prover.
    /// * `ProofKind::ConstraintsChecked` — runs the prover and expects it to fail with
    ///   `ProofError::ConstraintsOk`, in which case a dummy proof is sent.
    /// * `ProofKind::Full` — runs the prover and sends its result, reusing the thread-local
    ///   cached proof when the block is a genesis block with the cached hash.
    ///
    /// # Panics
    ///
    /// Panics if the real service has no block producer, or, in `ConstraintsChecked` mode,
    /// if the prover returns `Ok` or any error other than `ProofError::ConstraintsOk`.
    fn prove(
        &mut self,
        block_hash: StateHash,
        mut input: Box<ProverExtendBlockchainInputStableV2>,
    ) {
        // Builds a successful `BlockProve` event carrying a clone of the dummy proof.
        fn dummy_proof_event(block_hash: StateHash) -> Event {
            let dummy_proof = (*ledger::dummy::dummy_blockchain_proof()).clone();
            BlockProducerEvent::BlockProve(block_hash, Ok(dummy_proof.into())).into()
        }
        let keypair = self.real.block_producer().unwrap().keypair();

        match self.proof_kind() {
            ProofKind::Dummy => {
                let _ = self.real.event_sender().send(dummy_proof_event(block_hash));
            }
            ProofKind::ConstraintsChecked => {
                // NOTE(review): the trailing `true` presumably selects a constraints-only
                // run of the prover — confirm against `block_producer::prove`.
                match mina_node_native::block_producer::prove(
                    self.provers(),
                    &mut input,
                    &keypair,
                    true,
                ) {
                    // `ConstraintsOk` is the expected outcome in this mode.
                    Err(e)
                        if matches!(
                            e.downcast_ref::<ProofError>(),
                            Some(ProofError::ConstraintsOk)
                        ) =>
                    {
                        let _ = self.real.event_sender().send(dummy_proof_event(block_hash));
                    }
                    Err(err) => panic!("unexpected block proof generation error: {err:?}"),
                    Ok(_) => unreachable!(),
                }
            }
            ProofKind::Full => {
                // TODO(binier): handle if block is genesis based on fork constants.
                let is_genesis = input
                    .next_state
                    .body
                    .consensus_state
                    .blockchain_length
                    .as_u32()
                    == 1;
                // Use the cached proof only for a genesis block whose hash matches the
                // cached one; otherwise run the prover and stringify any error.
                let res = GENESIS_PROOF.with_borrow_mut(|cached_genesis| {
                    if let Some((_, proof)) = cached_genesis
                        .as_ref()
                        .filter(|(hash, _)| is_genesis && hash == &block_hash)
                    {
                        Ok(proof.clone())
                    } else {
                        mina_node_native::block_producer::prove(
                            self.provers(),
                            &mut input,
                            &keypair,
                            false,
                        )
                        .map_err(|err| format!("{err:?}"))
                    }
                });
                // Remember a successful genesis proof for later calls on this thread.
                if let Some(proof) = res.as_ref().ok().filter(|_| is_genesis) {
                    GENESIS_PROOF
                        .with_borrow_mut(|data| *data = Some((block_hash.clone(), proof.clone())));
                }
                let _ = self
                    .real
                    .event_sender()
                    .send(BlockProducerEvent::BlockProve(block_hash, res).into());
            }
        }
    }

    /// Always returns `None` without calling `_f`.
    fn with_producer_keypair<T>(
        &self,
        _f: impl FnOnce(&node::account::AccountSecretKey) -> T,
    ) -> Option<T> {
        None
    }
}
615
616impl ExternalSnarkWorkerService for NodeTestingService {
617    fn start(
618        &mut self,
619        public_key: NonZeroCurvePoint,
620        fee: CurrencyFeeStableV1,
621        _: TransactionVerifier,
622    ) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
623        let pub_key = AccountPublicKey::from(public_key);
624        let sok_message = SokMessage::create(
625            (&fee).into(),
626            pub_key.try_into().map_err(|e| {
627                node::external_snark_worker::ExternalSnarkWorkerError::Error(format!("{:?}", e))
628            })?,
629        );
630        self.set_snarker_sok_digest((&sok_message.digest()).into());
631        let _ = self
632            .real
633            .event_sender()
634            .send(ExternalSnarkWorkerEvent::Started.into());
635        Ok(())
636        // self.real.start(path, public_key, fee)
637    }
638
639    fn submit(
640        &mut self,
641        spec: SnarkWorkSpec,
642    ) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
643        let sok_digest = self.snarker_sok_digest.clone().unwrap();
644        let make_dummy_proof = |spec| {
645            let statement = match spec {
646                SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Transition(v, _) => v.0,
647                SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Merge(v) => v.0 .0,
648            };
649
650            LedgerProofProdStableV2(TransactionSnarkStableV2 {
651                statement: MinaStateSnarkedLedgerStateWithSokStableV2 {
652                    source: statement.source,
653                    target: statement.target,
654                    connecting_ledger_left: statement.connecting_ledger_left,
655                    connecting_ledger_right: statement.connecting_ledger_right,
656                    supply_increase: statement.supply_increase,
657                    fee_excess: statement.fee_excess,
658                    sok_digest: sok_digest.clone(),
659                },
660                proof: (*dummy_transaction_proof()).clone(),
661            })
662        };
663        let res = match spec {
664            SnarkWorkSpec::One(v) => TransactionSnarkWorkTStableV2Proofs::One(make_dummy_proof(v)),
665            SnarkWorkSpec::Two((v1, v2)) => TransactionSnarkWorkTStableV2Proofs::Two((
666                make_dummy_proof(v1),
667                make_dummy_proof(v2),
668            )),
669        };
670        let _ = self
671            .real
672            .event_sender()
673            .send(ExternalSnarkWorkerEvent::WorkResult(Arc::new(res)).into());
674        Ok(())
675        // self.real.submit(spec)
676    }
677
678    fn cancel(&mut self) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
679        let _ = self
680            .real
681            .event_sender()
682            .send(ExternalSnarkWorkerEvent::WorkCancelled.into());
683        Ok(())
684        // self.real.cancel()
685    }
686
687    fn kill(&mut self) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
688        let _ = self
689            .real
690            .event_sender()
691            .send(ExternalSnarkWorkerEvent::Killed.into());
692        Ok(())
693        // self.real.kill()
694    }
695}
696
/// Invariant checking support: per-node state comes from the wrapped real service, while
/// the cluster-wide state is the mutex-protected value owned by the testing service.
impl node::core::invariants::InvariantService for NodeTestingService {
    type ClusterInvariantsState<'a> = std::sync::MutexGuard<'a, InvariantsState>;

    /// Returns this node's cluster index.
    ///
    /// The inner `self.node_id()` call resolves to the inherent
    /// `NodeTestingService::node_id` (inherent methods take precedence over trait methods),
    /// so this does not recurse.
    fn node_id(&self) -> usize {
        self.node_id().index()
    }

    /// Returns the invariants state of the real service.
    fn invariants_state(&mut self) -> &mut InvariantsState {
        node::core::invariants::InvariantService::invariants_state(&mut self.real)
    }

    /// Returns a guard over the cluster invariants state; always `Some`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex cannot be acquired immediately (`try_lock` fails).
    fn cluster_invariants_state<'a>(&'a mut self) -> Option<Self::ClusterInvariantsState<'a>>
    where
        Self: 'a,
    {
        Some(
            self.cluster_invariants_state.try_lock().expect(
                "locking should never fail, since we are running all nodes in the same thread",
            ),
        )
    }
}
718}