diff --git a/src/miner/genesis.rs b/src/miner/genesis.rs index 5858f39..c2a4234 100644 --- a/src/miner/genesis.rs +++ b/src/miner/genesis.rs @@ -7,6 +7,7 @@ use crate::miner::flag::{ is_mining_running, is_mining_stop_requested, is_normal_mode, set_mining_state, MiningState, }; use crate::records::memory::connections::peer_connection_count; +use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::Command; use crate::records::record_chain::save::save_block; use crate::records::record_chain::structs::{SaveBlockParams, SaveType}; @@ -72,6 +73,7 @@ async fn create_genesis_block( let mut nonce: u8 = 0; let mut genesis_search_logged = false; let mut disconnected_logged = false; + let mut registration_logged = false; let validator_mode = Settings::load() .map(|settings| settings.validator) .unwrap_or(false); @@ -100,6 +102,16 @@ async fn create_genesis_block( set_mining_state(MiningState::Idle); break Ok(()); } + if !NodeInfo::address_checkup(miner, 0).await { + if !registration_logged { + info!("[genesis] waiting for local node address to be registered before mining genesis"); + registration_logged = true; + } + set_mining_state(MiningState::Idle); + let _ = sleep(Duration::from_millis(100)).await; + continue; + } + registration_logged = false; // Mark the miner running once per active stretch. if !is_mining_running() { set_mining_state(MiningState::Running); diff --git a/src/miner/mining.rs b/src/miner/mining.rs index 83c3320..160c29f 100644 --- a/src/miner/mining.rs +++ b/src/miner/mining.rs @@ -54,6 +54,7 @@ pub async fn mine_block( // can stop quickly when another peer advances the chain. let mut expected_block_height = get_height(db) + 1; let mut fairness_paused_height: Option = None; + let mut registration_paused_height: Option = None; let mut was_stopped = true; loop { @@ -72,11 +73,18 @@ pub async fn mine_block( // Mining requires the local node wallet to be announced in // the network map. Offline/solo nodes stay idle instead of // producing private blocks that can later be submitted in bulk. + if registration_paused_height != Some(current_block_number) { + info!( + "[mining] waiting for local node address to be registered before mining block {current_block_number}" + ); + registration_paused_height = Some(current_block_number); + } set_mining_state(MiningState::Idle); was_stopped = true; sleep(Duration::from_millis(250)).await; continue; } + registration_paused_height = None; if !wait_for_fairness_gate( db, diff --git a/src/orphans/deep_sync_rollback.rs b/src/orphans/deep_sync_rollback.rs index e70bfef..dbf5712 100644 --- a/src/orphans/deep_sync_rollback.rs +++ b/src/orphans/deep_sync_rollback.rs @@ -33,6 +33,7 @@ pub async fn deep_sync_rollback(mut params: CheckUp, wallet_key: &str) { // against the next lower local height. let undo_transactions_params = UndoTransactions { start_height: params.local_height, + replay_to_height: params.remote_height, db: params.db.clone(), stream: params.stream.clone(), map: params.map.clone(), diff --git a/src/orphans/orphan_checkup.rs b/src/orphans/orphan_checkup.rs index 95acb38..7d89c32 100644 --- a/src/orphans/orphan_checkup.rs +++ b/src/orphans/orphan_checkup.rs @@ -196,6 +196,7 @@ pub async fn checkup(params: OrphanCheckup, wallet_key: &str) -> Result<(), Stri Ok(()) => { let undo_transactions_params = UndoTransactions { start_height: height, + replay_to_height: params.remote_height, db: params.db.clone(), stream: params.stream.clone(), map: params.map.clone(), diff --git a/src/orphans/save_blocks.rs b/src/orphans/save_blocks.rs index 0cd1fd9..b755da0 100644 --- a/src/orphans/save_blocks.rs +++ b/src/orphans/save_blocks.rs @@ -18,12 +18,13 @@ use std::collections::HashSet; pub async fn save_new_blocks( params: &UndoTransactions, - max_height: u32, + replay_to_height: u32, wallet_key: &str, mut true_start_height: u32, ) -> Result<(), String> { - // after rollback, request and save each remote block from the - // divergence point up to the height we need to restore + // After rollback, save replacement blocks only up to the height + // proven by the peer/orphan context. The old local tip may include + // blocks mined on the losing branch while orphan correction was running. let mut hydrated_heights = HashSet::new(); loop { let mut resolved_from_staging = false; @@ -112,7 +113,7 @@ pub async fn save_new_blocks( if resolved_from_staging { // Continue replaying if staged data supplied this height and more // replacement heights remain. - match max_height.cmp(&true_start_height) { + match replay_to_height.cmp(&true_start_height) { std::cmp::Ordering::Equal => break, std::cmp::Ordering::Greater => { true_start_height += 1; @@ -213,7 +214,7 @@ pub async fn save_new_blocks( // continue until the requested replacement range // has been fully saved locally - match max_height.cmp(&true_start_height) { + match replay_to_height.cmp(&true_start_height) { std::cmp::Ordering::Equal => break, std::cmp::Ordering::Greater => true_start_height += 1, std::cmp::Ordering::Less => break, diff --git a/src/orphans/snapshot_check.rs b/src/orphans/snapshot_check.rs index 2917ecb..e923854 100644 --- a/src/orphans/snapshot_check.rs +++ b/src/orphans/snapshot_check.rs @@ -68,6 +68,7 @@ pub async fn snapshot_verified(params: UndoTransactions, wallet_key: &str) -> bo } let undo_transactions_params = UndoTransactions { start_height: snap_height, + replay_to_height: params.replay_to_height, db: params.db, stream: params.stream, map: params.map, diff --git a/src/orphans/structs.rs b/src/orphans/structs.rs index b18285e..5d81d2c 100644 --- a/src/orphans/structs.rs +++ b/src/orphans/structs.rs @@ -35,6 +35,7 @@ pub struct OrphanCheckup2 { // reorganization decision has already been made pub struct UndoTransactions { pub start_height: u32, + pub replay_to_height: u32, pub db: Db, pub stream: Arc>, pub map: Arc>, diff --git a/src/orphans/sync_check.rs b/src/orphans/sync_check.rs index 209f640..09500ae 100644 --- a/src/orphans/sync_check.rs +++ b/src/orphans/sync_check.rs @@ -166,9 +166,10 @@ async fn replay_staged_torrents(params: &OrphanCheckup2, wallet_key: &str) -> Re // Every staged candidate for the current expected height was // exhausted without extending the chain, so stop replay here. if retryable_pending { - return Err(format!( - "Replay waiting for block pieces at height {expected_height}" - )); + warn!( + "[orphan] replay paused at height {expected_height}; candidate pieces are not currently available" + ); + return Ok(()); } return Ok(()); } @@ -192,6 +193,7 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<() } let undo_transactions_params = UndoTransactions { start_height: params.local_height, + replay_to_height: params.remote_height, db: params.db.clone(), stream: params.stream.clone(), map: params.map.clone(), @@ -211,8 +213,10 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<() error!("[orphan] staged torrent replay error: {err}"); } } - if replay_waiting && !params.node_syncing { - return Err("orphan replay is waiting for block data".to_string()); + if replay_waiting { + warn!( + "[orphan] replay is waiting for block data; leaving candidates pending for a later pass" + ); } if !params.node_syncing { end_reorg_lock(); @@ -257,8 +261,10 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<() if get_height(¶ms.db) > height_before_replay { replay_waiting = false; } - if replay_waiting && !params.node_syncing { - return Err("orphan replay is waiting for block data".to_string()); + if replay_waiting { + warn!( + "[orphan] replay is waiting for block data; leaving candidates pending for a later pass" + ); } if !params.node_syncing { end_reorg_lock(); diff --git a/src/orphans/undo_block_transactions.rs b/src/orphans/undo_block_transactions.rs index 0878b5a..194b15b 100644 --- a/src/orphans/undo_block_transactions.rs +++ b/src/orphans/undo_block_transactions.rs @@ -25,8 +25,9 @@ pub async fn undo_transactions(params: UndoTransactions, wallet_key: &str) -> Re // walk backward from the current tip to the selected // rollback height and undo each block in reverse order let true_start_height = params.start_height; - let max_height = get_height(¶ms.db); - let mut current_height = max_height; + let rollback_tip = get_height(¶ms.db); + let replay_to_height = params.replay_to_height.max(true_start_height); + let mut current_height = rollback_tip; let mut rolled_back_transactions = Vec::new(); loop { // Load the canonical block before deleting it so every transaction can @@ -162,7 +163,7 @@ pub async fn undo_transactions(params: UndoTransactions, wallet_key: &str) -> Re // rebuild mined counts after rollback, then fetch and save the // replacement blocks, and finally rebuild mined counts again NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await?; - save_new_blocks(¶ms, max_height, wallet_key, true_start_height).await?; + save_new_blocks(¶ms, replay_to_height, wallet_key, true_start_height).await?; NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await?; Ok(()) } diff --git a/src/rpc/server/handshake.rs b/src/rpc/server/handshake.rs index f9f5c06..13b5dc1 100644 --- a/src/rpc/server/handshake.rs +++ b/src/rpc/server/handshake.rs @@ -1,5 +1,9 @@ +use crate::log::{error, warn}; +use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::generate_uid; use crate::records::memory::response_channels::Command; +use crate::rpc::client::register_wallet::register_connected_wallet; +use crate::rpc::client::wallet_registry_sync::sync_wallet_registry; use crate::rpc::responses::RpcResponse; use crate::rpc::server::connection_memory_manager::write_to_memory; use crate::rpc::server::handshake_processing::{combine_and_send_data, parse_received_data}; @@ -7,6 +11,8 @@ use crate::rpc::server::handshake_verifications::{connection_count, perform_hand use crate::rpc::server::structs::{CombineAndSendDataParams, HandshakeTestParams}; use crate::rpc::server::tests::{endpoint_port, is_port_open}; use crate::sled::Db; +use crate::startup::network_broadcast::announce_self_to_network; +use crate::wallets::structures::Wallet; use crate::Arc; use crate::AsyncWriteExt; use crate::Mutex; @@ -31,6 +37,55 @@ async fn get_connection_counts() -> (u8, u8) { (incoming, outgoing) } +async fn complete_incoming_miner_setup( + stream: Arc>, + db: &Db, + wallet_key: &str, + map: Arc>, + connections_key: &str, +) { + // Incoming miner handshakes need the same post-handshake state exchange + // as outgoing handshakes so a bootstrap with only incoming peers can mine. + let wallet = match Wallet::try_obtain_wallet(wallet_key.to_string(), None).await { + Ok(wallet) => wallet, + Err(err) => { + error!("[handshake] unable to load wallet for incoming miner setup: {err}"); + return; + } + }; + + if let Err(err) = register_connected_wallet( + stream.clone(), + map.clone(), + connections_key.to_string(), + &wallet, + ) + .await + { + warn!("[wallet_registry] incoming peer registration failed after handshake: {err}"); + } + + if let Err(err) = + sync_wallet_registry(stream.clone(), db, map.clone(), connections_key.to_string()).await + { + warn!("[wallet_registry] incoming peer sync failed after handshake: {err}"); + } + + announce_self_to_network( + stream, + &wallet.saved.short_address, + map, + db, + wallet_key, + connections_key, + ) + .await; + + if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await { + error!("[startup] failed to rebuild mined counts after incoming handshake: {err}"); + } +} + // this function validates incoming handshake and determined // what type of connection was made pub async fn handle_handshake( @@ -150,6 +205,11 @@ pub async fn handle_handshake( if connections_key != "false" { // Once the peer is accepted into memory, return our signed // handshake response and start the long-lived RPC loop. + let is_miner = connection_type == "miner"; + let post_handshake_stream = stream.clone(); + let post_handshake_map = map.clone(); + let post_handshake_wallet_key = wallet_key.clone(); + let post_handshake_connections_key = connections_key.clone(); let params = CombineAndSendDataParams { stream, db: db.clone(), @@ -159,7 +219,16 @@ pub async fn handle_handshake( map, returned_address: received_address.clone(), }; - let _ = combine_and_send_data(params).await; + if combine_and_send_data(params).await.is_ok() && is_miner { + complete_incoming_miner_setup( + post_handshake_stream, + &db, + &post_handshake_wallet_key, + post_handshake_map, + &post_handshake_connections_key, + ) + .await; + } } else { drop_failed_handshake(&stream).await; } diff --git a/src/rpc/server/handshake_processing.rs b/src/rpc/server/handshake_processing.rs index 33ff8f9..7d28ffc 100644 --- a/src/rpc/server/handshake_processing.rs +++ b/src/rpc/server/handshake_processing.rs @@ -149,7 +149,7 @@ pub async fn return_handshake( Ok(()) } -pub async fn combine_and_send_data(params: CombineAndSendDataParams) { +pub async fn combine_and_send_data(params: CombineAndSendDataParams) -> Result<(), String> { let CombineAndSendDataParams { stream, db, @@ -163,7 +163,7 @@ pub async fn combine_and_send_data(params: CombineAndSendDataParams) { let result = generate_and_sign_message(&connection_type, wallet_key.clone()).await; if let Err(err) = result { error!("Failed: {err}"); - return; + return Err(err); } let (address, message, signed_message) = result.unwrap(); @@ -179,7 +179,7 @@ pub async fn combine_and_send_data(params: CombineAndSendDataParams) { .await { error!("Handshake failed: {err}"); - return; + return Err(err); } // start the rpc loop @@ -190,4 +190,6 @@ pub async fn combine_and_send_data(params: CombineAndSendDataParams) { wallet_key, map, )); + + Ok(()) }