node startup bug fixes

This commit is contained in:
viraladmin 2026-06-04 20:45:43 -06:00
parent 5c4c1baf7e
commit 413adbf241
9 changed files with 285 additions and 42 deletions

View File

@ -223,7 +223,7 @@ impl Connection {
stream,
client_type,
wallet_public_key,
command_map,
command_map: _,
} = params;
let ip_bytes = ip_to_binary(&ip);
@ -272,15 +272,6 @@ impl Connection {
);
self.connection_map.insert(connection_key, connection_info);
if client_type == ClientType::Miner {
spawn_monitor_update(
ip.clone(),
MONITOR_ACTION_ADD,
wallet_public_key.clone(),
port,
);
Connection::client_checkup(stream, connection_type, ip, port, command_map);
}
true
}
@ -470,7 +461,6 @@ impl Connection {
for (connection_key, info) in self.connection_map.iter_mut() {
if connection_key.ip == ip_bytes && connection_key.port == port {
info.wallet_registry_synced = true;
info.ready = info.network_map_synced;
return true;
}
}
@ -485,7 +475,43 @@ impl Connection {
for (connection_key, info) in self.connection_map.iter_mut() {
if connection_key.ip == ip_bytes && connection_key.port == port {
info.network_map_synced = true;
info.ready = info.wallet_registry_synced;
return true;
}
}
false
}
pub fn mark_operational(&mut self, key: &str, command_map: Arc<Mutex<Command>>) -> bool {
let Some((ip, port)) = split_ip_port_key(key) else {
return false;
};
let ip_bytes = ip_to_binary(&ip);
for (connection_key, info) in self.connection_map.iter_mut() {
if connection_key.ip == ip_bytes && connection_key.port == port {
if info.ready {
return true;
}
if ClientType::from_bytes(&info.client_type) != Some(ClientType::Miner)
|| !info.wallet_registry_synced
|| !info.network_map_synced
{
return false;
}
info.ready = true;
spawn_monitor_update(
ip.clone(),
MONITOR_ACTION_ADD,
info.wallet_public_key.clone(),
port,
);
Connection::client_checkup(
Arc::clone(&info.stream),
ConnectionType::from_bytes(&info.connection_type)
.unwrap_or(ConnectionType::Incoming),
ip,
port,
command_map,
);
return true;
}
}
@ -543,6 +569,26 @@ impl Connection {
.collect()
}
/// Collects miner peers that have finished both startup syncs but have
/// not been flagged `ready` yet.
///
/// Each entry pairs the textual `ip:port` key with a shared handle to the
/// peer's stream. Peers that are not miners, are already `ready`, or are
/// missing either the wallet-registry or network-map sync flag are left out.
pub fn get_startup_synced_peer_streams_with_keys(
    &self,
) -> Vec<(String, Arc<Mutex<TcpStream>>)> {
    let mut pending = Vec::new();
    for (key, info) in self.connection_map.iter() {
        let is_miner = ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner);
        // Both sync stages are complete, but the peer has not been promoted.
        let awaiting_promotion =
            !info.ready && info.wallet_registry_synced && info.network_map_synced;
        if !(is_miner && awaiting_promotion) {
            continue;
        }
        let peer_ip = binary_to_ip(key.ip.clone());
        pending.push((
            format!("{}:{}", peer_ip, key.port),
            Arc::clone(&info.stream),
        ));
    }
    pending
}
// Resolve a stored outgoing node connection back to its live stream.
pub fn get_stream_for_outgoing(&self, ip: &str, port: u16) -> Option<Arc<Mutex<TcpStream>>> {
let ip_bytes = ip_to_binary(ip);
@ -805,6 +851,15 @@ pub async fn mark_peer_network_map_synced(key: &str) -> bool {
.unwrap_or(false)
}
/// Forwards to `Connection::mark_operational` for the peer stored under
/// `key`, holding the global connection table's write lock for the call.
///
/// Returns whatever `mark_operational` returns, or `false` when the
/// connection table holds no `Connection` value.
pub async fn mark_peer_operational(key: &str, map: Arc<Mutex<Command>>) -> bool {
    let mut guard = CONNECTIONS.write().await;
    match &mut *guard {
        Some(connection) => connection.mark_operational(key, map),
        None => false,
    }
}
pub async fn live_miner_peer_streams() -> Vec<(String, Arc<Mutex<TcpStream>>)> {
// Snapshot consensus and recovery checks vote only across currently
// connected miner peers, regardless of incoming/outgoing direction.
@ -831,6 +886,15 @@ pub async fn live_miner_peer_streams() -> Vec<(String, Arc<Mutex<TcpStream>>)> {
.unwrap_or_default()
}
/// Snapshots, under the global connection table's read lock, the peers
/// returned by `Connection::get_startup_synced_peer_streams_with_keys`.
///
/// Yields an empty list when the connection table holds no `Connection`.
pub async fn startup_synced_peer_streams() -> Vec<(String, Arc<Mutex<TcpStream>>)> {
    let guard = CONNECTIONS.read().await;
    match &*guard {
        Some(connection) => connection.get_startup_synced_peer_streams_with_keys(),
        None => Vec::new(),
    }
}
pub async fn get_client_type_from_memory(key: &str) -> Option<ClientType> {
// Recover the stored client role from the serialized connection key
// used throughout the RPC layer.

View File

@ -227,14 +227,20 @@ impl NodeInfo {
}
if !remote_ip.is_empty() {
let broadcast_map = map.clone();
let broadcast_edit = edit.clone();
let broadcast_remote_ip = remote_ip.clone();
let broadcast_connections_key = connections_key.clone();
tokio::spawn(async move {
Self::broadcast_node(
map.clone(),
&edit,
&remote_ip,
broadcast_map,
&broadcast_edit,
&broadcast_remote_ip,
NodeEditType::Add,
&connections_key,
&broadcast_connections_key,
)
.await;
});
}
RpcResponse::Binary(b"Success".to_vec())

View File

@ -15,8 +15,8 @@ use crate::orphans::sync_check::sync_checkup;
use crate::orphans::torrent_candidates::hydrate_torrent_candidates;
use crate::records::block_height::get_block_height::get_height;
use crate::records::memory::connections::{
mark_peer_network_map_synced, mark_peer_wallet_registry_synced, set_reconnect_context,
CONNECTIONS,
mark_peer_network_map_synced, mark_peer_operational, mark_peer_wallet_registry_synced,
set_reconnect_context, startup_synced_peer_streams, CONNECTIONS,
};
use crate::records::memory::enums::{ClientType, ConnectionType};
use crate::records::memory::network_mapping::NodeInfo;
@ -33,6 +33,7 @@ use crate::rpc::handshake_constants::{
HANDSHAKE_SIGNATURE_OFFSET,
};
use crate::rpc::responses::RpcResponse;
use crate::rpc::server::connection_memory_manager::remove_key_from_memory;
use crate::rpc::server::rpc_command_loop::start_loop;
use crate::sled::Db;
use crate::startup::network_broadcast::announce_self_to_network;
@ -174,9 +175,12 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(),
)
.await
.map_err(|e| format!("Sync error: {e}"))?;
if !local_genesis_exists && !genesis_checkup().await {
if !local_genesis_exists && !genesis_checkup().await && remote_height > 0 {
return Err("Sync completed without obtaining remote genesis".to_string());
}
if !local_genesis_exists && !genesis_checkup().await {
break;
}
continue;
}
break;
@ -215,6 +219,9 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(),
}
info!("[sync] post-sync checks complete, mining resuming");
for (peer_key, _) in startup_synced_peer_streams().await {
mark_peer_operational(&peer_key, params.map.clone()).await;
}
set_node_mode(NodeMode::Normal);
clear_mining_stop_request();
set_mining_state(MiningState::Idle);
@ -343,6 +350,7 @@ pub async fn process_handshake_response(
)
.await
{
remove_key_from_memory(&connections_key).await;
return Err(io::Error::other(format!(
"Wallet registry sync failed after handshake: {err}"
)));
@ -359,7 +367,13 @@ pub async fn process_handshake_response(
&connections_key,
)
.await
.map_err(|err| io::Error::other(format!("Network map sync failed: {err}")))?;
.map_err(|err| {
let connections_key = connections_key.clone();
tokio::spawn(async move {
remove_key_from_memory(&connections_key).await;
});
io::Error::other(format!("Network map sync failed: {err}"))
})?;
mark_peer_network_map_synced(&connections_key).await;
if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(&params.db).await {
error!("[startup] failed to rebuild mined counts from local chain: {err}");
@ -384,8 +398,17 @@ pub async fn process_handshake_response(
&connections_key,
)
.await
.map_err(|err| io::Error::other(format!("Network map sync failed: {err}")))?;
.map_err(|err| {
let connections_key = connections_key.clone();
tokio::spawn(async move {
remove_key_from_memory(&connections_key).await;
});
io::Error::other(format!("Network map sync failed: {err}"))
})?;
mark_peer_network_map_synced(&connections_key).await;
if crate::miner::flag::is_normal_mode() {
mark_peer_operational(&connections_key, params.map.clone()).await;
}
}
Ok(())
}

View File

@ -1,7 +1,7 @@
use crate::common::check_genesis::genesis_checkup;
use crate::common::skein::skein_128_hash_bytes;
use crate::log::{error, warn};
use crate::miner::flag::{is_reorganizing_mode, is_syncing_mode};
use crate::miner::flag::{is_normal_mode, is_reorganizing_mode, is_syncing_mode};
use crate::orphans::checkup_state::{
finish_orphan_check, request_orphan_recheck, try_begin_orphan_check,
};
@ -62,7 +62,7 @@ pub async fn trigger_orphan_check(
map: Arc<Mutex<Command>>,
connections_key: String,
) {
if is_syncing_mode() {
if !is_normal_mode() || is_syncing_mode() {
return;
}
if !try_begin_orphan_check() {
@ -125,9 +125,10 @@ pub async fn torrent_submission(
) -> TorrentSubmissionOutcome {
let expected_height = next_expected_height(db).await;
let local_height = get_height(db);
let syncing = is_syncing_mode();
let normal = is_normal_mode();
let syncing = !normal || is_syncing_mode();
let reorganizing = is_reorganizing_mode();
let process_now = !syncing && !reorganizing && height == expected_height;
let process_now = normal && !syncing && !reorganizing && height == expected_height;
if height < expected_height && !(height > 0 && within_orphan_window(local_height, height)) {
// Far-stale torrents cannot affect the orphan window. Ignore them
@ -141,7 +142,7 @@ pub async fn torrent_submission(
// The sender receives an acknowledgement for staging even when the
// torrent cannot be processed immediately.
let staged_response = if syncing {
RpcResponse::Binary("Torrent staged while syncing.".as_bytes().to_vec())
RpcResponse::Binary("Torrent staged while startup/syncing.".as_bytes().to_vec())
} else if reorganizing {
RpcResponse::Binary("Torrent staged while reorganizing.".as_bytes().to_vec())
} else if height == expected_height {

View File

@ -54,13 +54,16 @@ pub async fn route_reply(
return Ok(());
}
// Unknown replies usually mean a stale or forged UID. Drain the
// payload so the stream remains aligned for future commands.
bad_rpc_call::record(ip, client_type, db, wallet).await;
let retired = is_retired_entry(map.clone(), uid).await;
if retired {
// Retired UIDs are normal timeout fallout: the requester gave up,
// but the peer eventually answered. Drain without penalizing so
// startup/sync latency does not poison an otherwise valid peer.
warn!("[rpc] late reply arrived for retired uid: {uid:?}");
} else {
// Unknown, never-reserved UIDs can still indicate malformed or
// forged traffic, so those keep counting as bad RPC behavior.
bad_rpc_call::record(ip, client_type, db, wallet).await;
warn!("[rpc] reply arrived for unknown uid: {uid:?}");
}
let _ = read_bytes_from_stream::read_usize_from_stream(

View File

@ -1,20 +1,31 @@
use crate::log::error;
use crate::common::check_genesis::genesis_checkup;
use crate::log::{error, warn};
use crate::miner::flag::{
clear_mining_stop_request, request_mining_stop, set_mining_state, set_node_mode, MiningState,
NodeMode,
};
use crate::orphans::structs::OrphanCheckup2;
use crate::orphans::sync_check::sync_checkup;
use crate::orphans::torrent_candidates::hydrate_torrent_candidates;
use crate::records::block_height::get_block_height::get_height;
use crate::records::memory::connections::{
mark_peer_network_map_synced, mark_peer_wallet_registry_synced,
mark_peer_network_map_synced, mark_peer_operational, mark_peer_wallet_registry_synced,
};
use crate::records::memory::network_mapping::NodeInfo;
use crate::records::memory::response_channels::generate_uid;
use crate::records::memory::response_channels::Command;
use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult};
use crate::rpc::client::syncing::node_syncing;
use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries;
use crate::rpc::responses::RpcResponse;
use crate::rpc::server::connection_memory_manager::write_to_memory;
use crate::rpc::server::connection_memory_manager::{remove_key_from_memory, write_to_memory};
use crate::rpc::server::handshake_processing::{combine_and_send_data, parse_received_data};
use crate::rpc::server::handshake_verifications::{connection_count, perform_handshake_tests};
use crate::rpc::server::structs::{CombineAndSendDataParams, HandshakeTestParams};
use crate::rpc::server::tests::{endpoint_port, is_port_open};
use crate::sled::Db;
use crate::startup::network_broadcast::announce_self_to_network;
use crate::startup::remote_height::request_remote_height;
use crate::wallets::structures::Wallet;
use crate::Arc;
use crate::AsyncWriteExt;
@ -40,6 +51,83 @@ async fn get_connection_counts() -> (u8, u8) {
(incoming, outgoing)
}
/// Compares the local chain with an incoming peer's chain and, where needed,
/// syncs from that peer before the caller treats the peer as operational.
///
/// Returns:
/// - `Ok(false)` when a local genesis exists and the peer reports a lower
///   height than the local chain; nothing else is done in that case.
/// - `Ok(true)` once the optional sync and the post-sync check have run.
/// - `Err(_)` when a remote-height request fails, when `node_syncing` fails,
///   or when the sync ends without a genesis although the peer reported a
///   height above zero.
///
/// Side effects: when a sync is started, the node mode is switched to
/// `NodeMode::Syncing`, a mining stop is requested and the mining state is
/// set to `MiningState::Idle`.
// NOTE(review): this function never switches the mode back; on the `Err`
// paths below it returns with the node still in `Syncing` — confirm the
// callers restore the mode.
async fn sync_incoming_peer_before_operational(
stream: Arc<Mutex<TcpStream>>,
db: &Db,
wallet: Arc<Wallet>,
map: Arc<Mutex<Command>>,
connections_key: &str,
) -> Result<bool, String> {
let local_height = get_height(db);
let remote_height =
request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?;
let local_genesis_exists = genesis_checkup().await;
// A peer that is behind an already-initialised local chain is left as-is:
// report "not operational" without touching node mode or mining state.
if local_genesis_exists && remote_height < local_height {
warn!(
"[startup] incoming peer is behind local chain and will remain non-operational until it catches up: local_height={local_height} remote_height={remote_height}"
);
return Ok(false);
}
// Sync when there is no local genesis at all, or when the peer is more
// than 10 blocks ahead of the local height.
if !local_genesis_exists || remote_height > local_height + 10 {
// Stop mining and flag the node as syncing before pulling blocks.
set_node_mode(NodeMode::Syncing);
request_mining_stop();
set_mining_state(MiningState::Idle);
node_syncing(
stream.clone(),
db,
remote_height,
map.clone(),
true,
wallet.clone(),
connections_key.to_string(),
)
.await
.map_err(|err| format!("incoming sync error: {err}"))?;
// Genesis is re-checked only when it was missing before the sync. A
// peer reporting height 0 is not treated as a failure here.
if !local_genesis_exists && !genesis_checkup().await && remote_height > 0 {
return Err("incoming sync completed without obtaining remote genesis".to_string());
}
}
// Re-read both heights: either side may have changed while syncing.
let post_sync_local_height = get_height(db);
let post_sync_remote_height =
request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?;
if post_sync_remote_height > post_sync_local_height {
// Hydration is best-effort: a failure is logged and the check continues.
match hydrate_torrent_candidates(stream.clone(), map.clone(), connections_key.to_string())
.await
{
Ok(imported) => {
if imported > 0 {
warn!(
"[sync] hydrated {imported} torrent candidates before incoming post-sync orphan check"
);
}
}
Err(err) => warn!("[sync] failed to hydrate incoming torrent candidates: {err}"),
}
let orphan_checkup_params = OrphanCheckup2 {
stream: stream.clone(),
db: db.clone(),
local_height: post_sync_local_height,
remote_height: post_sync_remote_height,
// Inside this branch the remote height is strictly greater, so the
// minimum is always the post-sync local height.
recheck_from_height: Some(post_sync_local_height.min(post_sync_remote_height)),
map: map.clone(),
node_syncing: true,
connections_key: connections_key.to_string(),
};
// An orphan-check failure is logged only; it does not fail this function.
if let Err(err) = sync_checkup(orphan_checkup_params, wallet).await {
warn!("[sync] Incoming post-sync orphan check error: {err}");
}
}
Ok(true)
}
async fn complete_incoming_miner_setup(
stream: Arc<Mutex<TcpStream>>,
db: &Db,
@ -59,19 +147,52 @@ async fn complete_incoming_miner_setup(
.await
{
error!("[startup] incoming peer wallet registry sync failed: {err}");
remove_key_from_memory(connections_key).await;
return;
}
mark_peer_wallet_registry_synced(connections_key).await;
let short_address = wallet.saved.short_address.clone();
if let Err(err) =
announce_self_to_network(stream, &short_address, map, db, wallet, connections_key).await
if let Err(err) = announce_self_to_network(
stream.clone(),
&short_address,
map.clone(),
db,
wallet.clone(),
connections_key,
)
.await
{
error!("[startup] incoming peer network map sync failed: {err}");
remove_key_from_memory(connections_key).await;
return;
}
mark_peer_network_map_synced(connections_key).await;
let operational = match sync_incoming_peer_before_operational(
stream.clone(),
db,
wallet.clone(),
map.clone(),
connections_key,
)
.await
{
Ok(operational) => operational,
Err(err) => {
error!("[startup] incoming peer chain sync failed: {err}");
remove_key_from_memory(connections_key).await;
return;
}
};
if operational {
mark_peer_operational(connections_key, map.clone()).await;
set_node_mode(NodeMode::Normal);
clear_mining_stop_request();
set_mining_state(MiningState::Idle);
}
if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await {
error!("[startup] failed to rebuild mined counts after incoming handshake: {err}");
}

View File

@ -126,8 +126,11 @@ pub async fn announce_self_to_network(
.map_err(|_| "timed out waiting for network self-announcement response".to_string())?
.ok_or_else(|| "network self-announcement response channel closed".to_string())?;
if binary_to_string(buffer.clone()) != "Success" {
return Err("network self-announcement was rejected".to_string());
let response = binary_to_string(buffer.clone());
if response != "Success" {
return Err(format!(
"network self-announcement was rejected: {response}"
));
}
get_network_mapping(

View File

@ -6,7 +6,7 @@ use crate::records::memory::response_channels::{reserve_entry, Command};
use crate::rpc::command_maps::RPC_SUBMIT_TORRENT;
use crate::rpc::responses::RpcResponse;
use crate::torrent::structs::{Info, Torrent};
use crate::torrent::torrenting_system::get_nodes::get_nodes_from_memory;
use crate::torrent::torrenting_system::get_nodes::get_torrent_broadcast_nodes_from_memory;
use crate::torrent::torrenting_system::torrent_cache::should_broadcast_torrent;
use crate::Arc;
use crate::File;
@ -154,7 +154,7 @@ pub async fn broadcast_new_torrent_to_peers(
let torrent_len = 4 + torrent_bytes.len() as u32;
// Send the torrent to all currently connected miner peers.
let peers = get_nodes_from_memory().await;
let peers = get_torrent_broadcast_nodes_from_memory().await;
for (connections_key, stream) in peers {
// Each peer needs its own reply mapping entry and UID.
let (uid_bytes, _tx, _rx) = reserve_entry(map.clone()).await;

View File

@ -30,3 +30,25 @@ pub async fn get_nodes_from_memory() -> Vec<(String, Arc<Mutex<TcpStream>>)> {
drop(connection_storage);
nodes
}
/// Lists every connected miner peer as an `ip:port` key paired with a shared
/// handle to its stream, for torrent announcements.
///
/// Torrent announcements may reach miner peers that are still starting or
/// syncing, so they can stage new candidates while catching up; no
/// readiness flag is consulted here. Consensus and piece-selection paths
/// must keep using `get_nodes_from_memory()`, which requires ready peers.
pub async fn get_torrent_broadcast_nodes_from_memory() -> Vec<(String, Arc<Mutex<TcpStream>>)> {
    let guard = CONNECTIONS.read().await;
    let Some(connection) = &*guard else {
        return Vec::new();
    };
    connection
        .connection_map
        .values()
        .filter(|info| ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner))
        .map(|info| {
            let ip = binary_to_ip(info.ip.clone());
            (format!("{ip}:{}", info.port), Arc::clone(&info.stream))
        })
        .collect()
}