bug fixes

This commit is contained in:
viraladmin 2026-06-11 15:06:02 -06:00
parent 64df13519a
commit 0845fd3ba6
15 changed files with 164 additions and 108 deletions

View File

@ -224,7 +224,7 @@ impl Connection {
port, port,
stream, stream,
client_type, client_type,
wallet_public_key, wallet_short_address,
command_map: _, command_map: _,
} = params; } = params;
@ -261,7 +261,7 @@ impl Connection {
return false; return false;
} }
if wallet_public_key.len() != Wallet::PUBLIC_KEY_LENGTH { if !Wallet::short_address_validation(&wallet_short_address) {
return false; return false;
} }
let connection_info = ConnectionInfo::new( let connection_info = ConnectionInfo::new(
@ -270,7 +270,7 @@ impl Connection {
port, port,
stream.clone(), stream.clone(),
client_type.as_bytes(), client_type.as_bytes(),
wallet_public_key.clone(), wallet_short_address,
); );
self.connection_map.insert(connection_key, connection_info); self.connection_map.insert(connection_key, connection_info);
@ -296,7 +296,7 @@ impl Connection {
spawn_monitor_update( spawn_monitor_update(
ip.clone(), ip.clone(),
MONITOR_ACTION_REMOVE, MONITOR_ACTION_REMOVE,
connection_info.wallet_public_key.clone(), connection_info.wallet_short_address.clone(),
port, port,
); );
} }
@ -508,7 +508,7 @@ impl Connection {
spawn_monitor_update( spawn_monitor_update(
ip.clone(), ip.clone(),
MONITOR_ACTION_ADD, MONITOR_ACTION_ADD,
info.wallet_public_key.clone(), info.wallet_short_address.clone(),
port, port,
); );
Connection::client_checkup( Connection::client_checkup(
@ -633,7 +633,7 @@ impl Connection {
}) })
} }
pub async fn get_wallet_for_connection_key(key: &str) -> Option<Vec<u8>> { pub async fn get_wallet_for_connection_key(key: &str) -> Option<String> {
let (ip, port) = split_ip_port_key(key)?; let (ip, port) = split_ip_port_key(key)?;
let lock = CONNECTIONS.read().await; let lock = CONNECTIONS.read().await;
let conn = lock.as_ref()?; let conn = lock.as_ref()?;
@ -646,7 +646,7 @@ impl Connection {
&& connection_key.port == port && connection_key.port == port
&& ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner)
{ {
Some(info.wallet_public_key.clone()) Some(info.wallet_short_address.clone())
} else { } else {
None None
} }
@ -765,7 +765,7 @@ impl Connection {
} }
} }
fn spawn_monitor_update(ip: String, action: u8, peer_public_key: Vec<u8>, port: u16) { fn spawn_monitor_update(ip: String, action: u8, monitored_address: String, port: u16) {
tokio::spawn(async move { tokio::spawn(async move {
let context = { let context = {
let guard = RECONNECT_CONTEXT.lock().await; let guard = RECONNECT_CONTEXT.lock().await;
@ -774,10 +774,9 @@ fn spawn_monitor_update(ip: String, action: u8, peer_public_key: Vec<u8>, port:
let Some(context) = context else { let Some(context) = context else {
return; return;
}; };
let Some(monitored_address) = Wallet::public_key_bytes_to_short_address(&peer_public_key) if !Wallet::short_address_validation(&monitored_address) {
else {
return; return;
}; }
let monitoring_address = context.wallet.saved.short_address.clone(); let monitoring_address = context.wallet.saved.short_address.clone();
if monitored_address == monitoring_address { if monitored_address == monitoring_address {
return; return;

View File

@ -1,4 +1,5 @@
use super::*; use super::*;
use crate::records::memory::response_channels::reserve_transient_entry_with_context;
const ONE_HOUR_MILLIS: u64 = 3_600_000; const ONE_HOUR_MILLIS: u64 = 3_600_000;
@ -166,7 +167,12 @@ impl NodeInfo {
match streams { match streams {
streams if !streams.is_empty() => { streams if !streams.is_empty() => {
for (peer_key, unlocked_stream) in streams { for (peer_key, unlocked_stream) in streams {
let (hashmap_key, _hashmap_tx, hashmap_rx) = reserve_entry(map.clone()).await; let hashmap_key = reserve_transient_entry_with_context(
map.clone(),
Some(message_type),
Some(peer_key.clone()),
)
.await;
let mut message: Vec<u8> = Vec::new(); let mut message: Vec<u8> = Vec::new();
message.push(message_type); message.push(message_type);
message.extend_from_slice(&hashmap_key); message.extend_from_slice(&hashmap_key);
@ -186,8 +192,6 @@ impl NodeInfo {
continue; continue;
} }
RpcResponse::send_raw(&unlocked_stream, Some(&peer_key), &message).await; RpcResponse::send_raw(&unlocked_stream, Some(&peer_key), &message).await;
let mut rx = hashmap_rx.lock().await;
let _ = timeout(Duration::from_secs(5), rx.recv()).await;
} }
} }
_ => { _ => {

View File

@ -14,13 +14,11 @@ use crate::records::memory::network_mapping::structs::{
AddAddressParams, DeleteAddressParams, MonitorAddressParams, SignedMonitorEdit, SignedNodeEdit, AddAddressParams, DeleteAddressParams, MonitorAddressParams, SignedMonitorEdit, SignedNodeEdit,
NODE_RECORD_FIXED_BYTES, NODE_RECORD_FIXED_BYTES,
}; };
use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::response_channels::Command;
use crate::rpc::responses::RpcResponse; use crate::rpc::responses::RpcResponse;
use crate::sled::Db; use crate::sled::Db;
use crate::timeout;
use crate::wallets::structures::Wallet; use crate::wallets::structures::Wallet;
use crate::Arc; use crate::Arc;
use crate::Duration;
use crate::HashMap; use crate::HashMap;
use crate::Mutex; use crate::Mutex;
use crate::Utc; use crate::Utc;

View File

@ -103,6 +103,27 @@ pub async fn reserve_entry_with_context(
} }
} }
pub async fn reserve_transient_entry_with_context(
map: Arc<Mutex<Command>>,
command: Option<u8>,
peer: Option<String>,
) -> Byte3 {
let (key, _tx, rx) = reserve_entry_with_context(map.clone(), command, peer).await;
tokio::spawn(async move {
let received = {
let mut rx = rx.lock().await;
matches!(crate::timeout(Duration::from_secs(30), rx.recv()).await, Ok(Some(_)))
};
if !received {
warn!(
"[rpc_trace] transient uid expired without reply: uid={key:?}"
);
}
delete_entry(map, key).await;
});
key
}
pub struct ReplyTrace { pub struct ReplyTrace {
pub age_ms: u128, pub age_ms: u128,
pub command: Option<u8>, pub command: Option<u8>,

View File

@ -15,7 +15,7 @@ pub struct ConnectionInfo {
pub port: u16, pub port: u16,
pub stream: Arc<Mutex<TcpStream>>, pub stream: Arc<Mutex<TcpStream>>,
pub client_type: Vec<u8>, pub client_type: Vec<u8>,
pub wallet_public_key: Vec<u8>, pub wallet_short_address: String,
pub wallet_registry_synced: bool, pub wallet_registry_synced: bool,
pub network_map_synced: bool, pub network_map_synced: bool,
pub ready: bool, pub ready: bool,
@ -37,7 +37,7 @@ pub struct StoreConnectionParams {
pub port: u16, pub port: u16,
pub stream: Arc<Mutex<TcpStream>>, pub stream: Arc<Mutex<TcpStream>>,
pub client_type: ClientType, pub client_type: ClientType,
pub wallet_public_key: Vec<u8>, pub wallet_short_address: String,
pub command_map: Arc<Mutex<Command>>, pub command_map: Arc<Mutex<Command>>,
} }
@ -202,7 +202,7 @@ impl ConnectionInfo {
port: u16, port: u16,
stream: Arc<Mutex<TcpStream>>, stream: Arc<Mutex<TcpStream>>,
client_type: Vec<u8>, client_type: Vec<u8>,
wallet_public_key: Vec<u8>, wallet_short_address: String,
) -> Self { ) -> Self {
ConnectionInfo { ConnectionInfo {
connection_type, connection_type,
@ -210,7 +210,7 @@ impl ConnectionInfo {
port, port,
stream, stream,
client_type, client_type,
wallet_public_key, wallet_short_address,
wallet_registry_synced: false, wallet_registry_synced: false,
network_map_synced: false, network_map_synced: false,
ready: false, ready: false,

View File

@ -22,8 +22,8 @@ use crate::records::memory::enums::{ClientType, ConnectionType};
use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::network_mapping::NodeInfo;
use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::response_channels::{reserve_entry, Command};
use crate::records::memory::structs::{Connection, StoreConnectionParams}; use crate::records::memory::structs::{Connection, StoreConnectionParams};
use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult};
use crate::rpc::client::handshake::connect_and_handshake; use crate::rpc::client::handshake::connect_and_handshake;
use crate::rpc::client::register_wallet::register_connected_wallet;
use crate::rpc::client::structs::{Connect, Handshake}; use crate::rpc::client::structs::{Connect, Handshake};
use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::syncing::node_syncing;
use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries;
@ -38,6 +38,7 @@ use crate::rpc::server::rpc_command_loop::start_loop;
use crate::sled::Db; use crate::sled::Db;
use crate::startup::network_broadcast::announce_self_to_network; use crate::startup::network_broadcast::announce_self_to_network;
use crate::startup::remote_height::request_remote_height; use crate::startup::remote_height::request_remote_height;
use crate::sleep;
use crate::timeout; use crate::timeout;
use crate::wallets::structures::Wallet; use crate::wallets::structures::Wallet;
use crate::Arc; use crate::Arc;
@ -225,13 +226,15 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(),
} }
} }
info!("[sync] post-sync checks complete, mining resuming"); info!("[sync] post-sync checks complete, mining grace period started");
if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(&params.db).await { if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(&params.db).await {
error!("[startup] failed to rebuild mined counts after startup sync: {err}"); error!("[startup] failed to rebuild mined counts after startup sync: {err}");
} }
for (peer_key, _) in startup_synced_peer_streams().await { for (peer_key, _) in startup_synced_peer_streams().await {
mark_peer_operational(&peer_key, params.map.clone()).await; mark_peer_operational(&peer_key, params.map.clone()).await;
} }
sleep(Duration::from_secs(15)).await;
info!("[sync] mining grace period complete, mining resuming");
set_node_mode(NodeMode::Normal); set_node_mode(NodeMode::Normal);
clear_mining_stop_request(); clear_mining_stop_request();
set_mining_state(MiningState::Idle); set_mining_state(MiningState::Idle);
@ -297,24 +300,8 @@ pub async fn process_handshake_response(
"Handshake failed: invalid response public key", "Handshake failed: invalid response public key",
)); ));
}; };
let returned_short_address = Wallet::bytes_to_short_address(&returned_short_address_bytes)
match register_short_address( .ok_or_else(|| io::Error::other("Handshake failed: invalid peer short address"))?;
&params.db,
&returned_short_address_bytes,
returned_public_key_bin,
) {
Ok(WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered) => {}
Ok(WalletRegistrationResult::Conflict) => {
return Err(io::Error::other(
"Handshake failed: peer public key conflicts with wallet registry",
));
}
Err(err) => {
return Err(io::Error::other(format!(
"Handshake failed: could not register peer public key: {err}"
)));
}
}
// create and arc mutex of the stream // create and arc mutex of the stream
let stream = Arc::new(Mutex::new(params.stream)); let stream = Arc::new(Mutex::new(params.stream));
@ -332,7 +319,7 @@ pub async fn process_handshake_response(
port, port,
stream: Arc::clone(&stream), stream: Arc::clone(&stream),
client_type: ClientType::Miner, client_type: ClientType::Miner,
wallet_public_key: returned_public_key_bin.to_vec(), wallet_short_address: returned_short_address.clone(),
command_map: params.map.clone(), command_map: params.map.clone(),
}) { }) {
return Err(io::Error::other( return Err(io::Error::other(
@ -367,6 +354,21 @@ pub async fn process_handshake_response(
} }
mark_peer_wallet_registry_synced(&connections_key).await; mark_peer_wallet_registry_synced(&connections_key).await;
if let Err(err) = register_connected_wallet(
Arc::clone(&stream),
&params.db,
params.map.clone(),
connections_key.clone(),
wallet,
)
.await
{
remove_key_from_memory(&connections_key).await;
return Err(io::Error::other(format!(
"Wallet registration failed after handshake: {err}"
)));
}
if params.first { if params.first {
announce_self_to_network( announce_self_to_network(
broadcast_stream.clone(), broadcast_stream.clone(),

View File

@ -2,8 +2,10 @@ use crate::common::skein::skein_256_hash_bytes;
use crate::decode; use crate::decode;
use crate::log::warn; use crate::log::warn;
use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::response_channels::{reserve_entry, Command};
use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult};
use crate::rpc::command_maps::RPC_REGISTER_WALLET; use crate::rpc::command_maps::RPC_REGISTER_WALLET;
use crate::rpc::responses::RpcResponse; use crate::rpc::responses::RpcResponse;
use crate::sled::Db;
use crate::timeout; use crate::timeout;
use crate::wallets::structures::Wallet; use crate::wallets::structures::Wallet;
use crate::Arc; use crate::Arc;
@ -13,6 +15,7 @@ use crate::TcpStream;
pub async fn register_connected_wallet( pub async fn register_connected_wallet(
stream: Arc<Mutex<TcpStream>>, stream: Arc<Mutex<TcpStream>>,
db: &Db,
map: Arc<Mutex<Command>>, map: Arc<Mutex<Command>>,
connections_key: String, connections_key: String,
wallet: &Wallet, wallet: &Wallet,
@ -61,7 +64,15 @@ pub async fn register_connected_wallet(
let response_text = String::from_utf8_lossy(&response); let response_text = String::from_utf8_lossy(&response);
match response_text.trim() { match response_text.trim() {
"1" => Ok(()), "1" => match register_short_address(db, &short_address_bytes, &public_key_bytes) {
Ok(WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered) => {
Ok(())
}
Ok(WalletRegistrationResult::Conflict) => {
Err("Local wallet registry conflicts with startup wallet".to_string())
}
Err(err) => Err(format!("Failed to record startup wallet locally: {err}")),
},
"0" => Err("Peer rejected wallet registration".to_string()), "0" => Err("Peer rejected wallet registration".to_string()),
other => { other => {
warn!("[wallet_registry] unexpected wallet registration response: {other}"); warn!("[wallet_registry] unexpected wallet registration response: {other}");

View File

@ -1,11 +1,17 @@
use crate::records::memory::connections::CONNECTIONS; use crate::records::memory::connections::CONNECTIONS;
use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::network_mapping::NodeInfo;
use crate::records::block_height::get_block_height::get_height;
use crate::rpc::responses::RpcResponse; use crate::rpc::responses::RpcResponse;
use crate::sled::Db;
pub async fn request_node(connections_key: &str) -> RpcResponse { pub async fn request_node(db: &Db, connections_key: &str) -> RpcResponse {
// Return one random connected node that can legally sponsor/add nodes // Before the mature-network gate, node adds are intentionally permissive.
// under the mature-network network-map rules. // After height 10,000, only mature sponsors can be returned.
let eligible_ips = NodeInfo::eligible_sponsor_ips().await; let eligible_ips = if get_height(db) > 10000 {
NodeInfo::eligible_sponsor_ips().await
} else {
NodeInfo::active_node_ips().await
};
if eligible_ips.is_empty() { if eligible_ips.is_empty() {
return RpcResponse::Binary(b"Error: No Eligible Sponsor Found".to_vec()); return RpcResponse::Binary(b"Error: No Eligible Sponsor Found".to_vec());
} }

View File

@ -4,7 +4,6 @@ use crate::records::memory::response_channels::{
delete_entry, get_entry, is_retired_entry, trace_entry, Command, delete_entry, get_entry, is_retired_entry, trace_entry, Command,
}; };
use crate::rpc::command_maps::MAX_RPC_REPLY_BYTES; use crate::rpc::command_maps::MAX_RPC_REPLY_BYTES;
use crate::rpc::commands::bad_rpc_call;
use crate::rpc::read_bytes_from_stream; use crate::rpc::read_bytes_from_stream;
use crate::sled::Db; use crate::sled::Db;
use crate::wallets::structures::Wallet; use crate::wallets::structures::Wallet;
@ -15,11 +14,11 @@ use crate::TcpStream;
pub async fn route_reply( pub async fn route_reply(
connections_key: &str, connections_key: &str,
stream_locked: Arc<Mutex<TcpStream>>, stream_locked: Arc<Mutex<TcpStream>>,
db: &Db, _db: &Db,
wallet: Arc<Wallet>, _wallet: Arc<Wallet>,
map: Arc<Mutex<Command>>, map: Arc<Mutex<Command>>,
ip: &str, _ip: &str,
client_type: ClientType, _client_type: ClientType,
) -> Result<(), String> { ) -> Result<(), String> {
// Replies are command 255 packets: UID, payload length, then the // Replies are command 255 packets: UID, payload length, then the
// response bytes for the task waiting on that UID. // response bytes for the task waiting on that UID.
@ -30,7 +29,6 @@ pub async fn route_reply(
read_bytes_from_stream::read_u32_from_stream(connections_key, stream_locked.clone()).await? read_bytes_from_stream::read_u32_from_stream(connections_key, stream_locked.clone()).await?
as usize; as usize;
if message_length > MAX_RPC_REPLY_BYTES { if message_length > MAX_RPC_REPLY_BYTES {
bad_rpc_call::record(ip, client_type, db, wallet.clone()).await;
return Err(format!( return Err(format!(
"error: RPC reply payload too large: len={message_length} max={MAX_RPC_REPLY_BYTES}" "error: RPC reply payload too large: len={message_length} max={MAX_RPC_REPLY_BYTES}"
)); ));
@ -82,9 +80,6 @@ pub async fn route_reply(
warn!("[rpc] late reply arrived for retired uid: {uid:?}"); warn!("[rpc] late reply arrived for retired uid: {uid:?}");
} }
} else { } else {
// Unknown, never-reserved UIDs can still indicate malformed or
// forged traffic, so those keep counting as bad RPC behavior.
bad_rpc_call::record(ip, client_type, db, wallet).await;
warn!("[rpc] reply arrived for unknown uid: {uid:?}"); warn!("[rpc] reply arrived for unknown uid: {uid:?}");
} }
let _ = read_bytes_from_stream::read_usize_from_stream( let _ = read_bytes_from_stream::read_usize_from_stream(

View File

@ -15,13 +15,17 @@ use crate::common::types::{
}; };
use crate::records::memory::connections::get_client_type_from_memory; use crate::records::memory::connections::get_client_type_from_memory;
use crate::records::memory::enums::ClientType; use crate::records::memory::enums::ClientType;
use crate::records::memory::response_channels::generate_uid; use crate::records::memory::response_channels::{
reserve_transient_entry_with_context, Command,
};
use crate::rpc::command_maps::RPC_SUBMIT_TRANSACTION; use crate::rpc::command_maps::RPC_SUBMIT_TRANSACTION;
use crate::rpc::responses::RpcResponse; use crate::rpc::responses::RpcResponse;
use crate::sled::Db; use crate::sled::Db;
use crate::torrent::torrenting_system::get_nodes::get_nodes_from_memory; use crate::torrent::torrenting_system::get_nodes::get_nodes_from_memory;
use crate::Arc;
use crate::Mutex;
async fn broadcast_tx(tx_bytes: Vec<u8>) { async fn broadcast_tx(tx_bytes: Vec<u8>, map: Arc<Mutex<Command>>) {
// Broadcast newly accepted mempool transactions only to miner peers, // Broadcast newly accepted mempool transactions only to miner peers,
// since those are the nodes that need transaction fan-out most. // since those are the nodes that need transaction fan-out most.
let nodes = get_nodes_from_memory().await; let nodes = get_nodes_from_memory().await;
@ -34,7 +38,12 @@ async fn broadcast_tx(tx_bytes: Vec<u8>) {
continue; continue;
} }
let uid = generate_uid(); let uid = reserve_transient_entry_with_context(
map.clone(),
Some(RPC_SUBMIT_TRANSACTION),
Some(key.clone()),
)
.await;
let mut message = Vec::with_capacity(4 + tx_bytes.len()); let mut message = Vec::with_capacity(4 + tx_bytes.len());
// Reuse the original submit-transaction wire shape when // Reuse the original submit-transaction wire shape when
// rebroadcasting to miner peers: command, UID, transaction bytes. // rebroadcasting to miner peers: command, UID, transaction bytes.
@ -45,7 +54,12 @@ async fn broadcast_tx(tx_bytes: Vec<u8>) {
} }
} }
pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse { pub async fn save_and_submit(
txtype: u8,
tx: Vec<u8>,
db: &Db,
map: Arc<Mutex<Command>>,
) -> RpcResponse {
// Decode, verify, add to the mempool, and then broadcast according to // Decode, verify, add to the mempool, and then broadcast according to
// the declared transaction type. // the declared transaction type.
// In each branch, a non-empty verifier response means the transaction // In each branch, a non-empty verifier response means the transaction
@ -64,7 +78,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match transfer.to_bytes().await { match transfer.to_bytes().await {
Ok(transfer_bytes) => match transfer.add_to_memory().await { Ok(transfer_bytes) => match transfer.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(transfer_bytes).await; broadcast_tx(transfer_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -103,7 +117,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match create_token.to_bytes().await { match create_token.to_bytes().await {
Ok(token_bytes) => match create_token.add_to_memory().await { Ok(token_bytes) => match create_token.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(token_bytes).await; broadcast_tx(token_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -142,7 +156,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match create_nft.to_bytes().await { match create_nft.to_bytes().await {
Ok(nft_bytes) => match create_nft.add_to_memory().await { Ok(nft_bytes) => match create_nft.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(nft_bytes).await; broadcast_tx(nft_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -181,7 +195,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match burn.to_bytes().await { match burn.to_bytes().await {
Ok(burn_bytes) => match burn.add_to_memory().await { Ok(burn_bytes) => match burn.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(burn_bytes).await; broadcast_tx(burn_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -220,7 +234,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match issue_token.to_bytes().await { match issue_token.to_bytes().await {
Ok(issue_token_bytes) => match issue_token.add_to_memory().await { Ok(issue_token_bytes) => match issue_token.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(issue_token_bytes).await; broadcast_tx(issue_token_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -259,7 +273,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match marketing.to_bytes().await { match marketing.to_bytes().await {
Ok(marketing_bytes) => match marketing.add_to_memory().await { Ok(marketing_bytes) => match marketing.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(marketing_bytes).await; broadcast_tx(marketing_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -298,7 +312,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match swap.to_bytes().await { match swap.to_bytes().await {
Ok(swap_bytes) => match swap.add_to_memory().await { Ok(swap_bytes) => match swap.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(swap_bytes).await; broadcast_tx(swap_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -337,7 +351,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match loan.to_bytes().await { match loan.to_bytes().await {
Ok(loan_bytes) => match loan.add_to_memory().await { Ok(loan_bytes) => match loan.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(loan_bytes).await; broadcast_tx(loan_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -376,7 +390,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match payment.to_bytes().await { match payment.to_bytes().await {
Ok(payment_bytes) => match payment.add_to_memory().await { Ok(payment_bytes) => match payment.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(payment_bytes).await; broadcast_tx(payment_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -415,7 +429,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match collateral.to_bytes().await { match collateral.to_bytes().await {
Ok(collateral_bytes) => match collateral.add_to_memory().await { Ok(collateral_bytes) => match collateral.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(collateral_bytes).await; broadcast_tx(collateral_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)
@ -454,7 +468,7 @@ pub async fn save_and_submit(txtype: u8, tx: Vec<u8>, db: &Db) -> RpcResponse {
match vanity.to_bytes().await { match vanity.to_bytes().await {
Ok(vanity_bytes) => match vanity.add_to_memory().await { Ok(vanity_bytes) => match vanity.add_to_memory().await {
Ok(_) => { Ok(_) => {
broadcast_tx(vanity_bytes).await; broadcast_tx(vanity_bytes, map.clone()).await;
let msg = let msg =
"successful_broadcast: true".to_string().as_bytes().to_vec(); "successful_broadcast: true".to_string().as_bytes().to_vec();
RpcResponse::Binary(msg) RpcResponse::Binary(msg)

View File

@ -1,16 +1,13 @@
use crate::common::skein::skein_256_hash_bytes; use crate::common::skein::skein_256_hash_bytes;
use crate::decode; use crate::decode;
use crate::log::warn;
use crate::records::memory::connections::CONNECTIONS; use crate::records::memory::connections::CONNECTIONS;
use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::response_channels::{reserve_transient_entry_with_context, Command};
use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult};
use crate::rpc::command_maps::RPC_REGISTER_WALLET; use crate::rpc::command_maps::RPC_REGISTER_WALLET;
use crate::rpc::responses::RpcResponse; use crate::rpc::responses::RpcResponse;
use crate::sled::Db; use crate::sled::Db;
use crate::timeout;
use crate::wallets::structures::Wallet; use crate::wallets::structures::Wallet;
use crate::Arc; use crate::Arc;
use crate::Duration;
use crate::Mutex; use crate::Mutex;
async fn broadcast_wallet_registration( async fn broadcast_wallet_registration(
@ -43,7 +40,12 @@ async fn broadcast_wallet_registration(
} }
} }
let (hashmap_key, _hashmap_tx, hashmap_rx) = reserve_entry(map.clone()).await; let hashmap_key = reserve_transient_entry_with_context(
map.clone(),
Some(RPC_REGISTER_WALLET),
Some(peer_key.clone()),
)
.await;
let mut message = Vec::with_capacity( let mut message = Vec::with_capacity(
1 + 3 1 + 3
+ Wallet::SHORT_ADDRESS_BYTES_LENGTH + Wallet::SHORT_ADDRESS_BYTES_LENGTH
@ -58,15 +60,6 @@ async fn broadcast_wallet_registration(
message.extend_from_slice(&signature_bytes); message.extend_from_slice(&signature_bytes);
RpcResponse::send_raw(&unlocked_stream, Some(&peer_key), &message).await; RpcResponse::send_raw(&unlocked_stream, Some(&peer_key), &message).await;
let response = {
let mut rx = hashmap_rx.lock().await;
timeout(Duration::from_secs(5), rx.recv()).await
};
if response.is_err() {
warn!("[wallet_registry] timed out waiting for rebroadcast acknowledgement");
}
} }
} }

View File

@ -28,7 +28,7 @@ pub async fn write_to_memory(
received_ip_port: &str, received_ip_port: &str,
stream: Arc<Mutex<TcpStream>>, stream: Arc<Mutex<TcpStream>>,
client_type: &str, client_type: &str,
wallet_public_key: Vec<u8>, wallet_short_address: String,
command_map: Arc<Mutex<Command>>, command_map: Arc<Mutex<Command>>,
) -> String { ) -> String {
// Reject unknown connection labels before the connection manager is // Reject unknown connection labels before the connection manager is
@ -52,7 +52,7 @@ pub async fn write_to_memory(
port, port,
stream: stream.clone(), stream: stream.clone(),
client_type, client_type,
wallet_public_key, wallet_short_address,
command_map, command_map,
}); });

View File

@ -15,7 +15,7 @@ use crate::records::memory::network_mapping::NodeInfo;
use crate::records::memory::response_channels::generate_uid; use crate::records::memory::response_channels::generate_uid;
use crate::records::memory::response_channels::Command; use crate::records::memory::response_channels::Command;
use crate::records::memory::structs::Connection; use crate::records::memory::structs::Connection;
use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::rpc::client::register_wallet::register_connected_wallet;
use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::syncing::node_syncing;
use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries;
use crate::rpc::responses::RpcResponse; use crate::rpc::responses::RpcResponse;
@ -197,6 +197,20 @@ async fn complete_incoming_miner_setup(
} }
mark_peer_wallet_registry_synced(connections_key).await; mark_peer_wallet_registry_synced(connections_key).await;
if let Err(err) = register_connected_wallet(
stream.clone(),
db,
map.clone(),
connections_key.to_string(),
&wallet,
)
.await
{
error!("[startup] incoming peer wallet registration failed: {err}");
remove_key_from_memory(connections_key).await;
return;
}
let short_address = wallet.saved.short_address.clone(); let short_address = wallet.saved.short_address.clone();
if let Err(err) = announce_self_to_network( if let Err(err) = announce_self_to_network(
stream.clone(), stream.clone(),
@ -233,6 +247,7 @@ async fn complete_incoming_miner_setup(
if operational { if operational {
mark_peer_operational(connections_key, map.clone()).await; mark_peer_operational(connections_key, map.clone()).await;
sleep(Duration::from_secs(15)).await;
set_node_mode(NodeMode::Normal); set_node_mode(NodeMode::Normal);
clear_mining_stop_request(); clear_mining_stop_request();
set_mining_state(MiningState::Idle); set_mining_state(MiningState::Idle);
@ -370,27 +385,19 @@ pub async fn handle_handshake(
drop_failed_handshake(&stream).await; drop_failed_handshake(&stream).await;
return; return;
}; };
let Some(received_short_address) =
match register_short_address( Wallet::bytes_to_short_address(&received_short_address_bytes)
&db, else {
&received_short_address_bytes, drop_failed_handshake(&stream).await;
&received_public_key_bytes, return;
) { };
Ok(
WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered,
) => {}
Ok(WalletRegistrationResult::Conflict) | Err(_) => {
drop_failed_handshake(&stream).await;
return;
}
}
// write to memory // write to memory
let connections_key = write_to_memory( let connections_key = write_to_memory(
&received_ip, &received_ip,
stream.clone(), stream.clone(),
connection_type, connection_type,
received_public_key_bytes, received_short_address,
map.clone(), map.clone(),
) )
.await; .await;

View File

@ -68,7 +68,7 @@ pub async fn start_loop(
) )
.await?; .await?;
let result = commands::random_node::request_node(&connections_key).await; let result = commands::random_node::request_node(&db, &connections_key).await;
result result
.send(&stream_locked, Some(&connections_key), uid) .send(&stream_locked, Some(&connections_key), uid)
.await; .await;
@ -176,7 +176,8 @@ pub async fn start_loop(
) )
.await?; .await?;
let result = commands::tx_submit::save_and_submit(txtype, tx, &db).await; let result =
commands::tx_submit::save_and_submit(txtype, tx, &db, map.clone()).await;
result result
.send(&stream_locked, Some(&connections_key), uid) .send(&stream_locked, Some(&connections_key), uid)
.await; .await;

View File

@ -2,7 +2,7 @@ use crate::blocks::block::{NONCE_OFFSET, VRF_OFFSET};
use crate::common::network_paths_and_settings::block_extension_and_paths; use crate::common::network_paths_and_settings::block_extension_and_paths;
use crate::common::skein::skein_128_hash_bytes; use crate::common::skein::skein_128_hash_bytes;
use crate::log::error; use crate::log::error;
use crate::records::memory::response_channels::{reserve_entry, Command}; use crate::records::memory::response_channels::{reserve_transient_entry_with_context, Command};
use crate::rpc::command_maps::RPC_SUBMIT_TORRENT; use crate::rpc::command_maps::RPC_SUBMIT_TORRENT;
use crate::rpc::responses::RpcResponse; use crate::rpc::responses::RpcResponse;
use crate::torrent::structs::{Info, Torrent}; use crate::torrent::structs::{Info, Torrent};
@ -156,8 +156,13 @@ pub async fn broadcast_new_torrent_to_peers(
// Send the torrent to all currently connected miner peers. // Send the torrent to all currently connected miner peers.
let peers = get_torrent_broadcast_nodes_from_memory().await; let peers = get_torrent_broadcast_nodes_from_memory().await;
for (connections_key, stream) in peers { for (connections_key, stream) in peers {
// Each peer needs its own reply mapping entry and UID. // Each peer gets a short-lived reply mapping so the ack can be drained.
let (uid_bytes, _tx, _rx) = reserve_entry(map.clone()).await; let uid_bytes = reserve_transient_entry_with_context(
map.clone(),
Some(RPC_SUBMIT_TORRENT),
Some(connections_key.clone()),
)
.await;
let mut message = Vec::with_capacity(1 + 3 + 4 + 4 + torrent_bytes.len()); let mut message = Vec::with_capacity(1 + 3 + 4 + 4 + torrent_bytes.len());
message.push(command); // Command byte message.push(command); // Command byte