1use chacha20poly1305::{aead::generic_array::GenericArray, AeadInPlace, ChaCha20Poly1305, KeyInit};
2use crypto_bigint::consts::U12;
3use openmina_core::{bug_condition, fuzzed_maybe, Substate};
4
5use crate::{
6 connection::incoming::{P2pConnectionIncomingAction, P2pConnectionIncomingState},
7 Data, P2pNetworkConnectionError, P2pNetworkPnetAction, P2pNetworkSchedulerAction,
8 P2pNetworkSchedulerState, P2pNetworkSelectAction, P2pState, PeerId, SelectKind,
9};
10
11use self::p2p_network_noise_state::ResponderConsumeOutput;
12
13use super::*;
14
15use super::p2p_network_noise_state::{
16 InitiatorOutput, NoiseError, NoiseState, P2pNetworkNoiseState, P2pNetworkNoiseStateInitiator,
17 P2pNetworkNoiseStateInner, P2pNetworkNoiseStateResponder, ResponderOutput,
18};
19
/// Maximum number of plaintext bytes placed into a single encrypted frame by the
/// `OutgoingData` handler (it splits the payload with `data.chunks(MAX_CHUNK_SIZE)`).
///
/// Each frame's 2-byte length field holds `plaintext_len + 16`, so this bound keeps that
/// value within `u16`.
// NOTE(review): 16 of the 19 reserved bytes match the tag length used below; the reason for
// the remaining 3 is not visible in this file — confirm before changing.
const MAX_CHUNK_SIZE: usize = u16::MAX as usize - 19;
21
22impl P2pNetworkNoiseState {
23 pub fn reducer<State, Action>(
24 mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
25 action: redux::ActionWithMeta<P2pNetworkNoiseAction>,
26 ) -> Result<(), String>
27 where
28 State: crate::P2pStateTrait,
29 Action: crate::P2pActionTrait<State>,
30 {
31 let (action, _meta) = action.split();
32 let noise_state = state_context
33 .get_substate_mut()?
34 .connection_state_mut(action.addr())
35 .and_then(|c| c.noise_state_mut())
36 .ok_or_else(|| format!("Invalid noise state {}", action.addr()))?;
37
38 match action {
39 P2pNetworkNoiseAction::Init {
40 incoming,
41 ephemeral_sk: esk,
42 static_sk: ssk,
43 signature: payload,
44 addr,
45 } => {
46 let epk = esk.pk();
47 let spk = ssk.pk();
48
49 noise_state.inner = if incoming {
50 let mut noise = NoiseState::new(*b"Noise_XX_25519_ChaChaPoly_SHA256");
52 noise.mix_hash(b"");
53
54 Some(P2pNetworkNoiseStateInner::Responder(
55 P2pNetworkNoiseStateResponder::Init {
56 r_esk: esk,
57 r_spk: spk,
58 r_ssk: ssk,
59 buffer: vec![],
60 payload,
61 noise,
62 },
63 ))
64 } else {
65 let mut chunk = vec![0, 32];
66 chunk.extend_from_slice(epk.0.as_bytes());
67 noise_state.outgoing_chunks.push_back(vec![chunk.into()]);
68
69 let mut noise = NoiseState::new(*b"Noise_XX_25519_ChaChaPoly_SHA256");
70 noise.mix_hash(b"");
71 noise.mix_hash(epk.0.as_bytes());
72 noise.mix_hash(b"");
73
74 Some(P2pNetworkNoiseStateInner::Initiator(
75 P2pNetworkNoiseStateInitiator {
76 i_esk: esk,
77 i_spk: spk,
78 i_ssk: ssk,
79 r_epk: None,
80 payload,
81 noise,
82 remote_pk: None,
83 },
84 ))
85 };
86
87 let mut outgoing = noise_state.outgoing_chunks.clone();
88 let dispatcher = state_context.into_dispatcher();
89 while let Some(data) = outgoing.pop_front() {
90 dispatcher.push(P2pNetworkNoiseAction::OutgoingChunk { addr, data });
91 }
92
93 Ok(())
94 }
95 P2pNetworkNoiseAction::IncomingData { data, addr } => {
96 noise_state.buffer.extend_from_slice(&data);
97 let mut offset = 0;
98 loop {
99 let buf = &noise_state.buffer[offset..];
100 let len = buf
103 .get(..2)
104 .and_then(|buf| Some(u16::from_be_bytes(buf.try_into().ok()?)));
105
106 if let Some(len) = len {
107 let full_len = 2 + len as usize;
108 if buf.len() >= full_len {
109 noise_state
110 .incoming_chunks
111 .push_back(buf[..full_len].to_vec());
112 offset += full_len;
113 continue;
114 }
115 }
116 break;
117 }
118 noise_state.buffer = noise_state.buffer[offset..].to_vec();
119
120 let incoming_chunks = noise_state.incoming_chunks.len();
121 let dispatcher = state_context.into_dispatcher();
122 for _ in 0..incoming_chunks {
123 dispatcher.push(P2pNetworkNoiseAction::IncomingChunk { addr });
124 }
125 Ok(())
126 }
127 ref action @ P2pNetworkNoiseAction::IncomingChunk { addr } => {
128 let Some(state) = &mut noise_state.inner else {
129 bug_condition!("action {:?}: no inner state", action);
130 return Ok(());
131 };
132 if let Some(mut chunk) = noise_state.incoming_chunks.pop_front() {
133 match state {
134 P2pNetworkNoiseStateInner::Initiator(i) => match i.consume(&mut chunk) {
135 Ok(_) => {}
136 Err(err) => *state = P2pNetworkNoiseStateInner::Error(dbg!(err)),
137 },
138 P2pNetworkNoiseStateInner::Responder(o) => match o.consume(&mut chunk) {
139 Ok(None) => {}
140 Ok(Some(ResponderConsumeOutput {
141 output:
142 ResponderOutput {
143 send_key,
144 recv_key,
145 remote_pk,
146 ..
147 },
148 payload: _,
149 })) => {
150 let remote_peer_id = remote_pk.peer_id();
151
152 if noise_state.expected_peer_id.is_some_and(|expected_per_id| {
153 expected_per_id != remote_peer_id
154 }) {
155 let lhs = noise_state
156 .expected_peer_id
157 .map_or("none".to_string(), PeerId::to_libp2p_string);
158 let rhs = remote_peer_id.to_libp2p_string();
159 *state = P2pNetworkNoiseStateInner::Error(
160 NoiseError::RemotePeerIdMismatch(format!("{lhs} != {rhs}")),
161 );
162 } else {
163 *state = P2pNetworkNoiseStateInner::Done {
164 incoming: true,
165 send_key,
166 recv_key,
167 recv_nonce: 0,
168 send_nonce: 0,
169 remote_pk,
170 remote_peer_id,
171 };
172 }
173 }
174 Err(err) => {
175 *state = P2pNetworkNoiseStateInner::Error(dbg!(err));
176 }
177 },
178 P2pNetworkNoiseStateInner::Done {
179 recv_key,
180 recv_nonce,
181 ..
182 } => {
183 let aead = ChaCha20Poly1305::new(&recv_key.0.into());
184 let mut chunk = chunk;
185 let mut nonce = GenericArray::default();
186 nonce[4..].clone_from_slice(&recv_nonce.to_le_bytes());
187 *recv_nonce += 1;
188 if chunk.len() < 18 {
189 *state = P2pNetworkNoiseStateInner::Error(NoiseError::ChunkTooShort)
190 } else {
191 let data = &mut chunk[2..];
192 let (data, tag) = data.split_at_mut(data.len() - 16);
193 let tag = GenericArray::from_slice(&*tag);
194 if aead
195 .decrypt_in_place_detached(&nonce, &[], data, tag)
196 .is_err()
197 {
198 *state = P2pNetworkNoiseStateInner::Error(dbg!(
199 NoiseError::FirstMacMismatch
200 ));
201 } else {
202 noise_state.decrypted_chunks.push_back(data.to_vec().into());
203 }
204 }
205 }
206 P2pNetworkNoiseStateInner::Error(_) => {}
207 }
208 }
209
210 let remote_peer_id = noise_state.remote_peer_id();
211 let handshake_done = noise_state.handshake_done(action);
212 let mut outgoing = noise_state.outgoing_chunks.clone();
213 let decrypted = noise_state.decrypted_chunks.front().cloned();
214 let middle_initiator = matches!(
215 &noise_state.inner,
216 Some(P2pNetworkNoiseStateInner::Initiator(..))
217 ) && remote_peer_id.is_some();
218
219 let middle_responder = matches!(
220 &noise_state.inner,
221 Some(P2pNetworkNoiseStateInner::Responder(
222 P2pNetworkNoiseStateResponder::Init { .. },
223 ))
224 );
225 let error = noise_state.as_error();
226
227 let (dispatcher, state) = state_context.into_dispatcher_and_state();
228 if let Some(error) = error {
229 dispatcher.push(P2pNetworkSchedulerAction::Error {
230 addr,
231 error: P2pNetworkConnectionError::Noise(error),
232 });
233 } else {
234 if let Some((peer_id, true)) = handshake_done {
235 dispatcher.push(P2pConnectionIncomingAction::FinalizePendingLibp2p {
236 peer_id,
237 addr: addr.sock_addr,
238 });
239
240 let p2p_state: &P2pState = state.substate()?;
241 let this_connection_is_kept = p2p_state.peers
242 .get(&peer_id)
243 .and_then(|peer_state| peer_state.status.as_connecting())
244 .and_then(|connecting| connecting.as_incoming())
245 .is_some_and( |incoming| matches!(incoming, P2pConnectionIncomingState::FinalizePendingLibp2p { addr: a, .. } if a == &addr.sock_addr));
246
247 if !this_connection_is_kept {
248 return Ok(());
249 }
250 }
251
252 while let Some(data) = outgoing.pop_front() {
253 dispatcher.push(P2pNetworkNoiseAction::OutgoingChunk { addr, data });
254 }
255
256 if let Some(data) = decrypted {
257 dispatcher.push(P2pNetworkNoiseAction::DecryptedData {
258 addr,
259 peer_id: remote_peer_id,
260 data,
261 });
262 }
263
264 if middle_initiator || middle_responder {
265 dispatcher.push(P2pNetworkNoiseAction::OutgoingData {
266 addr,
267 data: Data::empty(),
268 });
269 }
270 }
271
272 Ok(())
273 }
274 ref action @ P2pNetworkNoiseAction::OutgoingChunk { addr, ref data }
275 | ref action @ P2pNetworkNoiseAction::OutgoingChunkSelectMux { addr, ref data } => {
276 noise_state.outgoing_chunks.pop_front();
277
278 let handshake_done = noise_state.handshake_done(action);
279 let dispatcher = state_context.into_dispatcher();
280 let data = fuzzed_maybe!(
281 data.iter()
282 .fold(vec![], |mut v, item| {
283 v.extend_from_slice(item);
284 v
285 })
286 .into(),
287 crate::fuzzer::mutate_noise
288 );
289 dispatcher.push(P2pNetworkPnetAction::OutgoingData { addr, data });
290 if let Some((peer_id, incoming)) = handshake_done {
291 dispatcher.push(P2pNetworkNoiseAction::HandshakeDone {
292 addr,
293 peer_id,
294 incoming,
295 });
296 }
297
298 Ok(())
299 }
300 P2pNetworkNoiseAction::OutgoingData { data, addr } => {
301 let Some(state) = &mut noise_state.inner else {
302 bug_condition!("action `P2pNetworkNoiseAction::OutgoingData`: no inner state");
303 return Ok(());
304 };
305
306 match state {
307 P2pNetworkNoiseStateInner::Done {
308 send_key,
309 send_nonce,
310 ..
311 } => {
312 let aead = ChaCha20Poly1305::new(&send_key.0.into());
313 let mut chunks = vec![];
314
315 for data_chunk in data.chunks(MAX_CHUNK_SIZE) {
316 let mut chunk = Vec::with_capacity(18 + data_chunk.len());
317 chunk
318 .extend_from_slice(&((data_chunk.len() + 16) as u16).to_be_bytes());
319 chunk.extend_from_slice(data_chunk);
320
321 let mut nonce: GenericArray<u8, U12> = GenericArray::default();
322 nonce[4..].clone_from_slice(&send_nonce.to_le_bytes());
323 *send_nonce += 1;
324
325 let tag = aead.encrypt_in_place_detached(
326 &nonce,
327 &[],
328 &mut chunk[2..(2 + data_chunk.len())],
329 );
330
331 let tag = match tag {
332 Ok(tag) => tag,
333 Err(_) => {
334 *state =
335 P2pNetworkNoiseStateInner::Error(NoiseError::Encryption);
336 return Ok(());
337 }
338 };
339
340 chunk.extend_from_slice(&tag);
341 chunks.push(chunk.into());
342 }
343 noise_state.outgoing_chunks.push_back(chunks);
344 }
345 P2pNetworkNoiseStateInner::Initiator(i) => {
346 match (i.generate(&data), i.remote_pk.clone()) {
347 (
348 Ok(Some(InitiatorOutput {
349 send_key,
350 recv_key,
351 chunk,
352 })),
353 Some(remote_pk),
354 ) => {
355 noise_state.outgoing_chunks.push_back(vec![chunk.into()]);
356 let remote_peer_id = remote_pk.peer_id();
357
358 if noise_state.expected_peer_id.is_some_and(|expected_per_id| {
359 expected_per_id != remote_peer_id
360 }) {
361 let lhs = noise_state
362 .expected_peer_id
363 .map_or("none".to_string(), PeerId::to_libp2p_string);
364 let rhs = remote_peer_id.to_libp2p_string();
365 *state = P2pNetworkNoiseStateInner::Error(
366 NoiseError::RemotePeerIdMismatch(format!("{lhs} != {rhs}")),
367 );
368 } else {
369 *state = P2pNetworkNoiseStateInner::Done {
370 incoming: false,
371 send_key,
372 recv_key,
373 recv_nonce: 0,
374 send_nonce: 0,
375 remote_pk,
376 remote_peer_id,
377 };
378 }
379 }
380 (Err(error), Some(_)) => {
381 *state = P2pNetworkNoiseStateInner::Error(error);
382 return Ok(());
383 }
384 _ => {}
385 }
386 }
387 P2pNetworkNoiseStateInner::Responder(r) => {
388 if let Some(chunk) = r.generate(&data) {
389 noise_state.outgoing_chunks.push_back(vec![chunk.into()]);
390 }
391 }
392 _ => {}
394 }
395
396 let mut outgoing = noise_state.outgoing_chunks.clone();
397 let error = noise_state.as_error();
398 let dispatcher = state_context.into_dispatcher();
399
400 if let Some(error) = error {
401 dispatcher.push(P2pNetworkSchedulerAction::Error {
402 addr,
403 error: P2pNetworkConnectionError::Noise(error),
404 });
405 } else {
406 while let Some(data) = outgoing.pop_front() {
407 dispatcher.push(P2pNetworkNoiseAction::OutgoingChunk { addr, data });
408 }
409 }
410 Ok(())
411 }
412 P2pNetworkNoiseAction::OutgoingDataSelectMux { data, addr } => {
413 let Some(P2pNetworkNoiseStateInner::Done {
414 send_key,
415 send_nonce,
416 ..
417 }) = &mut noise_state.inner
418 else {
419 bug_condition!(
420 "action `P2pNetworkNoiseAction::OutgoingDataSelectMux`: no inner state"
421 );
422 return Ok(());
423 };
424
425 let aead = ChaCha20Poly1305::new(&send_key.0.into());
426
427 let mut chunk = Vec::with_capacity(18 + data.len());
428 chunk.extend_from_slice(&((data.len() + 16) as u16).to_be_bytes());
429 chunk.extend_from_slice(&data);
430
431 let mut nonce = GenericArray::default();
432 nonce[4..].clone_from_slice(&send_nonce.to_le_bytes());
433 *send_nonce += 1;
434
435 match aead.encrypt_in_place_detached(&nonce, &[], &mut chunk[2..(2 + data.len())]) {
436 Ok(tag) => {
437 chunk.extend_from_slice(&tag);
438 noise_state.outgoing_chunks.push_back(vec![chunk.into()]);
439 }
440 Err(_) => {
441 noise_state.inner =
442 Some(P2pNetworkNoiseStateInner::Error(NoiseError::Encryption));
443 }
444 };
445
446 let outgoing = noise_state.outgoing_chunks.front().cloned();
447 let error = noise_state.as_error();
448 let dispatcher = state_context.into_dispatcher();
449
450 if let Some(error) = error {
451 dispatcher.push(P2pNetworkSchedulerAction::Error {
452 addr,
453 error: P2pNetworkConnectionError::Noise(error),
454 });
455 } else if let Some(data) = outgoing {
456 dispatcher.push(P2pNetworkNoiseAction::OutgoingChunkSelectMux { addr, data })
457 }
458
459 Ok(())
460 }
461 P2pNetworkNoiseAction::DecryptedData {
462 addr,
463 peer_id,
464 data,
465 } => {
466 noise_state.decrypted_chunks.pop_front();
467
468 let remote_peer_id = noise_state.remote_peer_id();
469 let dispatcher = state_context.into_dispatcher();
470 dispatcher.push(P2pNetworkSelectAction::IncomingDataMux {
471 addr,
472 peer_id: peer_id.or(remote_peer_id),
473 data,
474 fin: false,
475 });
476
477 Ok(())
478 }
479 P2pNetworkNoiseAction::HandshakeDone {
480 addr,
481 peer_id,
482 incoming,
483 } => {
484 let dispatcher = state_context.into_dispatcher();
485 dispatcher.push(P2pNetworkSelectAction::Init {
486 addr,
487 kind: SelectKind::Multiplexing(peer_id),
488 incoming,
489 });
490 Ok(())
491 }
492 }
493 }
494}