Contractless/src/rpc/server/connection_memory_manager.rs

142 lines
5.3 KiB
Rust
Raw Normal View History

2026-06-04 17:51:07 +00:00
use crate::records::memory::connections::{spawn_retry_dropped_outgoing, CONNECTIONS};
2026-05-24 17:56:57 +00:00
use crate::records::memory::enums::{ClientType, ConnectionType};
use crate::records::memory::response_channels::Command;
use crate::records::memory::structs::StoreConnectionParams;
use crate::Arc;
use crate::AsyncWriteExt;
use crate::Mutex;
use crate::TcpStream;
fn split_ip_port_key(value: &str) -> Option<(String, u16)> {
// Use rsplit_once so IPv4 host:port values work normally and IPv6
// bracketed addresses can keep their internal colons.
let (ip_part, port_part) = value.rsplit_once(':')?;
let ip = ip_part
.strip_prefix('[')
.and_then(|inner| inner.strip_suffix(']'))
.unwrap_or(ip_part)
.to_string();
let port = port_part.parse::<u16>().ok()?;
Some((ip, port))
}
// write an incoming connection to memory
// client type defines if connection
// is another node or a client connecting
// to use rpc services
pub async fn write_to_memory(
received_ip_port: &str,
stream: Arc<Mutex<TcpStream>>,
client_type: &str,
2026-06-04 16:18:55 +00:00
wallet_public_key: Vec<u8>,
2026-05-24 17:56:57 +00:00
command_map: Arc<Mutex<Command>>,
) -> String {
// Reject unknown connection labels before the connection manager is
// locked so invalid handshakes do not touch shared state.
let Ok(client_type) = client_type.parse::<ClientType>() else {
return "false".to_string();
};
let mut connection_instance = CONNECTIONS.write().await;
if let Some(mut connection) = connection_instance.take() {
// The connection manager stores IP and port separately because
// connection direction is tracked in the key alongside them.
let Some((ip, port)) = split_ip_port_key(received_ip_port) else {
*connection_instance = Some(connection);
return "false".to_string();
};
let added = connection.store_connection(StoreConnectionParams {
connection_type: ConnectionType::Incoming,
ip: ip.clone(),
port,
stream: stream.clone(),
client_type,
2026-06-04 16:18:55 +00:00
wallet_public_key,
2026-05-24 17:56:57 +00:00
command_map,
});
*connection_instance = Some(connection);
if added {
// Return the original ip:port string as the connection key
// used by stream readers and cleanup paths.
drop(connection_instance);
received_ip_port.to_string()
} else {
// Duplicate streams are told why they are being closed before
// the socket is shut down.
drop(connection_instance);
let duplicate_message =
"The connection is already in the connection manager Please wait 10 minutes and try again";
let mut stream_guard = stream.lock().await;
let _ = stream_guard.write_all(duplicate_message.as_bytes()).await;
let _ = stream_guard.flush().await;
let _ = stream_guard.shutdown().await;
"false".to_string()
}
} else {
"false".to_string()
}
}
// delete a connection from memory
pub async fn remove_key_from_memory(key: &str) {
let mut connection_instance = CONNECTIONS.write().await;
// A key can represent either direction, so cleanup removes both
// incoming and outgoing entries with the same endpoint.
let Some((ip, port)) = split_ip_port_key(key) else {
return;
};
if let Some(connection) = connection_instance.as_mut() {
connection.drop_connection(ConnectionType::Incoming, ip.clone(), port);
2026-06-04 17:51:07 +00:00
let dropped_outgoing =
connection.drop_connection(ConnectionType::Outgoing, ip.clone(), port);
if dropped_outgoing
.as_ref()
.and_then(|connection_info| ClientType::from_bytes(&connection_info.client_type))
== Some(ClientType::Miner)
{
spawn_retry_dropped_outgoing(ip, port);
}
2026-05-24 17:56:57 +00:00
}
}
pub async fn remove_stream_from_memory(stream: &Arc<Mutex<TcpStream>>) {
let mut connection_instance = CONNECTIONS.write().await;
let Some(connection) = connection_instance.as_mut() else {
return;
};
// Stream cleanup is used when only the socket handle is known, so
// search the connection map for the matching Arc before dropping it.
2026-05-26 06:24:57 +00:00
let matching_connection =
connection
.connection_map
.iter()
.find_map(|(connection_key, connection_info)| {
let connection_type = ConnectionType::from_bytes(&connection_key.connection_type)?;
if Arc::ptr_eq(&connection_info.stream, stream) {
Some((
connection_type,
connection_key.ip.clone(),
connection_key.port,
))
} else {
None
}
});
2026-05-24 17:56:57 +00:00
if let Some((connection_type, ip_bytes, port)) = matching_connection {
let ip = crate::common::binary_conversions::binary_to_ip(ip_bytes);
2026-06-04 17:51:07 +00:00
let dropped = connection.drop_connection(connection_type, ip.clone(), port);
if connection_type == ConnectionType::Outgoing
&& dropped
.as_ref()
.and_then(|connection_info| ClientType::from_bytes(&connection_info.client_type))
== Some(ClientType::Miner)
{
spawn_retry_dropped_outgoing(ip, port);
}
2026-05-24 17:56:57 +00:00
}
}