p2p/network/noise/
p2p_network_noise_reducer.rs

1use chacha20poly1305::{aead::generic_array::GenericArray, AeadInPlace, ChaCha20Poly1305, KeyInit};
2use crypto_bigint::consts::U12;
3use openmina_core::{bug_condition, fuzzed_maybe, Substate};
4
5use crate::{
6    connection::incoming::{P2pConnectionIncomingAction, P2pConnectionIncomingState},
7    Data, P2pNetworkConnectionError, P2pNetworkPnetAction, P2pNetworkSchedulerAction,
8    P2pNetworkSchedulerState, P2pNetworkSelectAction, P2pState, PeerId, SelectKind,
9};
10
11use self::p2p_network_noise_state::ResponderConsumeOutput;
12
13use super::*;
14
15use super::p2p_network_noise_state::{
16    InitiatorOutput, NoiseError, NoiseState, P2pNetworkNoiseState, P2pNetworkNoiseStateInitiator,
17    P2pNetworkNoiseStateInner, P2pNetworkNoiseStateResponder, ResponderOutput,
18};
19
20const MAX_CHUNK_SIZE: usize = u16::MAX as usize - 19;
21
22impl P2pNetworkNoiseState {
23    pub fn reducer<State, Action>(
24        mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
25        action: redux::ActionWithMeta<P2pNetworkNoiseAction>,
26    ) -> Result<(), String>
27    where
28        State: crate::P2pStateTrait,
29        Action: crate::P2pActionTrait<State>,
30    {
31        let (action, _meta) = action.split();
32        let noise_state = state_context
33            .get_substate_mut()?
34            .connection_state_mut(action.addr())
35            .and_then(|c| c.noise_state_mut())
36            .ok_or_else(|| format!("Invalid noise state {}", action.addr()))?;
37
38        match action {
39            P2pNetworkNoiseAction::Init {
40                incoming,
41                ephemeral_sk: esk,
42                static_sk: ssk,
43                signature: payload,
44                addr,
45            } => {
46                let epk = esk.pk();
47                let spk = ssk.pk();
48
49                noise_state.inner = if incoming {
50                    // Luckily the name is 32 bytes long, if it were longer you would have to take a sha2_256 hash of it.
51                    let mut noise = NoiseState::new(*b"Noise_XX_25519_ChaChaPoly_SHA256");
52                    noise.mix_hash(b"");
53
54                    Some(P2pNetworkNoiseStateInner::Responder(
55                        P2pNetworkNoiseStateResponder::Init {
56                            r_esk: esk,
57                            r_spk: spk,
58                            r_ssk: ssk,
59                            buffer: vec![],
60                            payload,
61                            noise,
62                        },
63                    ))
64                } else {
65                    let mut chunk = vec![0, 32];
66                    chunk.extend_from_slice(epk.0.as_bytes());
67                    noise_state.outgoing_chunks.push_back(vec![chunk.into()]);
68
69                    let mut noise = NoiseState::new(*b"Noise_XX_25519_ChaChaPoly_SHA256");
70                    noise.mix_hash(b"");
71                    noise.mix_hash(epk.0.as_bytes());
72                    noise.mix_hash(b"");
73
74                    Some(P2pNetworkNoiseStateInner::Initiator(
75                        P2pNetworkNoiseStateInitiator {
76                            i_esk: esk,
77                            i_spk: spk,
78                            i_ssk: ssk,
79                            r_epk: None,
80                            payload,
81                            noise,
82                            remote_pk: None,
83                        },
84                    ))
85                };
86
87                let mut outgoing = noise_state.outgoing_chunks.clone();
88                let dispatcher = state_context.into_dispatcher();
89                while let Some(data) = outgoing.pop_front() {
90                    dispatcher.push(P2pNetworkNoiseAction::OutgoingChunk { addr, data });
91                }
92
93                Ok(())
94            }
95            P2pNetworkNoiseAction::IncomingData { data, addr } => {
96                noise_state.buffer.extend_from_slice(&data);
97                let mut offset = 0;
98                loop {
99                    let buf = &noise_state.buffer[offset..];
100                    // Note: there is no way to determine if a peer is not sending more data on purpose or not.
101                    // Let the timeout logic handle this.
102                    let len = buf
103                        .get(..2)
104                        .and_then(|buf| Some(u16::from_be_bytes(buf.try_into().ok()?)));
105
106                    if let Some(len) = len {
107                        let full_len = 2 + len as usize;
108                        if buf.len() >= full_len {
109                            noise_state
110                                .incoming_chunks
111                                .push_back(buf[..full_len].to_vec());
112                            offset += full_len;
113                            continue;
114                        }
115                    }
116                    break;
117                }
118                noise_state.buffer = noise_state.buffer[offset..].to_vec();
119
120                let incoming_chunks = noise_state.incoming_chunks.len();
121                let dispatcher = state_context.into_dispatcher();
122                for _ in 0..incoming_chunks {
123                    dispatcher.push(P2pNetworkNoiseAction::IncomingChunk { addr });
124                }
125                Ok(())
126            }
127            ref action @ P2pNetworkNoiseAction::IncomingChunk { addr } => {
128                let Some(state) = &mut noise_state.inner else {
129                    bug_condition!("action {:?}: no inner state", action);
130                    return Ok(());
131                };
132                if let Some(mut chunk) = noise_state.incoming_chunks.pop_front() {
133                    match state {
134                        P2pNetworkNoiseStateInner::Initiator(i) => match i.consume(&mut chunk) {
135                            Ok(_) => {}
136                            Err(err) => *state = P2pNetworkNoiseStateInner::Error(dbg!(err)),
137                        },
138                        P2pNetworkNoiseStateInner::Responder(o) => match o.consume(&mut chunk) {
139                            Ok(None) => {}
140                            Ok(Some(ResponderConsumeOutput {
141                                output:
142                                    ResponderOutput {
143                                        send_key,
144                                        recv_key,
145                                        remote_pk,
146                                        ..
147                                    },
148                                payload: _,
149                            })) => {
150                                let remote_peer_id = remote_pk.peer_id();
151
152                                if noise_state.expected_peer_id.is_some_and(|expected_per_id| {
153                                    expected_per_id != remote_peer_id
154                                }) {
155                                    let lhs = noise_state
156                                        .expected_peer_id
157                                        .map_or("none".to_string(), PeerId::to_libp2p_string);
158                                    let rhs = remote_peer_id.to_libp2p_string();
159                                    *state = P2pNetworkNoiseStateInner::Error(
160                                        NoiseError::RemotePeerIdMismatch(format!("{lhs} != {rhs}")),
161                                    );
162                                } else {
163                                    *state = P2pNetworkNoiseStateInner::Done {
164                                        incoming: true,
165                                        send_key,
166                                        recv_key,
167                                        recv_nonce: 0,
168                                        send_nonce: 0,
169                                        remote_pk,
170                                        remote_peer_id,
171                                    };
172                                }
173                            }
174                            Err(err) => {
175                                *state = P2pNetworkNoiseStateInner::Error(dbg!(err));
176                            }
177                        },
178                        P2pNetworkNoiseStateInner::Done {
179                            recv_key,
180                            recv_nonce,
181                            ..
182                        } => {
183                            let aead = ChaCha20Poly1305::new(&recv_key.0.into());
184                            let mut chunk = chunk;
185                            let mut nonce = GenericArray::default();
186                            nonce[4..].clone_from_slice(&recv_nonce.to_le_bytes());
187                            *recv_nonce += 1;
188                            if chunk.len() < 18 {
189                                *state = P2pNetworkNoiseStateInner::Error(NoiseError::ChunkTooShort)
190                            } else {
191                                let data = &mut chunk[2..];
192                                let (data, tag) = data.split_at_mut(data.len() - 16);
193                                let tag = GenericArray::from_slice(&*tag);
194                                if aead
195                                    .decrypt_in_place_detached(&nonce, &[], data, tag)
196                                    .is_err()
197                                {
198                                    *state = P2pNetworkNoiseStateInner::Error(dbg!(
199                                        NoiseError::FirstMacMismatch
200                                    ));
201                                } else {
202                                    noise_state.decrypted_chunks.push_back(data.to_vec().into());
203                                }
204                            }
205                        }
206                        P2pNetworkNoiseStateInner::Error(_) => {}
207                    }
208                }
209
210                let remote_peer_id = noise_state.remote_peer_id();
211                let handshake_done = noise_state.handshake_done(action);
212                let mut outgoing = noise_state.outgoing_chunks.clone();
213                let decrypted = noise_state.decrypted_chunks.front().cloned();
214                let middle_initiator = matches!(
215                    &noise_state.inner,
216                    Some(P2pNetworkNoiseStateInner::Initiator(..))
217                ) && remote_peer_id.is_some();
218
219                let middle_responder = matches!(
220                    &noise_state.inner,
221                    Some(P2pNetworkNoiseStateInner::Responder(
222                        P2pNetworkNoiseStateResponder::Init { .. },
223                    ))
224                );
225                let error = noise_state.as_error();
226
227                let (dispatcher, state) = state_context.into_dispatcher_and_state();
228                if let Some(error) = error {
229                    dispatcher.push(P2pNetworkSchedulerAction::Error {
230                        addr,
231                        error: P2pNetworkConnectionError::Noise(error),
232                    });
233                } else {
234                    if let Some((peer_id, true)) = handshake_done {
235                        dispatcher.push(P2pConnectionIncomingAction::FinalizePendingLibp2p {
236                            peer_id,
237                            addr: addr.sock_addr,
238                        });
239
240                        let p2p_state: &P2pState = state.substate()?;
241                        let this_connection_is_kept = p2p_state.peers
242                        .get(&peer_id)
243                        .and_then(|peer_state| peer_state.status.as_connecting())
244                        .and_then(|connecting| connecting.as_incoming())
245                        .is_some_and( |incoming| matches!(incoming, P2pConnectionIncomingState::FinalizePendingLibp2p { addr: a, .. } if a == &addr.sock_addr));
246
247                        if !this_connection_is_kept {
248                            return Ok(());
249                        }
250                    }
251
252                    while let Some(data) = outgoing.pop_front() {
253                        dispatcher.push(P2pNetworkNoiseAction::OutgoingChunk { addr, data });
254                    }
255
256                    if let Some(data) = decrypted {
257                        dispatcher.push(P2pNetworkNoiseAction::DecryptedData {
258                            addr,
259                            peer_id: remote_peer_id,
260                            data,
261                        });
262                    }
263
264                    if middle_initiator || middle_responder {
265                        dispatcher.push(P2pNetworkNoiseAction::OutgoingData {
266                            addr,
267                            data: Data::empty(),
268                        });
269                    }
270                }
271
272                Ok(())
273            }
274            ref action @ P2pNetworkNoiseAction::OutgoingChunk { addr, ref data }
275            | ref action @ P2pNetworkNoiseAction::OutgoingChunkSelectMux { addr, ref data } => {
276                noise_state.outgoing_chunks.pop_front();
277
278                let handshake_done = noise_state.handshake_done(action);
279                let dispatcher = state_context.into_dispatcher();
280                let data = fuzzed_maybe!(
281                    data.iter()
282                        .fold(vec![], |mut v, item| {
283                            v.extend_from_slice(item);
284                            v
285                        })
286                        .into(),
287                    crate::fuzzer::mutate_noise
288                );
289                dispatcher.push(P2pNetworkPnetAction::OutgoingData { addr, data });
290                if let Some((peer_id, incoming)) = handshake_done {
291                    dispatcher.push(P2pNetworkNoiseAction::HandshakeDone {
292                        addr,
293                        peer_id,
294                        incoming,
295                    });
296                }
297
298                Ok(())
299            }
300            P2pNetworkNoiseAction::OutgoingData { data, addr } => {
301                let Some(state) = &mut noise_state.inner else {
302                    bug_condition!("action `P2pNetworkNoiseAction::OutgoingData`: no inner state");
303                    return Ok(());
304                };
305
306                match state {
307                    P2pNetworkNoiseStateInner::Done {
308                        send_key,
309                        send_nonce,
310                        ..
311                    } => {
312                        let aead = ChaCha20Poly1305::new(&send_key.0.into());
313                        let mut chunks = vec![];
314
315                        for data_chunk in data.chunks(MAX_CHUNK_SIZE) {
316                            let mut chunk = Vec::with_capacity(18 + data_chunk.len());
317                            chunk
318                                .extend_from_slice(&((data_chunk.len() + 16) as u16).to_be_bytes());
319                            chunk.extend_from_slice(data_chunk);
320
321                            let mut nonce: GenericArray<u8, U12> = GenericArray::default();
322                            nonce[4..].clone_from_slice(&send_nonce.to_le_bytes());
323                            *send_nonce += 1;
324
325                            let tag = aead.encrypt_in_place_detached(
326                                &nonce,
327                                &[],
328                                &mut chunk[2..(2 + data_chunk.len())],
329                            );
330
331                            let tag = match tag {
332                                Ok(tag) => tag,
333                                Err(_) => {
334                                    *state =
335                                        P2pNetworkNoiseStateInner::Error(NoiseError::Encryption);
336                                    return Ok(());
337                                }
338                            };
339
340                            chunk.extend_from_slice(&tag);
341                            chunks.push(chunk.into());
342                        }
343                        noise_state.outgoing_chunks.push_back(chunks);
344                    }
345                    P2pNetworkNoiseStateInner::Initiator(i) => {
346                        match (i.generate(&data), i.remote_pk.clone()) {
347                            (
348                                Ok(Some(InitiatorOutput {
349                                    send_key,
350                                    recv_key,
351                                    chunk,
352                                })),
353                                Some(remote_pk),
354                            ) => {
355                                noise_state.outgoing_chunks.push_back(vec![chunk.into()]);
356                                let remote_peer_id = remote_pk.peer_id();
357
358                                if noise_state.expected_peer_id.is_some_and(|expected_per_id| {
359                                    expected_per_id != remote_peer_id
360                                }) {
361                                    let lhs = noise_state
362                                        .expected_peer_id
363                                        .map_or("none".to_string(), PeerId::to_libp2p_string);
364                                    let rhs = remote_peer_id.to_libp2p_string();
365                                    *state = P2pNetworkNoiseStateInner::Error(
366                                        NoiseError::RemotePeerIdMismatch(format!("{lhs} != {rhs}")),
367                                    );
368                                } else {
369                                    *state = P2pNetworkNoiseStateInner::Done {
370                                        incoming: false,
371                                        send_key,
372                                        recv_key,
373                                        recv_nonce: 0,
374                                        send_nonce: 0,
375                                        remote_pk,
376                                        remote_peer_id,
377                                    };
378                                }
379                            }
380                            (Err(error), Some(_)) => {
381                                *state = P2pNetworkNoiseStateInner::Error(error);
382                                return Ok(());
383                            }
384                            _ => {}
385                        }
386                    }
387                    P2pNetworkNoiseStateInner::Responder(r) => {
388                        if let Some(chunk) = r.generate(&data) {
389                            noise_state.outgoing_chunks.push_back(vec![chunk.into()]);
390                        }
391                    }
392                    // TODO: report error
393                    _ => {}
394                }
395
396                let mut outgoing = noise_state.outgoing_chunks.clone();
397                let error = noise_state.as_error();
398                let dispatcher = state_context.into_dispatcher();
399
400                if let Some(error) = error {
401                    dispatcher.push(P2pNetworkSchedulerAction::Error {
402                        addr,
403                        error: P2pNetworkConnectionError::Noise(error),
404                    });
405                } else {
406                    while let Some(data) = outgoing.pop_front() {
407                        dispatcher.push(P2pNetworkNoiseAction::OutgoingChunk { addr, data });
408                    }
409                }
410                Ok(())
411            }
412            P2pNetworkNoiseAction::OutgoingDataSelectMux { data, addr } => {
413                let Some(P2pNetworkNoiseStateInner::Done {
414                    send_key,
415                    send_nonce,
416                    ..
417                }) = &mut noise_state.inner
418                else {
419                    bug_condition!(
420                        "action `P2pNetworkNoiseAction::OutgoingDataSelectMux`: no inner state"
421                    );
422                    return Ok(());
423                };
424
425                let aead = ChaCha20Poly1305::new(&send_key.0.into());
426
427                let mut chunk = Vec::with_capacity(18 + data.len());
428                chunk.extend_from_slice(&((data.len() + 16) as u16).to_be_bytes());
429                chunk.extend_from_slice(&data);
430
431                let mut nonce = GenericArray::default();
432                nonce[4..].clone_from_slice(&send_nonce.to_le_bytes());
433                *send_nonce += 1;
434
435                match aead.encrypt_in_place_detached(&nonce, &[], &mut chunk[2..(2 + data.len())]) {
436                    Ok(tag) => {
437                        chunk.extend_from_slice(&tag);
438                        noise_state.outgoing_chunks.push_back(vec![chunk.into()]);
439                    }
440                    Err(_) => {
441                        noise_state.inner =
442                            Some(P2pNetworkNoiseStateInner::Error(NoiseError::Encryption));
443                    }
444                };
445
446                let outgoing = noise_state.outgoing_chunks.front().cloned();
447                let error = noise_state.as_error();
448                let dispatcher = state_context.into_dispatcher();
449
450                if let Some(error) = error {
451                    dispatcher.push(P2pNetworkSchedulerAction::Error {
452                        addr,
453                        error: P2pNetworkConnectionError::Noise(error),
454                    });
455                } else if let Some(data) = outgoing {
456                    dispatcher.push(P2pNetworkNoiseAction::OutgoingChunkSelectMux { addr, data })
457                }
458
459                Ok(())
460            }
461            P2pNetworkNoiseAction::DecryptedData {
462                addr,
463                peer_id,
464                data,
465            } => {
466                noise_state.decrypted_chunks.pop_front();
467
468                let remote_peer_id = noise_state.remote_peer_id();
469                let dispatcher = state_context.into_dispatcher();
470                dispatcher.push(P2pNetworkSelectAction::IncomingDataMux {
471                    addr,
472                    peer_id: peer_id.or(remote_peer_id),
473                    data,
474                    fin: false,
475                });
476
477                Ok(())
478            }
479            P2pNetworkNoiseAction::HandshakeDone {
480                addr,
481                peer_id,
482                incoming,
483            } => {
484                let dispatcher = state_context.into_dispatcher();
485                dispatcher.push(P2pNetworkSelectAction::Init {
486                    addr,
487                    kind: SelectKind::Multiplexing(peer_id),
488                    incoming,
489                });
490                Ok(())
491            }
492        }
493    }
494}