p2p/network/pnet/
p2p_network_pnet_reducer.rs

1use openmina_core::{bug_condition, Substate};
2use salsa_simple::XSalsa20;
3
4use crate::{
5    disconnection::P2pDisconnectionReason, Data, P2pNetworkPnetEffectfulAction,
6    P2pNetworkSchedulerAction, P2pNetworkSchedulerState, P2pNetworkSelectAction,
7};
8
9use super::{
10    p2p_network_pnet_state::{Half, P2pNetworkPnetState},
11    *,
12};
13
14impl Half {
15    fn reduce(&mut self, shared_secret: &[u8; 32], data: &[u8]) {
16        match self {
17            Half::Buffering { buffer, offset } => {
18                if *offset + data.len() < 24 {
19                    buffer[*offset..(*offset + data.len())].clone_from_slice(data);
20                    *offset += data.len();
21                } else {
22                    if *offset < 24 {
23                        buffer[*offset..24].clone_from_slice(&data[..(24 - *offset)]);
24                    }
25                    let nonce = *buffer;
26                    let remaining = data[(24 - *offset)..].to_vec().into_boxed_slice();
27                    *self = Half::Done {
28                        cipher: XSalsa20::new(*shared_secret, nonce),
29                        to_send: vec![],
30                    };
31                    self.reduce(shared_secret, &remaining);
32                }
33            }
34            Half::Done { cipher, to_send } => {
35                *to_send = data.to_vec();
36                cipher.apply_keystream(to_send.as_mut());
37            }
38        }
39    }
40}
41
42impl P2pNetworkPnetState {
43    /// Substate is accessed
44    pub fn reducer<Action, State>(
45        mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
46        action: redux::ActionWithMeta<P2pNetworkPnetAction>,
47    ) -> Result<(), String>
48    where
49        State: crate::P2pStateTrait,
50        Action: crate::P2pActionTrait<State>,
51    {
52        let (action, _meta) = action.split();
53        let pnet_state = &mut state_context
54            .get_substate_mut()?
55            .connection_state_mut(action.addr())
56            .ok_or_else(|| format!("Missing connection for action: {:?}", action))?
57            .pnet;
58
59        match action {
60            P2pNetworkPnetAction::IncomingData { data, addr } => {
61                pnet_state.incoming.reduce(&pnet_state.shared_secret, &data);
62
63                let (dispatcher, state) = state_context.into_dispatcher_and_state();
64                let scheduler_state: &P2pNetworkSchedulerState = state.substate()?;
65                let pnet_state = &scheduler_state
66                    .connection_state(&addr)
67                    .ok_or("Missing connection for action: `P2pNetworkPnetAction::IncomingData`")?
68                    .pnet;
69
70                let Half::Done { to_send, .. } = &pnet_state.incoming else {
71                    // this should be only if incoming state didn't receive enough data
72                    return Ok(());
73                };
74
75                if !to_send.is_empty() {
76                    let data = Data::from(to_send.clone());
77                    dispatcher.push(P2pNetworkSelectAction::IncomingDataAuth {
78                        addr,
79                        data,
80                        fin: false,
81                    });
82                }
83
84                Ok(())
85            }
86            P2pNetworkPnetAction::OutgoingData { data, addr } => {
87                pnet_state.outgoing.reduce(&pnet_state.shared_secret, &data);
88
89                let (dispatcher, state) = state_context.into_dispatcher_and_state();
90                let scheduler_state: &P2pNetworkSchedulerState = state.substate()?;
91                let pnet_state = &scheduler_state
92                    .connection_state(&addr)
93                    .ok_or("Missing connection for action: `P2pNetworkPnetAction::OutgoingData`")?
94                    .pnet;
95
96                let Half::Done { to_send, .. } = &pnet_state.outgoing else {
97                    // this should be only if we haven't set enough data to change from `Buffering` to `Done`
98                    return Ok(());
99                };
100
101                if !to_send.is_empty() {
102                    dispatcher.push(P2pNetworkPnetEffectfulAction::OutgoingData {
103                        addr,
104                        data: to_send.clone(),
105                    });
106                }
107                Ok(())
108            }
109            P2pNetworkPnetAction::SetupNonce {
110                nonce,
111                addr,
112                incoming,
113            } => {
114                pnet_state
115                    .outgoing
116                    .reduce(&pnet_state.shared_secret, &nonce);
117
118                let (dispatcher, state) = state_context.into_dispatcher_and_state();
119                let scheduler_state: &P2pNetworkSchedulerState = state.substate()?;
120                let pnet_state = &scheduler_state
121                    .connection_state(&addr)
122                    .ok_or("Missing connection for action: `P2pNetworkPnetAction::SetupNonce`")?
123                    .pnet;
124
125                if !matches!(&pnet_state.outgoing, Half::Done { .. }) {
126                    bug_condition!(
127                        "Invalid state for `P2pNetworkPnetAction::SetupNonce`, state: {:?}",
128                        pnet_state.outgoing
129                    );
130                    return Ok(());
131                };
132
133                dispatcher.push(P2pNetworkPnetEffectfulAction::SetupNonce {
134                    addr,
135                    nonce,
136                    incoming,
137                });
138                Ok(())
139            }
140            P2pNetworkPnetAction::Timeout { addr } => {
141                let dispatcher = state_context.into_dispatcher();
142                dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
143                    addr,
144                    reason: P2pDisconnectionReason::Timeout,
145                });
146                Ok(())
147            }
148        }
149    }
150}