node/event_source/
event_source_effects.rs

1use p2p::channels::{
2    signaling::{
3        discovery::P2pChannelsSignalingDiscoveryAction,
4        exchange::P2pChannelsSignalingExchangeAction,
5    },
6    snark::P2pChannelsSnarkAction,
7    streaming_rpc::P2pChannelsStreamingRpcAction,
8    transaction::P2pChannelsTransactionAction,
9};
10use snark::user_command_verify::{SnarkUserCommandVerifyAction, SnarkUserCommandVerifyError};
11
12#[cfg(feature = "p2p-libp2p")]
13use crate::p2p::{MioEvent, P2pNetworkSchedulerAction};
14use crate::{
15    action::CheckTimeoutsAction,
16    block_producer::{
17        vrf_evaluator::BlockProducerVrfEvaluatorAction, BlockProducerEvent,
18        BlockProducerVrfEvaluatorEvent,
19    },
20    external_snark_worker_effectful::ExternalSnarkWorkerEvent,
21    ledger::{read::LedgerReadAction, write::LedgerWriteAction},
22    p2p::{
23        channels::{
24            best_tip::P2pChannelsBestTipAction, rpc::P2pChannelsRpcAction,
25            snark_job_commitment::P2pChannelsSnarkJobCommitmentAction, ChannelId,
26            P2pChannelsMessageReceivedAction,
27        },
28        connection::{
29            incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction,
30            P2pConnectionErrorResponse, P2pConnectionResponse,
31        },
32        disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
33        P2pChannelEvent,
34    },
35    rpc::{RpcAction, RpcRequest},
36    snark::{block_verify::SnarkBlockVerifyAction, work_verify::SnarkWorkVerifyAction, SnarkEvent},
37    transition_frontier::genesis::TransitionFrontierGenesisAction,
38    BlockProducerAction, ExternalSnarkWorkerAction, Service, Store,
39};
40
41use super::{
42    Event, EventSourceAction, EventSourceActionWithMeta, LedgerEvent, P2pConnectionEvent, P2pEvent,
43};
44
45pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourceActionWithMeta) {
46    let (action, meta) = action.split();
47    match action {
48        EventSourceAction::ProcessEvents => {
49            // This action gets continously called until there are no more
50            // events available.
51            //
52            // Retrieve and process max 1024 events at a time and dispatch
53            // `CheckTimeoutsAction` in between `EventSourceProcessEventsAction`
54            // calls so that we make sure, that action gets called even
55            // if we are continously flooded with events.
56            for _ in 0..1024 {
57                match store.service.next_event() {
58                    Some(event) => {
59                        store.dispatch(EventSourceAction::NewEvent { event });
60                    }
61                    None => break,
62                }
63            }
64            store.dispatch(CheckTimeoutsAction {});
65        }
66        // "Translate" event into the corresponding action and dispatch it.
67        EventSourceAction::NewEvent { event } => match event {
68            Event::P2p(e) => match e {
69                #[cfg(not(feature = "p2p-libp2p"))]
70                P2pEvent::MioEvent(_) => {}
71                #[cfg(feature = "p2p-libp2p")]
72                P2pEvent::MioEvent(e) => match e {
73                    MioEvent::InterfaceDetected(ip) => {
74                        store.dispatch(P2pNetworkSchedulerAction::InterfaceDetected { ip });
75                    }
76                    MioEvent::InterfaceExpired(ip) => {
77                        store.dispatch(P2pNetworkSchedulerAction::InterfaceExpired { ip });
78                    }
79                    MioEvent::ListenerReady { listener } => {
80                        store.dispatch(P2pNetworkSchedulerAction::ListenerReady { listener });
81                    }
82                    MioEvent::ListenerError { listener, error } => {
83                        store
84                            .dispatch(P2pNetworkSchedulerAction::ListenerError { listener, error });
85                    }
86                    MioEvent::IncomingConnectionIsReady { listener } => {
87                        store.dispatch(P2pNetworkSchedulerAction::IncomingConnectionIsReady {
88                            listener,
89                        });
90                    }
91                    MioEvent::IncomingConnectionDidAccept(addr, result) => {
92                        store.dispatch(P2pNetworkSchedulerAction::IncomingDidAccept {
93                            addr,
94                            result,
95                        });
96                    }
97                    MioEvent::OutgoingConnectionDidConnect(addr, result) => {
98                        store.dispatch(P2pNetworkSchedulerAction::OutgoingDidConnect {
99                            addr,
100                            result,
101                        });
102                    }
103                    MioEvent::IncomingDataIsReady(addr) => {
104                        store.dispatch(P2pNetworkSchedulerAction::IncomingDataIsReady { addr });
105                    }
106                    MioEvent::IncomingDataDidReceive(addr, result) => {
107                        store.dispatch(P2pNetworkSchedulerAction::IncomingDataDidReceive {
108                            addr,
109                            result,
110                        });
111                    }
112                    MioEvent::OutgoingDataDidSend(_, _result) => {}
113                    MioEvent::ConnectionDidClose(addr, result) => {
114                        if let Err(e) = result {
115                            store.dispatch(P2pNetworkSchedulerAction::Error {
116                                addr,
117                                error: p2p::P2pNetworkConnectionError::MioError(e),
118                            });
119                        } else {
120                            store.dispatch(P2pNetworkSchedulerAction::Error {
121                                addr,
122                                error: p2p::P2pNetworkConnectionError::RemoteClosed,
123                            });
124                        }
125                    }
126                    MioEvent::ConnectionDidCloseOnDemand(addr) => {
127                        store.dispatch(P2pNetworkSchedulerAction::Prune { addr });
128                    }
129                },
130                P2pEvent::Connection(e) => match e {
131                    P2pConnectionEvent::OfferSdpReady(peer_id, res) => match res {
132                        Err(error) => {
133                            store.dispatch(P2pConnectionOutgoingAction::OfferSdpCreateError {
134                                peer_id,
135                                error,
136                            });
137                        }
138                        Ok(sdp) => {
139                            store.dispatch(P2pConnectionOutgoingAction::OfferSdpCreateSuccess {
140                                peer_id,
141                                sdp,
142                            });
143                        }
144                    },
145                    P2pConnectionEvent::AnswerSdpReady(peer_id, res) => match res {
146                        Err(error) => {
147                            store.dispatch(P2pConnectionIncomingAction::AnswerSdpCreateError {
148                                peer_id,
149                                error,
150                            });
151                        }
152                        Ok(sdp) => {
153                            store.dispatch(P2pConnectionIncomingAction::AnswerSdpCreateSuccess {
154                                peer_id,
155                                sdp,
156                            });
157                        }
158                    },
159                    P2pConnectionEvent::AnswerReceived(peer_id, res) => match res {
160                        P2pConnectionResponse::Accepted(answer) => {
161                            store.dispatch(P2pConnectionOutgoingAction::AnswerRecvSuccess {
162                                peer_id,
163                                answer,
164                            });
165                        }
166                        P2pConnectionResponse::Rejected(reason) => {
167                            store.dispatch(P2pConnectionOutgoingAction::AnswerRecvError {
168                                peer_id,
169                                error: P2pConnectionErrorResponse::Rejected(reason),
170                            });
171                        }
172                        P2pConnectionResponse::SignalDecryptionFailed => {
173                            store.dispatch(P2pConnectionOutgoingAction::AnswerRecvError {
174                                peer_id,
175                                error: P2pConnectionErrorResponse::SignalDecryptionFailed,
176                            });
177                        }
178                        P2pConnectionResponse::InternalError => {
179                            store.dispatch(P2pConnectionOutgoingAction::AnswerRecvError {
180                                peer_id,
181                                error: P2pConnectionErrorResponse::InternalError,
182                            });
183                        }
184                    },
185                    P2pConnectionEvent::Finalized(peer_id, res) => match res {
186                        Err(error) => {
187                            store.dispatch(P2pConnectionOutgoingAction::FinalizeError {
188                                peer_id,
189                                error: error.clone(),
190                            });
191                            store.dispatch(P2pConnectionIncomingAction::FinalizeError {
192                                peer_id,
193                                error,
194                            });
195                        }
196                        Ok(auth) => {
197                            let _ = store.dispatch(P2pConnectionOutgoingAction::FinalizeSuccess {
198                                peer_id,
199                                remote_auth: Some(auth.clone()),
200                            }) || store.dispatch(
201                                P2pConnectionIncomingAction::FinalizeSuccess {
202                                    peer_id,
203                                    remote_auth: auth.clone(),
204                                },
205                            );
206                        }
207                    },
208                    P2pConnectionEvent::Closed(peer_id) => {
209                        store.dispatch(P2pDisconnectionAction::PeerClosed { peer_id });
210                        store.dispatch(P2pDisconnectionAction::Finish { peer_id });
211                    }
212                },
213                P2pEvent::Channel(e) => match e {
214                    P2pChannelEvent::Opened(peer_id, chan_id, res) => match res {
215                        Err(err) => {
216                            openmina_core::log::warn!(meta.time(); kind = "P2pChannelEvent::Opened", peer_id = peer_id.to_string(), error = err);
217                            // TODO(binier): dispatch error action.
218                        }
219                        Ok(_) => match chan_id {
220                            ChannelId::SignalingDiscovery => {
221                                store.dispatch(P2pChannelsSignalingDiscoveryAction::Ready {
222                                    peer_id,
223                                });
224                            }
225                            ChannelId::SignalingExchange => {
226                                store.dispatch(P2pChannelsSignalingExchangeAction::Ready {
227                                    peer_id,
228                                });
229                            }
230                            ChannelId::BestTipPropagation => {
231                                store.dispatch(P2pChannelsBestTipAction::Ready { peer_id });
232                            }
233                            ChannelId::TransactionPropagation => {
234                                store.dispatch(P2pChannelsTransactionAction::Ready { peer_id });
235                            }
236                            ChannelId::SnarkPropagation => {
237                                store.dispatch(P2pChannelsSnarkAction::Ready { peer_id });
238                            }
239                            ChannelId::SnarkJobCommitmentPropagation => {
240                                store.dispatch(P2pChannelsSnarkJobCommitmentAction::Ready {
241                                    peer_id,
242                                });
243                            }
244                            ChannelId::Rpc => {
245                                store.dispatch(P2pChannelsRpcAction::Ready { peer_id });
246                            }
247                            ChannelId::StreamingRpc => {
248                                store.dispatch(P2pChannelsStreamingRpcAction::Ready { peer_id });
249                            }
250                        },
251                    },
252                    P2pChannelEvent::Sent(peer_id, _, _, res) => {
253                        if let Err(err) = res {
254                            let reason = P2pDisconnectionReason::P2pChannelSendFailed(err);
255                            store.dispatch(P2pDisconnectionAction::Init { peer_id, reason });
256                        }
257                    }
258                    P2pChannelEvent::Received(peer_id, res) => match res {
259                        Err(err) => {
260                            let reason = P2pDisconnectionReason::P2pChannelReceiveFailed(err);
261                            store.dispatch(P2pDisconnectionAction::Init { peer_id, reason });
262                        }
263                        Ok(message) => {
264                            store.dispatch(P2pChannelsMessageReceivedAction {
265                                peer_id,
266                                message: Box::new(message),
267                            });
268                        }
269                    },
270                    P2pChannelEvent::Closed(peer_id, chan_id) => {
271                        let reason = P2pDisconnectionReason::P2pChannelClosed(chan_id);
272                        store.dispatch(P2pDisconnectionAction::Init { peer_id, reason });
273                    }
274                },
275            },
276            Event::Ledger(event) => match event {
277                LedgerEvent::Write(response) => {
278                    store.dispatch(LedgerWriteAction::Success { response });
279                }
280                LedgerEvent::Read(id, response) => {
281                    store.dispatch(LedgerReadAction::Success { id, response });
282                }
283            },
284            Event::Snark(event) => match event {
285                SnarkEvent::BlockVerify(req_id, result) => match result {
286                    Err(error) => {
287                        store.dispatch(SnarkBlockVerifyAction::Error { req_id, error });
288                    }
289                    Ok(()) => {
290                        store.dispatch(SnarkBlockVerifyAction::Success { req_id });
291                    }
292                },
293                SnarkEvent::WorkVerify(req_id, result) => match result {
294                    Err(error) => {
295                        store.dispatch(SnarkWorkVerifyAction::Error { req_id, error });
296                    }
297                    Ok(()) => {
298                        store.dispatch(SnarkWorkVerifyAction::Success { req_id });
299                    }
300                },
301                SnarkEvent::UserCommandVerify(req_id, result) => {
302                    if let Ok(commands) = result {
303                        store.dispatch(SnarkUserCommandVerifyAction::Success { req_id, commands });
304                    } else {
305                        store.dispatch(SnarkUserCommandVerifyAction::Error {
306                            req_id,
307                            error: SnarkUserCommandVerifyError::VerificationFailed,
308                        });
309                    }
310                }
311            },
312            Event::Rpc(rpc_id, e) => match *e {
313                RpcRequest::StateGet(filter) => {
314                    store.dispatch(RpcAction::GlobalStateGet { rpc_id, filter });
315                }
316                RpcRequest::StatusGet => {
317                    store.dispatch(RpcAction::StatusGet { rpc_id });
318                }
319                RpcRequest::HeartbeatGet => {
320                    store.dispatch(RpcAction::HeartbeatGet { rpc_id });
321                }
322                RpcRequest::ActionStatsGet(query) => {
323                    store.dispatch(RpcAction::ActionStatsGet { rpc_id, query });
324                }
325                RpcRequest::SyncStatsGet(query) => {
326                    store.dispatch(RpcAction::SyncStatsGet { rpc_id, query });
327                }
328                RpcRequest::BlockProducerStatsGet => {
329                    store.dispatch(RpcAction::BlockProducerStatsGet { rpc_id });
330                }
331                RpcRequest::PeersGet => {
332                    store.dispatch(RpcAction::PeersGet { rpc_id });
333                }
334                RpcRequest::MessageProgressGet => {
335                    store.dispatch(RpcAction::MessageProgressGet { rpc_id });
336                }
337                RpcRequest::P2pConnectionOutgoing(opts) => {
338                    store.dispatch(RpcAction::P2pConnectionOutgoingInit { rpc_id, opts });
339                }
340                RpcRequest::P2pConnectionIncoming(opts) => {
341                    store.dispatch(RpcAction::P2pConnectionIncomingInit { rpc_id, opts });
342                }
343                RpcRequest::ScanStateSummaryGet(query) => {
344                    store.dispatch(RpcAction::ScanStateSummaryGetInit { rpc_id, query });
345                }
346                RpcRequest::SnarkPoolGet => {
347                    store.dispatch(RpcAction::SnarkPoolAvailableJobsGet { rpc_id });
348                }
349                RpcRequest::SnarkPoolJobGet { job_id } => {
350                    store.dispatch(RpcAction::SnarkPoolJobGet { rpc_id, job_id });
351                }
352                RpcRequest::SnarkPoolCompletedJobsGet => {
353                    store.dispatch(RpcAction::SnarkPoolCompletedJobsGet { rpc_id });
354                }
355                RpcRequest::SnarkPoolPendingJobsGet => {
356                    store.dispatch(RpcAction::SnarkPoolPendingJobsGet { rpc_id });
357                }
358                RpcRequest::SnarkerConfig => {
359                    store.dispatch(RpcAction::SnarkerConfigGet { rpc_id });
360                }
361                RpcRequest::SnarkerJobCommit { job_id } => {
362                    store.dispatch(RpcAction::SnarkerJobCommit { rpc_id, job_id });
363                }
364                RpcRequest::SnarkerJobSpec { job_id } => {
365                    store.dispatch(RpcAction::SnarkerJobSpec { rpc_id, job_id });
366                }
367                RpcRequest::SnarkerWorkers => {
368                    store.dispatch(RpcAction::SnarkerWorkersGet { rpc_id });
369                }
370                RpcRequest::HealthCheck => {
371                    store.dispatch(RpcAction::HealthCheck { rpc_id });
372                }
373                RpcRequest::ReadinessCheck => {
374                    store.dispatch(RpcAction::ReadinessCheck { rpc_id });
375                }
376                RpcRequest::DiscoveryRoutingTable => {
377                    store.dispatch(RpcAction::DiscoveryRoutingTable { rpc_id });
378                }
379                RpcRequest::DiscoveryBoostrapStats => {
380                    store.dispatch(RpcAction::DiscoveryBoostrapStats { rpc_id });
381                }
382                RpcRequest::TransactionPoolGet => {
383                    store.dispatch(RpcAction::TransactionPool { rpc_id });
384                }
385                RpcRequest::LedgerAccountsGet(account_query) => {
386                    store.dispatch(RpcAction::LedgerAccountsGetInit {
387                        rpc_id,
388                        account_query,
389                    });
390                }
391                RpcRequest::TransactionInject(commands) => {
392                    store.dispatch(RpcAction::TransactionInjectInit { rpc_id, commands });
393                }
394                RpcRequest::TransitionFrontierUserCommandsGet => {
395                    store.dispatch(RpcAction::TransitionFrontierUserCommandsGet { rpc_id });
396                }
397                RpcRequest::BestChain(max_length) => {
398                    store.dispatch(RpcAction::BestChain { rpc_id, max_length });
399                }
400                RpcRequest::ConsensusConstantsGet => {
401                    store.dispatch(RpcAction::ConsensusConstantsGet { rpc_id });
402                }
403                RpcRequest::TransactionStatusGet(tx) => {
404                    store.dispatch(RpcAction::TransactionStatusGet { rpc_id, tx });
405                }
406                RpcRequest::GetBlock(query) => {
407                    store.dispatch(RpcAction::BlockGet { rpc_id, query });
408                }
409                RpcRequest::PooledUserCommands(query) => {
410                    store.dispatch(RpcAction::PooledUserCommands { rpc_id, query });
411                }
412                RpcRequest::PooledZkappCommands(query) => {
413                    store.dispatch(RpcAction::PooledZkappCommands { rpc_id, query });
414                }
415                RpcRequest::ConsensusTimeGet(query) => {
416                    store.dispatch(RpcAction::ConsensusTimeGet { rpc_id, query });
417                }
418                RpcRequest::GenesisBlockGet => {
419                    store.dispatch(RpcAction::GenesisBlock { rpc_id });
420                }
421                RpcRequest::LedgerStatusGet(ledger_hash) => {
422                    store.dispatch(RpcAction::LedgerStatusGetInit {
423                        rpc_id,
424                        ledger_hash,
425                    });
426                }
427                RpcRequest::LedgerAccountDelegatorsGet(ledger_hash, account_id) => {
428                    store.dispatch(RpcAction::LedgerAccountDelegatorsGetInit {
429                        rpc_id,
430                        ledger_hash,
431                        account_id,
432                    });
433                }
434            },
435            Event::ExternalSnarkWorker(e) => match e {
436                ExternalSnarkWorkerEvent::Started => {
437                    store.dispatch(ExternalSnarkWorkerAction::Started);
438                }
439                ExternalSnarkWorkerEvent::Killed => {
440                    store.dispatch(ExternalSnarkWorkerAction::Killed);
441                }
442                ExternalSnarkWorkerEvent::WorkResult(result) => {
443                    store.dispatch(ExternalSnarkWorkerAction::WorkResult { result });
444                }
445                ExternalSnarkWorkerEvent::WorkError(error) => {
446                    store.dispatch(ExternalSnarkWorkerAction::WorkError { error });
447                }
448                ExternalSnarkWorkerEvent::WorkCancelled => {
449                    store.dispatch(ExternalSnarkWorkerAction::WorkCancelled);
450                }
451                ExternalSnarkWorkerEvent::Error(error) => {
452                    store.dispatch(ExternalSnarkWorkerAction::Error {
453                        error,
454                        permanent: false,
455                    });
456                }
457            },
458            Event::BlockProducerEvent(e) => match e {
459                BlockProducerEvent::VrfEvaluator(vrf_e) => match vrf_e {
460                    BlockProducerVrfEvaluatorEvent::Evaluated(vrf_output_with_hash) => {
461                        store.dispatch(
462                            BlockProducerVrfEvaluatorAction::ProcessSlotEvaluationSuccess {
463                                vrf_output: vrf_output_with_hash.evaluation_result,
464                                staking_ledger_hash: vrf_output_with_hash.staking_ledger_hash,
465                            },
466                        );
467                    }
468                },
469                BlockProducerEvent::BlockProve(block_hash, res) => match res {
470                    Err(err) => todo!(
471                        "error while trying to produce block proof for block {block_hash} - {err}"
472                    ),
473                    Ok(proof) => {
474                        if store
475                            .state()
476                            .transition_frontier
477                            .genesis
478                            .prove_pending_block_hash()
479                            .is_some_and(|hash| hash == block_hash)
480                        {
481                            // TODO(refactor): before this is dispatched, genesis inject must be dispatched
482                            store.dispatch(TransitionFrontierGenesisAction::ProveSuccess { proof });
483                        } else {
484                            store.dispatch(BlockProducerAction::BlockProveSuccess { proof });
485                        }
486                    }
487                },
488            },
489            Event::GenesisLoad(res) => match res {
490                Err(err) => todo!("error while trying to load genesis config/ledger. - {err}"),
491                Ok(data) => {
492                    store.dispatch(TransitionFrontierGenesisAction::LedgerLoadSuccess { data });
493                }
494            },
495        },
496        EventSourceAction::WaitTimeout => {
497            store.dispatch(CheckTimeoutsAction {});
498        }
499        EventSourceAction::WaitForEvents => {}
500    }
501}