1mod rpc_service;
2
3use std::{
4 collections::{BTreeMap, VecDeque},
5 sync::{Arc, Mutex as StdMutex},
6 time::Duration,
7};
8
9use ledger::{
10 dummy::dummy_transaction_proof,
11 proofs::transaction::ProofError,
12 scan_state::{
13 scan_state::transaction_snark::SokMessage,
14 transaction_logic::{verifiable, WithStatus},
15 },
16 Mask,
17};
18use mina_core::channels::Aborter;
19use mina_node_native::NodeService;
20use mina_p2p_messages::{
21 string::ByteString,
22 v2::{
23 CurrencyFeeStableV1, LedgerHash, LedgerProofProdStableV2, MinaBaseProofStableV2,
24 MinaStateSnarkedLedgerStateWithSokStableV2, NonZeroCurvePoint,
25 ProverExtendBlockchainInputStableV2,
26 SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single, StateHash,
27 TransactionSnarkStableV2, TransactionSnarkWorkTStableV2Proofs,
28 },
29};
30use node::{
31 account::AccountPublicKey,
32 block_producer::{vrf_evaluator::VrfEvaluatorInput, BlockProducerEvent},
33 core::{
34 channels::mpsc,
35 invariants::InvariantsState,
36 snark::{Snark, SnarkJobId},
37 },
38 event_source::Event,
39 external_snark_worker::SnarkWorkSpec,
40 external_snark_worker_effectful::{ExternalSnarkWorkerEvent, ExternalSnarkWorkerService},
41 ledger::write::BlockApplyResult,
42 p2p::{
43 connection::outgoing::P2pConnectionOutgoingInitOpts,
44 service_impl::{
45 webrtc::{Cmd, P2pServiceWebrtc, PeerState},
46 webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p,
47 },
48 webrtc, P2pCryptoService, PeerId,
49 },
50 recorder::Recorder,
51 service::{
52 BlockProducerService, BlockProducerVrfEvaluatorService, TransitionFrontierGenesisService,
53 },
54 snark::{
55 block_verify::{SnarkBlockVerifyId, SnarkBlockVerifyService, VerifiableBlockWithHash},
56 user_command_verify::SnarkUserCommandVerifyId,
57 user_command_verify_effectful::SnarkUserCommandVerifyService,
58 work_verify::{SnarkWorkVerifyId, SnarkWorkVerifyService},
59 BlockVerifier, SnarkEvent, TransactionVerifier, VerifierSRS,
60 },
61 snark_pool::SnarkPoolService,
62 stats::Stats,
63 transition_frontier::{archive::archive_service::ArchiveService, genesis::GenesisConfig},
64 ActionWithMeta, State,
65};
66use redux::Instant;
67
68use crate::{
69 cluster::{ClusterNodeId, ProofKind},
70 node::NonDeterministicEvent,
71};
72
73pub type DynEffects = Box<dyn FnMut(&State, &NodeTestingService, &ActionWithMeta) + Send>;
74
75#[derive(Hash, Ord, PartialOrd, Eq, PartialEq)]
76pub struct PendingEventIdType;
77impl mina_core::requests::RequestIdType for PendingEventIdType {
78 fn request_id_type() -> &'static str {
79 "PendingEventId"
80 }
81}
82
83#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
84pub struct PendingEventId(usize);
85
86struct PendingEvents {
87 events: VecDeque<(PendingEventId, Event)>,
88 next_id: PendingEventId,
89}
90
91impl PendingEventId {
92 fn copy_inc(&mut self) -> Self {
93 let copy = *self;
94 let _ = self.0.wrapping_add(1);
95 copy
96 }
97}
98
99impl PendingEvents {
100 fn new() -> Self {
101 PendingEvents {
102 events: VecDeque::new(),
103 next_id: Default::default(),
104 }
105 }
106
107 fn add(&mut self, event: Event) -> PendingEventId {
108 let id = self.next_id.copy_inc();
109 self.events.push_back((id, event));
110 id
111 }
112
113 fn get(&self, id: PendingEventId) -> Option<&Event> {
114 self.events
115 .iter()
116 .find_map(|(_id, event)| (*_id == id).then_some(event))
117 }
118
119 fn remove(&mut self, id: PendingEventId) -> Option<Event> {
120 if let Some(i) = self
121 .events
122 .iter()
123 .enumerate()
124 .find_map(|(i, (_id, _))| (*_id == id).then_some(i))
125 {
126 self.events.remove(i).map(|(_, event)| event)
127 } else {
128 None
129 }
130 }
131
132 fn iter(&self) -> impl Iterator<Item = (PendingEventId, &Event)> {
133 self.events.iter().map(|(id, event)| (*id, event))
134 }
135}
136
137pub struct NodeTestingService {
138 real: NodeService,
139 id: ClusterNodeId,
140 rust_to_rust_use_webrtc: bool,
142 proof_kind: ProofKind,
143 is_replay: bool,
145 monotonic_time: Instant,
146 pending_events: PendingEvents,
148 dyn_effects: Option<DynEffects>,
150
151 snarker_sok_digest: Option<ByteString>,
152
153 cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
154 _shutdown: Aborter,
156}
157
158impl NodeTestingService {
159 pub fn new(
160 real: NodeService,
161 id: ClusterNodeId,
162 cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
163 _shutdown: Aborter,
164 ) -> Self {
165 Self {
166 real,
167 id,
168 rust_to_rust_use_webrtc: false,
169 proof_kind: ProofKind::default(),
170 is_replay: false,
171 monotonic_time: Instant::now(),
172 pending_events: PendingEvents::new(),
173 dyn_effects: None,
174 snarker_sok_digest: None,
175 cluster_invariants_state,
176 _shutdown,
177 }
178 }
179
180 pub fn node_id(&self) -> ClusterNodeId {
181 self.id
182 }
183
184 pub fn rust_to_rust_use_webrtc(&self) -> bool {
185 self.rust_to_rust_use_webrtc
186 }
187
188 pub fn set_rust_to_rust_use_webrtc(&mut self) -> &mut Self {
189 if !cfg!(feature = "p2p-webrtc") {
190 unreachable!();
191 }
192 self.rust_to_rust_use_webrtc = true;
193 self
194 }
195
196 pub fn proof_kind(&self) -> ProofKind {
197 self.proof_kind
198 }
199
200 pub fn set_proof_kind(&mut self, kind: ProofKind) -> &mut Self {
201 self.proof_kind = kind;
202 self
203 }
204
205 pub fn set_replay(&mut self) -> &mut Self {
206 self.is_replay = true;
207 self
208 }
209
210 pub fn advance_time(&mut self, by_nanos: u64) {
211 self.monotonic_time += Duration::from_nanos(by_nanos);
212 }
213
214 pub fn dyn_effects(&mut self, state: &State, action: &ActionWithMeta) {
215 if let Some(mut dyn_effects) = self.dyn_effects.take() {
216 (dyn_effects)(state, self, action);
217 self.dyn_effects = Some(dyn_effects);
218 }
219 }
220
221 pub fn set_dyn_effects(&mut self, effects: DynEffects) {
222 self.dyn_effects = Some(effects);
223 }
224
225 pub fn remove_dyn_effects(&mut self) -> Option<DynEffects> {
226 self.dyn_effects.take()
227 }
228
229 pub fn set_snarker_sok_digest(&mut self, digest: ByteString) {
230 self.snarker_sok_digest = Some(digest);
231 }
232
233 pub fn pending_events(&mut self, poll: bool) -> impl Iterator<Item = (PendingEventId, &Event)> {
234 while let Ok(req) = self.real.rpc_receiver().try_recv() {
235 self.real.process_rpc_request(req);
236 }
237 if poll {
238 while let Some(event) = self.real.event_receiver().try_next() {
239 if self.is_replay && NonDeterministicEvent::should_drop_event(&event) {
242 eprintln!("dropping non-deterministic event: {event:?}");
243 continue;
244 }
245 self.pending_events.add(event);
246 }
247 }
248 self.pending_events.iter()
249 }
250
251 pub async fn next_pending_event(&mut self) -> Option<(PendingEventId, &Event)> {
252 let event = loop {
253 let (event_receiver, rpc_receiver) = self.real.event_receiver_with_rpc_receiver();
254 tokio::select! {
255 Some(rpc) = rpc_receiver.recv() => {
256 self.real.process_rpc_request(rpc);
257 break self.real.event_receiver().try_next().unwrap();
258 }
259 res = event_receiver.wait_for_events() => {
260 res.ok()?;
261 let event = self.real.event_receiver().try_next().unwrap();
262 if self.is_replay && NonDeterministicEvent::should_drop_event(&event) {
265 eprintln!("dropping non-deterministic event: {event:?}");
266 continue;
267 }
268 break event;
269 }
270 }
271 };
272 let id = self.pending_events.add(event);
273 Some((id, self.pending_events.get(id).unwrap()))
274 }
275
276 pub fn get_pending_event(&self, id: PendingEventId) -> Option<&Event> {
277 self.pending_events.get(id)
278 }
279
280 pub fn take_pending_event(&mut self, id: PendingEventId) -> Option<Event> {
281 self.pending_events.remove(id)
282 }
283
284 pub fn ledger(&self, ledger_hash: &LedgerHash) -> Option<Mask> {
285 self.real
286 .ledger_manager()
287 .get_mask(ledger_hash)
288 .map(|(mask, _)| mask)
289 }
290}
291
292impl redux::Service for NodeTestingService {}
293
294impl node::Service for NodeTestingService {
295 fn queues(&mut self) -> node::service::Queues {
296 self.real.queues()
297 }
298
299 fn stats(&mut self) -> Option<&mut Stats> {
300 self.real.stats()
301 }
302
303 fn recorder(&mut self) -> &mut Recorder {
304 self.real.recorder()
305 }
306
307 fn is_replay(&self) -> bool {
308 self.is_replay
309 }
310}
311
312impl P2pCryptoService for NodeTestingService {
313 fn generate_random_nonce(&mut self) -> [u8; 24] {
314 self.real.generate_random_nonce()
315 }
316
317 fn ephemeral_sk(&mut self) -> [u8; 32] {
318 self.real.ephemeral_sk()
319 }
320
321 fn static_sk(&mut self) -> [u8; 32] {
322 self.real.static_sk()
323 }
324
325 fn sign_key(&mut self, key: &[u8; 32]) -> Vec<u8> {
326 self.real.sign_key(key)
327 }
328
329 fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
330 self.real.sign_publication(publication)
331 }
332
333 fn verify_publication(
334 &mut self,
335 pk: &libp2p_identity::PublicKey,
336 publication: &[u8],
337 sig: &[u8],
338 ) -> bool {
339 self.real.verify_publication(pk, publication, sig)
340 }
341}
342
343impl node::ledger::LedgerService for NodeTestingService {
344 fn ledger_manager(&self) -> &node::ledger::LedgerManager {
345 self.real.ledger_manager()
346 }
347}
348
349impl redux::TimeService for NodeTestingService {
350 fn monotonic_time(&mut self) -> Instant {
351 self.monotonic_time
352 }
353}
354
355impl node::event_source::EventSourceService for NodeTestingService {
356 fn next_event(&mut self) -> Option<Event> {
357 None
358 }
359}
360
361impl TransitionFrontierGenesisService for NodeTestingService {
362 fn load_genesis(&mut self, config: Arc<GenesisConfig>) {
363 TransitionFrontierGenesisService::load_genesis(&mut self.real, config);
364 }
365}
366
367impl P2pServiceWebrtc for NodeTestingService {
368 type Event = Event;
369
370 fn random_pick(
371 &mut self,
372 list: &[P2pConnectionOutgoingInitOpts],
373 ) -> Option<P2pConnectionOutgoingInitOpts> {
374 self.real.random_pick(list)
375 }
376
377 fn event_sender(&self) -> &mpsc::UnboundedSender<Event> {
378 P2pServiceWebrtc::event_sender(&self.real)
379 }
380
381 fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<Cmd> {
382 P2pServiceWebrtc::cmd_sender(&self.real)
383 }
384
385 fn peers(&mut self) -> &mut BTreeMap<PeerId, PeerState> {
386 P2pServiceWebrtc::peers(&mut self.real)
387 }
388
389 fn outgoing_init(&mut self, peer_id: PeerId) {
390 P2pServiceWebrtc::outgoing_init(&mut self.real, peer_id)
391 }
392
393 fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
394 P2pServiceWebrtc::incoming_init(&mut self.real, peer_id, offer)
395 }
396
397 fn encrypt<T: node::p2p::identity::EncryptableType>(
398 &mut self,
399 other_pk: &node::p2p::identity::PublicKey,
400 message: &T,
401 ) -> Result<T::Encrypted, Box<dyn std::error::Error>> {
402 self.real.encrypt(other_pk, message)
403 }
404
405 fn decrypt<T: node::p2p::identity::EncryptableType>(
406 &mut self,
407 other_pub_key: &node::p2p::identity::PublicKey,
408 encrypted: &T::Encrypted,
409 ) -> Result<T, Box<dyn std::error::Error>> {
410 self.real.decrypt(other_pub_key, encrypted)
411 }
412
413 fn auth_encrypt_and_send(
414 &mut self,
415 peer_id: PeerId,
416 other_pub_key: &node::p2p::identity::PublicKey,
417 auth: webrtc::ConnectionAuth,
418 ) {
419 self.real
420 .auth_encrypt_and_send(peer_id, other_pub_key, auth)
421 }
422
423 fn auth_decrypt(
424 &mut self,
425 other_pub_key: &node::p2p::identity::PublicKey,
426 auth: webrtc::ConnectionAuthEncrypted,
427 ) -> Option<webrtc::ConnectionAuth> {
428 self.real.auth_decrypt(other_pub_key, auth)
429 }
430}
431
432impl P2pServiceWebrtcWithLibp2p for NodeTestingService {
433 #[cfg(feature = "p2p-libp2p")]
434 fn mio(&mut self) -> &mut node::p2p::service_impl::mio::MioService {
435 self.real.mio()
436 }
437
438 fn connections(&self) -> std::collections::BTreeSet<PeerId> {
439 self.real.connections()
440 }
441}
442
443impl SnarkBlockVerifyService for NodeTestingService {
444 fn verify_init(
445 &mut self,
446 req_id: SnarkBlockVerifyId,
447 verifier_index: BlockVerifier,
448 verifier_srs: Arc<VerifierSRS>,
449 block: VerifiableBlockWithHash,
450 ) {
451 match self.proof_kind() {
452 ProofKind::Dummy | ProofKind::ConstraintsChecked => {
453 let _ = self
454 .real
455 .event_sender()
456 .send(SnarkEvent::BlockVerify(req_id, Ok(())).into());
457 }
458 ProofKind::Full => SnarkBlockVerifyService::verify_init(
459 &mut self.real,
460 req_id,
461 verifier_index,
462 verifier_srs,
463 block,
464 ),
465 }
466 }
467}
468
469impl SnarkUserCommandVerifyService for NodeTestingService {
470 fn verify_init(
471 &mut self,
472 req_id: SnarkUserCommandVerifyId,
473 commands: Vec<WithStatus<verifiable::UserCommand>>,
474 ) {
475 SnarkUserCommandVerifyService::verify_init(&mut self.real, req_id, commands)
476 }
477}
478
479impl SnarkWorkVerifyService for NodeTestingService {
480 fn verify_init(
481 &mut self,
482 req_id: SnarkWorkVerifyId,
483 verifier_index: TransactionVerifier,
484 verifier_srs: Arc<VerifierSRS>,
485 work: Vec<Snark>,
486 ) {
487 match self.proof_kind() {
488 ProofKind::Dummy | ProofKind::ConstraintsChecked => {
489 let _ = self
490 .real
491 .event_sender()
492 .send(SnarkEvent::WorkVerify(req_id, Ok(())).into());
493 }
494 ProofKind::Full => SnarkWorkVerifyService::verify_init(
495 &mut self.real,
496 req_id,
497 verifier_index,
498 verifier_srs,
499 work,
500 ),
501 }
502 }
503}
504
505impl SnarkPoolService for NodeTestingService {
506 fn random_choose<'a>(
507 &mut self,
508 iter: impl Iterator<Item = &'a SnarkJobId>,
509 n: usize,
510 ) -> Vec<SnarkJobId> {
511 self.real.random_choose(iter, n)
512 }
513}
514
515impl BlockProducerVrfEvaluatorService for NodeTestingService {
516 fn evaluate(&mut self, data: VrfEvaluatorInput) {
517 BlockProducerVrfEvaluatorService::evaluate(&mut self.real, data)
518 }
519}
520
521impl ArchiveService for NodeTestingService {
522 fn send_to_archive(&mut self, data: BlockApplyResult) {
523 self.real.send_to_archive(data);
524 }
525}
526
527use std::cell::RefCell;
528thread_local! {
529 static GENESIS_PROOF: RefCell<Option<(StateHash, Arc<MinaBaseProofStableV2>)>> = const { RefCell::new(None)};
530}
531
532impl BlockProducerService for NodeTestingService {
533 fn provers(&self) -> ledger::proofs::provers::BlockProver {
534 self.real.provers()
535 }
536
537 fn prove(
538 &mut self,
539 block_hash: StateHash,
540 mut input: Box<ProverExtendBlockchainInputStableV2>,
541 ) {
542 fn dummy_proof_event(block_hash: StateHash) -> Event {
543 let dummy_proof = (*ledger::dummy::dummy_blockchain_proof()).clone();
544 BlockProducerEvent::BlockProve(block_hash, Ok(dummy_proof.into())).into()
545 }
546 let keypair = self.real.block_producer().unwrap().keypair();
547
548 match self.proof_kind() {
549 ProofKind::Dummy => {
550 let _ = self.real.event_sender().send(dummy_proof_event(block_hash));
551 }
552 ProofKind::ConstraintsChecked => {
553 match mina_node_native::block_producer::prove(
554 self.provers(),
555 &mut input,
556 &keypair,
557 true,
558 ) {
559 Err(e)
560 if matches!(
561 e.downcast_ref::<ProofError>(),
562 Some(ProofError::ConstraintsOk)
563 ) =>
564 {
565 let _ = self.real.event_sender().send(dummy_proof_event(block_hash));
566 }
567 Err(err) => panic!("unexpected block proof generation error: {err:?}"),
568 Ok(_) => unreachable!(),
569 }
570 }
571 ProofKind::Full => {
572 let is_genesis = input
574 .next_state
575 .body
576 .consensus_state
577 .blockchain_length
578 .as_u32()
579 == 1;
580 let res = GENESIS_PROOF.with_borrow_mut(|cached_genesis| {
581 if let Some((_, proof)) = cached_genesis
582 .as_ref()
583 .filter(|(hash, _)| is_genesis && hash == &block_hash)
584 {
585 Ok(proof.clone())
586 } else {
587 mina_node_native::block_producer::prove(
588 self.provers(),
589 &mut input,
590 &keypair,
591 false,
592 )
593 .map_err(|err| format!("{err:?}"))
594 }
595 });
596 if let Some(proof) = res.as_ref().ok().filter(|_| is_genesis) {
597 GENESIS_PROOF
598 .with_borrow_mut(|data| *data = Some((block_hash.clone(), proof.clone())));
599 }
600 let _ = self
601 .real
602 .event_sender()
603 .send(BlockProducerEvent::BlockProve(block_hash, res).into());
604 }
605 }
606 }
607
608 fn with_producer_keypair<T>(
609 &self,
610 _f: impl FnOnce(&node::account::AccountSecretKey) -> T,
611 ) -> Option<T> {
612 None
613 }
614}
615
616impl ExternalSnarkWorkerService for NodeTestingService {
617 fn start(
618 &mut self,
619 public_key: NonZeroCurvePoint,
620 fee: CurrencyFeeStableV1,
621 _: TransactionVerifier,
622 ) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
623 let pub_key = AccountPublicKey::from(public_key);
624 let sok_message = SokMessage::create(
625 (&fee).into(),
626 pub_key.try_into().map_err(|e| {
627 node::external_snark_worker::ExternalSnarkWorkerError::Error(format!("{:?}", e))
628 })?,
629 );
630 self.set_snarker_sok_digest((&sok_message.digest()).into());
631 let _ = self
632 .real
633 .event_sender()
634 .send(ExternalSnarkWorkerEvent::Started.into());
635 Ok(())
636 }
638
639 fn submit(
640 &mut self,
641 spec: SnarkWorkSpec,
642 ) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
643 let sok_digest = self.snarker_sok_digest.clone().unwrap();
644 let make_dummy_proof = |spec| {
645 let statement = match spec {
646 SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Transition(v, _) => v.0,
647 SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Merge(v) => v.0 .0,
648 };
649
650 LedgerProofProdStableV2(TransactionSnarkStableV2 {
651 statement: MinaStateSnarkedLedgerStateWithSokStableV2 {
652 source: statement.source,
653 target: statement.target,
654 connecting_ledger_left: statement.connecting_ledger_left,
655 connecting_ledger_right: statement.connecting_ledger_right,
656 supply_increase: statement.supply_increase,
657 fee_excess: statement.fee_excess,
658 sok_digest: sok_digest.clone(),
659 },
660 proof: (*dummy_transaction_proof()).clone(),
661 })
662 };
663 let res = match spec {
664 SnarkWorkSpec::One(v) => TransactionSnarkWorkTStableV2Proofs::One(make_dummy_proof(v)),
665 SnarkWorkSpec::Two((v1, v2)) => TransactionSnarkWorkTStableV2Proofs::Two((
666 make_dummy_proof(v1),
667 make_dummy_proof(v2),
668 )),
669 };
670 let _ = self
671 .real
672 .event_sender()
673 .send(ExternalSnarkWorkerEvent::WorkResult(Arc::new(res)).into());
674 Ok(())
675 }
677
678 fn cancel(&mut self) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
679 let _ = self
680 .real
681 .event_sender()
682 .send(ExternalSnarkWorkerEvent::WorkCancelled.into());
683 Ok(())
684 }
686
687 fn kill(&mut self) -> Result<(), node::external_snark_worker::ExternalSnarkWorkerError> {
688 let _ = self
689 .real
690 .event_sender()
691 .send(ExternalSnarkWorkerEvent::Killed.into());
692 Ok(())
693 }
695}
696
697impl node::core::invariants::InvariantService for NodeTestingService {
698 type ClusterInvariantsState<'a> = std::sync::MutexGuard<'a, InvariantsState>;
699
700 fn node_id(&self) -> usize {
701 self.node_id().index()
702 }
703
704 fn invariants_state(&mut self) -> &mut InvariantsState {
705 node::core::invariants::InvariantService::invariants_state(&mut self.real)
706 }
707
708 fn cluster_invariants_state<'a>(&'a mut self) -> Option<Self::ClusterInvariantsState<'a>>
709 where
710 Self: 'a,
711 {
712 Some(
713 self.cluster_invariants_state.try_lock().expect(
714 "locking should never fail, since we are running all nodes in the same thread",
715 ),
716 )
717 }
718}