use crate::common::binary_conversions::{binary_to_ip, binary_to_string, ip_to_binary}; use crate::common::network_startup::get_ip_and_port; use crate::records::memory::network_mapping::monitor::MONITOR_ACTION_ADD; use crate::records::memory::network_mapping::structs::{ AddAddressParams, MonitorAddressParams, SignedMonitorEdit, SignedNodeEdit, NODE_ADDED_BY_OFFSET, NODE_ADDED_SIGNATURE_OFFSET, NODE_ADDED_TIMESTAMP_OFFSET, NODE_BLOCKS_MINED_OFFSET, NODE_DELETED_BLOCK_OFFSET, NODE_DELETED_TIMESTAMP_OFFSET, NODE_IP_OFFSET, NODE_MONITOR_COUNT_OFFSET, NODE_RECORD_FIXED_BYTES, }; use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::structs::Connection; use crate::rpc::command_maps::{RPC_ADD_NETWORK_NODE, RPC_REQUEST_NODE_LIST}; use crate::rpc::responses::RpcResponse; use crate::sled::Db; use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; use crate::Duration; use crate::Mutex; use crate::TcpStream; use crate::Utc; async fn record_local_monitor_for_peer( connections_key: &str, command_map: Arc>, db: &Db, wallet: Arc, ) { let Some(peer_wallet_bytes) = Connection::get_wallet_for_connection_key(connections_key).await else { return; }; let Some(monitored_address) = Wallet::public_key_bytes_to_short_address(&peer_wallet_bytes) else { return; }; let monitoring_address = wallet.saved.short_address.clone(); if monitored_address == monitoring_address { return; } let Some((target_ip, _)) = connections_key.rsplit_once(':') else { return; }; let timestamp = Utc::now().timestamp_millis() as u64; let signature = NodeInfo::monitor_signature( MONITOR_ACTION_ADD, &monitored_address, &monitoring_address, target_ip, timestamp, &wallet, ) .await; let _ = NodeInfo::add_monitor(MonitorAddressParams { map: command_map, edit: SignedMonitorEdit { action: MONITOR_ACTION_ADD, monitored_address, monitoring_address, target_ip: target_ip.to_string(), modified_timestamp: timestamp, modified_signature: signature, }, remote_ip: String::new(), db: db.clone(), wallet, connections_key: connections_key.to_string(), }) .await; } pub async fn announce_self_to_network( unlocked_stream: Arc>, address: &str, command_map: Arc>, db: &Db, wallet: Arc, connections_key: &str, ) -> Result<(), String> { // announce the local node to the connected peer, then // request its current network mapping on success let message_type = RPC_ADD_NETWORK_NODE; let (ip, _, _) = get_ip_and_port().await; // Reserve a reply key so the peer's acknowledgement returns to this request. let (hashmap_key, _hashmap_tx, hashmap_rx) = reserve_entry(command_map.clone()).await; // Encode the local node identity into the same binary shape used by network commands. let ip_bytes = ip_to_binary(&ip); let address_bytes = match Wallet::short_address_to_bytes(address) { Some(bytes) => bytes, None => return Err("local node address was invalid".to_string()), }; let modified_by_bytes = vec![0u8; Wallet::SHORT_ADDRESS_BYTES_LENGTH]; let time = Utc::now().timestamp_millis() as u64; let modified_timestamp_bytes = time.to_le_bytes(); // Self-announcement is intentionally unsigned. The receiving node // adopts it by re-signing the membership edit with its own wallet. let modified_signature_bytes = vec![0u8; Wallet::SIGNATURE_LENGTH]; let mut message: Vec = Vec::with_capacity( 1 + 3 + Wallet::SHORT_ADDRESS_BYTES_LENGTH + 16 + Wallet::SHORT_ADDRESS_BYTES_LENGTH + 8 + Wallet::SIGNATURE_LENGTH, ); message.push(message_type); message.extend_from_slice(&hashmap_key); message.extend_from_slice(&address_bytes); message.extend_from_slice(&ip_bytes); message.extend_from_slice(&modified_by_bytes); message.extend_from_slice(&modified_timestamp_bytes); message.extend_from_slice(&modified_signature_bytes); RpcResponse::send_raw(&unlocked_stream, Some(connections_key), &message).await; // Only pull the peer's network mapping after it accepts our self-announcement. let mut rx = hashmap_rx.lock().await; // Handle the received data let buffer = timeout(Duration::from_secs(30), rx.recv()) .await .map_err(|_| "timed out waiting for network self-announcement response".to_string())? .ok_or_else(|| "network self-announcement response channel closed".to_string())?; if binary_to_string(buffer.clone()) != "Success" { return Err("network self-announcement was rejected".to_string()); } get_network_mapping( unlocked_stream, command_map.clone(), db, wallet.clone(), connections_key, ) .await?; record_local_monitor_for_peer(connections_key, command_map, db, wallet).await; Ok(()) } pub async fn get_network_mapping( unlocked_stream: Arc>, command_map: Arc>, db: &Db, wallet: Arc, connections_key: &str, ) -> Result<(), String> { // request the remote peer's serialized node list and // import each advertised add/delete record locally let message_type = RPC_REQUEST_NODE_LIST; let (download_hashmap_key, _hashmap_tx, download_hashmap_rx) = reserve_entry(command_map.clone()).await; let mut message: Vec = Vec::new(); message.push(message_type); message.extend_from_slice(&download_hashmap_key); // The remote reply is a concatenated list of variable-width node records. RpcResponse::send_raw(&unlocked_stream, Some(connections_key), &message).await; let mut rx = download_hashmap_rx.lock().await; let mut buffer = timeout(Duration::from_secs(30), rx.recv()) .await .map_err(|_| "timed out waiting for network mapping response".to_string())? .ok_or_else(|| "network mapping response channel closed".to_string())?; while buffer.len() >= NODE_RECORD_FIXED_BYTES { let monitor_count = u16::from_le_bytes( buffer[NODE_MONITOR_COUNT_OFFSET..NODE_RECORD_FIXED_BYTES] .try_into() .unwrap(), ) as usize; let record_bytes = NODE_RECORD_FIXED_BYTES + (monitor_count * Wallet::SHORT_ADDRESS_BYTES_LENGTH); if buffer.len() < record_bytes { return Err("network mapping response ended mid-record".to_string()); } let chunk: Vec = buffer.drain(0..record_bytes).collect(); // The first part of each record describes the advertised node address and IP. let Some(address) = Wallet::bytes_to_short_address(&chunk[0..NODE_IP_OFFSET]) else { continue; }; let ip = binary_to_ip(chunk[NODE_IP_OFFSET..NODE_BLOCKS_MINED_OFFSET].to_vec()); let _advertised_blocks_mined = chunk[NODE_BLOCKS_MINED_OFFSET]; let blocks_mined = 0_u8; let added_by_bytes = &chunk[NODE_ADDED_BY_OFFSET..NODE_ADDED_TIMESTAMP_OFFSET]; let added_by = if added_by_bytes.iter().all(|&byte| byte == 0) { String::new() } else { Wallet::bytes_to_short_address(added_by_bytes).unwrap_or_default() }; let added_timestamp = u64::from_le_bytes( chunk[NODE_ADDED_TIMESTAMP_OFFSET..NODE_ADDED_SIGNATURE_OFFSET] .try_into() .unwrap(), ); let added_signature = crate::encode(&chunk[NODE_ADDED_SIGNATURE_OFFSET..NODE_DELETED_TIMESTAMP_OFFSET]); let deleted_timestamp = u64::from_le_bytes( chunk[NODE_DELETED_TIMESTAMP_OFFSET..NODE_DELETED_BLOCK_OFFSET] .try_into() .unwrap(), ); let deleted_block = u32::from_le_bytes( chunk[NODE_DELETED_BLOCK_OFFSET..NODE_MONITOR_COUNT_OFFSET] .try_into() .unwrap(), ); let remote_ip = ""; // Add records are imported through NodeInfo so local validation/signing rules stay central. NodeInfo::add_address(AddAddressParams { map: command_map.clone(), edit: SignedNodeEdit { address: address.clone(), ip: ip.clone(), modified_by: added_by, modified_timestamp: added_timestamp, modified_signature: added_signature, }, blocks_mined, remote_ip: remote_ip.to_string(), db: db.clone(), wallet: wallet.clone(), connections_key: connections_key.to_string(), }) .await; let mut monitors = Vec::with_capacity(monitor_count); for monitor_index in 0..monitor_count { let start = NODE_RECORD_FIXED_BYTES + monitor_index * Wallet::SHORT_ADDRESS_BYTES_LENGTH; let end = start + Wallet::SHORT_ADDRESS_BYTES_LENGTH; if let Some(monitor) = Wallet::bytes_to_short_address(&chunk[start..end]) { monitors.push(monitor); } } NodeInfo::set_monitors_from_mapping(&address, monitors).await; if deleted_timestamp > 0 { NodeInfo::set_deleted_metadata_from_mapping(&address, deleted_timestamp, deleted_block) .await; } } if !buffer.is_empty() { return Err("network mapping response had trailing partial bytes".to_string()); } Ok(()) }