use crate::common::binary_conversions::{binary_to_ip, ip_to_binary}; use crate::lazy_static; use crate::log::{info, warn}; use crate::records::memory::enums::{ClientType, ConnectionType}; use crate::records::memory::response_channels::{delete_entry, reserve_entry, Command}; use crate::records::memory::structs::{Connection, StoreConnectionParams}; use crate::rpc::client::handshake_processing::{bootstrap_peer_discovery, BootstrapParams}; use crate::rpc::command_maps::RPC_BLOCK_HEIGHT; use crate::rpc::responses::RpcResponse; use crate::sled::Db; use crate::sleep; use crate::thread_rng; use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; use crate::AsyncWriteExt; use crate::AtomicBool; use crate::AtomicOrdering; use crate::Duration; use crate::IteratorRandom; use crate::Mutex; use crate::RwLock; use crate::TcpStream; fn split_ip_port_key(value: &str) -> Option<(String, u16)> { // Connection keys are stored as ip:port strings; IPv6 addresses may arrive // bracketed, so strip brackets before parsing the port. let (ip_part, port_part) = value.rsplit_once(':')?; let ip = ip_part .strip_prefix('[') .and_then(|inner| inner.strip_suffix(']')) .unwrap_or(ip_part) .to_string(); let port = port_part.parse::().ok()?; Some((ip, port)) } use crate::records::memory::structs::{ConnectionInfo, ConnectionKey}; #[derive(Clone)] struct ReconnectContext { db: Db, wallet_key: String, map: Arc>, } lazy_static! { static ref RECONNECT_CONTEXT: Mutex> = Mutex::new(None); static ref RECONNECT_IN_PROGRESS: AtomicBool = AtomicBool::new(false); } fn try_start_reconnect() -> bool { // Only one reconnect path should run at a time, whether it came from // liveness failure or bootstrap recovery. RECONNECT_IN_PROGRESS .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst) .is_ok() } fn finish_reconnect() { // Release the reconnect gate after the async reconnect attempt finishes. RECONNECT_IN_PROGRESS.store(false, AtomicOrdering::SeqCst); } pub async fn set_reconnect_context(db: Db, wallet_key: String, map: Arc>) { let mut context = RECONNECT_CONTEXT.lock().await; // Store enough state for later liveness checks to reconnect without // needing the original startup stack. *context = Some(ReconnectContext { db, wallet_key, map, }); } async fn reconnect_dropped_outgoing(excluded_ip: &str) { if !try_start_reconnect() { warn!("[reconnect] replacement attempt already in progress, skipping duplicate request"); return; } async { // When an outgoing peer disappears, try to replace it with another // active node that is not already connected and is not the failed IP. let context = { let guard = RECONNECT_CONTEXT.lock().await; guard.clone() }; let Some(context) = context else { warn!("[reconnect] no reconnect context configured"); return; }; let excluded_ip_bytes = ip_to_binary(excluded_ip); let live_connection = { let guard = CONNECTIONS.read().await; guard.as_ref().and_then(|conn| { conn.connection_map.iter().find_map(|(key, info)| { if key.ip == excluded_ip_bytes { return None; } if ClientType::from_bytes(&info.client_type) != Some(ClientType::Miner) { return None; } let ip = binary_to_ip(key.ip.clone()); let connections_key = format!("{}:{}", ip, key.port); Some((connections_key, Arc::clone(&info.stream))) }) }) }; let Some((connections_key, stream)) = live_connection else { warn!("[reconnect] no live stream available for bootstrap recovery"); return; }; let bootstrap_params = BootstrapParams { stream, connections_key, wallet_key: context.wallet_key, db: context.db, map: context.map, first: false, }; if let Err(err) = bootstrap_peer_discovery(bootstrap_params).await { warn!("[reconnect] bootstrap recovery failed: {err}"); } } .await; finish_reconnect(); } pub fn spawn_reconnect_bootstrap(params: BootstrapParams) { if !try_start_reconnect() { warn!("[reconnect] bootstrap recovery already in progress, skipping duplicate request"); return; } // Bootstrap discovery can perform network requests, so it runs detached // from the caller that noticed the connection problem. tokio::spawn(async move { if let Err(err) = bootstrap_peer_discovery(params).await { warn!("[reconnect] bootstrap recovery failed: {err}"); } finish_reconnect(); }); } impl Connection { // Initialize the in-memory connection manager state. pub fn new() -> Self { Self::default() } // Store a live socket in memory along with its role, peer identity, // and session metadata used by the RPC and peer-management paths. pub fn store_connection(&mut self, params: StoreConnectionParams) -> bool { let StoreConnectionParams { connection_type, ip, port, stream, client_type, wallet_address, command_map, } = params; let ip_bytes = ip_to_binary(&ip); let connection_key = ConnectionKey { connection_type: connection_type.as_bytes(), ip: ip_bytes.clone(), port, }; let connection_key2 = ConnectionKey { connection_type: connection_type.opposite().as_bytes(), ip: ip_bytes.clone(), port, }; // Miner nodes are identified by IP, not by port. A second node // announcing the same IP is rejected even if it uses another // socket port. if client_type == ClientType::Miner && self.connection_map.iter().any(|(key, info)| { key.ip == ip_bytes && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) }) { return false; } // Non-miner RPC clients still use the full socket key so short // request/response connections do not collide unnecessarily. if self.connection_map.contains_key(&connection_key) || self.connection_map.contains_key(&connection_key2) { return false; } let address = Wallet::long_address_to_bytes(wallet_address); if address.len() != Wallet::ADDRESS_BYTES_LENGTH { return false; } let connection_info = ConnectionInfo::new( connection_type.as_bytes(), ip_bytes, port, stream.clone(), client_type.as_bytes(), address, ); self.connection_map.insert(connection_key, connection_info); if client_type == ClientType::Miner { Connection::client_checkup(stream, connection_type, ip, port, command_map); } true } // Remove a specific connection entry by direction, IP, and port. pub fn drop_connection( &mut self, connection_type: ConnectionType, ip: String, port: u16, ) -> Option { let ip_bytes = ip_to_binary(&ip); let connection_key = ConnectionKey { connection_type: connection_type.as_bytes(), ip: ip_bytes, port, }; let removed = self.connection_map.remove(&connection_key); if let Some(connection_info) = removed.as_ref() { let stream = Arc::clone(&connection_info.stream); tokio::spawn(async move { let mut stream_guard = stream.lock().await; let _ = stream_guard.shutdown().await; }); } if let Some(connection_info) = removed.as_ref() { let client_role = ClientType::from_bytes(&connection_info.client_type) .map(|client_type| client_type.as_str()) .unwrap_or("unknown"); info!( "[connection_manager] connection dropped: role={} direction={} peer={}:{}", client_role, connection_type.as_str(), ip, port ); } removed } pub fn client_checkup( stream: Arc>, connection_type: ConnectionType, ip: String, port: u16, command_map: Arc>, ) { tokio::spawn(async move { loop { sleep(Duration::from_secs(30)).await; let still_registered = { let guard = CONNECTIONS.read().await; guard .as_ref() .map(|conn| { let connection_key = ConnectionKey { connection_type: connection_type.as_bytes(), ip: ip_to_binary(&ip), port, }; conn.connection_map.contains_key(&connection_key) }) .unwrap_or(false) }; if !still_registered { break; } let message_type = RPC_BLOCK_HEIGHT; // Block-height request used as a lightweight checkup ping. let (checkup_key, _checkup_tx, checkup_rx_mutex) = reserve_entry(command_map.clone()).await; // Send a lightweight ping message and wait for the reply // routed back through the shared response hashmap. let mut message: Vec = Vec::with_capacity(4); message.push(message_type); message.extend_from_slice(&checkup_key); RpcResponse::send_raw(&stream, None, &message).await; let response_result = { let mut checkup_rx = checkup_rx_mutex.lock().await; timeout(Duration::from_secs(30), checkup_rx.recv()).await }; match response_result { Ok(Some(_reply)) => { info!( "[connection_manager] liveness check ok: type={} peer={}:{}", connection_type.as_str(), ip, port ); } _ => { let still_registered = { let guard = CONNECTIONS.read().await; guard .as_ref() .map(|conn| { let connection_key = ConnectionKey { connection_type: connection_type.as_bytes(), ip: ip_to_binary(&ip), port, }; conn.connection_map.contains_key(&connection_key) }) .unwrap_or(false) }; if !still_registered { delete_entry(command_map.clone(), checkup_key).await; break; } // Timed-out or missing replies drop the connection, // and outgoing peers trigger replacement discovery. warn!( "[connection_manager] liveness check failed: type={} peer={}:{}", connection_type.as_str(), ip, port ); delete_entry(command_map.clone(), checkup_key).await; let mut guard = CONNECTIONS.write().await; if let Some(conn) = guard.as_mut() { conn.drop_connection(connection_type, ip.clone(), port); } drop(guard); if connection_type == ConnectionType::Outgoing { reconnect_dropped_outgoing(&ip).await; } break; } } } }); } // Count active incoming peer connections. pub fn count_incoming_connections(&self) -> usize { self.connection_map .values() .filter(|info| { ConnectionType::from_bytes(&info.connection_type) == Some(ConnectionType::Incoming) }) .count() } // Count active outgoing peer connections. pub fn count_outgoing_connections(&self) -> usize { self.connection_map .values() .filter(|info| { ConnectionType::from_bytes(&info.connection_type) == Some(ConnectionType::Outgoing) }) .count() } // Return all live peer streams so broadcast-style paths can fan out // messages without caring whether a peer was incoming or outgoing. pub fn get_all_streams(&self) -> Vec>> { self.connection_map .values() .filter(|connection_info| { ClientType::from_bytes(&connection_info.client_type) == Some(ClientType::Miner) }) .map(|connection_info| Arc::clone(&connection_info.stream)) .collect() } // Return all non-client peer streams so network-wide broadcasts can // reach every reachable chain peer. pub fn get_all_peer_streams(&self) -> Vec>> { self.connection_map .values() .filter(|connection_info| { ClientType::from_bytes(&connection_info.client_type) == Some(ClientType::Miner) }) .map(|connection_info| Arc::clone(&connection_info.stream)) .collect() } // Resolve a stored outgoing node connection back to its live stream. pub fn get_stream_for_outgoing(&self, ip: &str, port: u16) -> Option>> { let ip_bytes = ip_to_binary(ip); let connection_key = ConnectionKey { connection_type: ConnectionType::Outgoing.as_bytes(), ip: ip_bytes, port, }; self.connection_map .get(&connection_key) .filter(|info| ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner)) .map(|info| Arc::clone(&info.stream)) } // Look up a live miner stream by the exact ip:port connection key. // Network-map records only store bare IPs, so they must not be used // to select an arbitrary live socket. pub async fn get_stream_from_memory(key: &str) -> Option>> { let (ip, port) = split_ip_port_key(key)?; let lock = CONNECTIONS.read().await; let conn = lock.as_ref()?; let ip_bytes = ip_to_binary(&ip); conn.connection_map .iter() .find_map(|(connection_key, info)| { if connection_key.ip == ip_bytes && connection_key.port == port && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) { Some(Arc::clone(&info.stream)) } else { None } }) } // Build the serialized connection key for a live stream when only // the stream handle is known. pub fn connection_key_for_stream(&self, stream: &Arc>) -> Option { self.connection_map .iter() .find_map(|(connection_key, connection_info)| { if Arc::ptr_eq(&connection_info.stream, stream) { let ip = binary_to_ip(connection_key.ip.clone()); Some(format!("{}:{}", ip, connection_key.port)) } else { None } }) } // Find the first stored connection record for the requested IP. pub fn find_connection_info(&self, ip: &str) -> Option<(ConnectionType, u16)> { let ip_bytes = ip_to_binary(ip); for (key, _info) in self.connection_map.iter() { if key.ip == ip_bytes { let connection_type = ConnectionType::from_bytes(&key.connection_type)?; return Some((connection_type, key.port)); } } None } // Find a stored connection by IP, constrained to a specific client role. pub fn find_connection_info_by_client_type( &self, ip: &str, client_type: ClientType, ) -> Option<(ConnectionType, u16)> { let ip_bytes = ip_to_binary(ip); let client_type_bytes = client_type.as_bytes(); for (key, info) in self.connection_map.iter() { if key.ip == ip_bytes && info.client_type == client_type_bytes { let connection_type = ConnectionType::from_bytes(&key.connection_type)?; return Some((connection_type, key.port)); } } None } // Find the stored outgoing port for a peer IP so reconnect and // cleanup logic can target the correct connection entry. pub fn find_outgoing_port(&self, ip: &str) -> Option { let ip_bytes = ip_to_binary(ip); self.connection_map .iter() .find(|(key, _)| { key.connection_type == ConnectionType::Outgoing.as_bytes() && key.ip == ip_bytes }) .map(|(key, _)| key.port) } // Prefer a random incoming node connection, falling back to an // outgoing node connection when no incoming peer is available. pub fn get_random_connection(&self, excluded_key: Option<&str>) -> Option<(Vec, u16)> { let mut rng = thread_rng(); let excluded = excluded_key.and_then(split_ip_port_key); if let Some((key, _info)) = self .connection_map .iter() .filter(|(key, info)| { ConnectionType::from_bytes(&key.connection_type) == Some(ConnectionType::Incoming) && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) && excluded .as_ref() .map(|(ip, _)| key.ip != ip_to_binary(ip)) .unwrap_or(true) }) .choose(&mut rng) { return Some((key.ip.clone(), key.port)); } if let Some((key, _info)) = self .connection_map .iter() .filter(|(key, info)| { ConnectionType::from_bytes(&key.connection_type) == Some(ConnectionType::Outgoing) && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) && excluded .as_ref() .map(|(ip, _)| key.ip != ip_to_binary(ip)) .unwrap_or(true) }) .choose(&mut rng) { return Some((key.ip.clone(), key.port)); } None } } lazy_static! { pub static ref CONNECTIONS: Arc>> = Arc::new(RwLock::new(None)); } pub async fn initialize_connection() { // Lazily create the singleton connection manager the first time the // node starts accepting or opening peer connections. let mut connection_instance = CONNECTIONS.write().await; if connection_instance.is_none() { *connection_instance = Some(Connection::new()); } } pub async fn outgoing_connection_count() -> usize { // Read the singleton connection manager and count live outgoing peers. CONNECTIONS .read() .await .as_ref() .map(|connection| connection.count_outgoing_connections()) .unwrap_or(0) } pub async fn get_client_type_from_memory(key: &str) -> Option { // Recover the stored client role from the serialized connection key // used throughout the RPC layer. let (ip, port) = split_ip_port_key(key)?; let ip_bytes = ip_to_binary(&ip); let guard = CONNECTIONS.read().await; let conn = guard.as_ref()?; for (connection_key, info) in conn.connection_map.iter() { if connection_key.ip == ip_bytes && connection_key.port == port { return ClientType::from_bytes(&info.client_type); } } None }