p2p/channels/signaling/discovery/
p2p_channels_signaling_discovery_reducer.rs

1use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5    channels::{
6        signaling::exchange::P2pChannelsSignalingExchangeAction, ChannelId, MsgId,
7        P2pChannelsEffectfulAction,
8    },
9    connection::{
10        outgoing::{P2pConnectionOutgoingAction, P2pConnectionOutgoingInitOpts},
11        P2pConnectionErrorResponse, P2pConnectionResponse,
12    },
13    webrtc::SignalingMethod,
14    P2pState,
15};
16
17use super::{
18    P2pChannelsSignalingDiscoveryAction, P2pChannelsSignalingDiscoveryState,
19    SignalingDiscoveryChannelMsg, SignalingDiscoveryState,
20};
21
22impl P2pChannelsSignalingDiscoveryState {
23    /// Substate is accessed
24    pub fn reducer<Action, State>(
25        mut state_context: Substate<Action, State, P2pState>,
26        action: ActionWithMeta<P2pChannelsSignalingDiscoveryAction>,
27    ) -> Result<(), String>
28    where
29        State: crate::P2pStateTrait,
30        Action: crate::P2pActionTrait<State>,
31    {
32        let (action, meta) = action.split();
33        let p2p_state = state_context.get_substate_mut()?;
34        let peer_id = *action.peer_id();
35        let state = &mut p2p_state
36            .get_ready_peer_mut(&peer_id)
37            .ok_or_else(|| format!("Peer state not found for: {action:?}"))?
38            .channels
39            .signaling
40            .discovery;
41
42        match action {
43            P2pChannelsSignalingDiscoveryAction::Init { .. } => {
44                *state = Self::Init { time: meta.time() };
45
46                let dispatcher = state_context.into_dispatcher();
47                dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
48                    peer_id,
49                    id: ChannelId::SignalingDiscovery,
50                    on_success: redux::callback!(
51                        on_signaling_discovery_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
52                            P2pChannelsSignalingDiscoveryAction::Pending { peer_id }
53                        }
54                    ),
55                });
56                Ok(())
57            }
58            P2pChannelsSignalingDiscoveryAction::Pending { .. } => {
59                *state = Self::Pending { time: meta.time() };
60                Ok(())
61            }
62            P2pChannelsSignalingDiscoveryAction::Ready { .. } => {
63                *state = Self::Ready {
64                    time: meta.time(),
65                    local: SignalingDiscoveryState::WaitingForRequest { time: meta.time() },
66                    remote: SignalingDiscoveryState::WaitingForRequest { time: meta.time() },
67                };
68
69                let dispatcher = state_context.into_dispatcher();
70                dispatcher.push(P2pChannelsSignalingDiscoveryAction::RequestSend { peer_id });
71
72                Ok(())
73            }
74            P2pChannelsSignalingDiscoveryAction::RequestSend { .. } => {
75                let Self::Ready { local, .. } = state else {
76                    bug_condition!(
77                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::RequestSend`, state: {state:?}",
78                    );
79                    return Ok(());
80                };
81                *local = SignalingDiscoveryState::Requested { time: meta.time() };
82
83                let dispatcher = state_context.into_dispatcher();
84
85                let msg = SignalingDiscoveryChannelMsg::GetNext.into();
86                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
87                    peer_id,
88                    msg_id: MsgId::first(),
89                    msg,
90                });
91                Ok(())
92            }
93            P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived { .. } => {
94                let Self::Ready { local, .. } = state else {
95                    bug_condition!(
96                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived`, state: {state:?}",
97                    );
98                    return Ok(());
99                };
100
101                *local = SignalingDiscoveryState::DiscoveryRequested { time: meta.time() };
102
103                let (dispatcher, state) = state_context.into_dispatcher_and_state();
104                let state: &P2pState = state.substate()?;
105                state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time());
106
107                Ok(())
108            }
109            P2pChannelsSignalingDiscoveryAction::DiscoveredSend {
110                target_public_key, ..
111            } => {
112                let Self::Ready { local, .. } = state else {
113                    bug_condition!(
114                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredSend`, state: {state:?}",
115                    );
116                    return Ok(());
117                };
118
119                *local = SignalingDiscoveryState::Discovered {
120                    time: meta.time(),
121                    target_public_key: target_public_key.clone(),
122                };
123
124                let dispatcher = state_context.into_dispatcher();
125
126                let msg = SignalingDiscoveryChannelMsg::Discovered { target_public_key }.into();
127                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
128                    peer_id,
129                    msg_id: MsgId::first(),
130                    msg,
131                });
132                Ok(())
133            }
134            P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived { .. } => {
135                let Self::Ready { local, .. } = state else {
136                    bug_condition!(
137                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived`, state: {state:?}",
138                    );
139                    return Ok(());
140                };
141
142                let target_public_key = match local {
143                    SignalingDiscoveryState::Discovered {
144                        target_public_key, ..
145                    } => target_public_key.clone(),
146                    state => {
147                        bug_condition!(
148                            "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived`, state: {state:?}",
149                        );
150                        return Ok(());
151                    }
152                };
153
154                *local = SignalingDiscoveryState::DiscoveredRejected {
155                    time: meta.time(),
156                    target_public_key,
157                };
158
159                let (dispatcher, state) = state_context.into_dispatcher_and_state();
160                let state: &P2pState = state.substate()?;
161                state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time());
162
163                Ok(())
164            }
165            P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived { offer, .. } => {
166                let Self::Ready { local, .. } = state else {
167                    bug_condition!(
168                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived`, state: {state:?}",
169                    );
170                    return Ok(());
171                };
172
173                let target_public_key = match local {
174                    SignalingDiscoveryState::Discovered {
175                        target_public_key, ..
176                    } => target_public_key.clone(),
177                    state => {
178                        bug_condition!(
179                            "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived`, state: {state:?}",
180                        );
181                        return Ok(());
182                    }
183                };
184
185                *local = SignalingDiscoveryState::DiscoveredAccepted {
186                    time: meta.time(),
187                    target_public_key: target_public_key.clone(),
188                };
189
190                let dispatcher = state_context.into_dispatcher();
191                dispatcher.push(P2pChannelsSignalingExchangeAction::OfferSend {
192                    peer_id: target_public_key.peer_id(),
193                    offerer_pub_key: peer_id.to_public_key().unwrap(),
194                    offer: offer.clone(),
195                });
196                Ok(())
197            }
198            P2pChannelsSignalingDiscoveryAction::AnswerSend { answer, .. } => {
199                let Self::Ready { local, .. } = state else {
200                    bug_condition!(
201                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerSend`, state: {state:?}",
202                    );
203                    return Ok(());
204                };
205
206                *local = SignalingDiscoveryState::Answered { time: meta.time() };
207
208                let dispatcher = state_context.into_dispatcher();
209
210                let msg = SignalingDiscoveryChannelMsg::Answer(answer.clone()).into();
211                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
212                    peer_id,
213                    msg_id: MsgId::first(),
214                    msg,
215                });
216                Ok(())
217            }
218            P2pChannelsSignalingDiscoveryAction::RequestReceived { .. } => {
219                let Self::Ready { remote, .. } = state else {
220                    bug_condition!(
221                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::RequestReceived`, state: {state:?}",
222                    );
223                    return Ok(());
224                };
225
226                *remote = SignalingDiscoveryState::Requested { time: meta.time() };
227                Ok(())
228            }
229            P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend { .. } => {
230                let Self::Ready { remote, .. } = state else {
231                    bug_condition!(
232                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend`, state: {state:?}",
233                    );
234                    return Ok(());
235                };
236
237                *remote = SignalingDiscoveryState::DiscoveryRequested { time: meta.time() };
238                let dispatcher = state_context.into_dispatcher();
239
240                let msg = SignalingDiscoveryChannelMsg::Discover.into();
241                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
242                    peer_id,
243                    msg_id: MsgId::first(),
244                    msg,
245                });
246                Ok(())
247            }
248            P2pChannelsSignalingDiscoveryAction::DiscoveredReceived {
249                target_public_key, ..
250            } => {
251                let Self::Ready { remote, .. } = state else {
252                    bug_condition!(
253                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredReceived`, state: {state:?}",
254                    );
255                    return Ok(());
256                };
257
258                *remote = SignalingDiscoveryState::Discovered {
259                    time: meta.time(),
260                    target_public_key: target_public_key.clone(),
261                };
262                let (dispatcher, state) = state_context.into_dispatcher_and_state();
263                let state: &P2pState = state.substate()?;
264                let action = P2pConnectionOutgoingAction::Init {
265                    opts: P2pConnectionOutgoingInitOpts::WebRTC {
266                        peer_id: target_public_key.peer_id(),
267                        signaling: SignalingMethod::P2p {
268                            relay_peer_id: peer_id,
269                        },
270                    },
271                    rpc_id: None,
272                    on_success: None,
273                };
274                let accepted = redux::EnablingCondition::is_enabled(&action, state, meta.time());
275                if accepted {
276                    dispatcher.push(action);
277                } else {
278                    dispatcher
279                        .push(P2pChannelsSignalingDiscoveryAction::DiscoveredReject { peer_id });
280                }
281                Ok(())
282            }
283            P2pChannelsSignalingDiscoveryAction::DiscoveredReject { .. } => {
284                let Self::Ready { remote, .. } = state else {
285                    bug_condition!(
286                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredReject`, state: {state:?}",
287                    );
288                    return Ok(());
289                };
290
291                let target_public_key = match remote {
292                    SignalingDiscoveryState::Discovered {
293                        target_public_key, ..
294                    } => target_public_key.clone(),
295                    state => {
296                        bug_condition!(
297                            "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredReject`, state: {state:?}",
298                        );
299                        return Ok(());
300                    }
301                };
302
303                *remote = SignalingDiscoveryState::DiscoveredRejected {
304                    time: meta.time(),
305                    target_public_key,
306                };
307                let dispatcher = state_context.into_dispatcher();
308
309                let msg = SignalingDiscoveryChannelMsg::DiscoveredReject.into();
310                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
311                    peer_id,
312                    msg_id: MsgId::first(),
313                    msg,
314                });
315                Ok(())
316            }
317            P2pChannelsSignalingDiscoveryAction::DiscoveredAccept { offer, .. } => {
318                let Self::Ready { remote, .. } = state else {
319                    bug_condition!(
320                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAccept`, state: {state:?}",
321                    );
322                    return Ok(());
323                };
324
325                let target_public_key = match remote {
326                    SignalingDiscoveryState::Discovered {
327                        target_public_key, ..
328                    } => target_public_key.clone(),
329                    state => {
330                        bug_condition!(
331                            "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAccept`, state: {state:?}",
332                        );
333                        return Ok(());
334                    }
335                };
336
337                *remote = SignalingDiscoveryState::DiscoveredAccepted {
338                    time: meta.time(),
339                    target_public_key: target_public_key.clone(),
340                };
341                let dispatcher = state_context.into_dispatcher();
342                // TODO(binier): this action might not be enabled, in
343                // which case we sshould be rejecting discovered peer.
344                dispatcher.push(P2pConnectionOutgoingAction::OfferSendSuccess {
345                    peer_id: target_public_key.peer_id(),
346                });
347                dispatcher.push(
348                    P2pChannelsEffectfulAction::SignalingDiscoveryOfferEncryptAndSend {
349                        peer_id,
350                        pub_key: target_public_key,
351                        offer,
352                    },
353                );
354                Ok(())
355            }
356            P2pChannelsSignalingDiscoveryAction::AnswerReceived { answer, .. } => {
357                let Self::Ready { remote, .. } = state else {
358                    bug_condition!(
359                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerReceived`, state: {state:?}",
360                    );
361                    return Ok(());
362                };
363
364                let target_public_key = match remote {
365                    SignalingDiscoveryState::DiscoveredAccepted {
366                        target_public_key, ..
367                    } => target_public_key.clone(),
368                    state => {
369                        bug_condition!(
370                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerReceived`, state: {state:?}",
371                    );
372                        return Ok(());
373                    }
374                };
375
376                let dispatcher = state_context.into_dispatcher();
377                match answer {
378                    // TODO(binier): custom error
379                    None => dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvError {
380                        peer_id: target_public_key.peer_id(),
381                        error: P2pConnectionErrorResponse::InternalError,
382                    }),
383                    Some(answer) => dispatcher.push(
384                        P2pChannelsEffectfulAction::SignalingDiscoveryAnswerDecrypt {
385                            peer_id,
386                            pub_key: target_public_key,
387                            answer: answer.clone(),
388                        },
389                    ),
390                }
391                Ok(())
392            }
393            P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { answer, .. } => {
394                let Self::Ready { remote, .. } = state else {
395                    bug_condition!(
396                        "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerDecrypted`, state: {state:?}",
397                    );
398                    return Ok(());
399                };
400
401                let target_public_key = match remote {
402                    SignalingDiscoveryState::DiscoveredAccepted {
403                        target_public_key, ..
404                    } => target_public_key.clone(),
405                    state => {
406                        bug_condition!(
407                            "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerDecrypted`, state: {state:?}",
408                        );
409                        return Ok(());
410                    }
411                };
412
413                *remote = SignalingDiscoveryState::Answered { time: meta.time() };
414
415                let (dispatcher, state) = state_context.into_dispatcher_and_state();
416                match answer {
417                    P2pConnectionResponse::Accepted(answer) => {
418                        dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvSuccess {
419                            peer_id: target_public_key.peer_id(),
420                            answer: answer.clone(),
421                        })
422                    }
423                    P2pConnectionResponse::Rejected(reason) => {
424                        dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvError {
425                            peer_id: target_public_key.peer_id(),
426                            error: P2pConnectionErrorResponse::Rejected(reason),
427                        })
428                    }
429                    P2pConnectionResponse::SignalDecryptionFailed => {
430                        dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvError {
431                            peer_id: target_public_key.peer_id(),
432                            error: P2pConnectionErrorResponse::SignalDecryptionFailed,
433                        })
434                    }
435                    P2pConnectionResponse::InternalError => {
436                        dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvError {
437                            peer_id: target_public_key.peer_id(),
438                            error: P2pConnectionErrorResponse::InternalError,
439                        })
440                    }
441                }
442
443                let state: &P2pState = state.substate()?;
444                state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time());
445                Ok(())
446            }
447        }
448    }
449}