p2p/channels/snark_job_commitment/
p2p_channels_snark_job_commitment_reducer.rs

1use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5    channels::{ChannelId, MsgId, P2pChannelsEffectfulAction},
6    P2pState,
7};
8
9use super::{
10    P2pChannelsSnarkJobCommitmentAction, P2pChannelsSnarkJobCommitmentState,
11    SnarkJobCommitmentPropagationChannelMsg, SnarkJobCommitmentPropagationState,
12};
13
/// Value passed as the `limit` field of every `RequestSend` action that the
/// reducer in this file dispatches on its own (once when the channel becomes
/// `Ready`, and again after each `Received` commitment).
const LIMIT: u8 = 16;
15
16impl P2pChannelsSnarkJobCommitmentState {
17    /// Substate is accessed
18    pub fn reducer<Action, State>(
19        mut state_context: Substate<Action, State, P2pState>,
20        action: ActionWithMeta<P2pChannelsSnarkJobCommitmentAction>,
21    ) -> Result<(), String>
22    where
23        State: crate::P2pStateTrait,
24        Action: crate::P2pActionTrait<State>,
25    {
26        let (action, meta) = action.split();
27        let p2p_state = state_context.get_substate_mut()?;
28        let peer_id = *action.peer_id();
29        let snark_job_state = &mut p2p_state
30            .get_ready_peer_mut(&peer_id)
31            .ok_or_else(|| format!("Peer state not found for: {action:?}"))?
32            .channels
33            .snark_job_commitment;
34
35        match action {
36            P2pChannelsSnarkJobCommitmentAction::Init { .. } => {
37                *snark_job_state = Self::Init { time: meta.time() };
38
39                let dispatcher = state_context.into_dispatcher();
40                dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
41                    peer_id,
42                    id: ChannelId::SnarkJobCommitmentPropagation,
43                    on_success: redux::callback!(
44                        on_snark_job_commitment_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
45                            P2pChannelsSnarkJobCommitmentAction::Pending { peer_id }
46                        }
47                    ),
48                });
49                Ok(())
50            }
51            P2pChannelsSnarkJobCommitmentAction::Pending { .. } => {
52                *snark_job_state = Self::Pending { time: meta.time() };
53                Ok(())
54            }
55            P2pChannelsSnarkJobCommitmentAction::Ready { .. } => {
56                *snark_job_state = Self::Ready {
57                    time: meta.time(),
58                    local: SnarkJobCommitmentPropagationState::WaitingForRequest {
59                        time: meta.time(),
60                    },
61                    remote: SnarkJobCommitmentPropagationState::WaitingForRequest {
62                        time: meta.time(),
63                    },
64                    next_send_index: 0,
65                };
66
67                let dispatcher = state_context.into_dispatcher();
68                dispatcher.push(P2pChannelsSnarkJobCommitmentAction::RequestSend {
69                    peer_id,
70                    limit: LIMIT,
71                });
72                Ok(())
73            }
74            P2pChannelsSnarkJobCommitmentAction::RequestSend { limit, .. } => {
75                let Self::Ready { local, .. } = snark_job_state else {
76                    bug_condition!(
77                        "Invalid state for `P2pChannelsSnarkJobCommitmentAction::RequestSend`, state: {:?}",
78                        snark_job_state
79                    );
80                    return Ok(());
81                };
82                *local = SnarkJobCommitmentPropagationState::Requested {
83                    time: meta.time(),
84                    requested_limit: limit,
85                };
86
87                let dispatcher = state_context.into_dispatcher();
88                dispatcher
89                    .push(P2pChannelsSnarkJobCommitmentAction::RequestSend { peer_id, limit });
90                Ok(())
91            }
92            P2pChannelsSnarkJobCommitmentAction::PromiseReceived { promised_count, .. } => {
93                let Self::Ready { local, .. } = snark_job_state else {
94                    bug_condition!(
95                        "Invalid state for `P2pChannelsSnarkJobCommitmentAction::PromiseReceived`, state: {:?}",
96                        snark_job_state
97                    );
98                    return Ok(());
99                };
100                let SnarkJobCommitmentPropagationState::Requested {
101                    requested_limit, ..
102                } = &local
103                else {
104                    bug_condition!(
105                        "Invalid state for `P2pChannelsSnarkJobCommitmentAction::PromiseReceived`, state: {:?}",
106                        snark_job_state
107                    );
108                    return Ok(());
109                };
110
111                *local = SnarkJobCommitmentPropagationState::Responding {
112                    time: meta.time(),
113                    requested_limit: *requested_limit,
114                    promised_count,
115                    current_count: 0,
116                };
117                Ok(())
118            }
119            P2pChannelsSnarkJobCommitmentAction::Received { commitment, .. } => {
120                let Self::Ready { local, .. } = snark_job_state else {
121                    bug_condition!(
122                        "Invalid state for `P2pChannelsSnarkJobCommitmentAction::Received`, state: {:?}",
123                        snark_job_state
124                    );
125                    return Ok(());
126                };
127                let SnarkJobCommitmentPropagationState::Responding {
128                    promised_count,
129                    current_count,
130                    ..
131                } = local
132                else {
133                    bug_condition!(
134                        "Invalid state for `P2pChannelsSnarkJobCommitmentAction::Received`, state: {:?}",
135                        snark_job_state
136                    );
137                    return Ok(());
138                };
139
140                *current_count += 1;
141
142                if current_count >= promised_count {
143                    *local = SnarkJobCommitmentPropagationState::Responded {
144                        time: meta.time(),
145                        count: *current_count,
146                    };
147                }
148
149                let (dispatcher, state) = state_context.into_dispatcher_and_state();
150                let p2p_state: &P2pState = state.substate()?;
151                dispatcher.push(P2pChannelsSnarkJobCommitmentAction::RequestSend {
152                    peer_id,
153                    limit: LIMIT,
154                });
155
156                if let Some(callback) = &p2p_state
157                    .callbacks
158                    .on_p2p_channels_snark_job_commitment_received
159                {
160                    dispatcher.push_callback(callback.clone(), (peer_id, commitment));
161                }
162                Ok(())
163            }
164            P2pChannelsSnarkJobCommitmentAction::RequestReceived { limit, .. } => {
165                let Self::Ready { remote, .. } = snark_job_state else {
166                    bug_condition!(
167                        "Invalid state for `P2pChannelsSnarkJobCommitmentAction::RequestReceived`, state: {:?}",
168                        snark_job_state
169                    );
170                    return Ok(());
171                };
172                *remote = SnarkJobCommitmentPropagationState::Requested {
173                    time: meta.time(),
174                    requested_limit: limit,
175                };
176                Ok(())
177            }
178            P2pChannelsSnarkJobCommitmentAction::ResponseSend {
179                last_index,
180                commitments,
181                ..
182            } => {
183                let Self::Ready {
184                    remote,
185                    next_send_index,
186                    ..
187                } = snark_job_state
188                else {
189                    bug_condition!(
190                    "Invalid state for `P2pChannelsSnarkJobCommitmentAction::ResponseSend`, state: {:?}",
191                    snark_job_state
192                );
193                    return Ok(());
194                };
195                *next_send_index = last_index + 1;
196
197                let count = commitments.len() as u8;
198                if count == 0 {
199                    return Ok(());
200                }
201
202                *remote = SnarkJobCommitmentPropagationState::Responded {
203                    time: meta.time(),
204                    count,
205                };
206
207                let dispatcher = state_context.into_dispatcher();
208                let msg = SnarkJobCommitmentPropagationChannelMsg::WillSend { count }.into();
209                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
210                    peer_id,
211                    msg_id: MsgId::first(),
212                    msg,
213                });
214
215                for commitment in commitments {
216                    let msg =
217                        SnarkJobCommitmentPropagationChannelMsg::Commitment(commitment).into();
218                    dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
219                        peer_id,
220                        msg_id: MsgId::first(),
221                        msg,
222                    });
223                }
224
225                Ok(())
226            }
227        }
228    }
229}