This commit is contained in:
viraladmin 2026-05-26 12:58:35 -06:00
parent 4e22fd2aae
commit b23949d2d8
11 changed files with 122 additions and 19 deletions

View File

@ -7,6 +7,7 @@ use crate::miner::flag::{
is_mining_running, is_mining_stop_requested, is_normal_mode, set_mining_state, MiningState,
};
use crate::records::memory::connections::peer_connection_count;
use crate::records::memory::network_mapping::NodeInfo;
use crate::records::memory::response_channels::Command;
use crate::records::record_chain::save::save_block;
use crate::records::record_chain::structs::{SaveBlockParams, SaveType};
@ -72,6 +73,7 @@ async fn create_genesis_block(
let mut nonce: u8 = 0;
let mut genesis_search_logged = false;
let mut disconnected_logged = false;
let mut registration_logged = false;
let validator_mode = Settings::load()
.map(|settings| settings.validator)
.unwrap_or(false);
@ -100,6 +102,16 @@ async fn create_genesis_block(
set_mining_state(MiningState::Idle);
break Ok(());
}
if !NodeInfo::address_checkup(miner, 0).await {
if !registration_logged {
info!("[genesis] waiting for local node address to be registered before mining genesis");
registration_logged = true;
}
set_mining_state(MiningState::Idle);
let _ = sleep(Duration::from_millis(100)).await;
continue;
}
registration_logged = false;
// Mark the miner running once per active stretch.
if !is_mining_running() {
set_mining_state(MiningState::Running);

View File

@ -54,6 +54,7 @@ pub async fn mine_block(
// can stop quickly when another peer advances the chain.
let mut expected_block_height = get_height(db) + 1;
let mut fairness_paused_height: Option<u32> = None;
let mut registration_paused_height: Option<u32> = None;
let mut was_stopped = true;
loop {
@ -72,11 +73,18 @@ pub async fn mine_block(
// Mining requires the local node wallet to be announced in
// the network map. Offline/solo nodes stay idle instead of
// producing private blocks that can later be submitted in bulk.
if registration_paused_height != Some(current_block_number) {
info!(
"[mining] waiting for local node address to be registered before mining block {current_block_number}"
);
registration_paused_height = Some(current_block_number);
}
set_mining_state(MiningState::Idle);
was_stopped = true;
sleep(Duration::from_millis(250)).await;
continue;
}
registration_paused_height = None;
if !wait_for_fairness_gate(
db,

View File

@ -33,6 +33,7 @@ pub async fn deep_sync_rollback(mut params: CheckUp, wallet_key: &str) {
// against the next lower local height.
let undo_transactions_params = UndoTransactions {
start_height: params.local_height,
replay_to_height: params.remote_height,
db: params.db.clone(),
stream: params.stream.clone(),
map: params.map.clone(),

View File

@ -196,6 +196,7 @@ pub async fn checkup(params: OrphanCheckup, wallet_key: &str) -> Result<(), Stri
Ok(()) => {
let undo_transactions_params = UndoTransactions {
start_height: height,
replay_to_height: params.remote_height,
db: params.db.clone(),
stream: params.stream.clone(),
map: params.map.clone(),

View File

@ -18,12 +18,13 @@ use std::collections::HashSet;
pub async fn save_new_blocks(
params: &UndoTransactions,
max_height: u32,
replay_to_height: u32,
wallet_key: &str,
mut true_start_height: u32,
) -> Result<(), String> {
// after rollback, request and save each remote block from the
// divergence point up to the height we need to restore
// After rollback, save replacement blocks only up to the height
// proven by the peer/orphan context. The old local tip may include
// blocks mined on the losing branch while orphan correction was running.
let mut hydrated_heights = HashSet::new();
loop {
let mut resolved_from_staging = false;
@ -112,7 +113,7 @@ pub async fn save_new_blocks(
if resolved_from_staging {
// Continue replaying if staged data supplied this height and more
// replacement heights remain.
match max_height.cmp(&true_start_height) {
match replay_to_height.cmp(&true_start_height) {
std::cmp::Ordering::Equal => break,
std::cmp::Ordering::Greater => {
true_start_height += 1;
@ -213,7 +214,7 @@ pub async fn save_new_blocks(
// continue until the requested replacement range
// has been fully saved locally
match max_height.cmp(&true_start_height) {
match replay_to_height.cmp(&true_start_height) {
std::cmp::Ordering::Equal => break,
std::cmp::Ordering::Greater => true_start_height += 1,
std::cmp::Ordering::Less => break,

View File

@ -68,6 +68,7 @@ pub async fn snapshot_verified(params: UndoTransactions, wallet_key: &str) -> bo
}
let undo_transactions_params = UndoTransactions {
start_height: snap_height,
replay_to_height: params.replay_to_height,
db: params.db,
stream: params.stream,
map: params.map,

View File

@ -35,6 +35,7 @@ pub struct OrphanCheckup2 {
// reorganization decision has already been made
pub struct UndoTransactions {
pub start_height: u32,
pub replay_to_height: u32,
pub db: Db,
pub stream: Arc<Mutex<TcpStream>>,
pub map: Arc<Mutex<Command>>,

View File

@ -166,9 +166,10 @@ async fn replay_staged_torrents(params: &OrphanCheckup2, wallet_key: &str) -> Re
// Every staged candidate for the current expected height was
// exhausted without extending the chain, so stop replay here.
if retryable_pending {
return Err(format!(
"Replay waiting for block pieces at height {expected_height}"
));
warn!(
"[orphan] replay paused at height {expected_height}; candidate pieces are not currently available"
);
return Ok(());
}
return Ok(());
}
@ -192,6 +193,7 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<()
}
let undo_transactions_params = UndoTransactions {
start_height: params.local_height,
replay_to_height: params.remote_height,
db: params.db.clone(),
stream: params.stream.clone(),
map: params.map.clone(),
@ -211,8 +213,10 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<()
error!("[orphan] staged torrent replay error: {err}");
}
}
if replay_waiting && !params.node_syncing {
return Err("orphan replay is waiting for block data".to_string());
if replay_waiting {
warn!(
"[orphan] replay is waiting for block data; leaving candidates pending for a later pass"
);
}
if !params.node_syncing {
end_reorg_lock();
@ -257,8 +261,10 @@ pub async fn sync_checkup(params: OrphanCheckup2, wallet_key: &str) -> Result<()
if get_height(&params.db) > height_before_replay {
replay_waiting = false;
}
if replay_waiting && !params.node_syncing {
return Err("orphan replay is waiting for block data".to_string());
if replay_waiting {
warn!(
"[orphan] replay is waiting for block data; leaving candidates pending for a later pass"
);
}
if !params.node_syncing {
end_reorg_lock();

View File

@ -25,8 +25,9 @@ pub async fn undo_transactions(params: UndoTransactions, wallet_key: &str) -> Re
// walk backward from the current tip to the selected
// rollback height and undo each block in reverse order
let true_start_height = params.start_height;
let max_height = get_height(&params.db);
let mut current_height = max_height;
let rollback_tip = get_height(&params.db);
let replay_to_height = params.replay_to_height.max(true_start_height);
let mut current_height = rollback_tip;
let mut rolled_back_transactions = Vec::new();
loop {
// Load the canonical block before deleting it so every transaction can
@ -162,7 +163,7 @@ pub async fn undo_transactions(params: UndoTransactions, wallet_key: &str) -> Re
// rebuild mined counts after rollback, then fetch and save the
// replacement blocks, and finally rebuild mined counts again
NodeInfo::rebuild_mined_counts_from_chain(&params.db).await?;
save_new_blocks(&params, max_height, wallet_key, true_start_height).await?;
save_new_blocks(&params, replay_to_height, wallet_key, true_start_height).await?;
NodeInfo::rebuild_mined_counts_from_chain(&params.db).await?;
Ok(())
}

View File

@ -1,5 +1,9 @@
use crate::log::{error, warn};
use crate::records::memory::network_mapping::NodeInfo;
use crate::records::memory::response_channels::generate_uid;
use crate::records::memory::response_channels::Command;
use crate::rpc::client::register_wallet::register_connected_wallet;
use crate::rpc::client::wallet_registry_sync::sync_wallet_registry;
use crate::rpc::responses::RpcResponse;
use crate::rpc::server::connection_memory_manager::write_to_memory;
use crate::rpc::server::handshake_processing::{combine_and_send_data, parse_received_data};
@ -7,6 +11,8 @@ use crate::rpc::server::handshake_verifications::{connection_count, perform_hand
use crate::rpc::server::structs::{CombineAndSendDataParams, HandshakeTestParams};
use crate::rpc::server::tests::{endpoint_port, is_port_open};
use crate::sled::Db;
use crate::startup::network_broadcast::announce_self_to_network;
use crate::wallets::structures::Wallet;
use crate::Arc;
use crate::AsyncWriteExt;
use crate::Mutex;
@ -31,6 +37,55 @@ async fn get_connection_counts() -> (u8, u8) {
(incoming, outgoing)
}
async fn complete_incoming_miner_setup(
stream: Arc<Mutex<TcpStream>>,
db: &Db,
wallet_key: &str,
map: Arc<Mutex<Command>>,
connections_key: &str,
) {
// Incoming miner handshakes need the same post-handshake state exchange
// as outgoing handshakes so a bootstrap with only incoming peers can mine.
let wallet = match Wallet::try_obtain_wallet(wallet_key.to_string(), None).await {
Ok(wallet) => wallet,
Err(err) => {
error!("[handshake] unable to load wallet for incoming miner setup: {err}");
return;
}
};
if let Err(err) = register_connected_wallet(
stream.clone(),
map.clone(),
connections_key.to_string(),
&wallet,
)
.await
{
warn!("[wallet_registry] incoming peer registration failed after handshake: {err}");
}
if let Err(err) =
sync_wallet_registry(stream.clone(), db, map.clone(), connections_key.to_string()).await
{
warn!("[wallet_registry] incoming peer sync failed after handshake: {err}");
}
announce_self_to_network(
stream,
&wallet.saved.short_address,
map,
db,
wallet_key,
connections_key,
)
.await;
if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await {
error!("[startup] failed to rebuild mined counts after incoming handshake: {err}");
}
}
// this function validates incoming handshake and determined
// what type of connection was made
pub async fn handle_handshake(
@ -150,6 +205,11 @@ pub async fn handle_handshake(
if connections_key != "false" {
// Once the peer is accepted into memory, return our signed
// handshake response and start the long-lived RPC loop.
let is_miner = connection_type == "miner";
let post_handshake_stream = stream.clone();
let post_handshake_map = map.clone();
let post_handshake_wallet_key = wallet_key.clone();
let post_handshake_connections_key = connections_key.clone();
let params = CombineAndSendDataParams {
stream,
db: db.clone(),
@ -159,7 +219,16 @@ pub async fn handle_handshake(
map,
returned_address: received_address.clone(),
};
let _ = combine_and_send_data(params).await;
if combine_and_send_data(params).await.is_ok() && is_miner {
complete_incoming_miner_setup(
post_handshake_stream,
&db,
&post_handshake_wallet_key,
post_handshake_map,
&post_handshake_connections_key,
)
.await;
}
} else {
drop_failed_handshake(&stream).await;
}

View File

@ -149,7 +149,7 @@ pub async fn return_handshake(
Ok(())
}
pub async fn combine_and_send_data(params: CombineAndSendDataParams) {
pub async fn combine_and_send_data(params: CombineAndSendDataParams) -> Result<(), String> {
let CombineAndSendDataParams {
stream,
db,
@ -163,7 +163,7 @@ pub async fn combine_and_send_data(params: CombineAndSendDataParams) {
let result = generate_and_sign_message(&connection_type, wallet_key.clone()).await;
if let Err(err) = result {
error!("Failed: {err}");
return;
return Err(err);
}
let (address, message, signed_message) = result.unwrap();
@ -179,7 +179,7 @@ pub async fn combine_and_send_data(params: CombineAndSendDataParams) {
.await
{
error!("Handshake failed: {err}");
return;
return Err(err);
}
// start the rpc loop
@ -190,4 +190,6 @@ pub async fn combine_and_send_data(params: CombineAndSendDataParams) {
wallet_key,
map,
));
Ok(())
}