use crate::records::memory::connections::CONNECTIONS; use crate::records::memory::enums::{ClientType, ConnectionType}; use crate::records::memory::response_channels::Command; use crate::records::memory::structs::StoreConnectionParams; use crate::Arc; use crate::AsyncWriteExt; use crate::Mutex; use crate::TcpStream; fn split_ip_port_key(value: &str) -> Option<(String, u16)> { // Use rsplit_once so IPv4 host:port values work normally and IPv6 // bracketed addresses can keep their internal colons. let (ip_part, port_part) = value.rsplit_once(':')?; let ip = ip_part .strip_prefix('[') .and_then(|inner| inner.strip_suffix(']')) .unwrap_or(ip_part) .to_string(); let port = port_part.parse::().ok()?; Some((ip, port)) } // write an incoming connection to memory // client type defines if connection // is another node or a client connecting // to use rpc services pub async fn write_to_memory( received_ip_port: &str, stream: Arc>, client_type: &str, wallet_public_key: Vec, command_map: Arc>, ) -> String { // Reject unknown connection labels before the connection manager is // locked so invalid handshakes do not touch shared state. let Ok(client_type) = client_type.parse::() else { return "false".to_string(); }; let mut connection_instance = CONNECTIONS.write().await; if let Some(mut connection) = connection_instance.take() { // The connection manager stores IP and port separately because // connection direction is tracked in the key alongside them. let Some((ip, port)) = split_ip_port_key(received_ip_port) else { *connection_instance = Some(connection); return "false".to_string(); }; let added = connection.store_connection(StoreConnectionParams { connection_type: ConnectionType::Incoming, ip: ip.clone(), port, stream: stream.clone(), client_type, wallet_public_key, command_map, }); *connection_instance = Some(connection); if added { // Return the original ip:port string as the connection key // used by stream readers and cleanup paths. drop(connection_instance); received_ip_port.to_string() } else { // Duplicate streams are told why they are being closed before // the socket is shut down. drop(connection_instance); let duplicate_message = "The connection is already in the connection manager Please wait 10 minutes and try again"; let mut stream_guard = stream.lock().await; let _ = stream_guard.write_all(duplicate_message.as_bytes()).await; let _ = stream_guard.flush().await; let _ = stream_guard.shutdown().await; "false".to_string() } } else { "false".to_string() } } // delete a connection from memory pub async fn remove_key_from_memory(key: &str) { let mut connection_instance = CONNECTIONS.write().await; // A key can represent either direction, so cleanup removes both // incoming and outgoing entries with the same endpoint. let Some((ip, port)) = split_ip_port_key(key) else { return; }; if let Some(connection) = connection_instance.as_mut() { connection.drop_connection(ConnectionType::Incoming, ip.clone(), port); connection.drop_connection(ConnectionType::Outgoing, ip, port); } } pub async fn remove_stream_from_memory(stream: &Arc>) { let mut connection_instance = CONNECTIONS.write().await; let Some(connection) = connection_instance.as_mut() else { return; }; // Stream cleanup is used when only the socket handle is known, so // search the connection map for the matching Arc before dropping it. let matching_connection = connection .connection_map .iter() .find_map(|(connection_key, connection_info)| { let connection_type = ConnectionType::from_bytes(&connection_key.connection_type)?; if Arc::ptr_eq(&connection_info.stream, stream) { Some(( connection_type, connection_key.ip.clone(), connection_key.port, )) } else { None } }); if let Some((connection_type, ip_bytes, port)) = matching_connection { let ip = crate::common::binary_conversions::binary_to_ip(ip_bytes); connection.drop_connection(connection_type, ip, port); } }