fixed syncing issues

This commit is contained in:
viraladmin 2026-06-05 09:36:06 -06:00
parent a621522170
commit 04b93275fa
33 changed files with 2117 additions and 2004 deletions

View File

@ -39,9 +39,18 @@ pub async fn create_genesis_block(
return; return;
} }
}; };
handle_response_and_save_torrent(0, &db, torrent, wallet, map.clone(), false, false) handle_response_and_save_torrent(
.await 0,
.ok(); &db,
torrent,
wallet,
map.clone(),
false,
true,
false,
)
.await
.ok();
} }
} }
} }

View File

@ -116,6 +116,7 @@ async fn candidate_attaches_before_rollback(
block_number: height, block_number: height,
allow_during_reorg: true, allow_during_reorg: true,
allow_historical: true, allow_historical: true,
allow_startup_peers: params.node_syncing,
db: params.db.clone(), db: params.db.clone(),
verification_service: std::sync::Arc::new(verification_service), verification_service: std::sync::Arc::new(verification_service),
map: params.map.clone(), map: params.map.clone(),

View File

@ -75,6 +75,7 @@ pub async fn save_new_blocks(
wallet.clone(), wallet.clone(),
params.map.clone(), params.map.clone(),
true, true,
params.node_syncing,
true, true,
) )
.await .await
@ -186,6 +187,7 @@ pub async fn save_new_blocks(
wallet.clone(), wallet.clone(),
params.map.clone(), params.map.clone(),
true, true,
params.node_syncing,
true, true,
) )
.await?; .await?;

View File

@ -126,6 +126,7 @@ async fn replay_staged_torrents(
wallet.clone(), wallet.clone(),
params.map.clone(), params.map.clone(),
true, true,
params.node_syncing,
true, true,
) )
.await .await

View File

@ -190,18 +190,25 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(),
let post_sync_remote_height = let post_sync_remote_height =
request_remote_height(stream.clone(), params.map.clone(), current_key.clone()).await?; request_remote_height(stream.clone(), params.map.clone(), current_key.clone()).await?;
if post_sync_remote_height != post_sync_local_height { let imported_candidates =
match hydrate_torrent_candidates(stream.clone(), params.map.clone(), current_key.clone()) match hydrate_torrent_candidates(stream.clone(), params.map.clone(), current_key.clone())
.await .await
{ {
Ok(imported) => { Ok(imported) => {
if imported > 0 { if imported > 0 {
warn!("[sync] hydrated {imported} torrent candidates before post-sync orphan check"); warn!(
"[sync] hydrated {imported} torrent candidates before post-sync orphan check"
);
} }
imported
} }
Err(err) => warn!("[sync] failed to hydrate torrent candidates: {err}"), Err(err) => {
} warn!("[sync] failed to hydrate torrent candidates: {err}");
0
}
};
if post_sync_remote_height != post_sync_local_height || imported_candidates > 0 {
let orphan_checkup_params = OrphanCheckup2 { let orphan_checkup_params = OrphanCheckup2 {
stream: stream.clone(), stream: stream.clone(),
db: params.db.clone(), db: params.db.clone(),

View File

@ -80,6 +80,7 @@ pub async fn node_syncing(
wallet.clone(), wallet.clone(),
map.clone(), map.clone(),
false, false,
node_syncing,
false, false,
) )
.await .await

View File

@ -188,6 +188,7 @@ pub async fn torrent_submission(
torrent, torrent,
staged_path, staged_path,
false, false,
false,
db_clone.clone(), db_clone.clone(),
map_for_download, map_for_download,
) )

View File

@ -14,6 +14,7 @@ use crate::records::memory::connections::{
use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::network_mapping::NodeInfo;
use crate::records::memory::response_channels::generate_uid; use crate::records::memory::response_channels::generate_uid;
use crate::records::memory::response_channels::Command; use crate::records::memory::response_channels::Command;
use crate::records::memory::structs::Connection;
use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult};
use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::syncing::node_syncing;
use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries;
@ -24,11 +25,13 @@ use crate::rpc::server::handshake_verifications::{connection_count, perform_hand
use crate::rpc::server::structs::{CombineAndSendDataParams, HandshakeTestParams}; use crate::rpc::server::structs::{CombineAndSendDataParams, HandshakeTestParams};
use crate::rpc::server::tests::{endpoint_port, is_port_open}; use crate::rpc::server::tests::{endpoint_port, is_port_open};
use crate::sled::Db; use crate::sled::Db;
use crate::sleep;
use crate::startup::network_broadcast::announce_self_to_network; use crate::startup::network_broadcast::announce_self_to_network;
use crate::startup::remote_height::request_remote_height; use crate::startup::remote_height::request_remote_height;
use crate::wallets::structures::Wallet; use crate::wallets::structures::Wallet;
use crate::Arc; use crate::Arc;
use crate::AsyncWriteExt; use crate::AsyncWriteExt;
use crate::Duration;
use crate::Mutex; use crate::Mutex;
use crate::Settings; use crate::Settings;
use crate::TcpStream; use crate::TcpStream;
@ -96,20 +99,28 @@ async fn sync_incoming_peer_before_operational(
let post_sync_remote_height = let post_sync_remote_height =
request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?; request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?;
if post_sync_remote_height > post_sync_local_height { let imported_candidates = match hydrate_torrent_candidates(
match hydrate_torrent_candidates(stream.clone(), map.clone(), connections_key.to_string()) stream.clone(),
.await map.clone(),
{ connections_key.to_string(),
Ok(imported) => { )
if imported > 0 { .await
warn!( {
Ok(imported) => {
if imported > 0 {
warn!(
"[sync] hydrated {imported} torrent candidates before incoming post-sync orphan check" "[sync] hydrated {imported} torrent candidates before incoming post-sync orphan check"
); );
}
} }
Err(err) => warn!("[sync] failed to hydrate incoming torrent candidates: {err}"), imported
} }
Err(err) => {
warn!("[sync] failed to hydrate incoming torrent candidates: {err}");
0
}
};
if post_sync_remote_height > post_sync_local_height || imported_candidates > 0 {
let orphan_checkup_params = OrphanCheckup2 { let orphan_checkup_params = OrphanCheckup2 {
stream: stream.clone(), stream: stream.clone(),
db: db.clone(), db: db.clone(),
@ -128,6 +139,40 @@ async fn sync_incoming_peer_before_operational(
Ok(true) Ok(true)
} }
fn spawn_incoming_peer_promotion_watcher(
stream: Arc<Mutex<TcpStream>>,
db: Db,
map: Arc<Mutex<Command>>,
connections_key: String,
) {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(5)).await;
if Connection::get_stream_from_memory(&connections_key)
.await
.is_none()
{
break;
}
let local_height = get_height(&db);
let remote_height =
match request_remote_height(stream.clone(), map.clone(), connections_key.clone())
.await
{
Ok(height) => height,
Err(_) => continue,
};
if remote_height >= local_height {
mark_peer_operational(&connections_key, map.clone()).await;
break;
}
}
});
}
async fn complete_incoming_miner_setup( async fn complete_incoming_miner_setup(
stream: Arc<Mutex<TcpStream>>, stream: Arc<Mutex<TcpStream>>,
db: &Db, db: &Db,
@ -191,6 +236,13 @@ async fn complete_incoming_miner_setup(
set_node_mode(NodeMode::Normal); set_node_mode(NodeMode::Normal);
clear_mining_stop_request(); clear_mining_stop_request();
set_mining_state(MiningState::Idle); set_mining_state(MiningState::Idle);
} else {
spawn_incoming_peer_promotion_watcher(
stream.clone(),
db.clone(),
map.clone(),
connections_key.to_string(),
);
} }
if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await { if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await {

View File

@ -52,6 +52,7 @@ pub struct DownloadSave {
pub block_number: u32, pub block_number: u32,
pub allow_during_reorg: bool, pub allow_during_reorg: bool,
pub allow_historical: bool, pub allow_historical: bool,
pub allow_startup_peers: bool,
pub db: Db, pub db: Db,
pub verification_service: Arc<VerificationService>, pub verification_service: Arc<VerificationService>,
pub map: Arc<Mutex<Command>>, pub map: Arc<Mutex<Command>>,

View File

@ -7,7 +7,9 @@ use crate::records::memory::torrentmap::{PieceReservation, TorrentMap};
use crate::sleep; use crate::sleep;
use crate::torrent::structs::PieceStatus; use crate::torrent::structs::PieceStatus;
use crate::torrent::structs::{DownloadSave, DownloadedPieceJob, PieceDownloadJob, RequestPiece}; use crate::torrent::structs::{DownloadSave, DownloadedPieceJob, PieceDownloadJob, RequestPiece};
use crate::torrent::torrenting_system::get_nodes::get_nodes_from_memory; use crate::torrent::torrenting_system::get_nodes::{
get_nodes_from_memory, get_sync_nodes_from_memory,
};
use crate::torrent::torrenting_system::request_piece::request_piece_from_node; use crate::torrent::torrenting_system::request_piece::request_piece_from_node;
use crate::torrent::torrenting_system::temp_database_storage::remove_block_pieces_from_db; use crate::torrent::torrenting_system::temp_database_storage::remove_block_pieces_from_db;
use crate::torrent::torrenting_system::temp_database_storage::save_piece_to_db; use crate::torrent::torrenting_system::temp_database_storage::save_piece_to_db;
@ -210,7 +212,11 @@ pub async fn download_block_pieces(params: DownloadSave) -> Result<(), String> {
// Re-scan connected peers for any piece that is still pending or // Re-scan connected peers for any piece that is still pending or
// has failed and needs another download attempt. // has failed and needs another download attempt.
let connected_nodes = get_nodes_from_memory().await; let connected_nodes = if params.allow_startup_peers {
get_sync_nodes_from_memory().await
} else {
get_nodes_from_memory().await
};
if connected_nodes.is_empty() { if connected_nodes.is_empty() {
return Err(format!( return Err(format!(
"No connected miner peers available for block {}", "No connected miner peers available for block {}",

View File

@ -31,6 +31,31 @@ pub async fn get_nodes_from_memory() -> Vec<(String, Arc<Mutex<TcpStream>>)> {
nodes nodes
} }
pub async fn get_sync_nodes_from_memory() -> Vec<(String, Arc<Mutex<TcpStream>>)> {
// Startup chain sync happens before a peer is marked fully ready for
// mining/consensus, but after registry and network-map sync completed.
let connection_storage = CONNECTIONS.read().await;
let mut nodes = Vec::new();
if let Some(connection) = &*connection_storage {
for connection_info in connection.connection_map.values() {
if ClientType::from_bytes(&connection_info.client_type) != Some(ClientType::Miner) {
continue;
}
if !connection_info.ready
&& !(connection_info.wallet_registry_synced && connection_info.network_map_synced)
{
continue;
}
let ip = binary_to_ip(connection_info.ip.clone());
let port = connection_info.port;
let key = format!("{ip}:{port}");
let stream_arc = Arc::clone(&connection_info.stream);
nodes.push((key, stream_arc));
}
}
nodes
}
pub async fn get_torrent_broadcast_nodes_from_memory() -> Vec<(String, Arc<Mutex<TcpStream>>)> { pub async fn get_torrent_broadcast_nodes_from_memory() -> Vec<(String, Arc<Mutex<TcpStream>>)> {
// Torrent announcements are allowed to reach miner peers that are // Torrent announcements are allowed to reach miner peers that are
// still starting/syncing so they can stage new candidates while // still starting/syncing so they can stage new candidates while

View File

@ -16,6 +16,7 @@ pub async fn setup_download(
torrent: Torrent, torrent: Torrent,
staged_path: String, staged_path: String,
allow_during_reorg: bool, allow_during_reorg: bool,
allow_startup_peers: bool,
db: Db, db: Db,
verification_service: Arc<VerificationService>, verification_service: Arc<VerificationService>,
map: Arc<Mutex<Command>>, map: Arc<Mutex<Command>>,
@ -36,6 +37,7 @@ pub async fn setup_download(
block_number, block_number,
allow_during_reorg, allow_during_reorg,
allow_historical: false, allow_historical: false,
allow_startup_peers,
db: db.clone(), db: db.clone(),
verification_service, verification_service,
map, map,

View File

@ -41,6 +41,7 @@ pub async fn handle_response_and_save_torrent(
wallet: Arc<Wallet>, wallet: Arc<Wallet>,
map: Arc<Mutex<Command>>, map: Arc<Mutex<Command>>,
allow_during_reorg: bool, allow_during_reorg: bool,
allow_startup_peers: bool,
rebroadcast: bool, rebroadcast: bool,
) -> Result<(), String> { ) -> Result<(), String> {
let Some((torrent, staged_path)) = let Some((torrent, staged_path)) =
@ -55,6 +56,7 @@ pub async fn handle_response_and_save_torrent(
torrent, torrent,
staged_path, staged_path,
allow_during_reorg, allow_during_reorg,
allow_startup_peers,
db.clone(), db.clone(),
map.clone(), map.clone(),
) )
@ -98,6 +100,7 @@ pub async fn process_torrent_response(params: ProcessTorrentResponse) -> Result<
torrent, torrent,
staged_path, staged_path,
params.allow_during_reorg, params.allow_during_reorg,
false,
params.db, params.db,
params.map.clone(), params.map.clone(),
) )
@ -143,6 +146,7 @@ pub async fn setup_download_for_torrent(
torrent: Torrent, torrent: Torrent,
staged_path: String, staged_path: String,
allow_during_reorg: bool, allow_during_reorg: bool,
allow_startup_peers: bool,
db: Db, db: Db,
map: Arc<Mutex<Command>>, map: Arc<Mutex<Command>>,
) -> Result<(), String> { ) -> Result<(), String> {
@ -156,6 +160,7 @@ pub async fn setup_download_for_torrent(
torrent, torrent,
staged_path, staged_path,
allow_during_reorg, allow_during_reorg,
allow_startup_peers,
db, db,
Arc::new(verification_service), Arc::new(verification_service),
map, map,