1use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5 channels::{ChannelId, MsgId, P2pChannelsEffectfulAction},
6 P2pNetworkPubsubAction, P2pState,
7};
8
9use super::{
10 P2pChannelsSnarkAction, P2pChannelsSnarkState, SnarkPropagationChannelMsg,
11 SnarkPropagationState,
12};
13use mina_p2p_messages::{gossip::GossipNetMessageV2, v2};
14
15impl P2pChannelsSnarkState {
16 pub fn reducer<Action, State>(
17 mut state_context: Substate<Action, State, P2pState>,
18 action: ActionWithMeta<P2pChannelsSnarkAction>,
19 ) -> Result<(), String>
20 where
21 State: crate::P2pStateTrait,
22 Action: crate::P2pActionTrait<State>,
23 {
24 let (action, meta) = action.split();
25 let p2p_state = state_context.get_substate_mut()?;
26
27 let state = action
28 .peer_id()
29 .and_then(|peer_id| p2p_state.get_ready_peer_mut(peer_id))
30 .map(|peer_state| &mut peer_state.channels.snark)
31 .ok_or_else(|| format!("Invalid state for: {action:?}"));
32
33 match action {
34 P2pChannelsSnarkAction::Init { peer_id } => {
35 let state = state.inspect_err(|error| bug_condition!("{}", error))?;
36 *state = Self::Init { time: meta.time() };
37
38 let dispatcher = state_context.into_dispatcher();
39
40 dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
41 peer_id,
42 id: ChannelId::SnarkPropagation,
43 on_success: redux::callback!(
44 on_snark_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
45 P2pChannelsSnarkAction::Pending { peer_id }
46 }
47 ),
48 });
49 Ok(())
50 }
51 P2pChannelsSnarkAction::Pending { .. } => {
52 let state = state.inspect_err(|error| bug_condition!("{}", error))?;
53 *state = Self::Pending { time: meta.time() };
54 Ok(())
55 }
56 P2pChannelsSnarkAction::Ready { .. } => {
57 let state = state.inspect_err(|error| bug_condition!("{}", error))?;
58 *state = Self::Ready {
59 time: meta.time(),
60 local: SnarkPropagationState::WaitingForRequest { time: meta.time() },
61 remote: SnarkPropagationState::WaitingForRequest { time: meta.time() },
62 next_send_index: 0,
63 };
64 Ok(())
65 }
66 P2pChannelsSnarkAction::RequestSend { limit, peer_id } => {
67 let state = state.inspect_err(|error| bug_condition!("{}", error))?;
68 let Self::Ready { local, .. } = state else {
69 bug_condition!(
70 "Invalid state for `P2pChannelsSnarkAction::RequestSend`, state: {:?}",
71 state
72 );
73 return Ok(());
74 };
75 *local = SnarkPropagationState::Requested {
76 time: meta.time(),
77 requested_limit: limit,
78 };
79
80 let dispatcher = state_context.into_dispatcher();
81 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
82 peer_id,
83 msg_id: MsgId::first(),
84 msg: SnarkPropagationChannelMsg::GetNext { limit }.into(),
85 });
86 Ok(())
87 }
88 P2pChannelsSnarkAction::PromiseReceived { promised_count, .. } => {
89 let state = state.inspect_err(|error| bug_condition!("{}", error))?;
90 let Self::Ready { local, .. } = state else {
91 bug_condition!(
92 "Invalid state for `P2pChannelsSnarkAction::PromiseReceived`, state: {:?}",
93 state
94 );
95 return Ok(());
96 };
97 let SnarkPropagationState::Requested {
98 requested_limit, ..
99 } = &local
100 else {
101 return Ok(());
102 };
103 *local = SnarkPropagationState::Responding {
104 time: meta.time(),
105 requested_limit: *requested_limit,
106 promised_count,
107 current_count: 0,
108 };
109 Ok(())
110 }
111 P2pChannelsSnarkAction::Received { peer_id, snark } => {
112 let state = state.inspect_err(|error| bug_condition!("{}", error))?;
113 let Self::Ready { local, .. } = state else {
114 bug_condition!(
115 "Invalid state for `P2pChannelsSnarkAction::Received`, state: {:?}",
116 state
117 );
118 return Ok(());
119 };
120 let SnarkPropagationState::Responding {
121 promised_count,
122 current_count,
123 ..
124 } = local
125 else {
126 bug_condition!(
127 "Invalid state for `P2pChannelsSnarkAction::Received`, state: {:?}",
128 state
129 );
130 return Ok(());
131 };
132
133 *current_count += 1;
134
135 if current_count >= promised_count {
136 *local = SnarkPropagationState::Responded {
137 time: meta.time(),
138 count: *current_count,
139 };
140 }
141
142 let (dispatcher, state) = state_context.into_dispatcher_and_state();
143 let p2p_state: &P2pState = state.substate()?;
144
145 if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_snark_received {
146 dispatcher.push_callback(callback.clone(), (peer_id, snark));
147 }
148
149 Ok(())
150 }
151 P2pChannelsSnarkAction::RequestReceived { limit, .. } => {
152 let state = state.inspect_err(|error| bug_condition!("{}", error))?;
153 let Self::Ready { remote, .. } = state else {
154 bug_condition!(
155 "Invalid state for `P2pChannelsSnarkAction::RequestReceived`, state: {:?}",
156 state
157 );
158 return Ok(());
159 };
160 *remote = SnarkPropagationState::Requested {
161 time: meta.time(),
162 requested_limit: limit,
163 };
164 Ok(())
165 }
166 P2pChannelsSnarkAction::ResponseSend {
167 snarks,
168 last_index,
169 peer_id,
170 ..
171 } => {
172 let state = state.inspect_err(|error| bug_condition!("{}", error))?;
173 let Self::Ready {
174 remote,
175 next_send_index,
176 ..
177 } = state
178 else {
179 bug_condition!(
180 "Invalid state for `P2pChannelsSnarkAction::ResponseSend`, state: {:?}",
181 state
182 );
183 return Ok(());
184 };
185 *next_send_index = last_index + 1;
186
187 let count = snarks.len() as u8;
188 if count == 0 {
189 return Ok(());
190 }
191
192 *remote = SnarkPropagationState::Responded {
193 time: meta.time(),
194 count,
195 };
196
197 let dispatcher = state_context.into_dispatcher();
198 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
199 peer_id,
200 msg_id: MsgId::first(),
201 msg: SnarkPropagationChannelMsg::WillSend { count }.into(),
202 });
203
204 for snark in snarks {
205 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
206 peer_id,
207 msg_id: MsgId::first(),
208 msg: SnarkPropagationChannelMsg::Snark(snark).into(),
209 });
210 }
211 Ok(())
212 }
213 #[cfg(not(feature = "p2p-libp2p"))]
214 P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()),
215 #[cfg(feature = "p2p-libp2p")]
216 P2pChannelsSnarkAction::Libp2pBroadcast {
217 snark,
218 nonce,
219 is_local,
220 } => {
221 let (dispatcher, state) = state_context.into_dispatcher_and_state();
222 let p2p_state: &P2pState = state.substate()?;
223 let pubsub_state = &p2p_state.network.scheduler.broadcast_state.mcache;
224
225 let message_id = crate::BroadcastMessageId::Snark {
226 job_id: snark.job_id(),
227 };
228
229 if pubsub_state.contains_broadcast_id(&message_id) {
230 dispatcher
231 .push(P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id });
232
233 return Ok(());
234 };
235
236 let message = Box::new((snark.statement(), (&snark).into()));
237 let message = v2::NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(message);
238 let nonce = nonce.into();
239 let message = GossipNetMessageV2::SnarkPoolDiff { message, nonce };
240 if is_local {
241 dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
242 } else {
243 dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message });
245 }
246 Ok(())
247 }
248 P2pChannelsSnarkAction::Libp2pReceived { peer_id, snark, .. } => {
249 let (dispatcher, state) = state_context.into_dispatcher_and_state();
250 let p2p_state: &P2pState = state.substate()?;
251
252 if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_snark_libp2p_received {
253 dispatcher.push_callback(callback.clone(), (peer_id, snark));
254 }
255
256 Ok(())
257 }
258 }
259 }
260}