use crate::blocks::loans::LoanContractTransaction; use crate::common::binary_conversions::binary_to_string; use crate::common::network_paths_and_settings::block_extension_and_paths; use crate::common::nft_assets::{nft_asset_name, nft_asset_parts}; use crate::config::SETTINGS; use crate::decode; use crate::lazy_static; use crate::records::memory::structs::BalanceKey; use crate::records::wallet_registry::{ resolve_canonical_registered_short_address, }; use crate::rpc::commands::transaction_by_txid::request_transaction_by_txid; use crate::rpc::responses::RpcResponse; use crate::sled::Db; use crate::wallets::structures::Wallet; use crate::HashMap; use crate::NoTls; use crate::{task, AtomicBool}; use anyhow::{anyhow, Result}; use once_cell::sync::OnceCell; use std::fs::File; use std::io::Write; use tokio_postgres::Client; lazy_static! { pub static ref BASECOIN: String = { let ( _network_name, base_coin, _suffix, _torrentpath, _wallet_path, _blockpath, _db_path, _balance_path, _log_path, ) = block_extension_and_paths(); format!("{base_coin:<15}") }; static ref CLEANUP_RUNNING: AtomicBool = AtomicBool::new(false); } pub static DB: OnceCell = OnceCell::new(); pub const EPOCH_ROW_CAP: i64 = 100_000; const NFT_UNIT: i64 = 100_000_000; const CLEANUP_DEPTH: u32 = 10; const CLEANUP_BATCH_LIMIT: i64 = 1000; #[derive(Clone)] enum SelectedMempoolTransaction { // These variants hold the minimal fields needed to score, mark, and // later apply selected mempool transactions into a saved block. Transfer { id: i64, fee: i64, sender: String, value: i64, coin: String, nft_series: i32, receiver: String, hash: String, }, Token { id: i64, fee: i64, creator: String, number: i64, ticker: String, hash: String, }, IssueToken { id: i64, fee: i64, creator: String, number: i64, ticker: String, hash: String, }, Burn { id: i64, fee: i64, address: String, coin: String, nft_series: i32, value: i64, hash: String, }, Nft { id: i64, fee: i64, creator: String, nft_name: String, series: i16, count: i64, hash: String, }, Marketing { id: i64, fee: i64, advertiser: String, hash: String, }, Vanity { id: i64, fee: i64, address: String, vanity_address: String, hash: String, }, Swap { id: i64, fee1: i64, fee2: i64, ticker1: String, nft_series1: i32, ticker2: String, nft_series2: i32, value1: i64, value2: i64, sender1: String, tip1: i64, tip2: i64, sender2: String, hash: String, }, Lender { id: i64, fee: i64, loan_coin: String, loan_amount: i64, lender: String, collateral: String, collateral_amount: i64, borrower: String, txid: String, hash: String, }, Borrower { id: i64, fee: i64, payback_amount: i64, contract_hash: String, address: String, tip: i64, hash: String, }, Collateral { id: i64, fee: i64, address: String, contract_hash: String, hash: String, }, } #[derive(Clone, Default)] pub struct SelectedMempoolBatch { // The selected transaction view is kept separate from the original // serialized bytes so save paths can stream the original payloads. transactions: Vec, originals: Vec>, } impl SelectedMempoolTransaction { fn table_name(&self) -> &'static str { match self { SelectedMempoolTransaction::Transfer { .. } => "transfer", SelectedMempoolTransaction::Token { .. } => "token", SelectedMempoolTransaction::IssueToken { .. } => "issue_token", SelectedMempoolTransaction::Burn { .. } => "burn", SelectedMempoolTransaction::Nft { .. } => "nft", SelectedMempoolTransaction::Marketing { .. } => "marketing", SelectedMempoolTransaction::Vanity { .. } => "vanity_address", SelectedMempoolTransaction::Swap { .. } => "swap", SelectedMempoolTransaction::Lender { .. } => "loan_contract", SelectedMempoolTransaction::Borrower { .. } => "loan_payment", SelectedMempoolTransaction::Collateral { .. } => "collateral_claim", } } fn id(&self) -> i64 { match self { SelectedMempoolTransaction::Transfer { id, .. } | SelectedMempoolTransaction::Token { id, .. } | SelectedMempoolTransaction::IssueToken { id, .. } | SelectedMempoolTransaction::Burn { id, .. } | SelectedMempoolTransaction::Nft { id, .. } | SelectedMempoolTransaction::Marketing { id, .. } | SelectedMempoolTransaction::Vanity { id, .. } | SelectedMempoolTransaction::Swap { id, .. } | SelectedMempoolTransaction::Lender { id, .. } | SelectedMempoolTransaction::Borrower { id, .. } | SelectedMempoolTransaction::Collateral { id, .. } => *id, } } } impl SelectedMempoolBatch { pub fn is_empty(&self) -> bool { self.transactions.is_empty() } } mod lookups; mod processing; mod schema; mod selection; pub use lookups::{ get_basecoin_balance, get_coin_balance, get_pending_payments_for_contract, largest_fee, signature_exists, total_transactions, transaction_by_signature, transactions_by_address, }; pub use processing::{ mark_processed_by_signatures, mark_selected_transactions_processed, restore_processed_by_signatures, restore_selected_transactions_processed, spawn_processed_cleanup, delete_by_signatures, }; pub use schema::{clear_mempool, init_db, setup_mempool}; pub use selection::{ apply_selected_transaction_math, clear_selected_transaction_sql, delete_selected_transactions, select_transactions_for_block, stream_selected_transaction_originals, }; fn required_string(row: &tokio_postgres::Row, column: &str) -> Result { row.try_get::<_, Option>(column)? .ok_or_else(|| anyhow!("Missing required column {column}")) } fn add_balance_change( db: &Db, balance_changes: &mut HashMap, address: &str, coin: &str, delta: i64, ) { add_balance_change_bytes( balance_changes, address_key_bytes(db, address), coin.as_bytes().to_vec(), delta, ); } fn add_balance_change_bytes( balance_changes: &mut HashMap, address: Vec, coin: Vec, delta: i64, ) { *balance_changes .entry(BalanceKey { address, coin }) .or_insert(0) += delta; } fn address_key_bytes(db: &Db, address: &str) -> Vec { resolve_canonical_registered_short_address(db, address) .ok() .flatten() .or_else(|| Wallet::normalize_to_short_address(address)) .map(|normalized| normalized.as_bytes().to_vec()) .unwrap_or_else(|| address.as_bytes().to_vec()) } fn canonical_mempool_addresses(db: &Db, address: &str) -> Vec { let canonical = resolve_canonical_registered_short_address(db, address) .ok() .flatten() .or_else(|| Wallet::normalize_to_short_address(address)) .unwrap_or_else(|| address.to_string()); vec![canonical] } async fn resolve_loan_details(db: &Db, contract_hash: &str) -> Result<(Vec, Vec)> { let RpcResponse::Binary(bytes) = request_transaction_by_txid(db, decode(contract_hash)?).await; if bytes.is_empty() || bytes[0] != 7 { return Ok((Vec::new(), Vec::new())); } match LoanContractTransaction::from_bytes(7, &bytes[1..]).await { Ok(loan) => Ok(( loan.unsigned_loan_contract.loan_coin.as_bytes().to_vec(), address_key_bytes(db, &loan.unsigned_loan_contract.lender), )), Err(_) => Ok((Vec::new(), Vec::new())), } } async fn resolve_collateral_details(db: &Db, contract_hash: &str) -> Result<(Vec, i64)> { let RpcResponse::Binary(bytes) = request_transaction_by_txid(db, decode(contract_hash)?).await; if bytes.is_empty() || bytes[0] != 7 { return Ok((Vec::new(), 0)); } match LoanContractTransaction::from_bytes(7, &bytes[1..]).await { Ok(loan) => Ok(( loan.unsigned_loan_contract.collateral.as_bytes().to_vec(), loan.unsigned_loan_contract.collateral_amount as i64, )), Err(_) => Ok((Vec::new(), 0)), } } fn ids_for_table(batch: &SelectedMempoolBatch, table: &str) -> Vec { batch .transactions .iter() .filter(|tx| tx.table_name() == table) .map(SelectedMempoolTransaction::id) .collect() } async fn mark_rows_by_ids( client: &Client, table: &str, ids: &[i64], block_number: i32, ) -> Result<()> { if ids.is_empty() { return Ok(()); } let statement = format!( "UPDATE {table} SET processed=true, processed_block_number=$1 WHERE id = ANY($2)" ); client.execute(&statement, &[&block_number, &ids]).await?; Ok(()) } async fn unmark_rows_by_ids(client: &Client, table: &str, ids: &[i64]) -> Result<()> { if ids.is_empty() { return Ok(()); } let statement = format!("UPDATE {table} SET processed=false, processed_block_number=NULL WHERE id = ANY($1)"); client.execute(&statement, &[&ids]).await?; Ok(()) } async fn delete_rows(client: &Client, table: &str, ids: &[i64]) -> Result<()> { if ids.is_empty() { return Ok(()); } let statement = format!("DELETE FROM {table} WHERE id = ANY($1)"); client.execute(&statement, &[&ids]).await?; Ok(()) } async fn unmark_by_signatures( client: &Client, table: &str, signature_column: &str, signatures: &[String], ) -> Result { let statement = format!( "UPDATE {table} SET processed=false, processed_block_number=NULL WHERE {signature_column} = ANY($1) AND processed = true" ); Ok(client.execute(&statement, &[&signatures]).await?) } async fn delete_processed_before_or_at(block_number: u32, limit: i64) -> Result<()> { // Periodic cleanup deletes processed mempool rows in bounded batches // so long-lived nodes do not accumulate infinite processed history. let client = DB.get().expect("DB not initialized"); let bn = block_number as i32; delete_processed_rows_limited(client, "transfer", bn, limit).await?; delete_processed_rows_limited(client, "token", bn, limit).await?; delete_processed_rows_limited(client, "issue_token", bn, limit).await?; delete_processed_rows_limited(client, "burn", bn, limit).await?; delete_processed_rows_limited(client, "nft", bn, limit).await?; delete_processed_rows_limited(client, "marketing", bn, limit).await?; delete_processed_rows_limited(client, "vanity_address", bn, limit).await?; delete_processed_rows_limited(client, "swap", bn, limit).await?; delete_processed_rows_limited(client, "loan_contract", bn, limit).await?; delete_processed_rows_limited(client, "loan_payment", bn, limit).await?; delete_processed_rows_limited(client, "collateral_claim", bn, limit).await?; Ok(()) } async fn delete_processed_rows_limited( client: &Client, table: &str, block_number: i32, limit: i64, ) -> Result { let statement = format!( r#" DELETE FROM {table} WHERE id IN ( SELECT id FROM {table} WHERE processed = true AND processed_block_number IS NOT NULL AND processed_block_number <= $1 ORDER BY processed_block_number ASC, id ASC LIMIT $2 ) "# ); Ok(client.execute(&statement, &[&block_number, &limit]).await?) }