diff --git a/src/blocks/burn.rs b/src/blocks/burn.rs index 347789e..fe95f5f 100644 --- a/src/blocks/burn.rs +++ b/src/blocks/burn.rs @@ -1,6 +1,6 @@ use crate::common::binary_conversions::binary_to_string; use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -181,7 +181,8 @@ impl BurnTransaction { let hash = &self.unsigned_burn.hash().await; let signature = &self.signature; - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/collateral.rs b/src/blocks/collateral.rs index 94d2d41..4df1635 100644 --- a/src/blocks/collateral.rs +++ b/src/blocks/collateral.rs @@ -1,5 +1,5 @@ use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -176,7 +176,8 @@ impl CollateralClaimTransaction { let signature = &self.signature; // Collateral-claim transactions remain in the mempool table until mined or removed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/issue_token.rs b/src/blocks/issue_token.rs index 28beeb5..0fb139a 100644 --- a/src/blocks/issue_token.rs +++ b/src/blocks/issue_token.rs @@ -1,6 +1,6 @@ use crate::common::binary_conversions::binary_to_string; use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -182,7 +182,8 @@ impl IssueTokenTransaction { let signature = &self.signature; // Issue-token transactions remain in the mempool table until mined or removed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/loan_payment.rs b/src/blocks/loan_payment.rs index d9dc3d2..11a774d 100644 --- a/src/blocks/loan_payment.rs +++ b/src/blocks/loan_payment.rs @@ -1,5 +1,5 @@ use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -208,7 +208,8 @@ impl ContractPaymentTransaction { let signature = &self.signature; // Loan-payment transactions remain in the mempool table until mined or removed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/loans.rs b/src/blocks/loans.rs index bbd145c..c3bc99b 100644 --- a/src/blocks/loans.rs +++ b/src/blocks/loans.rs @@ -1,6 +1,6 @@ use crate::common::binary_conversions::binary_to_string; use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -326,7 +326,8 @@ impl LoanContractTransaction { let signature2 = &self.signature2; // Loan contracts remain in the mempool table until mined or removed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/marketing.rs b/src/blocks/marketing.rs index 8bb6f91..c0cce19 100644 --- a/src/blocks/marketing.rs +++ b/src/blocks/marketing.rs @@ -1,6 +1,6 @@ use crate::common::binary_conversions::binary_to_string; use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -253,7 +253,8 @@ impl MarketingTransaction { let signature = &self.signature; // Marketing transactions remain in the mempool table until mined or removed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/nft.rs b/src/blocks/nft.rs index a91b7f7..eef07ce 100644 --- a/src/blocks/nft.rs +++ b/src/blocks/nft.rs @@ -1,6 +1,6 @@ use crate::common::binary_conversions::binary_to_string; use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -230,7 +230,8 @@ impl CreateNftTransaction { let signature = &self.signature; // NFT-creation transactions remain in the mempool table until mined or removed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/swap.rs b/src/blocks/swap.rs index a45feb4..b7ccd66 100644 --- a/src/blocks/swap.rs +++ b/src/blocks/swap.rs @@ -1,6 +1,6 @@ use crate::common::binary_conversions::binary_to_string; use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -327,7 +327,8 @@ impl SwapTransaction { let signature2 = &self.signature2; // Swap transactions remain in the mempool table until mined or removed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/token.rs b/src/blocks/token.rs index 3a98f0d..e6e4388 100644 --- a/src/blocks/token.rs +++ b/src/blocks/token.rs @@ -1,6 +1,6 @@ use crate::common::binary_conversions::binary_to_string; use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -204,7 +204,8 @@ impl CreateTokenTransaction { let signature = &self.signature; // Token-creation transactions remain in the mempool table until mined or removed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/transfer.rs b/src/blocks/transfer.rs index 5c38244..8840078 100644 --- a/src/blocks/transfer.rs +++ b/src/blocks/transfer.rs @@ -1,6 +1,6 @@ use crate::common::binary_conversions::binary_to_string; use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -226,7 +226,8 @@ impl TransferTransaction { let signature = &self.signature; // Transfer transactions remain in the mempool table until mined or removed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/blocks/vanity.rs b/src/blocks/vanity.rs index 36dfa71..943ace0 100644 --- a/src/blocks/vanity.rs +++ b/src/blocks/vanity.rs @@ -1,5 +1,5 @@ use crate::common::skein::skein_256_hash_data; -use crate::records::memory::mempool::DB; +use crate::records::memory::mempool::db_client; use crate::to_string; use crate::wallets::structures::Wallet; use crate::Cursor; @@ -182,10 +182,8 @@ impl VanityAddressTransaction { // Vanity transactions are written to the vanity mempool table // until the transaction is mined or removed. - let client = DB.get().ok_or_else(|| { - Box::new(std::io::Error::other("DB not initialized")) - as Box - })?; + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .execute( diff --git a/src/records/memory/connections.rs b/src/records/memory/connections.rs index 228818e..181c520 100644 --- a/src/records/memory/connections.rs +++ b/src/records/memory/connections.rs @@ -713,9 +713,18 @@ impl Connection { // Prefer a random incoming node connection, falling back to an // outgoing node connection when no incoming peer is available. - pub fn get_random_connection(&self, excluded_key: Option<&str>) -> Option<(Vec, u16)> { + pub fn get_random_connection( + &self, + excluded_key: Option<&str>, + eligible_ips: Option<&[String]>, + ) -> Option<(Vec, u16)> { let mut rng = thread_rng(); let excluded = excluded_key.and_then(split_ip_port_key); + let is_eligible = |key_ip: &[u8]| { + eligible_ips + .map(|ips| ips.iter().any(|ip| ip_to_binary(ip) == key_ip)) + .unwrap_or(true) + }; if let Some((key, _info)) = self .connection_map @@ -724,6 +733,7 @@ impl Connection { ConnectionType::from_bytes(&key.connection_type) == Some(ConnectionType::Incoming) && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) && info.ready + && is_eligible(&key.ip) && excluded .as_ref() .map(|(ip, _)| key.ip != ip_to_binary(ip)) @@ -741,6 +751,7 @@ impl Connection { ConnectionType::from_bytes(&key.connection_type) == Some(ConnectionType::Outgoing) && ClientType::from_bytes(&info.client_type) == Some(ClientType::Miner) && info.ready + && is_eligible(&key.ip) && excluded .as_ref() .map(|(ip, _)| key.ip != ip_to_binary(ip)) diff --git a/src/records/memory/mempool/lookups.rs b/src/records/memory/mempool/lookups.rs index f23070a..5bd42ae 100644 --- a/src/records/memory/mempool/lookups.rs +++ b/src/records/memory/mempool/lookups.rs @@ -1,7 +1,8 @@ use super::*; pub async fn signature_exists(signature: &str, hash: &str) -> Result { - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); // Check every mempool table because the signature column names differ by // transaction type, especially for two-party swaps and loans. @@ -36,7 +37,11 @@ pub async fn signature_exists(signature: &str, hash: &str) -> Result { } pub async fn transaction_by_signature(signature: &str) -> RpcResponse { - let client = DB.get().expect("DB not initialized"); + let client_handle = match db_client().await { + Ok(client) => client, + Err(_) => return RpcResponse::Binary(Vec::new()), + }; + let client = client_handle.as_ref(); // Return the original serialized transaction bytes, not a reconstructed // row, so RPC callers receive the same payload that would enter a block. @@ -85,7 +90,11 @@ pub async fn transaction_by_signature(signature: &str) -> RpcResponse { } pub async fn transactions_by_address(db: &Db, address: &str) -> RpcResponse { - let client = DB.get().expect("DB not initialized"); + let client_handle = match db_client().await { + Ok(client) => client, + Err(_) => return RpcResponse::Binary(Vec::new()), + }; + let client = client_handle.as_ref(); // Canonicalize vanity aliases before querying pending rows. let addresses = canonical_mempool_addresses(db, address); @@ -140,7 +149,11 @@ pub async fn transactions_by_address(db: &Db, address: &str) -> RpcResponse { } pub async fn largest_fee() -> RpcResponse { - let client = DB.get().expect("DB not initialized"); + let client_handle = match db_client().await { + Ok(client) => client, + Err(_) => return RpcResponse::Binary(0u32.to_le_bytes().to_vec()), + }; + let client = client_handle.as_ref(); // Swaps have two possible fees, so both sides are included in the max. let row = match client @@ -191,7 +204,8 @@ async fn pending_saved_loan_payment_balance( addresses: &[String], coin: &str, ) -> Result> { - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); let rows = client .query( r#" @@ -244,7 +258,8 @@ pub async fn get_coin_balance( address: &str, coin: &str, ) -> Result> { - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); // Pending-balance checks use canonical addresses so vanity and short // address inputs see the same outgoing obligations. let addresses = canonical_mempool_addresses(db, address); @@ -324,7 +339,8 @@ pub async fn get_basecoin_balance( db: &Db, address: &str, ) -> Result> { - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); let addresses = canonical_mempool_addresses(db, address); // Base coin projection includes direct base transfers plus all fees and @@ -387,10 +403,11 @@ pub async fn get_basecoin_balance( pub async fn get_pending_payments_for_contract( contract_hash: &str, ) -> Result> { - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); - // Loan verification uses this to prevent pending payments from exceeding - // what the contract still owes. + // Mempool/UI callers can use this for unconfirmed payment visibility. + // Consensus validation must use confirmed contract payments only. let row = client .query_one( r#" @@ -408,7 +425,11 @@ pub async fn get_pending_payments_for_contract( } pub async fn total_transactions() -> RpcResponse { - let client = DB.get().expect("DB not initialized"); + let client_handle = match db_client().await { + Ok(client) => client, + Err(_) => return RpcResponse::Binary(vec![0; 8]), + }; + let client = client_handle.as_ref(); // Count rows across all mempool tables, including processed rows that may // still be retained briefly for orphan rollback. let row = match client diff --git a/src/records/memory/mempool/mod.rs b/src/records/memory/mempool/mod.rs index 084b51b..9b0f9df 100644 --- a/src/records/memory/mempool/mod.rs +++ b/src/records/memory/mempool/mod.rs @@ -13,7 +13,7 @@ use crate::sled::Db; use crate::wallets::structures::Wallet; use crate::HashMap; use crate::NoTls; -use crate::{task, AtomicBool}; +use crate::{task, Arc, AtomicBool, RwLock}; use anyhow::{anyhow, Result}; use once_cell::sync::OnceCell; use std::fs::File; @@ -38,7 +38,7 @@ lazy_static! { static ref CLEANUP_RUNNING: AtomicBool = AtomicBool::new(false); } -pub static DB: OnceCell = OnceCell::new(); +pub static DB: OnceCell>> = OnceCell::new(); pub const EPOCH_ROW_CAP: i64 = 100_000; const NFT_UNIT: i64 = 100_000_000; @@ -214,7 +214,7 @@ pub use processing::{ restore_processed_by_signatures, restore_selected_transactions_processed, spawn_processed_cleanup, }; -pub use schema::{clear_mempool, init_db, setup_mempool}; +pub use schema::{clear_mempool, db_client, ensure_db_connection, init_db, setup_mempool}; pub use selection::{ apply_selected_transaction_math, clear_selected_transaction_sql, delete_selected_transactions, select_transactions_for_block, stream_selected_transaction_originals, @@ -360,7 +360,8 @@ async fn unmark_by_signatures( async fn delete_processed_before_or_at(block_number: u32, limit: i64) -> Result<()> { // Periodic cleanup deletes processed mempool rows in bounded batches // so long-lived nodes do not accumulate infinite processed history. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); let bn = block_number as i32; delete_processed_rows_limited(client, "transfer", bn, limit).await?; diff --git a/src/records/memory/mempool/processing.rs b/src/records/memory/mempool/processing.rs index a321b09..74f0af9 100644 --- a/src/records/memory/mempool/processing.rs +++ b/src/records/memory/mempool/processing.rs @@ -6,7 +6,8 @@ pub async fn mark_selected_transactions_processed( ) -> Result<()> { // Mark each selected mempool row as processed under the saved block // number so it can be cleaned up or restored later if needed. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); let bn = block_number as i32; // Selected batches are grouped by table, then marked with one UPDATE per @@ -14,7 +15,7 @@ pub async fn mark_selected_transactions_processed( mark_rows_by_ids(client, "transfer", &ids_for_table(batch, "transfer"), bn).await?; mark_rows_by_ids(client, "token", &ids_for_table(batch, "token"), bn).await?; mark_rows_by_ids( - client, + &client, "issue_token", &ids_for_table(batch, "issue_token"), bn, @@ -24,7 +25,7 @@ pub async fn mark_selected_transactions_processed( mark_rows_by_ids(client, "nft", &ids_for_table(batch, "nft"), bn).await?; mark_rows_by_ids(client, "marketing", &ids_for_table(batch, "marketing"), bn).await?; mark_rows_by_ids( - client, + &client, "vanity_address", &ids_for_table(batch, "vanity_address"), bn, @@ -32,21 +33,21 @@ pub async fn mark_selected_transactions_processed( .await?; mark_rows_by_ids(client, "swap", &ids_for_table(batch, "swap"), bn).await?; mark_rows_by_ids( - client, + &client, "loan_contract", &ids_for_table(batch, "loan_contract"), bn, ) .await?; mark_rows_by_ids( - client, + &client, "loan_payment", &ids_for_table(batch, "loan_payment"), bn, ) .await?; mark_rows_by_ids( - client, + &client, "collateral_claim", &ids_for_table(batch, "collateral_claim"), bn, @@ -59,7 +60,8 @@ pub async fn mark_selected_transactions_processed( pub async fn restore_selected_transactions_processed(batch: &SelectedMempoolBatch) -> Result<()> { // If block commit fails after selected rows were marked processed, // restore them before the chain height can acknowledge the block. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); unmark_rows_by_ids(client, "transfer", &ids_for_table(batch, "transfer")).await?; unmark_rows_by_ids(client, "token", &ids_for_table(batch, "token")).await?; @@ -68,26 +70,26 @@ pub async fn restore_selected_transactions_processed(batch: &SelectedMempoolBatc unmark_rows_by_ids(client, "nft", &ids_for_table(batch, "nft")).await?; unmark_rows_by_ids(client, "marketing", &ids_for_table(batch, "marketing")).await?; unmark_rows_by_ids( - client, + &client, "vanity_address", &ids_for_table(batch, "vanity_address"), ) .await?; unmark_rows_by_ids(client, "swap", &ids_for_table(batch, "swap")).await?; unmark_rows_by_ids( - client, + &client, "loan_contract", &ids_for_table(batch, "loan_contract"), ) .await?; unmark_rows_by_ids( - client, + &client, "loan_payment", &ids_for_table(batch, "loan_payment"), ) .await?; unmark_rows_by_ids( - client, + &client, "collateral_claim", &ids_for_table(batch, "collateral_claim"), ) @@ -103,7 +105,8 @@ pub async fn restore_processed_by_signatures(signatures: &[String]) -> Result Result<()> { return Ok(()); } - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); // Failed validation removes every matching pending row no matter which // transaction table currently owns the signature. diff --git a/src/records/memory/mempool/schema.rs b/src/records/memory/mempool/schema.rs index d41c1ba..a0c61e9 100644 --- a/src/records/memory/mempool/schema.rs +++ b/src/records/memory/mempool/schema.rs @@ -1,11 +1,6 @@ use super::*; -pub async fn init_db() -> Result<()> { - // Initialize the shared Postgres client used by the mempool tables. - if DB.get().is_some() { - return Ok(()); - } - +async fn connect_client() -> Result { let password = SETTINGS .pg_password .as_deref() @@ -28,16 +23,54 @@ pub async fn init_db() -> Result<()> { } }); - DB.set(client) + Ok(client) +} + +pub async fn db_client() -> Result> { + let slot = DB + .get() + .ok_or_else(|| anyhow!("DB not initialized"))?; + Ok(slot.read().await.clone()) +} + +pub async fn reconnect_db() -> Result<()> { + let client = connect_client().await?; + + if let Some(slot) = DB.get() { + *slot.write().await = Arc::new(client); + return Ok(()); + } + + DB.set(RwLock::new(Arc::new(client))) .map_err(|_| anyhow!("DB already initialized"))?; Ok(()) } +pub async fn ensure_db_connection() -> Result<()> { + if let Ok(client) = db_client().await { + if client.simple_query("SELECT 1").await.is_ok() { + return Ok(()); + } + } + + reconnect_db().await +} + +pub async fn init_db() -> Result<()> { + // Initialize the shared Postgres client used by the mempool tables. + if DB.get().is_some() { + return ensure_db_connection().await; + } + + reconnect_db().await +} + pub async fn setup_mempool() -> Result<()> { // Create or migrate the mempool schema, deduplicate any stale rows, // add the selection indexes, and start from an empty live mempool. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); let schema = r#" CREATE TABLE IF NOT EXISTS transfer ( @@ -371,7 +404,8 @@ pub async fn setup_mempool() -> Result<()> { pub async fn clear_mempool() -> Result<()> { // Startup clears any leftover mempool rows so a node restart begins // from a clean pending-transaction state. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); client .batch_execute( diff --git a/src/records/memory/mempool/selection.rs b/src/records/memory/mempool/selection.rs index e8c2e23..0e39ee4 100644 --- a/src/records/memory/mempool/selection.rs +++ b/src/records/memory/mempool/selection.rs @@ -4,7 +4,8 @@ use crate::records::record_chain::pending_effects::{BalanceOperand, PendingEffec pub async fn select_transactions_for_block(limit: i64) -> Result { // Pull the highest-priority unprocessed rows across all mempool // tables, keeping the original bytes for block-file streaming later. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); let rows = client .query( r#" @@ -411,7 +412,8 @@ pub async fn apply_selected_transaction_math( // Local mined blocks need to persist the token hard-limit // metadata just like the downloaded-block save path does. - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); let token_row = client .query_opt( "SELECT hard_limit FROM token WHERE hash = $1 LIMIT 1", @@ -922,7 +924,8 @@ pub async fn stream_selected_transaction_originals( } pub async fn delete_selected_transactions(batch: &SelectedMempoolBatch) -> Result<()> { - let client = DB.get().expect("DB not initialized"); + let client_handle = db_client().await?; + let client = client_handle.as_ref(); // Each transaction kind still lives in a separate SQL table, so deletion // groups the selected IDs by table after the block has been written. diff --git a/src/records/memory/network_mapping/add.rs b/src/records/memory/network_mapping/add.rs index 64264fa..2aac309 100644 --- a/src/records/memory/network_mapping/add.rs +++ b/src/records/memory/network_mapping/add.rs @@ -1,6 +1,98 @@ use super::*; +const ONE_HOUR_MILLIS: u64 = 3_600_000; + impl NodeInfo { + pub async fn import_signed_mapping_address( + db: &Db, + edit: SignedNodeEdit, + monitors: Vec, + blocks_mined: u8, + ) -> Result<(), String> { + if !is_public_network_address(&edit.ip) { + return Err("invalid network address".to_string()); + } + + let data = format!( + "{}{}{}{}", + edit.address, edit.ip, edit.modified_by, edit.modified_timestamp + ); + let hashed_data = skein_256_hash_data(&data); + let pubkey = crate::records::wallet_registry::resolve_pubkey_from_short_address( + db, + &edit.modified_by, + ) + .map_err(|err| format!("could not resolve signer: {err}"))? + .ok_or_else(|| "could not resolve signer".to_string())?; + + if !Wallet::verify_transaction_with_public_key_bytes( + &hashed_data, + &edit.modified_signature, + &pubkey, + ) + .await + { + return Err("could not validate signature".to_string()); + } + + let mut address_map = ADDRESS_MAP.lock().await; + if let Some(existing_node) = address_map.get_mut(&edit.address) { + existing_node.monitoring = monitors.clone(); + existing_node.blocks_mined = blocks_mined; + if existing_node.deleted_timestamp > 0 { + if existing_node.ip == edit.ip { + existing_node.added_by = edit.modified_by; + existing_node.added_timestamp = edit.modified_timestamp; + existing_node.added_signature = edit.modified_signature; + existing_node.deleted_timestamp = 0_u64; + existing_node.deleted_block = 0_u32; + drop(address_map); + Self::persist_recovery_snapshot("import revive").await; + return Ok(()); + } + + address_map.remove(&edit.address); + } else { + if edit.modified_timestamp > existing_node.added_timestamp { + *existing_node = NodeInfo::new( + edit.ip, + blocks_mined, + edit.modified_by, + edit.modified_timestamp, + edit.modified_signature, + ); + existing_node.monitoring = monitors; + } + drop(address_map); + Self::persist_recovery_snapshot("import existing").await; + return Ok(()); + } + } + + if address_map + .values() + .any(|node| node.ip == edit.ip && node.deleted_timestamp == 0 && edit.ip != GENESIS_IP) + { + return Err("ip already exists".to_string()); + } + + address_map.insert(edit.address, { + let mut node = NodeInfo::new( + edit.ip, + blocks_mined, + edit.modified_by, + edit.modified_timestamp, + edit.modified_signature, + ); + node.monitoring = monitors; + node + }); + drop(address_map); + Self::persist_recovery_snapshot("import add").await; + + Ok(()) + } + pub async fn broadcast_address_state( map: Arc>, address: &str, @@ -172,7 +264,7 @@ impl NodeInfo { let signer_node = address_map.get(&signer_key); let valid_added_by = signer_node .map(|node| { - (current_timestamp - node.added_timestamp) >= 3600 + current_timestamp.saturating_sub(node.added_timestamp) >= ONE_HOUR_MILLIS && node.deleted_timestamp == 0 }) .unwrap_or(false); @@ -195,7 +287,7 @@ impl NodeInfo { .values() .filter(|node| { node.added_by == edit.modified_by - && (current_timestamp - node.added_timestamp) <= 3600 + && current_timestamp.saturating_sub(node.added_timestamp) <= ONE_HOUR_MILLIS }) .count(); @@ -215,6 +307,8 @@ impl NodeInfo { if existing_node.ip == edit.ip { existing_node.deleted_timestamp = 0_u64; existing_node.deleted_block = 0_u32; + drop(address_map); + Self::persist_recovery_snapshot("node revive").await; return RpcResponse::Binary(b"Success".to_vec()); } else { address_map.remove(&edit.address); @@ -275,6 +369,8 @@ impl NodeInfo { return RpcResponse::Binary(b"Error: Ip Already exists.".to_vec()); } + Self::persist_recovery_snapshot("node add").await; + if !remote_ip.is_empty() { let broadcast_map = map.clone(); let broadcast_edit = edit.clone(); diff --git a/src/records/memory/network_mapping/mined_counts.rs b/src/records/memory/network_mapping/mined_counts.rs index bef8b04..3a327e5 100644 --- a/src/records/memory/network_mapping/mined_counts.rs +++ b/src/records/memory/network_mapping/mined_counts.rs @@ -3,23 +3,29 @@ use crate::records::unpack_block::unpack_header::load_block_header; impl NodeInfo { pub async fn increment_mined(address: &str) { - let mut map = ADDRESS_MAP.lock().await; - if let Some(node_info) = map.get_mut(address) { - // Counts are capped at u8-safe policy maximum used by node rules. - if node_info.blocks_mined < 250 { - node_info.blocks_mined += 1; + { + let mut map = ADDRESS_MAP.lock().await; + if let Some(node_info) = map.get_mut(address) { + // Counts are capped at u8-safe policy maximum used by node rules. + if node_info.blocks_mined < 250 { + node_info.blocks_mined += 1; + } } } + Self::persist_recovery_snapshot("mined increment").await; } pub async fn decrement_mined(address: &str) { - let mut map = ADDRESS_MAP.lock().await; - if let Some(node_info) = map.get_mut(address) { - // Rollback can undo mined credit, but never below zero. - if node_info.blocks_mined > 0 { - node_info.blocks_mined -= 1; + { + let mut map = ADDRESS_MAP.lock().await; + if let Some(node_info) = map.get_mut(address) { + // Rollback can undo mined credit, but never below zero. + if node_info.blocks_mined > 0 { + node_info.blocks_mined -= 1; + } } } + Self::persist_recovery_snapshot("mined decrement").await; } pub async fn get_mined_count(address: &str) -> u8 { @@ -45,11 +51,14 @@ impl NodeInfo { deleted_timestamp: u64, deleted_block: u32, ) { - let mut map = ADDRESS_MAP.lock().await; - if let Some(node_info) = map.get_mut(address) { - node_info.deleted_timestamp = deleted_timestamp; - node_info.deleted_block = deleted_block; + { + let mut map = ADDRESS_MAP.lock().await; + if let Some(node_info) = map.get_mut(address) { + node_info.deleted_timestamp = deleted_timestamp; + node_info.deleted_block = deleted_block; + } } + Self::persist_recovery_snapshot("deleted metadata import").await; } pub async fn rebuild_mined_counts_from_chain(db: &Db) -> Result<(), String> { @@ -84,6 +93,7 @@ impl NodeInfo { } } } + Self::persist_recovery_snapshot("mined rebuild").await; Ok(()) } } diff --git a/src/records/memory/network_mapping/mod.rs b/src/records/memory/network_mapping/mod.rs index 81a639d..870faca 100644 --- a/src/records/memory/network_mapping/mod.rs +++ b/src/records/memory/network_mapping/mod.rs @@ -67,5 +67,6 @@ mod delete; pub mod enums; mod mined_counts; pub(crate) mod monitor; +pub(crate) mod persistence; mod queries; pub mod structs; diff --git a/src/records/memory/network_mapping/monitor.rs b/src/records/memory/network_mapping/monitor.rs index f324856..d482f14 100644 --- a/src/records/memory/network_mapping/monitor.rs +++ b/src/records/memory/network_mapping/monitor.rs @@ -164,6 +164,8 @@ impl NodeInfo { } } + Self::persist_recovery_snapshot("monitor update").await; + RpcResponse::Binary(b"Success".to_vec()) } diff --git a/src/records/memory/network_mapping/persistence.rs b/src/records/memory/network_mapping/persistence.rs new file mode 100644 index 0000000..1937f1f --- /dev/null +++ b/src/records/memory/network_mapping/persistence.rs @@ -0,0 +1,151 @@ +use super::*; +use crate::common::binary_conversions::{binary_to_ip, ip_to_binary}; +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::records::memory::network_mapping::structs::{ + NODE_ADDED_BY_OFFSET, NODE_ADDED_SIGNATURE_OFFSET, NODE_ADDED_TIMESTAMP_OFFSET, + NODE_BLOCKS_MINED_OFFSET, NODE_DELETED_BLOCK_OFFSET, NODE_DELETED_TIMESTAMP_OFFSET, + NODE_IP_OFFSET, NODE_MONITOR_COUNT_OFFSET, NODE_RECORD_FIXED_BYTES, +}; +use crate::{create_dir_all, read, PathBuf}; + +fn recovery_snapshot_path() -> PathBuf { + let (_, _, _, _, _, _, db_path, _, _) = block_extension_and_paths(); + PathBuf::from(db_path).join("network_map_recovery.bin") +} + +impl NodeInfo { + pub async fn save_recovery_snapshot() -> Result<(), String> { + let path = recovery_snapshot_path(); + if let Some(parent) = path.parent() { + create_dir_all(parent) + .await + .map_err(|err| format!("failed to create network-map snapshot directory: {err}"))?; + } + + let data = { + let map = ADDRESS_MAP.lock().await; + let mut data = Vec::with_capacity(map.len() * NODE_RECORD_FIXED_BYTES); + + for (address, node_info) in map.iter() { + let Some(address_bytes) = Wallet::short_address_to_bytes(address) else { + continue; + }; + let added_by_bytes = + Wallet::short_address_to_bytes(&node_info.added_by).unwrap_or_default(); + let Ok(added_signature_bytes) = decode(node_info.added_signature.clone()) else { + continue; + }; + if added_signature_bytes.len() != Wallet::SIGNATURE_LENGTH { + continue; + } + + data.extend_from_slice(&address_bytes); + data.extend_from_slice(&ip_to_binary(&node_info.ip)); + data.push(node_info.blocks_mined); + if added_by_bytes.len() == Wallet::SHORT_ADDRESS_BYTES_LENGTH { + data.extend_from_slice(&added_by_bytes); + } else { + data.extend_from_slice(&[0u8; Wallet::SHORT_ADDRESS_BYTES_LENGTH]); + } + data.extend_from_slice(&node_info.added_timestamp.to_le_bytes()); + data.extend_from_slice(&added_signature_bytes); + data.extend_from_slice(&node_info.deleted_timestamp.to_le_bytes()); + data.extend_from_slice(&node_info.deleted_block.to_le_bytes()); + data.extend_from_slice(&0u16.to_le_bytes()); + } + + data + }; + + crate::tokio::fs::write(&path, data) + .await + .map_err(|err| format!("failed to write network-map recovery snapshot: {err}")) + } + + pub async fn load_recovery_snapshot() -> Result { + let path = recovery_snapshot_path(); + let Ok(mut buffer) = read(&path).await else { + return Ok(0); + }; + + let mut loaded = 0usize; + let mut recovered = HashMap::new(); + + while buffer.len() >= NODE_RECORD_FIXED_BYTES { + let monitor_count = u16::from_le_bytes( + buffer[NODE_MONITOR_COUNT_OFFSET..NODE_RECORD_FIXED_BYTES] + .try_into() + .unwrap(), + ) as usize; + let record_bytes = + NODE_RECORD_FIXED_BYTES + (monitor_count * Wallet::SHORT_ADDRESS_BYTES_LENGTH); + if buffer.len() < record_bytes { + break; + } + + let chunk: Vec = buffer.drain(0..record_bytes).collect(); + let Some(address) = Wallet::bytes_to_short_address(&chunk[0..NODE_IP_OFFSET]) else { + continue; + }; + let ip = binary_to_ip(chunk[NODE_IP_OFFSET..NODE_BLOCKS_MINED_OFFSET].to_vec()); + let blocks_mined = chunk[NODE_BLOCKS_MINED_OFFSET]; + let added_by_bytes = &chunk[NODE_ADDED_BY_OFFSET..NODE_ADDED_TIMESTAMP_OFFSET]; + let added_by = if added_by_bytes.iter().all(|&byte| byte == 0) { + String::new() + } else { + Wallet::bytes_to_short_address(added_by_bytes).unwrap_or_default() + }; + let added_timestamp = u64::from_le_bytes( + chunk[NODE_ADDED_TIMESTAMP_OFFSET..NODE_ADDED_SIGNATURE_OFFSET] + .try_into() + .unwrap(), + ); + let added_signature = + crate::encode(&chunk[NODE_ADDED_SIGNATURE_OFFSET..NODE_DELETED_TIMESTAMP_OFFSET]); + let deleted_timestamp = u64::from_le_bytes( + chunk[NODE_DELETED_TIMESTAMP_OFFSET..NODE_DELETED_BLOCK_OFFSET] + .try_into() + .unwrap(), + ); + let deleted_block = u32::from_le_bytes( + chunk[NODE_DELETED_BLOCK_OFFSET..NODE_MONITOR_COUNT_OFFSET] + .try_into() + .unwrap(), + ); + + let mut node = NodeInfo::new( + ip, + blocks_mined, + added_by, + added_timestamp, + added_signature, + ); + node.deleted_timestamp = deleted_timestamp; + node.deleted_block = deleted_block; + node.monitoring = Vec::new(); + recovered.insert(address, node); + loaded += 1; + } + + if !buffer.is_empty() { + warn!("[network_map] recovery snapshot had trailing partial bytes"); + } + + let mut map = ADDRESS_MAP.lock().await; + *map = recovered; + Ok(loaded) + } + + pub async fn clear_recovered_monitors() { + let mut map = ADDRESS_MAP.lock().await; + for node in map.values_mut() { + node.monitoring.clear(); + } + } + + pub async fn persist_recovery_snapshot(context: &str) { + if let Err(err) = Self::save_recovery_snapshot().await { + warn!("[network_map] failed to save recovery snapshot after {context}: {err}"); + } + } +} diff --git a/src/records/memory/network_mapping/queries.rs b/src/records/memory/network_mapping/queries.rs index 5b182a4..a0522b5 100644 --- a/src/records/memory/network_mapping/queries.rs +++ b/src/records/memory/network_mapping/queries.rs @@ -38,6 +38,19 @@ impl NodeInfo { .collect() } + pub async fn eligible_sponsor_ips() -> Vec { + let current_timestamp = Utc::now().timestamp_millis() as u64; + let map = ADDRESS_MAP.lock().await; + map.values() + .filter(|node_info| { + node_info.deleted_timestamp == 0 + && current_timestamp.saturating_sub(node_info.added_timestamp) >= 3_600_000 + && node_info.blocks_mined >= 100 + }) + .map(|node_info| node_info.ip.clone()) + .collect() + } + pub async fn get_deleted_addresses() -> Vec { let map = ADDRESS_MAP.lock().await; map.iter() diff --git a/src/records/record_chain/save.rs b/src/records/record_chain/save.rs index 9c5fb66..31712ce 100644 --- a/src/records/record_chain/save.rs +++ b/src/records/record_chain/save.rs @@ -11,7 +11,7 @@ use crate::records::block_height::get_block_height::get_height; use crate::records::block_height::increase_block_height::increase_height; use crate::records::memory::averages::asert_genesis_anchor; use crate::records::memory::mempool::{ - apply_selected_transaction_math, mark_processed_by_signatures, + apply_selected_transaction_math, ensure_db_connection, mark_processed_by_signatures, mark_selected_transactions_processed, restore_processed_by_signatures, restore_selected_transactions_processed, select_transactions_for_block, spawn_processed_cleanup, stream_selected_transaction_originals, @@ -71,6 +71,10 @@ pub async fn save_block(params: SaveBlockParams) -> Result<(), String> { } } + ensure_db_connection() + .await + .map_err(|err| format!("Postgres reconnect failed before block save: {err}"))?; + let header_bytes = block .vrf_block .to_bytes() diff --git a/src/rpc/client/handshake_processing.rs b/src/rpc/client/handshake_processing.rs index 89c881f..81ac057 100644 --- a/src/rpc/client/handshake_processing.rs +++ b/src/rpc/client/handshake_processing.rs @@ -226,6 +226,9 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), } info!("[sync] post-sync checks complete, mining resuming"); + if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await { + error!("[startup] failed to rebuild mined counts after startup sync: {err}"); + } for (peer_key, _) in startup_synced_peer_streams().await { mark_peer_operational(&peer_key, params.map.clone()).await; } @@ -382,9 +385,6 @@ pub async fn process_handshake_response( io::Error::other(format!("Network map sync failed: {err}")) })?; mark_peer_network_map_synced(&connections_key).await; - if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(¶ms.db).await { - error!("[startup] failed to rebuild mined counts from local chain: {err}"); - } let bsparams = BootstrapParams { stream: Arc::clone(&stream), connections_key: connections_key.clone(), diff --git a/src/rpc/commands/random_node.rs b/src/rpc/commands/random_node.rs index d2fd417..07eb632 100644 --- a/src/rpc/commands/random_node.rs +++ b/src/rpc/commands/random_node.rs @@ -1,13 +1,19 @@ use crate::records::memory::connections::CONNECTIONS; +use crate::records::memory::network_mapping::NodeInfo; use crate::rpc::responses::RpcResponse; pub async fn request_node(connections_key: &str) -> RpcResponse { - // Return one random node endpoint from the in-memory connection set - // so callers can bootstrap peer discovery from a live connection. + // Return one random connected node that can legally sponsor/add nodes + // under the mature-network network-map rules. + let eligible_ips = NodeInfo::eligible_sponsor_ips().await; + if eligible_ips.is_empty() { + return RpcResponse::Binary(b"Error: No Eligible Sponsor Found".to_vec()); + } + let result = { let connections_lock = CONNECTIONS.read().await; if let Some(connection) = connections_lock.as_ref() { - connection.get_random_connection(Some(connections_key)) + connection.get_random_connection(Some(connections_key), Some(&eligible_ips)) } else { None } @@ -20,7 +26,7 @@ pub async fn request_node(connections_key: &str) -> RpcResponse { RpcResponse::Binary(ip) } None => { - let msg = "Error: No Connections Found" + let msg = "Error: No Eligible Sponsor Found" .to_string() .as_bytes() .to_vec(); diff --git a/src/standalone_tools/connections/handshake.rs b/src/standalone_tools/connections/handshake.rs index dedcbc5..6a3a37f 100644 --- a/src/standalone_tools/connections/handshake.rs +++ b/src/standalone_tools/connections/handshake.rs @@ -161,10 +161,17 @@ async fn perform_handshake( } if total_read != HANDSHAKE_RESPONSE_BYTES { + let received = &buffer[..total_read]; + return Err(io::Error::new( io::ErrorKind::UnexpectedEof, format!( - "Incomplete handshake response: received {total_read} of {HANDSHAKE_RESPONSE_BYTES} bytes" + "Incomplete handshake response: received {total_read} of {HANDSHAKE_RESPONSE_BYTES} bytes. Text='{}' Hex='{}'", + String::from_utf8_lossy(received), + received + .iter() + .map(|byte| format!("{:02x}", byte)) + .collect::() ), )); } diff --git a/src/startup/daemonize.rs b/src/startup/daemonize.rs index f20c8ac..2f492e4 100644 --- a/src/startup/daemonize.rs +++ b/src/startup/daemonize.rs @@ -200,18 +200,21 @@ pub fn install_shutdown_cleanup(db: Db) { } }; - tokio::select! { - _ = sigterm.recv() => { - warn!("Received SIGTERM, shutting down."); - } - _ = sigint.recv() => { - warn!("Received SIGINT, shutting down."); - } - } - - if let Err(err) = db.flush_async().await { - error!("Failed to flush sled during shutdown: {err}"); - } + tokio::select! { + _ = sigterm.recv() => { + warn!("Received SIGTERM, shutting down."); + } + _ = sigint.recv() => { + warn!("Received SIGINT, shutting down."); + } + } + + crate::records::memory::network_mapping::NodeInfo::persist_recovery_snapshot("shutdown") + .await; + + if let Err(err) = db.flush_async().await { + error!("Failed to flush sled during shutdown: {err}"); + } // Removing the PID file here lets the next startup proceed without manual cleanup. remove_registered_pid_file(); diff --git a/src/startup/network_broadcast.rs b/src/startup/network_broadcast.rs index b9c1c43..d2991e1 100644 --- a/src/startup/network_broadcast.rs +++ b/src/startup/network_broadcast.rs @@ -1,7 +1,8 @@ use crate::common::binary_conversions::{binary_to_ip, binary_to_string, ip_to_binary}; +use crate::log::warn; use crate::common::network_startup::get_ip_and_port; use crate::records::memory::network_mapping::structs::{ - AddAddressParams, SignedNodeEdit, NODE_ADDED_BY_OFFSET, NODE_ADDED_SIGNATURE_OFFSET, + SignedNodeEdit, NODE_ADDED_BY_OFFSET, NODE_ADDED_SIGNATURE_OFFSET, NODE_ADDED_TIMESTAMP_OFFSET, NODE_BLOCKS_MINED_OFFSET, NODE_DELETED_BLOCK_OFFSET, NODE_DELETED_TIMESTAMP_OFFSET, NODE_IP_OFFSET, NODE_MONITOR_COUNT_OFFSET, NODE_RECORD_FIXED_BYTES, @@ -98,7 +99,7 @@ pub async fn get_network_mapping( unlocked_stream: Arc>, command_map: Arc>, db: &Db, - wallet: Arc, + _wallet: Arc, connections_key: &str, ) -> Result<(), String> { // request the remote peer's serialized node list and @@ -138,8 +139,7 @@ pub async fn get_network_mapping( continue; }; let ip = binary_to_ip(chunk[NODE_IP_OFFSET..NODE_BLOCKS_MINED_OFFSET].to_vec()); - let _advertised_blocks_mined = chunk[NODE_BLOCKS_MINED_OFFSET]; - let blocks_mined = 0_u8; + let blocks_mined = chunk[NODE_BLOCKS_MINED_OFFSET]; let added_by_bytes = &chunk[NODE_ADDED_BY_OFFSET..NODE_ADDED_TIMESTAMP_OFFSET]; let added_by = if added_by_bytes.iter().all(|&byte| byte == 0) { String::new() @@ -163,7 +163,6 @@ pub async fn get_network_mapping( .try_into() .unwrap(), ); - let remote_ip = ""; let mut monitors = Vec::with_capacity(monitor_count); for monitor_index in 0..monitor_count { let start = @@ -174,10 +173,12 @@ pub async fn get_network_mapping( } } - // Add records are imported through NodeInfo so local validation/signing rules stay central. - NodeInfo::add_address(AddAddressParams { - map: command_map.clone(), - edit: SignedNodeEdit { + // Import signed map records as remote state. This verifies the original + // signer but does not require the newly connecting node to be eligible + // to add nodes locally after the mature-network gate. + if let Err(err) = NodeInfo::import_signed_mapping_address( + db, + SignedNodeEdit { address: address.clone(), ip: ip.clone(), modified_by: added_by, @@ -186,12 +187,11 @@ pub async fn get_network_mapping( }, monitors, blocks_mined, - remote_ip: remote_ip.to_string(), - db: db.clone(), - wallet: wallet.clone(), - connections_key: connections_key.to_string(), - }) - .await; + ) + .await + { + warn!("[network_map] skipped imported node record {address}: {err}"); + } if deleted_timestamp > 0 { NodeInfo::set_deleted_metadata_from_mapping(&address, deleted_timestamp, deleted_block) diff --git a/src/startup/node_runtime.rs b/src/startup/node_runtime.rs index 03f859c..9150905 100644 --- a/src/startup/node_runtime.rs +++ b/src/startup/node_runtime.rs @@ -10,6 +10,7 @@ use crate::miner::genesis::create_genesis_transaction; use crate::miner::mining::mine_block; use crate::panic; use crate::records::memory::mempool::{init_db, setup_mempool}; +use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::Command; use crate::rpc::server::start_rpc::start_rpc; use crate::sled::Db; @@ -118,6 +119,17 @@ pub async fn run_unlocked_node(wallet: Arc, install_shutdown: bool) -> R error!("Failed to clear IP scores: {e}"); } + match NodeInfo::load_recovery_snapshot().await { + Ok(loaded) if loaded > 0 => { + info!("[network_map] loaded {loaded} recovered node records with monitors cleared"); + if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(&db).await { + error!("[network_map] failed to rebuild recovered mined counts: {err}"); + } + } + Ok(_) => {} + Err(err) => error!("[network_map] failed to load recovery snapshot: {err}"), + } + let wallet_for_server = wallet.clone(); let map: Arc> = Arc::new(Mutex::new(HashMap::new())); let map_cloned = Arc::clone(&map); diff --git a/src/verifications/async_funcs/total_payments.rs b/src/verifications/async_funcs/total_payments.rs index 17d5ab6..024853c 100644 --- a/src/verifications/async_funcs/total_payments.rs +++ b/src/verifications/async_funcs/total_payments.rs @@ -1,5 +1,4 @@ use crate::decode; -use crate::records::memory::mempool::get_pending_payments_for_contract; use crate::sled::Db; // Contract payment totals are stored as a packed list of u64 values @@ -28,8 +27,9 @@ fn deserialize_value(bytes: &[u8]) -> Option> { } pub async fn get_total_payments(db: &Db, contract_id: &str) -> u64 { - // Combine confirmed on-chain payments with pending mempool loan - // payments so contract checks see the most current total. + // Consensus loan state only includes payments already saved in blocks. + // Pending mempool payments are intentionally excluded so block validation + // cannot depend on each node's local unconfirmed transaction set. let tree = db.open_tree("contract_payments").unwrap(); let contract_key = match decode(contract_id) { Ok(bytes) => bytes, @@ -41,14 +41,8 @@ pub async fn get_total_payments(db: &Db, contract_id: &str) -> u64 { _ => None, }; - let chain_total = match payments { + match payments { Some(p) => p.iter().sum(), None => 0, - }; - - let pending_total = get_pending_payments_for_contract(contract_id) - .await - .unwrap_or(0); - - chain_total + pending_total + } } diff --git a/src/verifications/verification_service.rs b/src/verifications/verification_service.rs index 66922df..f3fff83 100644 --- a/src/verifications/verification_service.rs +++ b/src/verifications/verification_service.rs @@ -1,6 +1,7 @@ use crate::common::types::Transaction; use crate::rayon::ThreadPool; use crate::rayon::ThreadPoolBuilder; +use crate::records::memory::mempool::ensure_db_connection; use crate::verifications::async_funcs::checks::block_balance::BlockBalanceTracker; use crate::verifications::async_funcs::transactions::verify_transactions; use crate::verifications::sync_funcs::transaction_verify_loop::COUNTER; @@ -103,6 +104,10 @@ impl VerificationService { verify_runtime: Arc, job: VerificationJob, ) -> Result, String> { + ensure_db_connection() + .await + .map_err(|err| format!("Postgres reconnect failed before validation: {err}"))?; + // Transaction verification is split into chunks and processed in parallel, then // merged back into one result list for the caller. let chunk_size = 1000usize;