From 5c4c1baf7e062109532c0d24df9de9d9f9faa632 Mon Sep 17 00:00:00 2001 From: viraladmin <00purple@gmail.com> Date: Thu, 4 Jun 2026 15:22:07 -0600 Subject: [PATCH] node startup bug fixes --- src/records/ip_score/penalty.rs | 7 +- src/records/memory/connections.rs | 105 +++++++-- src/records/memory/network_mapping/add.rs | 64 +++--- src/records/memory/network_mapping/mod.rs | 5 +- src/records/memory/network_mapping/monitor.rs | 29 ++- src/records/memory/structs.rs | 6 + src/rpc/client/handshake_processing.rs | 62 ++++-- src/rpc/client/wallet_registry_sync.rs | 65 +++++- src/rpc/commands/wallet_register.rs | 19 +- src/rpc/server/handshake.rs | 53 +++-- src/startup/network_broadcast.rs | 199 ++++++++++-------- src/torrent/torrenting_system/get_nodes.rs | 5 +- 12 files changed, 411 insertions(+), 208 deletions(-) diff --git a/src/records/ip_score/penalty.rs b/src/records/ip_score/penalty.rs index a457972..3805c01 100644 --- a/src/records/ip_score/penalty.rs +++ b/src/records/ip_score/penalty.rs @@ -1,7 +1,7 @@ use crate::log::warn; use crate::records::ip_score::ban_management::{sign_ip_to_ban, spawn_unban}; -use crate::records::memory::connections::CONNECTIONS; -use crate::records::memory::enums::ClientType; +use crate::records::memory::connections::{spawn_retry_dropped_outgoing, CONNECTIONS}; +use crate::records::memory::enums::{ClientType, ConnectionType}; use crate::rpc::commands::block_peer_ip::block_peer; use crate::sled::Db; use crate::wallets::structures::Wallet; @@ -75,6 +75,9 @@ pub async fn issue_penalty( // Low-level penalties disconnect the peer but do not add a ban // record yet. conn.drop_connection(connection_type, ip.to_string(), port); + if client_type == ClientType::Miner && connection_type == ConnectionType::Outgoing { + spawn_retry_dropped_outgoing(ip.to_string(), port); + } return format!("IP {ip} dropped due to score {score}"); } } diff --git a/src/records/memory/connections.rs b/src/records/memory/connections.rs index e0bc567..54efacd 100644 --- a/src/records/memory/connections.rs +++ b/src/records/memory/connections.rs @@ -98,6 +98,9 @@ async fn reconnect_replacement_inner(excluded_ip: &str) { if ClientType::from_bytes(&info.client_type) != Some(ClientType::Miner) { return None; } + if !info.ready { + return None; + } let ip = binary_to_ip(key.ip.clone()); let connections_key = format!("{}:{}", ip, key.port); Some((connections_key, Arc::clone(&info.stream))) @@ -459,30 +462,87 @@ impl Connection { .count() } - // Return all live peer streams so broadcast-style paths can fan out - // messages without caring whether a peer was incoming or outgoing. + pub fn mark_wallet_registry_synced(&mut self, key: &str) -> bool { + let Some((ip, port)) = split_ip_port_key(key) else { + return false; + }; + let ip_bytes = ip_to_binary(&ip); + for (connection_key, info) in self.connection_map.iter_mut() { + if connection_key.ip == ip_bytes && connection_key.port == port { + info.wallet_registry_synced = true; + info.ready = info.network_map_synced; + return true; + } + } + false + } + + pub fn mark_network_map_synced(&mut self, key: &str) -> bool { + let Some((ip, port)) = split_ip_port_key(key) else { + return false; + }; + let ip_bytes = ip_to_binary(&ip); + for (connection_key, info) in self.connection_map.iter_mut() { + if connection_key.ip == ip_bytes && connection_key.port == port { + info.network_map_synced = true; + info.ready = info.wallet_registry_synced; + return true; + } + } + false + } + + pub fn count_ready_miner_connections(&self) -> usize { + self.connection_map + .values() + .filter(|info| { + ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) && info.ready + }) + .count() + } + + // Return ready peer streams so broadcast-style paths do not send + // network-wide traffic to peers still completing startup sync. pub fn get_all_streams(&self) -> Vec>> { self.connection_map .values() .filter(|connection_info| { ClientType::from_bytes(&connection_info.client_type) == Some(ClientType::Miner) + && connection_info.ready }) .map(|connection_info| Arc::clone(&connection_info.stream)) .collect() } - // Return all non-client peer streams so network-wide broadcasts can - // reach every reachable chain peer. + // Return ready non-client peer streams so registry rebroadcasts only fan + // out through initialized chain peers. pub fn get_all_peer_streams(&self) -> Vec>> { self.connection_map .values() .filter(|connection_info| { ClientType::from_bytes(&connection_info.client_type) == Some(ClientType::Miner) + && connection_info.ready }) .map(|connection_info| Arc::clone(&connection_info.stream)) .collect() } + pub fn get_all_ready_peer_streams_with_keys(&self) -> Vec<(String, Arc>)> { + self.connection_map + .iter() + .filter_map(|(key, connection_info)| { + if ClientType::from_bytes(&connection_info.client_type) != Some(ClientType::Miner) + || !connection_info.ready + { + return None; + } + let ip = binary_to_ip(key.ip.clone()); + let connections_key = format!("{}:{}", ip, key.port); + Some((connections_key, Arc::clone(&connection_info.stream))) + }) + .collect() + } + // Resolve a stored outgoing node connection back to its live stream. pub fn get_stream_for_outgoing(&self, ip: &str, port: u16) -> Option>> { let ip_bytes = ip_to_binary(ip); @@ -610,6 +670,7 @@ impl Connection { .filter(|(key, info)| { ConnectionType::from_bytes(&key.connection_type) == Some(ConnectionType::Incoming) && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) + && info.ready && excluded .as_ref() .map(|(ip, _)| key.ip != ip_to_binary(ip)) @@ -626,6 +687,7 @@ impl Connection { .filter(|(key, info)| { ConnectionType::from_bytes(&key.connection_type) == Some(ConnectionType::Outgoing) && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) + && info.ready && excluded .as_ref() .map(|(ip, _)| key.ip != ip_to_binary(ip)) @@ -714,22 +776,35 @@ pub async fn outgoing_connection_count() -> usize { } pub async fn peer_connection_count() -> usize { - // Mining only needs proof that this node is connected to the live - // network; incoming and outgoing miner peers both satisfy that. + // Mining needs at least one fully initialized miner peer. A raw socket + // is not enough because block validation depends on wallet registry and + // network-map state being synced first. CONNECTIONS .read() .await .as_ref() - .map(|connection| { - connection - .connection_map - .values() - .filter(|info| ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner)) - .count() - }) + .map(|connection| connection.count_ready_miner_connections()) .unwrap_or(0) } +pub async fn mark_peer_wallet_registry_synced(key: &str) -> bool { + CONNECTIONS + .write() + .await + .as_mut() + .map(|connection| connection.mark_wallet_registry_synced(key)) + .unwrap_or(false) +} + +pub async fn mark_peer_network_map_synced(key: &str) -> bool { + CONNECTIONS + .write() + .await + .as_mut() + .map(|connection| connection.mark_network_map_synced(key)) + .unwrap_or(false) +} + pub async fn live_miner_peer_streams() -> Vec<(String, Arc>)> { // Snapshot consensus and recovery checks vote only across currently // connected miner peers, regardless of incoming/outgoing direction. @@ -742,7 +817,9 @@ pub async fn live_miner_peer_streams() -> Vec<(String, Arc>)> { .connection_map .iter() .filter_map(|(key, info)| { - if ClientType::from_bytes(&info.client_type) != Some(ClientType::Miner) { + if ClientType::from_bytes(&info.client_type) != Some(ClientType::Miner) + || !info.ready + { return None; } let ip = binary_to_ip(key.ip.clone()); diff --git a/src/records/memory/network_mapping/add.rs b/src/records/memory/network_mapping/add.rs index 05f8d91..5c88198 100644 --- a/src/records/memory/network_mapping/add.rs +++ b/src/records/memory/network_mapping/add.rs @@ -6,7 +6,7 @@ impl NodeInfo { edit: &SignedNodeEdit, remote_ip: &str, edittype: NodeEditType, - connections_key: &str, + _connections_key: &str, ) { // Re-broadcast signed node-map edits to connected peers while // skipping the source peer that already sent the update. @@ -28,13 +28,16 @@ impl NodeInfo { }; let modified_timestamp_bytes = edit.modified_timestamp.to_le_bytes(); let modified_signature_bytes = decode(&edit.modified_signature).unwrap(); - let connections_lock = CONNECTIONS.read().await; - let streams: Option>>> = connections_lock - .as_ref() - .map(|connection| connection.get_all_streams()); + let streams = { + let connections_lock = CONNECTIONS.read().await; + connections_lock + .as_ref() + .map(|connection| connection.get_all_ready_peer_streams_with_keys()) + .unwrap_or_default() + }; match streams { - Some(streams) => { - for unlocked_stream in streams { + streams if !streams.is_empty() => { + for (peer_key, unlocked_stream) in streams { let (hashmap_key, _hashmap_tx, hashmap_rx) = reserve_entry(map.clone()).await; let mut message: Vec = Vec::new(); message.push(message_type); @@ -44,31 +47,18 @@ impl NodeInfo { message.extend_from_slice(&modified_by_bytes); message.extend_from_slice(&modified_timestamp_bytes); message.extend_from_slice(&modified_signature_bytes); - let peer_addr = { - let stream = unlocked_stream.lock().await; - stream.peer_addr() + let Some((peer_ip, _)) = peer_key.rsplit_once(':') else { + continue; }; - match peer_addr { - Ok(addr) => { - if (addr.ip().to_string() != *remote_ip) && !remote_ip.is_empty() { - RpcResponse::send_raw( - &unlocked_stream, - Some(connections_key), - &message, - ) - .await; - let mut rx = hashmap_rx.lock().await; - let _ = rx.recv().await; - } - } - Err(e) => { - error!("broadcast node error {e}"); - continue; - } + if !remote_ip.is_empty() && peer_ip == remote_ip { + continue; } + RpcResponse::send_raw(&unlocked_stream, Some(&peer_key), &message).await; + let mut rx = hashmap_rx.lock().await; + let _ = timeout(Duration::from_secs(5), rx.recv()).await; } } - None => { + _ => { warn!("No active connections found."); } } @@ -236,14 +226,16 @@ impl NodeInfo { return RpcResponse::Binary(b"Error: Ip Already exists.".to_vec()); } - Self::broadcast_node( - map.clone(), - &edit, - &remote_ip, - NodeEditType::Add, - &connections_key, - ) - .await; + if !remote_ip.is_empty() { + Self::broadcast_node( + map.clone(), + &edit, + &remote_ip, + NodeEditType::Add, + &connections_key, + ) + .await; + } RpcResponse::Binary(b"Success".to_vec()) } diff --git a/src/records/memory/network_mapping/mod.rs b/src/records/memory/network_mapping/mod.rs index c55ed21..81a639d 100644 --- a/src/records/memory/network_mapping/mod.rs +++ b/src/records/memory/network_mapping/mod.rs @@ -4,7 +4,7 @@ use crate::common::skein::skein_256_hash_data; use crate::common::types::GENESIS_IP; use crate::decode; use crate::lazy_static; -use crate::log::{error, info, warn}; +use crate::log::{info, warn}; use crate::records::block_height::get_block_height::get_height; use crate::records::ip_score::enums::InfractionType; use crate::records::ip_score::score::update_ip_score; @@ -17,11 +17,12 @@ use crate::records::memory::network_mapping::structs::{ use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::rpc::responses::RpcResponse; use crate::sled::Db; +use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; +use crate::Duration; use crate::HashMap; use crate::Mutex; -use crate::TcpStream; use crate::Utc; lazy_static! { diff --git a/src/records/memory/network_mapping/monitor.rs b/src/records/memory/network_mapping/monitor.rs index 88dfddc..e7b1a24 100644 --- a/src/records/memory/network_mapping/monitor.rs +++ b/src/records/memory/network_mapping/monitor.rs @@ -26,7 +26,7 @@ impl NodeInfo { edit: &SignedMonitorEdit, remote_ip: &str, edittype: NodeEditType, - connections_key: &str, + _connections_key: &str, ) { let monitored_bytes = match Wallet::short_address_to_bytes(&edit.monitored_address) { Some(bytes) => bytes, @@ -43,21 +43,20 @@ impl NodeInfo { Err(_) => return, }; - let connections_lock = CONNECTIONS.read().await; - let streams = connections_lock - .as_ref() - .map(|connection| connection.get_all_streams()); + let streams = { + let connections_lock = CONNECTIONS.read().await; + connections_lock + .as_ref() + .map(|connection| connection.get_all_ready_peer_streams_with_keys()) + .unwrap_or_default() + }; - if let Some(streams) = streams { - for unlocked_stream in streams { - let peer_addr = { - let stream = unlocked_stream.lock().await; - stream.peer_addr() - }; - let Ok(addr) = peer_addr else { + if !streams.is_empty() { + for (peer_key, unlocked_stream) in streams { + let Some((peer_ip, _)) = peer_key.rsplit_once(':') else { continue; }; - if !remote_ip.is_empty() && addr.ip().to_string() == remote_ip { + if !remote_ip.is_empty() && peer_ip == remote_ip { continue; } @@ -71,9 +70,9 @@ impl NodeInfo { message.extend_from_slice(&target_ip_bytes); message.extend_from_slice(×tamp_bytes); message.extend_from_slice(&signature_bytes); - RpcResponse::send_raw(&unlocked_stream, Some(connections_key), &message).await; + RpcResponse::send_raw(&unlocked_stream, Some(&peer_key), &message).await; let mut rx = hashmap_rx.lock().await; - let _ = rx.recv().await; + let _ = timeout(Duration::from_secs(5), rx.recv()).await; } } } diff --git a/src/records/memory/structs.rs b/src/records/memory/structs.rs index 3e7f39d..1ca6026 100644 --- a/src/records/memory/structs.rs +++ b/src/records/memory/structs.rs @@ -16,6 +16,9 @@ pub struct ConnectionInfo { pub stream: Arc>, pub client_type: Vec, pub wallet_public_key: Vec, + pub wallet_registry_synced: bool, + pub network_map_synced: bool, + pub ready: bool, } // ConnectionKey is the stable lookup key used inside the in-memory connection map. @@ -208,6 +211,9 @@ impl ConnectionInfo { stream, client_type, wallet_public_key, + wallet_registry_synced: false, + network_map_synced: false, + ready: false, } } } diff --git a/src/rpc/client/handshake_processing.rs b/src/rpc/client/handshake_processing.rs index 8bdf696..bc58625 100644 --- a/src/rpc/client/handshake_processing.rs +++ b/src/rpc/client/handshake_processing.rs @@ -14,16 +14,19 @@ use crate::orphans::structs::OrphanCheckup2; use crate::orphans::sync_check::sync_checkup; use crate::orphans::torrent_candidates::hydrate_torrent_candidates; use crate::records::block_height::get_block_height::get_height; -use crate::records::memory::connections::{set_reconnect_context, CONNECTIONS}; +use crate::records::memory::connections::{ + mark_peer_network_map_synced, mark_peer_wallet_registry_synced, set_reconnect_context, + CONNECTIONS, +}; use crate::records::memory::enums::{ClientType, ConnectionType}; use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::structs::{Connection, StoreConnectionParams}; +use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::rpc::client::handshake::connect_and_handshake; -use crate::rpc::client::register_wallet::register_connected_wallet; use crate::rpc::client::structs::{Connect, Handshake}; use crate::rpc::client::syncing::node_syncing; -use crate::rpc::client::wallet_registry_sync::sync_wallet_registry; +use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; use crate::rpc::command_maps::RPC_RANDOM_NODE; use crate::rpc::handshake_constants::{ HANDSHAKE_ADDRESS_OFFSET, HANDSHAKE_MESSAGE_BYTES, HANDSHAKE_RESPONSE_BYTES, @@ -270,6 +273,32 @@ pub async fn process_handshake_response( )); } + let Some(returned_short_address_bytes) = + Wallet::public_key_bytes_to_short_address_bytes(returned_public_key_bin) + else { + return Err(io::Error::other( + "Handshake failed: invalid response public key", + )); + }; + + match register_short_address( + ¶ms.db, + &returned_short_address_bytes, + returned_public_key_bin, + ) { + Ok(WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered) => {} + Ok(WalletRegistrationResult::Conflict) => { + return Err(io::Error::other( + "Handshake failed: peer public key conflicts with wallet registry", + )); + } + Err(err) => { + return Err(io::Error::other(format!( + "Handshake failed: could not register peer public key: {err}" + ))); + } + } + // create and arc mutex of the stream let stream = Arc::new(Mutex::new(params.stream)); let connections_key = params.addr.clone(); @@ -305,27 +334,20 @@ pub async fn process_handshake_response( )); let broadcast_stream = Arc::clone(&stream); - if let Err(err) = register_connected_wallet( - Arc::clone(&stream), - params.map.clone(), - connections_key.clone(), - wallet, - ) - .await - { - warn!("[wallet_registry] peer registration failed during handshake: {err}"); - } - - if let Err(err) = sync_wallet_registry( + if let Err(err) = sync_wallet_registry_with_retries( Arc::clone(&stream), ¶ms.db, params.map.clone(), connections_key.clone(), + "outgoing peer", ) .await { - warn!("[wallet_registry] peer sync failed during handshake: {err}"); + return Err(io::Error::other(format!( + "Wallet registry sync failed after handshake: {err}" + ))); } + mark_peer_wallet_registry_synced(&connections_key).await; if params.first { announce_self_to_network( @@ -336,7 +358,9 @@ pub async fn process_handshake_response( params.wallet.clone(), &connections_key, ) - .await; + .await + .map_err(|err| io::Error::other(format!("Network map sync failed: {err}")))?; + mark_peer_network_map_synced(&connections_key).await; if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await { error!("[startup] failed to rebuild mined counts from local chain: {err}"); } @@ -359,7 +383,9 @@ pub async fn process_handshake_response( params.wallet.clone(), &connections_key, ) - .await; + .await + .map_err(|err| io::Error::other(format!("Network map sync failed: {err}")))?; + mark_peer_network_map_synced(&connections_key).await; } Ok(()) } diff --git a/src/rpc/client/wallet_registry_sync.rs b/src/rpc/client/wallet_registry_sync.rs index cbfee32..03963ac 100644 --- a/src/rpc/client/wallet_registry_sync.rs +++ b/src/rpc/client/wallet_registry_sync.rs @@ -1,10 +1,11 @@ -use crate::log::warn; +use crate::log::{info, warn}; use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::rpc::command_maps::RPC_WALLET_REGISTRY_SYNC; use crate::rpc::commands::wallet_registry_sync::WALLET_REGISTRY_RECORD_BYTES; use crate::rpc::responses::RpcResponse; use crate::sled::Db; +use crate::sleep; use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; @@ -54,3 +55,65 @@ pub async fn sync_wallet_registry( Ok(()) } + +pub fn spawn_wallet_registry_sync( + stream: Arc>, + db: Db, + map: Arc>, + connections_key: String, + label: &'static str, +) { + tokio::spawn(async move { + for attempt in 1..=3 { + match sync_wallet_registry(stream.clone(), &db, map.clone(), connections_key.clone()) + .await + { + Ok(()) => { + info!( + "[wallet_registry] {label} sync completed: peer={connections_key} attempt={attempt}" + ); + return; + } + Err(err) => { + warn!( + "[wallet_registry] {label} sync failed: peer={connections_key} attempt={attempt}/3 err={err}" + ); + if attempt < 3 { + sleep(Duration::from_secs(5)).await; + } + } + } + } + }); +} + +pub async fn sync_wallet_registry_with_retries( + stream: Arc>, + db: &Db, + map: Arc>, + connections_key: String, + label: &'static str, +) -> Result<(), String> { + let mut last_error = String::new(); + for attempt in 1..=3 { + match sync_wallet_registry(stream.clone(), db, map.clone(), connections_key.clone()).await { + Ok(()) => { + info!( + "[wallet_registry] {label} sync completed: peer={connections_key} attempt={attempt}" + ); + return Ok(()); + } + Err(err) => { + last_error = err; + warn!( + "[wallet_registry] {label} sync failed: peer={connections_key} attempt={attempt}/3 err={last_error}" + ); + if attempt < 3 { + sleep(Duration::from_secs(5)).await; + } + } + } + } + + Err(last_error) +} diff --git a/src/rpc/commands/wallet_register.rs b/src/rpc/commands/wallet_register.rs index 487e916..2763346 100644 --- a/src/rpc/commands/wallet_register.rs +++ b/src/rpc/commands/wallet_register.rs @@ -19,7 +19,7 @@ async fn broadcast_wallet_registration( signature: String, map: Arc>, remote_ip: String, - connections_key: String, + _connections_key: String, ) { // Registration broadcasts are forwarded to peers after local // acceptance so wallet lookups converge across connected nodes. @@ -32,20 +32,13 @@ async fn broadcast_wallet_registration( let connections_lock = CONNECTIONS.read().await; connections_lock .as_ref() - .map(|connection| connection.get_all_peer_streams()) + .map(|connection| connection.get_all_ready_peer_streams_with_keys()) .unwrap_or_default() }; - for unlocked_stream in streams { - let peer_addr = { - let stream = unlocked_stream.lock().await; - stream.peer_addr() - }; - - if let Ok(addr) = peer_addr { - if !remote_ip.is_empty() && addr.ip().to_string() == remote_ip { - // Do not immediately echo a registration back to the peer - // that sent it to us. + for (peer_key, unlocked_stream) in streams { + if let Some((peer_ip, _)) = peer_key.rsplit_once(':') { + if !remote_ip.is_empty() && peer_ip == remote_ip { continue; } } @@ -64,7 +57,7 @@ async fn broadcast_wallet_registration( message.extend_from_slice(&public_key_bytes); message.extend_from_slice(&signature_bytes); - RpcResponse::send_raw(&unlocked_stream, Some(&connections_key), &message).await; + RpcResponse::send_raw(&unlocked_stream, Some(&peer_key), &message).await; let response = { let mut rx = hashmap_rx.lock().await; diff --git a/src/rpc/server/handshake.rs b/src/rpc/server/handshake.rs index de3d84b..5fd394b 100644 --- a/src/rpc/server/handshake.rs +++ b/src/rpc/server/handshake.rs @@ -1,9 +1,12 @@ -use crate::log::{error, warn}; +use crate::log::error; +use crate::records::memory::connections::{ + mark_peer_network_map_synced, mark_peer_wallet_registry_synced, +}; use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::generate_uid; use crate::records::memory::response_channels::Command; -use crate::rpc::client::register_wallet::register_connected_wallet; -use crate::rpc::client::wallet_registry_sync::sync_wallet_registry; +use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; +use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; use crate::rpc::responses::RpcResponse; use crate::rpc::server::connection_memory_manager::write_to_memory; use crate::rpc::server::handshake_processing::{combine_and_send_data, parse_received_data}; @@ -46,25 +49,28 @@ async fn complete_incoming_miner_setup( ) { // Incoming miner handshakes need the same post-handshake state exchange // as outgoing handshakes so a bootstrap with only incoming peers can mine. - if let Err(err) = register_connected_wallet( + if let Err(err) = sync_wallet_registry_with_retries( stream.clone(), + db, map.clone(), connections_key.to_string(), - &wallet, + "incoming peer", ) .await { - warn!("[wallet_registry] incoming peer registration failed after handshake: {err}"); - } - - if let Err(err) = - sync_wallet_registry(stream.clone(), db, map.clone(), connections_key.to_string()).await - { - warn!("[wallet_registry] incoming peer sync failed after handshake: {err}"); + error!("[startup] incoming peer wallet registry sync failed: {err}"); + return; } + mark_peer_wallet_registry_synced(connections_key).await; let short_address = wallet.saved.short_address.clone(); - announce_self_to_network(stream, &short_address, map, db, wallet, connections_key).await; + if let Err(err) = + announce_self_to_network(stream, &short_address, map, db, wallet, connections_key).await + { + error!("[startup] incoming peer network map sync failed: {err}"); + return; + } + mark_peer_network_map_synced(connections_key).await; if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await { error!("[startup] failed to rebuild mined counts after incoming handshake: {err}"); @@ -185,6 +191,27 @@ pub async fn handle_handshake( } }; + let Some(received_short_address_bytes) = + Wallet::public_key_bytes_to_short_address_bytes(&received_public_key_bytes) + else { + drop_failed_handshake(&stream).await; + return; + }; + + match register_short_address( + &db, + &received_short_address_bytes, + &received_public_key_bytes, + ) { + Ok( + WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered, + ) => {} + Ok(WalletRegistrationResult::Conflict) | Err(_) => { + drop_failed_handshake(&stream).await; + return; + } + } + // write to memory let connections_key = write_to_memory( &received_ip, diff --git a/src/startup/network_broadcast.rs b/src/startup/network_broadcast.rs index f1616d6..9579af1 100644 --- a/src/startup/network_broadcast.rs +++ b/src/startup/network_broadcast.rs @@ -13,8 +13,10 @@ use crate::records::memory::structs::Connection; use crate::rpc::command_maps::{RPC_ADD_NETWORK_NODE, RPC_REQUEST_NODE_LIST}; use crate::rpc::responses::RpcResponse; use crate::sled::Db; +use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; +use crate::Duration; use crate::Mutex; use crate::TcpStream; use crate::Utc; @@ -75,7 +77,7 @@ pub async fn announce_self_to_network( db: &Db, wallet: Arc, connections_key: &str, -) { +) -> Result<(), String> { // announce the local node to the connected peer, then // request its current network mapping on success let message_type = RPC_ADD_NETWORK_NODE; @@ -88,7 +90,7 @@ pub async fn announce_self_to_network( let ip_bytes = ip_to_binary(&ip); let address_bytes = match Wallet::short_address_to_bytes(address) { Some(bytes) => bytes, - None => return, + None => return Err("local node address was invalid".to_string()), }; let modified_by_bytes = vec![0u8; Wallet::SHORT_ADDRESS_BYTES_LENGTH]; let time = Utc::now().timestamp_millis() as u64; @@ -119,19 +121,25 @@ pub async fn announce_self_to_network( let mut rx = hashmap_rx.lock().await; // Handle the received data - if let Some(buffer) = rx.recv().await { - if binary_to_string(buffer.clone()) == "Success" { - get_network_mapping( - unlocked_stream, - command_map.clone(), - db, - wallet.clone(), - connections_key, - ) - .await; - record_local_monitor_for_peer(connections_key, command_map, db, wallet).await; - } + let buffer = timeout(Duration::from_secs(30), rx.recv()) + .await + .map_err(|_| "timed out waiting for network self-announcement response".to_string())? + .ok_or_else(|| "network self-announcement response channel closed".to_string())?; + + if binary_to_string(buffer.clone()) != "Success" { + return Err("network self-announcement was rejected".to_string()); } + + get_network_mapping( + unlocked_stream, + command_map.clone(), + db, + wallet.clone(), + connections_key, + ) + .await?; + record_local_monitor_for_peer(connections_key, command_map, db, wallet).await; + Ok(()) } pub async fn get_network_mapping( @@ -140,7 +148,7 @@ pub async fn get_network_mapping( db: &Db, wallet: Arc, connections_key: &str, -) { +) -> Result<(), String> { // request the remote peer's serialized node list and // import each advertised add/delete record locally let message_type = RPC_REQUEST_NODE_LIST; @@ -156,87 +164,92 @@ pub async fn get_network_mapping( RpcResponse::send_raw(&unlocked_stream, Some(connections_key), &message).await; let mut rx = download_hashmap_rx.lock().await; - if let Some(mut buffer) = rx.recv().await { - while buffer.len() >= NODE_RECORD_FIXED_BYTES { - let monitor_count = u16::from_le_bytes( - buffer[NODE_MONITOR_COUNT_OFFSET..NODE_RECORD_FIXED_BYTES] - .try_into() - .unwrap(), - ) as usize; - let record_bytes = - NODE_RECORD_FIXED_BYTES + (monitor_count * Wallet::SHORT_ADDRESS_BYTES_LENGTH); - if buffer.len() < record_bytes { - break; - } - let chunk: Vec = buffer.drain(0..record_bytes).collect(); - // The first part of each record describes the advertised node address and IP. - let Some(address) = Wallet::bytes_to_short_address(&chunk[0..NODE_IP_OFFSET]) else { - continue; - }; - let ip = binary_to_ip(chunk[NODE_IP_OFFSET..NODE_BLOCKS_MINED_OFFSET].to_vec()); - let _advertised_blocks_mined = chunk[NODE_BLOCKS_MINED_OFFSET]; - let blocks_mined = 0_u8; - let added_by_bytes = &chunk[NODE_ADDED_BY_OFFSET..NODE_ADDED_TIMESTAMP_OFFSET]; - let added_by = if added_by_bytes.iter().all(|&byte| byte == 0) { - String::new() - } else { - Wallet::bytes_to_short_address(added_by_bytes).unwrap_or_default() - }; - let added_timestamp = u64::from_le_bytes( - chunk[NODE_ADDED_TIMESTAMP_OFFSET..NODE_ADDED_SIGNATURE_OFFSET] - .try_into() - .unwrap(), - ); - let added_signature = - crate::encode(&chunk[NODE_ADDED_SIGNATURE_OFFSET..NODE_DELETED_TIMESTAMP_OFFSET]); - let deleted_timestamp = u64::from_le_bytes( - chunk[NODE_DELETED_TIMESTAMP_OFFSET..NODE_DELETED_BLOCK_OFFSET] - .try_into() - .unwrap(), - ); - let deleted_block = u32::from_le_bytes( - chunk[NODE_DELETED_BLOCK_OFFSET..NODE_MONITOR_COUNT_OFFSET] - .try_into() - .unwrap(), - ); - let remote_ip = ""; - // Add records are imported through NodeInfo so local validation/signing rules stay central. - NodeInfo::add_address(AddAddressParams { - map: command_map.clone(), - edit: SignedNodeEdit { - address: address.clone(), - ip: ip.clone(), - modified_by: added_by, - modified_timestamp: added_timestamp, - modified_signature: added_signature, - }, - blocks_mined, - remote_ip: remote_ip.to_string(), - db: db.clone(), - wallet: wallet.clone(), - connections_key: connections_key.to_string(), - }) - .await; + let mut buffer = timeout(Duration::from_secs(30), rx.recv()) + .await + .map_err(|_| "timed out waiting for network mapping response".to_string())? + .ok_or_else(|| "network mapping response channel closed".to_string())?; - let mut monitors = Vec::with_capacity(monitor_count); - for monitor_index in 0..monitor_count { - let start = - NODE_RECORD_FIXED_BYTES + monitor_index * Wallet::SHORT_ADDRESS_BYTES_LENGTH; - let end = start + Wallet::SHORT_ADDRESS_BYTES_LENGTH; - if let Some(monitor) = Wallet::bytes_to_short_address(&chunk[start..end]) { - monitors.push(monitor); - } - } - NodeInfo::set_monitors_from_mapping(&address, monitors).await; + while buffer.len() >= NODE_RECORD_FIXED_BYTES { + let monitor_count = u16::from_le_bytes( + buffer[NODE_MONITOR_COUNT_OFFSET..NODE_RECORD_FIXED_BYTES] + .try_into() + .unwrap(), + ) as usize; + let record_bytes = + NODE_RECORD_FIXED_BYTES + (monitor_count * Wallet::SHORT_ADDRESS_BYTES_LENGTH); + if buffer.len() < record_bytes { + return Err("network mapping response ended mid-record".to_string()); + } + let chunk: Vec = buffer.drain(0..record_bytes).collect(); + // The first part of each record describes the advertised node address and IP. + let Some(address) = Wallet::bytes_to_short_address(&chunk[0..NODE_IP_OFFSET]) else { + continue; + }; + let ip = binary_to_ip(chunk[NODE_IP_OFFSET..NODE_BLOCKS_MINED_OFFSET].to_vec()); + let _advertised_blocks_mined = chunk[NODE_BLOCKS_MINED_OFFSET]; + let blocks_mined = 0_u8; + let added_by_bytes = &chunk[NODE_ADDED_BY_OFFSET..NODE_ADDED_TIMESTAMP_OFFSET]; + let added_by = if added_by_bytes.iter().all(|&byte| byte == 0) { + String::new() + } else { + Wallet::bytes_to_short_address(added_by_bytes).unwrap_or_default() + }; + let added_timestamp = u64::from_le_bytes( + chunk[NODE_ADDED_TIMESTAMP_OFFSET..NODE_ADDED_SIGNATURE_OFFSET] + .try_into() + .unwrap(), + ); + let added_signature = + crate::encode(&chunk[NODE_ADDED_SIGNATURE_OFFSET..NODE_DELETED_TIMESTAMP_OFFSET]); + let deleted_timestamp = u64::from_le_bytes( + chunk[NODE_DELETED_TIMESTAMP_OFFSET..NODE_DELETED_BLOCK_OFFSET] + .try_into() + .unwrap(), + ); + let deleted_block = u32::from_le_bytes( + chunk[NODE_DELETED_BLOCK_OFFSET..NODE_MONITOR_COUNT_OFFSET] + .try_into() + .unwrap(), + ); + let remote_ip = ""; + // Add records are imported through NodeInfo so local validation/signing rules stay central. + NodeInfo::add_address(AddAddressParams { + map: command_map.clone(), + edit: SignedNodeEdit { + address: address.clone(), + ip: ip.clone(), + modified_by: added_by, + modified_timestamp: added_timestamp, + modified_signature: added_signature, + }, + blocks_mined, + remote_ip: remote_ip.to_string(), + db: db.clone(), + wallet: wallet.clone(), + connections_key: connections_key.to_string(), + }) + .await; - if deleted_timestamp > 0 { - NodeInfo::set_deleted_metadata_from_mapping( - &address, - deleted_timestamp, - deleted_block, - ) - .await; + let mut monitors = Vec::with_capacity(monitor_count); + for monitor_index in 0..monitor_count { + let start = + NODE_RECORD_FIXED_BYTES + monitor_index * Wallet::SHORT_ADDRESS_BYTES_LENGTH; + let end = start + Wallet::SHORT_ADDRESS_BYTES_LENGTH; + if let Some(monitor) = Wallet::bytes_to_short_address(&chunk[start..end]) { + monitors.push(monitor); } } + NodeInfo::set_monitors_from_mapping(&address, monitors).await; + + if deleted_timestamp > 0 { + NodeInfo::set_deleted_metadata_from_mapping(&address, deleted_timestamp, deleted_block) + .await; + } } + + if !buffer.is_empty() { + return Err("network mapping response had trailing partial bytes".to_string()); + } + + Ok(()) } diff --git a/src/torrent/torrenting_system/get_nodes.rs b/src/torrent/torrenting_system/get_nodes.rs index c5963cf..981a7f9 100644 --- a/src/torrent/torrenting_system/get_nodes.rs +++ b/src/torrent/torrenting_system/get_nodes.rs @@ -11,10 +11,13 @@ pub async fn get_nodes_from_memory() -> Vec<(String, Arc>)> { let mut nodes = Vec::new(); if let Some(connection) = &*connection_storage { for connection_info in connection.connection_map.values() { - // Only miner peers are expected to serve block pieces. + // Only fully initialized miner peers are expected to serve block pieces. if ClientType::from_bytes(&connection_info.client_type) != Some(ClientType::Miner) { continue; } + if !connection_info.ready { + continue; + } // Use ip:port as the scheduler key and clone the shared stream handle for requests. let ip = binary_to_ip(connection_info.ip.clone()); let port = connection_info.port;