From 413adbf2411e7e82bcf0e2a97adcee997b50599f Mon Sep 17 00:00:00 2001 From: viraladmin <00purple@gmail.com> Date: Thu, 4 Jun 2026 20:45:43 -0600 Subject: [PATCH] node startup bug fixes --- src/records/memory/connections.rs | 88 ++++++++++++-- src/records/memory/network_mapping/add.rs | 22 ++-- src/rpc/client/handshake_processing.rs | 33 +++++- src/rpc/commands/receive_torrent.rs | 11 +- src/rpc/commands/route_reply.rs | 9 +- src/rpc/server/handshake.rs | 131 ++++++++++++++++++++- src/startup/network_broadcast.rs | 7 +- src/torrent/create_metadata.rs | 4 +- src/torrent/torrenting_system/get_nodes.rs | 22 ++++ 9 files changed, 285 insertions(+), 42 deletions(-) diff --git a/src/records/memory/connections.rs b/src/records/memory/connections.rs index 54efacd..df40d61 100644 --- a/src/records/memory/connections.rs +++ b/src/records/memory/connections.rs @@ -223,7 +223,7 @@ impl Connection { stream, client_type, wallet_public_key, - command_map, + command_map: _, } = params; let ip_bytes = ip_to_binary(&ip); @@ -272,15 +272,6 @@ impl Connection { ); self.connection_map.insert(connection_key, connection_info); - if client_type == ClientType::Miner { - spawn_monitor_update( - ip.clone(), - MONITOR_ACTION_ADD, - wallet_public_key.clone(), - port, - ); - Connection::client_checkup(stream, connection_type, ip, port, command_map); - } true } @@ -470,7 +461,6 @@ impl Connection { for (connection_key, info) in self.connection_map.iter_mut() { if connection_key.ip == ip_bytes && connection_key.port == port { info.wallet_registry_synced = true; - info.ready = info.network_map_synced; return true; } } @@ -485,7 +475,43 @@ impl Connection { for (connection_key, info) in self.connection_map.iter_mut() { if connection_key.ip == ip_bytes && connection_key.port == port { info.network_map_synced = true; - info.ready = info.wallet_registry_synced; + return true; + } + } + false + } + + pub fn mark_operational(&mut self, key: &str, command_map: Arc>) -> bool { + let Some((ip, port)) = split_ip_port_key(key) else { + return false; + }; + let ip_bytes = ip_to_binary(&ip); + for (connection_key, info) in self.connection_map.iter_mut() { + if connection_key.ip == ip_bytes && connection_key.port == port { + if info.ready { + return true; + } + if ClientType::from_bytes(&info.client_type) != Some(ClientType::Miner) + || !info.wallet_registry_synced + || !info.network_map_synced + { + return false; + } + info.ready = true; + spawn_monitor_update( + ip.clone(), + MONITOR_ACTION_ADD, + info.wallet_public_key.clone(), + port, + ); + Connection::client_checkup( + Arc::clone(&info.stream), + ConnectionType::from_bytes(&info.connection_type) + .unwrap_or(ConnectionType::Incoming), + ip, + port, + command_map, + ); return true; } } @@ -543,6 +569,26 @@ impl Connection { .collect() } + pub fn get_startup_synced_peer_streams_with_keys( + &self, + ) -> Vec<(String, Arc>)> { + self.connection_map + .iter() + .filter_map(|(key, connection_info)| { + if ClientType::from_bytes(&connection_info.client_type) != Some(ClientType::Miner) + || connection_info.ready + || !connection_info.wallet_registry_synced + || !connection_info.network_map_synced + { + return None; + } + let ip = binary_to_ip(key.ip.clone()); + let connections_key = format!("{}:{}", ip, key.port); + Some((connections_key, Arc::clone(&connection_info.stream))) + }) + .collect() + } + // Resolve a stored outgoing node connection back to its live stream. pub fn get_stream_for_outgoing(&self, ip: &str, port: u16) -> Option>> { let ip_bytes = ip_to_binary(ip); @@ -805,6 +851,15 @@ pub async fn mark_peer_network_map_synced(key: &str) -> bool { .unwrap_or(false) } +pub async fn mark_peer_operational(key: &str, map: Arc>) -> bool { + CONNECTIONS + .write() + .await + .as_mut() + .map(|connection| connection.mark_operational(key, map)) + .unwrap_or(false) +} + pub async fn live_miner_peer_streams() -> Vec<(String, Arc>)> { // Snapshot consensus and recovery checks vote only across currently // connected miner peers, regardless of incoming/outgoing direction. @@ -831,6 +886,15 @@ pub async fn live_miner_peer_streams() -> Vec<(String, Arc>)> { .unwrap_or_default() } +pub async fn startup_synced_peer_streams() -> Vec<(String, Arc>)> { + CONNECTIONS + .read() + .await + .as_ref() + .map(|connection| connection.get_startup_synced_peer_streams_with_keys()) + .unwrap_or_default() +} + pub async fn get_client_type_from_memory(key: &str) -> Option { // Recover the stored client role from the serialized connection key // used throughout the RPC layer. diff --git a/src/records/memory/network_mapping/add.rs b/src/records/memory/network_mapping/add.rs index 5c88198..6fc5a72 100644 --- a/src/records/memory/network_mapping/add.rs +++ b/src/records/memory/network_mapping/add.rs @@ -227,14 +227,20 @@ impl NodeInfo { } if !remote_ip.is_empty() { - Self::broadcast_node( - map.clone(), - &edit, - &remote_ip, - NodeEditType::Add, - &connections_key, - ) - .await; + let broadcast_map = map.clone(); + let broadcast_edit = edit.clone(); + let broadcast_remote_ip = remote_ip.clone(); + let broadcast_connections_key = connections_key.clone(); + tokio::spawn(async move { + Self::broadcast_node( + broadcast_map, + &broadcast_edit, + &broadcast_remote_ip, + NodeEditType::Add, + &broadcast_connections_key, + ) + .await; + }); } RpcResponse::Binary(b"Success".to_vec()) diff --git a/src/rpc/client/handshake_processing.rs b/src/rpc/client/handshake_processing.rs index bc58625..1ac2c3f 100644 --- a/src/rpc/client/handshake_processing.rs +++ b/src/rpc/client/handshake_processing.rs @@ -15,8 +15,8 @@ use crate::orphans::sync_check::sync_checkup; use crate::orphans::torrent_candidates::hydrate_torrent_candidates; use crate::records::block_height::get_block_height::get_height; use crate::records::memory::connections::{ - mark_peer_network_map_synced, mark_peer_wallet_registry_synced, set_reconnect_context, - CONNECTIONS, + mark_peer_network_map_synced, mark_peer_operational, mark_peer_wallet_registry_synced, + set_reconnect_context, startup_synced_peer_streams, CONNECTIONS, }; use crate::records::memory::enums::{ClientType, ConnectionType}; use crate::records::memory::network_mapping::NodeInfo; @@ -33,6 +33,7 @@ use crate::rpc::handshake_constants::{ HANDSHAKE_SIGNATURE_OFFSET, }; use crate::rpc::responses::RpcResponse; +use crate::rpc::server::connection_memory_manager::remove_key_from_memory; use crate::rpc::server::rpc_command_loop::start_loop; use crate::sled::Db; use crate::startup::network_broadcast::announce_self_to_network; @@ -174,9 +175,12 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), ) .await .map_err(|e| format!("Sync error: {e}"))?; - if !local_genesis_exists && !genesis_checkup().await { + if !local_genesis_exists && !genesis_checkup().await && remote_height > 0 { return Err("Sync completed without obtaining remote genesis".to_string()); } + if !local_genesis_exists && !genesis_checkup().await { + break; + } continue; } break; @@ -215,6 +219,9 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), } info!("[sync] post-sync checks complete, mining resuming"); + for (peer_key, _) in startup_synced_peer_streams().await { + mark_peer_operational(&peer_key, params.map.clone()).await; + } set_node_mode(NodeMode::Normal); clear_mining_stop_request(); set_mining_state(MiningState::Idle); @@ -343,6 +350,7 @@ pub async fn process_handshake_response( ) .await { + remove_key_from_memory(&connections_key).await; return Err(io::Error::other(format!( "Wallet registry sync failed after handshake: {err}" ))); @@ -359,7 +367,13 @@ pub async fn process_handshake_response( &connections_key, ) .await - .map_err(|err| io::Error::other(format!("Network map sync failed: {err}")))?; + .map_err(|err| { + let connections_key = connections_key.clone(); + tokio::spawn(async move { + remove_key_from_memory(&connections_key).await; + }); + io::Error::other(format!("Network map sync failed: {err}")) + })?; mark_peer_network_map_synced(&connections_key).await; if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await { error!("[startup] failed to rebuild mined counts from local chain: {err}"); @@ -384,8 +398,17 @@ pub async fn process_handshake_response( &connections_key, ) .await - .map_err(|err| io::Error::other(format!("Network map sync failed: {err}")))?; + .map_err(|err| { + let connections_key = connections_key.clone(); + tokio::spawn(async move { + remove_key_from_memory(&connections_key).await; + }); + io::Error::other(format!("Network map sync failed: {err}")) + })?; mark_peer_network_map_synced(&connections_key).await; + if crate::miner::flag::is_normal_mode() { + mark_peer_operational(&connections_key, params.map.clone()).await; + } } Ok(()) } diff --git a/src/rpc/commands/receive_torrent.rs b/src/rpc/commands/receive_torrent.rs index 500ffa9..653054b 100644 --- a/src/rpc/commands/receive_torrent.rs +++ b/src/rpc/commands/receive_torrent.rs @@ -1,7 +1,7 @@ use crate::common::check_genesis::genesis_checkup; use crate::common::skein::skein_128_hash_bytes; use crate::log::{error, warn}; -use crate::miner::flag::{is_reorganizing_mode, is_syncing_mode}; +use crate::miner::flag::{is_normal_mode, is_reorganizing_mode, is_syncing_mode}; use crate::orphans::checkup_state::{ finish_orphan_check, request_orphan_recheck, try_begin_orphan_check, }; @@ -62,7 +62,7 @@ pub async fn trigger_orphan_check( map: Arc>, connections_key: String, ) { - if is_syncing_mode() { + if !is_normal_mode() || is_syncing_mode() { return; } if !try_begin_orphan_check() { @@ -125,9 +125,10 @@ pub async fn torrent_submission( ) -> TorrentSubmissionOutcome { let expected_height = next_expected_height(db).await; let local_height = get_height(db); - let syncing = is_syncing_mode(); + let normal = is_normal_mode(); + let syncing = !normal || is_syncing_mode(); let reorganizing = is_reorganizing_mode(); - let process_now = !syncing && !reorganizing && height == expected_height; + let process_now = normal && !syncing && !reorganizing && height == expected_height; if height < expected_height && !(height > 0 && within_orphan_window(local_height, height)) { // Far-stale torrents cannot affect the orphan window. Ignore them @@ -141,7 +142,7 @@ pub async fn torrent_submission( // The sender receives an acknowledgement for staging even when the // torrent cannot be processed immediately. let staged_response = if syncing { - RpcResponse::Binary("Torrent staged while syncing.".as_bytes().to_vec()) + RpcResponse::Binary("Torrent staged while startup/syncing.".as_bytes().to_vec()) } else if reorganizing { RpcResponse::Binary("Torrent staged while reorganizing.".as_bytes().to_vec()) } else if height == expected_height { diff --git a/src/rpc/commands/route_reply.rs b/src/rpc/commands/route_reply.rs index ac91f2f..21559c5 100644 --- a/src/rpc/commands/route_reply.rs +++ b/src/rpc/commands/route_reply.rs @@ -54,13 +54,16 @@ pub async fn route_reply( return Ok(()); } - // Unknown replies usually mean a stale or forged UID. Drain the - // payload so the stream remains aligned for future commands. - bad_rpc_call::record(ip, client_type, db, wallet).await; let retired = is_retired_entry(map.clone(), uid).await; if retired { + // Retired UIDs are normal timeout fallout: the requester gave up, + // but the peer eventually answered. Drain without penalizing so + // startup/sync latency does not poison an otherwise valid peer. warn!("[rpc] late reply arrived for retired uid: {uid:?}"); } else { + // Unknown, never-reserved UIDs can still indicate malformed or + // forged traffic, so those keep counting as bad RPC behavior. + bad_rpc_call::record(ip, client_type, db, wallet).await; warn!("[rpc] reply arrived for unknown uid: {uid:?}"); } let _ = read_bytes_from_stream::read_usize_from_stream( diff --git a/src/rpc/server/handshake.rs b/src/rpc/server/handshake.rs index 5fd394b..dec97b6 100644 --- a/src/rpc/server/handshake.rs +++ b/src/rpc/server/handshake.rs @@ -1,20 +1,31 @@ -use crate::log::error; +use crate::common::check_genesis::genesis_checkup; +use crate::log::{error, warn}; +use crate::miner::flag::{ + clear_mining_stop_request, request_mining_stop, set_mining_state, set_node_mode, MiningState, + NodeMode, +}; +use crate::orphans::structs::OrphanCheckup2; +use crate::orphans::sync_check::sync_checkup; +use crate::orphans::torrent_candidates::hydrate_torrent_candidates; +use crate::records::block_height::get_block_height::get_height; use crate::records::memory::connections::{ - mark_peer_network_map_synced, mark_peer_wallet_registry_synced, + mark_peer_network_map_synced, mark_peer_operational, mark_peer_wallet_registry_synced, }; use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::generate_uid; use crate::records::memory::response_channels::Command; use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; +use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; use crate::rpc::responses::RpcResponse; -use crate::rpc::server::connection_memory_manager::write_to_memory; +use crate::rpc::server::connection_memory_manager::{remove_key_from_memory, write_to_memory}; use crate::rpc::server::handshake_processing::{combine_and_send_data, parse_received_data}; use crate::rpc::server::handshake_verifications::{connection_count, perform_handshake_tests}; use crate::rpc::server::structs::{CombineAndSendDataParams, HandshakeTestParams}; use crate::rpc::server::tests::{endpoint_port, is_port_open}; use crate::sled::Db; use crate::startup::network_broadcast::announce_self_to_network; +use crate::startup::remote_height::request_remote_height; use crate::wallets::structures::Wallet; use crate::Arc; use crate::AsyncWriteExt; @@ -40,6 +51,83 @@ async fn get_connection_counts() -> (u8, u8) { (incoming, outgoing) } +async fn sync_incoming_peer_before_operational( + stream: Arc>, + db: &Db, + wallet: Arc, + map: Arc>, + connections_key: &str, +) -> Result { + let local_height = get_height(db); + let remote_height = + request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?; + let local_genesis_exists = genesis_checkup().await; + + if local_genesis_exists && remote_height < local_height { + warn!( + "[startup] incoming peer is behind local chain and will remain non-operational until it catches up: local_height={local_height} remote_height={remote_height}" + ); + return Ok(false); + } + + if !local_genesis_exists || remote_height > local_height + 10 { + set_node_mode(NodeMode::Syncing); + request_mining_stop(); + set_mining_state(MiningState::Idle); + + node_syncing( + stream.clone(), + db, + remote_height, + map.clone(), + true, + wallet.clone(), + connections_key.to_string(), + ) + .await + .map_err(|err| format!("incoming sync error: {err}"))?; + + if !local_genesis_exists && !genesis_checkup().await && remote_height > 0 { + return Err("incoming sync completed without obtaining remote genesis".to_string()); + } + } + + let post_sync_local_height = get_height(db); + let post_sync_remote_height = + request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?; + + if post_sync_remote_height > post_sync_local_height { + match hydrate_torrent_candidates(stream.clone(), map.clone(), connections_key.to_string()) + .await + { + Ok(imported) => { + if imported > 0 { + warn!( + "[sync] hydrated {imported} torrent candidates before incoming post-sync orphan check" + ); + } + } + Err(err) => warn!("[sync] failed to hydrate incoming torrent candidates: {err}"), + } + + let orphan_checkup_params = OrphanCheckup2 { + stream: stream.clone(), + db: db.clone(), + local_height: post_sync_local_height, + remote_height: post_sync_remote_height, + recheck_from_height: Some(post_sync_local_height.min(post_sync_remote_height)), + map: map.clone(), + node_syncing: true, + connections_key: connections_key.to_string(), + }; + if let Err(err) = sync_checkup(orphan_checkup_params, wallet).await { + warn!("[sync] Incoming post-sync orphan check error: {err}"); + } + } + + Ok(true) +} + async fn complete_incoming_miner_setup( stream: Arc>, db: &Db, @@ -59,19 +147,52 @@ async fn complete_incoming_miner_setup( .await { error!("[startup] incoming peer wallet registry sync failed: {err}"); + remove_key_from_memory(connections_key).await; return; } mark_peer_wallet_registry_synced(connections_key).await; let short_address = wallet.saved.short_address.clone(); - if let Err(err) = - announce_self_to_network(stream, &short_address, map, db, wallet, connections_key).await + if let Err(err) = announce_self_to_network( + stream.clone(), + &short_address, + map.clone(), + db, + wallet.clone(), + connections_key, + ) + .await { error!("[startup] incoming peer network map sync failed: {err}"); + remove_key_from_memory(connections_key).await; return; } mark_peer_network_map_synced(connections_key).await; + let operational = match sync_incoming_peer_before_operational( + stream.clone(), + db, + wallet.clone(), + map.clone(), + connections_key, + ) + .await + { + Ok(operational) => operational, + Err(err) => { + error!("[startup] incoming peer chain sync failed: {err}"); + remove_key_from_memory(connections_key).await; + return; + } + }; + + if operational { + mark_peer_operational(connections_key, map.clone()).await; + set_node_mode(NodeMode::Normal); + clear_mining_stop_request(); + set_mining_state(MiningState::Idle); + } + if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await { error!("[startup] failed to rebuild mined counts after incoming handshake: {err}"); } diff --git a/src/startup/network_broadcast.rs b/src/startup/network_broadcast.rs index 9579af1..dce7962 100644 --- a/src/startup/network_broadcast.rs +++ b/src/startup/network_broadcast.rs @@ -126,8 +126,11 @@ pub async fn announce_self_to_network( .map_err(|_| "timed out waiting for network self-announcement response".to_string())? .ok_or_else(|| "network self-announcement response channel closed".to_string())?; - if binary_to_string(buffer.clone()) != "Success" { - return Err("network self-announcement was rejected".to_string()); + let response = binary_to_string(buffer.clone()); + if response != "Success" { + return Err(format!( + "network self-announcement was rejected: {response}" + )); } get_network_mapping( diff --git a/src/torrent/create_metadata.rs b/src/torrent/create_metadata.rs index 16e01dc..28b814d 100644 --- a/src/torrent/create_metadata.rs +++ b/src/torrent/create_metadata.rs @@ -6,7 +6,7 @@ use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::rpc::command_maps::RPC_SUBMIT_TORRENT; use crate::rpc::responses::RpcResponse; use crate::torrent::structs::{Info, Torrent}; -use crate::torrent::torrenting_system::get_nodes::get_nodes_from_memory; +use crate::torrent::torrenting_system::get_nodes::get_torrent_broadcast_nodes_from_memory; use crate::torrent::torrenting_system::torrent_cache::should_broadcast_torrent; use crate::Arc; use crate::File; @@ -154,7 +154,7 @@ pub async fn broadcast_new_torrent_to_peers( let torrent_len = 4 + torrent_bytes.len() as u32; // Send the torrent to all currently connected miner peers. - let peers = get_nodes_from_memory().await; + let peers = get_torrent_broadcast_nodes_from_memory().await; for (connections_key, stream) in peers { // Each peer needs its own reply mapping entry and UID. let (uid_bytes, _tx, _rx) = reserve_entry(map.clone()).await; diff --git a/src/torrent/torrenting_system/get_nodes.rs b/src/torrent/torrenting_system/get_nodes.rs index 981a7f9..b0a5fbd 100644 --- a/src/torrent/torrenting_system/get_nodes.rs +++ b/src/torrent/torrenting_system/get_nodes.rs @@ -30,3 +30,25 @@ pub async fn get_nodes_from_memory() -> Vec<(String, Arc>)> { drop(connection_storage); nodes } + +pub async fn get_torrent_broadcast_nodes_from_memory() -> Vec<(String, Arc>)> { + // Torrent announcements are allowed to reach miner peers that are + // still starting/syncing so they can stage new candidates while + // catching up. Consensus/piece-selection paths must keep using + // get_nodes_from_memory(), which requires ready peers. + let connection_storage = CONNECTIONS.read().await; + let mut nodes = Vec::new(); + if let Some(connection) = &*connection_storage { + for connection_info in connection.connection_map.values() { + if ClientType::from_bytes(&connection_info.client_type) != Some(ClientType::Miner) { + continue; + } + let ip = binary_to_ip(connection_info.ip.clone()); + let port = connection_info.port; + let key = format!("{ip}:{port}"); + let stream_arc = Arc::clone(&connection_info.stream); + nodes.push((key, stream_arc)); + } + } + nodes +}