1use super::{
2 P2pNetworkIdentifyStreamAction, P2pNetworkIdentifyStreamKind, P2pNetworkIdentifyStreamState,
3};
4use crate::{
5 identify::P2pIdentifyAction,
6 network::identify::{
7 pb::{self, Identify},
8 stream::P2pNetworkIdentifyStreamError,
9 stream_effectful::P2pNetworkIdentifyStreamEffectfulAction,
10 P2pNetworkIdentify, P2pNetworkIdentifyState,
11 },
12 token, ConnectionAddr, Data, P2pLimits, P2pNetworkConnectionError, P2pNetworkSchedulerAction,
13 P2pNetworkStreamProtobufError, P2pNetworkYamuxAction, P2pState, PeerId, YamuxFlags,
14};
15use multiaddr::Multiaddr;
16use openmina_core::{bug_condition, fuzzed_maybe, warn, Substate, SubstateAccess};
17use prost::Message;
18use quick_protobuf::BytesReader;
19use redux::{ActionWithMeta, Dispatcher};
20
21impl P2pNetworkIdentifyStreamState {
22 pub fn reducer<Action, State>(
23 mut state_context: Substate<Action, State, P2pNetworkIdentifyState>,
24 action: ActionWithMeta<P2pNetworkIdentifyStreamAction>,
25 limits: &P2pLimits,
26 ) -> Result<(), String>
27 where
28 State: crate::P2pStateTrait,
29 Action: crate::P2pActionTrait<State>,
30 {
31 let (action, meta) = action.split();
32 let substate = state_context.get_substate_mut()?;
33 let stream_state = match &action {
34 P2pNetworkIdentifyStreamAction::New {
35 peer_id, stream_id, ..
36 } => substate
37 .create_identify_stream_state(peer_id, stream_id)
38 .map_err(|stream| {
39 format!("Identify stream already exists for action {action:?}: {stream:?}")
40 })?,
41 P2pNetworkIdentifyStreamAction::Prune {
42 peer_id, stream_id, ..
43 } => {
44 return substate
45 .remove_identify_stream_state(peer_id, stream_id)
46 .then_some(())
47 .ok_or_else(|| format!("Identify stream not found for action {action:?}"));
48 }
49 a => substate
50 .find_identify_stream_state_mut(a.peer_id(), a.stream_id())
51 .ok_or_else(|| format!("Identify stream not found for action {a:?}"))?,
52 };
53
54 match &stream_state {
55 P2pNetworkIdentifyStreamState::Default => {
56 let P2pNetworkIdentifyStreamAction::New {
57 incoming,
58 addr,
59 peer_id,
60 stream_id,
61 } = action
62 else {
63 bug_condition!("Received action {:?} in Default state", action);
65 return Ok(());
66 };
67
68 let kind = P2pNetworkIdentifyStreamKind::from(incoming);
69
70 *stream_state = match kind {
71 P2pNetworkIdentifyStreamKind::Incoming => {
73 P2pNetworkIdentifyStreamState::SendIdentify
74 }
75 P2pNetworkIdentifyStreamKind::Outgoing => {
77 P2pNetworkIdentifyStreamState::RecvIdentify
78 }
79 };
80
81 if matches!(stream_state, P2pNetworkIdentifyStreamState::SendIdentify) {
82 let (dispatcher, state) = state_context.into_dispatcher_and_state();
83 let p2p_state: &P2pState = state.substate()?;
84
85 let addresses = p2p_state
86 .network
87 .scheduler
88 .listeners
89 .iter()
90 .cloned()
91 .collect::<Vec<_>>();
92
93 dispatcher.push(
94 P2pNetworkIdentifyStreamEffectfulAction::GetListenAddresses {
95 addr,
96 peer_id,
97 stream_id,
98 addresses,
99 },
100 );
101 }
102
103 Ok(())
104 }
105 P2pNetworkIdentifyStreamState::RecvIdentify => match action {
106 P2pNetworkIdentifyStreamAction::IncomingData {
107 data,
108 peer_id,
109 stream_id,
110 addr,
111 } => {
112 let data = &data.0;
113 let mut reader = BytesReader::from_bytes(data);
114 let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else {
115 *stream_state = P2pNetworkIdentifyStreamState::Error(
116 P2pNetworkStreamProtobufError::MessageLength,
117 );
118 return Ok(());
119 };
120
121 if len > limits.identify_message() {
123 *stream_state = P2pNetworkIdentifyStreamState::Error(
124 P2pNetworkStreamProtobufError::Limit(len, limits.identify_message()),
125 );
126 return Ok(());
127 }
128
129 let data = &data[(data.len() - reader.len())..];
130
131 if len > reader.len() {
132 *stream_state = P2pNetworkIdentifyStreamState::IncomingPartialData {
133 len,
134 data: data.to_vec(),
135 };
136 Ok(())
137 } else {
138 stream_state.handle_incoming_identify_message(len, data)?;
139 let stream_state = stream_state.clone();
140 let dispatcher = state_context.into_dispatcher();
141
142 if let P2pNetworkIdentifyStreamState::IdentifyReceived { data } =
143 stream_state
144 {
145 dispatcher.push(P2pIdentifyAction::UpdatePeerInformation {
146 peer_id,
147 info: data,
148 addr,
149 });
150 dispatcher.push(P2pNetworkIdentifyStreamAction::Close {
151 addr,
152 peer_id,
153 stream_id,
154 });
155 } else {
156 let P2pNetworkIdentifyStreamState::Error(error) = stream_state else {
157 bug_condition!("Invalid stream state");
158 return Ok(());
159 };
160
161 warn!(meta.time(); summary = "error handling Identify action", error = display(&error));
162 dispatcher.push(P2pNetworkSchedulerAction::Error {
163 addr,
164 error: P2pNetworkConnectionError::IdentifyStreamError(
165 P2pNetworkIdentifyStreamError::from(error),
166 ),
167 });
168 }
169 Ok(())
170 }
171 }
172 P2pNetworkIdentifyStreamAction::RemoteClose {
173 addr,
174 peer_id,
175 stream_id,
176 }
177 | P2pNetworkIdentifyStreamAction::Close {
178 addr,
179 peer_id,
180 stream_id,
181 } => {
182 let dispatcher = state_context.into_dispatcher();
183 Self::disconnect(dispatcher, addr, peer_id, stream_id)
184 }
185 _ => {
186 bug_condition!("Received action {:?} in RecvIdentify state", action);
188 Ok(())
189 }
190 },
191 P2pNetworkIdentifyStreamState::IncomingPartialData { len, data } => match action {
192 P2pNetworkIdentifyStreamAction::IncomingData {
193 data: new_data,
194 peer_id,
195 addr,
196 stream_id,
197 } => {
198 let mut data = data.clone();
199 data.extend_from_slice(&new_data);
200
201 if *len > data.len() {
202 *stream_state =
203 P2pNetworkIdentifyStreamState::IncomingPartialData { len: *len, data };
204 Ok(())
205 } else {
206 stream_state.handle_incoming_identify_message(*len, &data)?;
207
208 if let P2pNetworkIdentifyStreamState::IdentifyReceived { data } =
209 stream_state
210 {
211 let data = data.clone();
212 let dispatcher = state_context.into_dispatcher();
213 dispatcher.push(P2pIdentifyAction::UpdatePeerInformation {
214 peer_id,
215 info: data,
216 addr,
217 });
218 dispatcher.push(P2pNetworkIdentifyStreamAction::Close {
219 addr,
220 peer_id,
221 stream_id,
222 });
223 } else {
224 let P2pNetworkIdentifyStreamState::Error(error) = stream_state else {
225 bug_condition!("Invalid stream state");
226 return Ok(());
227 };
228
229 let error = error.clone();
230 let dispatcher = state_context.into_dispatcher();
231 warn!(meta.time(); summary = "error handling Identify action", error = display(&error));
232
233 dispatcher.push(P2pNetworkSchedulerAction::Error {
234 addr,
235 error: P2pNetworkConnectionError::IdentifyStreamError(
236 P2pNetworkIdentifyStreamError::from(error),
237 ),
238 });
239 }
240
241 Ok(())
242 }
243 }
244 P2pNetworkIdentifyStreamAction::RemoteClose {
245 addr,
246 peer_id,
247 stream_id,
248 }
249 | P2pNetworkIdentifyStreamAction::Close {
250 addr,
251 peer_id,
252 stream_id,
253 } => {
254 let dispatcher = state_context.into_dispatcher();
255 Self::disconnect(dispatcher, addr, peer_id, stream_id)
256 }
257 _ => {
258 bug_condition!("Received action {:?} in IncomingPartialData state", action);
260 Ok(())
261 }
262 },
263 P2pNetworkIdentifyStreamState::SendIdentify => match action {
264 P2pNetworkIdentifyStreamAction::RemoteClose {
265 addr,
266 peer_id,
267 stream_id,
268 }
269 | P2pNetworkIdentifyStreamAction::Close {
270 addr,
271 peer_id,
272 stream_id,
273 } => {
274 let dispatcher = state_context.into_dispatcher();
275 Self::disconnect(dispatcher, addr, peer_id, stream_id)
276 }
277 P2pNetworkIdentifyStreamAction::SendIdentify {
278 addr,
279 peer_id,
280 stream_id,
281 addresses,
282 } => {
283 let (dispatcher, state) = state_context.into_dispatcher_and_state();
284 let state = state.substate()?;
285 Self::send_identify(dispatcher, state, addr, peer_id, stream_id, addresses);
286 Ok(())
287 }
288 action => {
289 bug_condition!("Received action {:?} in SendIdentify state", action);
291 Ok(())
292 }
293 },
294 P2pNetworkIdentifyStreamState::IdentifyReceived { .. } => match action {
295 P2pNetworkIdentifyStreamAction::Close {
296 addr,
297 peer_id,
298 stream_id,
299 }
300 | P2pNetworkIdentifyStreamAction::RemoteClose {
301 addr,
302 peer_id,
303 stream_id,
304 } => {
305 let dispatcher = state_context.into_dispatcher();
306 Self::disconnect(dispatcher, addr, peer_id, stream_id)
307 }
308 _ => Ok(()),
309 },
310 P2pNetworkIdentifyStreamState::Error(_) => {
311 Ok(())
313 }
314 }
315 }
316
317 fn handle_incoming_identify_message(&mut self, len: usize, data: &[u8]) -> Result<(), String> {
318 let message = match Identify::decode(&data[..len]) {
319 Ok(v) => v,
320 Err(e) => {
321 *self = P2pNetworkIdentifyStreamState::Error(
322 P2pNetworkStreamProtobufError::Message(e.to_string()),
323 );
324 return Ok(());
325 }
326 };
327
328 let data = match P2pNetworkIdentify::try_from(message) {
329 Ok(v) => v,
330 Err(e) => {
331 *self = P2pNetworkIdentifyStreamState::Error(e.into());
332 return Ok(());
333 }
334 };
335
336 *self = P2pNetworkIdentifyStreamState::IdentifyReceived {
337 data: Box::new(data),
338 };
339 Ok(())
340 }
341
342 fn disconnect<Action, State>(
343 dispatcher: &mut Dispatcher<Action, State>,
344 addr: ConnectionAddr,
345 peer_id: PeerId,
346 stream_id: u32,
347 ) -> Result<(), String>
348 where
349 State: SubstateAccess<P2pNetworkIdentifyState>,
350 Action: crate::P2pActionTrait<State>,
351 {
352 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
353 addr,
354 stream_id,
355 data: Data::empty(),
356 flags: YamuxFlags::FIN,
357 });
358 dispatcher.push(P2pNetworkIdentifyStreamAction::Prune {
359 addr,
360 peer_id,
361 stream_id,
362 });
363 Ok(())
364 }
365
366 fn send_identify<Action, State>(
367 dispatcher: &mut Dispatcher<Action, State>,
368 state: &P2pState,
369 addr: ConnectionAddr,
370 peer_id: PeerId,
371 stream_id: u32,
372 mut listen_addrs: Vec<Multiaddr>,
373 ) where
374 Action: crate::P2pActionTrait<State>,
375 State: crate::P2pStateTrait,
376 {
377 let config = &state.config;
378 let ips = &config.external_addrs;
379 let port = config.libp2p_port.unwrap_or(8302);
380
381 listen_addrs.extend(
382 ips.iter()
383 .map(|ip| Multiaddr::from(*ip).with(multiaddr::Protocol::Tcp(port))),
384 );
385
386 let public_key = Some(state.config.identity_pub_key.clone());
387
388 let mut protocols = vec![
389 token::StreamKind::Identify(token::IdentifyAlgorithm::Identify1_0_0),
390 token::StreamKind::Broadcast(token::BroadcastAlgorithm::Meshsub1_1_0),
391 token::StreamKind::Rpc(token::RpcAlgorithm::Rpc0_0_1),
392 ];
393 if state.network.scheduler.discovery_state.is_some() {
394 protocols.push(token::StreamKind::Discovery(
395 token::DiscoveryAlgorithm::Kademlia1_0_0,
396 ));
397 }
398 let identify_msg = P2pNetworkIdentify {
399 protocol_version: Some("ipfs/0.1.0".to_string()),
400 agent_version: Some("openmina".to_owned()),
402 public_key,
403 listen_addrs,
404 observed_addr: None,
406 protocols,
407 };
408
409 let mut out = Vec::new();
410 let identify_msg_proto: pb::Identify = match (&identify_msg).try_into() {
411 Ok(identify_msg_proto) => identify_msg_proto,
412 Err(err) => {
413 bug_condition!("error encoding message {:?}", err);
414 return;
415 }
416 };
417
418 if let Err(err) = prost::Message::encode_length_delimited(&identify_msg_proto, &mut out) {
419 bug_condition!("error serializing message {:?}", err);
420 return;
421 }
422
423 let data = fuzzed_maybe!(
424 Data(out.into_boxed_slice()),
425 crate::fuzzer::mutate_identify_msg
426 );
427
428 let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
429
430 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
431 addr,
432 stream_id,
433 data,
434 flags,
435 });
436
437 dispatcher.push(P2pNetworkIdentifyStreamAction::Close {
438 addr,
439 peer_id,
440 stream_id,
441 });
442 }
443}