p2p/channels/snark/
p2p_channels_snark_reducer.rs

1use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5    channels::{ChannelId, MsgId, P2pChannelsEffectfulAction},
6    P2pNetworkPubsubAction, P2pState,
7};
8
9use super::{
10    P2pChannelsSnarkAction, P2pChannelsSnarkState, SnarkPropagationChannelMsg,
11    SnarkPropagationState,
12};
13use mina_p2p_messages::{gossip::GossipNetMessageV2, v2};
14
15impl P2pChannelsSnarkState {
16    pub fn reducer<Action, State>(
17        mut state_context: Substate<Action, State, P2pState>,
18        action: ActionWithMeta<P2pChannelsSnarkAction>,
19    ) -> Result<(), String>
20    where
21        State: crate::P2pStateTrait,
22        Action: crate::P2pActionTrait<State>,
23    {
24        let (action, meta) = action.split();
25        let p2p_state = state_context.get_substate_mut()?;
26
27        let state = action
28            .peer_id()
29            .and_then(|peer_id| p2p_state.get_ready_peer_mut(peer_id))
30            .map(|peer_state| &mut peer_state.channels.snark)
31            .ok_or_else(|| format!("Invalid state for: {action:?}"));
32
33        match action {
34            P2pChannelsSnarkAction::Init { peer_id } => {
35                let state = state.inspect_err(|error| bug_condition!("{}", error))?;
36                *state = Self::Init { time: meta.time() };
37
38                let dispatcher = state_context.into_dispatcher();
39
40                dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
41                    peer_id,
42                    id: ChannelId::SnarkPropagation,
43                    on_success: redux::callback!(
44                        on_snark_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
45                            P2pChannelsSnarkAction::Pending { peer_id }
46                        }
47                    ),
48                });
49                Ok(())
50            }
51            P2pChannelsSnarkAction::Pending { .. } => {
52                let state = state.inspect_err(|error| bug_condition!("{}", error))?;
53                *state = Self::Pending { time: meta.time() };
54                Ok(())
55            }
56            P2pChannelsSnarkAction::Ready { .. } => {
57                let state = state.inspect_err(|error| bug_condition!("{}", error))?;
58                *state = Self::Ready {
59                    time: meta.time(),
60                    local: SnarkPropagationState::WaitingForRequest { time: meta.time() },
61                    remote: SnarkPropagationState::WaitingForRequest { time: meta.time() },
62                    next_send_index: 0,
63                };
64                Ok(())
65            }
66            P2pChannelsSnarkAction::RequestSend { limit, peer_id } => {
67                let state = state.inspect_err(|error| bug_condition!("{}", error))?;
68                let Self::Ready { local, .. } = state else {
69                    bug_condition!(
70                        "Invalid state for `P2pChannelsSnarkAction::RequestSend`, state: {:?}",
71                        state
72                    );
73                    return Ok(());
74                };
75                *local = SnarkPropagationState::Requested {
76                    time: meta.time(),
77                    requested_limit: limit,
78                };
79
80                let dispatcher = state_context.into_dispatcher();
81                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
82                    peer_id,
83                    msg_id: MsgId::first(),
84                    msg: SnarkPropagationChannelMsg::GetNext { limit }.into(),
85                });
86                Ok(())
87            }
88            P2pChannelsSnarkAction::PromiseReceived { promised_count, .. } => {
89                let state = state.inspect_err(|error| bug_condition!("{}", error))?;
90                let Self::Ready { local, .. } = state else {
91                    bug_condition!(
92                        "Invalid state for `P2pChannelsSnarkAction::PromiseReceived`, state: {:?}",
93                        state
94                    );
95                    return Ok(());
96                };
97                let SnarkPropagationState::Requested {
98                    requested_limit, ..
99                } = &local
100                else {
101                    return Ok(());
102                };
103                *local = SnarkPropagationState::Responding {
104                    time: meta.time(),
105                    requested_limit: *requested_limit,
106                    promised_count,
107                    current_count: 0,
108                };
109                Ok(())
110            }
111            P2pChannelsSnarkAction::Received { peer_id, snark } => {
112                let state = state.inspect_err(|error| bug_condition!("{}", error))?;
113                let Self::Ready { local, .. } = state else {
114                    bug_condition!(
115                        "Invalid state for `P2pChannelsSnarkAction::Received`, state: {:?}",
116                        state
117                    );
118                    return Ok(());
119                };
120                let SnarkPropagationState::Responding {
121                    promised_count,
122                    current_count,
123                    ..
124                } = local
125                else {
126                    bug_condition!(
127                        "Invalid state for `P2pChannelsSnarkAction::Received`, state: {:?}",
128                        state
129                    );
130                    return Ok(());
131                };
132
133                *current_count += 1;
134
135                if current_count >= promised_count {
136                    *local = SnarkPropagationState::Responded {
137                        time: meta.time(),
138                        count: *current_count,
139                    };
140                }
141
142                let (dispatcher, state) = state_context.into_dispatcher_and_state();
143                let p2p_state: &P2pState = state.substate()?;
144
145                if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_snark_received {
146                    dispatcher.push_callback(callback.clone(), (peer_id, snark));
147                }
148
149                Ok(())
150            }
151            P2pChannelsSnarkAction::RequestReceived { limit, .. } => {
152                let state = state.inspect_err(|error| bug_condition!("{}", error))?;
153                let Self::Ready { remote, .. } = state else {
154                    bug_condition!(
155                        "Invalid state for `P2pChannelsSnarkAction::RequestReceived`, state: {:?}",
156                        state
157                    );
158                    return Ok(());
159                };
160                *remote = SnarkPropagationState::Requested {
161                    time: meta.time(),
162                    requested_limit: limit,
163                };
164                Ok(())
165            }
166            P2pChannelsSnarkAction::ResponseSend {
167                snarks,
168                last_index,
169                peer_id,
170                ..
171            } => {
172                let state = state.inspect_err(|error| bug_condition!("{}", error))?;
173                let Self::Ready {
174                    remote,
175                    next_send_index,
176                    ..
177                } = state
178                else {
179                    bug_condition!(
180                        "Invalid state for `P2pChannelsSnarkAction::ResponseSend`, state: {:?}",
181                        state
182                    );
183                    return Ok(());
184                };
185                *next_send_index = last_index + 1;
186
187                let count = snarks.len() as u8;
188                if count == 0 {
189                    return Ok(());
190                }
191
192                *remote = SnarkPropagationState::Responded {
193                    time: meta.time(),
194                    count,
195                };
196
197                let dispatcher = state_context.into_dispatcher();
198                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
199                    peer_id,
200                    msg_id: MsgId::first(),
201                    msg: SnarkPropagationChannelMsg::WillSend { count }.into(),
202                });
203
204                for snark in snarks {
205                    dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
206                        peer_id,
207                        msg_id: MsgId::first(),
208                        msg: SnarkPropagationChannelMsg::Snark(snark).into(),
209                    });
210                }
211                Ok(())
212            }
213            #[cfg(not(feature = "p2p-libp2p"))]
214            P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()),
215            #[cfg(feature = "p2p-libp2p")]
216            P2pChannelsSnarkAction::Libp2pBroadcast {
217                snark,
218                nonce,
219                is_local,
220            } => {
221                let (dispatcher, state) = state_context.into_dispatcher_and_state();
222                let p2p_state: &P2pState = state.substate()?;
223                let pubsub_state = &p2p_state.network.scheduler.broadcast_state.mcache;
224
225                let message_id = crate::BroadcastMessageId::Snark {
226                    job_id: snark.job_id(),
227                };
228
229                if pubsub_state.contains_broadcast_id(&message_id) {
230                    dispatcher
231                        .push(P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id });
232
233                    return Ok(());
234                };
235
236                let message = Box::new((snark.statement(), (&snark).into()));
237                let message = v2::NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(message);
238                let nonce = nonce.into();
239                let message = GossipNetMessageV2::SnarkPoolDiff { message, nonce };
240                if is_local {
241                    dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
242                } else {
243                    // rebroadcast snark if received from webrtc network, otherwise noop.
244                    dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message });
245                }
246                Ok(())
247            }
248            P2pChannelsSnarkAction::Libp2pReceived { peer_id, snark, .. } => {
249                let (dispatcher, state) = state_context.into_dispatcher_and_state();
250                let p2p_state: &P2pState = state.substate()?;
251
252                if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_snark_libp2p_received {
253                    dispatcher.push_callback(callback.clone(), (peer_id, snark));
254                }
255
256                Ok(())
257            }
258        }
259    }
260}