use crate::common::binary_conversions::binary_to_ip_port; use crate::common::check_genesis::genesis_checkup; use crate::common::network_startup::get_ip_and_port; use crate::common::skein::skein_256_hash_data; use crate::config::SETTINGS; use crate::encode; use crate::io; use crate::log::{error, info, warn}; use crate::miner::flag::{ clear_mining_stop_request, request_mining_stop, set_mining_state, set_node_mode, MiningState, NodeMode, }; use crate::orphans::structs::OrphanCheckup2; use crate::orphans::sync_check::sync_checkup; use crate::orphans::torrent_candidates::hydrate_torrent_candidates; use crate::records::block_height::get_block_height::get_height; use crate::records::memory::connections::{ mark_peer_network_map_synced, mark_peer_operational, mark_peer_wallet_registry_synced, set_reconnect_context, startup_synced_peer_streams, CONNECTIONS, }; use crate::records::memory::enums::{ClientType, ConnectionType}; use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::structs::{Connection, StoreConnectionParams}; use crate::rpc::client::handshake::connect_and_handshake; use crate::rpc::client::register_wallet::register_connected_wallet; use crate::rpc::client::structs::{Connect, Handshake}; use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; use crate::rpc::command_maps::RPC_RANDOM_NODE; use crate::rpc::handshake_constants::{ HANDSHAKE_ADDRESS_OFFSET, HANDSHAKE_MESSAGE_BYTES, HANDSHAKE_RESPONSE_BYTES, HANDSHAKE_SIGNATURE_OFFSET, }; use crate::rpc::responses::RpcResponse; use crate::rpc::server::connection_memory_manager::remove_key_from_memory; use crate::rpc::server::rpc_command_loop::start_loop; use crate::sled::Db; use crate::sleep; use crate::startup::network_broadcast::announce_self_to_network; use crate::startup::remote_height::request_remote_height; use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; use crate::Duration; use crate::Mutex; use crate::SocketAddr; use crate::TcpStream; #[derive(Clone)] pub struct BootstrapParams { pub stream: Arc>, pub connections_key: String, pub wallet: Arc, pub db: Db, pub map: Arc>, pub first: bool, } pub fn spawn_bootstrap_peer_discovery(params: BootstrapParams) { tokio::spawn(async move { if let Err(e) = bootstrap_peer_discovery(params).await { set_node_mode(NodeMode::Normal); clear_mining_stop_request(); set_mining_state(MiningState::Idle); eprintln!("[bootstrap] error: {e}"); } }); } pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), String> { set_node_mode(NodeMode::Syncing); request_mining_stop(); let (_, _, local_endpoint) = get_ip_and_port().await; let max = SETTINGS.outgoing_connections; let mut no_progress_count = 0; let max_no_progress = 3; let mut current_key = params.connections_key.clone(); let mut stream = params.stream; params.first = false; while no_progress_count < max_no_progress { let outgoing_connections = { let connections = CONNECTIONS.read().await; connections .as_ref() .map(|connection| connection.count_outgoing_connections()) .unwrap_or(0) }; if outgoing_connections >= max as usize { break; } let (hashmap_key, _tx, rx) = reserve_entry(params.map.clone()).await; let mut payload = vec![RPC_RANDOM_NODE]; payload.extend_from_slice(&hashmap_key); RpcResponse::send_raw(&stream, Some(¤t_key), &payload).await; let mut rx = rx.lock().await; let buffer = match timeout(Duration::from_secs(15), rx.recv()).await { Ok(Some(buf)) => buf, Ok(None) => return Err("Peer discovery channel closed".into()), Err(_) => return Err("Timeout waiting for peer discovery response".into()), }; if buffer.len() != 18 { no_progress_count += 1; continue; } let addr_string = binary_to_ip_port(&buffer[0..18]); if addr_string == local_endpoint { no_progress_count += 1; continue; } if Connection::get_stream_from_memory(&addr_string) .await .is_some() { no_progress_count += 1; continue; } let socket_addr: SocketAddr = match addr_string.parse() { Ok(addr) => addr, Err(_) => { warn!("Invalid discovered peer from {current_key}: {addr_string}"); no_progress_count += 1; continue; } }; let connect = Connect { addr: socket_addr, node_ip: addr_string.clone(), wallet: params.wallet.clone(), db: params.db.clone(), map: params.map.clone(), first: params.first, }; if let Err(err) = connect_and_handshake(connect).await { warn!("Failed to connect to discovered peer {addr_string}: {err}"); no_progress_count += 1; continue; } if let Some(new_stream) = Connection::get_stream_from_memory(&addr_string).await { stream = new_stream; current_key = addr_string; no_progress_count = 0; } else { warn!("Failed to retrieve new stream for: {addr_string}"); no_progress_count += 1; } } loop { let local_height = get_height(¶ms.db); let remote_height = request_remote_height(stream.clone(), params.map.clone(), current_key.clone()).await?; let local_genesis_exists = genesis_checkup().await; if !local_genesis_exists || remote_height > local_height + 10 { info!("[sync] Starting sync from {local_height} to {remote_height}"); node_syncing( stream.clone(), ¶ms.db, remote_height, params.map.clone(), true, params.wallet.clone(), current_key.clone(), ) .await .map_err(|e| format!("Sync error: {e}"))?; if !local_genesis_exists && !genesis_checkup().await && remote_height > 0 { return Err("Sync completed without obtaining remote genesis".to_string()); } if !local_genesis_exists && !genesis_checkup().await { break; } continue; } break; } let post_sync_local_height = get_height(¶ms.db); let post_sync_remote_height = request_remote_height(stream.clone(), params.map.clone(), current_key.clone()).await?; let imported_candidates = match hydrate_torrent_candidates(stream.clone(), params.map.clone(), current_key.clone()) .await { Ok(imported) => { if imported > 0 { warn!( "[sync] hydrated {imported} torrent candidates before post-sync orphan check" ); } imported } Err(err) => { warn!("[sync] failed to hydrate torrent candidates: {err}"); 0 } }; if post_sync_remote_height != post_sync_local_height || imported_candidates > 0 { let orphan_checkup_params = OrphanCheckup2 { stream: stream.clone(), db: params.db.clone(), local_height: post_sync_local_height, remote_height: post_sync_remote_height, recheck_from_height: Some(post_sync_local_height.min(post_sync_remote_height)), map: params.map.clone(), node_syncing: true, connections_key: current_key.clone(), }; match sync_checkup(orphan_checkup_params, params.wallet.clone()).await { Ok(()) => {} Err(err) => warn!("[sync] Post-sync orphan check error: {err}"), } } info!("[sync] post-sync checks complete, mining grace period started"); if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await { error!("[startup] failed to rebuild mined counts after startup sync: {err}"); } for (peer_key, _) in startup_synced_peer_streams().await { mark_peer_operational(&peer_key, params.map.clone()).await; } sleep(Duration::from_secs(15)).await; info!("[sync] mining grace period complete, mining resuming"); set_node_mode(NodeMode::Normal); clear_mining_stop_request(); set_mining_state(MiningState::Idle); Ok(()) } pub async fn process_handshake_response( response: Vec, wallet: &Wallet, params: Handshake, ) -> io::Result<()> { if response.len() != HANDSHAKE_RESPONSE_BYTES { return Err(io::Error::new( io::ErrorKind::InvalidData, format!( "Invalid handshake response length: expected {}, got {}", HANDSHAKE_RESPONSE_BYTES, response.len() ), )); } let returned_message_bin = &response[..HANDSHAKE_MESSAGE_BYTES]; let returned_signed_bin = &response[HANDSHAKE_SIGNATURE_OFFSET..HANDSHAKE_ADDRESS_OFFSET]; let returned_public_key_bin = &response[HANDSHAKE_ADDRESS_OFFSET..HANDSHAKE_RESPONSE_BYTES]; let returned_message = encode(returned_message_bin); let returned_signed_message = encode(returned_signed_bin); if returned_public_key_bin.len() != Wallet::PUBLIC_KEY_LENGTH { return Ok(()); } let hash = skein_256_hash_data(&returned_message); let valid_response_signature = Wallet::verify_transaction_with_public_key_bytes( &hash, &returned_signed_message, returned_public_key_bin, ) .await; if returned_message == "ecaf" && valid_response_signature { return Err(io::Error::other( "Handshake accepted as client, not miner. The remote node could not verify your advertised public endpoint is reachable.", )); } if returned_message != "face" { return Err(io::Error::other(format!( "Handshake failed: unexpected response message {returned_message}" ))); } if !valid_response_signature { return Err(io::Error::other( "Handshake failed: response signature verification failed", )); } let Some(returned_short_address_bytes) = Wallet::public_key_bytes_to_short_address_bytes(returned_public_key_bin) else { return Err(io::Error::other( "Handshake failed: invalid response public key", )); }; let returned_short_address = Wallet::bytes_to_short_address(&returned_short_address_bytes) .ok_or_else(|| io::Error::other("Handshake failed: invalid peer short address"))?; // create and arc mutex of the stream let stream = Arc::new(Mutex::new(params.stream)); let connections_key = params.addr.clone(); let socket_parts: Vec<&str> = params.addr.split(':').collect(); if socket_parts.len() == 2 { let ip = socket_parts[0]; let port: u16 = socket_parts[1].parse().unwrap_or(0); set_reconnect_context(params.db.clone(), params.wallet.clone(), params.map.clone()).await; let mut conn = CONNECTIONS.write().await; if let Some(manager) = conn.as_mut() { if !manager.store_connection(StoreConnectionParams { connection_type: ConnectionType::Outgoing, ip: ip.to_string(), port, stream: Arc::clone(&stream), client_type: ClientType::Miner, wallet_short_address: returned_short_address.clone(), command_map: params.map.clone(), }) { return Err(io::Error::other( "The connection is already in the connection manager Please wait 10 minutes and try again", )); } } } let listener_stream = Arc::clone(&stream); tokio::spawn(start_loop( listener_stream, params.db.clone(), connections_key.clone(), params.wallet.clone(), params.map.clone(), )); let broadcast_stream = Arc::clone(&stream); if let Err(err) = sync_wallet_registry_with_retries( Arc::clone(&stream), ¶ms.db, params.map.clone(), connections_key.clone(), "outgoing peer", ) .await { remove_key_from_memory(&connections_key).await; return Err(io::Error::other(format!( "Wallet registry sync failed after handshake: {err}" ))); } mark_peer_wallet_registry_synced(&connections_key).await; if let Err(err) = register_connected_wallet( Arc::clone(&stream), ¶ms.db, params.map.clone(), connections_key.clone(), wallet, ) .await { remove_key_from_memory(&connections_key).await; return Err(io::Error::other(format!( "Wallet registration failed after handshake: {err}" ))); } if params.first { announce_self_to_network( broadcast_stream.clone(), &wallet.saved.short_address.clone(), params.map.clone(), ¶ms.db.clone(), params.wallet.clone(), &connections_key, ) .await .map_err(|err| { let connections_key = connections_key.clone(); tokio::spawn(async move { remove_key_from_memory(&connections_key).await; }); io::Error::other(format!("Network map sync failed: {err}")) })?; mark_peer_network_map_synced(&connections_key).await; let bsparams = BootstrapParams { stream: Arc::clone(&stream), connections_key: connections_key.clone(), wallet: params.wallet.clone(), db: params.db.clone(), map: params.map.clone(), first: params.first, }; spawn_bootstrap_peer_discovery(bsparams); } else { announce_self_to_network( broadcast_stream.clone(), &wallet.saved.short_address.clone(), params.map.clone(), ¶ms.db.clone(), params.wallet.clone(), &connections_key, ) .await .map_err(|err| { let connections_key = connections_key.clone(); tokio::spawn(async move { remove_key_from_memory(&connections_key).await; }); io::Error::other(format!("Network map sync failed: {err}")) })?; mark_peer_network_map_synced(&connections_key).await; if crate::miner::flag::is_normal_mode() { mark_peer_operational(&connections_key, params.map.clone()).await; } } Ok(()) }