mina_node_testing/service/
mod.rs

1mod rpc_service;
2
3use std::{
4    collections::{BTreeMap, VecDeque},
5    sync::{Arc, Mutex as StdMutex},
6    time::Duration,
7};
8
9use ledger::{
10    dummy::dummy_transaction_proof,
11    proofs::transaction::ProofError,
12    scan_state::{
13        scan_state::transaction_snark::SokMessage,
14        transaction_logic::{verifiable, WithStatus},
15    },
16    Mask,
17};
18use mina_core::channels::Aborter;
19use mina_node_native::NodeService;
20use mina_p2p_messages::{
21    string::ByteString,
22    v2::{
23        CurrencyFeeStableV1, LedgerHash, LedgerProofProdStableV2, MinaBaseProofStableV2,
24        MinaStateSnarkedLedgerStateWithSokStableV2, NonZeroCurvePoint,
25        ProverExtendBlockchainInputStableV2,
26        SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single, StateHash,
27        TransactionSnarkStableV2, TransactionSnarkWorkTStableV2Proofs,
28    },
29};
30use node::{
31    account::AccountPublicKey,
32    block_producer::{vrf_evaluator::VrfEvaluatorInput, BlockProducerEvent},
33    core::{
34        channels::mpsc,
35        invariants::InvariantsState,
36        snark::{Snark, SnarkJobId},
37    },
38    event_source::Event,
39    external_snark_worker::SnarkWorkSpec,
40    external_snark_worker_effectful::{ExternalSnarkWorkerEvent, ExternalSnarkWorkerService},
41    ledger::write::BlockApplyResult,
42    p2p::{
43        connection::outgoing::P2pConnectionOutgoingInitOpts,
44        service_impl::{
45            webrtc::{Cmd, P2pServiceWebrtc, PeerState},
46            webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p,
47        },
48        webrtc, P2pCryptoService, PeerId,
49    },
50    recorder::Recorder,
51    service::{
52        BlockProducerService, BlockProducerVrfEvaluatorService, TransitionFrontierGenesisService,
53    },
54    snark::{
55        block_verify::{SnarkBlockVerifyId, SnarkBlockVerifyService, VerifiableBlockWithHash},
56        user_command_verify::SnarkUserCommandVerifyId,
57        user_command_verify_effectful::SnarkUserCommandVerifyService,
58        work_verify::{SnarkWorkVerifyId, SnarkWorkVerifyService},
59        BlockVerifier, SnarkEvent, TransactionVerifier, VerifierSRS,
60    },
61    snark_pool::SnarkPoolService,
62    stats::Stats,
63    transition_frontier::{archive::archive_service::ArchiveService, genesis::GenesisConfig},
64    ActionWithMeta, State,
65};
66use redux::Instant;
67
68use crate::{
69    cluster::{ClusterNodeId, ProofKind},
70    node::NonDeterministicEvent,
71};
72
/// Boxed, sendable callback that can be installed on a [`NodeTestingService`].
///
/// When installed, `NodeTestingService::dyn_effects` invokes it with the
/// current state, a shared reference to the service and the action being
/// processed.
pub type DynEffects = Box<dyn FnMut(&State, &NodeTestingService, &ActionWithMeta) + Send>;
74
/// Marker type naming the request-id kind used for pending events.
///
/// In this file it is only referenced by a commented-out field of
/// `NodeTestingService`.
#[derive(Hash, Ord, PartialOrd, Eq, PartialEq)]
pub struct PendingEventIdType;
impl mina_core::requests::RequestIdType for PendingEventIdType {
    /// Static name reported for this request-id kind.
    fn request_id_type() -> &'static str {
        "PendingEventId"
    }
}
82
/// Identifier attached to an event when it is queued in `PendingEvents`.
///
/// Wraps a plain counter value; the default id is `0`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
pub struct PendingEventId(usize);
85
/// Queue of events paired with the id they were assigned on insertion.
struct PendingEvents {
    /// Queued events in insertion order (new entries are pushed to the back).
    events: VecDeque<(PendingEventId, Event)>,
    /// Id that `add` hands to the next queued event.
    next_id: PendingEventId,
}
90
91impl PendingEventId {
92    fn copy_inc(&mut self) -> Self {
93        let copy = *self;
94        let _ = self.0.wrapping_add(1);
95        copy
96    }
97}
98
99impl PendingEvents {
100    fn new() -> Self {
101        PendingEvents {
102            events: VecDeque::new(),
103            next_id: Default::default(),
104        }
105    }
106
107    fn add(&mut self, event: Event) -> PendingEventId {
108        let id = self.next_id.copy_inc();
109        self.events.push_back((id, event));
110        id
111    }
112
113    fn get(&self, id: PendingEventId) -> Option<&Event> {
114        self.events
115            .iter()
116            .find_map(|(_id, event)| (*_id == id).then_some(event))
117    }
118
119    fn remove(&mut self, id: PendingEventId) -> Option<Event> {
120        if let Some(i) = self
121            .events
122            .iter()
123            .enumerate()
124            .find_map(|(i, (_id, _))| (*_id == id).then_some(i))
125        {
126            self.events.remove(i).map(|(_, event)| event)
127        } else {
128            None
129        }
130    }
131
132    fn iter(&self) -> impl Iterator<Item = (PendingEventId, &Event)> {
133        self.events.iter().map(|(id, event)| (*id, event))
134    }
135}
136
/// Service used by nodes running inside a testing cluster.
///
/// Wraps a real [`NodeService`] and adds test-only controls: a manually
/// advanced clock, a queue of pending events, a configurable proof kind and
/// an optional effects callback.
pub struct NodeTestingService {
    /// The wrapped service that most trait methods delegate to.
    real: NodeService,
    /// Identifier of this node inside the cluster.
    id: ClusterNodeId,
    /// Use webrtc p2p between Rust nodes.
    rust_to_rust_use_webrtc: bool,
    /// Selects how proofs are produced and verified (dummy, constraints
    /// checked, or full).
    proof_kind: ProofKind,
    /// We are replaying this node so disable some non-deterministic services.
    is_replay: bool,
    /// Clock value reported through `redux::TimeService`; moved forward by
    /// `advance_time`.
    monotonic_time: Instant,
    /// Events sent by the real service not yet received by state machine.
    pending_events: PendingEvents,
    //pending_events: PendingRequests<PendingEventIdType, Event>,
    /// Optional callback run by `dyn_effects`.
    dyn_effects: Option<DynEffects>,

    /// SOK digest stored by `set_snarker_sok_digest`; embedded into the dummy
    /// snark work proofs built in `submit`.
    snarker_sok_digest: Option<ByteString>,

    /// Invariants state shared behind a mutex; handed out by
    /// `InvariantService::cluster_invariants_state`.
    cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
    /// Once dropped, it will cause all threads associated to shutdown.
    _shutdown: Aborter,
}
157
158impl NodeTestingService {
159    pub fn new(
160        real: NodeService,
161        id: ClusterNodeId,
162        cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
163        _shutdown: Aborter,
164    ) -> Self {
165        Self {
166            real,
167            id,
168            rust_to_rust_use_webrtc: false,
169            proof_kind: ProofKind::default(),
170            is_replay: false,
171            monotonic_time: Instant::now(),
172            pending_events: PendingEvents::new(),
173            dyn_effects: None,
174            snarker_sok_digest: None,
175            cluster_invariants_state,
176            _shutdown,
177        }
178    }
179
180    pub fn node_id(&self) -> ClusterNodeId {
181        self.id
182    }
183
184    pub fn rust_to_rust_use_webrtc(&self) -> bool {
185        self.rust_to_rust_use_webrtc
186    }
187
188    pub fn set_rust_to_rust_use_webrtc(&mut self) -> &mut Self {
189        assert!(cfg!(feature = "p2p-webrtc"));
190        self.rust_to_rust_use_webrtc = true;
191        self
192    }
193
194    pub fn proof_kind(&self) -> ProofKind {
195        self.proof_kind
196    }
197
198    pub fn set_proof_kind(&mut self, kind: ProofKind) -> &mut Self {
199        self.proof_kind = kind;
200        self
201    }
202
203    pub fn set_replay(&mut self) -> &mut Self {
204        self.is_replay = true;
205        self
206    }
207
208    pub fn advance_time(&mut self, by_nanos: u64) {
209        self.monotonic_time += Duration::from_nanos(by_nanos);
210    }
211
212    pub fn dyn_effects(&mut self, state: &State, action: &ActionWithMeta) {
213        if let Some(mut dyn_effects) = self.dyn_effects.take() {
214            (dyn_effects)(state, self, action);
215            self.dyn_effects = Some(dyn_effects);
216        }
217    }
218
219    pub fn set_dyn_effects(&mut self, effects: DynEffects) {
220        self.dyn_effects = Some(effects);
221    }
222
223    pub fn remove_dyn_effects(&mut self) -> Option<DynEffects> {
224        self.dyn_effects.take()
225    }
226
227    pub fn set_snarker_sok_digest(&mut self, digest: ByteString) {
228        self.snarker_sok_digest = Some(digest);
229    }
230
231    pub fn pending_events(&mut self, poll: bool) -> impl Iterator<Item = (PendingEventId, &Event)> {
232        while let Ok(req) = self.real.rpc_receiver().try_recv() {
233            self.real.process_rpc_request(req);
234        }
235        if poll {
236            while let Some(event) = self.real.event_receiver().try_next() {
237                // Drop non-deterministic events during replay. We
238                // have those recorded as `ScenarioStep::NonDeterministicEvent`.
239                if self.is_replay && NonDeterministicEvent::should_drop_event(&event) {
240                    eprintln!("dropping non-deterministic event: {event:?}");
241                    continue;
242                }
243                self.pending_events.add(event);
244            }
245        }
246        self.pending_events.iter()
247    }
248
249    pub async fn next_pending_event(&mut self) -> Option<(PendingEventId, &Event)> {
250        let event = loop {
251            let (event_receiver, rpc_receiver) = self.real.event_receiver_with_rpc_receiver();
252            tokio::select! {
253                Some(rpc) = rpc_receiver.recv() => {
254                    self.real.process_rpc_request(rpc);
255                    break self.real.event_receiver().try_next().unwrap();
256                }
257                res = event_receiver.wait_for_events() => {
258                    res.ok()?;
259                    let event = self.real.event_receiver().try_next().unwrap();
260                    // Drop non-deterministic events during replay. We
261                    // have those recorded as `ScenarioStep::NonDeterministicEvent`.
262                    if self.is_replay && NonDeterministicEvent::should_drop_event(&event) {
263                        eprintln!("dropping non-deterministic event: {event:?}");
264                        continue;
265                    }
266                    break event;
267                }
268            }
269        };
270        let id = self.pending_events.add(event);
271        Some((id, self.pending_events.get(id).unwrap()))
272    }
273
274    pub fn get_pending_event(&self, id: PendingEventId) -> Option<&Event> {
275        self.pending_events.get(id)
276    }
277
278    pub fn take_pending_event(&mut self, id: PendingEventId) -> Option<Event> {
279        self.pending_events.remove(id)
280    }
281
282    pub fn ledger(&self, ledger_hash: &LedgerHash) -> Option<Mask> {
283        self.real
284            .ledger_manager()
285            .get_mask(ledger_hash)
286            .map(|(mask, _)| mask)
287    }
288}
289
/// Marks the testing service as a redux service; the impl defines no items.
impl redux::Service for NodeTestingService {}
291
/// Core node service hooks. Everything except the replay flag is forwarded
/// to the wrapped [`NodeService`].
impl node::Service for NodeTestingService {
    /// Forwards to the wrapped service.
    fn queues(&mut self) -> node::service::Queues {
        self.real.queues()
    }

    /// Forwards to the wrapped service.
    fn stats(&mut self) -> Option<&mut Stats> {
        self.real.stats()
    }

    /// Forwards to the wrapped service.
    fn recorder(&mut self) -> &mut Recorder {
        self.real.recorder()
    }

    /// Reports the replay flag held by the testing service itself.
    fn is_replay(&self) -> bool {
        self.is_replay
    }
}
309
/// Cryptographic helpers for p2p; every method is forwarded unchanged to the
/// wrapped [`NodeService`].
impl P2pCryptoService for NodeTestingService {
    /// Forwards to the wrapped service.
    fn generate_random_nonce(&mut self) -> [u8; 24] {
        self.real.generate_random_nonce()
    }

    /// Forwards to the wrapped service.
    fn ephemeral_sk(&mut self) -> [u8; 32] {
        self.real.ephemeral_sk()
    }

    /// Forwards to the wrapped service.
    fn static_sk(&mut self) -> [u8; 32] {
        self.real.static_sk()
    }

    /// Forwards to the wrapped service.
    fn sign_key(&mut self, key: &[u8; 32]) -> Vec<u8> {
        self.real.sign_key(key)
    }

    /// Forwards to the wrapped service.
    fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
        self.real.sign_publication(publication)
    }

    /// Forwards to the wrapped service.
    fn verify_publication(
        &mut self,
        pk: &libp2p_identity::PublicKey,
        publication: &[u8],
        sig: &[u8],
    ) -> bool {
        self.real.verify_publication(pk, publication, sig)
    }
}
340
/// Exposes the ledger manager owned by the wrapped [`NodeService`].
impl node::ledger::LedgerService for NodeTestingService {
    /// Forwards to the wrapped service.
    fn ledger_manager(&self) -> &node::ledger::LedgerManager {
        self.real.ledger_manager()
    }
}
346
/// Time source backed by the service's own stored clock value.
impl redux::TimeService for NodeTestingService {
    /// Returns the stored `monotonic_time` field instead of reading the
    /// system clock; `advance_time` moves that value forward.
    fn monotonic_time(&mut self) -> Instant {
        self.monotonic_time
    }
}
352
/// Event source that never yields events on its own.
impl node::event_source::EventSourceService for NodeTestingService {
    /// Always returns `None`.
    // NOTE(review): presumably events are instead handed out through the
    // pending-events API of `NodeTestingService` — confirm against callers.
    fn next_event(&mut self) -> Option<Event> {
        None
    }
}
358
/// Genesis loading is forwarded unchanged to the wrapped [`NodeService`].
impl TransitionFrontierGenesisService for NodeTestingService {
    /// Forwards to the wrapped service's implementation of this trait.
    fn load_genesis(&mut self, config: Arc<GenesisConfig>) {
        TransitionFrontierGenesisService::load_genesis(&mut self.real, config);
    }
}
364
/// Webrtc p2p service; every method is forwarded unchanged to the wrapped
/// [`NodeService`].
///
/// Some calls use the fully qualified `P2pServiceWebrtc::method(&self.real)`
/// form to name the trait method on the wrapped service explicitly.
impl P2pServiceWebrtc for NodeTestingService {
    type Event = Event;

    /// Forwards to the wrapped service.
    fn random_pick(
        &mut self,
        list: &[P2pConnectionOutgoingInitOpts],
    ) -> Option<P2pConnectionOutgoingInitOpts> {
        self.real.random_pick(list)
    }

    /// Forwards to the wrapped service.
    fn event_sender(&self) -> &mpsc::UnboundedSender<Event> {
        P2pServiceWebrtc::event_sender(&self.real)
    }

    /// Forwards to the wrapped service.
    fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<Cmd> {
        P2pServiceWebrtc::cmd_sender(&self.real)
    }

    /// Forwards to the wrapped service.
    fn peers(&mut self) -> &mut BTreeMap<PeerId, PeerState> {
        P2pServiceWebrtc::peers(&mut self.real)
    }

    /// Forwards to the wrapped service.
    fn outgoing_init(&mut self, peer_id: PeerId) {
        P2pServiceWebrtc::outgoing_init(&mut self.real, peer_id)
    }

    /// Forwards to the wrapped service.
    fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
        P2pServiceWebrtc::incoming_init(&mut self.real, peer_id, offer)
    }

    /// Forwards to the wrapped service.
    fn encrypt<T: node::p2p::identity::EncryptableType>(
        &mut self,
        other_pk: &node::p2p::identity::PublicKey,
        message: &T,
    ) -> Result<T::Encrypted, Box<dyn std::error::Error>> {
        self.real.encrypt(other_pk, message)
    }

    /// Forwards to the wrapped service.
    fn decrypt<T: node::p2p::identity::EncryptableType>(
        &mut self,
        other_pub_key: &node::p2p::identity::PublicKey,
        encrypted: &T::Encrypted,
    ) -> Result<T, Box<dyn std::error::Error>> {
        self.real.decrypt(other_pub_key, encrypted)
    }

    /// Forwards to the wrapped service.
    fn auth_encrypt_and_send(
        &mut self,
        peer_id: PeerId,
        other_pub_key: &node::p2p::identity::PublicKey,
        auth: webrtc::ConnectionAuth,
    ) {
        self.real
            .auth_encrypt_and_send(peer_id, other_pub_key, auth)
    }

    /// Forwards to the wrapped service.
    fn auth_decrypt(
        &mut self,
        other_pub_key: &node::p2p::identity::PublicKey,
        auth: webrtc::ConnectionAuthEncrypted,
    ) -> Option<webrtc::ConnectionAuth> {
        self.real.auth_decrypt(other_pub_key, auth)
    }
}
429
/// Combined webrtc/libp2p service; forwarded to the wrapped [`NodeService`].
impl P2pServiceWebrtcWithLibp2p for NodeTestingService {
    /// Forwards to the wrapped service. Only compiled with the `p2p-libp2p`
    /// feature.
    #[cfg(feature = "p2p-libp2p")]
    fn mio(&mut self) -> &mut node::p2p::service_impl::mio::MioService {
        self.real.mio()
    }

    /// Forwards to the wrapped service.
    fn connections(&self) -> std::collections::BTreeSet<PeerId> {
        self.real.connections()
    }
}
440
441impl SnarkBlockVerifyService for NodeTestingService {
442    fn verify_init(
443        &mut self,
444        req_id: SnarkBlockVerifyId,
445        verifier_index: BlockVerifier,
446        verifier_srs: Arc<VerifierSRS>,
447        block: VerifiableBlockWithHash,
448    ) {
449        match self.proof_kind() {
450            ProofKind::Dummy | ProofKind::ConstraintsChecked => {
451                let _ = self
452                    .real
453                    .event_sender()
454                    .send(SnarkEvent::BlockVerify(req_id, Ok(())).into());
455            }
456            ProofKind::Full => SnarkBlockVerifyService::verify_init(
457                &mut self.real,
458                req_id,
459                verifier_index,
460                verifier_srs,
461                block,
462            ),
463        }
464    }
465}
466
/// User command verification is forwarded unchanged to the wrapped
/// [`NodeService`], regardless of the configured proof kind.
impl SnarkUserCommandVerifyService for NodeTestingService {
    /// Forwards to the wrapped service's implementation of this trait.
    fn verify_init(
        &mut self,
        req_id: SnarkUserCommandVerifyId,
        commands: Vec<WithStatus<verifiable::UserCommand>>,
    ) {
        SnarkUserCommandVerifyService::verify_init(&mut self.real, req_id, commands)
    }
}
476
477impl SnarkWorkVerifyService for NodeTestingService {
478    fn verify_init(
479        &mut self,
480        req_id: SnarkWorkVerifyId,
481        verifier_index: TransactionVerifier,
482        verifier_srs: Arc<VerifierSRS>,
483        work: Vec<Snark>,
484    ) {
485        match self.proof_kind() {
486            ProofKind::Dummy | ProofKind::ConstraintsChecked => {
487                let _ = self
488                    .real
489                    .event_sender()
490                    .send(SnarkEvent::WorkVerify(req_id, Ok(())).into());
491            }
492            ProofKind::Full => SnarkWorkVerifyService::verify_init(
493                &mut self.real,
494                req_id,
495                verifier_index,
496                verifier_srs,
497                work,
498            ),
499        }
500    }
501}
502
/// Snark pool helpers; forwarded unchanged to the wrapped [`NodeService`].
impl SnarkPoolService for NodeTestingService {
    /// Forwards to the wrapped service.
    fn random_choose<'a>(
        &mut self,
        iter: impl Iterator<Item = &'a SnarkJobId>,
        n: usize,
    ) -> Vec<SnarkJobId> {
        self.real.random_choose(iter, n)
    }
}
512
/// VRF evaluation is forwarded unchanged to the wrapped [`NodeService`].
impl BlockProducerVrfEvaluatorService for NodeTestingService {
    /// Forwards to the wrapped service's implementation of this trait.
    fn evaluate(&mut self, data: VrfEvaluatorInput) {
        BlockProducerVrfEvaluatorService::evaluate(&mut self.real, data)
    }
}
518
/// Archive output is forwarded unchanged to the wrapped [`NodeService`].
impl ArchiveService for NodeTestingService {
    /// Forwards to the wrapped service.
    fn send_to_archive(&mut self, data: BlockApplyResult) {
        self.real.send_to_archive(data);
    }
}
524
525use std::cell::RefCell;
thread_local! {
    /// Per-thread cache of the most recently produced genesis block proof,
    /// stored together with the hash of the block it belongs to.
    ///
    /// Read and written by `BlockProducerService::prove` when running with
    /// `ProofKind::Full`.
    static GENESIS_PROOF: RefCell<Option<(StateHash, Arc<MinaBaseProofStableV2>)>> = const { RefCell::new(None)};
}
529
530impl BlockProducerService for NodeTestingService {
531    fn provers(&self) -> ledger::proofs::provers::BlockProver {
532        self.real.provers()
533    }
534
535    fn prove(
536        &mut self,
537        block_hash: StateHash,
538        mut input: Box<ProverExtendBlockchainInputStableV2>,
539    ) {
540        fn dummy_proof_event(block_hash: StateHash) -> Event {
541            let dummy_proof = (*ledger::dummy::dummy_blockchain_proof()).clone();
542            BlockProducerEvent::BlockProve(block_hash, Ok(dummy_proof.into())).into()
543        }
544        let keypair = self.real.block_producer().unwrap().keypair();
545
546        match self.proof_kind() {
547            ProofKind::Dummy => {
548                let _ = self.real.event_sender().send(dummy_proof_event(block_hash));
549            }
550            ProofKind::ConstraintsChecked => {
551                match mina_node_native::block_producer::prove(
552                    self.provers(),
553                    &mut input,
554                    &keypair,
555                    true,
556                ) {
557                    Err(e)
558                        if matches!(
559                            e.downcast_ref::<ProofError>(),
560                            Some(ProofError::ConstraintsOk)
561                        ) =>
562                    {
563                        let _ = self.real.event_sender().send(dummy_proof_event(block_hash));
564                    }
565                    Err(err) => panic!("unexpected block proof generation error: {err:?}"),
566                    Ok(_) => unreachable!(),
567                }
568            }
569            ProofKind::Full => {
570                // TODO(binier): handle if block is genesis based on fork constants.
571                let is_genesis = input
572                    .next_state
573                    .body
574                    .consensus_state
575                    .blockchain_length
576                    .as_u32()
577                    == 1;
578                let res = GENESIS_PROOF.with_borrow_mut(|cached_genesis| {
579                    if let Some((_, proof)) = cached_genesis
580                        .as_ref()
581                        .filter(|(hash, _)| is_genesis && hash == &block_hash)
582                    {
583                        Ok(proof.clone())
584                    } else {
585                        mina_node_native::block_producer::prove(
586                            self.provers(),
587                            &mut input,
588                            &keypair,
589                            false,
590                        )
591                        .map_err(|err| format!("{err:?}"))
592                    }
593                });
594                if let Some(proof) = res.as_ref().ok().filter(|_| is_genesis) {
595                    GENESIS_PROOF
596                        .with_borrow_mut(|data| *data = Some((block_hash.clone(), proof.clone())));
597                }
598                let _ = self
599                    .real
600                    .event_sender()
601                    .send(BlockProducerEvent::BlockProve(block_hash, res).into());
602            }
603        }
604    }
605
606    fn with_producer_keypair<T>(
607        &self,
608        _f: impl FnOnce(&node::account::AccountSecretKey) -> T,
609    ) -> Option<T> {
610        None
611    }
612}
613
614impl ExternalSnarkWorkerService for NodeTestingService {
615    fn start(
616        &mut self,
617        public_key: NonZeroCurvePoint,
618        fee: CurrencyFeeStableV1,
619        _: TransactionVerifier,
620    ) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
621        let pub_key = AccountPublicKey::from(public_key);
622        let sok_message = SokMessage::create(
623            (&fee).into(),
624            pub_key.try_into().map_err(|e| {
625                node::external_snark_worker::ExternalSnarkWorkerError::Error(format!("{:?}", e))
626            })?,
627        );
628        self.set_snarker_sok_digest((&sok_message.digest()).into());
629        let _ = self
630            .real
631            .event_sender()
632            .send(ExternalSnarkWorkerEvent::Started.into());
633        Ok(())
634        // self.real.start(path, public_key, fee)
635    }
636
637    fn submit(
638        &mut self,
639        spec: SnarkWorkSpec,
640    ) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
641        let sok_digest = self.snarker_sok_digest.clone().unwrap();
642        let make_dummy_proof = |spec| {
643            let statement = match spec {
644                SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Transition(v, _) => v.0,
645                SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Merge(v) => v.0 .0,
646            };
647
648            LedgerProofProdStableV2(TransactionSnarkStableV2 {
649                statement: MinaStateSnarkedLedgerStateWithSokStableV2 {
650                    source: statement.source,
651                    target: statement.target,
652                    connecting_ledger_left: statement.connecting_ledger_left,
653                    connecting_ledger_right: statement.connecting_ledger_right,
654                    supply_increase: statement.supply_increase,
655                    fee_excess: statement.fee_excess,
656                    sok_digest: sok_digest.clone(),
657                },
658                proof: (*dummy_transaction_proof()).clone(),
659            })
660        };
661        let res = match spec {
662            SnarkWorkSpec::One(v) => TransactionSnarkWorkTStableV2Proofs::One(make_dummy_proof(v)),
663            SnarkWorkSpec::Two((v1, v2)) => TransactionSnarkWorkTStableV2Proofs::Two((
664                make_dummy_proof(v1),
665                make_dummy_proof(v2),
666            )),
667        };
668        let _ = self
669            .real
670            .event_sender()
671            .send(ExternalSnarkWorkerEvent::WorkResult(Arc::new(res)).into());
672        Ok(())
673        // self.real.submit(spec)
674    }
675
676    fn cancel(&mut self) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
677        let _ = self
678            .real
679            .event_sender()
680            .send(ExternalSnarkWorkerEvent::WorkCancelled.into());
681        Ok(())
682        // self.real.cancel()
683    }
684
685    fn kill(&mut self) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
686        let _ = self
687            .real
688            .event_sender()
689            .send(ExternalSnarkWorkerEvent::Killed.into());
690        Ok(())
691        // self.real.kill()
692    }
693}
694
/// Invariant checking support: per-node state comes from the wrapped
/// [`NodeService`], cluster-wide state from the shared mutex held by the
/// testing service.
impl node::core::invariants::InvariantService for NodeTestingService {
    /// Guard type handed out for the shared cluster invariants state.
    type ClusterInvariantsState<'a> = std::sync::MutexGuard<'a, InvariantsState>;

    /// Index of this node's cluster id.
    fn node_id(&self) -> usize {
        self.node_id().index()
    }

    /// Forwards to the wrapped service's implementation of this trait.
    fn invariants_state(&mut self) -> &mut InvariantsState {
        node::core::invariants::InvariantService::invariants_state(&mut self.real)
    }

    /// Locks and returns the shared cluster invariants state.
    ///
    /// # Panics
    ///
    /// Panics if `try_lock` fails, i.e. the mutex is already locked or
    /// poisoned.
    fn cluster_invariants_state<'a>(&'a mut self) -> Option<Self::ClusterInvariantsState<'a>>
    where
        Self: 'a,
    {
        Some(
            self.cluster_invariants_state.try_lock().expect(
                "locking should never fail, since we are running all nodes in the same thread",
            ),
        )
    }
}