use crate::common::binary_conversions::binary_to_ip_port; use crate::common::check_genesis::genesis_checkup; use crate::common::network_startup::get_ip_and_port; use crate::common::skein::skein_256_hash_data; use crate::config::SETTINGS; use crate::encode; use crate::io; use crate::log::{error, info, warn}; use crate::miner::flag::{ clear_mining_stop_request, request_mining_stop, set_mining_state, set_node_mode, MiningState, NodeMode, }; use crate::orphans::structs::OrphanCheckup2; use crate::orphans::sync_check::sync_checkup; use crate::orphans::torrent_candidates::hydrate_torrent_candidates; use crate::records::block_height::get_block_height::get_height; use crate::records::memory::connections::{set_reconnect_context, CONNECTIONS}; use crate::records::memory::enums::{ClientType, ConnectionType}; use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::structs::{Connection, StoreConnectionParams}; use crate::rpc::client::handshake::connect_and_handshake; use crate::rpc::client::register_wallet::register_connected_wallet; use crate::rpc::client::structs::{Connect, Handshake}; use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry; use crate::rpc::command_maps::RPC_RANDOM_NODE; use crate::rpc::handshake_constants::{ HANDSHAKE_ADDRESS_OFFSET, HANDSHAKE_MESSAGE_BYTES, HANDSHAKE_RESPONSE_BYTES, HANDSHAKE_SIGNATURE_OFFSET, }; use crate::rpc::responses::RpcResponse; use crate::rpc::server::rpc_command_loop::start_loop; use crate::sled::Db; use crate::startup::network_broadcast::announce_self_to_network; use crate::startup::remote_height::request_remote_height; use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; use crate::Duration; use crate::Mutex; use crate::SocketAddr; use crate::TcpStream; #[derive(Clone)] pub struct BootstrapParams { pub stream: Arc>, pub connections_key: String, pub wallet_key: String, pub db: Db, pub map: Arc>, pub first: bool, } pub fn spawn_bootstrap_peer_discovery(params: BootstrapParams) { tokio::spawn(async move { if let Err(e) = bootstrap_peer_discovery(params).await { set_node_mode(NodeMode::Normal); clear_mining_stop_request(); set_mining_state(MiningState::Idle); eprintln!("[bootstrap] error: {e}"); } }); } pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), String> { set_node_mode(NodeMode::Syncing); request_mining_stop(); let (_, _, local_endpoint) = get_ip_and_port().await; let max = SETTINGS.outgoing_connections; let mut no_progress_count = 0; let max_no_progress = 3; let mut current_key = params.connections_key.clone(); let mut stream = params.stream; params.first = false; while no_progress_count < max_no_progress { let outgoing_connections = { let connections = CONNECTIONS.read().await; connections .as_ref() .map(|connection| connection.count_outgoing_connections()) .unwrap_or(0) }; if outgoing_connections >= max as usize { break; } let (hashmap_key, _tx, rx) = reserve_entry(params.map.clone()).await; let mut payload = vec![RPC_RANDOM_NODE]; payload.extend_from_slice(&hashmap_key); RpcResponse::send_raw(&stream, Some(¤t_key), &payload).await; let mut rx = rx.lock().await; let buffer = match timeout(Duration::from_secs(15), rx.recv()).await { Ok(Some(buf)) => buf, Ok(None) => return Err("Peer discovery channel closed".into()), Err(_) => return Err("Timeout waiting for peer discovery response".into()), }; if buffer.len() != 18 { no_progress_count += 1; continue; } let addr_string = binary_to_ip_port(&buffer[0..18]); if addr_string == local_endpoint { no_progress_count += 1; continue; } if Connection::get_stream_from_memory(&addr_string) .await .is_some() { no_progress_count += 1; continue; } let socket_addr: SocketAddr = match addr_string.parse() { Ok(addr) => addr, Err(_) => { warn!("Invalid discovered peer from {current_key}: {addr_string}"); no_progress_count += 1; continue; } }; let connect = Connect { addr: socket_addr, node_ip: addr_string.clone(), wallet_key: params.wallet_key.clone(), db: params.db.clone(), map: params.map.clone(), first: params.first, }; if let Err(err) = connect_and_handshake(connect).await { warn!("Failed to connect to discovered peer {addr_string}: {err}"); no_progress_count += 1; continue; } if let Some(new_stream) = Connection::get_stream_from_memory(&addr_string).await { stream = new_stream; current_key = addr_string; no_progress_count = 0; } else { warn!("Failed to retrieve new stream for: {addr_string}"); no_progress_count += 1; } } loop { let local_height = get_height(¶ms.db); let remote_height = request_remote_height(stream.clone(), params.map.clone(), current_key.clone()).await?; let local_genesis_exists = genesis_checkup().await; if !local_genesis_exists || remote_height > local_height + 10 { info!("[sync] Starting sync from {local_height} to {remote_height}"); node_syncing( stream.clone(), ¶ms.db, remote_height, params.map.clone(), true, ¶ms.wallet_key, current_key.clone(), ) .await .map_err(|e| format!("Sync error: {e}"))?; if !local_genesis_exists && !genesis_checkup().await { return Err("Sync completed without obtaining remote genesis".to_string()); } continue; } break; } let post_sync_local_height = get_height(¶ms.db); let post_sync_remote_height = request_remote_height(stream.clone(), params.map.clone(), current_key.clone()).await?; if post_sync_remote_height != post_sync_local_height { match hydrate_torrent_candidates(stream.clone(), params.map.clone(), current_key.clone()) .await { Ok(imported) => { if imported > 0 { warn!("[sync] hydrated {imported} torrent candidates before post-sync orphan check"); } } Err(err) => warn!("[sync] failed to hydrate torrent candidates: {err}"), } let orphan_checkup_params = OrphanCheckup2 { stream: stream.clone(), db: params.db.clone(), local_height: post_sync_local_height, remote_height: post_sync_remote_height, map: params.map.clone(), node_syncing: true, connections_key: current_key.clone(), }; match sync_checkup(orphan_checkup_params, ¶ms.wallet_key).await { Ok(()) => {} Err(err) => warn!("[sync] Post-sync orphan check error: {err}"), } } info!("[sync] post-sync checks complete, mining resuming"); set_node_mode(NodeMode::Normal); clear_mining_stop_request(); set_mining_state(MiningState::Idle); Ok(()) } pub async fn process_handshake_response( response: Vec, wallet: &Wallet, params: Handshake, ) -> io::Result<()> { if response.len() != HANDSHAKE_RESPONSE_BYTES { return Err(io::Error::new( io::ErrorKind::InvalidData, format!( "Invalid handshake response length: expected {}, got {}", HANDSHAKE_RESPONSE_BYTES, response.len() ), )); } let returned_message_bin = &response[..HANDSHAKE_MESSAGE_BYTES]; let returned_signed_bin = &response[HANDSHAKE_SIGNATURE_OFFSET..HANDSHAKE_ADDRESS_OFFSET]; let returned_address_bin = &response[HANDSHAKE_ADDRESS_OFFSET..HANDSHAKE_RESPONSE_BYTES]; let returned_message = encode(returned_message_bin); let returned_signed_message = encode(returned_signed_bin); if Wallet::map_byte_to_wallet(returned_address_bin[0]).is_empty() { return Err(io::Error::new( io::ErrorKind::InvalidData, "Invalid handshake wallet network byte", )); } let returned_address = Wallet::bytes_to_long_address(returned_address_bin.to_vec()); if !Wallet::wallet_validation(&returned_address).await { return Ok(()); } let hash = skein_256_hash_data(&returned_message); let valid_response_signature = Wallet::verify_transaction(&hash, &returned_signed_message, &returned_address).await; if returned_message == "ecaf" && valid_response_signature { return Err(io::Error::other( "Handshake accepted as client, not miner. The remote node could not verify your advertised public endpoint is reachable.", )); } if returned_message != "face" { return Err(io::Error::other(format!( "Handshake failed: unexpected response message {returned_message}" ))); } if !valid_response_signature { return Err(io::Error::other( "Handshake failed: response signature verification failed", )); } // create and arc mutex of the stream let stream = Arc::new(Mutex::new(params.stream)); let connections_key = params.addr.clone(); let socket_parts: Vec<&str> = params.addr.split(':').collect(); if socket_parts.len() == 2 { let ip = socket_parts[0]; let port: u16 = socket_parts[1].parse().unwrap_or(0); set_reconnect_context( params.db.clone(), params.wallet_key.clone(), params.map.clone(), ) .await; let mut conn = CONNECTIONS.write().await; if let Some(manager) = conn.as_mut() { if !manager.store_connection(StoreConnectionParams { connection_type: ConnectionType::Outgoing, ip: ip.to_string(), port, stream: Arc::clone(&stream), client_type: ClientType::Miner, wallet_address: returned_address.clone(), command_map: params.map.clone(), }) { return Err(io::Error::other( "The connection is already in the connection manager Please wait 10 minutes and try again", )); } } } let listener_stream = Arc::clone(&stream); tokio::spawn(start_loop( listener_stream, params.db.clone(), connections_key.clone(), params.wallet_key.clone(), params.map.clone(), )); let broadcast_stream = Arc::clone(&stream); if let Err(err) = register_connected_wallet( Arc::clone(&stream), params.map.clone(), connections_key.clone(), wallet, ) .await { warn!("[wallet_registry] peer registration failed during handshake: {err}"); } if let Err(err) = sync_wallet_registry( Arc::clone(&stream), ¶ms.db, params.map.clone(), connections_key.clone(), ) .await { warn!("[wallet_registry] peer sync failed during handshake: {err}"); } if params.first { announce_self_to_network( broadcast_stream.clone(), &wallet.saved.short_address.clone(), params.map.clone(), ¶ms.db.clone(), ¶ms.wallet_key.clone(), &connections_key, ) .await; if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await { error!("[startup] failed to rebuild mined counts from local chain: {err}"); } let bsparams = BootstrapParams { stream: Arc::clone(&stream), connections_key: connections_key.clone(), wallet_key: params.wallet_key.clone(), db: params.db.clone(), map: params.map.clone(), first: params.first, }; spawn_bootstrap_peer_discovery(bsparams); } else { announce_self_to_network( broadcast_stream.clone(), &wallet.saved.short_address.clone(), params.map.clone(), ¶ms.db.clone(), ¶ms.wallet_key.clone(), &connections_key, ) .await; } Ok(()) }