p2p/channels/signaling/exchange/
p2p_channels_signaling_exchange_reducer.rs

1use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5    channels::{
6        signaling::discovery::P2pChannelsSignalingDiscoveryAction, ChannelId, MsgId,
7        P2pChannelsEffectfulAction,
8    },
9    connection::{
10        incoming::{
11            IncomingSignalingMethod, P2pConnectionIncomingAction, P2pConnectionIncomingInitOpts,
12        },
13        P2pConnectionResponse,
14    },
15    P2pState,
16};
17
18use super::{
19    P2pChannelsSignalingExchangeAction, P2pChannelsSignalingExchangeState,
20    SignalingExchangeChannelMsg, SignalingExchangeState,
21};
22
23impl P2pChannelsSignalingExchangeState {
24    /// Substate is accessed
25    pub fn reducer<Action, State>(
26        mut state_context: Substate<Action, State, P2pState>,
27        action: ActionWithMeta<P2pChannelsSignalingExchangeAction>,
28    ) -> Result<(), String>
29    where
30        State: crate::P2pStateTrait,
31        Action: crate::P2pActionTrait<State>,
32    {
33        let (action, meta) = action.split();
34        let p2p_state = state_context.get_substate_mut()?;
35        let peer_id = *action.peer_id();
36        let state = &mut p2p_state
37            .get_ready_peer_mut(&peer_id)
38            .ok_or_else(|| format!("Peer state not found for: {action:?}"))?
39            .channels
40            .signaling
41            .exchange;
42
43        match action {
44            P2pChannelsSignalingExchangeAction::Init { .. } => {
45                *state = Self::Init { time: meta.time() };
46
47                let dispatcher = state_context.into_dispatcher();
48                dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
49                    peer_id,
50                    id: ChannelId::SignalingExchange,
51                    on_success: redux::callback!(
52                        on_signaling_exchange_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
53                            P2pChannelsSignalingExchangeAction::Pending { peer_id }
54                        }
55                    ),
56                });
57                Ok(())
58            }
59            P2pChannelsSignalingExchangeAction::Pending { .. } => {
60                *state = Self::Pending { time: meta.time() };
61                Ok(())
62            }
63            P2pChannelsSignalingExchangeAction::Ready { .. } => {
64                *state = Self::Ready {
65                    time: meta.time(),
66                    local: SignalingExchangeState::WaitingForRequest { time: meta.time() },
67                    remote: SignalingExchangeState::WaitingForRequest { time: meta.time() },
68                };
69
70                let dispatcher = state_context.into_dispatcher();
71                dispatcher.push(P2pChannelsSignalingExchangeAction::RequestSend { peer_id });
72
73                Ok(())
74            }
75            P2pChannelsSignalingExchangeAction::RequestSend { .. } => {
76                let Self::Ready { local, .. } = state else {
77                    bug_condition!(
78                        "Invalid state for `P2pChannelsSignalingExchangeAction::RequestSend`, state: {state:?}",
79                    );
80                    return Ok(());
81                };
82                *local = SignalingExchangeState::Requested { time: meta.time() };
83
84                let dispatcher = state_context.into_dispatcher();
85
86                let msg = SignalingExchangeChannelMsg::GetNext.into();
87                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
88                    peer_id,
89                    msg_id: MsgId::first(),
90                    msg,
91                });
92                Ok(())
93            }
94            P2pChannelsSignalingExchangeAction::OfferReceived {
95                offer,
96                offerer_pub_key,
97                ..
98            } => {
99                let Self::Ready { local, .. } = state else {
100                    bug_condition!(
101                        "Invalid state for `P2pChannelsSignalingExchangeAction::OfferReceived`, state: {state:?}",
102                    );
103                    return Ok(());
104                };
105
106                *local = SignalingExchangeState::Offered {
107                    time: meta.time(),
108                    offerer_pub_key: offerer_pub_key.clone(),
109                };
110
111                let dispatcher = state_context.into_dispatcher();
112                let offer = offer.clone();
113                dispatcher.push(P2pChannelsEffectfulAction::SignalingExchangeOfferDecrypt {
114                    peer_id,
115                    pub_key: offerer_pub_key.clone(),
116                    offer,
117                });
118                Ok(())
119            }
120            P2pChannelsSignalingExchangeAction::OfferDecryptError { .. } => {
121                let dispatcher = state_context.into_dispatcher();
122                let answer = P2pConnectionResponse::SignalDecryptionFailed;
123                dispatcher.push(P2pChannelsSignalingExchangeAction::AnswerSend { peer_id, answer });
124                Ok(())
125            }
126            P2pChannelsSignalingExchangeAction::OfferDecryptSuccess { offer, .. } => {
127                let (dispatcher, state) = state_context.into_dispatcher_and_state();
128                let state: &P2pState = state.substate()?;
129                let opts = P2pConnectionIncomingInitOpts {
130                    peer_id: offer.identity_pub_key.peer_id(),
131                    signaling: IncomingSignalingMethod::P2p {
132                        relay_peer_id: peer_id,
133                    },
134                    offer: offer.clone().into(),
135                };
136                match state.incoming_accept(opts.peer_id, &opts.offer) {
137                    Ok(_) => {
138                        dispatcher.push(P2pConnectionIncomingAction::Init { opts, rpc_id: None });
139                    }
140                    Err(reason) => {
141                        let answer = P2pConnectionResponse::Rejected(reason);
142                        dispatcher.push(P2pChannelsSignalingExchangeAction::AnswerSend {
143                            peer_id,
144                            answer,
145                        });
146                    }
147                }
148                Ok(())
149            }
150            P2pChannelsSignalingExchangeAction::AnswerSend { answer, .. } => {
151                let Self::Ready { local, .. } = state else {
152                    bug_condition!(
153                        "Invalid state for `P2pChannelsSignalingExchangeAction::AnswerSend`, state: {state:?}",
154                    );
155                    return Ok(());
156                };
157
158                let offerer_pub_key = match local {
159                    SignalingExchangeState::Offered {
160                        offerer_pub_key, ..
161                    } => offerer_pub_key.clone(),
162                    state => {
163                        bug_condition!(
164                        "Invalid state for `P2pChannelsSignalingExchangeAction::AnswerSend`, local state: {state:?}",
165                    );
166                        return Ok(());
167                    }
168                };
169
170                *local = SignalingExchangeState::Answered { time: meta.time() };
171
172                let answer = answer.clone();
173                let dispatcher = state_context.into_dispatcher();
174                dispatcher.push(
175                    P2pChannelsEffectfulAction::SignalingExchangeAnswerEncryptAndSend {
176                        peer_id,
177                        pub_key: offerer_pub_key.clone(),
178                        answer: Some(answer),
179                    },
180                );
181                dispatcher.push(P2pConnectionIncomingAction::AnswerSendSuccess {
182                    peer_id: offerer_pub_key.peer_id(),
183                });
184                Ok(())
185            }
186            P2pChannelsSignalingExchangeAction::RequestReceived { .. } => {
187                let Self::Ready { remote, .. } = state else {
188                    bug_condition!(
189                        "Invalid state for `P2pChannelsSignalingExchangeAction::RequestReceived`, state: {state:?}",
190                    );
191                    return Ok(());
192                };
193
194                *remote = SignalingExchangeState::Requested { time: meta.time() };
195
196                let (dispatcher, state) = state_context.into_dispatcher_and_state();
197                let state: &P2pState = state.substate()?;
198                state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time());
199                Ok(())
200            }
201            P2pChannelsSignalingExchangeAction::OfferSend {
202                offer,
203                offerer_pub_key,
204                ..
205            } => {
206                let Self::Ready { remote, .. } = state else {
207                    bug_condition!(
208                        "Invalid state for `P2pChannelsSignalingExchangeAction::OfferSend`, state: {state:?}",
209                    );
210                    return Ok(());
211                };
212
213                *remote = SignalingExchangeState::Offered {
214                    time: meta.time(),
215                    offerer_pub_key: offerer_pub_key.clone(),
216                };
217                let dispatcher = state_context.into_dispatcher();
218                let msg = SignalingExchangeChannelMsg::OfferToYou {
219                    offerer_pub_key,
220                    offer,
221                }
222                .into();
223
224                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
225                    peer_id,
226                    msg_id: MsgId::first(),
227                    msg,
228                });
229                Ok(())
230            }
231            P2pChannelsSignalingExchangeAction::AnswerReceived { answer, .. } => {
232                let Self::Ready { remote, .. } = state else {
233                    bug_condition!(
234                        "Invalid state for `P2pChannelsSignalingExchangeAction::AnswerReceived`, state: {state:?}",
235                    );
236                    return Ok(());
237                };
238
239                let offerer_pub_key = match remote {
240                    SignalingExchangeState::Offered {
241                        offerer_pub_key, ..
242                    } => offerer_pub_key.clone(),
243                    state => {
244                        bug_condition!(
245                            "Invalid state for `P2pChannelsSignalingExchangeAction::AnswerReceived`, state: {state:?}",
246                        );
247                        return Ok(());
248                    }
249                };
250                *remote = SignalingExchangeState::Answered { time: meta.time() };
251                let dispatcher = state_context.into_dispatcher();
252                dispatcher.push(P2pChannelsSignalingDiscoveryAction::AnswerSend {
253                    peer_id: offerer_pub_key.peer_id(),
254                    answer: answer.clone(),
255                });
256                Ok(())
257            }
258        }
259    }
260}