2026-06-04 17:51:07 +00:00
|
|
|
use crate::records::memory::connections::{spawn_retry_dropped_outgoing, CONNECTIONS};
|
2026-05-24 17:56:57 +00:00
|
|
|
use crate::records::memory::enums::{ClientType, ConnectionType};
|
|
|
|
|
use crate::records::memory::response_channels::Command;
|
|
|
|
|
use crate::records::memory::structs::StoreConnectionParams;
|
|
|
|
|
use crate::Arc;
|
|
|
|
|
use crate::AsyncWriteExt;
|
|
|
|
|
use crate::Mutex;
|
|
|
|
|
use crate::TcpStream;
|
|
|
|
|
|
|
|
|
|
/// Splits an `ip:port` connection key into its host text and numeric port.
///
/// The split happens at the *last* colon, so plain `host:port` values and
/// IPv6 hosts (which contain colons themselves) are both handled. When the
/// host is fully wrapped in `[` and `]`, the brackets are removed; a host
/// with only one of the two brackets is returned unchanged.
///
/// Returns `None` when `value` contains no colon or when the text after the
/// last colon does not parse as a `u16`.
fn split_ip_port_key(value: &str) -> Option<(String, u16)> {
    let (host, port_text) = value.rsplit_once(':')?;
    let port: u16 = port_text.parse().ok()?;

    // Only strip brackets when both the opening and closing one are present.
    let host = match host
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
    {
        Some(unbracketed) => unbracketed,
        None => host,
    };

    Some((host.to_owned(), port))
}
|
|
|
|
|
|
|
|
|
|
// write an incoming connection to memory
|
|
|
|
|
// client type defines if connection
|
|
|
|
|
// is another node or a client connecting
|
|
|
|
|
// to use rpc services
|
|
|
|
|
pub async fn write_to_memory(
|
|
|
|
|
received_ip_port: &str,
|
|
|
|
|
stream: Arc<Mutex<TcpStream>>,
|
|
|
|
|
client_type: &str,
|
2026-06-11 21:06:02 +00:00
|
|
|
wallet_short_address: String,
|
2026-05-24 17:56:57 +00:00
|
|
|
command_map: Arc<Mutex<Command>>,
|
|
|
|
|
) -> String {
|
|
|
|
|
// Reject unknown connection labels before the connection manager is
|
|
|
|
|
// locked so invalid handshakes do not touch shared state.
|
|
|
|
|
let Ok(client_type) = client_type.parse::<ClientType>() else {
|
|
|
|
|
return "false".to_string();
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut connection_instance = CONNECTIONS.write().await;
|
|
|
|
|
if let Some(mut connection) = connection_instance.take() {
|
|
|
|
|
// The connection manager stores IP and port separately because
|
|
|
|
|
// connection direction is tracked in the key alongside them.
|
|
|
|
|
let Some((ip, port)) = split_ip_port_key(received_ip_port) else {
|
|
|
|
|
*connection_instance = Some(connection);
|
|
|
|
|
return "false".to_string();
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let added = connection.store_connection(StoreConnectionParams {
|
|
|
|
|
connection_type: ConnectionType::Incoming,
|
|
|
|
|
ip: ip.clone(),
|
|
|
|
|
port,
|
|
|
|
|
stream: stream.clone(),
|
|
|
|
|
client_type,
|
2026-06-11 21:06:02 +00:00
|
|
|
wallet_short_address,
|
2026-05-24 17:56:57 +00:00
|
|
|
command_map,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
*connection_instance = Some(connection);
|
|
|
|
|
if added {
|
|
|
|
|
// Return the original ip:port string as the connection key
|
|
|
|
|
// used by stream readers and cleanup paths.
|
|
|
|
|
drop(connection_instance);
|
|
|
|
|
received_ip_port.to_string()
|
|
|
|
|
} else {
|
|
|
|
|
// Duplicate streams are told why they are being closed before
|
|
|
|
|
// the socket is shut down.
|
|
|
|
|
drop(connection_instance);
|
|
|
|
|
let duplicate_message =
|
|
|
|
|
"The connection is already in the connection manager Please wait 10 minutes and try again";
|
|
|
|
|
let mut stream_guard = stream.lock().await;
|
|
|
|
|
let _ = stream_guard.write_all(duplicate_message.as_bytes()).await;
|
|
|
|
|
let _ = stream_guard.flush().await;
|
|
|
|
|
let _ = stream_guard.shutdown().await;
|
|
|
|
|
"false".to_string()
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
"false".to_string()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// delete a connection from memory
|
|
|
|
|
pub async fn remove_key_from_memory(key: &str) {
|
|
|
|
|
let mut connection_instance = CONNECTIONS.write().await;
|
|
|
|
|
// A key can represent either direction, so cleanup removes both
|
|
|
|
|
// incoming and outgoing entries with the same endpoint.
|
|
|
|
|
let Some((ip, port)) = split_ip_port_key(key) else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
if let Some(connection) = connection_instance.as_mut() {
|
|
|
|
|
connection.drop_connection(ConnectionType::Incoming, ip.clone(), port);
|
2026-06-04 17:51:07 +00:00
|
|
|
let dropped_outgoing =
|
|
|
|
|
connection.drop_connection(ConnectionType::Outgoing, ip.clone(), port);
|
|
|
|
|
if dropped_outgoing
|
|
|
|
|
.as_ref()
|
|
|
|
|
.and_then(|connection_info| ClientType::from_bytes(&connection_info.client_type))
|
|
|
|
|
== Some(ClientType::Miner)
|
|
|
|
|
{
|
|
|
|
|
spawn_retry_dropped_outgoing(ip, port);
|
|
|
|
|
}
|
2026-05-24 17:56:57 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn remove_stream_from_memory(stream: &Arc<Mutex<TcpStream>>) {
|
|
|
|
|
let mut connection_instance = CONNECTIONS.write().await;
|
|
|
|
|
let Some(connection) = connection_instance.as_mut() else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Stream cleanup is used when only the socket handle is known, so
|
|
|
|
|
// search the connection map for the matching Arc before dropping it.
|
2026-05-26 06:24:57 +00:00
|
|
|
let matching_connection =
|
|
|
|
|
connection
|
|
|
|
|
.connection_map
|
|
|
|
|
.iter()
|
|
|
|
|
.find_map(|(connection_key, connection_info)| {
|
|
|
|
|
let connection_type = ConnectionType::from_bytes(&connection_key.connection_type)?;
|
|
|
|
|
if Arc::ptr_eq(&connection_info.stream, stream) {
|
|
|
|
|
Some((
|
|
|
|
|
connection_type,
|
|
|
|
|
connection_key.ip.clone(),
|
|
|
|
|
connection_key.port,
|
|
|
|
|
))
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
});
|
2026-05-24 17:56:57 +00:00
|
|
|
|
|
|
|
|
if let Some((connection_type, ip_bytes, port)) = matching_connection {
|
|
|
|
|
let ip = crate::common::binary_conversions::binary_to_ip(ip_bytes);
|
2026-06-04 17:51:07 +00:00
|
|
|
let dropped = connection.drop_connection(connection_type, ip.clone(), port);
|
|
|
|
|
if connection_type == ConnectionType::Outgoing
|
|
|
|
|
&& dropped
|
|
|
|
|
.as_ref()
|
|
|
|
|
.and_then(|connection_info| ClientType::from_bytes(&connection_info.client_type))
|
|
|
|
|
== Some(ClientType::Miner)
|
|
|
|
|
{
|
|
|
|
|
spawn_retry_dropped_outgoing(ip, port);
|
|
|
|
|
}
|
2026-05-24 17:56:57 +00:00
|
|
|
}
|
|
|
|
|
}
|