use crate::common::check_genesis::genesis_checkup; use crate::log::{error, warn}; use crate::miner::flag::{ clear_mining_stop_request, request_mining_stop, set_mining_state, set_node_mode, MiningState, NodeMode, }; use crate::orphans::structs::OrphanCheckup2; use crate::orphans::sync_check::sync_checkup; use crate::orphans::torrent_candidates::hydrate_torrent_candidates; use crate::records::block_height::get_block_height::get_height; use crate::records::memory::connections::{ mark_peer_network_map_synced, mark_peer_operational, mark_peer_wallet_registry_synced, }; use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::generate_uid; use crate::records::memory::response_channels::Command; use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; use crate::rpc::responses::RpcResponse; use crate::rpc::server::connection_memory_manager::{remove_key_from_memory, write_to_memory}; use crate::rpc::server::handshake_processing::{combine_and_send_data, parse_received_data}; use crate::rpc::server::handshake_verifications::{connection_count, perform_handshake_tests}; use crate::rpc::server::structs::{CombineAndSendDataParams, HandshakeTestParams}; use crate::rpc::server::tests::{endpoint_port, is_port_open}; use crate::sled::Db; use crate::startup::network_broadcast::announce_self_to_network; use crate::startup::remote_height::request_remote_height; use crate::wallets::structures::Wallet; use crate::Arc; use crate::AsyncWriteExt; use crate::Mutex; use crate::Settings; use crate::TcpStream; use crate::Utc; async fn drop_failed_handshake(stream: &Arc>) { // Failed handshakes are never stored in connection memory, but the // accepted TCP socket should still be closed immediately. let mut stream_guard = stream.lock().await; let _ = stream_guard.flush().await; let _ = stream_guard.shutdown().await; } async fn get_connection_counts() -> (u8, u8) { // Handshake limits come from settings so the node can change its // connection policy without recompiling. let settings = Settings::load().expect("Failed to load settings"); let incoming = settings.incoming_connections; let outgoing = settings.outgoing_connections; (incoming, outgoing) } async fn sync_incoming_peer_before_operational( stream: Arc>, db: &Db, wallet: Arc, map: Arc>, connections_key: &str, ) -> Result { let local_height = get_height(db); let remote_height = request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?; let local_genesis_exists = genesis_checkup().await; if local_genesis_exists && remote_height < local_height { warn!( "[startup] incoming peer is behind local chain and will remain non-operational until it catches up: local_height={local_height} remote_height={remote_height}" ); return Ok(false); } if !local_genesis_exists || remote_height > local_height + 10 { set_node_mode(NodeMode::Syncing); request_mining_stop(); set_mining_state(MiningState::Idle); node_syncing( stream.clone(), db, remote_height, map.clone(), true, wallet.clone(), connections_key.to_string(), ) .await .map_err(|err| format!("incoming sync error: {err}"))?; if !local_genesis_exists && !genesis_checkup().await && remote_height > 0 { return Err("incoming sync completed without obtaining remote genesis".to_string()); } } let post_sync_local_height = get_height(db); let post_sync_remote_height = request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?; if post_sync_remote_height > post_sync_local_height { match hydrate_torrent_candidates(stream.clone(), map.clone(), connections_key.to_string()) .await { Ok(imported) => { if imported > 0 { warn!( "[sync] hydrated {imported} torrent candidates before incoming post-sync orphan check" ); } } Err(err) => warn!("[sync] failed to hydrate incoming torrent candidates: {err}"), } let orphan_checkup_params = OrphanCheckup2 { stream: stream.clone(), db: db.clone(), local_height: post_sync_local_height, remote_height: post_sync_remote_height, recheck_from_height: Some(post_sync_local_height.min(post_sync_remote_height)), map: map.clone(), node_syncing: true, connections_key: connections_key.to_string(), }; if let Err(err) = sync_checkup(orphan_checkup_params, wallet).await { warn!("[sync] Incoming post-sync orphan check error: {err}"); } } Ok(true) } async fn complete_incoming_miner_setup( stream: Arc>, db: &Db, wallet: Arc, map: Arc>, connections_key: &str, ) { // Incoming miner handshakes need the same post-handshake state exchange // as outgoing handshakes so a bootstrap with only incoming peers can mine. if let Err(err) = sync_wallet_registry_with_retries( stream.clone(), db, map.clone(), connections_key.to_string(), "incoming peer", ) .await { error!("[startup] incoming peer wallet registry sync failed: {err}"); remove_key_from_memory(connections_key).await; return; } mark_peer_wallet_registry_synced(connections_key).await; let short_address = wallet.saved.short_address.clone(); if let Err(err) = announce_self_to_network( stream.clone(), &short_address, map.clone(), db, wallet.clone(), connections_key, ) .await { error!("[startup] incoming peer network map sync failed: {err}"); remove_key_from_memory(connections_key).await; return; } mark_peer_network_map_synced(connections_key).await; let operational = match sync_incoming_peer_before_operational( stream.clone(), db, wallet.clone(), map.clone(), connections_key, ) .await { Ok(operational) => operational, Err(err) => { error!("[startup] incoming peer chain sync failed: {err}"); remove_key_from_memory(connections_key).await; return; } }; if operational { mark_peer_operational(connections_key, map.clone()).await; set_node_mode(NodeMode::Normal); clear_mining_stop_request(); set_mining_state(MiningState::Idle); } if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await { error!("[startup] failed to rebuild mined counts after incoming handshake: {err}"); } } // this function validates incoming handshake and determined // what type of connection was made pub async fn handle_handshake( stream: Arc>, db: Db, wallet: Arc, map: Arc>, ) { // read number of connected clients or set to 0 if none let count = connection_count().await; // Only incoming capacity matters here; outgoing is loaded with the // same settings call but enforced by the connection starter. let (incoming_connections, _outgoing_connection) = get_connection_counts().await; // get data from stream let Ok(( received_message, received_signed_message, received_public_key, hash, received_ip, peer_time, )) = parse_received_data(stream.clone()).await else { return; }; // get local timestamp let timestamp = Utc::now().timestamp() as u32; // received message should be "aced" // aced is used instead of ping and pong // as its HEX and compressed better if received_message == "aced" { //validate handshake tests if !perform_handshake_tests(HandshakeTestParams { map: map.clone(), stream: stream.clone(), count, peer_time, timestamp, incoming_connections, hash: &hash, received_signed_message: &received_signed_message, received_address: &received_public_key, received_ip: &received_ip, }) .await { // Each failed test sends its own error response, so the // handshake can stop here without writing another message. drop_failed_handshake(&stream).await; return; } // Port 0 is the explicit client marker. A node advertising a // nonzero miner port must actually be reachable before it can be // stored in connection memory or the network map. let Some(advertised_port) = endpoint_port(&received_ip) else { let hashmap_key = generate_uid(); let padded_bytes = [hashmap_key[0], hashmap_key[1], hashmap_key[2], 0]; let uid = u32::from_le_bytes(padded_bytes); let response_bytes = RpcResponse::Binary("error: Invalid advertised endpoint.".as_bytes().to_vec()); response_bytes.send(&stream, None, uid).await; drop_failed_handshake(&stream).await; return; }; let connection_type = if advertised_port == 0 { "client" } else if is_port_open(&received_ip).await.unwrap_or(false) { "miner" } else { let hashmap_key = generate_uid(); let padded_bytes = [hashmap_key[0], hashmap_key[1], hashmap_key[2], 0]; let uid = u32::from_le_bytes(padded_bytes); let response_bytes = RpcResponse::Binary( "error: Handshake failed: advertised miner port is not reachable." .as_bytes() .to_vec(), ); response_bytes.send(&stream, None, uid).await; drop_failed_handshake(&stream).await; return; }; if connection_type == "miner" { let miner_reserved_limit = incoming_connections.saturating_sub(1) as usize; let current_count = connection_count().await; if current_count >= miner_reserved_limit { let hashmap_key = generate_uid(); let padded_bytes = [hashmap_key[0], hashmap_key[1], hashmap_key[2], 0]; let uid = u32::from_le_bytes(padded_bytes); let response_bytes = RpcResponse::Binary( "error: Miner connection slots are filled. Please try again later." .as_bytes() .to_vec(), ); response_bytes.send(&stream, None, uid).await; drop_failed_handshake(&stream).await; return; } } let received_public_key_bytes = match crate::decode(&received_public_key) { Ok(bytes) if bytes.len() == Wallet::PUBLIC_KEY_LENGTH => bytes, _ => { drop_failed_handshake(&stream).await; return; } }; let Some(received_short_address_bytes) = Wallet::public_key_bytes_to_short_address_bytes(&received_public_key_bytes) else { drop_failed_handshake(&stream).await; return; }; match register_short_address( &db, &received_short_address_bytes, &received_public_key_bytes, ) { Ok( WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered, ) => {} Ok(WalletRegistrationResult::Conflict) | Err(_) => { drop_failed_handshake(&stream).await; return; } } // write to memory let connections_key = write_to_memory( &received_ip, stream.clone(), connection_type, received_public_key_bytes, map.clone(), ) .await; if connections_key != "false" { // Once the peer is accepted into memory, return our signed // handshake response and start the long-lived RPC loop. let is_miner = connection_type == "miner"; let post_handshake_stream = stream.clone(); let post_handshake_map = map.clone(); let post_handshake_wallet = wallet.clone(); let post_handshake_connections_key = connections_key.clone(); let params = CombineAndSendDataParams { stream, db: db.clone(), connections_key, connection_type: connection_type.to_string(), wallet: wallet.clone(), map, returned_address: received_public_key.clone(), }; if combine_and_send_data(params).await.is_ok() && is_miner { complete_incoming_miner_setup( post_handshake_stream, &db, post_handshake_wallet, post_handshake_map, &post_handshake_connections_key, ) .await; } } else { drop_failed_handshake(&stream).await; } } else { let response_bytes = RpcResponse::Binary({ "error: Invalid Handshake: Signature Failed" .to_string() .as_bytes() .to_vec() }); response_bytes.send(&stream, None, 0).await; drop_failed_handshake(&stream).await; } }