diff --git a/src/records/memory/connections.rs b/src/records/memory/connections.rs index c4516f3..e0bc567 100644 --- a/src/records/memory/connections.rs +++ b/src/records/memory/connections.rs @@ -182,6 +182,12 @@ async fn retry_dropped_outgoing(ip: String, port: u16) { finish_reconnect(); } +pub fn spawn_retry_dropped_outgoing(ip: String, port: u16) { + tokio::spawn(async move { + retry_dropped_outgoing(ip, port).await; + }); +} + pub fn spawn_reconnect_bootstrap(params: BootstrapParams) { if !try_start_reconnect() { warn!("[reconnect] bootstrap recovery already in progress, skipping duplicate request"); @@ -327,24 +333,24 @@ impl Connection { command_map: Arc>, ) { tokio::spawn(async move { + let connection_key = ConnectionKey { + connection_type: connection_type.as_bytes(), + ip: ip_to_binary(&ip), + port, + }; + let mut consecutive_failures = 0_u8; loop { sleep(Duration::from_secs(30)).await; - let still_registered = { + let still_monitoring_same_stream = { let guard = CONNECTIONS.read().await; guard .as_ref() - .map(|conn| { - let connection_key = ConnectionKey { - connection_type: connection_type.as_bytes(), - ip: ip_to_binary(&ip), - port, - }; - conn.connection_map.contains_key(&connection_key) - }) + .and_then(|conn| conn.connection_map.get(&connection_key)) + .map(|connection_info| Arc::ptr_eq(&connection_info.stream, &stream)) .unwrap_or(false) }; - if !still_registered { + if !still_monitoring_same_stream { break; } @@ -367,6 +373,7 @@ impl Connection { match response_result { Ok(Some(_reply)) => { + consecutive_failures = 0; info!( "[connection_manager] liveness check ok: type={} peer={}:{}", connection_type.as_str(), @@ -375,42 +382,55 @@ impl Connection { ); } _ => { - let still_registered = { + let still_monitoring_same_stream = { let guard = CONNECTIONS.read().await; guard .as_ref() - .map(|conn| { - let connection_key = ConnectionKey { - connection_type: connection_type.as_bytes(), - ip: ip_to_binary(&ip), - port, - }; - conn.connection_map.contains_key(&connection_key) + .and_then(|conn| conn.connection_map.get(&connection_key)) + .map(|connection_info| { + Arc::ptr_eq(&connection_info.stream, &stream) }) .unwrap_or(false) }; - if !still_registered { + if !still_monitoring_same_stream { delete_entry(command_map.clone(), checkup_key).await; break; } - // Timed-out or missing replies drop the connection, - // and outgoing peers trigger replacement discovery. + consecutive_failures = consecutive_failures.saturating_add(1); warn!( - "[connection_manager] liveness check failed: type={} peer={}:{}", + "[connection_manager] liveness check failed: type={} peer={}:{} attempt={}/3", connection_type.as_str(), ip, - port + port, + consecutive_failures ); delete_entry(command_map.clone(), checkup_key).await; + + if consecutive_failures < 3 { + continue; + } + + // Three consecutive timed-out or missing replies drop + // the connection, and outgoing peers trigger + // replacement discovery. let mut guard = CONNECTIONS.write().await; if let Some(conn) = guard.as_mut() { - conn.drop_connection(connection_type, ip.clone(), port); + let should_drop = conn + .connection_map + .get(&connection_key) + .map(|connection_info| { + Arc::ptr_eq(&connection_info.stream, &stream) + }) + .unwrap_or(false); + if should_drop { + conn.drop_connection(connection_type, ip.clone(), port); + } } drop(guard); if connection_type == ConnectionType::Outgoing { - retry_dropped_outgoing(ip.clone(), port).await; + spawn_retry_dropped_outgoing(ip.clone(), port); } break; } diff --git a/src/rpc/commands/wallet_register.rs b/src/rpc/commands/wallet_register.rs index 6fefb44..487e916 100644 --- a/src/rpc/commands/wallet_register.rs +++ b/src/rpc/commands/wallet_register.rs @@ -14,16 +14,16 @@ use crate::Duration; use crate::Mutex; async fn broadcast_wallet_registration( - short_address: &[u8], - public_key_bytes: &[u8], - signature: &str, + short_address: Vec, + public_key_bytes: Vec, + signature: String, map: Arc>, - remote_ip: &str, - connections_key: &str, + remote_ip: String, + connections_key: String, ) { // Registration broadcasts are forwarded to peers after local // acceptance so wallet lookups converge across connected nodes. - let signature_bytes = match decode(signature) { + let signature_bytes = match decode(&signature) { Ok(bytes) if bytes.len() == Wallet::SIGNATURE_LENGTH => bytes, _ => return, }; @@ -60,11 +60,11 @@ async fn broadcast_wallet_registration( // Wire layout: command, UID, short address, public key, signature. message.push(RPC_REGISTER_WALLET); message.extend_from_slice(&hashmap_key); - message.extend_from_slice(short_address); - message.extend_from_slice(public_key_bytes); + message.extend_from_slice(&short_address); + message.extend_from_slice(&public_key_bytes); message.extend_from_slice(&signature_bytes); - RpcResponse::send_raw(&unlocked_stream, Some(connections_key), &message).await; + RpcResponse::send_raw(&unlocked_stream, Some(&connections_key), &message).await; let response = { let mut rx = hashmap_rx.lock().await; @@ -108,7 +108,7 @@ pub async fn register( }; // The claimed short address must be the deterministic short address - // derived from the submitted long wallet address. + // derived from the submitted public key. if expected_short_address != short_address_bytes { return RpcResponse::Binary(b"0".to_vec()); } @@ -134,15 +134,14 @@ pub async fn register( match register_short_address(db, &short_address_bytes, &public_key_bytes) { Ok(WalletRegistrationResult::Inserted) => { - broadcast_wallet_registration( - &short_address_bytes, - &public_key_bytes, - &signature, + tokio::spawn(broadcast_wallet_registration( + short_address_bytes, + public_key_bytes, + signature, map, - &remote_ip, - &connections_key, - ) - .await; + remote_ip, + connections_key, + )); RpcResponse::Binary(b"1".to_vec()) } Ok(WalletRegistrationResult::AlreadyRegistered) => RpcResponse::Binary(b"1".to_vec()), diff --git a/src/rpc/server/connection_memory_manager.rs b/src/rpc/server/connection_memory_manager.rs index 8a54878..820caab 100644 --- a/src/rpc/server/connection_memory_manager.rs +++ b/src/rpc/server/connection_memory_manager.rs @@ -1,4 +1,4 @@ -use crate::records::memory::connections::CONNECTIONS; +use crate::records::memory::connections::{spawn_retry_dropped_outgoing, CONNECTIONS}; use crate::records::memory::enums::{ClientType, ConnectionType}; use crate::records::memory::response_channels::Command; use crate::records::memory::structs::StoreConnectionParams; @@ -89,7 +89,15 @@ pub async fn remove_key_from_memory(key: &str) { }; if let Some(connection) = connection_instance.as_mut() { connection.drop_connection(ConnectionType::Incoming, ip.clone(), port); - connection.drop_connection(ConnectionType::Outgoing, ip, port); + let dropped_outgoing = + connection.drop_connection(ConnectionType::Outgoing, ip.clone(), port); + if dropped_outgoing + .as_ref() + .and_then(|connection_info| ClientType::from_bytes(&connection_info.client_type)) + == Some(ClientType::Miner) + { + spawn_retry_dropped_outgoing(ip, port); + } } } @@ -120,6 +128,14 @@ pub async fn remove_stream_from_memory(stream: &Arc>) { if let Some((connection_type, ip_bytes, port)) = matching_connection { let ip = crate::common::binary_conversions::binary_to_ip(ip_bytes); - connection.drop_connection(connection_type, ip, port); + let dropped = connection.drop_connection(connection_type, ip.clone(), port); + if connection_type == ConnectionType::Outgoing + && dropped + .as_ref() + .and_then(|connection_info| ClientType::from_bytes(&connection_info.client_type)) + == Some(ClientType::Miner) + { + spawn_retry_dropped_outgoing(ip, port); + } } }