From 0845fd3ba6a29e4efb941dc4e200e532a0d43c3c Mon Sep 17 00:00:00 2001 From: viraladmin <00purple@gmail.com> Date: Thu, 11 Jun 2026 15:06:02 -0600 Subject: [PATCH] bug fixes --- src/records/memory/connections.rs | 21 +++++----- src/records/memory/network_mapping/add.rs | 10 +++-- src/records/memory/network_mapping/mod.rs | 4 +- src/records/memory/response_channels.rs | 21 ++++++++++ src/records/memory/structs.rs | 8 ++-- src/rpc/client/handshake_processing.rs | 44 +++++++++++---------- src/rpc/client/register_wallet.rs | 13 +++++- src/rpc/commands/random_node.rs | 14 +++++-- src/rpc/commands/route_reply.rs | 13 ++---- src/rpc/commands/tx_submit.rs | 44 ++++++++++++++------- src/rpc/commands/wallet_register.rs | 21 ++++------ src/rpc/server/connection_memory_manager.rs | 4 +- src/rpc/server/handshake.rs | 39 ++++++++++-------- src/rpc/server/rpc_command_loop.rs | 5 ++- src/torrent/create_metadata.rs | 11 ++++-- 15 files changed, 164 insertions(+), 108 deletions(-) diff --git a/src/records/memory/connections.rs b/src/records/memory/connections.rs index 181c520..d70d21c 100644 --- a/src/records/memory/connections.rs +++ b/src/records/memory/connections.rs @@ -224,7 +224,7 @@ impl Connection { port, stream, client_type, - wallet_public_key, + wallet_short_address, command_map: _, } = params; @@ -261,7 +261,7 @@ impl Connection { return false; } - if wallet_public_key.len() != Wallet::PUBLIC_KEY_LENGTH { + if !Wallet::short_address_validation(&wallet_short_address) { return false; } let connection_info = ConnectionInfo::new( @@ -270,7 +270,7 @@ impl Connection { port, stream.clone(), client_type.as_bytes(), - wallet_public_key.clone(), + wallet_short_address, ); self.connection_map.insert(connection_key, connection_info); @@ -296,7 +296,7 @@ impl Connection { spawn_monitor_update( ip.clone(), MONITOR_ACTION_REMOVE, - connection_info.wallet_public_key.clone(), + connection_info.wallet_short_address.clone(), port, ); } @@ -508,7 +508,7 @@ impl Connection { spawn_monitor_update( ip.clone(), MONITOR_ACTION_ADD, - info.wallet_public_key.clone(), + info.wallet_short_address.clone(), port, ); Connection::client_checkup( @@ -633,7 +633,7 @@ impl Connection { }) } - pub async fn get_wallet_for_connection_key(key: &str) -> Option> { + pub async fn get_wallet_for_connection_key(key: &str) -> Option { let (ip, port) = split_ip_port_key(key)?; let lock = CONNECTIONS.read().await; let conn = lock.as_ref()?; @@ -646,7 +646,7 @@ impl Connection { && connection_key.port == port && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) { - Some(info.wallet_public_key.clone()) + Some(info.wallet_short_address.clone()) } else { None } @@ -765,7 +765,7 @@ impl Connection { } } -fn spawn_monitor_update(ip: String, action: u8, peer_public_key: Vec, port: u16) { +fn spawn_monitor_update(ip: String, action: u8, monitored_address: String, port: u16) { tokio::spawn(async move { let context = { let guard = RECONNECT_CONTEXT.lock().await; @@ -774,10 +774,9 @@ fn spawn_monitor_update(ip: String, action: u8, peer_public_key: Vec, port: let Some(context) = context else { return; }; - let Some(monitored_address) = Wallet::public_key_bytes_to_short_address(&peer_public_key) - else { + if !Wallet::short_address_validation(&monitored_address) { return; - }; + } let monitoring_address = context.wallet.saved.short_address.clone(); if monitored_address == monitoring_address { return; diff --git a/src/records/memory/network_mapping/add.rs b/src/records/memory/network_mapping/add.rs index 2aac309..a929fe8 100644 --- a/src/records/memory/network_mapping/add.rs +++ b/src/records/memory/network_mapping/add.rs @@ -1,4 +1,5 @@ use super::*; +use crate::records::memory::response_channels::reserve_transient_entry_with_context; const ONE_HOUR_MILLIS: u64 = 3_600_000; @@ -166,7 +167,12 @@ impl NodeInfo { match streams { streams if !streams.is_empty() => { for (peer_key, unlocked_stream) in streams { - let (hashmap_key, _hashmap_tx, hashmap_rx) = reserve_entry(map.clone()).await; + let hashmap_key = reserve_transient_entry_with_context( + map.clone(), + Some(message_type), + Some(peer_key.clone()), + ) + .await; let mut message: Vec = Vec::new(); message.push(message_type); message.extend_from_slice(&hashmap_key); @@ -186,8 +192,6 @@ impl NodeInfo { continue; } RpcResponse::send_raw(&unlocked_stream, Some(&peer_key), &message).await; - let mut rx = hashmap_rx.lock().await; - let _ = timeout(Duration::from_secs(5), rx.recv()).await; } } _ => { diff --git a/src/records/memory/network_mapping/mod.rs b/src/records/memory/network_mapping/mod.rs index 870faca..dde628e 100644 --- a/src/records/memory/network_mapping/mod.rs +++ b/src/records/memory/network_mapping/mod.rs @@ -14,13 +14,11 @@ use crate::records::memory::network_mapping::structs::{ AddAddressParams, DeleteAddressParams, MonitorAddressParams, SignedMonitorEdit, SignedNodeEdit, NODE_RECORD_FIXED_BYTES, }; -use crate::records::memory::response_channels::{reserve_entry, Command}; +use crate::records::memory::response_channels::Command; use crate::rpc::responses::RpcResponse; use crate::sled::Db; -use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; -use crate::Duration; use crate::HashMap; use crate::Mutex; use crate::Utc; diff --git a/src/records/memory/response_channels.rs b/src/records/memory/response_channels.rs index fa85bfb..369e79c 100644 --- a/src/records/memory/response_channels.rs +++ b/src/records/memory/response_channels.rs @@ -103,6 +103,27 @@ pub async fn reserve_entry_with_context( } } +pub async fn reserve_transient_entry_with_context( + map: Arc>, + command: Option, + peer: Option, +) -> Byte3 { + let (key, _tx, rx) = reserve_entry_with_context(map.clone(), command, peer).await; + tokio::spawn(async move { + let received = { + let mut rx = rx.lock().await; + matches!(crate::timeout(Duration::from_secs(30), rx.recv()).await, Ok(Some(_))) + }; + if !received { + warn!( + "[rpc_trace] transient uid expired without reply: uid={key:?}" + ); + } + delete_entry(map, key).await; + }); + key +} + pub struct ReplyTrace { pub age_ms: u128, pub command: Option, diff --git a/src/records/memory/structs.rs b/src/records/memory/structs.rs index 1ca6026..d569f8c 100644 --- a/src/records/memory/structs.rs +++ b/src/records/memory/structs.rs @@ -15,7 +15,7 @@ pub struct ConnectionInfo { pub port: u16, pub stream: Arc>, pub client_type: Vec, - pub wallet_public_key: Vec, + pub wallet_short_address: String, pub wallet_registry_synced: bool, pub network_map_synced: bool, pub ready: bool, @@ -37,7 +37,7 @@ pub struct StoreConnectionParams { pub port: u16, pub stream: Arc>, pub client_type: ClientType, - pub wallet_public_key: Vec, + pub wallet_short_address: String, pub command_map: Arc>, } @@ -202,7 +202,7 @@ impl ConnectionInfo { port: u16, stream: Arc>, client_type: Vec, - wallet_public_key: Vec, + wallet_short_address: String, ) -> Self { ConnectionInfo { connection_type, @@ -210,7 +210,7 @@ impl ConnectionInfo { port, stream, client_type, - wallet_public_key, + wallet_short_address, wallet_registry_synced: false, network_map_synced: false, ready: false, diff --git a/src/rpc/client/handshake_processing.rs b/src/rpc/client/handshake_processing.rs index 81ac057..09e9ed2 100644 --- a/src/rpc/client/handshake_processing.rs +++ b/src/rpc/client/handshake_processing.rs @@ -22,8 +22,8 @@ use crate::records::memory::enums::{ClientType, ConnectionType}; use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::structs::{Connection, StoreConnectionParams}; -use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::rpc::client::handshake::connect_and_handshake; +use crate::rpc::client::register_wallet::register_connected_wallet; use crate::rpc::client::structs::{Connect, Handshake}; use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; @@ -38,6 +38,7 @@ use crate::rpc::server::rpc_command_loop::start_loop; use crate::sled::Db; use crate::startup::network_broadcast::announce_self_to_network; use crate::startup::remote_height::request_remote_height; +use crate::sleep; use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; @@ -225,13 +226,15 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), } } - info!("[sync] post-sync checks complete, mining resuming"); + info!("[sync] post-sync checks complete, mining grace period started"); if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await { error!("[startup] failed to rebuild mined counts after startup sync: {err}"); } for (peer_key, _) in startup_synced_peer_streams().await { mark_peer_operational(&peer_key, params.map.clone()).await; } + sleep(Duration::from_secs(15)).await; + info!("[sync] mining grace period complete, mining resuming"); set_node_mode(NodeMode::Normal); clear_mining_stop_request(); set_mining_state(MiningState::Idle); @@ -297,24 +300,8 @@ pub async fn process_handshake_response( "Handshake failed: invalid response public key", )); }; - - match register_short_address( - ¶ms.db, - &returned_short_address_bytes, - returned_public_key_bin, - ) { - Ok(WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered) => {} - Ok(WalletRegistrationResult::Conflict) => { - return Err(io::Error::other( - "Handshake failed: peer public key conflicts with wallet registry", - )); - } - Err(err) => { - return Err(io::Error::other(format!( - "Handshake failed: could not register peer public key: {err}" - ))); - } - } + let returned_short_address = Wallet::bytes_to_short_address(&returned_short_address_bytes) + .ok_or_else(|| io::Error::other("Handshake failed: invalid peer short address"))?; // create and arc mutex of the stream let stream = Arc::new(Mutex::new(params.stream)); @@ -332,7 +319,7 @@ pub async fn process_handshake_response( port, stream: Arc::clone(&stream), client_type: ClientType::Miner, - wallet_public_key: returned_public_key_bin.to_vec(), + wallet_short_address: returned_short_address.clone(), command_map: params.map.clone(), }) { return Err(io::Error::other( @@ -367,6 +354,21 @@ pub async fn process_handshake_response( } mark_peer_wallet_registry_synced(&connections_key).await; + if let Err(err) = register_connected_wallet( + Arc::clone(&stream), + ¶ms.db, + params.map.clone(), + connections_key.clone(), + wallet, + ) + .await + { + remove_key_from_memory(&connections_key).await; + return Err(io::Error::other(format!( + "Wallet registration failed after handshake: {err}" + ))); + } + if params.first { announce_self_to_network( broadcast_stream.clone(), diff --git a/src/rpc/client/register_wallet.rs b/src/rpc/client/register_wallet.rs index bca348e..b115c77 100644 --- a/src/rpc/client/register_wallet.rs +++ b/src/rpc/client/register_wallet.rs @@ -2,8 +2,10 @@ use crate::common::skein::skein_256_hash_bytes; use crate::decode; use crate::log::warn; use crate::records::memory::response_channels::{reserve_entry, Command}; +use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::rpc::command_maps::RPC_REGISTER_WALLET; use crate::rpc::responses::RpcResponse; +use crate::sled::Db; use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; @@ -13,6 +15,7 @@ use crate::TcpStream; pub async fn register_connected_wallet( stream: Arc>, + db: &Db, map: Arc>, connections_key: String, wallet: &Wallet, @@ -61,7 +64,15 @@ pub async fn register_connected_wallet( let response_text = String::from_utf8_lossy(&response); match response_text.trim() { - "1" => Ok(()), + "1" => match register_short_address(db, &short_address_bytes, &public_key_bytes) { + Ok(WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered) => { + Ok(()) + } + Ok(WalletRegistrationResult::Conflict) => { + Err("Local wallet registry conflicts with startup wallet".to_string()) + } + Err(err) => Err(format!("Failed to record startup wallet locally: {err}")), + }, "0" => Err("Peer rejected wallet registration".to_string()), other => { warn!("[wallet_registry] unexpected wallet registration response: {other}"); diff --git a/src/rpc/commands/random_node.rs b/src/rpc/commands/random_node.rs index 07eb632..03d73aa 100644 --- a/src/rpc/commands/random_node.rs +++ b/src/rpc/commands/random_node.rs @@ -1,11 +1,17 @@ use crate::records::memory::connections::CONNECTIONS; use crate::records::memory::network_mapping::NodeInfo; +use crate::records::block_height::get_block_height::get_height; use crate::rpc::responses::RpcResponse; +use crate::sled::Db; -pub async fn request_node(connections_key: &str) -> RpcResponse { - // Return one random connected node that can legally sponsor/add nodes - // under the mature-network network-map rules. - let eligible_ips = NodeInfo::eligible_sponsor_ips().await; +pub async fn request_node(db: &Db, connections_key: &str) -> RpcResponse { + // Before the mature-network gate, node adds are intentionally permissive. + // After height 10,000, only mature sponsors can be returned. + let eligible_ips = if get_height(db) > 10000 { + NodeInfo::eligible_sponsor_ips().await + } else { + NodeInfo::active_node_ips().await + }; if eligible_ips.is_empty() { return RpcResponse::Binary(b"Error: No Eligible Sponsor Found".to_vec()); } diff --git a/src/rpc/commands/route_reply.rs b/src/rpc/commands/route_reply.rs index 041f9e1..acf814c 100644 --- a/src/rpc/commands/route_reply.rs +++ b/src/rpc/commands/route_reply.rs @@ -4,7 +4,6 @@ use crate::records::memory::response_channels::{ delete_entry, get_entry, is_retired_entry, trace_entry, Command, }; use crate::rpc::command_maps::MAX_RPC_REPLY_BYTES; -use crate::rpc::commands::bad_rpc_call; use crate::rpc::read_bytes_from_stream; use crate::sled::Db; use crate::wallets::structures::Wallet; @@ -15,11 +14,11 @@ use crate::TcpStream; pub async fn route_reply( connections_key: &str, stream_locked: Arc>, - db: &Db, - wallet: Arc, + _db: &Db, + _wallet: Arc, map: Arc>, - ip: &str, - client_type: ClientType, + _ip: &str, + _client_type: ClientType, ) -> Result<(), String> { // Replies are command 255 packets: UID, payload length, then the // response bytes for the task waiting on that UID. @@ -30,7 +29,6 @@ pub async fn route_reply( read_bytes_from_stream::read_u32_from_stream(connections_key, stream_locked.clone()).await? as usize; if message_length > MAX_RPC_REPLY_BYTES { - bad_rpc_call::record(ip, client_type, db, wallet.clone()).await; return Err(format!( "error: RPC reply payload too large: len={message_length} max={MAX_RPC_REPLY_BYTES}" )); @@ -82,9 +80,6 @@ pub async fn route_reply( warn!("[rpc] late reply arrived for retired uid: {uid:?}"); } } else { - // Unknown, never-reserved UIDs can still indicate malformed or - // forged traffic, so those keep counting as bad RPC behavior. - bad_rpc_call::record(ip, client_type, db, wallet).await; warn!("[rpc] reply arrived for unknown uid: {uid:?}"); } let _ = read_bytes_from_stream::read_usize_from_stream( diff --git a/src/rpc/commands/tx_submit.rs b/src/rpc/commands/tx_submit.rs index eabeb26..fd73595 100644 --- a/src/rpc/commands/tx_submit.rs +++ b/src/rpc/commands/tx_submit.rs @@ -15,13 +15,17 @@ use crate::common::types::{ }; use crate::records::memory::connections::get_client_type_from_memory; use crate::records::memory::enums::ClientType; -use crate::records::memory::response_channels::generate_uid; +use crate::records::memory::response_channels::{ + reserve_transient_entry_with_context, Command, +}; use crate::rpc::command_maps::RPC_SUBMIT_TRANSACTION; use crate::rpc::responses::RpcResponse; use crate::sled::Db; use crate::torrent::torrenting_system::get_nodes::get_nodes_from_memory; +use crate::Arc; +use crate::Mutex; -async fn broadcast_tx(tx_bytes: Vec) { +async fn broadcast_tx(tx_bytes: Vec, map: Arc>) { // Broadcast newly accepted mempool transactions only to miner peers, // since those are the nodes that need transaction fan-out most. let nodes = get_nodes_from_memory().await; @@ -34,7 +38,12 @@ async fn broadcast_tx(tx_bytes: Vec) { continue; } - let uid = generate_uid(); + let uid = reserve_transient_entry_with_context( + map.clone(), + Some(RPC_SUBMIT_TRANSACTION), + Some(key.clone()), + ) + .await; let mut message = Vec::with_capacity(4 + tx_bytes.len()); // Reuse the original submit-transaction wire shape when // rebroadcasting to miner peers: command, UID, transaction bytes. @@ -45,7 +54,12 @@ async fn broadcast_tx(tx_bytes: Vec) { } } -pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { +pub async fn save_and_submit( + txtype: u8, + tx: Vec, + db: &Db, + map: Arc>, +) -> RpcResponse { // Decode, verify, add to the mempool, and then broadcast according to // the declared transaction type. // In each branch, a non-empty verifier response means the transaction @@ -64,7 +78,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match transfer.to_bytes().await { Ok(transfer_bytes) => match transfer.add_to_memory().await { Ok(_) => { - broadcast_tx(transfer_bytes).await; + broadcast_tx(transfer_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -103,7 +117,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match create_token.to_bytes().await { Ok(token_bytes) => match create_token.add_to_memory().await { Ok(_) => { - broadcast_tx(token_bytes).await; + broadcast_tx(token_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -142,7 +156,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match create_nft.to_bytes().await { Ok(nft_bytes) => match create_nft.add_to_memory().await { Ok(_) => { - broadcast_tx(nft_bytes).await; + broadcast_tx(nft_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -181,7 +195,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match burn.to_bytes().await { Ok(burn_bytes) => match burn.add_to_memory().await { Ok(_) => { - broadcast_tx(burn_bytes).await; + broadcast_tx(burn_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -220,7 +234,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match issue_token.to_bytes().await { Ok(issue_token_bytes) => match issue_token.add_to_memory().await { Ok(_) => { - broadcast_tx(issue_token_bytes).await; + broadcast_tx(issue_token_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -259,7 +273,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match marketing.to_bytes().await { Ok(marketing_bytes) => match marketing.add_to_memory().await { Ok(_) => { - broadcast_tx(marketing_bytes).await; + broadcast_tx(marketing_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -298,7 +312,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match swap.to_bytes().await { Ok(swap_bytes) => match swap.add_to_memory().await { Ok(_) => { - broadcast_tx(swap_bytes).await; + broadcast_tx(swap_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -337,7 +351,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match loan.to_bytes().await { Ok(loan_bytes) => match loan.add_to_memory().await { Ok(_) => { - broadcast_tx(loan_bytes).await; + broadcast_tx(loan_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -376,7 +390,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match payment.to_bytes().await { Ok(payment_bytes) => match payment.add_to_memory().await { Ok(_) => { - broadcast_tx(payment_bytes).await; + broadcast_tx(payment_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -415,7 +429,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match collateral.to_bytes().await { Ok(collateral_bytes) => match collateral.add_to_memory().await { Ok(_) => { - broadcast_tx(collateral_bytes).await; + broadcast_tx(collateral_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) @@ -454,7 +468,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec, db: &Db) -> RpcResponse { match vanity.to_bytes().await { Ok(vanity_bytes) => match vanity.add_to_memory().await { Ok(_) => { - broadcast_tx(vanity_bytes).await; + broadcast_tx(vanity_bytes, map.clone()).await; let msg = "successful_broadcast: true".to_string().as_bytes().to_vec(); RpcResponse::Binary(msg) diff --git a/src/rpc/commands/wallet_register.rs b/src/rpc/commands/wallet_register.rs index 2763346..c8d25f6 100644 --- a/src/rpc/commands/wallet_register.rs +++ b/src/rpc/commands/wallet_register.rs @@ -1,16 +1,13 @@ use crate::common::skein::skein_256_hash_bytes; use crate::decode; -use crate::log::warn; use crate::records::memory::connections::CONNECTIONS; -use crate::records::memory::response_channels::{reserve_entry, Command}; +use crate::records::memory::response_channels::{reserve_transient_entry_with_context, Command}; use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::rpc::command_maps::RPC_REGISTER_WALLET; use crate::rpc::responses::RpcResponse; use crate::sled::Db; -use crate::timeout; use crate::wallets::structures::Wallet; use crate::Arc; -use crate::Duration; use crate::Mutex; async fn broadcast_wallet_registration( @@ -43,7 +40,12 @@ async fn broadcast_wallet_registration( } } - let (hashmap_key, _hashmap_tx, hashmap_rx) = reserve_entry(map.clone()).await; + let hashmap_key = reserve_transient_entry_with_context( + map.clone(), + Some(RPC_REGISTER_WALLET), + Some(peer_key.clone()), + ) + .await; let mut message = Vec::with_capacity( 1 + 3 + Wallet::SHORT_ADDRESS_BYTES_LENGTH @@ -58,15 +60,6 @@ async fn broadcast_wallet_registration( message.extend_from_slice(&signature_bytes); RpcResponse::send_raw(&unlocked_stream, Some(&peer_key), &message).await; - - let response = { - let mut rx = hashmap_rx.lock().await; - timeout(Duration::from_secs(5), rx.recv()).await - }; - - if response.is_err() { - warn!("[wallet_registry] timed out waiting for rebroadcast acknowledgement"); - } } } diff --git a/src/rpc/server/connection_memory_manager.rs b/src/rpc/server/connection_memory_manager.rs index 820caab..da22642 100644 --- a/src/rpc/server/connection_memory_manager.rs +++ b/src/rpc/server/connection_memory_manager.rs @@ -28,7 +28,7 @@ pub async fn write_to_memory( received_ip_port: &str, stream: Arc>, client_type: &str, - wallet_public_key: Vec, + wallet_short_address: String, command_map: Arc>, ) -> String { // Reject unknown connection labels before the connection manager is @@ -52,7 +52,7 @@ pub async fn write_to_memory( port, stream: stream.clone(), client_type, - wallet_public_key, + wallet_short_address, command_map, }); diff --git a/src/rpc/server/handshake.rs b/src/rpc/server/handshake.rs index d4e2cde..853343c 100644 --- a/src/rpc/server/handshake.rs +++ b/src/rpc/server/handshake.rs @@ -15,7 +15,7 @@ use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::generate_uid; use crate::records::memory::response_channels::Command; use crate::records::memory::structs::Connection; -use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; +use crate::rpc::client::register_wallet::register_connected_wallet; use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; use crate::rpc::responses::RpcResponse; @@ -197,6 +197,20 @@ async fn complete_incoming_miner_setup( } mark_peer_wallet_registry_synced(connections_key).await; + if let Err(err) = register_connected_wallet( + stream.clone(), + db, + map.clone(), + connections_key.to_string(), + &wallet, + ) + .await + { + error!("[startup] incoming peer wallet registration failed: {err}"); + remove_key_from_memory(connections_key).await; + return; + } + let short_address = wallet.saved.short_address.clone(); if let Err(err) = announce_self_to_network( stream.clone(), @@ -233,6 +247,7 @@ async fn complete_incoming_miner_setup( if operational { mark_peer_operational(connections_key, map.clone()).await; + sleep(Duration::from_secs(15)).await; set_node_mode(NodeMode::Normal); clear_mining_stop_request(); set_mining_state(MiningState::Idle); @@ -370,27 +385,19 @@ pub async fn handle_handshake( drop_failed_handshake(&stream).await; return; }; - - match register_short_address( - &db, - &received_short_address_bytes, - &received_public_key_bytes, - ) { - Ok( - WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered, - ) => {} - Ok(WalletRegistrationResult::Conflict) | Err(_) => { - drop_failed_handshake(&stream).await; - return; - } - } + let Some(received_short_address) = + Wallet::bytes_to_short_address(&received_short_address_bytes) + else { + drop_failed_handshake(&stream).await; + return; + }; // write to memory let connections_key = write_to_memory( &received_ip, stream.clone(), connection_type, - received_public_key_bytes, + received_short_address, map.clone(), ) .await; diff --git a/src/rpc/server/rpc_command_loop.rs b/src/rpc/server/rpc_command_loop.rs index 00fd4ae..6d05978 100644 --- a/src/rpc/server/rpc_command_loop.rs +++ b/src/rpc/server/rpc_command_loop.rs @@ -68,7 +68,7 @@ pub async fn start_loop( ) .await?; - let result = commands::random_node::request_node(&connections_key).await; + let result = commands::random_node::request_node(&db, &connections_key).await; result .send(&stream_locked, Some(&connections_key), uid) .await; @@ -176,7 +176,8 @@ pub async fn start_loop( ) .await?; - let result = commands::tx_submit::save_and_submit(txtype, tx, &db).await; + let result = + commands::tx_submit::save_and_submit(txtype, tx, &db, map.clone()).await; result .send(&stream_locked, Some(&connections_key), uid) .await; diff --git a/src/torrent/create_metadata.rs b/src/torrent/create_metadata.rs index 28b814d..ccbdf66 100644 --- a/src/torrent/create_metadata.rs +++ b/src/torrent/create_metadata.rs @@ -2,7 +2,7 @@ use crate::blocks::block::{NONCE_OFFSET, VRF_OFFSET}; use crate::common::network_paths_and_settings::block_extension_and_paths; use crate::common::skein::skein_128_hash_bytes; use crate::log::error; -use crate::records::memory::response_channels::{reserve_entry, Command}; +use crate::records::memory::response_channels::{reserve_transient_entry_with_context, Command}; use crate::rpc::command_maps::RPC_SUBMIT_TORRENT; use crate::rpc::responses::RpcResponse; use crate::torrent::structs::{Info, Torrent}; @@ -156,8 +156,13 @@ pub async fn broadcast_new_torrent_to_peers( // Send the torrent to all currently connected miner peers. let peers = get_torrent_broadcast_nodes_from_memory().await; for (connections_key, stream) in peers { - // Each peer needs its own reply mapping entry and UID. - let (uid_bytes, _tx, _rx) = reserve_entry(map.clone()).await; + // Each peer gets a short-lived reply mapping so the ack can be drained. + let uid_bytes = reserve_transient_entry_with_context( + map.clone(), + Some(RPC_SUBMIT_TORRENT), + Some(connections_key.clone()), + ) + .await; let mut message = Vec::with_capacity(1 + 3 + 4 + 4 + torrent_bytes.len()); message.push(command); // Command byte