use crate::common::binary_conversions::{binary_to_ip, ip_to_binary};
use crate::lazy_static;
use crate::log::{info, warn};
use crate::records::memory::enums::{ClientType, ConnectionType};
use crate::records::memory::network_mapping::monitor::{MONITOR_ACTION_ADD, MONITOR_ACTION_REMOVE};
use crate::records::memory::network_mapping::structs::{MonitorAddressParams, SignedMonitorEdit};
use crate::records::memory::network_mapping::NodeInfo;
use crate::records::memory::response_channels::{delete_entry, reserve_entry, Command};
use crate::records::memory::structs::{Connection, StoreConnectionParams};
use crate::rpc::client::handshake::connect_and_handshake;
use crate::rpc::client::handshake_processing::{bootstrap_peer_discovery, BootstrapParams};
use crate::rpc::client::structs::Connect;
use crate::rpc::command_maps::RPC_BLOCK_HEIGHT;
use crate::rpc::responses::RpcResponse;
use crate::sled::Db;
use crate::sleep;
use crate::thread_rng;
use crate::timeout;
use crate::wallets::structures::Wallet;
use crate::Arc;
use crate::AsyncWriteExt;
use crate::AtomicBool;
use crate::AtomicOrdering;
use crate::Duration;
use crate::IteratorRandom;
use crate::Mutex;
use crate::RwLock;
use crate::TcpStream;

// Split an "ip:port" connection key into its IP string and numeric port.
// Returns `None` when there is no ':' separator or the port is not a valid u16.
fn split_ip_port_key(value: &str) -> Option<(String, u16)> {
    // Connection keys are stored as ip:port strings; IPv6 addresses may arrive
    // bracketed, so strip brackets before parsing the port.
    let (ip_part, port_part) = value.rsplit_once(':')?;
    let ip = ip_part
        .strip_prefix('[')
        .and_then(|inner| inner.strip_suffix(']'))
        .unwrap_or(ip_part)
        .to_string();
    let port = port_part.parse::<u16>().ok()?;
    Some((ip, port))
}

use crate::records::memory::structs::{ConnectionInfo, ConnectionKey};

// State captured at startup so later reconnect and monitor tasks can run
// without the original startup stack.
//
// NOTE(review): the generic arguments of `wallet` and `map` are reconstructed
// from how the fields are used in this file (`Arc<Wallet>` is read via
// `context.wallet.saved`; the wrapper around `NodeInfo` is assumed to be the
// crate's async `Mutex`). Confirm against `BootstrapParams`, `Connect` and
// `MonitorAddressParams` before merging.
#[derive(Clone)]
struct ReconnectContext {
    db: Db,
    wallet: Arc<Wallet>,
    map: Arc<Mutex<NodeInfo>>,
}

lazy_static! {
    static ref RECONNECT_CONTEXT: Mutex<Option<ReconnectContext>> = Mutex::new(None);
    static ref RECONNECT_IN_PROGRESS: AtomicBool = AtomicBool::new(false);
}

fn try_start_reconnect() -> bool {
    // Only one reconnect path should run at a time, whether it came from
    // liveness failure or bootstrap recovery.
    RECONNECT_IN_PROGRESS
        .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst)
        .is_ok()
}

fn finish_reconnect() {
    // Release the reconnect gate after the async reconnect attempt finishes.
    RECONNECT_IN_PROGRESS.store(false, AtomicOrdering::SeqCst);
}

// Ownership token for the reconnect gate. Dropping it releases the gate, so
// the flag is cleared on every exit path of a reconnect task, including an
// early return or a panic inside the awaited work. Without this, a panic
// would leave RECONNECT_IN_PROGRESS set and block every later reconnect.
struct ReconnectGate;

impl ReconnectGate {
    // Try to take the gate; `None` means another reconnect is already running.
    fn acquire() -> Option<Self> {
        try_start_reconnect().then_some(ReconnectGate)
    }
}

impl Drop for ReconnectGate {
    fn drop(&mut self) {
        finish_reconnect();
    }
}

pub async fn set_reconnect_context(db: Db, wallet: Arc<Wallet>, map: Arc<Mutex<NodeInfo>>) {
    let mut context = RECONNECT_CONTEXT.lock().await;
    // Store enough state for later liveness checks to reconnect without
    // needing the original startup stack.
    *context = Some(ReconnectContext { db, wallet, map });
}

async fn reconnect_replacement_inner(excluded_ip: &str) {
    // When an outgoing peer disappears, pick a live miner connection whose IP
    // is not the failed one and run bootstrap peer discovery through it.
    let context = {
        let guard = RECONNECT_CONTEXT.lock().await;
        guard.clone()
    };
    let Some(context) = context else {
        warn!("[reconnect] no reconnect context configured");
        return;
    };

    let excluded_ip_bytes = ip_to_binary(excluded_ip);
    // The read guard is scoped to this block so it is released before the
    // bootstrap request is awaited.
    let live_connection = {
        let guard = CONNECTIONS.read().await;
        guard.as_ref().and_then(|conn| {
            conn.connection_map.iter().find_map(|(key, info)| {
                if key.ip == excluded_ip_bytes {
                    return None;
                }
                if ClientType::from_bytes(&info.client_type) != Some(ClientType::Miner) {
                    return None;
                }
                let ip = binary_to_ip(key.ip.clone());
                let connections_key = format!("{}:{}", ip, key.port);
                Some((connections_key, Arc::clone(&info.stream)))
            })
        })
    };
    let Some((connections_key, stream)) = live_connection else {
        warn!("[reconnect] no live stream available for bootstrap recovery");
        return;
    };

    let bootstrap_params = BootstrapParams {
        stream,
        connections_key,
        wallet: context.wallet,
        db: context.db,
        map: context.map,
        first: false,
    };
    if let Err(err) = bootstrap_peer_discovery(bootstrap_params).await {
        warn!("[reconnect] bootstrap recovery failed: {err}");
    }
}

// Try up to three times (30 seconds apart) to reconnect to a dropped outgoing
// peer; if every attempt fails, fall back to bootstrap discovery through
// another live peer. Skipped entirely when another reconnect is running.
async fn retry_dropped_outgoing(ip: String, port: u16) {
    // `_gate` is held for the rest of the function and released on drop.
    let Some(_gate) = ReconnectGate::acquire() else {
        warn!("[reconnect] reconnect attempt already in progress, skipping duplicate request");
        return;
    };

    let context = {
        let guard = RECONNECT_CONTEXT.lock().await;
        guard.clone()
    };
    let Some(context) = context else {
        warn!("[reconnect] no reconnect context configured");
        return;
    };

    let addr_string = format!("{ip}:{port}");
    for attempt in 1..=3 {
        sleep(Duration::from_secs(30)).await;
        let socket_addr = match addr_string.parse() {
            Ok(addr) => addr,
            Err(err) => {
                warn!("[reconnect] invalid dropped peer address {addr_string}: {err}");
                break;
            }
        };
        let connect = Connect {
            addr: socket_addr,
            node_ip: addr_string.clone(),
            wallet: context.wallet.clone(),
            db: context.db.clone(),
            map: context.map.clone(),
            first: false,
        };
        match connect_and_handshake(connect).await {
            Ok(()) => {
                info!("[reconnect] reconnected dropped peer {addr_string} on attempt {attempt}");
                return;
            }
            Err(err) => {
                warn!(
                    "[reconnect] failed to reconnect dropped peer {addr_string} on attempt {attempt}/3: {err}"
                );
            }
        }
    }

    reconnect_replacement_inner(&ip).await;
}

// Run `retry_dropped_outgoing` on a detached task.
pub fn spawn_retry_dropped_outgoing(ip: String, port: u16) {
    tokio::spawn(async move {
        retry_dropped_outgoing(ip, port).await;
    });
}

// Start bootstrap discovery on a detached task unless a reconnect is already
// in progress.
pub fn spawn_reconnect_bootstrap(params: BootstrapParams) {
    let Some(gate) = ReconnectGate::acquire() else {
        warn!("[reconnect] bootstrap recovery already in progress, skipping duplicate request");
        return;
    };
    // Bootstrap discovery can perform network requests, so it runs detached
    // from the caller that noticed the connection problem.
    tokio::spawn(async move {
        // Owning the gate inside the task releases it when the task ends,
        // whether it finishes normally or unwinds.
        let _gate = gate;
        if let Err(err) = bootstrap_peer_discovery(params).await {
            warn!("[reconnect] bootstrap recovery failed: {err}");
        }
    });
}

impl Connection {
    // Initialize the in-memory connection manager state.
    pub fn new() -> Self {
        Self::default()
    }

    // Store a live socket in memory along with its role, peer identity,
    // and session metadata used by the RPC and peer-management paths.
    // Returns `false` (and stores nothing) when the peer is a duplicate or
    // the wallet public key has the wrong length.
    pub fn store_connection(&mut self, params: StoreConnectionParams) -> bool {
        let StoreConnectionParams {
            connection_type,
            ip,
            port,
            stream,
            client_type,
            wallet_public_key,
            command_map,
        } = params;

        let ip_bytes = ip_to_binary(&ip);
        let connection_key = ConnectionKey {
            connection_type: connection_type.as_bytes(),
            ip: ip_bytes.clone(),
            port,
        };
        // Same socket address, opposite direction.
        let connection_key2 = ConnectionKey {
            connection_type: connection_type.opposite().as_bytes(),
            ip: ip_bytes.clone(),
            port,
        };

        // Miner nodes are identified by IP, not by port. A second node
        // announcing the same IP is rejected even if it uses another
        // socket port.
        if client_type == ClientType::Miner
            && self.connection_map.iter().any(|(key, info)| {
                key.ip == ip_bytes
                    && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner)
            })
        {
            return false;
        }

        // Non-miner RPC clients still use the full socket key so short
        // request/response connections do not collide unnecessarily.
        if self.connection_map.contains_key(&connection_key)
            || self.connection_map.contains_key(&connection_key2)
        {
            return false;
        }

        if wallet_public_key.len() != Wallet::PUBLIC_KEY_LENGTH {
            return false;
        }

        let connection_info = ConnectionInfo::new(
            connection_type.as_bytes(),
            ip_bytes,
            port,
            stream.clone(),
            client_type.as_bytes(),
            wallet_public_key.clone(),
        );
        self.connection_map.insert(connection_key, connection_info);

        // Only miner peers are announced to the monitor map and get a
        // periodic liveness check.
        if client_type == ClientType::Miner {
            spawn_monitor_update(
                ip.clone(),
                MONITOR_ACTION_ADD,
                wallet_public_key.clone(),
                port,
            );
            Connection::client_checkup(stream, connection_type, ip, port, command_map);
        }
        true
    }

    // Remove a specific connection entry by direction, IP, and port.
pub fn drop_connection( &mut self, connection_type: ConnectionType, ip: String, port: u16, ) -> Option { let ip_bytes = ip_to_binary(&ip); let connection_key = ConnectionKey { connection_type: connection_type.as_bytes(), ip: ip_bytes, port, }; let removed = self.connection_map.remove(&connection_key); if let Some(connection_info) = removed.as_ref() { if ClientType::from_bytes(&connection_info.client_type) == Some(ClientType::Miner) { spawn_monitor_update( ip.clone(), MONITOR_ACTION_REMOVE, connection_info.wallet_public_key.clone(), port, ); } let stream = Arc::clone(&connection_info.stream); tokio::spawn(async move { let mut stream_guard = stream.lock().await; let _ = stream_guard.shutdown().await; }); } if let Some(connection_info) = removed.as_ref() { let client_role = ClientType::from_bytes(&connection_info.client_type) .map(|client_type| client_type.as_str()) .unwrap_or("unknown"); info!( "[connection_manager] connection dropped: role={} direction={} peer={}:{}", client_role, connection_type.as_str(), ip, port ); } removed } pub fn client_checkup( stream: Arc>, connection_type: ConnectionType, ip: String, port: u16, command_map: Arc>, ) { tokio::spawn(async move { let connection_key = ConnectionKey { connection_type: connection_type.as_bytes(), ip: ip_to_binary(&ip), port, }; let mut consecutive_failures = 0_u8; loop { sleep(Duration::from_secs(30)).await; let still_monitoring_same_stream = { let guard = CONNECTIONS.read().await; guard .as_ref() .and_then(|conn| conn.connection_map.get(&connection_key)) .map(|connection_info| Arc::ptr_eq(&connection_info.stream, &stream)) .unwrap_or(false) }; if !still_monitoring_same_stream { break; } let message_type = RPC_BLOCK_HEIGHT; // Block-height request used as a lightweight checkup ping. let (checkup_key, _checkup_tx, checkup_rx_mutex) = reserve_entry(command_map.clone()).await; // Send a lightweight ping message and wait for the reply // routed back through the shared response hashmap. 
let mut message: Vec = Vec::with_capacity(4); message.push(message_type); message.extend_from_slice(&checkup_key); RpcResponse::send_raw(&stream, None, &message).await; let response_result = { let mut checkup_rx = checkup_rx_mutex.lock().await; timeout(Duration::from_secs(30), checkup_rx.recv()).await }; match response_result { Ok(Some(_reply)) => { consecutive_failures = 0; info!( "[connection_manager] liveness check ok: type={} peer={}:{}", connection_type.as_str(), ip, port ); } _ => { let still_monitoring_same_stream = { let guard = CONNECTIONS.read().await; guard .as_ref() .and_then(|conn| conn.connection_map.get(&connection_key)) .map(|connection_info| { Arc::ptr_eq(&connection_info.stream, &stream) }) .unwrap_or(false) }; if !still_monitoring_same_stream { delete_entry(command_map.clone(), checkup_key).await; break; } consecutive_failures = consecutive_failures.saturating_add(1); warn!( "[connection_manager] liveness check failed: type={} peer={}:{} attempt={}/3", connection_type.as_str(), ip, port, consecutive_failures ); delete_entry(command_map.clone(), checkup_key).await; if consecutive_failures < 3 { continue; } // Three consecutive timed-out or missing replies drop // the connection, and outgoing peers trigger // replacement discovery. let mut guard = CONNECTIONS.write().await; if let Some(conn) = guard.as_mut() { let should_drop = conn .connection_map .get(&connection_key) .map(|connection_info| { Arc::ptr_eq(&connection_info.stream, &stream) }) .unwrap_or(false); if should_drop { conn.drop_connection(connection_type, ip.clone(), port); } } drop(guard); if connection_type == ConnectionType::Outgoing { spawn_retry_dropped_outgoing(ip.clone(), port); } break; } } } }); } // Count active incoming peer connections. pub fn count_incoming_connections(&self) -> usize { self.connection_map .values() .filter(|info| { ConnectionType::from_bytes(&info.connection_type) == Some(ConnectionType::Incoming) }) .count() } // Count active outgoing peer connections. 
pub fn count_outgoing_connections(&self) -> usize { self.connection_map .values() .filter(|info| { ConnectionType::from_bytes(&info.connection_type) == Some(ConnectionType::Outgoing) }) .count() } // Return all live peer streams so broadcast-style paths can fan out // messages without caring whether a peer was incoming or outgoing. pub fn get_all_streams(&self) -> Vec>> { self.connection_map .values() .filter(|connection_info| { ClientType::from_bytes(&connection_info.client_type) == Some(ClientType::Miner) }) .map(|connection_info| Arc::clone(&connection_info.stream)) .collect() } // Return all non-client peer streams so network-wide broadcasts can // reach every reachable chain peer. pub fn get_all_peer_streams(&self) -> Vec>> { self.connection_map .values() .filter(|connection_info| { ClientType::from_bytes(&connection_info.client_type) == Some(ClientType::Miner) }) .map(|connection_info| Arc::clone(&connection_info.stream)) .collect() } // Resolve a stored outgoing node connection back to its live stream. pub fn get_stream_for_outgoing(&self, ip: &str, port: u16) -> Option>> { let ip_bytes = ip_to_binary(ip); let connection_key = ConnectionKey { connection_type: ConnectionType::Outgoing.as_bytes(), ip: ip_bytes, port, }; self.connection_map .get(&connection_key) .filter(|info| ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner)) .map(|info| Arc::clone(&info.stream)) } // Look up a live miner stream by the exact ip:port connection key. // Network-map records only store bare IPs, so they must not be used // to select an arbitrary live socket. 
pub async fn get_stream_from_memory(key: &str) -> Option>> { let (ip, port) = split_ip_port_key(key)?; let lock = CONNECTIONS.read().await; let conn = lock.as_ref()?; let ip_bytes = ip_to_binary(&ip); conn.connection_map .iter() .find_map(|(connection_key, info)| { if connection_key.ip == ip_bytes && connection_key.port == port && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) { Some(Arc::clone(&info.stream)) } else { None } }) } pub async fn get_wallet_for_connection_key(key: &str) -> Option> { let (ip, port) = split_ip_port_key(key)?; let lock = CONNECTIONS.read().await; let conn = lock.as_ref()?; let ip_bytes = ip_to_binary(&ip); conn.connection_map .iter() .find_map(|(connection_key, info)| { if connection_key.ip == ip_bytes && connection_key.port == port && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) { Some(info.wallet_public_key.clone()) } else { None } }) } // Build the serialized connection key for a live stream when only // the stream handle is known. pub fn connection_key_for_stream(&self, stream: &Arc>) -> Option { self.connection_map .iter() .find_map(|(connection_key, connection_info)| { if Arc::ptr_eq(&connection_info.stream, stream) { let ip = binary_to_ip(connection_key.ip.clone()); Some(format!("{}:{}", ip, connection_key.port)) } else { None } }) } // Find the first stored connection record for the requested IP. pub fn find_connection_info(&self, ip: &str) -> Option<(ConnectionType, u16)> { let ip_bytes = ip_to_binary(ip); for (key, _info) in self.connection_map.iter() { if key.ip == ip_bytes { let connection_type = ConnectionType::from_bytes(&key.connection_type)?; return Some((connection_type, key.port)); } } None } // Find a stored connection by IP, constrained to a specific client role. 
pub fn find_connection_info_by_client_type(
    &self,
    ip: &str,
    client_type: ClientType,
) -> Option<(ConnectionType, u16)> {
    let ip_bytes = ip_to_binary(ip);
    let client_type_bytes = client_type.as_bytes();
    // The first entry matching both IP and role decides the result; if its
    // direction bytes cannot be decoded the lookup yields `None`.
    let (key, _info) = self
        .connection_map
        .iter()
        .find(|(key, info)| key.ip == ip_bytes && info.client_type == client_type_bytes)?;
    let connection_type = ConnectionType::from_bytes(&key.connection_type)?;
    Some((connection_type, key.port))
}

// Find the stored outgoing port for a peer IP so reconnect and
// cleanup logic can target the correct connection entry.
pub fn find_outgoing_port(&self, ip: &str) -> Option<u16> {
    let ip_bytes = ip_to_binary(ip);
    let outgoing = ConnectionType::Outgoing.as_bytes();
    self.connection_map
        .keys()
        .find(|key| key.connection_type == outgoing && key.ip == ip_bytes)
        .map(|key| key.port)
}

// Prefer a random incoming node connection, falling back to an
// outgoing node connection when no incoming peer is available.
// Only miner entries are considered. When `excluded_key` parses as ip:port,
// every entry with that IP is skipped, whatever its port.
pub fn get_random_connection(&self, excluded_key: Option<&str>) -> Option<(Vec<u8>, u16)> {
    let mut rng = thread_rng();
    // Convert the excluded IP once instead of once per map entry.
    let excluded_ip = excluded_key
        .and_then(split_ip_port_key)
        .map(|(ip, _port)| ip_to_binary(&ip));

    for direction in [ConnectionType::Incoming, ConnectionType::Outgoing] {
        let picked = self
            .connection_map
            .iter()
            .filter(|(key, info)| {
                ConnectionType::from_bytes(&key.connection_type) == Some(direction)
                    && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner)
                    && excluded_ip
                        .as_ref()
                        .map(|excluded| key.ip != *excluded)
                        .unwrap_or(true)
            })
            .choose(&mut rng);
        if let Some((key, _info)) = picked {
            return Some((key.ip.clone(), key.port));
        }
    }
    None
}
}

// Publish a signed monitor add/remove edit for a miner peer on a detached
// task. Does nothing when no reconnect context is configured, when the peer
// key cannot be turned into a short address, or when the peer address equals
// this node's own address.
fn spawn_monitor_update(ip: String, action: u8, peer_public_key: Vec<u8>, port: u16) {
    tokio::spawn(async move {
        let context = {
            let guard = RECONNECT_CONTEXT.lock().await;
            guard.clone()
        };
        let Some(context) = context else {
            return;
        };
        let Some(monitored_address) = Wallet::public_key_bytes_to_short_address(&peer_public_key)
        else {
            return;
        };
        let monitoring_address = context.wallet.saved.short_address.clone();
        if monitored_address == monitoring_address {
            return;
        }

        let timestamp = crate::Utc::now().timestamp_millis() as u64;
        let signature = NodeInfo::monitor_signature(
            action,
            &monitored_address,
            &monitoring_address,
            &ip,
            timestamp,
            &context.wallet,
        )
        .await;

        let edit = SignedMonitorEdit {
            action,
            monitored_address,
            monitoring_address,
            target_ip: ip.clone(),
            modified_timestamp: timestamp,
            modified_signature: signature,
        };
        let params = MonitorAddressParams {
            map: context.map.clone(),
            edit,
            remote_ip: String::new(),
            db: context.db.clone(),
            wallet: context.wallet.clone(),
            connections_key: format!("{ip}:{port}"),
        };

        // Any action other than MONITOR_ACTION_ADD is treated as a removal.
        // The outcome is deliberately ignored.
        let _ = if action == MONITOR_ACTION_ADD {
            NodeInfo::add_monitor(params).await
        } else {
            NodeInfo::remove_monitor(params).await
        };
    });
}

lazy_static! {
    pub static ref CONNECTIONS: Arc<RwLock<Option<Connection>>> = Arc::new(RwLock::new(None));
}

pub async fn initialize_connection() {
    // Lazily create the singleton connection manager the first time the
    // node starts accepting or opening peer connections.
    let mut connection_instance = CONNECTIONS.write().await;
    if connection_instance.is_none() {
        *connection_instance = Some(Connection::new());
    }
}

pub async fn outgoing_connection_count() -> usize {
    // Read the singleton connection manager and count live outgoing peers.
    // Yields 0 while the manager is not initialized.
    CONNECTIONS
        .read()
        .await
        .as_ref()
        .map(|connection| connection.count_outgoing_connections())
        .unwrap_or(0)
}

pub async fn peer_connection_count() -> usize {
    // Mining only needs proof that this node is connected to the live
    // network; incoming and outgoing miner peers both satisfy that.
    CONNECTIONS
        .read()
        .await
        .as_ref()
        .map(|connection| {
            connection
                .connection_map
                .values()
                .filter(|info| {
                    ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner)
                })
                .count()
        })
        .unwrap_or(0)
}

pub async fn live_miner_peer_streams() -> Vec<(String, Arc<Mutex<TcpStream>>)> {
    // Snapshot consensus and recovery checks vote only across currently
    // connected miner peers, regardless of incoming/outgoing direction.
    // Each element pairs the "ip:port" key with the peer's stream handle.
    CONNECTIONS
        .read()
        .await
        .as_ref()
        .map(|connection| {
            connection
                .connection_map
                .iter()
                .filter(|(_, info)| {
                    ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner)
                })
                .map(|(key, info)| {
                    let ip = binary_to_ip(key.ip.clone());
                    (format!("{}:{}", ip, key.port), Arc::clone(&info.stream))
                })
                .collect()
        })
        .unwrap_or_default()
}

pub async fn get_client_type_from_memory(key: &str) -> Option<ClientType> {
    // Recover the stored client role from the serialized connection key
    // used throughout the RPC layer. The first entry with a matching IP and
    // port decides the result, whatever its direction.
    let (ip, port) = split_ip_port_key(key)?;
    let ip_bytes = ip_to_binary(&ip);
    let guard = CONNECTIONS.read().await;
    let conn = guard.as_ref()?;
    conn.connection_map
        .iter()
        .find(|(connection_key, _)| connection_key.ip == ip_bytes && connection_key.port == port)
        .and_then(|(_, info)| ClientType::from_bytes(&info.client_type))
}