1use p2p::channels::{
2 signaling::{
3 discovery::P2pChannelsSignalingDiscoveryAction,
4 exchange::P2pChannelsSignalingExchangeAction,
5 },
6 snark::P2pChannelsSnarkAction,
7 streaming_rpc::P2pChannelsStreamingRpcAction,
8 transaction::P2pChannelsTransactionAction,
9};
10use snark::user_command_verify::{SnarkUserCommandVerifyAction, SnarkUserCommandVerifyError};
11
12#[cfg(feature = "p2p-libp2p")]
13use crate::p2p::{MioEvent, P2pNetworkSchedulerAction};
14use crate::{
15 action::CheckTimeoutsAction,
16 block_producer::{
17 vrf_evaluator::BlockProducerVrfEvaluatorAction, BlockProducerEvent,
18 BlockProducerVrfEvaluatorEvent,
19 },
20 external_snark_worker_effectful::ExternalSnarkWorkerEvent,
21 ledger::{read::LedgerReadAction, write::LedgerWriteAction},
22 p2p::{
23 channels::{
24 best_tip::P2pChannelsBestTipAction, rpc::P2pChannelsRpcAction,
25 snark_job_commitment::P2pChannelsSnarkJobCommitmentAction, ChannelId,
26 P2pChannelsMessageReceivedAction,
27 },
28 connection::{
29 incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction,
30 P2pConnectionErrorResponse, P2pConnectionResponse,
31 },
32 disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
33 P2pChannelEvent,
34 },
35 rpc::{RpcAction, RpcRequest},
36 snark::{block_verify::SnarkBlockVerifyAction, work_verify::SnarkWorkVerifyAction, SnarkEvent},
37 transition_frontier::genesis::TransitionFrontierGenesisAction,
38 BlockProducerAction, ExternalSnarkWorkerAction, Service, Store,
39};
40
41use super::{
42 Event, EventSourceAction, EventSourceActionWithMeta, LedgerEvent, P2pConnectionEvent, P2pEvent,
43};
44
45pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourceActionWithMeta) {
46 let (action, meta) = action.split();
47 match action {
48 EventSourceAction::ProcessEvents => {
49 for _ in 0..1024 {
57 match store.service.next_event() {
58 Some(event) => {
59 store.dispatch(EventSourceAction::NewEvent { event });
60 }
61 None => break,
62 }
63 }
64 store.dispatch(CheckTimeoutsAction {});
65 }
66 EventSourceAction::NewEvent { event } => match event {
68 Event::P2p(e) => match e {
69 #[cfg(not(feature = "p2p-libp2p"))]
70 P2pEvent::MioEvent(_) => {}
71 #[cfg(feature = "p2p-libp2p")]
72 P2pEvent::MioEvent(e) => match e {
73 MioEvent::InterfaceDetected(ip) => {
74 store.dispatch(P2pNetworkSchedulerAction::InterfaceDetected { ip });
75 }
76 MioEvent::InterfaceExpired(ip) => {
77 store.dispatch(P2pNetworkSchedulerAction::InterfaceExpired { ip });
78 }
79 MioEvent::ListenerReady { listener } => {
80 store.dispatch(P2pNetworkSchedulerAction::ListenerReady { listener });
81 }
82 MioEvent::ListenerError { listener, error } => {
83 store
84 .dispatch(P2pNetworkSchedulerAction::ListenerError { listener, error });
85 }
86 MioEvent::IncomingConnectionIsReady { listener } => {
87 store.dispatch(P2pNetworkSchedulerAction::IncomingConnectionIsReady {
88 listener,
89 });
90 }
91 MioEvent::IncomingConnectionDidAccept(addr, result) => {
92 store.dispatch(P2pNetworkSchedulerAction::IncomingDidAccept {
93 addr,
94 result,
95 });
96 }
97 MioEvent::OutgoingConnectionDidConnect(addr, result) => {
98 store.dispatch(P2pNetworkSchedulerAction::OutgoingDidConnect {
99 addr,
100 result,
101 });
102 }
103 MioEvent::IncomingDataIsReady(addr) => {
104 store.dispatch(P2pNetworkSchedulerAction::IncomingDataIsReady { addr });
105 }
106 MioEvent::IncomingDataDidReceive(addr, result) => {
107 store.dispatch(P2pNetworkSchedulerAction::IncomingDataDidReceive {
108 addr,
109 result,
110 });
111 }
112 MioEvent::OutgoingDataDidSend(_, _result) => {}
113 MioEvent::ConnectionDidClose(addr, result) => {
114 if let Err(e) = result {
115 store.dispatch(P2pNetworkSchedulerAction::Error {
116 addr,
117 error: p2p::P2pNetworkConnectionError::MioError(e),
118 });
119 } else {
120 store.dispatch(P2pNetworkSchedulerAction::Error {
121 addr,
122 error: p2p::P2pNetworkConnectionError::RemoteClosed,
123 });
124 }
125 }
126 MioEvent::ConnectionDidCloseOnDemand(addr) => {
127 store.dispatch(P2pNetworkSchedulerAction::Prune { addr });
128 }
129 },
130 P2pEvent::Connection(e) => match e {
131 P2pConnectionEvent::OfferSdpReady(peer_id, res) => match res {
132 Err(error) => {
133 store.dispatch(P2pConnectionOutgoingAction::OfferSdpCreateError {
134 peer_id,
135 error,
136 });
137 }
138 Ok(sdp) => {
139 store.dispatch(P2pConnectionOutgoingAction::OfferSdpCreateSuccess {
140 peer_id,
141 sdp,
142 });
143 }
144 },
145 P2pConnectionEvent::AnswerSdpReady(peer_id, res) => match res {
146 Err(error) => {
147 store.dispatch(P2pConnectionIncomingAction::AnswerSdpCreateError {
148 peer_id,
149 error,
150 });
151 }
152 Ok(sdp) => {
153 store.dispatch(P2pConnectionIncomingAction::AnswerSdpCreateSuccess {
154 peer_id,
155 sdp,
156 });
157 }
158 },
159 P2pConnectionEvent::AnswerReceived(peer_id, res) => match res {
160 P2pConnectionResponse::Accepted(answer) => {
161 store.dispatch(P2pConnectionOutgoingAction::AnswerRecvSuccess {
162 peer_id,
163 answer,
164 });
165 }
166 P2pConnectionResponse::Rejected(reason) => {
167 store.dispatch(P2pConnectionOutgoingAction::AnswerRecvError {
168 peer_id,
169 error: P2pConnectionErrorResponse::Rejected(reason),
170 });
171 }
172 P2pConnectionResponse::SignalDecryptionFailed => {
173 store.dispatch(P2pConnectionOutgoingAction::AnswerRecvError {
174 peer_id,
175 error: P2pConnectionErrorResponse::SignalDecryptionFailed,
176 });
177 }
178 P2pConnectionResponse::InternalError => {
179 store.dispatch(P2pConnectionOutgoingAction::AnswerRecvError {
180 peer_id,
181 error: P2pConnectionErrorResponse::InternalError,
182 });
183 }
184 },
185 P2pConnectionEvent::Finalized(peer_id, res) => match res {
186 Err(error) => {
187 store.dispatch(P2pConnectionOutgoingAction::FinalizeError {
188 peer_id,
189 error: error.clone(),
190 });
191 store.dispatch(P2pConnectionIncomingAction::FinalizeError {
192 peer_id,
193 error,
194 });
195 }
196 Ok(auth) => {
197 let _ = store.dispatch(P2pConnectionOutgoingAction::FinalizeSuccess {
198 peer_id,
199 remote_auth: Some(auth.clone()),
200 }) || store.dispatch(
201 P2pConnectionIncomingAction::FinalizeSuccess {
202 peer_id,
203 remote_auth: auth.clone(),
204 },
205 );
206 }
207 },
208 P2pConnectionEvent::Closed(peer_id) => {
209 store.dispatch(P2pDisconnectionAction::PeerClosed { peer_id });
210 store.dispatch(P2pDisconnectionAction::Finish { peer_id });
211 }
212 },
213 P2pEvent::Channel(e) => match e {
214 P2pChannelEvent::Opened(peer_id, chan_id, res) => match res {
215 Err(err) => {
216 openmina_core::log::warn!(meta.time(); kind = "P2pChannelEvent::Opened", peer_id = peer_id.to_string(), error = err);
217 }
219 Ok(_) => match chan_id {
220 ChannelId::SignalingDiscovery => {
221 store.dispatch(P2pChannelsSignalingDiscoveryAction::Ready {
222 peer_id,
223 });
224 }
225 ChannelId::SignalingExchange => {
226 store.dispatch(P2pChannelsSignalingExchangeAction::Ready {
227 peer_id,
228 });
229 }
230 ChannelId::BestTipPropagation => {
231 store.dispatch(P2pChannelsBestTipAction::Ready { peer_id });
232 }
233 ChannelId::TransactionPropagation => {
234 store.dispatch(P2pChannelsTransactionAction::Ready { peer_id });
235 }
236 ChannelId::SnarkPropagation => {
237 store.dispatch(P2pChannelsSnarkAction::Ready { peer_id });
238 }
239 ChannelId::SnarkJobCommitmentPropagation => {
240 store.dispatch(P2pChannelsSnarkJobCommitmentAction::Ready {
241 peer_id,
242 });
243 }
244 ChannelId::Rpc => {
245 store.dispatch(P2pChannelsRpcAction::Ready { peer_id });
246 }
247 ChannelId::StreamingRpc => {
248 store.dispatch(P2pChannelsStreamingRpcAction::Ready { peer_id });
249 }
250 },
251 },
252 P2pChannelEvent::Sent(peer_id, _, _, res) => {
253 if let Err(err) = res {
254 let reason = P2pDisconnectionReason::P2pChannelSendFailed(err);
255 store.dispatch(P2pDisconnectionAction::Init { peer_id, reason });
256 }
257 }
258 P2pChannelEvent::Received(peer_id, res) => match res {
259 Err(err) => {
260 let reason = P2pDisconnectionReason::P2pChannelReceiveFailed(err);
261 store.dispatch(P2pDisconnectionAction::Init { peer_id, reason });
262 }
263 Ok(message) => {
264 store.dispatch(P2pChannelsMessageReceivedAction {
265 peer_id,
266 message: Box::new(message),
267 });
268 }
269 },
270 P2pChannelEvent::Closed(peer_id, chan_id) => {
271 let reason = P2pDisconnectionReason::P2pChannelClosed(chan_id);
272 store.dispatch(P2pDisconnectionAction::Init { peer_id, reason });
273 }
274 },
275 },
276 Event::Ledger(event) => match event {
277 LedgerEvent::Write(response) => {
278 store.dispatch(LedgerWriteAction::Success { response });
279 }
280 LedgerEvent::Read(id, response) => {
281 store.dispatch(LedgerReadAction::Success { id, response });
282 }
283 },
284 Event::Snark(event) => match event {
285 SnarkEvent::BlockVerify(req_id, result) => match result {
286 Err(error) => {
287 store.dispatch(SnarkBlockVerifyAction::Error { req_id, error });
288 }
289 Ok(()) => {
290 store.dispatch(SnarkBlockVerifyAction::Success { req_id });
291 }
292 },
293 SnarkEvent::WorkVerify(req_id, result) => match result {
294 Err(error) => {
295 store.dispatch(SnarkWorkVerifyAction::Error { req_id, error });
296 }
297 Ok(()) => {
298 store.dispatch(SnarkWorkVerifyAction::Success { req_id });
299 }
300 },
301 SnarkEvent::UserCommandVerify(req_id, result) => {
302 if let Ok(commands) = result {
303 store.dispatch(SnarkUserCommandVerifyAction::Success { req_id, commands });
304 } else {
305 store.dispatch(SnarkUserCommandVerifyAction::Error {
306 req_id,
307 error: SnarkUserCommandVerifyError::VerificationFailed,
308 });
309 }
310 }
311 },
312 Event::Rpc(rpc_id, e) => match *e {
313 RpcRequest::StateGet(filter) => {
314 store.dispatch(RpcAction::GlobalStateGet { rpc_id, filter });
315 }
316 RpcRequest::StatusGet => {
317 store.dispatch(RpcAction::StatusGet { rpc_id });
318 }
319 RpcRequest::HeartbeatGet => {
320 store.dispatch(RpcAction::HeartbeatGet { rpc_id });
321 }
322 RpcRequest::ActionStatsGet(query) => {
323 store.dispatch(RpcAction::ActionStatsGet { rpc_id, query });
324 }
325 RpcRequest::SyncStatsGet(query) => {
326 store.dispatch(RpcAction::SyncStatsGet { rpc_id, query });
327 }
328 RpcRequest::BlockProducerStatsGet => {
329 store.dispatch(RpcAction::BlockProducerStatsGet { rpc_id });
330 }
331 RpcRequest::PeersGet => {
332 store.dispatch(RpcAction::PeersGet { rpc_id });
333 }
334 RpcRequest::MessageProgressGet => {
335 store.dispatch(RpcAction::MessageProgressGet { rpc_id });
336 }
337 RpcRequest::P2pConnectionOutgoing(opts) => {
338 store.dispatch(RpcAction::P2pConnectionOutgoingInit { rpc_id, opts });
339 }
340 RpcRequest::P2pConnectionIncoming(opts) => {
341 store.dispatch(RpcAction::P2pConnectionIncomingInit { rpc_id, opts });
342 }
343 RpcRequest::ScanStateSummaryGet(query) => {
344 store.dispatch(RpcAction::ScanStateSummaryGetInit { rpc_id, query });
345 }
346 RpcRequest::SnarkPoolGet => {
347 store.dispatch(RpcAction::SnarkPoolAvailableJobsGet { rpc_id });
348 }
349 RpcRequest::SnarkPoolJobGet { job_id } => {
350 store.dispatch(RpcAction::SnarkPoolJobGet { rpc_id, job_id });
351 }
352 RpcRequest::SnarkPoolCompletedJobsGet => {
353 store.dispatch(RpcAction::SnarkPoolCompletedJobsGet { rpc_id });
354 }
355 RpcRequest::SnarkPoolPendingJobsGet => {
356 store.dispatch(RpcAction::SnarkPoolPendingJobsGet { rpc_id });
357 }
358 RpcRequest::SnarkerConfig => {
359 store.dispatch(RpcAction::SnarkerConfigGet { rpc_id });
360 }
361 RpcRequest::SnarkerJobCommit { job_id } => {
362 store.dispatch(RpcAction::SnarkerJobCommit { rpc_id, job_id });
363 }
364 RpcRequest::SnarkerJobSpec { job_id } => {
365 store.dispatch(RpcAction::SnarkerJobSpec { rpc_id, job_id });
366 }
367 RpcRequest::SnarkerWorkers => {
368 store.dispatch(RpcAction::SnarkerWorkersGet { rpc_id });
369 }
370 RpcRequest::HealthCheck => {
371 store.dispatch(RpcAction::HealthCheck { rpc_id });
372 }
373 RpcRequest::ReadinessCheck => {
374 store.dispatch(RpcAction::ReadinessCheck { rpc_id });
375 }
376 RpcRequest::DiscoveryRoutingTable => {
377 store.dispatch(RpcAction::DiscoveryRoutingTable { rpc_id });
378 }
379 RpcRequest::DiscoveryBoostrapStats => {
380 store.dispatch(RpcAction::DiscoveryBoostrapStats { rpc_id });
381 }
382 RpcRequest::TransactionPoolGet => {
383 store.dispatch(RpcAction::TransactionPool { rpc_id });
384 }
385 RpcRequest::LedgerAccountsGet(account_query) => {
386 store.dispatch(RpcAction::LedgerAccountsGetInit {
387 rpc_id,
388 account_query,
389 });
390 }
391 RpcRequest::TransactionInject(commands) => {
392 store.dispatch(RpcAction::TransactionInjectInit { rpc_id, commands });
393 }
394 RpcRequest::TransitionFrontierUserCommandsGet => {
395 store.dispatch(RpcAction::TransitionFrontierUserCommandsGet { rpc_id });
396 }
397 RpcRequest::BestChain(max_length) => {
398 store.dispatch(RpcAction::BestChain { rpc_id, max_length });
399 }
400 RpcRequest::ConsensusConstantsGet => {
401 store.dispatch(RpcAction::ConsensusConstantsGet { rpc_id });
402 }
403 RpcRequest::TransactionStatusGet(tx) => {
404 store.dispatch(RpcAction::TransactionStatusGet { rpc_id, tx });
405 }
406 RpcRequest::GetBlock(query) => {
407 store.dispatch(RpcAction::BlockGet { rpc_id, query });
408 }
409 RpcRequest::PooledUserCommands(query) => {
410 store.dispatch(RpcAction::PooledUserCommands { rpc_id, query });
411 }
412 RpcRequest::PooledZkappCommands(query) => {
413 store.dispatch(RpcAction::PooledZkappCommands { rpc_id, query });
414 }
415 RpcRequest::ConsensusTimeGet(query) => {
416 store.dispatch(RpcAction::ConsensusTimeGet { rpc_id, query });
417 }
418 RpcRequest::GenesisBlockGet => {
419 store.dispatch(RpcAction::GenesisBlock { rpc_id });
420 }
421 RpcRequest::LedgerStatusGet(ledger_hash) => {
422 store.dispatch(RpcAction::LedgerStatusGetInit {
423 rpc_id,
424 ledger_hash,
425 });
426 }
427 RpcRequest::LedgerAccountDelegatorsGet(ledger_hash, account_id) => {
428 store.dispatch(RpcAction::LedgerAccountDelegatorsGetInit {
429 rpc_id,
430 ledger_hash,
431 account_id,
432 });
433 }
434 },
435 Event::ExternalSnarkWorker(e) => match e {
436 ExternalSnarkWorkerEvent::Started => {
437 store.dispatch(ExternalSnarkWorkerAction::Started);
438 }
439 ExternalSnarkWorkerEvent::Killed => {
440 store.dispatch(ExternalSnarkWorkerAction::Killed);
441 }
442 ExternalSnarkWorkerEvent::WorkResult(result) => {
443 store.dispatch(ExternalSnarkWorkerAction::WorkResult { result });
444 }
445 ExternalSnarkWorkerEvent::WorkError(error) => {
446 store.dispatch(ExternalSnarkWorkerAction::WorkError { error });
447 }
448 ExternalSnarkWorkerEvent::WorkCancelled => {
449 store.dispatch(ExternalSnarkWorkerAction::WorkCancelled);
450 }
451 ExternalSnarkWorkerEvent::Error(error) => {
452 store.dispatch(ExternalSnarkWorkerAction::Error {
453 error,
454 permanent: false,
455 });
456 }
457 },
458 Event::BlockProducerEvent(e) => match e {
459 BlockProducerEvent::VrfEvaluator(vrf_e) => match vrf_e {
460 BlockProducerVrfEvaluatorEvent::Evaluated(vrf_output_with_hash) => {
461 store.dispatch(
462 BlockProducerVrfEvaluatorAction::ProcessSlotEvaluationSuccess {
463 vrf_output: vrf_output_with_hash.evaluation_result,
464 staking_ledger_hash: vrf_output_with_hash.staking_ledger_hash,
465 },
466 );
467 }
468 },
469 BlockProducerEvent::BlockProve(block_hash, res) => match res {
470 Err(err) => todo!(
471 "error while trying to produce block proof for block {block_hash} - {err}"
472 ),
473 Ok(proof) => {
474 if store
475 .state()
476 .transition_frontier
477 .genesis
478 .prove_pending_block_hash()
479 .is_some_and(|hash| hash == block_hash)
480 {
481 store.dispatch(TransitionFrontierGenesisAction::ProveSuccess { proof });
483 } else {
484 store.dispatch(BlockProducerAction::BlockProveSuccess { proof });
485 }
486 }
487 },
488 },
489 Event::GenesisLoad(res) => match res {
490 Err(err) => todo!("error while trying to load genesis config/ledger. - {err}"),
491 Ok(data) => {
492 store.dispatch(TransitionFrontierGenesisAction::LedgerLoadSuccess { data });
493 }
494 },
495 },
496 EventSourceAction::WaitTimeout => {
497 store.dispatch(CheckTimeoutsAction {});
498 }
499 EventSourceAction::WaitForEvents => {}
500 }
501}