1mod rpc_service;
2
3use std::{
4 collections::{BTreeMap, VecDeque},
5 sync::{Arc, Mutex as StdMutex},
6 time::Duration,
7};
8
9use ledger::{
10 dummy::dummy_transaction_proof,
11 proofs::transaction::ProofError,
12 scan_state::{
13 scan_state::transaction_snark::SokMessage,
14 transaction_logic::{verifiable, WithStatus},
15 },
16 Mask,
17};
18use mina_core::channels::Aborter;
19use mina_node_native::NodeService;
20use mina_p2p_messages::{
21 string::ByteString,
22 v2::{
23 CurrencyFeeStableV1, LedgerHash, LedgerProofProdStableV2, MinaBaseProofStableV2,
24 MinaStateSnarkedLedgerStateWithSokStableV2, NonZeroCurvePoint,
25 ProverExtendBlockchainInputStableV2,
26 SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single, StateHash,
27 TransactionSnarkStableV2, TransactionSnarkWorkTStableV2Proofs,
28 },
29};
30use node::{
31 account::AccountPublicKey,
32 block_producer::{vrf_evaluator::VrfEvaluatorInput, BlockProducerEvent},
33 core::{
34 channels::mpsc,
35 invariants::InvariantsState,
36 snark::{Snark, SnarkJobId},
37 },
38 event_source::Event,
39 external_snark_worker::SnarkWorkSpec,
40 external_snark_worker_effectful::{ExternalSnarkWorkerEvent, ExternalSnarkWorkerService},
41 ledger::write::BlockApplyResult,
42 p2p::{
43 connection::outgoing::P2pConnectionOutgoingInitOpts,
44 service_impl::{
45 webrtc::{Cmd, P2pServiceWebrtc, PeerState},
46 webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p,
47 },
48 webrtc, P2pCryptoService, PeerId,
49 },
50 recorder::Recorder,
51 service::{
52 BlockProducerService, BlockProducerVrfEvaluatorService, TransitionFrontierGenesisService,
53 },
54 snark::{
55 block_verify::{SnarkBlockVerifyId, SnarkBlockVerifyService, VerifiableBlockWithHash},
56 user_command_verify::SnarkUserCommandVerifyId,
57 user_command_verify_effectful::SnarkUserCommandVerifyService,
58 work_verify::{SnarkWorkVerifyId, SnarkWorkVerifyService},
59 BlockVerifier, SnarkEvent, TransactionVerifier, VerifierSRS,
60 },
61 snark_pool::SnarkPoolService,
62 stats::Stats,
63 transition_frontier::{archive::archive_service::ArchiveService, genesis::GenesisConfig},
64 ActionWithMeta, State,
65};
66use redux::Instant;
67
68use crate::{
69 cluster::{ClusterNodeId, ProofKind},
70 node::NonDeterministicEvent,
71};
72
73pub type DynEffects = Box<dyn FnMut(&State, &NodeTestingService, &ActionWithMeta) + Send>;
74
75#[derive(Hash, Ord, PartialOrd, Eq, PartialEq)]
76pub struct PendingEventIdType;
77impl mina_core::requests::RequestIdType for PendingEventIdType {
78 fn request_id_type() -> &'static str {
79 "PendingEventId"
80 }
81}
82
83#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
84pub struct PendingEventId(usize);
85
86struct PendingEvents {
87 events: VecDeque<(PendingEventId, Event)>,
88 next_id: PendingEventId,
89}
90
91impl PendingEventId {
92 fn copy_inc(&mut self) -> Self {
93 let copy = *self;
94 let _ = self.0.wrapping_add(1);
95 copy
96 }
97}
98
99impl PendingEvents {
100 fn new() -> Self {
101 PendingEvents {
102 events: VecDeque::new(),
103 next_id: Default::default(),
104 }
105 }
106
107 fn add(&mut self, event: Event) -> PendingEventId {
108 let id = self.next_id.copy_inc();
109 self.events.push_back((id, event));
110 id
111 }
112
113 fn get(&self, id: PendingEventId) -> Option<&Event> {
114 self.events
115 .iter()
116 .find_map(|(_id, event)| (*_id == id).then_some(event))
117 }
118
119 fn remove(&mut self, id: PendingEventId) -> Option<Event> {
120 if let Some(i) = self
121 .events
122 .iter()
123 .enumerate()
124 .find_map(|(i, (_id, _))| (*_id == id).then_some(i))
125 {
126 self.events.remove(i).map(|(_, event)| event)
127 } else {
128 None
129 }
130 }
131
132 fn iter(&self) -> impl Iterator<Item = (PendingEventId, &Event)> {
133 self.events.iter().map(|(id, event)| (*id, event))
134 }
135}
136
137pub struct NodeTestingService {
138 real: NodeService,
139 id: ClusterNodeId,
140 rust_to_rust_use_webrtc: bool,
142 proof_kind: ProofKind,
143 is_replay: bool,
145 monotonic_time: Instant,
146 pending_events: PendingEvents,
148 dyn_effects: Option<DynEffects>,
150
151 snarker_sok_digest: Option<ByteString>,
152
153 cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
154 _shutdown: Aborter,
156}
157
158impl NodeTestingService {
159 pub fn new(
160 real: NodeService,
161 id: ClusterNodeId,
162 cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
163 _shutdown: Aborter,
164 ) -> Self {
165 Self {
166 real,
167 id,
168 rust_to_rust_use_webrtc: false,
169 proof_kind: ProofKind::default(),
170 is_replay: false,
171 monotonic_time: Instant::now(),
172 pending_events: PendingEvents::new(),
173 dyn_effects: None,
174 snarker_sok_digest: None,
175 cluster_invariants_state,
176 _shutdown,
177 }
178 }
179
180 pub fn node_id(&self) -> ClusterNodeId {
181 self.id
182 }
183
184 pub fn rust_to_rust_use_webrtc(&self) -> bool {
185 self.rust_to_rust_use_webrtc
186 }
187
188 pub fn set_rust_to_rust_use_webrtc(&mut self) -> &mut Self {
189 assert!(cfg!(feature = "p2p-webrtc"));
190 self.rust_to_rust_use_webrtc = true;
191 self
192 }
193
194 pub fn proof_kind(&self) -> ProofKind {
195 self.proof_kind
196 }
197
198 pub fn set_proof_kind(&mut self, kind: ProofKind) -> &mut Self {
199 self.proof_kind = kind;
200 self
201 }
202
203 pub fn set_replay(&mut self) -> &mut Self {
204 self.is_replay = true;
205 self
206 }
207
208 pub fn advance_time(&mut self, by_nanos: u64) {
209 self.monotonic_time += Duration::from_nanos(by_nanos);
210 }
211
212 pub fn dyn_effects(&mut self, state: &State, action: &ActionWithMeta) {
213 if let Some(mut dyn_effects) = self.dyn_effects.take() {
214 (dyn_effects)(state, self, action);
215 self.dyn_effects = Some(dyn_effects);
216 }
217 }
218
219 pub fn set_dyn_effects(&mut self, effects: DynEffects) {
220 self.dyn_effects = Some(effects);
221 }
222
223 pub fn remove_dyn_effects(&mut self) -> Option<DynEffects> {
224 self.dyn_effects.take()
225 }
226
227 pub fn set_snarker_sok_digest(&mut self, digest: ByteString) {
228 self.snarker_sok_digest = Some(digest);
229 }
230
231 pub fn pending_events(&mut self, poll: bool) -> impl Iterator<Item = (PendingEventId, &Event)> {
232 while let Ok(req) = self.real.rpc_receiver().try_recv() {
233 self.real.process_rpc_request(req);
234 }
235 if poll {
236 while let Some(event) = self.real.event_receiver().try_next() {
237 if self.is_replay && NonDeterministicEvent::should_drop_event(&event) {
240 eprintln!("dropping non-deterministic event: {event:?}");
241 continue;
242 }
243 self.pending_events.add(event);
244 }
245 }
246 self.pending_events.iter()
247 }
248
249 pub async fn next_pending_event(&mut self) -> Option<(PendingEventId, &Event)> {
250 let event = loop {
251 let (event_receiver, rpc_receiver) = self.real.event_receiver_with_rpc_receiver();
252 tokio::select! {
253 Some(rpc) = rpc_receiver.recv() => {
254 self.real.process_rpc_request(rpc);
255 break self.real.event_receiver().try_next().unwrap();
256 }
257 res = event_receiver.wait_for_events() => {
258 res.ok()?;
259 let event = self.real.event_receiver().try_next().unwrap();
260 if self.is_replay && NonDeterministicEvent::should_drop_event(&event) {
263 eprintln!("dropping non-deterministic event: {event:?}");
264 continue;
265 }
266 break event;
267 }
268 }
269 };
270 let id = self.pending_events.add(event);
271 Some((id, self.pending_events.get(id).unwrap()))
272 }
273
274 pub fn get_pending_event(&self, id: PendingEventId) -> Option<&Event> {
275 self.pending_events.get(id)
276 }
277
278 pub fn take_pending_event(&mut self, id: PendingEventId) -> Option<Event> {
279 self.pending_events.remove(id)
280 }
281
282 pub fn ledger(&self, ledger_hash: &LedgerHash) -> Option<Mask> {
283 self.real
284 .ledger_manager()
285 .get_mask(ledger_hash)
286 .map(|(mask, _)| mask)
287 }
288}
289
290impl redux::Service for NodeTestingService {}
291
292impl node::Service for NodeTestingService {
293 fn queues(&mut self) -> node::service::Queues {
294 self.real.queues()
295 }
296
297 fn stats(&mut self) -> Option<&mut Stats> {
298 self.real.stats()
299 }
300
301 fn recorder(&mut self) -> &mut Recorder {
302 self.real.recorder()
303 }
304
305 fn is_replay(&self) -> bool {
306 self.is_replay
307 }
308}
309
310impl P2pCryptoService for NodeTestingService {
311 fn generate_random_nonce(&mut self) -> [u8; 24] {
312 self.real.generate_random_nonce()
313 }
314
315 fn ephemeral_sk(&mut self) -> [u8; 32] {
316 self.real.ephemeral_sk()
317 }
318
319 fn static_sk(&mut self) -> [u8; 32] {
320 self.real.static_sk()
321 }
322
323 fn sign_key(&mut self, key: &[u8; 32]) -> Vec<u8> {
324 self.real.sign_key(key)
325 }
326
327 fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
328 self.real.sign_publication(publication)
329 }
330
331 fn verify_publication(
332 &mut self,
333 pk: &libp2p_identity::PublicKey,
334 publication: &[u8],
335 sig: &[u8],
336 ) -> bool {
337 self.real.verify_publication(pk, publication, sig)
338 }
339}
340
341impl node::ledger::LedgerService for NodeTestingService {
342 fn ledger_manager(&self) -> &node::ledger::LedgerManager {
343 self.real.ledger_manager()
344 }
345}
346
347impl redux::TimeService for NodeTestingService {
348 fn monotonic_time(&mut self) -> Instant {
349 self.monotonic_time
350 }
351}
352
353impl node::event_source::EventSourceService for NodeTestingService {
354 fn next_event(&mut self) -> Option<Event> {
355 None
356 }
357}
358
359impl TransitionFrontierGenesisService for NodeTestingService {
360 fn load_genesis(&mut self, config: Arc<GenesisConfig>) {
361 TransitionFrontierGenesisService::load_genesis(&mut self.real, config);
362 }
363}
364
365impl P2pServiceWebrtc for NodeTestingService {
366 type Event = Event;
367
368 fn random_pick(
369 &mut self,
370 list: &[P2pConnectionOutgoingInitOpts],
371 ) -> Option<P2pConnectionOutgoingInitOpts> {
372 self.real.random_pick(list)
373 }
374
375 fn event_sender(&self) -> &mpsc::UnboundedSender<Event> {
376 P2pServiceWebrtc::event_sender(&self.real)
377 }
378
379 fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<Cmd> {
380 P2pServiceWebrtc::cmd_sender(&self.real)
381 }
382
383 fn peers(&mut self) -> &mut BTreeMap<PeerId, PeerState> {
384 P2pServiceWebrtc::peers(&mut self.real)
385 }
386
387 fn outgoing_init(&mut self, peer_id: PeerId) {
388 P2pServiceWebrtc::outgoing_init(&mut self.real, peer_id)
389 }
390
391 fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
392 P2pServiceWebrtc::incoming_init(&mut self.real, peer_id, offer)
393 }
394
395 fn encrypt<T: node::p2p::identity::EncryptableType>(
396 &mut self,
397 other_pk: &node::p2p::identity::PublicKey,
398 message: &T,
399 ) -> Result<T::Encrypted, Box<dyn std::error::Error>> {
400 self.real.encrypt(other_pk, message)
401 }
402
403 fn decrypt<T: node::p2p::identity::EncryptableType>(
404 &mut self,
405 other_pub_key: &node::p2p::identity::PublicKey,
406 encrypted: &T::Encrypted,
407 ) -> Result<T, Box<dyn std::error::Error>> {
408 self.real.decrypt(other_pub_key, encrypted)
409 }
410
411 fn auth_encrypt_and_send(
412 &mut self,
413 peer_id: PeerId,
414 other_pub_key: &node::p2p::identity::PublicKey,
415 auth: webrtc::ConnectionAuth,
416 ) {
417 self.real
418 .auth_encrypt_and_send(peer_id, other_pub_key, auth)
419 }
420
421 fn auth_decrypt(
422 &mut self,
423 other_pub_key: &node::p2p::identity::PublicKey,
424 auth: webrtc::ConnectionAuthEncrypted,
425 ) -> Option<webrtc::ConnectionAuth> {
426 self.real.auth_decrypt(other_pub_key, auth)
427 }
428}
429
430impl P2pServiceWebrtcWithLibp2p for NodeTestingService {
431 #[cfg(feature = "p2p-libp2p")]
432 fn mio(&mut self) -> &mut node::p2p::service_impl::mio::MioService {
433 self.real.mio()
434 }
435
436 fn connections(&self) -> std::collections::BTreeSet<PeerId> {
437 self.real.connections()
438 }
439}
440
441impl SnarkBlockVerifyService for NodeTestingService {
442 fn verify_init(
443 &mut self,
444 req_id: SnarkBlockVerifyId,
445 verifier_index: BlockVerifier,
446 verifier_srs: Arc<VerifierSRS>,
447 block: VerifiableBlockWithHash,
448 ) {
449 match self.proof_kind() {
450 ProofKind::Dummy | ProofKind::ConstraintsChecked => {
451 let _ = self
452 .real
453 .event_sender()
454 .send(SnarkEvent::BlockVerify(req_id, Ok(())).into());
455 }
456 ProofKind::Full => SnarkBlockVerifyService::verify_init(
457 &mut self.real,
458 req_id,
459 verifier_index,
460 verifier_srs,
461 block,
462 ),
463 }
464 }
465}
466
467impl SnarkUserCommandVerifyService for NodeTestingService {
468 fn verify_init(
469 &mut self,
470 req_id: SnarkUserCommandVerifyId,
471 commands: Vec<WithStatus<verifiable::UserCommand>>,
472 ) {
473 SnarkUserCommandVerifyService::verify_init(&mut self.real, req_id, commands)
474 }
475}
476
477impl SnarkWorkVerifyService for NodeTestingService {
478 fn verify_init(
479 &mut self,
480 req_id: SnarkWorkVerifyId,
481 verifier_index: TransactionVerifier,
482 verifier_srs: Arc<VerifierSRS>,
483 work: Vec<Snark>,
484 ) {
485 match self.proof_kind() {
486 ProofKind::Dummy | ProofKind::ConstraintsChecked => {
487 let _ = self
488 .real
489 .event_sender()
490 .send(SnarkEvent::WorkVerify(req_id, Ok(())).into());
491 }
492 ProofKind::Full => SnarkWorkVerifyService::verify_init(
493 &mut self.real,
494 req_id,
495 verifier_index,
496 verifier_srs,
497 work,
498 ),
499 }
500 }
501}
502
503impl SnarkPoolService for NodeTestingService {
504 fn random_choose<'a>(
505 &mut self,
506 iter: impl Iterator<Item = &'a SnarkJobId>,
507 n: usize,
508 ) -> Vec<SnarkJobId> {
509 self.real.random_choose(iter, n)
510 }
511}
512
513impl BlockProducerVrfEvaluatorService for NodeTestingService {
514 fn evaluate(&mut self, data: VrfEvaluatorInput) {
515 BlockProducerVrfEvaluatorService::evaluate(&mut self.real, data)
516 }
517}
518
519impl ArchiveService for NodeTestingService {
520 fn send_to_archive(&mut self, data: BlockApplyResult) {
521 self.real.send_to_archive(data);
522 }
523}
524
525use std::cell::RefCell;
526thread_local! {
527 static GENESIS_PROOF: RefCell<Option<(StateHash, Arc<MinaBaseProofStableV2>)>> = const { RefCell::new(None)};
528}
529
530impl BlockProducerService for NodeTestingService {
531 fn provers(&self) -> ledger::proofs::provers::BlockProver {
532 self.real.provers()
533 }
534
535 fn prove(
536 &mut self,
537 block_hash: StateHash,
538 mut input: Box<ProverExtendBlockchainInputStableV2>,
539 ) {
540 fn dummy_proof_event(block_hash: StateHash) -> Event {
541 let dummy_proof = (*ledger::dummy::dummy_blockchain_proof()).clone();
542 BlockProducerEvent::BlockProve(block_hash, Ok(dummy_proof.into())).into()
543 }
544 let keypair = self.real.block_producer().unwrap().keypair();
545
546 match self.proof_kind() {
547 ProofKind::Dummy => {
548 let _ = self.real.event_sender().send(dummy_proof_event(block_hash));
549 }
550 ProofKind::ConstraintsChecked => {
551 match mina_node_native::block_producer::prove(
552 self.provers(),
553 &mut input,
554 &keypair,
555 true,
556 ) {
557 Err(e)
558 if matches!(
559 e.downcast_ref::<ProofError>(),
560 Some(ProofError::ConstraintsOk)
561 ) =>
562 {
563 let _ = self.real.event_sender().send(dummy_proof_event(block_hash));
564 }
565 Err(err) => panic!("unexpected block proof generation error: {err:?}"),
566 Ok(_) => unreachable!(),
567 }
568 }
569 ProofKind::Full => {
570 let is_genesis = input
572 .next_state
573 .body
574 .consensus_state
575 .blockchain_length
576 .as_u32()
577 == 1;
578 let res = GENESIS_PROOF.with_borrow_mut(|cached_genesis| {
579 if let Some((_, proof)) = cached_genesis
580 .as_ref()
581 .filter(|(hash, _)| is_genesis && hash == &block_hash)
582 {
583 Ok(proof.clone())
584 } else {
585 mina_node_native::block_producer::prove(
586 self.provers(),
587 &mut input,
588 &keypair,
589 false,
590 )
591 .map_err(|err| format!("{err:?}"))
592 }
593 });
594 if let Some(proof) = res.as_ref().ok().filter(|_| is_genesis) {
595 GENESIS_PROOF
596 .with_borrow_mut(|data| *data = Some((block_hash.clone(), proof.clone())));
597 }
598 let _ = self
599 .real
600 .event_sender()
601 .send(BlockProducerEvent::BlockProve(block_hash, res).into());
602 }
603 }
604 }
605
606 fn with_producer_keypair<T>(
607 &self,
608 _f: impl FnOnce(&node::account::AccountSecretKey) -> T,
609 ) -> Option<T> {
610 None
611 }
612}
613
614impl ExternalSnarkWorkerService for NodeTestingService {
615 fn start(
616 &mut self,
617 public_key: NonZeroCurvePoint,
618 fee: CurrencyFeeStableV1,
619 _: TransactionVerifier,
620 ) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
621 let pub_key = AccountPublicKey::from(public_key);
622 let sok_message = SokMessage::create(
623 (&fee).into(),
624 pub_key.try_into().map_err(|e| {
625 node::external_snark_worker::ExternalSnarkWorkerError::Error(format!("{:?}", e))
626 })?,
627 );
628 self.set_snarker_sok_digest((&sok_message.digest()).into());
629 let _ = self
630 .real
631 .event_sender()
632 .send(ExternalSnarkWorkerEvent::Started.into());
633 Ok(())
634 }
636
637 fn submit(
638 &mut self,
639 spec: SnarkWorkSpec,
640 ) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
641 let sok_digest = self.snarker_sok_digest.clone().unwrap();
642 let make_dummy_proof = |spec| {
643 let statement = match spec {
644 SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Transition(v, _) => v.0,
645 SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Merge(v) => v.0 .0,
646 };
647
648 LedgerProofProdStableV2(TransactionSnarkStableV2 {
649 statement: MinaStateSnarkedLedgerStateWithSokStableV2 {
650 source: statement.source,
651 target: statement.target,
652 connecting_ledger_left: statement.connecting_ledger_left,
653 connecting_ledger_right: statement.connecting_ledger_right,
654 supply_increase: statement.supply_increase,
655 fee_excess: statement.fee_excess,
656 sok_digest: sok_digest.clone(),
657 },
658 proof: (*dummy_transaction_proof()).clone(),
659 })
660 };
661 let res = match spec {
662 SnarkWorkSpec::One(v) => TransactionSnarkWorkTStableV2Proofs::One(make_dummy_proof(v)),
663 SnarkWorkSpec::Two((v1, v2)) => TransactionSnarkWorkTStableV2Proofs::Two((
664 make_dummy_proof(v1),
665 make_dummy_proof(v2),
666 )),
667 };
668 let _ = self
669 .real
670 .event_sender()
671 .send(ExternalSnarkWorkerEvent::WorkResult(Arc::new(res)).into());
672 Ok(())
673 }
675
676 fn cancel(&mut self) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
677 let _ = self
678 .real
679 .event_sender()
680 .send(ExternalSnarkWorkerEvent::WorkCancelled.into());
681 Ok(())
682 }
684
685 fn kill(&mut self) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
686 let _ = self
687 .real
688 .event_sender()
689 .send(ExternalSnarkWorkerEvent::Killed.into());
690 Ok(())
691 }
693}
694
695impl node::core::invariants::InvariantService for NodeTestingService {
696 type ClusterInvariantsState<'a> = std::sync::MutexGuard<'a, InvariantsState>;
697
698 fn node_id(&self) -> usize {
699 self.node_id().index()
700 }
701
702 fn invariants_state(&mut self) -> &mut InvariantsState {
703 node::core::invariants::InvariantService::invariants_state(&mut self.real)
704 }
705
706 fn cluster_invariants_state<'a>(&'a mut self) -> Option<Self::ClusterInvariantsState<'a>>
707 where
708 Self: 'a,
709 {
710 Some(
711 self.cluster_invariants_state.try_lock().expect(
712 "locking should never fail, since we are running all nodes in the same thread",
713 ),
714 )
715 }
716}