p2p/channels/snark/
p2p_channels_snark_actions.rs

1use openmina_core::{snark::Snark, ActionEvent};
2use serde::{Deserialize, Serialize};
3
4use crate::{channels::P2pChannelsAction, P2pState, PeerId};
5
6use super::{P2pChannelsSnarkState, SnarkInfo, SnarkPropagationState};
7
8pub type P2pChannelsSnarkActionWithMetaRef<'a> = redux::ActionWithMeta<&'a P2pChannelsSnarkAction>;
9
10#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
11#[action_event(fields(display(peer_id)))]
12pub enum P2pChannelsSnarkAction {
13    Init {
14        peer_id: PeerId,
15    },
16    Pending {
17        peer_id: PeerId,
18    },
19    Ready {
20        peer_id: PeerId,
21    },
22    #[action_event(level = debug, fields(display(peer_id), limit))]
23    RequestSend {
24        peer_id: PeerId,
25        limit: u8,
26    },
27    #[action_event(level = debug, fields(display(peer_id), promised_count))]
28    PromiseReceived {
29        peer_id: PeerId,
30        promised_count: u8,
31    },
32    Received {
33        peer_id: PeerId,
34        snark: Box<SnarkInfo>,
35    },
36    #[action_event(level = debug, fields(display(peer_id), limit))]
37    RequestReceived {
38        peer_id: PeerId,
39        limit: u8,
40    },
41    #[action_event(level = debug, fields(display(peer_id), snarks = snarks.len(), first_index, last_index))]
42    ResponseSend {
43        peer_id: PeerId,
44        snarks: Vec<SnarkInfo>,
45        first_index: u64,
46        last_index: u64,
47    },
48    Libp2pReceived {
49        peer_id: PeerId,
50        snark: Box<Snark>,
51        nonce: u32,
52    },
53    /// Checks if a snark has already been received from pubsub network, ff it has, it broadcasts a validated message.
54    /// If not, it constructs a new message with the snark and broadcasts it to the network,
55    /// either directly or by rebroadcasting it if it was received from a WebRTC connection.
56    Libp2pBroadcast {
57        snark: Snark,
58        nonce: u32,
59        is_local: bool,
60    },
61}
62
63impl P2pChannelsSnarkAction {
64    pub fn peer_id(&self) -> Option<&PeerId> {
65        match self {
66            Self::Init { peer_id }
67            | Self::Pending { peer_id }
68            | Self::Ready { peer_id }
69            | Self::RequestSend { peer_id, .. }
70            | Self::PromiseReceived { peer_id, .. }
71            | Self::Received { peer_id, .. }
72            | Self::RequestReceived { peer_id, .. }
73            | Self::ResponseSend { peer_id, .. } => Some(peer_id),
74            Self::Libp2pReceived { peer_id, .. } => Some(peer_id),
75            Self::Libp2pBroadcast { .. } => None,
76        }
77    }
78}
79
80impl redux::EnablingCondition<P2pState> for P2pChannelsSnarkAction {
81    fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool {
82        match self {
83            P2pChannelsSnarkAction::Init { peer_id } => state
84                .get_ready_peer(peer_id)
85                .is_some_and(|p| matches!(&p.channels.snark, P2pChannelsSnarkState::Enabled)),
86            P2pChannelsSnarkAction::Pending { peer_id } => state
87                .get_ready_peer(peer_id)
88                .is_some_and(|p| matches!(&p.channels.snark, P2pChannelsSnarkState::Init { .. })),
89            P2pChannelsSnarkAction::Ready { peer_id } => {
90                state.get_ready_peer(peer_id).is_some_and(|p| {
91                    matches!(&p.channels.snark, P2pChannelsSnarkState::Pending { .. })
92                })
93            }
94            P2pChannelsSnarkAction::RequestSend { peer_id, .. } => {
95                state.get_ready_peer(peer_id).is_some_and(|p| {
96                    matches!(
97                        &p.channels.snark,
98                        P2pChannelsSnarkState::Ready {
99                            local: SnarkPropagationState::WaitingForRequest { .. }
100                                | SnarkPropagationState::Responded { .. },
101                            ..
102                        }
103                    )
104                })
105            }
106            P2pChannelsSnarkAction::PromiseReceived {
107                peer_id,
108                promised_count,
109            } => state.get_ready_peer(peer_id).is_some_and(|p| {
110                matches!(
111                    &p.channels.snark,
112                    P2pChannelsSnarkState::Ready {
113                        local: SnarkPropagationState::Requested {
114                            requested_limit, ..
115                        }, ..
116                    } if *promised_count > 0 && promised_count <= requested_limit
117                )
118            }),
119            P2pChannelsSnarkAction::Received { peer_id, .. } => {
120                state.get_ready_peer(peer_id).is_some_and(|p| {
121                    matches!(
122                        &p.channels.snark,
123                        P2pChannelsSnarkState::Ready {
124                            local: SnarkPropagationState::Responding { .. },
125                            ..
126                        }
127                    )
128                })
129            }
130            P2pChannelsSnarkAction::RequestReceived { peer_id, limit } => {
131                *limit > 0
132                    && state.get_ready_peer(peer_id).is_some_and(|p| {
133                        matches!(
134                            &p.channels.snark,
135                            P2pChannelsSnarkState::Ready {
136                                remote: SnarkPropagationState::WaitingForRequest { .. }
137                                    | SnarkPropagationState::Responded { .. },
138                                ..
139                            }
140                        )
141                    })
142            }
143            P2pChannelsSnarkAction::ResponseSend {
144                peer_id,
145                snarks,
146                first_index,
147                last_index,
148            } => {
149                !snarks.is_empty()
150                    && first_index <= last_index
151                    && state
152                        .get_ready_peer(peer_id)
153                        .is_some_and(|p| match &p.channels.snark {
154                            P2pChannelsSnarkState::Ready {
155                                remote,
156                                next_send_index,
157                                ..
158                            } => {
159                                if first_index < next_send_index {
160                                    return false;
161                                }
162                                match remote {
163                                    SnarkPropagationState::Requested {
164                                        requested_limit, ..
165                                    } => snarks.len() <= *requested_limit as usize,
166                                    _ => false,
167                                }
168                            }
169                            _ => false,
170                        })
171            }
172            P2pChannelsSnarkAction::Libp2pReceived { peer_id, .. } => {
173                cfg!(feature = "p2p-libp2p")
174                    && state
175                        .peers
176                        .get(peer_id)
177                        .filter(|p| p.is_libp2p())
178                        .and_then(|p| p.status.as_ready())
179                        .is_some_and(|p| p.channels.snark.is_ready())
180            }
181            P2pChannelsSnarkAction::Libp2pBroadcast { .. } => {
182                cfg!(feature = "p2p-libp2p")
183                    && state
184                        .peers
185                        .iter()
186                        .any(|(_, p)| p.is_libp2p() && p.status.as_ready().is_some())
187            }
188        }
189    }
190}
191
192impl From<P2pChannelsSnarkAction> for crate::P2pAction {
193    fn from(action: P2pChannelsSnarkAction) -> Self {
194        Self::Channels(P2pChannelsAction::Snark(action))
195    }
196}