Contractless/src/rpc/client/handshake_processing.rs

422 lines
15 KiB
Rust

use crate::common::binary_conversions::binary_to_ip_port;
use crate::common::check_genesis::genesis_checkup;
use crate::common::network_startup::get_ip_and_port;
use crate::common::skein::skein_256_hash_data;
use crate::config::SETTINGS;
use crate::encode;
use crate::io;
use crate::log::{error, info, warn};
use crate::miner::flag::{
clear_mining_stop_request, request_mining_stop, set_mining_state, set_node_mode, MiningState,
NodeMode,
};
use crate::orphans::structs::OrphanCheckup2;
use crate::orphans::sync_check::sync_checkup;
use crate::orphans::torrent_candidates::hydrate_torrent_candidates;
use crate::records::block_height::get_block_height::get_height;
use crate::records::memory::connections::{
mark_peer_network_map_synced, mark_peer_operational, mark_peer_wallet_registry_synced,
set_reconnect_context, startup_synced_peer_streams, CONNECTIONS,
};
use crate::records::memory::enums::{ClientType, ConnectionType};
use crate::records::memory::network_mapping::NodeInfo;
use crate::records::memory::response_channels::{reserve_entry, Command};
use crate::records::memory::structs::{Connection, StoreConnectionParams};
use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult};
use crate::rpc::client::handshake::connect_and_handshake;
use crate::rpc::client::structs::{Connect, Handshake};
use crate::rpc::client::syncing::node_syncing;
use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries;
use crate::rpc::command_maps::RPC_RANDOM_NODE;
use crate::rpc::handshake_constants::{
HANDSHAKE_ADDRESS_OFFSET, HANDSHAKE_MESSAGE_BYTES, HANDSHAKE_RESPONSE_BYTES,
HANDSHAKE_SIGNATURE_OFFSET,
};
use crate::rpc::responses::RpcResponse;
use crate::rpc::server::connection_memory_manager::remove_key_from_memory;
use crate::rpc::server::rpc_command_loop::start_loop;
use crate::sled::Db;
use crate::startup::network_broadcast::announce_self_to_network;
use crate::startup::remote_height::request_remote_height;
use crate::timeout;
use crate::wallets::structures::Wallet;
use crate::Arc;
use crate::Duration;
use crate::Mutex;
use crate::SocketAddr;
use crate::TcpStream;
#[derive(Clone)]
pub struct BootstrapParams {
pub stream: Arc<Mutex<TcpStream>>,
pub connections_key: String,
pub wallet: Arc<Wallet>,
pub db: Db,
pub map: Arc<Mutex<Command>>,
pub first: bool,
}
pub fn spawn_bootstrap_peer_discovery(params: BootstrapParams) {
tokio::spawn(async move {
if let Err(e) = bootstrap_peer_discovery(params).await {
set_node_mode(NodeMode::Normal);
clear_mining_stop_request();
set_mining_state(MiningState::Idle);
eprintln!("[bootstrap] error: {e}");
}
});
}
pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), String> {
set_node_mode(NodeMode::Syncing);
request_mining_stop();
let (_, _, local_endpoint) = get_ip_and_port().await;
let max = SETTINGS.outgoing_connections;
let mut no_progress_count = 0;
let max_no_progress = 3;
let mut current_key = params.connections_key.clone();
let mut stream = params.stream;
params.first = false;
while no_progress_count < max_no_progress {
let outgoing_connections = {
let connections = CONNECTIONS.read().await;
connections
.as_ref()
.map(|connection| connection.count_outgoing_connections())
.unwrap_or(0)
};
if outgoing_connections >= max as usize {
break;
}
let (hashmap_key, _tx, rx) = reserve_entry(params.map.clone()).await;
let mut payload = vec![RPC_RANDOM_NODE];
payload.extend_from_slice(&hashmap_key);
RpcResponse::send_raw(&stream, Some(&current_key), &payload).await;
let mut rx = rx.lock().await;
let buffer = match timeout(Duration::from_secs(15), rx.recv()).await {
Ok(Some(buf)) => buf,
Ok(None) => return Err("Peer discovery channel closed".into()),
Err(_) => return Err("Timeout waiting for peer discovery response".into()),
};
if buffer.len() != 18 {
no_progress_count += 1;
continue;
}
let addr_string = binary_to_ip_port(&buffer[0..18]);
if addr_string == local_endpoint {
no_progress_count += 1;
continue;
}
if Connection::get_stream_from_memory(&addr_string)
.await
.is_some()
{
no_progress_count += 1;
continue;
}
let socket_addr: SocketAddr = match addr_string.parse() {
Ok(addr) => addr,
Err(_) => {
warn!("Invalid discovered peer from {current_key}: {addr_string}");
no_progress_count += 1;
continue;
}
};
let connect = Connect {
addr: socket_addr,
node_ip: addr_string.clone(),
wallet: params.wallet.clone(),
db: params.db.clone(),
map: params.map.clone(),
first: params.first,
};
if let Err(err) = connect_and_handshake(connect).await {
warn!("Failed to connect to discovered peer {addr_string}: {err}");
no_progress_count += 1;
continue;
}
if let Some(new_stream) = Connection::get_stream_from_memory(&addr_string).await {
stream = new_stream;
current_key = addr_string;
no_progress_count = 0;
} else {
warn!("Failed to retrieve new stream for: {addr_string}");
no_progress_count += 1;
}
}
loop {
let local_height = get_height(&params.db);
let remote_height =
request_remote_height(stream.clone(), params.map.clone(), current_key.clone()).await?;
let local_genesis_exists = genesis_checkup().await;
if !local_genesis_exists || remote_height > local_height + 10 {
info!("[sync] Starting sync from {local_height} to {remote_height}");
node_syncing(
stream.clone(),
&params.db,
remote_height,
params.map.clone(),
true,
params.wallet.clone(),
current_key.clone(),
)
.await
.map_err(|e| format!("Sync error: {e}"))?;
if !local_genesis_exists && !genesis_checkup().await && remote_height > 0 {
return Err("Sync completed without obtaining remote genesis".to_string());
}
if !local_genesis_exists && !genesis_checkup().await {
break;
}
continue;
}
break;
}
let post_sync_local_height = get_height(&params.db);
let post_sync_remote_height =
request_remote_height(stream.clone(), params.map.clone(), current_key.clone()).await?;
let imported_candidates =
match hydrate_torrent_candidates(stream.clone(), params.map.clone(), current_key.clone())
.await
{
Ok(imported) => {
if imported > 0 {
warn!(
"[sync] hydrated {imported} torrent candidates before post-sync orphan check"
);
}
imported
}
Err(err) => {
warn!("[sync] failed to hydrate torrent candidates: {err}");
0
}
};
if post_sync_remote_height != post_sync_local_height || imported_candidates > 0 {
let orphan_checkup_params = OrphanCheckup2 {
stream: stream.clone(),
db: params.db.clone(),
local_height: post_sync_local_height,
remote_height: post_sync_remote_height,
recheck_from_height: Some(post_sync_local_height.min(post_sync_remote_height)),
map: params.map.clone(),
node_syncing: true,
connections_key: current_key.clone(),
};
match sync_checkup(orphan_checkup_params, params.wallet.clone()).await {
Ok(()) => {}
Err(err) => warn!("[sync] Post-sync orphan check error: {err}"),
}
}
info!("[sync] post-sync checks complete, mining resuming");
if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(&params.db).await {
error!("[startup] failed to rebuild mined counts after startup sync: {err}");
}
for (peer_key, _) in startup_synced_peer_streams().await {
mark_peer_operational(&peer_key, params.map.clone()).await;
}
set_node_mode(NodeMode::Normal);
clear_mining_stop_request();
set_mining_state(MiningState::Idle);
Ok(())
}
pub async fn process_handshake_response(
response: Vec<u8>,
wallet: &Wallet,
params: Handshake,
) -> io::Result<()> {
if response.len() != HANDSHAKE_RESPONSE_BYTES {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Invalid handshake response length: expected {}, got {}",
HANDSHAKE_RESPONSE_BYTES,
response.len()
),
));
}
let returned_message_bin = &response[..HANDSHAKE_MESSAGE_BYTES];
let returned_signed_bin = &response[HANDSHAKE_SIGNATURE_OFFSET..HANDSHAKE_ADDRESS_OFFSET];
let returned_public_key_bin = &response[HANDSHAKE_ADDRESS_OFFSET..HANDSHAKE_RESPONSE_BYTES];
let returned_message = encode(returned_message_bin);
let returned_signed_message = encode(returned_signed_bin);
if returned_public_key_bin.len() != Wallet::PUBLIC_KEY_LENGTH {
return Ok(());
}
let hash = skein_256_hash_data(&returned_message);
let valid_response_signature = Wallet::verify_transaction_with_public_key_bytes(
&hash,
&returned_signed_message,
returned_public_key_bin,
)
.await;
if returned_message == "ecaf" && valid_response_signature {
return Err(io::Error::other(
"Handshake accepted as client, not miner. The remote node could not verify your advertised public endpoint is reachable.",
));
}
if returned_message != "face" {
return Err(io::Error::other(format!(
"Handshake failed: unexpected response message {returned_message}"
)));
}
if !valid_response_signature {
return Err(io::Error::other(
"Handshake failed: response signature verification failed",
));
}
let Some(returned_short_address_bytes) =
Wallet::public_key_bytes_to_short_address_bytes(returned_public_key_bin)
else {
return Err(io::Error::other(
"Handshake failed: invalid response public key",
));
};
match register_short_address(
&params.db,
&returned_short_address_bytes,
returned_public_key_bin,
) {
Ok(WalletRegistrationResult::Inserted | WalletRegistrationResult::AlreadyRegistered) => {}
Ok(WalletRegistrationResult::Conflict) => {
return Err(io::Error::other(
"Handshake failed: peer public key conflicts with wallet registry",
));
}
Err(err) => {
return Err(io::Error::other(format!(
"Handshake failed: could not register peer public key: {err}"
)));
}
}
// create and arc mutex of the stream
let stream = Arc::new(Mutex::new(params.stream));
let connections_key = params.addr.clone();
let socket_parts: Vec<&str> = params.addr.split(':').collect();
if socket_parts.len() == 2 {
let ip = socket_parts[0];
let port: u16 = socket_parts[1].parse().unwrap_or(0);
set_reconnect_context(params.db.clone(), params.wallet.clone(), params.map.clone()).await;
let mut conn = CONNECTIONS.write().await;
if let Some(manager) = conn.as_mut() {
if !manager.store_connection(StoreConnectionParams {
connection_type: ConnectionType::Outgoing,
ip: ip.to_string(),
port,
stream: Arc::clone(&stream),
client_type: ClientType::Miner,
wallet_public_key: returned_public_key_bin.to_vec(),
command_map: params.map.clone(),
}) {
return Err(io::Error::other(
"The connection is already in the connection manager Please wait 10 minutes and try again",
));
}
}
}
let listener_stream = Arc::clone(&stream);
tokio::spawn(start_loop(
listener_stream,
params.db.clone(),
connections_key.clone(),
params.wallet.clone(),
params.map.clone(),
));
let broadcast_stream = Arc::clone(&stream);
if let Err(err) = sync_wallet_registry_with_retries(
Arc::clone(&stream),
&params.db,
params.map.clone(),
connections_key.clone(),
"outgoing peer",
)
.await
{
remove_key_from_memory(&connections_key).await;
return Err(io::Error::other(format!(
"Wallet registry sync failed after handshake: {err}"
)));
}
mark_peer_wallet_registry_synced(&connections_key).await;
if params.first {
announce_self_to_network(
broadcast_stream.clone(),
&wallet.saved.short_address.clone(),
params.map.clone(),
&params.db.clone(),
params.wallet.clone(),
&connections_key,
)
.await
.map_err(|err| {
let connections_key = connections_key.clone();
tokio::spawn(async move {
remove_key_from_memory(&connections_key).await;
});
io::Error::other(format!("Network map sync failed: {err}"))
})?;
mark_peer_network_map_synced(&connections_key).await;
let bsparams = BootstrapParams {
stream: Arc::clone(&stream),
connections_key: connections_key.clone(),
wallet: params.wallet.clone(),
db: params.db.clone(),
map: params.map.clone(),
first: params.first,
};
spawn_bootstrap_peer_discovery(bsparams);
} else {
announce_self_to_network(
broadcast_stream.clone(),
&wallet.saved.short_address.clone(),
params.map.clone(),
&params.db.clone(),
params.wallet.clone(),
&connections_key,
)
.await
.map_err(|err| {
let connections_key = connections_key.clone();
tokio::spawn(async move {
remove_key_from_memory(&connections_key).await;
});
io::Error::other(format!("Network map sync failed: {err}"))
})?;
mark_peer_network_map_synced(&connections_key).await;
if crate::miner::flag::is_normal_mode() {
mark_peer_operational(&connections_key, params.map.clone()).await;
}
}
Ok(())
}