p2p/network/identify/stream/
p2p_network_identify_stream_reducer.rs

1use super::{
2    P2pNetworkIdentifyStreamAction, P2pNetworkIdentifyStreamKind, P2pNetworkIdentifyStreamState,
3};
4use crate::{
5    identify::P2pIdentifyAction,
6    network::identify::{
7        pb::{self, Identify},
8        stream::P2pNetworkIdentifyStreamError,
9        stream_effectful::P2pNetworkIdentifyStreamEffectfulAction,
10        P2pNetworkIdentify, P2pNetworkIdentifyState,
11    },
12    token, ConnectionAddr, Data, P2pLimits, P2pNetworkConnectionError, P2pNetworkSchedulerAction,
13    P2pNetworkStreamProtobufError, P2pNetworkYamuxAction, P2pState, PeerId, YamuxFlags,
14};
15use multiaddr::Multiaddr;
16use openmina_core::{bug_condition, fuzzed_maybe, warn, Substate, SubstateAccess};
17use prost::Message;
18use quick_protobuf::BytesReader;
19use redux::{ActionWithMeta, Dispatcher};
20
21impl P2pNetworkIdentifyStreamState {
22    pub fn reducer<Action, State>(
23        mut state_context: Substate<Action, State, P2pNetworkIdentifyState>,
24        action: ActionWithMeta<P2pNetworkIdentifyStreamAction>,
25        limits: &P2pLimits,
26    ) -> Result<(), String>
27    where
28        State: crate::P2pStateTrait,
29        Action: crate::P2pActionTrait<State>,
30    {
31        let (action, meta) = action.split();
32        let substate = state_context.get_substate_mut()?;
33        let stream_state = match &action {
34            P2pNetworkIdentifyStreamAction::New {
35                peer_id, stream_id, ..
36            } => substate
37                .create_identify_stream_state(peer_id, stream_id)
38                .map_err(|stream| {
39                    format!("Identify stream already exists for action {action:?}: {stream:?}")
40                })?,
41            P2pNetworkIdentifyStreamAction::Prune {
42                peer_id, stream_id, ..
43            } => {
44                return substate
45                    .remove_identify_stream_state(peer_id, stream_id)
46                    .then_some(())
47                    .ok_or_else(|| format!("Identify stream not found for action {action:?}"));
48            }
49            a => substate
50                .find_identify_stream_state_mut(a.peer_id(), a.stream_id())
51                .ok_or_else(|| format!("Identify stream not found for action {a:?}"))?,
52        };
53
54        match &stream_state {
55            P2pNetworkIdentifyStreamState::Default => {
56                let P2pNetworkIdentifyStreamAction::New {
57                    incoming,
58                    addr,
59                    peer_id,
60                    stream_id,
61                } = action
62                else {
63                    // enabling conditions should prevent receiving other actions in Default state
64                    bug_condition!("Received action {:?} in Default state", action);
65                    return Ok(());
66                };
67
68                let kind = P2pNetworkIdentifyStreamKind::from(incoming);
69
70                *stream_state = match kind {
71                    // For incoming streams we prepare to send the Identify message
72                    P2pNetworkIdentifyStreamKind::Incoming => {
73                        P2pNetworkIdentifyStreamState::SendIdentify
74                    }
75                    // For outgoing streams we expect to get the Identify message from the remote peer
76                    P2pNetworkIdentifyStreamKind::Outgoing => {
77                        P2pNetworkIdentifyStreamState::RecvIdentify
78                    }
79                };
80
81                if matches!(stream_state, P2pNetworkIdentifyStreamState::SendIdentify) {
82                    let (dispatcher, state) = state_context.into_dispatcher_and_state();
83                    let p2p_state: &P2pState = state.substate()?;
84
85                    let addresses = p2p_state
86                        .network
87                        .scheduler
88                        .listeners
89                        .iter()
90                        .cloned()
91                        .collect::<Vec<_>>();
92
93                    dispatcher.push(
94                        P2pNetworkIdentifyStreamEffectfulAction::GetListenAddresses {
95                            addr,
96                            peer_id,
97                            stream_id,
98                            addresses,
99                        },
100                    );
101                }
102
103                Ok(())
104            }
105            P2pNetworkIdentifyStreamState::RecvIdentify => match action {
106                P2pNetworkIdentifyStreamAction::IncomingData {
107                    data,
108                    peer_id,
109                    stream_id,
110                    addr,
111                } => {
112                    let data = &data.0;
113                    let mut reader = BytesReader::from_bytes(data);
114                    let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else {
115                        *stream_state = P2pNetworkIdentifyStreamState::Error(
116                            P2pNetworkStreamProtobufError::MessageLength,
117                        );
118                        return Ok(());
119                    };
120
121                    // TODO: implement as configuration option
122                    if len > limits.identify_message() {
123                        *stream_state = P2pNetworkIdentifyStreamState::Error(
124                            P2pNetworkStreamProtobufError::Limit(len, limits.identify_message()),
125                        );
126                        return Ok(());
127                    }
128
129                    let data = &data[(data.len() - reader.len())..];
130
131                    if len > reader.len() {
132                        *stream_state = P2pNetworkIdentifyStreamState::IncomingPartialData {
133                            len,
134                            data: data.to_vec(),
135                        };
136                        Ok(())
137                    } else {
138                        stream_state.handle_incoming_identify_message(len, data)?;
139                        let stream_state = stream_state.clone();
140                        let dispatcher = state_context.into_dispatcher();
141
142                        if let P2pNetworkIdentifyStreamState::IdentifyReceived { data } =
143                            stream_state
144                        {
145                            dispatcher.push(P2pIdentifyAction::UpdatePeerInformation {
146                                peer_id,
147                                info: data,
148                                addr,
149                            });
150                            dispatcher.push(P2pNetworkIdentifyStreamAction::Close {
151                                addr,
152                                peer_id,
153                                stream_id,
154                            });
155                        } else {
156                            let P2pNetworkIdentifyStreamState::Error(error) = stream_state else {
157                                bug_condition!("Invalid stream state");
158                                return Ok(());
159                            };
160
161                            warn!(meta.time(); summary = "error handling Identify action", error = display(&error));
162                            dispatcher.push(P2pNetworkSchedulerAction::Error {
163                                addr,
164                                error: P2pNetworkConnectionError::IdentifyStreamError(
165                                    P2pNetworkIdentifyStreamError::from(error),
166                                ),
167                            });
168                        }
169                        Ok(())
170                    }
171                }
172                P2pNetworkIdentifyStreamAction::RemoteClose {
173                    addr,
174                    peer_id,
175                    stream_id,
176                }
177                | P2pNetworkIdentifyStreamAction::Close {
178                    addr,
179                    peer_id,
180                    stream_id,
181                } => {
182                    let dispatcher = state_context.into_dispatcher();
183                    Self::disconnect(dispatcher, addr, peer_id, stream_id)
184                }
185                _ => {
186                    // State and connection cleanup should be handled by timeout
187                    bug_condition!("Received action {:?} in RecvIdentify state", action);
188                    Ok(())
189                }
190            },
191            P2pNetworkIdentifyStreamState::IncomingPartialData { len, data } => match action {
192                P2pNetworkIdentifyStreamAction::IncomingData {
193                    data: new_data,
194                    peer_id,
195                    addr,
196                    stream_id,
197                } => {
198                    let mut data = data.clone();
199                    data.extend_from_slice(&new_data);
200
201                    if *len > data.len() {
202                        *stream_state =
203                            P2pNetworkIdentifyStreamState::IncomingPartialData { len: *len, data };
204                        Ok(())
205                    } else {
206                        stream_state.handle_incoming_identify_message(*len, &data)?;
207
208                        if let P2pNetworkIdentifyStreamState::IdentifyReceived { data } =
209                            stream_state
210                        {
211                            let data = data.clone();
212                            let dispatcher = state_context.into_dispatcher();
213                            dispatcher.push(P2pIdentifyAction::UpdatePeerInformation {
214                                peer_id,
215                                info: data,
216                                addr,
217                            });
218                            dispatcher.push(P2pNetworkIdentifyStreamAction::Close {
219                                addr,
220                                peer_id,
221                                stream_id,
222                            });
223                        } else {
224                            let P2pNetworkIdentifyStreamState::Error(error) = stream_state else {
225                                bug_condition!("Invalid stream state");
226                                return Ok(());
227                            };
228
229                            let error = error.clone();
230                            let dispatcher = state_context.into_dispatcher();
231                            warn!(meta.time(); summary = "error handling Identify action", error = display(&error));
232
233                            dispatcher.push(P2pNetworkSchedulerAction::Error {
234                                addr,
235                                error: P2pNetworkConnectionError::IdentifyStreamError(
236                                    P2pNetworkIdentifyStreamError::from(error),
237                                ),
238                            });
239                        }
240
241                        Ok(())
242                    }
243                }
244                P2pNetworkIdentifyStreamAction::RemoteClose {
245                    addr,
246                    peer_id,
247                    stream_id,
248                }
249                | P2pNetworkIdentifyStreamAction::Close {
250                    addr,
251                    peer_id,
252                    stream_id,
253                } => {
254                    let dispatcher = state_context.into_dispatcher();
255                    Self::disconnect(dispatcher, addr, peer_id, stream_id)
256                }
257                _ => {
258                    // State and connection cleanup should be handled by timeout
259                    bug_condition!("Received action {:?} in IncomingPartialData state", action);
260                    Ok(())
261                }
262            },
263            P2pNetworkIdentifyStreamState::SendIdentify => match action {
264                P2pNetworkIdentifyStreamAction::RemoteClose {
265                    addr,
266                    peer_id,
267                    stream_id,
268                }
269                | P2pNetworkIdentifyStreamAction::Close {
270                    addr,
271                    peer_id,
272                    stream_id,
273                } => {
274                    let dispatcher = state_context.into_dispatcher();
275                    Self::disconnect(dispatcher, addr, peer_id, stream_id)
276                }
277                P2pNetworkIdentifyStreamAction::SendIdentify {
278                    addr,
279                    peer_id,
280                    stream_id,
281                    addresses,
282                } => {
283                    let (dispatcher, state) = state_context.into_dispatcher_and_state();
284                    let state = state.substate()?;
285                    Self::send_identify(dispatcher, state, addr, peer_id, stream_id, addresses);
286                    Ok(())
287                }
288                action => {
289                    // State and connection cleanup should be handled by timeout
290                    bug_condition!("Received action {:?} in SendIdentify state", action);
291                    Ok(())
292                }
293            },
294            P2pNetworkIdentifyStreamState::IdentifyReceived { .. } => match action {
295                P2pNetworkIdentifyStreamAction::Close {
296                    addr,
297                    peer_id,
298                    stream_id,
299                }
300                | P2pNetworkIdentifyStreamAction::RemoteClose {
301                    addr,
302                    peer_id,
303                    stream_id,
304                } => {
305                    let dispatcher = state_context.into_dispatcher();
306                    Self::disconnect(dispatcher, addr, peer_id, stream_id)
307                }
308                _ => Ok(()),
309            },
310            P2pNetworkIdentifyStreamState::Error(_) => {
311                // TODO
312                Ok(())
313            }
314        }
315    }
316
317    fn handle_incoming_identify_message(&mut self, len: usize, data: &[u8]) -> Result<(), String> {
318        let message = match Identify::decode(&data[..len]) {
319            Ok(v) => v,
320            Err(e) => {
321                *self = P2pNetworkIdentifyStreamState::Error(
322                    P2pNetworkStreamProtobufError::Message(e.to_string()),
323                );
324                return Ok(());
325            }
326        };
327
328        let data = match P2pNetworkIdentify::try_from(message) {
329            Ok(v) => v,
330            Err(e) => {
331                *self = P2pNetworkIdentifyStreamState::Error(e.into());
332                return Ok(());
333            }
334        };
335
336        *self = P2pNetworkIdentifyStreamState::IdentifyReceived {
337            data: Box::new(data),
338        };
339        Ok(())
340    }
341
342    fn disconnect<Action, State>(
343        dispatcher: &mut Dispatcher<Action, State>,
344        addr: ConnectionAddr,
345        peer_id: PeerId,
346        stream_id: u32,
347    ) -> Result<(), String>
348    where
349        State: SubstateAccess<P2pNetworkIdentifyState>,
350        Action: crate::P2pActionTrait<State>,
351    {
352        dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
353            addr,
354            stream_id,
355            data: Data::empty(),
356            flags: YamuxFlags::FIN,
357        });
358        dispatcher.push(P2pNetworkIdentifyStreamAction::Prune {
359            addr,
360            peer_id,
361            stream_id,
362        });
363        Ok(())
364    }
365
366    fn send_identify<Action, State>(
367        dispatcher: &mut Dispatcher<Action, State>,
368        state: &P2pState,
369        addr: ConnectionAddr,
370        peer_id: PeerId,
371        stream_id: u32,
372        mut listen_addrs: Vec<Multiaddr>,
373    ) where
374        Action: crate::P2pActionTrait<State>,
375        State: crate::P2pStateTrait,
376    {
377        let config = &state.config;
378        let ips = &config.external_addrs;
379        let port = config.libp2p_port.unwrap_or(8302);
380
381        listen_addrs.extend(
382            ips.iter()
383                .map(|ip| Multiaddr::from(*ip).with(multiaddr::Protocol::Tcp(port))),
384        );
385
386        let public_key = Some(state.config.identity_pub_key.clone());
387
388        let mut protocols = vec![
389            token::StreamKind::Identify(token::IdentifyAlgorithm::Identify1_0_0),
390            token::StreamKind::Broadcast(token::BroadcastAlgorithm::Meshsub1_1_0),
391            token::StreamKind::Rpc(token::RpcAlgorithm::Rpc0_0_1),
392        ];
393        if state.network.scheduler.discovery_state.is_some() {
394            protocols.push(token::StreamKind::Discovery(
395                token::DiscoveryAlgorithm::Kademlia1_0_0,
396            ));
397        }
398        let identify_msg = P2pNetworkIdentify {
399            protocol_version: Some("ipfs/0.1.0".to_string()),
400            // TODO: include build info from GlobalConfig (?)
401            agent_version: Some("openmina".to_owned()),
402            public_key,
403            listen_addrs,
404            // TODO: other peers seem to report inaccurate information, should we implement this?
405            observed_addr: None,
406            protocols,
407        };
408
409        let mut out = Vec::new();
410        let identify_msg_proto: pb::Identify = match (&identify_msg).try_into() {
411            Ok(identify_msg_proto) => identify_msg_proto,
412            Err(err) => {
413                bug_condition!("error encoding message {:?}", err);
414                return;
415            }
416        };
417
418        if let Err(err) = prost::Message::encode_length_delimited(&identify_msg_proto, &mut out) {
419            bug_condition!("error serializing message {:?}", err);
420            return;
421        }
422
423        let data = fuzzed_maybe!(
424            Data(out.into_boxed_slice()),
425            crate::fuzzer::mutate_identify_msg
426        );
427
428        let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
429
430        dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
431            addr,
432            stream_id,
433            data,
434            flags,
435        });
436
437        dispatcher.push(P2pNetworkIdentifyStreamAction::Close {
438            addr,
439            peer_id,
440            stream_id,
441        });
442    }
443}