450 lines
15 KiB
Rust
450 lines
15 KiB
Rust
use crate::common::check_genesis::genesis_checkup;
|
|
use crate::log::{error, warn};
|
|
use crate::miner::flag::{
|
|
clear_mining_stop_request, request_mining_stop, set_mining_state, set_node_mode, MiningState,
|
|
NodeMode,
|
|
};
|
|
use crate::orphans::structs::OrphanCheckup2;
|
|
use crate::orphans::sync_check::sync_checkup;
|
|
use crate::orphans::torrent_candidates::hydrate_torrent_candidates;
|
|
use crate::records::block_height::get_block_height::get_height;
|
|
use crate::records::memory::connections::{
|
|
mark_peer_network_map_synced, mark_peer_operational, mark_peer_wallet_registry_synced,
|
|
};
|
|
use crate::records::memory::network_mapping::NodeInfo;
|
|
use crate::records::memory::response_channels::generate_uid;
|
|
use crate::records::memory::response_channels::Command;
|
|
use crate::records::memory::structs::Connection;
|
|
use crate::rpc::client::register_wallet::register_connected_wallet;
|
|
use crate::rpc::client::syncing::node_syncing;
|
|
use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries;
|
|
use crate::rpc::responses::RpcResponse;
|
|
use crate::rpc::server::connection_memory_manager::{remove_key_from_memory, write_to_memory};
|
|
use crate::rpc::server::handshake_processing::{combine_and_send_data, parse_received_data};
|
|
use crate::rpc::server::handshake_verifications::{connection_count, perform_handshake_tests};
|
|
use crate::rpc::server::structs::{CombineAndSendDataParams, HandshakeTestParams};
|
|
use crate::rpc::server::tests::{endpoint_port, is_port_open};
|
|
use crate::sled::Db;
|
|
use crate::sleep;
|
|
use crate::startup::network_broadcast::announce_self_to_network;
|
|
use crate::startup::remote_height::request_remote_height;
|
|
use crate::wallets::structures::Wallet;
|
|
use crate::Arc;
|
|
use crate::AsyncWriteExt;
|
|
use crate::Duration;
|
|
use crate::Mutex;
|
|
use crate::Settings;
|
|
use crate::TcpStream;
|
|
use crate::Utc;
|
|
|
|
/// Closes the socket of a peer whose handshake was rejected.
///
/// A rejected peer is never written to connection memory, so this is the
/// place where its accepted TCP socket gets flushed and shut down. Both
/// operations are best effort: their results are deliberately discarded,
/// since there is nothing left to do with a peer that is being dropped.
async fn drop_failed_handshake(stream: &Arc<Mutex<TcpStream>>) {
    let mut socket = stream.lock().await;

    // Flush first so bytes already written to the stream are pushed out
    // before the shutdown.
    let _flush_result = socket.flush().await;
    let _shutdown_result = socket.shutdown().await;
}
|
|
|
|
async fn get_connection_counts() -> (u8, u8) {
|
|
// Handshake limits come from settings so the node can change its
|
|
// connection policy without recompiling.
|
|
let settings = Settings::load().expect("Failed to load settings");
|
|
let incoming = settings.incoming_connections;
|
|
let outgoing = settings.outgoing_connections;
|
|
(incoming, outgoing)
|
|
}
|
|
|
|
async fn sync_incoming_peer_before_operational(
|
|
stream: Arc<Mutex<TcpStream>>,
|
|
db: &Db,
|
|
wallet: Arc<Wallet>,
|
|
map: Arc<Mutex<Command>>,
|
|
connections_key: &str,
|
|
) -> Result<bool, String> {
|
|
let local_height = get_height(db);
|
|
let remote_height =
|
|
request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?;
|
|
let local_genesis_exists = genesis_checkup().await;
|
|
|
|
if local_genesis_exists && remote_height < local_height {
|
|
warn!(
|
|
"[startup] incoming peer is behind local chain and will remain non-operational until it catches up: local_height={local_height} remote_height={remote_height}"
|
|
);
|
|
return Ok(false);
|
|
}
|
|
|
|
if !local_genesis_exists || remote_height > local_height + 10 {
|
|
set_node_mode(NodeMode::Syncing);
|
|
request_mining_stop();
|
|
set_mining_state(MiningState::Idle);
|
|
|
|
node_syncing(
|
|
stream.clone(),
|
|
db,
|
|
remote_height,
|
|
map.clone(),
|
|
true,
|
|
wallet.clone(),
|
|
connections_key.to_string(),
|
|
)
|
|
.await
|
|
.map_err(|err| format!("incoming sync error: {err}"))?;
|
|
|
|
if !local_genesis_exists && !genesis_checkup().await && remote_height > 0 {
|
|
return Err("incoming sync completed without obtaining remote genesis".to_string());
|
|
}
|
|
}
|
|
|
|
let post_sync_local_height = get_height(db);
|
|
let post_sync_remote_height =
|
|
request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?;
|
|
|
|
if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await {
|
|
error!("[startup] failed to rebuild mined counts before incoming orphan check: {err}");
|
|
}
|
|
|
|
let imported_candidates = match hydrate_torrent_candidates(
|
|
stream.clone(),
|
|
map.clone(),
|
|
connections_key.to_string(),
|
|
)
|
|
.await
|
|
{
|
|
Ok(imported) => {
|
|
if imported > 0 {
|
|
warn!(
|
|
"[sync] hydrated {imported} torrent candidates before incoming post-sync orphan check"
|
|
);
|
|
}
|
|
imported
|
|
}
|
|
Err(err) => {
|
|
warn!("[sync] failed to hydrate incoming torrent candidates: {err}");
|
|
0
|
|
}
|
|
};
|
|
|
|
if post_sync_remote_height > post_sync_local_height || imported_candidates > 0 {
|
|
let orphan_checkup_params = OrphanCheckup2 {
|
|
stream: stream.clone(),
|
|
db: db.clone(),
|
|
local_height: post_sync_local_height,
|
|
remote_height: post_sync_remote_height,
|
|
recheck_from_height: Some(post_sync_local_height.min(post_sync_remote_height)),
|
|
map: map.clone(),
|
|
node_syncing: true,
|
|
connections_key: connections_key.to_string(),
|
|
};
|
|
sync_checkup(orphan_checkup_params, wallet)
|
|
.await
|
|
.map_err(|err| format!("Incoming post-sync orphan check error: {err}"))?;
|
|
}
|
|
|
|
Ok(true)
|
|
}
|
|
|
|
fn spawn_incoming_peer_promotion_watcher(
|
|
stream: Arc<Mutex<TcpStream>>,
|
|
db: Db,
|
|
map: Arc<Mutex<Command>>,
|
|
connections_key: String,
|
|
) {
|
|
tokio::spawn(async move {
|
|
loop {
|
|
sleep(Duration::from_secs(5)).await;
|
|
|
|
if Connection::get_stream_from_memory(&connections_key)
|
|
.await
|
|
.is_none()
|
|
{
|
|
break;
|
|
}
|
|
|
|
let local_height = get_height(&db);
|
|
let remote_height =
|
|
match request_remote_height(stream.clone(), map.clone(), connections_key.clone())
|
|
.await
|
|
{
|
|
Ok(height) => height,
|
|
Err(_) => continue,
|
|
};
|
|
|
|
if remote_height >= local_height {
|
|
mark_peer_operational(&connections_key, map.clone()).await;
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
async fn complete_incoming_miner_setup(
|
|
stream: Arc<Mutex<TcpStream>>,
|
|
db: &Db,
|
|
wallet: Arc<Wallet>,
|
|
map: Arc<Mutex<Command>>,
|
|
connections_key: &str,
|
|
) {
|
|
// Incoming miner handshakes need the same post-handshake state exchange
|
|
// as outgoing handshakes so a bootstrap with only incoming peers can mine.
|
|
if let Err(err) = sync_wallet_registry_with_retries(
|
|
stream.clone(),
|
|
db,
|
|
map.clone(),
|
|
connections_key.to_string(),
|
|
"incoming peer",
|
|
)
|
|
.await
|
|
{
|
|
error!("[startup] incoming peer wallet registry sync failed: {err}");
|
|
remove_key_from_memory(connections_key).await;
|
|
return;
|
|
}
|
|
mark_peer_wallet_registry_synced(connections_key).await;
|
|
|
|
if let Err(err) = register_connected_wallet(
|
|
stream.clone(),
|
|
db,
|
|
map.clone(),
|
|
connections_key.to_string(),
|
|
&wallet,
|
|
)
|
|
.await
|
|
{
|
|
error!("[startup] incoming peer wallet registration failed: {err}");
|
|
remove_key_from_memory(connections_key).await;
|
|
return;
|
|
}
|
|
|
|
let short_address = wallet.saved.short_address.clone();
|
|
if let Err(err) = announce_self_to_network(
|
|
stream.clone(),
|
|
&short_address,
|
|
map.clone(),
|
|
db,
|
|
wallet.clone(),
|
|
connections_key,
|
|
)
|
|
.await
|
|
{
|
|
error!("[startup] incoming peer network map sync failed: {err}");
|
|
remove_key_from_memory(connections_key).await;
|
|
return;
|
|
}
|
|
mark_peer_network_map_synced(connections_key).await;
|
|
|
|
let operational = match sync_incoming_peer_before_operational(
|
|
stream.clone(),
|
|
db,
|
|
wallet.clone(),
|
|
map.clone(),
|
|
connections_key,
|
|
)
|
|
.await
|
|
{
|
|
Ok(operational) => operational,
|
|
Err(err) => {
|
|
error!("[startup] incoming peer chain sync failed: {err}");
|
|
remove_key_from_memory(connections_key).await;
|
|
return;
|
|
}
|
|
};
|
|
|
|
if operational {
|
|
mark_peer_operational(connections_key, map.clone()).await;
|
|
sleep(Duration::from_secs(15)).await;
|
|
set_node_mode(NodeMode::Normal);
|
|
clear_mining_stop_request();
|
|
set_mining_state(MiningState::Idle);
|
|
} else {
|
|
spawn_incoming_peer_promotion_watcher(
|
|
stream.clone(),
|
|
db.clone(),
|
|
map.clone(),
|
|
connections_key.to_string(),
|
|
);
|
|
}
|
|
|
|
if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await {
|
|
error!("[startup] failed to rebuild mined counts after incoming handshake: {err}");
|
|
}
|
|
}
|
|
|
|
// this function validates incoming handshake and determined
|
|
// what type of connection was made
|
|
pub async fn handle_handshake(
|
|
stream: Arc<Mutex<TcpStream>>,
|
|
db: Db,
|
|
wallet: Arc<Wallet>,
|
|
map: Arc<Mutex<Command>>,
|
|
) {
|
|
// read number of connected clients or set to 0 if none
|
|
let count = connection_count().await;
|
|
|
|
// Only incoming capacity matters here; outgoing is loaded with the
|
|
// same settings call but enforced by the connection starter.
|
|
let (incoming_connections, _outgoing_connection) = get_connection_counts().await;
|
|
|
|
// get data from stream
|
|
let Ok((
|
|
received_message,
|
|
received_signed_message,
|
|
received_public_key,
|
|
hash,
|
|
received_ip,
|
|
peer_time,
|
|
)) = parse_received_data(stream.clone()).await
|
|
else {
|
|
return;
|
|
};
|
|
|
|
// get local timestamp
|
|
let timestamp = Utc::now().timestamp() as u32;
|
|
|
|
// received message should be "aced"
|
|
// aced is used instead of ping and pong
|
|
// as its HEX and compressed better
|
|
if received_message == "aced" {
|
|
//validate handshake tests
|
|
if !perform_handshake_tests(HandshakeTestParams {
|
|
map: map.clone(),
|
|
stream: stream.clone(),
|
|
count,
|
|
peer_time,
|
|
timestamp,
|
|
incoming_connections,
|
|
hash: &hash,
|
|
received_signed_message: &received_signed_message,
|
|
received_address: &received_public_key,
|
|
received_ip: &received_ip,
|
|
})
|
|
.await
|
|
{
|
|
// Each failed test sends its own error response, so the
|
|
// handshake can stop here without writing another message.
|
|
drop_failed_handshake(&stream).await;
|
|
return;
|
|
}
|
|
|
|
// Port 0 is the explicit client marker. A node advertising a
|
|
// nonzero miner port must actually be reachable before it can be
|
|
// stored in connection memory or the network map.
|
|
let Some(advertised_port) = endpoint_port(&received_ip) else {
|
|
let hashmap_key = generate_uid();
|
|
let padded_bytes = [hashmap_key[0], hashmap_key[1], hashmap_key[2], 0];
|
|
let uid = u32::from_le_bytes(padded_bytes);
|
|
let response_bytes =
|
|
RpcResponse::Binary("error: Invalid advertised endpoint.".as_bytes().to_vec());
|
|
response_bytes.send(&stream, None, uid).await;
|
|
drop_failed_handshake(&stream).await;
|
|
return;
|
|
};
|
|
|
|
let connection_type = if advertised_port == 0 {
|
|
"client"
|
|
} else if is_port_open(&received_ip).await.unwrap_or(false) {
|
|
"miner"
|
|
} else {
|
|
let hashmap_key = generate_uid();
|
|
let padded_bytes = [hashmap_key[0], hashmap_key[1], hashmap_key[2], 0];
|
|
let uid = u32::from_le_bytes(padded_bytes);
|
|
let response_bytes = RpcResponse::Binary(
|
|
"error: Handshake failed: advertised miner port is not reachable."
|
|
.as_bytes()
|
|
.to_vec(),
|
|
);
|
|
response_bytes.send(&stream, None, uid).await;
|
|
drop_failed_handshake(&stream).await;
|
|
return;
|
|
};
|
|
|
|
if connection_type == "miner" {
|
|
let miner_reserved_limit = incoming_connections.saturating_sub(1) as usize;
|
|
let current_count = connection_count().await;
|
|
if current_count >= miner_reserved_limit {
|
|
let hashmap_key = generate_uid();
|
|
let padded_bytes = [hashmap_key[0], hashmap_key[1], hashmap_key[2], 0];
|
|
let uid = u32::from_le_bytes(padded_bytes);
|
|
let response_bytes = RpcResponse::Binary(
|
|
"error: Miner connection slots are filled. Please try again later."
|
|
.as_bytes()
|
|
.to_vec(),
|
|
);
|
|
response_bytes.send(&stream, None, uid).await;
|
|
drop_failed_handshake(&stream).await;
|
|
return;
|
|
}
|
|
}
|
|
|
|
let received_public_key_bytes = match crate::decode(&received_public_key) {
|
|
Ok(bytes) if bytes.len() == Wallet::PUBLIC_KEY_LENGTH => bytes,
|
|
_ => {
|
|
drop_failed_handshake(&stream).await;
|
|
return;
|
|
}
|
|
};
|
|
|
|
let Some(received_short_address_bytes) =
|
|
Wallet::public_key_bytes_to_short_address_bytes(&received_public_key_bytes)
|
|
else {
|
|
drop_failed_handshake(&stream).await;
|
|
return;
|
|
};
|
|
let Some(received_short_address) =
|
|
Wallet::bytes_to_short_address(&received_short_address_bytes)
|
|
else {
|
|
drop_failed_handshake(&stream).await;
|
|
return;
|
|
};
|
|
|
|
// write to memory
|
|
let connections_key = write_to_memory(
|
|
&received_ip,
|
|
stream.clone(),
|
|
connection_type,
|
|
received_short_address,
|
|
map.clone(),
|
|
)
|
|
.await;
|
|
|
|
if connections_key != "false" {
|
|
// Once the peer is accepted into memory, return our signed
|
|
// handshake response and start the long-lived RPC loop.
|
|
let is_miner = connection_type == "miner";
|
|
let post_handshake_stream = stream.clone();
|
|
let post_handshake_map = map.clone();
|
|
let post_handshake_wallet = wallet.clone();
|
|
let post_handshake_connections_key = connections_key.clone();
|
|
let params = CombineAndSendDataParams {
|
|
stream,
|
|
db: db.clone(),
|
|
connections_key,
|
|
connection_type: connection_type.to_string(),
|
|
wallet: wallet.clone(),
|
|
map,
|
|
returned_address: received_public_key.clone(),
|
|
};
|
|
if combine_and_send_data(params).await.is_ok() && is_miner {
|
|
complete_incoming_miner_setup(
|
|
post_handshake_stream,
|
|
&db,
|
|
post_handshake_wallet,
|
|
post_handshake_map,
|
|
&post_handshake_connections_key,
|
|
)
|
|
.await;
|
|
}
|
|
} else {
|
|
drop_failed_handshake(&stream).await;
|
|
}
|
|
} else {
|
|
let response_bytes = RpcResponse::Binary({
|
|
"error: Invalid Handshake: Signature Failed"
|
|
.to_string()
|
|
.as_bytes()
|
|
.to_vec()
|
|
});
|
|
response_bytes.send(&stream, None, 0).await;
|
|
drop_failed_handshake(&stream).await;
|
|
}
|
|
}
|