From f651fda2dd1ffc6e57838b70bbe77799b1d1a0e2 Mon Sep 17 00:00:00 2001 From: viraladmin <00purple@gmail.com> Date: Sat, 13 Jun 2026 09:55:52 -0600 Subject: [PATCH] startup sync bug fixes --- src/miner/block_rewards.rs | 11 +- src/orphans/orphan_checkup.rs | 14 ++ src/orphans/orphan_window_check.rs | 27 ++-- src/orphans/sync_check.rs | 14 +- src/records/memory/connections.rs | 28 ++++ src/records/memory/network_mapping/add.rs | 16 +- .../memory/network_mapping/mined_counts.rs | 35 +++++ src/rpc/client/handshake_processing.rs | 11 +- src/rpc/client/syncing.rs | 13 ++ src/rpc/commands/receive_torrent.rs | 138 ++++++++++++++++-- src/rpc/server/handshake.rs | 6 +- src/startup/network_broadcast.rs | 5 +- .../async_funcs/verify_rewards.rs | 17 ++- 13 files changed, 286 insertions(+), 49 deletions(-) diff --git a/src/miner/block_rewards.rs b/src/miner/block_rewards.rs index f1554a8..e3de3d1 100644 --- a/src/miner/block_rewards.rs +++ b/src/miner/block_rewards.rs @@ -4,6 +4,8 @@ use crate::records::block_height::get_block_height::get_height; use crate::records::memory::network_mapping::NodeInfo; use crate::sled::Db; +const REWARD_MATURITY_BLOCKS: u8 = 100; + pub async fn calculate_block_reward(block_height: u32) -> u64 { // Apply the fixed halving schedule based on the block // height currently being mined or verified. @@ -37,7 +39,14 @@ pub async fn create_rewards_transaction( // New miners must first prove participation before receiving // the block subsidy, so early mined blocks pay a zero reward. - let value = if NodeInfo::get_mined_count(short_address).await < 100 { + let value = if !NodeInfo::chain_mined_count_at_least( + short_address, + get_height(db), + REWARD_MATURITY_BLOCKS, + ) + .await + .unwrap_or(false) + { 0_u64 } else { calculate_block_reward(block_height).await diff --git a/src/orphans/orphan_checkup.rs b/src/orphans/orphan_checkup.rs index 996d161..c24f7f5 100644 --- a/src/orphans/orphan_checkup.rs +++ b/src/orphans/orphan_checkup.rs @@ -9,6 +9,7 @@ use crate::records::memory::torrent_status::{ }; use crate::records::unpack_block::load_by_binary_data::load_block_from_binary; use crate::records::unpack_block::unpack_header::load_block_header; +use crate::rpc::client::block_hash_vote::request_block_hash_at_height; use crate::torrent::structs::{DownloadSave, Torrent}; use crate::torrent::torrenting_system::create_file::combine_pieces; use crate::torrent::torrenting_system::download_locks::acquire_candidate_download; @@ -104,6 +105,19 @@ async fn candidate_attaches_before_rollback( // Metadata may choose a candidate, but only downloaded block bytes can // prove the rollback is safe. torrent.verify(height, ¶ms.db, wallet).await?; + let peer_canonical_hash = request_block_hash_at_height( + params.stream.clone(), + params.map.clone(), + params.connections_key.clone(), + height, + ) + .await?; + if peer_canonical_hash != torrent.info.block_hash { + return Err(format!( + "Staged candidate is not peer canonical at height {height}." + )); + } + let _download_guard = acquire_candidate_download(height, &torrent.info.info_hash, true).await?; let verification_service = global_verification_service() diff --git a/src/orphans/orphan_window_check.rs b/src/orphans/orphan_window_check.rs index c33043e..e632257 100644 --- a/src/orphans/orphan_window_check.rs +++ b/src/orphans/orphan_window_check.rs @@ -20,11 +20,14 @@ pub async fn orphan_window_check(params: CheckUp, wallet: Arc) -> Result } }; + let shared_tip = params.local_height.min(params.remote_height); + let shared_window_floor = include_recheck_floor(shared_tip.saturating_sub(10)); + if height_diff == 0 { // same height means compare the last ten blocks directly - let start_check = params.local_height; - let original_start_check = params.local_height; - let stop_check = include_recheck_floor(params.local_height.saturating_sub(10)); + let start_check = shared_tip; + let original_start_check = shared_tip; + let stop_check = shared_window_floor; let orphan_checkup_params = OrphanCheckup { start_check, stop_check, @@ -42,12 +45,9 @@ pub async fn orphan_window_check(params: CheckUp, wallet: Arc) -> Result } else if height_diff <= 10 && params.local_height > params.remote_height { // if the local chain is slightly ahead, begin comparison from // the remote height and only search within the overlap window - let start_check = params.remote_height; - let original_start_check = params.remote_height; - // The farther apart the tips are, the less backward overlap remains - // inside the ten-block correction window. - let stop_check = - include_recheck_floor(params.remote_height.saturating_sub(10 - height_diff)); + let start_check = shared_tip; + let original_start_check = shared_tip; + let stop_check = shared_window_floor; let orphan_checkup_params = OrphanCheckup { start_check, stop_check, @@ -65,12 +65,9 @@ pub async fn orphan_window_check(params: CheckUp, wallet: Arc) -> Result } else if height_diff <= 10 && params.local_height < params.remote_height { // if the remote chain is slightly ahead, start at the local tip // and search backward only within the valid orphan range - let start_check = params.local_height; - let original_start_check = params.local_height; - // Search only the portion of local history that could still be - // replaced by staged remote candidates. - let stop_check = - include_recheck_floor(params.local_height.saturating_sub(10 - height_diff)); + let start_check = shared_tip; + let original_start_check = shared_tip; + let stop_check = shared_window_floor; let orphan_checkup_params = OrphanCheckup { start_check, stop_check, diff --git a/src/orphans/sync_check.rs b/src/orphans/sync_check.rs index 3c9d445..1c08a25 100644 --- a/src/orphans/sync_check.rs +++ b/src/orphans/sync_check.rs @@ -244,10 +244,16 @@ async fn sync_checkup_pass(params: &OrphanCheckup2, wallet: Arc) -> Resu match orphan_window_check(checkup_params, wallet.clone()).await { Ok(()) => {} Err(err) => { - if should_retry_staged_candidate(&err) - && get_height(¶ms.db) < height_before_window_check - { - replay_waiting = true; + let height_after_window_check = get_height(¶ms.db); + let rolled_back = height_after_window_check < height_before_window_check; + if rolled_back { + if should_retry_staged_candidate(&err) { + replay_waiting = true; + } else { + return Err(format!( + "orphan window adoption failed after rollback: {err}" + )); + } } error!("[orphan] orphan window check error: {err}"); } diff --git a/src/records/memory/connections.rs b/src/records/memory/connections.rs index 2c10001..7bbfe49 100644 --- a/src/records/memory/connections.rs +++ b/src/records/memory/connections.rs @@ -884,6 +884,34 @@ pub async fn mark_peer_operational(key: &str, map: Arc>) -> bool .unwrap_or(false) } +pub async fn peer_is_operational(key: &str) -> bool { + let Some((ip, port)) = split_ip_port_key(key) else { + return false; + }; + let ip_bytes = ip_to_binary(&ip); + + CONNECTIONS + .read() + .await + .as_ref() + .and_then(|connection| { + connection + .connection_map + .iter() + .find_map(|(connection_key, info)| { + if connection_key.ip == ip_bytes + && connection_key.port == port + && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) + { + Some(info.ready) + } else { + None + } + }) + }) + .unwrap_or(false) +} + pub async fn live_miner_peer_streams() -> Vec<(String, Arc>)> { // Snapshot consensus and recovery checks vote only across currently // connected miner peers, regardless of incoming/outgoing direction. diff --git a/src/records/memory/network_mapping/add.rs b/src/records/memory/network_mapping/add.rs index 5ae6974..5cf828a 100644 --- a/src/records/memory/network_mapping/add.rs +++ b/src/records/memory/network_mapping/add.rs @@ -3,6 +3,10 @@ use crate::records::memory::response_channels::reserve_transient_entry_with_cont const ONE_HOUR_MILLIS: u64 = 3_600_000; +fn connection_key_ip(connections_key: &str) -> Option<&str> { + connections_key.rsplit_once(':').map(|(ip, _)| ip) +} + impl NodeInfo { pub async fn import_signed_mapping_address( db: &Db, @@ -216,9 +220,15 @@ impl NodeInfo { return RpcResponse::Binary(b"Error: Invalid network address".to_vec()); } - // Locally initiated edits are re-signed with the local wallet and - // current timestamp so they can be propagated as fresh node events. - if edit.ip == remote_ip { + let unsigned_self_announcement = edit.modified_by.is_empty() + && (edit.ip == remote_ip + || connection_key_ip(&connections_key) + .map(|connection_ip| edit.ip == connection_ip) + .unwrap_or(false)); + + // Unsigned self-announcements are sponsored by the receiving node + // after the claimed IP is tied to the live peer connection. + if unsigned_self_announcement { edit.modified_timestamp = current_timestamp; edit.modified_by = wallet.saved.short_address.clone(); edit.modified_signature = diff --git a/src/records/memory/network_mapping/mined_counts.rs b/src/records/memory/network_mapping/mined_counts.rs index 3a327e5..54fb6c0 100644 --- a/src/records/memory/network_mapping/mined_counts.rs +++ b/src/records/memory/network_mapping/mined_counts.rs @@ -1,4 +1,5 @@ use super::*; +use crate::common::check_genesis::genesis_checkup; use crate::records::unpack_block::unpack_header::load_block_header; impl NodeInfo { @@ -37,6 +38,30 @@ impl NodeInfo { } } + pub async fn chain_mined_count_at_least( + address: &str, + through_height: u32, + threshold: u8, + ) -> Result { + if threshold == 0 { + return Ok(true); + } + + let mut mined_count = 0_u8; + let start_height = if through_height > 0 { 1 } else { 0 }; + for block_number in start_height..=through_height { + let header = load_block_header(block_number).await?; + if header.unmined_block.miner == address { + mined_count = mined_count.saturating_add(1); + if mined_count >= threshold { + return Ok(true); + } + } + } + + Ok(false) + } + pub async fn set_deleted_block_from_mapping(address: &str, deleted_block: u32) { let mut map = ADDRESS_MAP.lock().await; if let Some(node_info) = map.get_mut(address) { @@ -67,6 +92,16 @@ impl NodeInfo { let current_height = get_height(db); let mut mined_counts: HashMap = HashMap::new(); + if current_height == 0 && !genesis_checkup().await { + let mut map = ADDRESS_MAP.lock().await; + for node_info in map.values_mut() { + node_info.blocks_mined = 0; + } + drop(map); + Self::persist_recovery_snapshot("mined rebuild without genesis").await; + return Ok(()); + } + let start_height = if current_height > 0 { 1 } else { 0 }; for block_number in start_height..=current_height { let header = load_block_header(block_number).await?; diff --git a/src/rpc/client/handshake_processing.rs b/src/rpc/client/handshake_processing.rs index 06180b8..ddaab2d 100644 --- a/src/rpc/client/handshake_processing.rs +++ b/src/rpc/client/handshake_processing.rs @@ -60,10 +60,13 @@ pub struct BootstrapParams { pub fn spawn_bootstrap_peer_discovery(params: BootstrapParams) { tokio::spawn(async move { + let run_startup_sync = params.run_startup_sync; if let Err(e) = bootstrap_peer_discovery(params).await { - set_node_mode(NodeMode::Normal); - clear_mining_stop_request(); - set_mining_state(MiningState::Idle); + if !run_startup_sync { + set_node_mode(NodeMode::Normal); + clear_mining_stop_request(); + set_mining_state(MiningState::Idle); + } eprintln!("[bootstrap] error: {e}"); } }); @@ -233,7 +236,7 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), }; match sync_checkup(orphan_checkup_params, params.wallet.clone()).await { Ok(()) => {} - Err(err) => warn!("[sync] Post-sync orphan check error: {err}"), + Err(err) => return Err(format!("Post-sync orphan check error: {err}")), } } diff --git a/src/rpc/client/syncing.rs b/src/rpc/client/syncing.rs index 6ad4a7c..f99c81d 100644 --- a/src/rpc/client/syncing.rs +++ b/src/rpc/client/syncing.rs @@ -95,6 +95,19 @@ pub async fn node_syncing( Err(err) => { warn!("[sync] error saving block: height={local_height} err={err}"); + if err.contains("Invalid reward for the Rewards Transaction") + || err.contains("This address is not eligable to mine") + || err.contains("This miner address is not registered") + || err.contains("Miner wallet address is not registered") + { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "sync rejected canonical peer block at height {local_height}: {err}" + ), + )); + } + // Refresh local chain state before triggering orphan // handling so the reconciliation logic starts from the // real saved height. diff --git a/src/rpc/commands/receive_torrent.rs b/src/rpc/commands/receive_torrent.rs index e6e4ded..d012ed1 100644 --- a/src/rpc/commands/receive_torrent.rs +++ b/src/rpc/commands/receive_torrent.rs @@ -1,14 +1,21 @@ use crate::common::check_genesis::genesis_checkup; use crate::common::skein::skein_128_hash_bytes; -use crate::log::{error, warn}; -use crate::miner::flag::{is_normal_mode, is_reorganizing_mode, is_syncing_mode}; +use crate::log::{error, info, warn}; +use crate::miner::flag::{ + clear_mining_stop_request, is_normal_mode, is_reorganizing_mode, is_syncing_mode, + request_mining_stop, set_mining_state, set_node_mode, wait_for_mining_idle, MiningState, + NodeMode, +}; use crate::orphans::checkup_state::{ finish_orphan_check, request_orphan_recheck, try_begin_orphan_check, }; use crate::orphans::structs::OrphanCheckup2; use crate::orphans::sync_check::sync_checkup; use crate::records::block_height::get_block_height::get_height; +use crate::records::memory::connections::{get_client_type_from_memory, peer_is_operational}; +use crate::records::memory::enums::ClientType; use crate::records::memory::response_channels::Command; +use crate::rpc::client::syncing::node_syncing; use crate::rpc::read_bytes_from_stream; use crate::rpc::responses::RpcResponse; use crate::rpc::server::flood_protection::MAX_TORRENT_METADATA_BYTES; @@ -26,6 +33,8 @@ use crate::wallets::structures::Wallet; use crate::Arc; use crate::Mutex; +const LIVE_SYNC_HEIGHT_GAP: u32 = 10; + pub fn should_trigger_orphan_check(error: &str) -> bool { // These errors mean the incoming torrent may belong to a competing // branch, so a targeted orphan check is worth attempting. @@ -53,6 +62,72 @@ fn within_orphan_window(local_height: u32, incoming_height: u32) -> bool { local_height.abs_diff(incoming_height) <= 10 } +async fn run_live_catchup_sync( + stream: Arc>, + db: &Db, + wallet: Arc, + map: Arc>, + connections_key: String, + local_height: u32, + remote_height: u32, +) -> Result<(), String> { + info!( + "[sync] live catch-up started: local_height={local_height} remote_height={remote_height}" + ); + set_node_mode(NodeMode::Syncing); + request_mining_stop(); + wait_for_mining_idle().await; + + let sync_result = node_syncing( + stream.clone(), + db, + remote_height, + map.clone(), + true, + wallet.clone(), + connections_key.clone(), + ) + .await + .map_err(|err| format!("Live catch-up sync failed: {err}")); + + if sync_result.is_ok() { + let post_sync_local_height = get_height(db); + let post_sync_remote_height = + match request_remote_height(stream.clone(), map.clone(), connections_key.clone()).await + { + Ok(height) => height, + Err(err) => { + warn!("[sync] live catch-up failed to refresh post-sync remote height: {err}"); + remote_height + } + }; + + if post_sync_remote_height != post_sync_local_height { + let orphan_checkup_params = OrphanCheckup2 { + stream, + db: db.clone(), + local_height: post_sync_local_height, + remote_height: post_sync_remote_height, + recheck_from_height: Some(post_sync_local_height.min(post_sync_remote_height)), + map, + node_syncing: true, + connections_key, + }; + if let Err(err) = sync_checkup(orphan_checkup_params, wallet).await { + warn!("[sync] live catch-up post-sync orphan check error: {err}"); + } + } + } + + set_node_mode(NodeMode::Normal); + clear_mining_stop_request(); + set_mining_state(MiningState::Idle); + if sync_result.is_ok() { + info!("[sync] live catch-up complete, normal mode restored"); + } + sync_result +} + pub async fn trigger_orphan_check( reason: &str, incoming_height: u32, @@ -87,20 +162,37 @@ pub async fn trigger_orphan_check( warn!( "[broadcast] triggering orphan check: reason={reason} local_height={local_height} remote_height={remote_height}" ); - let orphan_checkup_params = OrphanCheckup2 { - stream, - db: db.clone(), - local_height, - remote_height, - recheck_from_height: Some(incoming_height), - map, - node_syncing: false, - connections_key, - }; - match sync_checkup(orphan_checkup_params, wallet).await { - Ok(()) => {} - Err(err) => error!("[broadcast] orphan check error: {err}"), + + let result = if remote_height > local_height.saturating_add(LIVE_SYNC_HEIGHT_GAP) { + run_live_catchup_sync( + stream, + db, + wallet, + map, + connections_key, + local_height, + remote_height, + ) + .await + } else { + let orphan_checkup_params = OrphanCheckup2 { + stream, + db: db.clone(), + local_height, + remote_height, + recheck_from_height: Some(incoming_height), + map, + node_syncing: false, + connections_key, + }; + sync_checkup(orphan_checkup_params, wallet) + .await + .map_err(|err| format!("Orphan check failed: {err}")) }; + + if let Err(err) = result { + error!("[broadcast] orphan/sync recovery error: {err}"); + } finish_orphan_check(); } @@ -307,6 +399,22 @@ pub async fn receive_torrent( ) .await?; + if get_client_type_from_memory(connections_key).await == Some(ClientType::Miner) + && !peer_is_operational(connections_key).await + { + warn!( + "[broadcast] ignored torrent from non-operational peer: peer={connections_key} height={block_number}" + ); + return Ok(( + uid, + RpcResponse::Binary( + "Torrent ignored from non-operational peer." + .as_bytes() + .to_vec(), + ), + )); + } + let outcome = torrent_submission( block_number, torrent_bytes, diff --git a/src/rpc/server/handshake.rs b/src/rpc/server/handshake.rs index ad67b09..845cc96 100644 --- a/src/rpc/server/handshake.rs +++ b/src/rpc/server/handshake.rs @@ -135,9 +135,9 @@ async fn sync_incoming_peer_before_operational( node_syncing: true, connections_key: connections_key.to_string(), }; - if let Err(err) = sync_checkup(orphan_checkup_params, wallet).await { - warn!("[sync] Incoming post-sync orphan check error: {err}"); - } + sync_checkup(orphan_checkup_params, wallet) + .await + .map_err(|err| format!("Incoming post-sync orphan check error: {err}"))?; } Ok(true) diff --git a/src/startup/network_broadcast.rs b/src/startup/network_broadcast.rs index 69b5fe2..53786ec 100644 --- a/src/startup/network_broadcast.rs +++ b/src/startup/network_broadcast.rs @@ -41,11 +41,12 @@ pub async fn announce_self_to_network( Some(bytes) => bytes, None => return Err("local node address was invalid".to_string()), }; - let modified_by_bytes = vec![0u8; Wallet::SHORT_ADDRESS_BYTES_LENGTH]; let time = Utc::now().timestamp_millis() as u64; let modified_timestamp_bytes = time.to_le_bytes(); // Self-announcement is intentionally unsigned. The receiving node - // adopts it by re-signing the membership edit with its own wallet. + // sponsors it by signing the membership edit after confirming the + // announcement belongs to the live peer connection. + let modified_by_bytes = vec![0u8; Wallet::SHORT_ADDRESS_BYTES_LENGTH]; let modified_signature_bytes = vec![0u8; Wallet::SIGNATURE_LENGTH]; let mut message: Vec = Vec::with_capacity( diff --git a/src/verifications/async_funcs/verify_rewards.rs b/src/verifications/async_funcs/verify_rewards.rs index 49084a2..15f5e31 100644 --- a/src/verifications/async_funcs/verify_rewards.rs +++ b/src/verifications/async_funcs/verify_rewards.rs @@ -6,6 +6,8 @@ use crate::records::wallet_registry::is_registered_short_address; use crate::sled::Db; use crate::verifications::async_funcs::checks::verify_db::db_hex_verification; +const REWARD_MATURITY_BLOCKS: u8 = 100; + impl RewardsTransaction { pub async fn verify(&self, miner: String, db: &Db) -> Result { // Rewards are tied to the next block height and only begin after @@ -23,7 +25,13 @@ impl RewardsTransaction { // New miners receive zero reward until their mined-count history // reaches the maturity threshold. - let reward_value = if NodeInfo::get_mined_count(&miner).await < 100 { + let reward_value = if !NodeInfo::chain_mined_count_at_least( + &miner, + previous_height, + REWARD_MATURITY_BLOCKS, + ) + .await? + { 0_u64 } else { calculate_block_reward(previous_height + 1).await @@ -32,7 +40,12 @@ impl RewardsTransaction { // The unsigned reward value must exactly match the deterministic // reward calculation for the block being created. if value != reward_value { - return Err("Invalid reward for the Rewards Transaction.".to_string()); + return Err(format!( + "Invalid reward for the Rewards Transaction. miner={miner} height={} actual={} expected={}", + previous_height + 1, + value, + reward_value + )); } // Recompute the reward txid after the value check so duplicate