From 4b66a2bd54dd3a1db895cbc7e9f8205d8e23d6b2 Mon Sep 17 00:00:00 2001 From: viraladmin <00purple@gmail.com> Date: Mon, 1 Jun 2026 08:29:11 -0600 Subject: [PATCH] fixed issues with orphan correction and mining --- settings.ini | 4 +- src/blocks/block.rs | 106 +++++++----- src/main.rs | 7 +- src/miner/genesis.rs | 19 +-- src/miner/mining.rs | 22 +-- src/miner/structs.rs | 3 +- src/orphans/add_genesis.rs | 17 +- src/orphans/checkup_state.rs | 51 ++++++ src/orphans/deep_sync_rollback.rs | 6 +- src/orphans/mod.rs | 1 + src/orphans/orphan_checkup.rs | 36 +++-- src/orphans/orphan_window_check.rs | 29 +++- src/orphans/replay_errors.rs | 57 +++++++ src/orphans/save_blocks.rs | 23 +-- src/orphans/snapshot_check.rs | 113 ++++++++++++- src/orphans/structs.rs | 3 + src/orphans/sync_check.rs | 95 ++++++++--- src/orphans/undo_block_transactions.rs | 9 +- src/records/ip_score/ban_management.rs | 15 +- src/records/ip_score/penalty.rs | 152 ++++++++---------- src/records/ip_score/score.rs | 6 +- src/records/memory/averages.rs | 71 +++----- src/records/memory/connections.rs | 36 ++++- src/records/memory/network_mapping/add.rs | 18 +-- src/records/memory/network_mapping/delete.rs | 22 +-- src/records/memory/network_mapping/queries.rs | 15 +- src/records/memory/network_mapping/structs.rs | 6 +- src/records/memory/torrent_status.rs | 16 +- src/records/record_chain/save.rs | 39 +++-- src/records/record_chain/structs.rs | 2 + src/rpc/client/block_hash_vote.rs | 42 +++++ src/rpc/client/handshake.rs | 8 +- src/rpc/client/handshake_processing.rs | 24 ++- src/rpc/client/mod.rs | 1 + src/rpc/client/structs.rs | 5 +- src/rpc/client/syncing.rs | 8 +- src/rpc/command_maps.rs | 1 + src/rpc/commands/add_network_node.rs | 4 +- src/rpc/commands/bad_rpc_call.rs | 6 +- src/rpc/commands/block_hash_at_height.rs | 24 +++ src/rpc/commands/block_peer_ip.rs | 17 +- src/rpc/commands/delete_network_node.rs | 4 +- src/rpc/commands/mod.rs | 1 + src/rpc/commands/receive_torrent.rs | 43 +++-- src/rpc/commands/route_reply.rs | 7 +- src/rpc/commands/unblock_peer_ip.rs | 13 +- src/rpc/commands/validate_torrent.rs | 5 +- src/rpc/server/command_loop_state.rs | 28 +++- src/rpc/server/flood_protection.rs | 6 +- src/rpc/server/handshake.rs | 29 +--- src/rpc/server/handshake_processing.rs | 11 +- src/rpc/server/rpc_command_loop.rs | 50 ++++-- src/rpc/server/start_rpc.rs | 11 +- src/rpc/server/structs.rs | 3 +- src/startup/connections.rs | 33 ++-- src/startup/initialize_startup.rs | 8 +- src/startup/network_broadcast.rs | 10 +- src/startup/node_runtime.rs | 13 +- src/startup/unlock_pipe.rs | 46 +++--- src/startup/windows_service.rs | 10 +- src/torrent/create_metadata.rs | 3 +- src/torrent/structs.rs | 49 +++++- src/torrent/torrenting_system/save_block.rs | 12 ++ .../torrenting_system/torrent_requests.rs | 13 +- .../async_funcs/validate_torrent_data.rs | 29 ++-- 65 files changed, 961 insertions(+), 615 deletions(-) create mode 100644 src/orphans/checkup_state.rs create mode 100644 src/rpc/client/block_hash_vote.rs create mode 100644 src/rpc/commands/block_hash_at_height.rs diff --git a/settings.ini b/settings.ini index b1314c1..f0150b8 100644 --- a/settings.ini +++ b/settings.ini @@ -6,12 +6,12 @@ TORRENT_PATH = "./torrents" DB_PATH = "./state" BALANCE_SHEET = "./balance_sheet" LOG_PATH = "./logs" -WALLET_PATH = "/home/viraladmin/chatgpt/wallets" +WALLET_PATH = "./wallets" WALLET_NAME = "contractless.wallet" [Settings] -LOG_LEVEL = "disabled" +LOG_LEVEL = "info" PUBLIC_IP = "your_public_ip_address" LISTEN_IP = "0.0.0.0" RPC_PORT = "50055" diff --git a/src/blocks/block.rs b/src/blocks/block.rs index 63c474e..d661541 100644 --- a/src/blocks/block.rs +++ b/src/blocks/block.rs @@ -1,16 +1,20 @@ use crate::common::skein::{skein_256_hash_data, skein_512_hash_data}; use crate::common::types::Transaction; use crate::records::block_height::get_block_height::get_height; -use crate::records::memory::averages::{calculate_averages, update_block_data}; +use crate::records::memory::averages::{asert_anchor, update_block_data}; use crate::sled::Db; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; -use crate::Duration; use crate::Serialize; use crate::{decode, encode}; use crate::{AsyncReadExt, AsyncWriteExt}; +const TARGET_BLOCK_SECONDS: i128 = 15; +const ASERT_HALF_LIFE_SECONDS: i128 = 1_800; +const ASERT_RADIX_BITS: i128 = 16; +const ASERT_FIXED_ONE: i128 = 1 << ASERT_RADIX_BITS; + pub const TIMESTAMP_OFFSET: usize = 0; pub const MINER_OFFSET: usize = TIMESTAMP_OFFSET + 4; pub const PREVIOUS_HASH_OFFSET: usize = MINER_OFFSET + Wallet::SHORT_ADDRESS_BYTES_LENGTH; @@ -99,15 +103,11 @@ impl UnminedBlock { a ^ b ^ c ^ d } - pub async fn vrf_generate(self, wallet_key: String) -> VrfBlock { + pub async fn vrf_generate(self, private_key: &str) -> VrfBlock { // Sign the unmined header hash with the miner wallet and derive // the VRF number from that signature. let hash = self.hash().await; - let wallet = Wallet::try_obtain_wallet(wallet_key, None) - .await - .unwrap_or_else(|err| panic!("Wallet decryption failed: {err}")); - let privkey = &wallet.saved.private_key; - let proof = Wallet::sign_transaction(&hash, privkey).await; + let proof = Wallet::sign_transaction(&hash, private_key).await; let vrf = Self::generate_random_number(&proof).await; VrfBlock { unmined_block: self, @@ -122,52 +122,82 @@ impl UnminedBlock { skein_256_hash_data(&serialized) } - // Calculate the next difficulty using the rolling average and target block time. - fn calculate_new_difficulty( - current_difficulty: u64, - difficulty_average: u64, - average_duration: Duration, - ) -> u64 { - let lower_bound = Duration::from_secs(14); - let upper_bound = Duration::from_secs(16); + fn asert_target(anchor_target: u64, height_delta: u32, time_delta: i128) -> u64 { + // Deterministic fixed-point ASERT calculation. The polynomial is the + // BCH ASERT approximation for 2^x, avoiding platform-dependent floats. + let expected_time = height_delta as i128 * TARGET_BLOCK_SECONDS; + let time_error = time_delta - expected_time; + let exponent = (time_error << ASERT_RADIX_BITS) / ASERT_HALF_LIFE_SECONDS; + let shifts = exponent >> ASERT_RADIX_BITS; + let frac = exponent - (shifts << ASERT_RADIX_BITS); - // When the rolling average is already within the target window, - // use the cached mean difficulty exactly. - if difficulty_average > 0 - && average_duration >= lower_bound - && average_duration <= upper_bound - { - return difficulty_average; - } + let factor = ASERT_FIXED_ONE + + ((195_766_423_245_049_i128 * frac + + 971_821_376_i128 * frac * frac + + 5_127_i128 * frac * frac * frac + + (1_i128 << 47)) + >> 48); - // Outside the target window, apply the capped 30% adjustment - // with integer math to keep the result stable. - let adjustment = current_difficulty.saturating_mul(30).saturating_div(100); - if average_duration > upper_bound { - current_difficulty.saturating_add(adjustment) - } else if average_duration < lower_bound { - current_difficulty.saturating_sub(adjustment) + let mut target = anchor_target as u128 * factor.max(1) as u128; + + if shifts >= 0 { + if shifts >= 64 { + return u64::MAX; + } + target = target.checked_shl(shifts as u32).unwrap_or(u128::MAX); } else { - current_difficulty + let right_shift = (-shifts) as u32; + if right_shift >= 128 { + return 1; + } + target >>= right_shift; } + + target >>= ASERT_RADIX_BITS as u32; + target.clamp(1, u64::MAX as u128) as u64 } - // Adjust difficulty based on the latest saved block averages. + fn clamp_per_block(raw_target: u64, current_difficulty: u64) -> u64 { + // ASERT provides the direction and scale, while this guard keeps any + // single block from swinging the threshold too far. + let lower_bound = current_difficulty + .saturating_mul(85) + .saturating_div(100) + .max(1); + let upper_bound = current_difficulty + .saturating_mul(115) + .saturating_div(100) + .max(lower_bound); + + raw_target.clamp(lower_bound, upper_bound) + } + + // Adjust difficulty based on ASERT drift from the oldest cached canonical block. pub async fn adjust_difficulty( current_timestamp: u32, db: &Db, current_difficulty: u64, ) -> u64 { let block_number = get_height(db); + let candidate_height = block_number + 1; - // Refresh rolling block data before reading averages. + // Refresh cached canonical block data before reading the ASERT anchor. update_block_data(block_number).await; - // Get the current rolling difficulty and duration averages. - let (difficulty_average, average_duration) = calculate_averages(current_timestamp).await; + let Some((anchor_height, anchor_timestamp, anchor_difficulty)) = asert_anchor().await + else { + return current_difficulty; + }; - // Apply the bounded difficulty adjustment. - Self::calculate_new_difficulty(current_difficulty, difficulty_average, average_duration) + if anchor_height >= candidate_height { + return current_difficulty; + } + + let height_delta = candidate_height - anchor_height; + let time_delta = current_timestamp as i128 - anchor_timestamp as i128; + let raw_target = Self::asert_target(anchor_difficulty, height_delta, time_delta); + + Self::clamp_per_block(raw_target, current_difficulty) } } diff --git a/src/main.rs b/src/main.rs index e53cba1..259f409 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,13 +2,14 @@ use blockchain::exit; use blockchain::log::{error, logger}; use blockchain::startup::daemonize::daemonize_after_wallet_prompt; use blockchain::startup::daemonize::handle_control_command; -use blockchain::startup::initialize_startup::obtain_startup_wallet_key; +use blockchain::startup::initialize_startup::obtain_startup_wallet; use blockchain::startup::initialize_startup::prepare_pre_wallet_startup; use blockchain::startup::node_runtime::initialize_node_logging; use blockchain::startup::node_runtime::install_panic_cleanup; use blockchain::startup::node_runtime::run_unlocked_node; use blockchain::startup::windows_service::handle_windows_service_command; use blockchain::startup::windows_service::try_run_as_windows_service; +use blockchain::Arc; use blockchain::Runtime; use tokio::runtime::Builder; @@ -52,7 +53,7 @@ fn main() { .build() .expect("Failed to create startup runtime"); startup_runtime.block_on(prepare_pre_wallet_startup()); - let wallet_key = startup_runtime.block_on(obtain_startup_wallet_key()); + let wallet = startup_runtime.block_on(obtain_startup_wallet()); drop(startup_runtime); // Linux detaches after the wallet prompt unless --foreground is supplied. @@ -74,7 +75,7 @@ fn main() { } }); install_panic_cleanup(); - if let Err(e) = runtime.block_on(run_unlocked_node(wallet_key, true)) { + if let Err(e) = runtime.block_on(run_unlocked_node(Arc::new(wallet), true)) { error!("Failed to start unlocked node runtime: {e}"); logger().flush(); eprintln!("Failed to start unlocked node runtime: {e}"); diff --git a/src/miner/genesis.rs b/src/miner/genesis.rs index c2a4234..7ede38e 100644 --- a/src/miner/genesis.rs +++ b/src/miner/genesis.rs @@ -25,19 +25,10 @@ use crate::Utc; pub async fn create_genesis_transaction( db: &Db, verification_service: Arc, - wallet_key: String, + wallet: Arc, map: Arc>, ) { - // Load the local wallet so the genesis block records the miner's - // current short address in the header. - let wallet = match Wallet::try_obtain_wallet(wallet_key.clone(), None).await { - Ok(wallet) => wallet, - Err(err) => { - error!("Wallet decryption failed: {err}"); - return; - } - }; - let miner = wallet.saved.short_address; + let miner = wallet.saved.short_address.clone(); // The genesis transaction carries the fixed launch message and // uses transaction type zero. @@ -52,7 +43,7 @@ pub async fn create_genesis_transaction( let _ = create_genesis_block( genesis_transaction, &miner, - wallet_key, + wallet, db, verification_service, map, @@ -63,7 +54,7 @@ pub async fn create_genesis_transaction( async fn create_genesis_block( signed_genesis_transaction: GenesisTransaction, miner: &str, - wallet_key: String, + wallet: Arc, db: &Db, verification_service: Arc, map: Arc>, @@ -134,7 +125,7 @@ async fn create_genesis_block( .await; // The VRF binds the candidate header to the mining wallet. - let vrf_block = UnminedBlock::vrf_generate(block_struct, wallet_key.clone()).await; + let vrf_block = UnminedBlock::vrf_generate(block_struct, &wallet.saved.private_key).await; let header_hash = vrf_block.hash().await; diff --git a/src/miner/mining.rs b/src/miner/mining.rs index 160c29f..1b853e8 100644 --- a/src/miner/mining.rs +++ b/src/miner/mining.rs @@ -26,7 +26,7 @@ use crate::Utc; pub async fn mine_block( db: &Db, verification_service: Arc, - wallet_key: String, + wallet: Arc, map: Arc>, ) -> Result<(), Box> { if Settings::load() @@ -42,13 +42,7 @@ pub async fn mine_block( // Mining runs continuously, rebuilding its context from the // latest saved tip before each one-second nonce round. - let wallet = match Wallet::try_obtain_wallet(wallet_key.clone(), None).await { - Ok(wallet) => wallet, - Err(err) => { - return Err(std::io::Error::other(format!("Wallet decryption failed: {err}")).into()); - } - }; - let miner_short = wallet.saved.short_address; + let miner_short = wallet.saved.short_address.clone(); // Track the height this miner expects to produce next so nonce workers // can stop quickly when another peer advances the chain. @@ -109,7 +103,7 @@ pub async fn mine_block( let attempt_context = match prepare_attempt_context( db, miner_short.clone(), - wallet_key.clone(), + wallet.clone(), current_block_number, verification_service.clone(), ) @@ -157,7 +151,7 @@ async fn wait_until_mining_allowed(mut was_stopped: bool) -> bool { async fn prepare_attempt_context( db: &Db, miner_short: String, - wallet_key: String, + wallet: Arc, current_block_number: u32, verification_service: Arc, ) -> Option { @@ -171,7 +165,7 @@ async fn prepare_attempt_context( match build_attempt_context( db, miner_short, - wallet_key, + wallet, current_block_number, verification_service, ) @@ -218,7 +212,7 @@ async fn wait_for_next_second_or_chain_change( async fn build_attempt_context( db: &Db, miner_short: String, - wallet_key: String, + wallet: Arc, current_block_number: u32, verification_service: Arc, ) -> Result> { @@ -235,7 +229,7 @@ async fn build_attempt_context( Ok(MiningAttemptContext { db: db.clone(), miner_short, - wallet_key, + wallet, current_block_number, previous_hash, previous_difficulty, @@ -271,7 +265,7 @@ pub async fn mine_block_internal( .await; // Add the wallet VRF proof before hashing and verifying the candidate. - let vrf_block = UnminedBlock::vrf_generate(unmined_block, ctx.wallet_key.clone()).await; + let vrf_block = UnminedBlock::vrf_generate(unmined_block, &ctx.wallet.saved.private_key).await; let block_hash = vrf_block.hash().await; // Every mined block begins with a consensus-created reward transaction. diff --git a/src/miner/structs.rs b/src/miner/structs.rs index 7528df3..18f4da8 100644 --- a/src/miner/structs.rs +++ b/src/miner/structs.rs @@ -1,5 +1,6 @@ use crate::sled::Db; use crate::verifications::verification_service::VerificationService; +use crate::wallets::structures::Wallet; use crate::Arc; // MiningAttemptContext captures one consistent chain tip for a nonce round. @@ -8,7 +9,7 @@ use crate::Arc; pub struct MiningAttemptContext { pub db: Db, pub miner_short: String, - pub wallet_key: String, + pub wallet: Arc, pub current_block_number: u32, pub previous_hash: String, pub previous_difficulty: u64, diff --git a/src/orphans/add_genesis.rs b/src/orphans/add_genesis.rs index 93ecc7a..05a1eaa 100644 --- a/src/orphans/add_genesis.rs +++ b/src/orphans/add_genesis.rs @@ -6,6 +6,7 @@ use crate::torrent::structs::Torrent; use crate::torrent::torrenting_system::torrent_requests::{ handle_response_and_save_torrent, send_request_torrent_message, }; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Duration; use crate::Mutex; @@ -16,7 +17,7 @@ pub async fn create_genesis_block( map: Arc>, stream: Arc>, db: Db, - wallet_key: &str, + wallet: Arc, connections_key: String, ) { // if no local genesis exists, request the remote genesis @@ -38,17 +39,9 @@ pub async fn create_genesis_block( return; } }; - handle_response_and_save_torrent( - 0, - &db, - torrent, - wallet_key, - map.clone(), - false, - false, - ) - .await - .ok(); + handle_response_and_save_torrent(0, &db, torrent, wallet, map.clone(), false, false) + .await + .ok(); } } } diff --git a/src/orphans/checkup_state.rs b/src/orphans/checkup_state.rs new file mode 100644 index 0000000..6d6ddf6 --- /dev/null +++ b/src/orphans/checkup_state.rs @@ -0,0 +1,51 @@ +use crate::{AtomicBool, AtomicOrdering}; +use std::sync::atomic::AtomicU32; + +static ORPHAN_CHECK_RUNNING: AtomicBool = AtomicBool::new(false); +static ORPHAN_RECHECK_REQUESTED: AtomicBool = AtomicBool::new(false); +static ORPHAN_RECHECK_FROM_HEIGHT: AtomicU32 = AtomicU32::new(0); + +fn store_earliest_recheck_height(incoming_height: u32) { + let mut current = ORPHAN_RECHECK_FROM_HEIGHT.load(AtomicOrdering::SeqCst); + while current == 0 || incoming_height < current { + match ORPHAN_RECHECK_FROM_HEIGHT.compare_exchange( + current, + incoming_height, + AtomicOrdering::SeqCst, + AtomicOrdering::SeqCst, + ) { + Ok(_) => return, + Err(next_current) => current = next_current, + } + } +} + +pub fn try_begin_orphan_check() -> bool { + if ORPHAN_CHECK_RUNNING + .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst) + .is_err() + { + return false; + } + + ORPHAN_RECHECK_REQUESTED.store(false, AtomicOrdering::SeqCst); + ORPHAN_RECHECK_FROM_HEIGHT.store(0, AtomicOrdering::SeqCst); + true +} + +pub fn request_orphan_recheck(incoming_height: u32) { + store_earliest_recheck_height(incoming_height); + ORPHAN_RECHECK_REQUESTED.store(true, AtomicOrdering::SeqCst); +} + +pub fn take_orphan_recheck_height() -> Option { + if !ORPHAN_RECHECK_REQUESTED.swap(false, AtomicOrdering::SeqCst) { + return None; + } + + Some(ORPHAN_RECHECK_FROM_HEIGHT.swap(0, AtomicOrdering::SeqCst)) +} + +pub fn finish_orphan_check() { + ORPHAN_CHECK_RUNNING.store(false, AtomicOrdering::SeqCst); +} diff --git a/src/orphans/deep_sync_rollback.rs b/src/orphans/deep_sync_rollback.rs index dbf5712..f7385b5 100644 --- a/src/orphans/deep_sync_rollback.rs +++ b/src/orphans/deep_sync_rollback.rs @@ -4,8 +4,10 @@ use crate::orphans::structs::{CheckUp, UndoTransactions}; use crate::orphans::undo_block_transactions::undo_transactions; use crate::torrent::unpack_local_torrent::load_torrent; use crate::torrent::unpack_remote_torrent::request_torrent; +use crate::wallets::structures::Wallet; +use crate::Arc; -pub async fn deep_sync_rollback(mut params: CheckUp, wallet_key: &str) { +pub async fn deep_sync_rollback(mut params: CheckUp, wallet: Arc) { if params.local_height < params.remote_height { // This pass only handles deeper sync gaps. Near-tip disagreements // are left for the orphan-window check. @@ -40,7 +42,7 @@ pub async fn deep_sync_rollback(mut params: CheckUp, wallet_key: &str) { node_syncing: params.node_syncing, connections_key: params.connections_key.clone(), }; - undo_transactions(undo_transactions_params, wallet_key) + undo_transactions(undo_transactions_params, wallet.clone()) .await .ok(); params.local_height -= 1; diff --git a/src/orphans/mod.rs b/src/orphans/mod.rs index 8237477..59ba33a 100644 --- a/src/orphans/mod.rs +++ b/src/orphans/mod.rs @@ -1,4 +1,5 @@ pub mod add_genesis; +pub mod checkup_state; pub mod deep_sync_rollback; pub mod get_path_names; pub mod orphan_checkup; diff --git a/src/orphans/orphan_checkup.rs b/src/orphans/orphan_checkup.rs index e21968c..46b26ad 100644 --- a/src/orphans/orphan_checkup.rs +++ b/src/orphans/orphan_checkup.rs @@ -1,7 +1,7 @@ use crate::common::skein::skein_128_hash_bytes; use crate::log::{info, warn}; use crate::miner::flag::begin_reorg_lock; -use crate::orphans::replay_errors::should_retry_staged_candidate; +use crate::orphans::replay_errors::staged_candidate_status_for_error; use crate::orphans::structs::{OrphanCheckup, UndoTransactions}; use crate::orphans::undo_block_transactions::undo_transactions; use crate::records::memory::torrent_status::{ @@ -20,6 +20,8 @@ use crate::torrent::torrenting_system::temp_database_storage::remove_block_piece use crate::torrent::torrenting_system::torrent_map::create_torrent_map; use crate::torrent::unpack_local_torrent::load_torrent; use crate::verifications::verification_service::global_verification_service; +use crate::wallets::structures::Wallet; +use crate::Arc; async fn staged_candidates_for_height(height: u32) -> Vec { let mut candidates = Vec::new(); @@ -97,11 +99,11 @@ async fn candidate_attaches_before_rollback( params: &OrphanCheckup, height: u32, torrent: &Torrent, - wallet_key: &str, + wallet: Arc, ) -> Result<(), String> { // Metadata may choose a candidate, but only downloaded block bytes can // prove the rollback is safe. - torrent.verify(height, ¶ms.db, wallet_key).await?; + torrent.verify(height, ¶ms.db, wallet).await?; let _download_guard = acquire_candidate_download(height, &torrent.info.info_hash, true).await?; let verification_service = global_verification_service() @@ -140,6 +142,12 @@ async fn candidate_attaches_before_rollback( cleanup_candidate_pieces(¶ms.db, height, torrent).await; return Err("Candidate header hash does not match torrent metadata.".to_string()); } + if !torrent.info.previous_hash.is_empty() + && loaded_block.vrf_block.unmined_block.previous_hash != torrent.info.previous_hash + { + cleanup_candidate_pieces(¶ms.db, height, torrent).await; + return Err("Candidate previous hash does not match torrent metadata.".to_string()); + } if height > 0 { let parent_height = height - 1; @@ -154,7 +162,7 @@ async fn candidate_attaches_before_rollback( Ok(()) } -pub async fn checkup(params: OrphanCheckup, wallet_key: &str) -> Result<(), String> { +pub async fn checkup(params: OrphanCheckup, wallet: Arc) -> Result<(), String> { // The orphan window check only reasons over local canonical/staged evidence inside the // orphan window. If we do not yet have a competing staged torrent, // there is nothing to compare and the local chain remains current. @@ -173,9 +181,11 @@ pub async fn checkup(params: OrphanCheckup, wallet_key: &str) -> Result<(), Stri if !torrent_beats(&competing_torrent, &local_torrent) { // The local block remains the winner at this height. Since // candidates are sorted best-first, every remaining staged - // competitor has also lost to the local block. + // competitor with the same parent has also lost to the local block. for staged_torrent in &staged_candidates { - if staged_torrent.info.info_hash != local_torrent.info.info_hash { + if staged_torrent.info.info_hash != local_torrent.info.info_hash + && staged_torrent.info.previous_hash == local_torrent.info.previous_hash + { set_torrent_status( height, &staged_torrent.info.info_hash, @@ -191,7 +201,7 @@ pub async fn checkup(params: OrphanCheckup, wallet_key: &str) -> Result<(), Stri ¶ms, height, &competing_torrent, - wallet_key, + wallet.clone(), ) .await { @@ -211,23 +221,15 @@ pub async fn checkup(params: OrphanCheckup, wallet_key: &str) -> Result<(), Stri begin_reorg_lock().await; } info!("[orphan] adopting proven staged chain from height {height}"); - undo_transactions(undo_transactions_params, wallet_key).await?; + undo_transactions(undo_transactions_params, wallet.clone()).await?; return Ok(()); } Err(err) => { - let status = if should_retry_staged_candidate(&err) { - TorrentStatus::Pending - } else { - TorrentStatus::Invalid - }; + let status = staged_candidate_status_for_error(&err); set_torrent_status(height, &competing_info_hash, status).await; warn!( "[orphan] staged candidate failed pre-rollback proof: height={height} err={err}" ); - - if status == TorrentStatus::Pending { - break; - } } } } diff --git a/src/orphans/orphan_window_check.rs b/src/orphans/orphan_window_check.rs index ddfcfff..c33043e 100644 --- a/src/orphans/orphan_window_check.rs +++ b/src/orphans/orphan_window_check.rs @@ -1,7 +1,9 @@ use crate::orphans::orphan_checkup::checkup; use crate::orphans::structs::{CheckUp, OrphanCheckup}; +use crate::wallets::structures::Wallet; +use crate::Arc; -pub async fn orphan_window_check(params: CheckUp, wallet_key: &str) -> Result<(), String> { +pub async fn orphan_window_check(params: CheckUp, wallet: Arc) -> Result<(), String> { // orphan window check handles near-tip comparisons where the local and // remote chains are within the orphan correction window let height_diff = match params.local_height.cmp(¶ms.remote_height) { @@ -10,24 +12,33 @@ pub async fn orphan_window_check(params: CheckUp, wallet_key: &str) -> Result<() std::cmp::Ordering::Less => params.remote_height - params.local_height, }; + let include_recheck_floor = |stop_check: u32| { + if let Some(recheck_from_height) = params.recheck_from_height { + stop_check.min(recheck_from_height) + } else { + stop_check + } + }; + if height_diff == 0 { // same height means compare the last ten blocks directly let start_check = params.local_height; let original_start_check = params.local_height; - let stop_check = params.local_height.saturating_sub(10); + let stop_check = include_recheck_floor(params.local_height.saturating_sub(10)); let orphan_checkup_params = OrphanCheckup { start_check, stop_check, original_start_check, local_height: params.local_height, remote_height: params.remote_height, + recheck_from_height: params.recheck_from_height, stream: params.stream, db: params.db, map: params.map.clone(), node_syncing: params.node_syncing, connections_key: params.connections_key, }; - checkup(orphan_checkup_params, wallet_key).await?; + checkup(orphan_checkup_params, wallet.clone()).await?; } else if height_diff <= 10 && params.local_height > params.remote_height { // if the local chain is slightly ahead, begin comparison from // the remote height and only search within the overlap window @@ -35,20 +46,22 @@ pub async fn orphan_window_check(params: CheckUp, wallet_key: &str) -> Result<() let original_start_check = params.remote_height; // The farther apart the tips are, the less backward overlap remains // inside the ten-block correction window. - let stop_check = params.remote_height.saturating_sub(10 - height_diff); + let stop_check = + include_recheck_floor(params.remote_height.saturating_sub(10 - height_diff)); let orphan_checkup_params = OrphanCheckup { start_check, stop_check, original_start_check, local_height: params.local_height, remote_height: params.remote_height, + recheck_from_height: params.recheck_from_height, stream: params.stream, db: params.db, map: params.map.clone(), node_syncing: params.node_syncing, connections_key: params.connections_key, }; - checkup(orphan_checkup_params, wallet_key).await?; + checkup(orphan_checkup_params, wallet.clone()).await?; } else if height_diff <= 10 && params.local_height < params.remote_height { // if the remote chain is slightly ahead, start at the local tip // and search backward only within the valid orphan range @@ -56,20 +69,22 @@ pub async fn orphan_window_check(params: CheckUp, wallet_key: &str) -> Result<() let original_start_check = params.local_height; // Search only the portion of local history that could still be // replaced by staged remote candidates. - let stop_check = params.local_height.saturating_sub(10 - height_diff); + let stop_check = + include_recheck_floor(params.local_height.saturating_sub(10 - height_diff)); let orphan_checkup_params = OrphanCheckup { start_check, stop_check, original_start_check, local_height: params.local_height, remote_height: params.remote_height, + recheck_from_height: params.recheck_from_height, stream: params.stream, db: params.db, map: params.map.clone(), node_syncing: params.node_syncing, connections_key: params.connections_key, }; - checkup(orphan_checkup_params, wallet_key).await?; + checkup(orphan_checkup_params, wallet.clone()).await?; } Ok(()) } diff --git a/src/orphans/replay_errors.rs b/src/orphans/replay_errors.rs index 402b1f4..a5a2803 100644 --- a/src/orphans/replay_errors.rs +++ b/src/orphans/replay_errors.rs @@ -1,7 +1,33 @@ +use crate::records::memory::torrent_status::TorrentStatus; + +pub fn staged_candidate_status_for_error(error: &str) -> TorrentStatus { + if error.contains("Incorrect previous_block_hash.") + || error.contains("Candidate parent is not current chain parent.") + { + return TorrentStatus::MissingParent; + } + + if should_retry_staged_candidate(error) { + TorrentStatus::Pending + } else { + TorrentStatus::Invalid + } +} + pub fn should_retry_staged_candidate(error: &str) -> bool { // Explicit "not found" responses mean connected peers cannot seed this // candidate anymore. Keep retry behavior for local timing/concurrency // conditions only. + if error.contains("Incoming block is no longer the next expected height.") { + return true; + } + + if error.contains("Incorrect previous_block_hash.") + || error.contains("Candidate parent is not current chain parent.") + { + return true; + } + if error.contains("No available peer could provide remaining pieces") || error.contains("piece not found") || error.contains("Requested candidate not found") @@ -20,3 +46,34 @@ pub fn should_retry_staged_candidate(error: &str) -> bool { || error.contains("Candidate download already active") || error.contains("Timed out waiting for active candidate download") } + +#[cfg(test)] +mod tests { + use super::{should_retry_staged_candidate, staged_candidate_status_for_error}; + use crate::records::memory::torrent_status::TorrentStatus; + + #[test] + fn next_expected_height_race_keeps_candidate_eligible() { + assert!(should_retry_staged_candidate( + "Incoming block is no longer the next expected height." + )); + } + + #[test] + fn parent_mismatch_waits_for_missing_parent() { + assert_eq!( + staged_candidate_status_for_error("Incorrect previous_block_hash."), + TorrentStatus::MissingParent + ); + } + + #[test] + fn difficulty_mismatch_rejects_candidate() { + assert_eq!( + staged_candidate_status_for_error( + "error: Difficulty mismatch with the blockchain data." + ), + TorrentStatus::Invalid + ); + } +} diff --git a/src/orphans/save_blocks.rs b/src/orphans/save_blocks.rs index 65d58a4..d0e99cd 100644 --- a/src/orphans/save_blocks.rs +++ b/src/orphans/save_blocks.rs @@ -1,10 +1,10 @@ -use crate::orphans::replay_errors::should_retry_staged_candidate; +use crate::orphans::replay_errors::staged_candidate_status_for_error; use crate::orphans::structs::UndoTransactions; use crate::orphans::torrent_candidates::hydrate_torrent_candidates; use crate::records::block_height::get_block_height::get_height; use crate::records::memory::response_channels::reserve_entry; use crate::records::memory::torrent_status::{ - get_torrent_status, mark_other_torrent_statuses_invalid, set_torrent_status, TorrentStatus, + get_torrent_status, set_torrent_status, TorrentStatus, }; use crate::torrent::structs::Torrent; use crate::torrent::torrenting_system::save_torrent::{ @@ -13,13 +13,15 @@ use crate::torrent::torrenting_system::save_torrent::{ use crate::torrent::torrenting_system::torrent_requests::{ handle_response_and_save_torrent, send_request_torrent_message, }; +use crate::wallets::structures::Wallet; +use crate::Arc; use crate::{timeout, Duration}; use std::collections::HashSet; pub async fn save_new_blocks( params: &UndoTransactions, replay_to_height: u32, - wallet_key: &str, + wallet: Arc, mut true_start_height: u32, ) -> Result<(), String> { // After rollback, save replacement blocks only up to the height @@ -70,7 +72,7 @@ pub async fn save_new_blocks( true_start_height, ¶ms.db, torrent.clone(), - wallet_key, + wallet.clone(), params.map.clone(), true, true, @@ -85,8 +87,6 @@ pub async fn save_new_blocks( TorrentStatus::Valid, ) .await; - mark_other_torrent_statuses_invalid(true_start_height, &torrent_info_hash) - .await; resolved_from_staging = true; break; } else { @@ -99,13 +99,7 @@ pub async fn save_new_blocks( } } Err(err) => { - let status = if should_retry_staged_candidate(&err) { - // Missing pieces mean the candidate has not been - // tested yet, so keep it eligible for a later replay. - TorrentStatus::Pending - } else { - TorrentStatus::Invalid - }; + let status = staged_candidate_status_for_error(&err); set_torrent_status(true_start_height, &torrent_info_hash, status).await; } } @@ -189,7 +183,7 @@ pub async fn save_new_blocks( true_start_height, ¶ms.db, torrent, - wallet_key, + wallet.clone(), params.map.clone(), true, true, @@ -207,7 +201,6 @@ pub async fn save_new_blocks( )); } set_torrent_status(true_start_height, &torrent_info_hash, TorrentStatus::Valid).await; - mark_other_torrent_statuses_invalid(true_start_height, &torrent_info_hash).await; } else { return Err(format!( "No replacement torrent received while replaying height {true_start_height}" diff --git a/src/orphans/snapshot_check.rs b/src/orphans/snapshot_check.rs index e923854..3aa56e7 100644 --- a/src/orphans/snapshot_check.rs +++ b/src/orphans/snapshot_check.rs @@ -1,11 +1,16 @@ use crate::common::binary_conversions::binary_to_string; -use crate::log::error; +use crate::log::{error, info, warn}; use crate::miner::flag::begin_reorg_lock; use crate::orphans::structs::UndoTransactions; use crate::orphans::undo_block_transactions::undo_transactions; +use crate::records::memory::connections::live_miner_peer_streams; +use crate::records::memory::response_channels::Command; use crate::records::unpack_block::unpack_header::load_block_header; +use crate::rpc::client::block_hash_vote::request_block_hash_at_height; use crate::sled::Db; use crate::torrent::unpack_remote_torrent::request_torrent; +use crate::wallets::structures::Wallet; +use crate::{tokio, Arc, Mutex}; async fn get_snapshot(db: &Db) -> Option<(u32, String)> { // snapshots store a trusted height/hash pair used to @@ -26,7 +31,92 @@ pub async fn snapshot_height(db: &Db) -> Option { get_snapshot(db).await.map(|(height, _)| height) } -pub async fn update_snapshot(db: &Db, current_height: u32) -> Result<(), String> { +fn required_snapshot_votes(total_voters: usize) -> usize { + (total_voters * 2).div_ceil(3).max(2) +} + +async fn snapshot_has_peer_quorum( + snapshot_height: u32, + local_hash: &str, + map: Arc>, +) -> bool { + let peers = live_miner_peer_streams().await; + let total_voters = peers.len() + 1; + + if total_voters < 2 { + warn!( + "[snapshot] not advancing snapshot at height {snapshot_height}: no connected miner peers" + ); + return false; + } + + let required_votes = required_snapshot_votes(total_voters); + let mut handles = Vec::with_capacity(peers.len()); + + for (connections_key, stream) in peers { + let map_clone = map.clone(); + handles.push(tokio::spawn(async move { + let vote = request_block_hash_at_height( + stream, + map_clone, + connections_key.clone(), + snapshot_height, + ) + .await; + (connections_key, vote) + })); + } + + let mut matching_votes = 1usize; + + for handle in handles { + match handle.await { + Ok((connections_key, Ok(peer_hash))) => { + if peer_hash == local_hash { + matching_votes += 1; + } else { + warn!( + "[snapshot] peer hash mismatch: height={snapshot_height} peer={connections_key} local_hash={local_hash} peer_hash={peer_hash}" + ); + } + } + Ok((connections_key, Err(err))) => { + warn!( + "[snapshot] peer vote failed: height={snapshot_height} peer={connections_key} err={err}" + ); + } + Err(err) => { + warn!("[snapshot] peer vote task failed at height {snapshot_height}: {err}"); + } + } + } + + if matching_votes >= required_votes { + info!( + "[snapshot] consensus reached: height={snapshot_height} hash={local_hash} votes={matching_votes}/{total_voters} required={required_votes}" + ); + true + } else { + warn!( + "[snapshot] consensus not reached: height={snapshot_height} hash={local_hash} votes={matching_votes}/{total_voters} required={required_votes}" + ); + false + } +} + +fn store_snapshot(db: &Db, snapshot_height: u32, hash: &str) -> Result<(), String> { + let value = format!("{snapshot_height}:{hash}"); + let key = b"snapshot"; + db.insert(key, value.as_bytes()) + .map_err(|e| format!("Failed to store snapshot at height {snapshot_height}: {e}"))?; + Ok(()) +} + +pub async fn update_snapshot( + db: &Db, + current_height: u32, + map: Arc>, +) -> Result<(), String> { // Genesis is always a valid snapshot, then later snapshots lag the tip // so normal orphan correction still has room to operate. let snapshot_height = if current_height == 0 { @@ -45,14 +135,21 @@ pub async fn update_snapshot(db: &Db, current_height: u32) -> Result<(), String> // still loaded from disk when the snapshot is checked. let header = load_block_header(snapshot_height).await?; let hash = header.hash().await; - let value = format!("{snapshot_height}:{hash}"); - let key = b"snapshot"; - db.insert(key, value.as_bytes()) - .map_err(|e| format!("Failed to store snapshot at height {snapshot_height}: {e}"))?; + + if snapshot_height == 0 { + store_snapshot(db, snapshot_height, &hash)?; + return Ok(()); + } + + if !snapshot_has_peer_quorum(snapshot_height, &hash, map).await { + return Ok(()); + } + + store_snapshot(db, snapshot_height, &hash)?; Ok(()) } -pub async fn snapshot_verified(params: UndoTransactions, wallet_key: &str) -> bool { +pub async fn snapshot_verified(params: UndoTransactions, wallet: Arc) -> bool { // if the local chain disagrees with the stored snapshot, // roll back to the snapshot point before continuing if let Some((snap_height, snap_hash)) = get_snapshot(¶ms.db).await { @@ -75,7 +172,7 @@ pub async fn snapshot_verified(params: UndoTransactions, wallet_key: &str) -> bo node_syncing: params.node_syncing, connections_key: params.connections_key, }; - let _ = undo_transactions(undo_transactions_params, wallet_key).await; + let _ = undo_transactions(undo_transactions_params, wallet.clone()).await; return false; } // also make sure the remote peer still agrees diff --git a/src/orphans/structs.rs b/src/orphans/structs.rs index 5d81d2c..b218ca8 100644 --- a/src/orphans/structs.rs +++ b/src/orphans/structs.rs @@ -12,6 +12,7 @@ pub struct OrphanCheckup { pub original_start_check: u32, pub local_height: u32, pub remote_height: u32, + pub recheck_from_height: Option, pub stream: Arc>, pub db: Db, pub map: Arc>, @@ -26,6 +27,7 @@ pub struct OrphanCheckup2 { pub db: Db, pub local_height: u32, pub remote_height: u32, + pub recheck_from_height: Option, pub map: Arc>, pub node_syncing: bool, pub connections_key: String, @@ -49,6 +51,7 @@ pub struct UndoTransactions { pub struct CheckUp { pub local_height: u32, pub remote_height: u32, + pub recheck_from_height: Option, pub db: Db, pub stream: Arc>, pub map: Arc>, diff --git a/src/orphans/sync_check.rs b/src/orphans/sync_check.rs index 3023ca9..e1ffa51 100644 --- a/src/orphans/sync_check.rs +++ b/src/orphans/sync_check.rs @@ -2,24 +2,33 @@ use crate::common::check_genesis::genesis_checkup; use crate::log::{error, info, warn}; use crate::miner::flag::end_reorg_lock; use crate::orphans::add_genesis::create_genesis_block; +use crate::orphans::checkup_state::take_orphan_recheck_height; use crate::orphans::deep_sync_rollback::deep_sync_rollback; use crate::orphans::orphan_window_check::orphan_window_check; -use crate::orphans::replay_errors::should_retry_staged_candidate; +use crate::orphans::replay_errors::{ + should_retry_staged_candidate, staged_candidate_status_for_error, +}; use crate::orphans::snapshot_check::snapshot_verified; use crate::orphans::structs::CheckUp; use crate::orphans::structs::OrphanCheckup2; use crate::orphans::structs::UndoTransactions; use crate::records::block_height::get_block_height::get_height; use crate::records::memory::torrent_status::{ - get_torrent_status, mark_other_torrent_statuses_invalid, set_torrent_status, TorrentStatus, + get_torrent_status, set_torrent_status, TorrentStatus, }; +use crate::startup::remote_height::request_remote_height; use crate::torrent::structs::Torrent; use crate::torrent::torrenting_system::save_torrent::{ list_staged_torrents, read_staged_torrent, remove_staged_torrent, }; use crate::torrent::torrenting_system::torrent_requests::handle_response_and_save_torrent; +use crate::wallets::structures::Wallet; +use crate::Arc; -async fn replay_staged_torrents(params: &OrphanCheckup2, wallet_key: &str) -> Result<(), String> { +async fn replay_staged_torrents( + params: &OrphanCheckup2, + wallet: Arc, +) -> Result<(), String> { // staged torrents are replayed after orphan correction so // any valid deferred candidates can be reconsidered in order. // Replay is height-based: all candidates for the current expected @@ -114,7 +123,7 @@ async fn replay_staged_torrents(params: &OrphanCheckup2, wallet_key: &str) -> Re expected_height, ¶ms.db, torrent, - wallet_key, + wallet.clone(), params.map.clone(), true, true, @@ -132,23 +141,17 @@ async fn replay_staged_torrents(params: &OrphanCheckup2, wallet_key: &str) -> Re }; set_torrent_status(expected_height, &torrent_info_hash, status).await; if advanced_height { - mark_other_torrent_statuses_invalid(expected_height, &torrent_info_hash) - .await; break; } } Err(err) => { - if should_retry_staged_candidate(&err) { + let status = staged_candidate_status_for_error(&err); + if status != TorrentStatus::Invalid { retryable_pending = true; // Piece availability is not proof that the candidate // lost the block fight; leave it pending so a later // orphan pass can retry after more peers stage it. - set_torrent_status( - expected_height, - &torrent_info_hash, - TorrentStatus::Pending, - ) - .await; + set_torrent_status(expected_height, &torrent_info_hash, status).await; } else { set_torrent_status( expected_height, @@ -177,7 +180,7 @@ async fn replay_staged_torrents(params: &OrphanCheckup2, wallet_key: &str) -> Re } } -pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<(), String> { +async fn sync_checkup_pass(params: &OrphanCheckup2, wallet: Arc) -> Result<(), String> { // bootstrap missing genesis first so the normal orphan // correction logic can operate against a valid local chain if params.local_height == 0 && !genesis_checkup().await { @@ -187,7 +190,7 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<() params.map.clone(), params.stream.clone(), params.db.clone(), - wallet_key, + wallet.clone(), params.connections_key.clone(), ) .await; @@ -203,11 +206,11 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<() }; // snapshot verification can trigger an immediate rollback // if a trusted checkpoint no longer matches local state - if !snapshot_verified(undo_transactions_params, wallet_key).await { + if !snapshot_verified(undo_transactions_params, wallet.clone()).await { // A snapshot rollback already happened, so replay staged torrents and // exit instead of running the near-tip rules against stale heights. let mut replay_waiting = false; - match replay_staged_torrents(¶ms, wallet_key).await { + match replay_staged_torrents(params, wallet.clone()).await { Ok(()) => {} Err(err) => { replay_waiting = should_retry_staged_candidate(&err); @@ -219,9 +222,6 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<() "[orphan] replay is waiting for block data; leaving candidates pending for a later pass" ); } - if !params.node_syncing { - end_reorg_lock(); - } return Ok(()); } // run the two orphan rules in order, then replay any staged @@ -229,17 +229,18 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<() let checkup_params = CheckUp { local_height: params.local_height, remote_height: params.remote_height, + recheck_from_height: params.recheck_from_height, db: params.db.clone(), stream: params.stream.clone(), map: params.map.clone(), node_syncing: params.node_syncing, connections_key: params.connections_key.clone(), }; - deep_sync_rollback(checkup_params.clone(), wallet_key).await; + deep_sync_rollback(checkup_params.clone(), wallet.clone()).await; let mut replay_waiting = false; let height_before_window_check = get_height(¶ms.db); - match orphan_window_check(checkup_params, wallet_key).await { + match orphan_window_check(checkup_params, wallet.clone()).await { Ok(()) => {} Err(err) => { if should_retry_staged_candidate(&err) @@ -252,7 +253,7 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<() } let height_before_replay = get_height(¶ms.db); - match replay_staged_torrents(¶ms, wallet_key).await { + match replay_staged_torrents(params, wallet.clone()).await { Ok(()) => {} Err(err) => { replay_waiting |= should_retry_staged_candidate(&err); @@ -267,9 +268,53 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<() "[orphan] replay is waiting for block data; leaving candidates pending for a later pass" ); } + info!("[orphan] orphan check pass completed"); + Ok(()) +} + +pub async fn sync_checkup(mut params: OrphanCheckup2, wallet: Arc) -> Result<(), String> { + let result = loop { + match sync_checkup_pass(¶ms, wallet.clone()).await { + Ok(()) => {} + Err(err) => break Err(err), + } + + let Some(recheck_height) = take_orphan_recheck_height() else { + break Ok(()); + }; + + let local_height = get_height(¶ms.db); + let remote_height = match request_remote_height( + params.stream.clone(), + params.map.clone(), + params.connections_key.clone(), + ) + .await + { + Ok(height) => height, + Err(err) => { + warn!("[orphan] failed to refresh remote height before queued recheck: {err}"); + params.remote_height + } + }; + + params.local_height = local_height; + params.remote_height = remote_height.max(recheck_height).max(local_height); + params.recheck_from_height = Some(recheck_height); + + warn!( + "[orphan] running queued orphan recheck: local_height={} remote_height={} queued_height={}", + params.local_height, params.remote_height, recheck_height + ); + }; + if !params.node_syncing { end_reorg_lock(); } - info!("[orphan] orphan check completed"); - Ok(()) + + if result.is_ok() { + info!("[orphan] orphan check completed"); + } + + result } diff --git a/src/orphans/undo_block_transactions.rs b/src/orphans/undo_block_transactions.rs index 194b15b..6aa4ab5 100644 --- a/src/orphans/undo_block_transactions.rs +++ b/src/orphans/undo_block_transactions.rs @@ -20,8 +20,13 @@ use crate::records::block_height::get_block_height::get_height; use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::torrent_status::reset_all_torrent_statuses; use crate::records::unpack_block::load_by_block_number::load_block; +use crate::wallets::structures::Wallet; +use crate::Arc; -pub async fn undo_transactions(params: UndoTransactions, wallet_key: &str) -> Result<(), String> { +pub async fn undo_transactions( + params: UndoTransactions, + wallet: Arc, +) -> Result<(), String> { // walk backward from the current tip to the selected // rollback height and undo each block in reverse order let true_start_height = params.start_height; @@ -163,7 +168,7 @@ pub async fn undo_transactions(params: UndoTransactions, wallet_key: &str) -> Re // rebuild mined counts after rollback, then fetch and save the // replacement blocks, and finally rebuild mined counts again NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await?; - save_new_blocks(¶ms, replay_to_height, wallet_key, true_start_height).await?; + save_new_blocks(¶ms, replay_to_height, wallet, true_start_height).await?; NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await?; Ok(()) } diff --git a/src/records/ip_score/ban_management.rs b/src/records/ip_score/ban_management.rs index e2f4369..865d7d9 100644 --- a/src/records/ip_score/ban_management.rs +++ b/src/records/ip_score/ban_management.rs @@ -1,22 +1,15 @@ use crate::common::skein::skein_256_hash_data; -use crate::log::error; use crate::records::ip_score::get_score::get_ip_score_timestamp; use crate::rpc::commands::unblock_peer_ip::unblock_peer; use crate::sled::Db; use crate::sleep; use crate::wallets::structures::Wallet; +use crate::Arc; use crate::Duration; -pub async fn sign_ip_to_ban(ip: &str, wallet_key: &str) -> String { +pub async fn sign_ip_to_ban(ip: &str, wallet: &Wallet) -> String { // Ban and unban operations reuse the wallet signature flow so peer actions // can be authenticated by other nodes. - let wallet = match Wallet::try_obtain_wallet(wallet_key.to_string(), None).await { - Ok(wallet) => wallet, - Err(err) => { - error!("Wallet decryption failed while signing IP ban: {err}"); - return String::new(); - } - }; let privkey = &wallet.saved.private_key; // The signature is over the IP hash, not the raw IP string. let ip_hash = skein_256_hash_data(ip); @@ -24,12 +17,12 @@ pub async fn sign_ip_to_ban(ip: &str, wallet_key: &str) -> String { signature } -pub fn spawn_unban(db: Db, ip: String, signature: String, wallet_key: String, duration: Duration) { +pub fn spawn_unban(db: Db, ip: String, signature: String, wallet: Arc, duration: Duration) { // Timed unbans are scheduled in the background so temporary bans can expire // automatically without blocking the caller. tokio::spawn(async move { sleep(duration).await; - unblock_peer(&db, ip.to_string(), signature, wallet_key.to_string()).await; + unblock_peer(&db, ip.to_string(), signature, wallet).await; }); } diff --git a/src/records/ip_score/penalty.rs b/src/records/ip_score/penalty.rs index 3cfa781..a457972 100644 --- a/src/records/ip_score/penalty.rs +++ b/src/records/ip_score/penalty.rs @@ -1,105 +1,83 @@ -use crate::log::warn; -use crate::records::ip_score::ban_management::{sign_ip_to_ban, spawn_unban}; -use crate::records::memory::connections::CONNECTIONS; -use crate::records::memory::enums::ClientType; -use crate::rpc::commands::block_peer_ip::block_peer; -use crate::sled::Db; -use crate::Duration; - -pub async fn issue_penalty( - score: u8, - ip: &str, - client_type: &str, - wallet_key: &str, - db: &Db, -) -> String { +use crate::log::warn; +use crate::records::ip_score::ban_management::{sign_ip_to_ban, spawn_unban}; +use crate::records::memory::connections::CONNECTIONS; +use crate::records::memory::enums::ClientType; +use crate::rpc::commands::block_peer_ip::block_peer; +use crate::sled::Db; +use crate::wallets::structures::Wallet; +use crate::Arc; +use crate::Duration; + +pub async fn issue_penalty( + score: u8, + ip: &str, + client_type: &str, + wallet: Arc, + db: &Db, +) -> String { // Penalties only matter for active known connections, so resolve the // reported client type before taking action. let mut guard = CONNECTIONS.write().await; let Ok(client_type) = client_type.parse::() else { return "No action taken".to_string(); }; - - let signature = sign_ip_to_ban(ip, wallet_key).await; - if let Some(conn) = guard.as_mut() { - if let Some((connection_type, port)) = - conn.find_connection_info_by_client_type(ip, client_type) - { + + let signature = sign_ip_to_ban(ip, &wallet).await; + if let Some(conn) = guard.as_mut() { + if let Some((connection_type, port)) = + conn.find_connection_info_by_client_type(ip, client_type) + { // Higher scores escalate from a dropped connection to temporary and // then permanent bans. if score > 100 { warn!("[ip_score] permanently banning ip={ip} score={score}"); - block_peer( - db, - ip.to_string(), - signature.to_string(), - wallet_key.to_string(), - ) - .await; - return format!("IP {ip} permanently banned"); + block_peer(db, ip.to_string(), signature.to_string(), wallet.clone()).await; + return format!("IP {ip} permanently banned"); } else if score > 75 { warn!("[ip_score] banning ip={ip} duration=24h score={score}"); - - block_peer( - db, - ip.to_string(), - signature.to_string(), - wallet_key.to_string(), - ) - .await; - spawn_unban( - db.clone(), - ip.to_string(), - signature.to_string(), - wallet_key.to_string(), - Duration::from_secs(86400), - ); - return format!("IP {ip} banned for 24 hours"); + + block_peer(db, ip.to_string(), signature.to_string(), wallet.clone()).await; + spawn_unban( + db.clone(), + ip.to_string(), + signature.to_string(), + wallet.clone(), + Duration::from_secs(86400), + ); + return format!("IP {ip} banned for 24 hours"); } else if score > 50 { warn!("[ip_score] banning ip={ip} duration=1h score={score}"); - - block_peer( - db, - ip.to_string(), - signature.to_string(), - wallet_key.to_string(), - ) - .await; - spawn_unban( - db.clone(), - ip.to_string(), - signature.to_string(), - wallet_key.to_string(), - Duration::from_secs(3600), - ); - return format!("IP {ip} banned for 1 hour"); + + block_peer(db, ip.to_string(), signature.to_string(), wallet.clone()).await; + spawn_unban( + db.clone(), + ip.to_string(), + signature.to_string(), + wallet.clone(), + Duration::from_secs(3600), + ); + return format!("IP {ip} banned for 1 hour"); } else if score > 30 { warn!("[ip_score] banning ip={ip} duration=30m score={score}"); - - block_peer( - db, - ip.to_string(), - signature.to_string(), - wallet_key.to_string(), - ) - .await; - spawn_unban( - db.clone(), - ip.to_string(), - signature.to_string(), - wallet_key.to_string(), - Duration::from_secs(1800), - ); - return format!("IP {ip} banned for 30 minutes"); + + block_peer(db, ip.to_string(), signature.to_string(), wallet.clone()).await; + spawn_unban( + db.clone(), + ip.to_string(), + signature.to_string(), + wallet.clone(), + Duration::from_secs(1800), + ); + return format!("IP {ip} banned for 30 minutes"); } else if score > 10 { warn!("[ip_score] dropping connection ip={ip} score={score}"); - - // Low-level penalties disconnect the peer but do not add a ban - // record yet. - conn.drop_connection(connection_type, ip.to_string(), port); - return format!("IP {ip} dropped due to score {score}"); - } - } - } - "No action taken".to_string() -} + + // Low-level penalties disconnect the peer but do not add a ban + // record yet. + conn.drop_connection(connection_type, ip.to_string(), port); + return format!("IP {ip} dropped due to score {score}"); + } + } + } + "No action taken".to_string() +} diff --git a/src/records/ip_score/score.rs b/src/records/ip_score/score.rs index a2691ce..5408d1d 100644 --- a/src/records/ip_score/score.rs +++ b/src/records/ip_score/score.rs @@ -4,6 +4,8 @@ use crate::records::ip_score::enums::InfractionType; use crate::records::ip_score::get_score::get_ip_score_timestamp; use crate::records::ip_score::penalty::issue_penalty; use crate::sled::Db; +use crate::wallets::structures::Wallet; +use crate::Arc; use crate::Duration; fn score_subject(ip: &str, client_type: &str) -> String { @@ -22,7 +24,7 @@ pub async fn update_ip_score( infraction_type: InfractionType, timestamp: u32, db: &Db, - wallet_key: &str, + wallet: Arc, ) -> sled::Result<()> { // Convert the incoming event into a new score and persist the latest // score/timestamp pair before penalty handling runs. @@ -52,7 +54,7 @@ pub async fn update_ip_score( // Penalty handling is driven from the updated score so actions like // temporary bans always reflect the most recent infraction state. - let action = issue_penalty(score, ip, client_type, wallet_key, db).await; + let action = issue_penalty(score, ip, client_type, wallet, db).await; if action != "No action taken" { warn!("[ip_score] penalty ip={ip} client_type={client_type} subject={subject} infraction={infraction_type:?} previous_score={previous_score} new_score={score} action={action}"); } diff --git a/src/records/memory/averages.rs b/src/records/memory/averages.rs index 386a979..5864a34 100644 --- a/src/records/memory/averages.rs +++ b/src/records/memory/averages.rs @@ -1,12 +1,11 @@ -use crate::blocks::block::DIFFICULTY_OFFSET; -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::lazy_static; -use crate::Duration; -use crate::HashMap; -use crate::Mutex; -use crate::PathBuf; +use crate::blocks::block::DIFFICULTY_OFFSET; +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::lazy_static; +use crate::HashMap; +use crate::Mutex; +use crate::PathBuf; -pub const DIFFICULTY_AVERAGE_WINDOW: u32 = 254; +pub const DIFFICULTY_AVERAGE_WINDOW: u32 = 50; lazy_static! { static ref AVERAGE_DATA: Mutex> = Mutex::new(HashMap::new()); @@ -114,48 +113,14 @@ pub async fn update_block_data(block_num: u32) { } } -async fn calculate_time_differences(latest_timestamp: u32) -> Vec { - // Build the interval list from the cached block timestamps plus - // the candidate block timestamp being evaluated right now. - let cache = AVERAGE_DATA.lock().await; - let mut timestamps: Vec<_> = cache.values().map(|&(timestamp, _)| timestamp).collect(); - - timestamps.push(latest_timestamp); - - timestamps.sort(); - - timestamps - .windows(2) - .map(|w| Duration::from_secs((w[1] - w[0]) as u64)) - .collect() -} - -async fn calculate_mean_difficulty() -> u64 { - // Difficulty smoothing uses the rolling mean of the cached prior - // block difficulties rather than just the current tip value. - let cache = AVERAGE_DATA.lock().await; - let difficulties: Vec<_> = cache.values().map(|&(_, difficulty)| difficulty).collect(); - - if difficulties.is_empty() { - 0 - } else { - let total: u128 = difficulties - .iter() - .map(|&difficulty| difficulty as u128) - .sum(); - let average = total / difficulties.len() as u128; - average.min(u64::MAX as u128) as u64 - } -} - -pub async fn calculate_averages(current_timestamp: u32) -> (u64, Duration) { - // Combine the rolling time intervals and rolling mean difficulty - // into the aggregate inputs used by difficulty adjustment. - let time_differences = calculate_time_differences(current_timestamp).await; - let total_duration: Duration = time_differences.iter().sum(); - let average_duration = total_duration / (time_differences.len() as u32); - - let mean_difficulty = calculate_mean_difficulty().await; - - (mean_difficulty, average_duration) -} +pub async fn asert_anchor() -> Option<(u32, u32, u64)> { + // ASERT uses the oldest cached canonical block as its reference point. + // The cache is already rebuilt after startup and rollback, and it is + // trimmed to the active difficulty window. + let cache = AVERAGE_DATA.lock().await; + cache + .iter() + .filter(|(_, (_, difficulty))| *difficulty > 0) + .min_by_key(|(height, _)| *height) + .map(|(height, (timestamp, difficulty))| (*height, *timestamp, *difficulty)) +} diff --git a/src/records/memory/connections.rs b/src/records/memory/connections.rs index e82d443..1aa33ce 100644 --- a/src/records/memory/connections.rs +++ b/src/records/memory/connections.rs @@ -40,7 +40,7 @@ use crate::records::memory::structs::{ConnectionInfo, ConnectionKey}; #[derive(Clone)] struct ReconnectContext { db: Db, - wallet_key: String, + wallet: Arc, map: Arc>, } @@ -62,15 +62,11 @@ fn finish_reconnect() { RECONNECT_IN_PROGRESS.store(false, AtomicOrdering::SeqCst); } -pub async fn set_reconnect_context(db: Db, wallet_key: String, map: Arc>) { +pub async fn set_reconnect_context(db: Db, wallet: Arc, map: Arc>) { let mut context = RECONNECT_CONTEXT.lock().await; // Store enough state for later liveness checks to reconnect without // needing the original startup stack. - *context = Some(ReconnectContext { - db, - wallet_key, - map, - }); + *context = Some(ReconnectContext { db, wallet, map }); } async fn reconnect_dropped_outgoing(excluded_ip: &str) { @@ -118,7 +114,7 @@ async fn reconnect_dropped_outgoing(excluded_ip: &str) { let bootstrap_params = BootstrapParams { stream, connections_key, - wallet_key: context.wallet_key, + wallet: context.wallet, db: context.db, map: context.map, first: false, @@ -577,6 +573,30 @@ pub async fn peer_connection_count() -> usize { .unwrap_or(0) } +pub async fn live_miner_peer_streams() -> Vec<(String, Arc>)> { + // Snapshot consensus and recovery checks vote only across currently + // connected miner peers, regardless of incoming/outgoing direction. + CONNECTIONS + .read() + .await + .as_ref() + .map(|connection| { + connection + .connection_map + .iter() + .filter_map(|(key, info)| { + if ClientType::from_bytes(&info.client_type) != Some(ClientType::Miner) { + return None; + } + let ip = binary_to_ip(key.ip.clone()); + let connections_key = format!("{}:{}", ip, key.port); + Some((connections_key, Arc::clone(&info.stream))) + }) + .collect() + }) + .unwrap_or_default() +} + pub async fn get_client_type_from_memory(key: &str) -> Option { // Recover the stored client role from the serialized connection key // used throughout the RPC layer. diff --git a/src/records/memory/network_mapping/add.rs b/src/records/memory/network_mapping/add.rs index 7f7e899..0c5c0ba 100644 --- a/src/records/memory/network_mapping/add.rs +++ b/src/records/memory/network_mapping/add.rs @@ -78,7 +78,7 @@ impl NodeInfo { mut blocks_mined, remote_ip, db, - wallet_key, + wallet, connections_key, } = params; let current_timestamp = Utc::now().timestamp_millis() as u64; @@ -92,17 +92,9 @@ impl NodeInfo { // current timestamp so they can be propagated as fresh node events. if edit.ip == remote_ip { edit.modified_timestamp = current_timestamp; - let wallet = match Wallet::try_obtain_wallet(wallet_key.clone(), None).await { - Ok(wallet) => wallet, - Err(err) => { - error!("Wallet decryption failed while adding node address: {err}"); - return RpcResponse::Binary(b"Error: Wallet decryption failed".to_vec()); - } - }; - edit.modified_by = wallet.saved.long_address; + edit.modified_by = wallet.saved.long_address.clone(); edit.modified_signature = - Self::added_signature(&edit.address, &edit.ip, current_timestamp, &wallet_key) - .await; + Self::added_signature(&edit.address, &edit.ip, current_timestamp, &wallet).await; } if !remote_ip.is_empty() { @@ -229,7 +221,7 @@ impl NodeInfo { InfractionType::BadMinerIpUpdate, now, &db, - &wallet_key, + wallet.clone(), ) .await; return RpcResponse::Binary(b"Error: Ip Already exists.".to_vec()); @@ -252,7 +244,7 @@ impl NodeInfo { edit, remote_ip, db, - wallet_key, + wallet, connections_key, }); } diff --git a/src/records/memory/network_mapping/delete.rs b/src/records/memory/network_mapping/delete.rs index 2de2f60..c887fb6 100644 --- a/src/records/memory/network_mapping/delete.rs +++ b/src/records/memory/network_mapping/delete.rs @@ -8,7 +8,7 @@ impl NodeInfo { edit, remote_ip, db, - wallet_key, + wallet, connections_key, } = params; let task_addr = edit.address.clone(); @@ -18,7 +18,7 @@ impl NodeInfo { let added_timestamp = edit.modified_timestamp; let task_remote_ip = remote_ip; - let task_wallet_key = wallet_key; + let task_signing_wallet = wallet; let task_connections_key = connections_key; let task_db = db.clone(); @@ -107,7 +107,7 @@ impl NodeInfo { }, remote_ip: task_remote_ip.clone(), db: task_db.clone(), - wallet_key: task_wallet_key.clone(), + wallet: task_signing_wallet.clone(), connections_key: task_connections_key.clone(), }) .await; @@ -134,7 +134,7 @@ impl NodeInfo { mut edit, remote_ip, db, - wallet_key, + wallet, connections_key, } = params; let current_timestamp = Utc::now().timestamp_millis() as u64; @@ -143,17 +143,9 @@ impl NodeInfo { // before they are applied and broadcast. if remote_ip.is_empty() { edit.modified_timestamp = current_timestamp; - let wallet = match Wallet::try_obtain_wallet(wallet_key.clone(), None).await { - Ok(wallet) => wallet, - Err(err) => { - error!("Wallet decryption failed while deleting node address: {err}"); - return RpcResponse::Binary(b"Error: Wallet decryption failed".to_vec()); - } - }; - edit.modified_by = wallet.saved.long_address; + edit.modified_by = wallet.saved.long_address.clone(); edit.modified_signature = - Self::added_signature(&edit.address, &edit.ip, current_timestamp, &wallet_key) - .await; + Self::added_signature(&edit.address, &edit.ip, current_timestamp, &wallet).await; } let data = format!( @@ -284,7 +276,7 @@ impl NodeInfo { let bootstrap_params = BootstrapParams { stream: new_stream, connections_key, - wallet_key: wallet_key.clone(), + wallet: wallet.clone(), db: db.clone(), map: map.clone(), first: false, diff --git a/src/records/memory/network_mapping/queries.rs b/src/records/memory/network_mapping/queries.rs index a159028..dd38f34 100644 --- a/src/records/memory/network_mapping/queries.rs +++ b/src/records/memory/network_mapping/queries.rs @@ -109,22 +109,15 @@ impl NodeInfo { address: &str, ip: &str, current_timestamp: u64, - wallet_key: &str, + wallet: &Arc, ) -> String { // Node edits are signed over address, IP, signer, and timestamp // so peers can independently verify the advertised change. - let wallet = match Wallet::try_obtain_wallet(wallet_key.to_string(), None).await { - Ok(wallet) => wallet, - Err(err) => { - error!("Wallet decryption failed while signing node edit: {err}"); - return String::new(); - } - }; - let added_by = wallet.saved.long_address; - let private_key = wallet.saved.private_key; + let added_by = wallet.saved.long_address.clone(); + let private_key = &wallet.saved.private_key; let data = format!("{address}{ip}{added_by}{current_timestamp}"); let hashed_data = skein_256_hash_data(&data); - Wallet::sign_transaction(&hashed_data, &private_key).await + Wallet::sign_transaction(&hashed_data, private_key).await } } diff --git a/src/records/memory/network_mapping/structs.rs b/src/records/memory/network_mapping/structs.rs index f617f0a..2b756fd 100644 --- a/src/records/memory/network_mapping/structs.rs +++ b/src/records/memory/network_mapping/structs.rs @@ -41,7 +41,7 @@ pub struct AddAddressParams { pub blocks_mined: u8, pub remote_ip: String, pub db: Db, - pub wallet_key: String, + pub wallet: Arc, pub connections_key: String, } @@ -52,7 +52,7 @@ pub struct DeleteAddressParams { pub edit: SignedNodeEdit, pub remote_ip: String, pub db: Db, - pub wallet_key: String, + pub wallet: Arc, pub connections_key: String, } @@ -63,6 +63,6 @@ pub struct PingMonitorParams { pub edit: SignedNodeEdit, pub remote_ip: String, pub db: Db, - pub wallet_key: String, + pub wallet: Arc, pub connections_key: String, } diff --git a/src/records/memory/torrent_status.rs b/src/records/memory/torrent_status.rs index 09fdcd6..3bcfd28 100644 --- a/src/records/memory/torrent_status.rs +++ b/src/records/memory/torrent_status.rs @@ -3,6 +3,7 @@ use crate::{lazy_static, HashMap, Mutex}; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum TorrentStatus { Pending, + MissingParent, Valid, Invalid, } @@ -55,18 +56,3 @@ pub async fn reset_all_torrent_statuses() { *status = TorrentStatus::Pending; } } - -pub async fn mark_other_torrent_statuses_invalid(height: u32, selected_info_hash: &str) { - let mut map = TORRENT_STATUS.lock().await; - let prefix = format!("{height}:"); - for (key, status) in map.iter_mut() { - // Once one candidate wins a height, every other remembered candidate - // at that same height has been checked and lost. - let Some((_, info_hash)) = key.split_once(':') else { - continue; - }; - if key.starts_with(&prefix) && info_hash != selected_info_hash { - *status = TorrentStatus::Invalid; - } - } -} diff --git a/src/records/record_chain/save.rs b/src/records/record_chain/save.rs index 7329dfe..bf3f275 100644 --- a/src/records/record_chain/save.rs +++ b/src/records/record_chain/save.rs @@ -9,7 +9,7 @@ use crate::miner::flag::{ use crate::orphans::snapshot_check::{snapshot_height, update_snapshot}; use crate::records::block_height::get_block_height::get_height; use crate::records::block_height::increase_block_height::increase_height; -use crate::records::memory::averages::{calculate_averages, update_block_data}; +use crate::records::memory::averages::{asert_anchor, update_block_data}; use crate::records::memory::mempool::{ apply_selected_transaction_math, mark_processed_by_signatures, mark_selected_transactions_processed, restore_processed_by_signatures, @@ -172,6 +172,7 @@ pub async fn save_block(params: SaveBlockParams) -> Result<(), String> { difficulty: previous_difficulty, timestamp, header_hash: &header_hash, + previous_hash, save_type: save_type.clone(), miner: miner.clone(), map, @@ -191,6 +192,7 @@ pub async fn save_block(params: SaveBlockParams) -> Result<(), String> { difficulty: previous_difficulty, timestamp, header_hash: &header_hash, + previous_hash, save_type, miner: miner.clone(), map, @@ -216,16 +218,21 @@ async fn log_saved_block_difficulty( } update_block_data(block_number - 1).await; - let (difficulty_average, average_duration) = calculate_averages(timestamp).await; + let Some((anchor_height, anchor_timestamp, anchor_difficulty)) = asert_anchor().await else { + info!( + "[difficulty] saved_block={block_number} timestamp={timestamp} current_difficulty={current_difficulty} new_difficulty={new_difficulty} asert_anchor=missing" + ); + return; + }; + + let elapsed_seconds = timestamp.saturating_sub(anchor_timestamp); + let expected_seconds = block_number + .saturating_sub(anchor_height) + .saturating_mul(15); + let error_seconds = elapsed_seconds as i64 - expected_seconds as i64; info!( - "[difficulty] saved_block={} timestamp={} average_time_seconds={:.2} average_difficulty={} current_difficulty={} new_difficulty={}", - block_number, - timestamp, - average_duration.as_secs_f64(), - difficulty_average, - current_difficulty, - new_difficulty + "[difficulty] saved_block={block_number} timestamp={timestamp} target_seconds=15 anchor_height={anchor_height} elapsed_seconds={elapsed_seconds} expected_seconds={expected_seconds} error_seconds={error_seconds} anchor_difficulty={anchor_difficulty} current_difficulty={current_difficulty} new_difficulty={new_difficulty}" ); } @@ -243,6 +250,7 @@ async fn save_binary_data_with_mempool_stream( difficulty, timestamp, header_hash, + previous_hash, save_type, miner, map, @@ -307,8 +315,8 @@ async fn save_binary_data_with_mempool_stream( difficulty, timestamp, header_hash, + previous_hash, miner.clone(), - map.clone(), ) .await { @@ -336,7 +344,7 @@ async fn save_binary_data_with_mempool_stream( cleanup_block_file(&file_name); return Err(err); } - let _ = update_snapshot(db, next_number).await; + let _ = update_snapshot(db, next_number, map.clone()).await; if let Some(snapshot_height) = snapshot_height(db).await { if let Err(err) = finalize_rewards_through_height(db, snapshot_height).await { error!( @@ -348,7 +356,7 @@ async fn save_binary_data_with_mempool_stream( let _ = prune_staged_torrents(snapshot_height).await; } } else { - let _ = update_snapshot(db, next_number).await; + let _ = update_snapshot(db, next_number, map.clone()).await; } if !is_syncing_mode() { @@ -376,6 +384,7 @@ async fn save_binary_data(params: SaveBinaryDataParams<'_>) -> Result<(), String difficulty, timestamp, header_hash, + previous_hash, save_type, miner, map, @@ -437,8 +446,8 @@ async fn save_binary_data(params: SaveBinaryDataParams<'_>) -> Result<(), String difficulty, timestamp, header_hash, + previous_hash, miner.clone(), - map.clone(), ) .await { @@ -466,7 +475,7 @@ async fn save_binary_data(params: SaveBinaryDataParams<'_>) -> Result<(), String cleanup_block_file(&file_name); return Err(err); } - let _ = update_snapshot(db, next_number).await; + let _ = update_snapshot(db, next_number, map.clone()).await; if let Some(snapshot_height) = snapshot_height(db).await { if let Err(err) = finalize_rewards_through_height(db, snapshot_height).await { error!( @@ -478,7 +487,7 @@ async fn save_binary_data(params: SaveBinaryDataParams<'_>) -> Result<(), String let _ = prune_staged_torrents(snapshot_height).await; } } else { - let _ = update_snapshot(db, next_number).await; + let _ = update_snapshot(db, next_number, map.clone()).await; } if !is_syncing_mode() { diff --git a/src/records/record_chain/structs.rs b/src/records/record_chain/structs.rs index 8c8615f..7cf84d6 100644 --- a/src/records/record_chain/structs.rs +++ b/src/records/record_chain/structs.rs @@ -31,6 +31,7 @@ pub struct SaveBinaryDataWithMempoolStreamParams<'a> { pub difficulty: u64, pub timestamp: u32, pub header_hash: &'a str, + pub previous_hash: &'a str, pub save_type: SaveType, pub miner: String, pub map: Arc>, @@ -49,6 +50,7 @@ pub struct SaveBinaryDataParams<'a> { pub difficulty: u64, pub timestamp: u32, pub header_hash: &'a str, + pub previous_hash: &'a str, pub save_type: SaveType, pub miner: String, pub map: Arc>, diff --git a/src/rpc/client/block_hash_vote.rs b/src/rpc/client/block_hash_vote.rs new file mode 100644 index 0000000..b8fef4d --- /dev/null +++ b/src/rpc/client/block_hash_vote.rs @@ -0,0 +1,42 @@ +use crate::encode; +use crate::records::memory::response_channels::{reserve_entry, Command}; +use crate::rpc::command_maps::RPC_BLOCK_HASH_AT_HEIGHT; +use crate::rpc::responses::RpcResponse; +use crate::{timeout, Arc, Duration, Mutex, TcpStream}; + +pub async fn request_block_hash_at_height( + stream: Arc>, + map: Arc>, + connections_key: String, + height: u32, +) -> Result { + let (hashmap_key, _tx, rx) = reserve_entry(map).await; + + let mut message = vec![RPC_BLOCK_HASH_AT_HEIGHT]; + message.extend_from_slice(&hashmap_key); + message.extend_from_slice(&height.to_le_bytes()); + + RpcResponse::send_raw(&stream, Some(&connections_key), &message).await; + + let mut rx = rx.lock().await; + let buffer = timeout(Duration::from_secs(5), rx.recv()) + .await + .map_err(|_| format!("Timed out waiting for block hash vote at height {height}"))? + .ok_or_else(|| "No block hash vote response received".to_string())?; + + if let Ok(response_text) = String::from_utf8(buffer.clone()) { + let trimmed = response_text.trim(); + if trimmed.starts_with("error:") { + return Err(trimmed.to_string()); + } + } + + if buffer.len() != 32 { + return Err(format!( + "Invalid block hash vote length: expected 32, got {}", + buffer.len() + )); + } + + Ok(encode(buffer)) +} diff --git a/src/rpc/client/handshake.rs b/src/rpc/client/handshake.rs index 3ff2bc4..488bc9d 100644 --- a/src/rpc/client/handshake.rs +++ b/src/rpc/client/handshake.rs @@ -5,7 +5,6 @@ use crate::rpc::client::handshake_processing::process_handshake_response; use crate::rpc::client::structs::{Connect, Handshake}; use crate::rpc::command_maps::{MAX_RPC_REPLY_BYTES, RPC_REPLY}; use crate::rpc::handshake_constants::HANDSHAKE_RESPONSE_BYTES; -use crate::wallets::structures::Wallet; use crate::IpAddr; use crate::SocketAddr; use crate::TcpStream; @@ -31,7 +30,7 @@ pub async fn connect_and_handshake(params: Connect) -> Result<(), Box io::Error { } async fn perform_handshake(mut params: Handshake) -> io::Result<()> { - let wallet = Wallet::try_obtain_wallet(params.wallet_key.clone(), None) - .await - .map_err(io::Error::other)?; - + let wallet = params.wallet.clone(); let data = prepare_handshake_message(&wallet, "aced").await?; let response = send_and_receive_handshake(&mut params, &data).await?; process_handshake_response(response, &wallet, params).await diff --git a/src/rpc/client/handshake_processing.rs b/src/rpc/client/handshake_processing.rs index 3676236..a68747f 100644 --- a/src/rpc/client/handshake_processing.rs +++ b/src/rpc/client/handshake_processing.rs @@ -46,7 +46,7 @@ use crate::TcpStream; pub struct BootstrapParams { pub stream: Arc>, pub connections_key: String, - pub wallet_key: String, + pub wallet: Arc, pub db: Db, pub map: Arc>, pub first: bool, @@ -130,7 +130,7 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), let connect = Connect { addr: socket_addr, node_ip: addr_string.clone(), - wallet_key: params.wallet_key.clone(), + wallet: params.wallet.clone(), db: params.db.clone(), map: params.map.clone(), first: params.first, @@ -166,7 +166,7 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), remote_height, params.map.clone(), true, - ¶ms.wallet_key, + params.wallet.clone(), current_key.clone(), ) .await @@ -200,11 +200,12 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), db: params.db.clone(), local_height: post_sync_local_height, remote_height: post_sync_remote_height, + recheck_from_height: Some(post_sync_local_height.min(post_sync_remote_height)), map: params.map.clone(), node_syncing: true, connections_key: current_key.clone(), }; - match sync_checkup(orphan_checkup_params, ¶ms.wallet_key).await { + match sync_checkup(orphan_checkup_params, params.wallet.clone()).await { Ok(()) => {} Err(err) => warn!("[sync] Post-sync orphan check error: {err}"), } @@ -280,12 +281,7 @@ pub async fn process_handshake_response( if socket_parts.len() == 2 { let ip = socket_parts[0]; let port: u16 = socket_parts[1].parse().unwrap_or(0); - set_reconnect_context( - params.db.clone(), - params.wallet_key.clone(), - params.map.clone(), - ) - .await; + set_reconnect_context(params.db.clone(), params.wallet.clone(), params.map.clone()).await; let mut conn = CONNECTIONS.write().await; if let Some(manager) = conn.as_mut() { if !manager.store_connection(StoreConnectionParams { @@ -308,7 +304,7 @@ pub async fn process_handshake_response( listener_stream, params.db.clone(), connections_key.clone(), - params.wallet_key.clone(), + params.wallet.clone(), params.map.clone(), )); let broadcast_stream = Arc::clone(&stream); @@ -341,7 +337,7 @@ pub async fn process_handshake_response( &wallet.saved.short_address.clone(), params.map.clone(), ¶ms.db.clone(), - ¶ms.wallet_key.clone(), + params.wallet.clone(), &connections_key, ) .await; @@ -351,7 +347,7 @@ pub async fn process_handshake_response( let bsparams = BootstrapParams { stream: Arc::clone(&stream), connections_key: connections_key.clone(), - wallet_key: params.wallet_key.clone(), + wallet: params.wallet.clone(), db: params.db.clone(), map: params.map.clone(), first: params.first, @@ -364,7 +360,7 @@ pub async fn process_handshake_response( &wallet.saved.short_address.clone(), params.map.clone(), ¶ms.db.clone(), - ¶ms.wallet_key.clone(), + params.wallet.clone(), &connections_key, ) .await; diff --git a/src/rpc/client/mod.rs b/src/rpc/client/mod.rs index 82f2d47..93c2f86 100644 --- a/src/rpc/client/mod.rs +++ b/src/rpc/client/mod.rs @@ -1,4 +1,5 @@ // The rpc client module contains the standalone client-side handshake and sync helpers. +pub mod block_hash_vote; pub mod handshake; pub mod handshake_message; pub mod handshake_processing; diff --git a/src/rpc/client/structs.rs b/src/rpc/client/structs.rs index 9d1f232..00e7821 100644 --- a/src/rpc/client/structs.rs +++ b/src/rpc/client/structs.rs @@ -1,5 +1,6 @@ use crate::records::memory::response_channels::Command; use crate::sled::Db; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Mutex; use crate::SocketAddr; @@ -10,7 +11,7 @@ pub struct Connect { pub addr: SocketAddr, pub db: Db, pub node_ip: String, - pub wallet_key: String, + pub wallet: Arc, pub map: Arc>, pub first: bool, } @@ -20,7 +21,7 @@ pub struct Handshake { pub stream: TcpStream, pub db: Db, pub addr: String, - pub wallet_key: String, + pub wallet: Arc, pub map: Arc>, pub first: bool, } diff --git a/src/rpc/client/syncing.rs b/src/rpc/client/syncing.rs index 060546b..43c9bae 100644 --- a/src/rpc/client/syncing.rs +++ b/src/rpc/client/syncing.rs @@ -12,6 +12,7 @@ use crate::torrent::structs::Torrent; use crate::torrent::torrenting_system::torrent_requests::{ handle_response_and_save_torrent, send_request_torrent_message, }; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Duration; use crate::Mutex; @@ -23,7 +24,7 @@ pub async fn node_syncing( remote_height: u32, map: Arc>, node_syncing: bool, - wallet_key: &str, + wallet: Arc, connections_key: String, ) -> io::Result<()> { let mut local_height = get_height(db); @@ -76,7 +77,7 @@ pub async fn node_syncing( local_height, db, torrent, - wallet_key, + wallet.clone(), map.clone(), false, false, @@ -96,12 +97,13 @@ pub async fn node_syncing( db: db.clone(), local_height, remote_height, + recheck_from_height: Some(local_height), map: map.clone(), node_syncing, connections_key: connections_key.clone(), }; - match sync_checkup(orphan_checkup_params, wallet_key).await { + match sync_checkup(orphan_checkup_params, wallet.clone()).await { Ok(()) => {} Err(err) => error!("[sync] orphan check returned error: {err}"), } diff --git a/src/rpc/command_maps.rs b/src/rpc/command_maps.rs index 332fda6..dc1e029 100644 --- a/src/rpc/command_maps.rs +++ b/src/rpc/command_maps.rs @@ -53,6 +53,7 @@ pub const RPC_REGISTER_WALLET: u8 = 38; pub const RPC_WALLET_REGISTRY_SYNC: u8 = 39; pub const RPC_VANITY_LOOKUP: u8 = 40; pub const RPC_TORRENT_CANDIDATES: u8 = 41; +pub const RPC_BLOCK_HASH_AT_HEIGHT: u8 = 42; pub const RPC_REPLY: u8 = 255; pub const MAX_RPC_REPLY_BYTES: usize = 64 * 1024 * 1024; diff --git a/src/rpc/commands/add_network_node.rs b/src/rpc/commands/add_network_node.rs index 782afbc..c5619f1 100644 --- a/src/rpc/commands/add_network_node.rs +++ b/src/rpc/commands/add_network_node.rs @@ -13,7 +13,7 @@ pub async fn add_network_node( connections_key: &str, stream_locked: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, map: Arc>, ) -> Result<(u32, RpcResponse), String> { // Command 28 carries the signed node-add payload directly after the @@ -63,7 +63,7 @@ pub async fn add_network_node( blocks_mined: 0_u8, remote_ip, db: db.clone(), - wallet_key: wallet_key.to_string(), + wallet, connections_key: connections_key.to_string(), }) .await; diff --git a/src/rpc/commands/bad_rpc_call.rs b/src/rpc/commands/bad_rpc_call.rs index faf5b8e..3027c62 100644 --- a/src/rpc/commands/bad_rpc_call.rs +++ b/src/rpc/commands/bad_rpc_call.rs @@ -2,9 +2,11 @@ use crate::records::ip_score::enums::InfractionType; use crate::records::ip_score::score::update_ip_score; use crate::records::memory::enums::ClientType; use crate::sled::Db; +use crate::wallets::structures::Wallet; +use crate::Arc; use crate::Utc; -pub async fn record(ip: &str, client_type: ClientType, db: &Db, wallet_key: &str) { +pub async fn record(ip: &str, client_type: ClientType, db: &Db, wallet: Arc) { let now = Utc::now().timestamp() as u32; let _ = update_ip_score( ip, @@ -12,7 +14,7 @@ pub async fn record(ip: &str, client_type: ClientType, db: &Db, wallet_key: &str InfractionType::BadRpcCall, now, db, - wallet_key, + wallet, ) .await; } diff --git a/src/rpc/commands/block_hash_at_height.rs b/src/rpc/commands/block_hash_at_height.rs new file mode 100644 index 0000000..6bdfcc1 --- /dev/null +++ b/src/rpc/commands/block_hash_at_height.rs @@ -0,0 +1,24 @@ +use crate::decode; +use crate::records::unpack_block::unpack_header::load_block_header; +use crate::rpc::responses::RpcResponse; + +pub async fn request_block_hash_at_height(block_number: u32) -> RpcResponse { + match load_block_header(block_number).await { + Ok(header) => { + let hash = header.hash().await; + match decode(&hash) { + Ok(bytes) => RpcResponse::Binary(bytes), + Err(err) => RpcResponse::Binary( + format!("error: Failed to decode block hash: {err}") + .as_bytes() + .to_vec(), + ), + } + } + Err(err) => RpcResponse::Binary( + format!("error: Failed to load block header at height {block_number}: {err}") + .as_bytes() + .to_vec(), + ), + } +} diff --git a/src/rpc/commands/block_peer_ip.rs b/src/rpc/commands/block_peer_ip.rs index 85763f6..dd3ecb2 100644 --- a/src/rpc/commands/block_peer_ip.rs +++ b/src/rpc/commands/block_peer_ip.rs @@ -1,19 +1,16 @@ use crate::rpc::responses::RpcResponse; use crate::sled::Db; use crate::wallets::structures::Wallet; +use crate::Arc; -pub async fn block_peer(db: &Db, ip: String, signature: String, wallet_key: String) -> RpcResponse { +pub async fn block_peer( + db: &Db, + ip: String, + signature: String, + wallet: Arc, +) -> RpcResponse { // Peer blocking is restricted to the local node owner, proven by a // signature from the locally loaded wallet over the target IP string. - let wallet = match Wallet::try_obtain_wallet(wallet_key, None).await { - Ok(wallet) => wallet, - Err(err) => { - let msg = format!("error: Wallet decryption failed: {err}") - .as_bytes() - .to_vec(); - return RpcResponse::Binary(msg); - } - }; if Wallet::verify_transaction(&ip, &signature, &wallet.saved.long_address).await { let tree = db.open_tree("blocked_peers").unwrap(); let key = ip.clone(); diff --git a/src/rpc/commands/delete_network_node.rs b/src/rpc/commands/delete_network_node.rs index b937a9d..0bf4ff0 100644 --- a/src/rpc/commands/delete_network_node.rs +++ b/src/rpc/commands/delete_network_node.rs @@ -13,7 +13,7 @@ pub async fn delete_network_node( connections_key: &str, stream_locked: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, map: Arc>, ) -> Result<(u32, RpcResponse), String> { // Command 29 uses the same signed node-edit shape as add, but the @@ -54,7 +54,7 @@ pub async fn delete_network_node( }, remote_ip, db: db.clone(), - wallet_key: wallet_key.to_string(), + wallet, connections_key: connections_key.to_string(), }) .await; diff --git a/src/rpc/commands/mod.rs b/src/rpc/commands/mod.rs index 8b6a6d2..383febc 100644 --- a/src/rpc/commands/mod.rs +++ b/src/rpc/commands/mod.rs @@ -5,6 +5,7 @@ pub mod address_complete_balance_sheet; pub mod bad_rpc_call; pub mod block_by_hash; pub mod block_by_height; +pub mod block_hash_at_height; pub mod block_header_by_hash; pub mod block_header_by_height; pub mod block_headers; diff --git a/src/rpc/commands/receive_torrent.rs b/src/rpc/commands/receive_torrent.rs index 6b8d652..e13aa48 100644 --- a/src/rpc/commands/receive_torrent.rs +++ b/src/rpc/commands/receive_torrent.rs @@ -1,8 +1,10 @@ use crate::common::check_genesis::genesis_checkup; use crate::common::skein::skein_128_hash_bytes; -use crate::lazy_static; use crate::log::{error, warn}; use crate::miner::flag::{is_reorganizing_mode, is_syncing_mode}; +use crate::orphans::checkup_state::{ + finish_orphan_check, request_orphan_recheck, try_begin_orphan_check, +}; use crate::orphans::structs::OrphanCheckup2; use crate::orphans::sync_check::sync_checkup; use crate::records::block_height::get_block_height::get_height; @@ -20,19 +22,15 @@ use crate::torrent::torrenting_system::torrent_cache::{ use crate::torrent::torrenting_system::torrent_requests::{ setup_download_for_torrent, stage_and_verify_torrent, }; +use crate::wallets::structures::Wallet; use crate::Arc; -use crate::AtomicBool; -use crate::AtomicOrdering; use crate::Mutex; -lazy_static! { - static ref ORPHAN_CHECK_RUNNING: AtomicBool = AtomicBool::new(false); -} - pub fn should_trigger_orphan_check(error: &str) -> bool { // These errors mean the incoming torrent may belong to a competing // branch, so a targeted orphan check is worth attempting. error.contains("Incorrect previous_block_hash.") + || error.contains("Candidate parent is not current chain parent.") || error.contains("Difficulty mismatch with the blockchain data.") || error.contains("Incoming block is no longer the next expected height.") || error.contains("Error opening file ./testnet_blocks/") @@ -60,19 +58,17 @@ pub async fn trigger_orphan_check( incoming_height: u32, stream: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, map: Arc>, connections_key: String, ) { if is_syncing_mode() { return; } - if ORPHAN_CHECK_RUNNING - .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst) - .is_err() - { + if !try_begin_orphan_check() { + request_orphan_recheck(incoming_height); warn!( - "[broadcast] orphan check already running, skipping duplicate trigger: reason={reason} incoming_height={incoming_height}" + "[broadcast] orphan check already running, queued recheck: reason={reason} incoming_height={incoming_height}" ); return; } @@ -96,15 +92,16 @@ pub async fn trigger_orphan_check( db: db.clone(), local_height, remote_height, + recheck_from_height: Some(incoming_height), map, node_syncing: false, connections_key, }; - match sync_checkup(orphan_checkup_params, wallet_key).await { + match sync_checkup(orphan_checkup_params, wallet).await { Ok(()) => {} Err(err) => error!("[broadcast] orphan check error: {err}"), }; - ORPHAN_CHECK_RUNNING.store(false, AtomicOrdering::SeqCst); + finish_orphan_check(); } pub enum TorrentSubmissionOutcome { @@ -122,7 +119,7 @@ pub async fn torrent_submission( torrent_bytes: Vec, stream: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, map: Arc>, connections_key: String, ) -> TorrentSubmissionOutcome { @@ -169,7 +166,7 @@ pub async fn torrent_submission( return TorrentSubmissionOutcome::Rejected(RpcResponse::Binary(msg)); } - match stage_and_verify_torrent(height, db, torrent, wallet_key, process_now).await { + match stage_and_verify_torrent(height, db, torrent, wallet.clone(), process_now).await { Ok(stage_result) => { let _ = remember_recent_torrent(&torrent_hash, height).await; if let Some((torrent, staged_path)) = stage_result { @@ -177,7 +174,7 @@ pub async fn torrent_submission( // the background so the RPC reply can return immediately. let stream_clone = stream.clone(); let db_clone = db.clone(); - let wallet_key_clone = wallet_key.to_string(); + let wallet_clone = wallet.clone(); let map_clone = map.clone(); let map_for_download = map.clone(); let map_for_broadcast = map.clone(); @@ -215,7 +212,7 @@ pub async fn torrent_submission( height, stream_clone, &db_clone, - &wallet_key_clone, + wallet_clone, map_clone, connections_key_clone, ) @@ -280,7 +277,7 @@ pub async fn receive_torrent( connections_key: &str, stream: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, map: Arc>, ) -> Result<(u32, RpcResponse), String> { let (uid, _) = @@ -313,7 +310,7 @@ pub async fn receive_torrent( torrent_bytes, stream.clone(), db, - wallet_key, + wallet.clone(), map.clone(), connections_key.to_string(), ) @@ -329,7 +326,7 @@ pub async fn receive_torrent( } => { let stream_clone = stream.clone(); let db_clone = db.clone(); - let wallet_key_clone = wallet_key.to_string(); + let wallet_clone = wallet.clone(); let map_clone = map.clone(); let connections_key_clone = connections_key.to_string(); tokio::spawn(async move { @@ -338,7 +335,7 @@ pub async fn receive_torrent( incoming_height, stream_clone, &db_clone, - &wallet_key_clone, + wallet_clone, map_clone, connections_key_clone, ) diff --git a/src/rpc/commands/route_reply.rs b/src/rpc/commands/route_reply.rs index f709e5b..ac91f2f 100644 --- a/src/rpc/commands/route_reply.rs +++ b/src/rpc/commands/route_reply.rs @@ -7,6 +7,7 @@ use crate::rpc::command_maps::MAX_RPC_REPLY_BYTES; use crate::rpc::commands::bad_rpc_call; use crate::rpc::read_bytes_from_stream; use crate::sled::Db; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Mutex; use crate::TcpStream; @@ -15,7 +16,7 @@ pub async fn route_reply( connections_key: &str, stream_locked: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, map: Arc>, ip: &str, client_type: ClientType, @@ -29,7 +30,7 @@ pub async fn route_reply( read_bytes_from_stream::read_u32_from_stream(connections_key, stream_locked.clone()).await? as usize; if message_length > MAX_RPC_REPLY_BYTES { - bad_rpc_call::record(ip, client_type, db, wallet_key).await; + bad_rpc_call::record(ip, client_type, db, wallet.clone()).await; return Err(format!( "error: RPC reply payload too large: len={message_length} max={MAX_RPC_REPLY_BYTES}" )); @@ -55,7 +56,7 @@ pub async fn route_reply( // Unknown replies usually mean a stale or forged UID. Drain the // payload so the stream remains aligned for future commands. - bad_rpc_call::record(ip, client_type, db, wallet_key).await; + bad_rpc_call::record(ip, client_type, db, wallet).await; let retired = is_retired_entry(map.clone(), uid).await; if retired { warn!("[rpc] late reply arrived for retired uid: {uid:?}"); diff --git a/src/rpc/commands/unblock_peer_ip.rs b/src/rpc/commands/unblock_peer_ip.rs index ee41548..c5a1811 100644 --- a/src/rpc/commands/unblock_peer_ip.rs +++ b/src/rpc/commands/unblock_peer_ip.rs @@ -1,25 +1,16 @@ use crate::rpc::responses::RpcResponse; use crate::sled::Db; use crate::wallets::structures::Wallet; +use crate::Arc; pub async fn unblock_peer( db: &Db, ip: String, signature: String, - wallet_key: String, + wallet: Arc, ) -> RpcResponse { // Peer unblocking is restricted to the local node owner, proven by a // signature from the locally loaded wallet over the target IP string. - let wallet = match Wallet::try_obtain_wallet(wallet_key, None).await { - Ok(wallet) => wallet, - Err(err) => { - let msg = format!("error: Wallet decryption failed: {err}") - .as_bytes() - .to_vec(); - return RpcResponse::Binary(msg); - } - }; - if Wallet::verify_transaction(&ip, &signature, &wallet.saved.long_address).await { let tree = db.open_tree("blocked_peers").unwrap(); let key = ip.clone(); diff --git a/src/rpc/commands/validate_torrent.rs b/src/rpc/commands/validate_torrent.rs index 21afa89..9c1eee1 100644 --- a/src/rpc/commands/validate_torrent.rs +++ b/src/rpc/commands/validate_torrent.rs @@ -3,6 +3,7 @@ use crate::rpc::responses::RpcResponse; use crate::rpc::server::flood_protection::MAX_TORRENT_METADATA_BYTES; use crate::sled::Db; use crate::torrent::structs::Torrent; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Mutex; use crate::TcpStream; @@ -11,7 +12,7 @@ pub async fn validate( connections_key: &str, stream_locked: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, ) -> Result<(u32, RpcResponse), String> { // Command 6 validates torrent metadata for a specific block height // without accepting or staging it as a broadcast torrent. @@ -45,7 +46,7 @@ pub async fn validate( // Verification checks the torrent against local chain data and // returns the verifier message as the binary RPC payload. - let result = match torrent.verify(block_number, db, wallet_key).await { + let result = match torrent.verify(block_number, db, wallet).await { Ok(()) => RpcResponse::Binary(b"msg: Validation passed".to_vec()), Err(err) => RpcResponse::Binary(err.into_bytes()), }; diff --git a/src/rpc/server/command_loop_state.rs b/src/rpc/server/command_loop_state.rs index 6410d27..622d14c 100644 --- a/src/rpc/server/command_loop_state.rs +++ b/src/rpc/server/command_loop_state.rs @@ -2,12 +2,15 @@ use crate::io::ErrorKind; use crate::log::warn; use crate::records::memory::connections::get_client_type_from_memory; use crate::records::memory::enums::ClientType; -use crate::rpc::command_maps::RPC_REPLY; +use crate::rpc::command_maps::{ + RPC_BLOCK_HEIGHT, RPC_BLOCK_PIECE, RPC_REPLY, RPC_TORRENT_BY_HEIGHT, +}; use crate::rpc::server::connection_memory_manager::remove_stream_from_memory; use crate::rpc::server::flood_protection::check_request_frequency_with_client_type; use crate::rpc::server::structs::IncomingCommand; use crate::sled::Db; use crate::sleep; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Duration; use crate::Mutex; @@ -57,11 +60,23 @@ async fn peer_ip(stream_locked: &Arc>) -> String { .unwrap_or_else(|_| "unknown".into()) } +fn skip_flood_check(command: u8, client_type: ClientType) -> bool { + if command == RPC_REPLY { + return true; + } + + client_type == ClientType::Miner + && matches!( + command, + RPC_BLOCK_HEIGHT | RPC_TORRENT_BY_HEIGHT | RPC_BLOCK_PIECE + ) +} + pub async fn next_incoming_command( stream_locked: Arc>, db: &Db, connections_key: &str, - wallet_key: &str, + wallet: Arc, ) -> Result, String> { // A disconnected socket returns None so the caller can end the RPC // loop without treating a clean disconnect as a command failure. @@ -76,10 +91,11 @@ pub async fn next_incoming_command( .await .unwrap_or(ClientType::Miner); - // Replies belong to an existing request path, so only new inbound - // commands are counted against flood protection. - if command != RPC_REPLY { - check_request_frequency_with_client_type(db, ip.clone(), client_type, wallet_key).await; + // Replies and miner sync traffic belong to expected node-to-node + // maintenance paths. All other commands still count against flood + // protection, including non-sync commands from miners. + if !skip_flood_check(command, client_type) { + check_request_frequency_with_client_type(db, ip.clone(), client_type, wallet).await; } Ok(Some(IncomingCommand { diff --git a/src/rpc/server/flood_protection.rs b/src/rpc/server/flood_protection.rs index 2b4d0f4..b96f417 100644 --- a/src/rpc/server/flood_protection.rs +++ b/src/rpc/server/flood_protection.rs @@ -3,6 +3,8 @@ use crate::records::ip_score::score::update_ip_score; use crate::records::memory::enums::ClientType; use crate::rpc::server::structs::RpcFloodState; use crate::sled::Db; +use crate::wallets::structures::Wallet; +use crate::Arc; use crate::Utc; pub const MAX_TORRENT_METADATA_BYTES: usize = 8192; @@ -24,7 +26,7 @@ pub async fn check_request_frequency_with_client_type( db: &Db, ip: String, client_type: ClientType, - wallet_key: &str, + wallet: Arc, ) { // Keep one compact flood-tracker row per subject and decay the // counters by elapsed time so stale request history expires @@ -67,7 +69,7 @@ pub async fn check_request_frequency_with_client_type( InfractionType::RpcFloodAttack, now as u32, db, - wallet_key, + wallet, ) .await; diff --git a/src/rpc/server/handshake.rs b/src/rpc/server/handshake.rs index 13b5dc1..415560f 100644 --- a/src/rpc/server/handshake.rs +++ b/src/rpc/server/handshake.rs @@ -40,20 +40,12 @@ async fn get_connection_counts() -> (u8, u8) { async fn complete_incoming_miner_setup( stream: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, map: Arc>, connections_key: &str, ) { // Incoming miner handshakes need the same post-handshake state exchange // as outgoing handshakes so a bootstrap with only incoming peers can mine. - let wallet = match Wallet::try_obtain_wallet(wallet_key.to_string(), None).await { - Ok(wallet) => wallet, - Err(err) => { - error!("[handshake] unable to load wallet for incoming miner setup: {err}"); - return; - } - }; - if let Err(err) = register_connected_wallet( stream.clone(), map.clone(), @@ -71,15 +63,8 @@ async fn complete_incoming_miner_setup( warn!("[wallet_registry] incoming peer sync failed after handshake: {err}"); } - announce_self_to_network( - stream, - &wallet.saved.short_address, - map, - db, - wallet_key, - connections_key, - ) - .await; + let short_address = wallet.saved.short_address.clone(); + announce_self_to_network(stream, &short_address, map, db, wallet, connections_key).await; if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await { error!("[startup] failed to rebuild mined counts after incoming handshake: {err}"); @@ -91,7 +76,7 @@ async fn complete_incoming_miner_setup( pub async fn handle_handshake( stream: Arc>, db: Db, - wallet_key: String, + wallet: Arc, map: Arc>, ) { // read number of connected clients or set to 0 if none @@ -208,14 +193,14 @@ pub async fn handle_handshake( let is_miner = connection_type == "miner"; let post_handshake_stream = stream.clone(); let post_handshake_map = map.clone(); - let post_handshake_wallet_key = wallet_key.clone(); + let post_handshake_wallet = wallet.clone(); let post_handshake_connections_key = connections_key.clone(); let params = CombineAndSendDataParams { stream, db: db.clone(), connections_key, connection_type: connection_type.to_string(), - wallet_key: wallet_key.clone(), + wallet: wallet.clone(), map, returned_address: received_address.clone(), }; @@ -223,7 +208,7 @@ pub async fn handle_handshake( complete_incoming_miner_setup( post_handshake_stream, &db, - &post_handshake_wallet_key, + post_handshake_wallet, post_handshake_map, &post_handshake_connections_key, ) diff --git a/src/rpc/server/handshake_processing.rs b/src/rpc/server/handshake_processing.rs index 7d28ffc..282485e 100644 --- a/src/rpc/server/handshake_processing.rs +++ b/src/rpc/server/handshake_processing.rs @@ -79,11 +79,10 @@ pub async fn parse_received_data( pub async fn generate_and_sign_message( connection_type: &str, - wallet_key: String, + wallet: &Wallet, ) -> Result<(String, String, String), String> { // get the wallet info so we can sign our return message - let wallet = Wallet::try_obtain_wallet(wallet_key, None).await?; - let address = wallet.saved.long_address; + let address = wallet.saved.long_address.clone(); // if miner face is the return message, used because its hex so compressed better // otherwise face spelledbackwards is used @@ -155,12 +154,12 @@ pub async fn combine_and_send_data(params: CombineAndSendDataParams) -> Result<( db, connections_key, connection_type, - wallet_key, + wallet, map, returned_address: _, } = params; // generate the message to send - let result = generate_and_sign_message(&connection_type, wallet_key.clone()).await; + let result = generate_and_sign_message(&connection_type, &wallet).await; if let Err(err) = result { error!("Failed: {err}"); return Err(err); @@ -187,7 +186,7 @@ pub async fn combine_and_send_data(params: CombineAndSendDataParams) -> Result<( stream.clone(), db, connections_key.to_string(), - wallet_key, + wallet, map, )); diff --git a/src/rpc/server/rpc_command_loop.rs b/src/rpc/server/rpc_command_loop.rs index 563c5b2..8977f0b 100644 --- a/src/rpc/server/rpc_command_loop.rs +++ b/src/rpc/server/rpc_command_loop.rs @@ -6,6 +6,7 @@ use crate::rpc::server::command_loop_state::next_incoming_command; use crate::rpc::server::connection_memory_manager::remove_stream_from_memory; use crate::rpc::*; use crate::sled::Db; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::AsyncWriteExt; use crate::Mutex; @@ -15,12 +16,12 @@ pub async fn start_loop( stream_locked: Arc>, db: Db, connections_key: String, - wallet_key: String, + wallet: Arc, map: Arc>, ) -> Result<(), String> { 'outer: loop { let Some(incoming_command) = - next_incoming_command(stream_locked.clone(), &db, &connections_key, &wallet_key) + next_incoming_command(stream_locked.clone(), &db, &connections_key, wallet.clone()) .await? else { break 'outer Ok(()); @@ -101,7 +102,7 @@ pub async fn start_loop( &connections_key, stream_locked.clone(), &db, - &wallet_key, + wallet.clone(), ) .await?; result @@ -518,8 +519,7 @@ pub async fn start_loop( .await?; let result = - commands::block_peer_ip::block_peer(&db, ip, signature, wallet_key.to_string()) - .await; + commands::block_peer_ip::block_peer(&db, ip, signature, wallet.clone()).await; result .send(&stream_locked, Some(&connections_key), uid) .await; @@ -542,13 +542,9 @@ pub async fn start_loop( ) .await?; - let result = commands::unblock_peer_ip::unblock_peer( - &db, - ip, - signature, - wallet_key.to_string(), - ) - .await; + let result = + commands::unblock_peer_ip::unblock_peer(&db, ip, signature, wallet.clone()) + .await; result .send(&stream_locked, Some(&connections_key), uid) .await; @@ -559,7 +555,7 @@ pub async fn start_loop( &connections_key, stream_locked.clone(), &db, - &wallet_key, + wallet.clone(), map.clone(), ) .await?; @@ -573,7 +569,7 @@ pub async fn start_loop( &connections_key, stream_locked.clone(), &db, - &wallet_key, + wallet.clone(), map.clone(), ) .await?; @@ -645,7 +641,7 @@ pub async fn start_loop( &connections_key, stream_locked.clone(), &db, - &wallet_key, + wallet.clone(), map.clone(), ) .await?; @@ -797,12 +793,32 @@ pub async fn start_loop( .send(&stream_locked, Some(&connections_key), uid) .await; } + 42 => { + // request the canonical block hash at a specific height + let (uid, _) = read_bytes_from_stream::read_uid_from_stream( + &connections_key, + stream_locked.clone(), + ) + .await?; + let block_number = read_bytes_from_stream::read_u32_from_stream( + &connections_key, + stream_locked.clone(), + ) + .await?; + + let result = + commands::block_hash_at_height::request_block_hash_at_height(block_number) + .await; + result + .send(&stream_locked, Some(&connections_key), uid) + .await; + } 255 => { commands::route_reply::route_reply( &connections_key, stream_locked.clone(), &db, - &wallet_key, + wallet.clone(), map.clone(), &ip, client_type, @@ -812,7 +828,7 @@ pub async fn start_loop( _ => { // Unknown commands are ignored at the protocol level but // still count as bad RPC behavior for scoring purposes. - commands::bad_rpc_call::record(&ip, client_type, &db, &wallet_key).await; + commands::bad_rpc_call::record(&ip, client_type, &db, wallet.clone()).await; } } diff --git a/src/rpc/server/start_rpc.rs b/src/rpc/server/start_rpc.rs index 16dd964..c767937 100644 --- a/src/rpc/server/start_rpc.rs +++ b/src/rpc/server/start_rpc.rs @@ -2,6 +2,7 @@ use crate::log::error; use crate::records::memory::response_channels::Command; use crate::rpc::server::handshake::handle_handshake; use crate::sled::Db; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Mutex; use crate::SocketAddr; @@ -11,7 +12,7 @@ use crate::TcpListener; pub async fn start_rpc( db: &Db, server_address: String, - wallet_key: String, + wallet: Arc, map: Arc>, ) { // Parse once at startup so the accept loop can work with a concrete @@ -24,7 +25,7 @@ pub async fn start_rpc( // The listener runs in the background while startup continues with // the rest of node initialization. tokio::spawn(async move { - rpc_server(server_socket, &db_clone, wallet_key, map).await; + rpc_server(server_socket, &db_clone, wallet, map).await; }); } @@ -32,7 +33,7 @@ pub async fn start_rpc( async fn rpc_server( server_socket: SocketAddr, db: &Db, - wallet_key: String, + wallet: Arc, map: Arc>, ) { // Bind failure means this node cannot accept RPC traffic, so log the @@ -51,10 +52,10 @@ async fn rpc_server( // slow peers do not block the listener from accepting. let stream = Arc::new(Mutex::new(stream)); let db_clone = db.clone(); - let wallet_key_clone = wallet_key.clone(); + let wallet_clone = wallet.clone(); let map_clone = map.clone(); tokio::spawn(async move { - handle_handshake(stream, db_clone, wallet_key_clone, map_clone).await; + handle_handshake(stream, db_clone, wallet_clone, map_clone).await; }); } Err(e) => { diff --git a/src/rpc/server/structs.rs b/src/rpc/server/structs.rs index ab3a1a0..4831e3f 100644 --- a/src/rpc/server/structs.rs +++ b/src/rpc/server/structs.rs @@ -4,6 +4,7 @@ use crate::rpc::server::flood_protection::{ RPC_LONG_WINDOW_LIMIT, RPC_LONG_WINDOW_SECS, RPC_SHORT_WINDOW_SECS, }; use crate::sled::Db; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Mutex; use crate::TcpStream; @@ -21,7 +22,7 @@ pub struct CombineAndSendDataParams { pub db: Db, pub connections_key: String, pub connection_type: String, - pub wallet_key: String, + pub wallet: Arc, pub map: Arc>, pub returned_address: String, } diff --git a/src/startup/connections.rs b/src/startup/connections.rs index 5894515..179c6d4 100644 --- a/src/startup/connections.rs +++ b/src/startup/connections.rs @@ -6,16 +6,17 @@ use crate::miner::flag::{ use crate::records::memory::response_channels::Command; use crate::rpc::client::handshake::connect_and_handshake; use crate::rpc::client::structs::Connect; -use crate::sled::Db; -use crate::sleep; -use crate::Arc; +use crate::sled::Db; +use crate::sleep; +use crate::wallets::structures::Wallet; +use crate::Arc; use crate::Duration; use crate::Mutex; -pub async fn handle_connections( - db: Db, - wallet_key: String, - map: Arc>, +pub async fn handle_connections( + db: Db, + wallet: Arc, + map: Arc>, ) -> Result<(), String> { // A zero outgoing limit means this node should not open any bootstrap // connection during startup. @@ -40,9 +41,7 @@ pub async fn handle_connections( // shared state so each attempt can run independently let db_clone = db.clone(); - let wallet_key_cloned = wallet_key.clone(); - - // parse the configured peer string once before spawning + // parse the configured peer string once before spawning // the outbound connection attempt let socket_address = server.parse().expect("Failed to parse the socket address"); @@ -51,13 +50,13 @@ pub async fn handle_connections( let first: bool = true; let connect_params = Connect { - addr: socket_address, - db: db_clone, - node_ip: server.to_string(), - wallet_key: wallet_key_cloned, - map: map_clone, - first, - }; + addr: socket_address, + db: db_clone, + node_ip: server.to_string(), + wallet: wallet.clone(), + map: map_clone, + first, + }; let err_string = match connect_and_handshake(connect_params).await { Ok(()) => { diff --git a/src/startup/initialize_startup.rs b/src/startup/initialize_startup.rs index 7c4b3c2..56cb4bd 100644 --- a/src/startup/initialize_startup.rs +++ b/src/startup/initialize_startup.rs @@ -74,11 +74,11 @@ pub async fn prepare_pre_wallet_startup() { create_file_paths().await; } -pub async fn obtain_startup_wallet_key() -> String { +pub async fn obtain_startup_wallet() -> Wallet { // Open or create the configured wallet and return the validated - // encryption key once the wallet can be used safely. - let (wallet_key, _wallet) = obtain_valid_wallet().await; - wallet_key + // wallet once it can be used safely. + let (_wallet_key, wallet) = obtain_valid_wallet().await; + wallet } pub async fn open_chain_state() -> sled::Db { diff --git a/src/startup/network_broadcast.rs b/src/startup/network_broadcast.rs index 1cc678c..dcd1821 100644 --- a/src/startup/network_broadcast.rs +++ b/src/startup/network_broadcast.rs @@ -32,7 +32,7 @@ pub async fn announce_self_to_network( address: &str, command_map: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, connections_key: &str, ) { // announce the local node to the connected peer, then @@ -84,7 +84,7 @@ pub async fn announce_self_to_network( unlocked_stream, command_map.clone(), db, - wallet_key, + wallet, connections_key, ) .await @@ -96,7 +96,7 @@ pub async fn get_network_mapping( unlocked_stream: Arc>, command_map: Arc>, db: &Db, - wallet_key: &str, + wallet: Arc, connections_key: &str, ) { // request the remote peer's serialized node list and @@ -154,7 +154,7 @@ pub async fn get_network_mapping( blocks_mined, remote_ip: remote_ip.to_string(), db: db.clone(), - wallet_key: wallet_key.to_string(), + wallet: wallet.clone(), connections_key: connections_key.to_string(), }) .await; @@ -192,7 +192,7 @@ pub async fn get_network_mapping( }, remote_ip: remote_ip.to_string(), db: db.clone(), - wallet_key: wallet_key.to_string(), + wallet: wallet.clone(), connections_key: connections_key.to_string(), }) .await; diff --git a/src/startup/node_runtime.rs b/src/startup/node_runtime.rs index 0b2e077..03f859c 100644 --- a/src/startup/node_runtime.rs +++ b/src/startup/node_runtime.rs @@ -17,6 +17,7 @@ use crate::startup::connections::handle_connections; use crate::startup::daemonize::{install_shutdown_cleanup, remove_registered_pid_file}; use crate::startup::initialize_startup::open_chain_state; use crate::verifications::verification_service::initialize_global_verification_service; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Error; use crate::HashMap; @@ -89,7 +90,7 @@ pub fn clear_ip_scores(db: &Db) -> sled::Result<()> { Ok(()) } -pub async fn run_unlocked_node(wallet_key: String, install_shutdown: bool) -> Result<(), String> { +pub async fn run_unlocked_node(wallet: Arc, install_shutdown: bool) -> Result<(), String> { // Once the wallet is available, the shared node runtime performs the remaining // startup work for both Linux foreground/daemon mode and the Windows service path. info!( @@ -117,7 +118,7 @@ pub async fn run_unlocked_node(wallet_key: String, install_shutdown: bool) -> Re error!("Failed to clear IP scores: {e}"); } - let wallet_key_clone = wallet_key.clone(); + let wallet_for_server = wallet.clone(); let map: Arc> = Arc::new(Mutex::new(HashMap::new())); let map_cloned = Arc::clone(&map); let db_server = db.clone(); @@ -133,24 +134,24 @@ pub async fn run_unlocked_node(wallet_key: String, install_shutdown: bool) -> Re // The RPC server starts first so handshake traffic can begin while the rest of the // node initialization continues. tokio::spawn(async move { - start_rpc(&db_server, server_address, wallet_key_clone, map_cloned).await; + start_rpc(&db_server, server_address, wallet_for_server, map_cloned).await; }); // Connection management, genesis creation, and mining then proceed in the same // unlocked runtime regardless of how the process was launched. - handle_connections(db.clone(), wallet_key.clone(), map.clone()) + handle_connections(db.clone(), wallet.clone(), map.clone()) .await .map_err(|e| format!("Startup connection error: {e}"))?; create_genesis_transaction( &db, verification_service.clone(), - wallet_key.clone(), + wallet.clone(), map.clone(), ) .await; - mine_block(&db, verification_service, wallet_key.clone(), map.clone()) + mine_block(&db, verification_service, wallet.clone(), map.clone()) .await .map_err(|e| format!("Mining loop error: {e}"))?; diff --git a/src/startup/unlock_pipe.rs b/src/startup/unlock_pipe.rs index 93482f1..b3b662d 100644 --- a/src/startup/unlock_pipe.rs +++ b/src/startup/unlock_pipe.rs @@ -99,11 +99,11 @@ fn wide_null(value: &str) -> Vec { } #[cfg(windows)] -pub async fn run_unlock_pipe_server( - service_state: Arc>, - shutdown: Arc, - unlock_sender: mpsc::UnboundedSender, -) { +pub async fn run_unlock_pipe_server( + service_state: Arc>, + shutdown: Arc, + unlock_sender: mpsc::UnboundedSender>, +) { let pipe_name = pipe_name(); let mut first_instance = true; @@ -176,11 +176,11 @@ pub async fn run_unlock_pipe_server( } #[cfg(windows)] -async fn handle_request( - request_bytes: &[u8], - service_state: Arc>, - unlock_sender: mpsc::UnboundedSender, -) -> UnlockPipeResponse { +async fn handle_request( + request_bytes: &[u8], + service_state: Arc>, + unlock_sender: mpsc::UnboundedSender>, +) -> UnlockPipeResponse { // Malformed helper requests are reported back through the pipe instead of panicking the service. let request = match from_slice::(request_bytes) { Ok(request) => request, @@ -207,19 +207,19 @@ async fn handle_request( }; } - match Wallet::try_obtain_wallet(wallet_key.clone(), None).await { - Ok(_) => { - // Mark unlocked before sending the key so status checks immediately reflect progress. - { - let mut state = service_state.write().await; - *state = ServiceWaitState::Unlocked; - } - - // If the runtime loop is gone, restore the waiting state so another attempt can be made. - if unlock_sender.send(wallet_key).is_err() { - let mut state = service_state.write().await; - *state = ServiceWaitState::WaitingForUnlock; - return UnlockPipeResponse::Error { + match Wallet::try_obtain_wallet(wallet_key, None).await { + Ok(wallet) => { + // Mark unlocked before sending the wallet so status checks immediately reflect progress. + { + let mut state = service_state.write().await; + *state = ServiceWaitState::Unlocked; + } + + // If the runtime loop is gone, restore the waiting state so another attempt can be made. + if unlock_sender.send(Arc::new(wallet)).is_err() { + let mut state = service_state.write().await; + *state = ServiceWaitState::WaitingForUnlock; + return UnlockPipeResponse::Error { message: "Service failed to accept the unlock request.".to_string(), }; } diff --git a/src/startup/windows_service.rs b/src/startup/windows_service.rs index c70c7e1..28eefd4 100644 --- a/src/startup/windows_service.rs +++ b/src/startup/windows_service.rs @@ -15,6 +15,8 @@ use crate::startup::unlock_pipe::{pipe_name, run_unlock_pipe_server}; #[cfg(windows)] use crate::startup::unlock_structs::ServiceWaitState; #[cfg(windows)] +use crate::wallets::structures::Wallet; +#[cfg(windows)] use crate::Arc; #[cfg(windows)] use crate::Duration; @@ -472,7 +474,7 @@ fn run_service() -> windows_service::Result<()> { let service_state = Arc::new(RwLock::new(ServiceWaitState::WaitingForUnlock)); let service_state_for_pipe = service_state.clone(); let shutdown_for_pipe = shutdown.clone(); - let (unlock_tx, mut unlock_rx) = mpsc::unbounded_channel::(); + let (unlock_tx, mut unlock_rx) = mpsc::unbounded_channel::>(); let unlocked_node_task: Arc>>> = Arc::new(StdMutex::new(None)); let unlocked_node_task_for_loop = unlocked_node_task.clone(); @@ -508,12 +510,12 @@ fn run_service() -> windows_service::Result<()> { runtime.block_on(async move { loop { tokio::select! { - maybe_wallet_key = unlock_rx.recv() => { - if let Some(wallet_key) = maybe_wallet_key { + maybe_wallet = unlock_rx.recv() => { + if let Some(wallet) = maybe_wallet { // Once the wallet key is accepted, the shared unlocked-node // runtime is launched inside the service process itself. let handle = tokio::spawn(async move { - if let Err(err) = run_unlocked_node(wallet_key, false).await { + if let Err(err) = run_unlocked_node(wallet, false).await { error!("Unlocked Windows service node failed during startup: {err}"); } }); diff --git a/src/torrent/create_metadata.rs b/src/torrent/create_metadata.rs index 2d9c602..16e01dc 100644 --- a/src/torrent/create_metadata.rs +++ b/src/torrent/create_metadata.rs @@ -20,8 +20,8 @@ pub async fn metadata_from_file( difficulty: u64, timestamp: u32, block_hash: &str, + previous_hash: &str, miner_wallet: String, - _map: Arc>, ) -> Result, String> { // The torrent is built from the mined block file and saved under the network torrent path. let ( @@ -99,6 +99,7 @@ pub async fn metadata_from_file( nonce, vrf, block_hash: block_hash.to_string(), + previous_hash: previous_hash.to_string(), piece_length, info_hash: block_hashed, pieces: piece_hashes, diff --git a/src/torrent/structs.rs b/src/torrent/structs.rs index 1ccf4c2..ef07341 100644 --- a/src/torrent/structs.rs +++ b/src/torrent/structs.rs @@ -77,20 +77,21 @@ pub struct TorrentCacheEntry { #[derive(Clone, Debug, Serialize)] pub struct Torrent { // Torrent is the compact metadata file used to fetch and verify one mined block. - pub info: Info, // fixed header is 89 bytes plus 17 bytes per piece entry + pub info: Info, // fixed header is 121 bytes plus 17 bytes per piece entry pub mined_by: String, // 22-byte short address when serialized } #[derive(Clone, Debug, Serialize)] pub struct Info { // Info is the serialized block metadata header followed by one hash entry per block piece. - // The fixed header is 89 bytes before piece entries. + // The fixed header is 121 bytes before piece entries. pub length: u64, // 8 bytes pub this_block_difficulty: u64, // 8 bytes pub timestamp: u32, // 4 bytes pub nonce: u8, // 1 byte pub vrf: u128, // 16 bytes pub block_hash: String, // 32 bytes + pub previous_hash: String, // 32 bytes pub piece_length: u32, // 4 bytes pub info_hash: String, // 16 bytes pub pieces: Vec>, // 17 bytes per piece entry @@ -120,6 +121,10 @@ impl Torrent { bytes.extend(block_hash_bytes); } + if let Ok(previous_hash_bytes) = decode(&self.info.previous_hash) { + bytes.extend(previous_hash_bytes); + } + let piece_length_bytes = self.info.piece_length.to_le_bytes(); bytes.extend(piece_length_bytes); @@ -145,10 +150,11 @@ impl Torrent { } pub async fn from_bytes(torrent_bytes: &[u8]) -> tokio::io::Result { - const FIXED_HEADER_BYTES: usize = 89; + const OLD_FIXED_HEADER_BYTES: usize = 89; + const FIXED_HEADER_BYTES: usize = 121; const PIECE_ENTRY_BYTES: usize = 17; const WALLET_BYTES: usize = Wallet::SHORT_ADDRESS_BYTES_LENGTH; - const MIN_TORRENT_BYTES: usize = FIXED_HEADER_BYTES + WALLET_BYTES; + const MIN_TORRENT_BYTES: usize = OLD_FIXED_HEADER_BYTES + WALLET_BYTES; if torrent_bytes.len() < MIN_TORRENT_BYTES { return Err(tokio::io::Error::new( @@ -179,6 +185,40 @@ impl Torrent { let block_hash = encode(block_hash_bytes); cursor = &cursor[32..]; + let expected_torrent_len = + |fixed_header_bytes: usize, candidate_piece_length: u32| -> Option { + if candidate_piece_length == 0 { + return None; + } + let piece_length_u64 = candidate_piece_length as u64; + let piece_count_u64 = length.checked_add(piece_length_u64 - 1)? / piece_length_u64; + let piece_count = usize::try_from(piece_count_u64).ok()?; + if piece_count > u8::MAX as usize { + return None; + } + fixed_header_bytes + .checked_add(piece_count.checked_mul(PIECE_ENTRY_BYTES)?)? + .checked_add(WALLET_BYTES) + }; + + let parse_as_new_format = if cursor.len() >= 32 + 4 + 16 + WALLET_BYTES { + let possible_piece_length = + u32::from_le_bytes([cursor[32], cursor[33], cursor[34], cursor[35]]); + expected_torrent_len(FIXED_HEADER_BYTES, possible_piece_length) + == Some(torrent_bytes.len()) + } else { + false + }; + + let previous_hash = if parse_as_new_format { + let previous_hash_bytes = &cursor[0..32]; + let previous_hash = encode(previous_hash_bytes); + cursor = &cursor[32..]; + previous_hash + } else { + String::new() + }; + let piece_length = u32::from_le_bytes(cursor[0..4].try_into().unwrap()); cursor = &cursor[4..]; @@ -291,6 +331,7 @@ impl Torrent { nonce, vrf, block_hash, + previous_hash, piece_length, info_hash, pieces, diff --git a/src/torrent/torrenting_system/save_block.rs b/src/torrent/torrenting_system/save_block.rs index ce75354..666a5fd 100644 --- a/src/torrent/torrenting_system/save_block.rs +++ b/src/torrent/torrenting_system/save_block.rs @@ -90,6 +90,18 @@ pub async fn verify_and_save_block(params: DownloadSave) -> Result<(), String> { } }; + let loaded_header_hash = loaded_block.vrf_block.hash().await; + if loaded_header_hash != params.torrent.info.block_hash { + cleanup_download_pieces(¶ms).await; + return Err("Candidate header hash does not match torrent metadata.".to_string()); + } + if !params.torrent.info.previous_hash.is_empty() + && loaded_block.vrf_block.unmined_block.previous_hash != params.torrent.info.previous_hash + { + cleanup_download_pieces(¶ms).await; + return Err("Candidate previous hash does not match torrent metadata.".to_string()); + } + // Run full block verification before allowing the chain save path to persist the downloaded block. let signatures = match loaded_block .verify(¶ms.db, params.verification_service.clone()) diff --git a/src/torrent/torrenting_system/torrent_requests.rs b/src/torrent/torrenting_system/torrent_requests.rs index 94eceb5..c3d118b 100644 --- a/src/torrent/torrenting_system/torrent_requests.rs +++ b/src/torrent/torrenting_system/torrent_requests.rs @@ -10,6 +10,7 @@ use crate::torrent::structs::Torrent; use crate::torrent::torrenting_system::save_torrent::save_staged_torrent; use crate::torrent::torrenting_system::setup_block_download::setup_download; use crate::verifications::verification_service::global_verification_service; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::Mutex; use crate::TcpStream; @@ -37,13 +38,13 @@ pub async fn handle_response_and_save_torrent( height: u32, db: &Db, torrent: Torrent, - wallet_key: &str, + wallet: Arc, map: Arc>, allow_during_reorg: bool, rebroadcast: bool, ) -> Result<(), String> { let Some((torrent, staged_path)) = - stage_and_verify_torrent(height, db, torrent, wallet_key, true).await? + stage_and_verify_torrent(height, db, torrent, wallet, true).await? else { return Ok(()); }; @@ -72,7 +73,7 @@ pub struct ProcessTorrentResponse { pub height: u32, pub db: Db, pub torrent: Torrent, - pub wallet_key: String, + pub wallet: Arc, pub map: Arc>, pub allow_during_reorg: bool, pub process_now: bool, @@ -83,7 +84,7 @@ pub async fn process_torrent_response(params: ProcessTorrentResponse) -> Result< params.height, ¶ms.db, params.torrent, - ¶ms.wallet_key, + params.wallet, params.process_now, ) .await? @@ -112,7 +113,7 @@ pub async fn stage_and_verify_torrent( height: u32, db: &Db, torrent: Torrent, - wallet_key: &str, + wallet: Arc, process_now: bool, ) -> Result, String> { // Stage the torrent first so a parseable candidate is never lost just @@ -129,7 +130,7 @@ pub async fn stage_and_verify_torrent( return Ok(None); } - if let Err(error) = torrent.verify(height, db, wallet_key).await { + if let Err(error) = torrent.verify(height, db, wallet).await { warn!("[torrent] validation failed: height={height} err={error}"); return Err(error); } diff --git a/src/verifications/async_funcs/validate_torrent_data.rs b/src/verifications/async_funcs/validate_torrent_data.rs index c04f857..731ed14 100644 --- a/src/verifications/async_funcs/validate_torrent_data.rs +++ b/src/verifications/async_funcs/validate_torrent_data.rs @@ -7,12 +7,13 @@ use crate::records::wallet_registry::is_registered_short_address; use crate::sled::Db; use crate::torrent::structs::Torrent; use crate::wallets::structures::Wallet; +use crate::Arc; use crate::Utc; impl Torrent { // validate the torrent metadata before any block pieces are // downloaded so invalid broadcasts can be rejected early - pub async fn verify(&self, height: u32, db: &Db, wallet_key: &str) -> Result<(), String> { + pub async fn verify(&self, height: u32, db: &Db, wallet: Arc) -> Result<(), String> { let address = &self.mined_by; let ip = NodeInfo::find_ip_by_address(address) .await @@ -26,26 +27,19 @@ impl Torrent { InfractionType::BadTorrent, now, db, - wallet_key, + wallet.clone(), ) .await; return Err(e); } if let Err(e) = Self::validate_mined_by(self, db).await { - let _ = update_ip_score( - &ip, - "miner", - InfractionType::BadTorrent, - now, - db, - wallet_key, - ) - .await; + let _ = + update_ip_score(&ip, "miner", InfractionType::BadTorrent, now, db, wallet).await; return Err(e); } - Self::validate_difficulty_matching(self, height).await?; + Self::validate_parent_and_difficulty(self, height).await?; Ok(()) } @@ -118,7 +112,10 @@ impl Torrent { Ok(()) } - async fn validate_difficulty_matching(&self, adjusted_block_number: u32) -> Result<(), String> { + async fn validate_parent_and_difficulty( + &self, + adjusted_block_number: u32, + ) -> Result<(), String> { // compare the torrent's stated block difficulty against // the next difficulty recorded in the current local chain let blockchain_difficulty = if genesis_checkup().await { @@ -129,6 +126,12 @@ impl Torrent { // use the previous block header as the source of truth // for the difficulty this torrent should be mining under let blockchain_data = load_block_header(previous_block_number).await?; + if !self.info.previous_hash.is_empty() { + let local_parent_hash = blockchain_data.hash().await; + if self.info.previous_hash != local_parent_hash { + return Err("Candidate parent is not current chain parent.".to_string()); + } + } blockchain_data.unmined_block.next_block_difficulty } else { 3000000000000000_u64