// p2p/channels/snark_job_commitment/p2p_channels_snark_job_commitment_reducer.rs
use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5 channels::{ChannelId, MsgId, P2pChannelsEffectfulAction},
6 P2pState,
7};
8
9use super::{
10 P2pChannelsSnarkJobCommitmentAction, P2pChannelsSnarkJobCommitmentState,
11 SnarkJobCommitmentPropagationChannelMsg, SnarkJobCommitmentPropagationState,
12};
13
/// Limit passed with every `RequestSend` action the reducer in this file
/// dispatches on its own (when the channel becomes ready and after a
/// commitment is received).
///
/// NOTE(review): presumably the maximum number of commitments asked for per
/// request — confirm against the channel message definition.
const LIMIT: u8 = 16;
15
16impl P2pChannelsSnarkJobCommitmentState {
17 pub fn reducer<Action, State>(
19 mut state_context: Substate<Action, State, P2pState>,
20 action: ActionWithMeta<P2pChannelsSnarkJobCommitmentAction>,
21 ) -> Result<(), String>
22 where
23 State: crate::P2pStateTrait,
24 Action: crate::P2pActionTrait<State>,
25 {
26 let (action, meta) = action.split();
27 let p2p_state = state_context.get_substate_mut()?;
28 let peer_id = *action.peer_id();
29 let snark_job_state = &mut p2p_state
30 .get_ready_peer_mut(&peer_id)
31 .ok_or_else(|| format!("Peer state not found for: {action:?}"))?
32 .channels
33 .snark_job_commitment;
34
35 match action {
36 P2pChannelsSnarkJobCommitmentAction::Init { .. } => {
37 *snark_job_state = Self::Init { time: meta.time() };
38
39 let dispatcher = state_context.into_dispatcher();
40 dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
41 peer_id,
42 id: ChannelId::SnarkJobCommitmentPropagation,
43 on_success: redux::callback!(
44 on_snark_job_commitment_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
45 P2pChannelsSnarkJobCommitmentAction::Pending { peer_id }
46 }
47 ),
48 });
49 Ok(())
50 }
51 P2pChannelsSnarkJobCommitmentAction::Pending { .. } => {
52 *snark_job_state = Self::Pending { time: meta.time() };
53 Ok(())
54 }
55 P2pChannelsSnarkJobCommitmentAction::Ready { .. } => {
56 *snark_job_state = Self::Ready {
57 time: meta.time(),
58 local: SnarkJobCommitmentPropagationState::WaitingForRequest {
59 time: meta.time(),
60 },
61 remote: SnarkJobCommitmentPropagationState::WaitingForRequest {
62 time: meta.time(),
63 },
64 next_send_index: 0,
65 };
66
67 let dispatcher = state_context.into_dispatcher();
68 dispatcher.push(P2pChannelsSnarkJobCommitmentAction::RequestSend {
69 peer_id,
70 limit: LIMIT,
71 });
72 Ok(())
73 }
74 P2pChannelsSnarkJobCommitmentAction::RequestSend { limit, .. } => {
75 let Self::Ready { local, .. } = snark_job_state else {
76 bug_condition!(
77 "Invalid state for `P2pChannelsSnarkJobCommitmentAction::RequestSend`, state: {:?}",
78 snark_job_state
79 );
80 return Ok(());
81 };
82 *local = SnarkJobCommitmentPropagationState::Requested {
83 time: meta.time(),
84 requested_limit: limit,
85 };
86
87 let dispatcher = state_context.into_dispatcher();
88 dispatcher
89 .push(P2pChannelsSnarkJobCommitmentAction::RequestSend { peer_id, limit });
90 Ok(())
91 }
92 P2pChannelsSnarkJobCommitmentAction::PromiseReceived { promised_count, .. } => {
93 let Self::Ready { local, .. } = snark_job_state else {
94 bug_condition!(
95 "Invalid state for `P2pChannelsSnarkJobCommitmentAction::PromiseReceived`, state: {:?}",
96 snark_job_state
97 );
98 return Ok(());
99 };
100 let SnarkJobCommitmentPropagationState::Requested {
101 requested_limit, ..
102 } = &local
103 else {
104 bug_condition!(
105 "Invalid state for `P2pChannelsSnarkJobCommitmentAction::PromiseReceived`, state: {:?}",
106 snark_job_state
107 );
108 return Ok(());
109 };
110
111 *local = SnarkJobCommitmentPropagationState::Responding {
112 time: meta.time(),
113 requested_limit: *requested_limit,
114 promised_count,
115 current_count: 0,
116 };
117 Ok(())
118 }
119 P2pChannelsSnarkJobCommitmentAction::Received { commitment, .. } => {
120 let Self::Ready { local, .. } = snark_job_state else {
121 bug_condition!(
122 "Invalid state for `P2pChannelsSnarkJobCommitmentAction::Received`, state: {:?}",
123 snark_job_state
124 );
125 return Ok(());
126 };
127 let SnarkJobCommitmentPropagationState::Responding {
128 promised_count,
129 current_count,
130 ..
131 } = local
132 else {
133 bug_condition!(
134 "Invalid state for `P2pChannelsSnarkJobCommitmentAction::Received`, state: {:?}",
135 snark_job_state
136 );
137 return Ok(());
138 };
139
140 *current_count += 1;
141
142 if current_count >= promised_count {
143 *local = SnarkJobCommitmentPropagationState::Responded {
144 time: meta.time(),
145 count: *current_count,
146 };
147 }
148
149 let (dispatcher, state) = state_context.into_dispatcher_and_state();
150 let p2p_state: &P2pState = state.substate()?;
151 dispatcher.push(P2pChannelsSnarkJobCommitmentAction::RequestSend {
152 peer_id,
153 limit: LIMIT,
154 });
155
156 if let Some(callback) = &p2p_state
157 .callbacks
158 .on_p2p_channels_snark_job_commitment_received
159 {
160 dispatcher.push_callback(callback.clone(), (peer_id, commitment));
161 }
162 Ok(())
163 }
164 P2pChannelsSnarkJobCommitmentAction::RequestReceived { limit, .. } => {
165 let Self::Ready { remote, .. } = snark_job_state else {
166 bug_condition!(
167 "Invalid state for `P2pChannelsSnarkJobCommitmentAction::RequestReceived`, state: {:?}",
168 snark_job_state
169 );
170 return Ok(());
171 };
172 *remote = SnarkJobCommitmentPropagationState::Requested {
173 time: meta.time(),
174 requested_limit: limit,
175 };
176 Ok(())
177 }
178 P2pChannelsSnarkJobCommitmentAction::ResponseSend {
179 last_index,
180 commitments,
181 ..
182 } => {
183 let Self::Ready {
184 remote,
185 next_send_index,
186 ..
187 } = snark_job_state
188 else {
189 bug_condition!(
190 "Invalid state for `P2pChannelsSnarkJobCommitmentAction::ResponseSend`, state: {:?}",
191 snark_job_state
192 );
193 return Ok(());
194 };
195 *next_send_index = last_index + 1;
196
197 let count = commitments.len() as u8;
198 if count == 0 {
199 return Ok(());
200 }
201
202 *remote = SnarkJobCommitmentPropagationState::Responded {
203 time: meta.time(),
204 count,
205 };
206
207 let dispatcher = state_context.into_dispatcher();
208 let msg = SnarkJobCommitmentPropagationChannelMsg::WillSend { count }.into();
209 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
210 peer_id,
211 msg_id: MsgId::first(),
212 msg,
213 });
214
215 for commitment in commitments {
216 let msg =
217 SnarkJobCommitmentPropagationChannelMsg::Commitment(commitment).into();
218 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
219 peer_id,
220 msg_id: MsgId::first(),
221 msg,
222 });
223 }
224
225 Ok(())
226 }
227 }
228 }
229}