diff --git a/src/records/memory/mempool/lookups.rs b/src/records/memory/mempool/lookups.rs index 5bd42ae..9a8b8bf 100644 --- a/src/records/memory/mempool/lookups.rs +++ b/src/records/memory/mempool/lookups.rs @@ -1,7 +1,7 @@ use super::*; pub async fn signature_exists(signature: &str, hash: &str) -> Result { - let client_handle = db_client().await?; + let client_handle = db_client().await?; let client = client_handle.as_ref(); // Check every mempool table because the signature column names differ by @@ -148,6 +148,124 @@ pub async fn transactions_by_address(db: &Db, address: &str) -> RpcResponse { RpcResponse::Binary(bytes) } +pub async fn latest_pending_txids_by_address(db: &Db, address: &str, limit: usize) -> Vec> { + if limit == 0 { + return Vec::new(); + } + + let client_handle = match db_client().await { + Ok(client) => client, + Err(_) => return Vec::new(), + }; + let client = client_handle.as_ref(); + let addresses = canonical_mempool_addresses(db, address); + let limit = i64::try_from(limit).unwrap_or(i64::MAX); + + let rows = match client + .query( + r#" + SELECT hash FROM ( + SELECT DISTINCT ON (hash) hash, time, source_id FROM ( + SELECT hash, time, id AS source_id FROM transfer + WHERE (sender = ANY($1) OR receiver = ANY($1)) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM token + WHERE creator = ANY($1) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM issue_token + WHERE creator = ANY($1) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM burn + WHERE address = ANY($1) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM nft + WHERE creator = ANY($1) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM marketing + WHERE advertiser = ANY($1) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM vanity_address + WHERE address = ANY($1) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM swap + WHERE (sender1 = ANY($1) OR sender2 = ANY($1)) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM loan_contract + WHERE (lender = ANY($1) OR borrower = ANY($1)) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM loan_payment + WHERE address = ANY($1) AND processed = false + UNION ALL + SELECT hash, time, id AS source_id FROM collateral_claim + WHERE address = ANY($1) AND processed = false + ) AS pending + ORDER BY hash, time DESC, source_id DESC + ) AS deduped + ORDER BY time DESC, source_id DESC + LIMIT $2 + "#, + &[&addresses, &limit], + ) + .await + { + Ok(rows) => rows, + Err(_) => return Vec::new(), + }; + + let mut txids = rows + .into_iter() + .filter_map(|row| { + let hash: String = row.get("hash"); + decode(&hash).ok().filter(|bytes| bytes.len() == 32) + }) + .collect::>(); + txids.truncate(limit as usize); + txids +} + +pub async fn pending_transaction_by_txid(txid: &[u8]) -> Option> { + if txid.len() != 32 { + return None; + } + + let client_handle = db_client().await.ok()?; + let client = client_handle.as_ref(); + let hash = crate::encode(txid); + let row = client + .query_opt( + r#" + SELECT original FROM ( + SELECT original FROM transfer WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM token WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM issue_token WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM burn WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM nft WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM marketing WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM vanity_address WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM swap WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM loan_contract WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM loan_payment WHERE hash = $1 AND processed = false + UNION ALL + SELECT original FROM collateral_claim WHERE hash = $1 AND processed = false + ) AS subquery LIMIT 1 + "#, + &[&hash], + ) + .await + .ok()?; + + row.map(|row| row.get("original")) +} + pub async fn largest_fee() -> RpcResponse { let client_handle = match db_client().await { Ok(client) => client, @@ -204,7 +322,7 @@ async fn pending_saved_loan_payment_balance( addresses: &[String], coin: &str, ) -> Result> { - let client_handle = db_client().await?; + let client_handle = db_client().await?; let client = client_handle.as_ref(); let rows = client .query( @@ -258,7 +376,7 @@ pub async fn get_coin_balance( address: &str, coin: &str, ) -> Result> { - let client_handle = db_client().await?; + let client_handle = db_client().await?; let client = client_handle.as_ref(); // Pending-balance checks use canonical addresses so vanity and short // address inputs see the same outgoing obligations. @@ -339,7 +457,7 @@ pub async fn get_basecoin_balance( db: &Db, address: &str, ) -> Result> { - let client_handle = db_client().await?; + let client_handle = db_client().await?; let client = client_handle.as_ref(); let addresses = canonical_mempool_addresses(db, address); @@ -403,7 +521,7 @@ pub async fn get_basecoin_balance( pub async fn get_pending_payments_for_contract( contract_hash: &str, ) -> Result> { - let client_handle = db_client().await?; + let client_handle = db_client().await?; let client = client_handle.as_ref(); // Mempool/UI callers can use this for unconfirmed payment visibility. diff --git a/src/records/memory/mempool/mod.rs b/src/records/memory/mempool/mod.rs index 9b0f9df..a5af7ac 100644 --- a/src/records/memory/mempool/mod.rs +++ b/src/records/memory/mempool/mod.rs @@ -207,7 +207,8 @@ mod selection; pub use lookups::{ get_basecoin_balance, get_coin_balance, get_pending_payments_for_contract, largest_fee, - signature_exists, total_transactions, transaction_by_signature, transactions_by_address, + latest_pending_txids_by_address, pending_transaction_by_txid, signature_exists, + total_transactions, transaction_by_signature, transactions_by_address, }; pub use processing::{ delete_by_signatures, mark_processed_by_signatures, mark_selected_transactions_processed, diff --git a/src/records/memory/network_mapping/add.rs b/src/records/memory/network_mapping/add.rs index 5cf828a..323a961 100644 --- a/src/records/memory/network_mapping/add.rs +++ b/src/records/memory/network_mapping/add.rs @@ -12,7 +12,7 @@ impl NodeInfo { db: &Db, edit: SignedNodeEdit, monitors: Vec, - blocks_mined: u8, + _blocks_mined: u8, ) -> Result<(), String> { if !is_public_network_address(&edit.ip) { return Err("invalid network address".to_string()); @@ -43,29 +43,24 @@ impl NodeInfo { let mut address_map = ADDRESS_MAP.lock().await; if let Some(existing_node) = address_map.get_mut(&edit.address) { existing_node.monitoring = monitors.clone(); - existing_node.blocks_mined = blocks_mined; if existing_node.deleted_timestamp > 0 { - if existing_node.ip == edit.ip { + existing_node.ip = edit.ip; + existing_node.added_by = edit.modified_by; + existing_node.added_timestamp = edit.modified_timestamp; + existing_node.added_signature = edit.modified_signature; + existing_node.deleted_timestamp = 0_u64; + existing_node.deleted_block = 0_u32; + drop(address_map); + Self::persist_recovery_snapshot("import revive").await; + return Ok(()); + } else { + if existing_node.ip != edit.ip { + return Err("active node must be deleted before changing IP".to_string()); + } + if edit.modified_timestamp > existing_node.added_timestamp { existing_node.added_by = edit.modified_by; existing_node.added_timestamp = edit.modified_timestamp; existing_node.added_signature = edit.modified_signature; - existing_node.deleted_timestamp = 0_u64; - existing_node.deleted_block = 0_u32; - drop(address_map); - Self::persist_recovery_snapshot("import revive").await; - return Ok(()); - } - - address_map.remove(&edit.address); - } else { - if edit.modified_timestamp > existing_node.added_timestamp { - *existing_node = NodeInfo::new( - edit.ip, - blocks_mined, - edit.modified_by, - edit.modified_timestamp, - edit.modified_signature, - ); existing_node.monitoring = monitors; } drop(address_map); @@ -74,17 +69,18 @@ impl NodeInfo { } } - if address_map - .values() - .any(|node| node.ip == edit.ip && node.deleted_timestamp == 0 && edit.ip != GENESIS_IP) - { - return Err("ip already exists".to_string()); - } + let inherited_blocks_mined = match address_map.values().find(|node| node.ip == edit.ip) { + Some(node) if node.deleted_timestamp == 0 && edit.ip != GENESIS_IP => { + return Err("ip already exists".to_string()); + } + Some(node) => node.blocks_mined, + None => 0, + }; address_map.insert(edit.address, { let mut node = NodeInfo::new( edit.ip, - blocks_mined, + inherited_blocks_mined, edit.modified_by, edit.modified_timestamp, edit.modified_signature, @@ -209,7 +205,7 @@ impl NodeInfo { map, mut edit, monitors, - mut blocks_mined, + blocks_mined: _blocks_mined, remote_ip, db, wallet, @@ -235,10 +231,6 @@ impl NodeInfo { Self::added_signature(&edit.address, &edit.ip, current_timestamp, &wallet).await; } - if !remote_ip.is_empty() { - blocks_mined = 0; - } - let data = format!( "{}{}{}{}", edit.address, edit.ip, edit.modified_by, edit.modified_timestamp @@ -318,26 +310,23 @@ impl NodeInfo { if let Some(existing_node) = address_map.get_mut(&edit.address) { existing_node.monitoring = monitors.clone(); if existing_node.deleted_timestamp > 0 { - if existing_node.ip == edit.ip { + existing_node.ip = edit.ip.clone(); + existing_node.added_by = edit.modified_by.clone(); + existing_node.added_timestamp = edit.modified_timestamp; + existing_node.added_signature = edit.modified_signature.clone(); + existing_node.deleted_timestamp = 0_u64; + existing_node.deleted_block = 0_u32; + state_changed = true; + } else { + if existing_node.ip != edit.ip { + return RpcResponse::Binary( + b"Error: Active node must be deleted before changing IP".to_vec(), + ); + } + if edit.modified_timestamp > existing_node.added_timestamp { existing_node.added_by = edit.modified_by.clone(); existing_node.added_timestamp = edit.modified_timestamp; existing_node.added_signature = edit.modified_signature.clone(); - existing_node.blocks_mined = blocks_mined; - existing_node.deleted_timestamp = 0_u64; - existing_node.deleted_block = 0_u32; - state_changed = true; - } else { - address_map.remove(&edit.address); - } - } else { - if edit.modified_timestamp > existing_node.added_timestamp { - *existing_node = NodeInfo::new( - edit.ip.clone(), - blocks_mined, - edit.modified_by.clone(), - edit.modified_timestamp, - edit.modified_signature.clone(), - ); existing_node.monitoring = monitors.clone(); state_changed = true; } @@ -345,11 +334,14 @@ impl NodeInfo { } if !address_map.contains_key(&edit.address) { + let mut inherited_blocks_mined = 0; if let Some(existing_node) = address_map.values_mut().find(|node| node.ip == edit.ip) { if existing_node.deleted_timestamp == 0 && edit.ip != GENESIS_IP { penalize_duplicate_ip = true; + } else { + inherited_blocks_mined = existing_node.blocks_mined; } } @@ -359,7 +351,7 @@ impl NodeInfo { address_map.insert(edit.address.clone(), { let mut node = NodeInfo::new( edit.ip.clone(), - blocks_mined, + inherited_blocks_mined, edit.modified_by.clone(), edit.modified_timestamp, edit.modified_signature.clone(), diff --git a/src/records/memory/network_mapping/mined_counts.rs b/src/records/memory/network_mapping/mined_counts.rs index 54fb6c0..4cfce4d 100644 --- a/src/records/memory/network_mapping/mined_counts.rs +++ b/src/records/memory/network_mapping/mined_counts.rs @@ -2,6 +2,13 @@ use super::*; use crate::common::check_genesis::genesis_checkup; use crate::records::unpack_block::unpack_header::load_block_header; +fn increment_count(counts: &mut HashMap, address: &str) { + let entry = counts.entry(address.to_string()).or_insert(0); + if *entry < 250 { + *entry += 1; + } +} + impl NodeInfo { pub async fn increment_mined(address: &str) { { @@ -48,8 +55,7 @@ impl NodeInfo { } let mut mined_count = 0_u8; - let start_height = if through_height > 0 { 1 } else { 0 }; - for block_number in start_height..=through_height { + for block_number in 1..=through_height { let header = load_block_header(block_number).await?; if header.unmined_block.miner == address { mined_count = mined_count.saturating_add(1); @@ -102,15 +108,10 @@ impl NodeInfo { return Ok(()); } - let start_height = if current_height > 0 { 1 } else { 0 }; - for block_number in start_height..=current_height { + for block_number in 1..=current_height { let header = load_block_header(block_number).await?; let miner = header.unmined_block.miner; - let entry = mined_counts.entry(miner).or_insert(0); - // Keep the rebuilt value under the same cap as live increments. - if *entry < 250 { - *entry += 1; - } + increment_count(&mut mined_counts, &miner); } { diff --git a/src/records/memory/network_mapping/mod.rs b/src/records/memory/network_mapping/mod.rs index dde628e..4268e1f 100644 --- a/src/records/memory/network_mapping/mod.rs +++ b/src/records/memory/network_mapping/mod.rs @@ -11,7 +11,7 @@ use crate::records::ip_score::score::update_ip_score; use crate::records::memory::connections::CONNECTIONS; use crate::records::memory::network_mapping::enums::NodeEditType; use crate::records::memory::network_mapping::structs::{ - AddAddressParams, DeleteAddressParams, MonitorAddressParams, SignedMonitorEdit, SignedNodeEdit, + AddAddressParams, MonitorAddressParams, SignedMonitorEdit, SignedNodeEdit, NODE_RECORD_FIXED_BYTES, }; use crate::records::memory::response_channels::Command; @@ -61,7 +61,6 @@ impl NodeInfo { } mod add; -mod delete; pub mod enums; mod mined_counts; pub(crate) mod monitor; diff --git a/src/records/memory/network_mapping/structs.rs b/src/records/memory/network_mapping/structs.rs index 857dc71..2d8a149 100644 --- a/src/records/memory/network_mapping/structs.rs +++ b/src/records/memory/network_mapping/structs.rs @@ -56,17 +56,6 @@ pub struct AddAddressParams { pub connections_key: String, } -// DeleteAddressParams groups the shared context needed to remove a node from the network map. -#[derive(Clone)] -pub struct DeleteAddressParams { - pub map: Arc>, - pub edit: SignedNodeEdit, - pub remote_ip: String, - pub db: Db, - pub wallet: Arc, - pub connections_key: String, -} - #[derive(Clone)] pub struct MonitorAddressParams { pub map: Arc>, diff --git a/src/rpc/command_maps.rs b/src/rpc/command_maps.rs index 045f525..963b201 100644 --- a/src/rpc/command_maps.rs +++ b/src/rpc/command_maps.rs @@ -40,7 +40,6 @@ pub const RPC_VALIDATE_MESSAGE: u8 = 25; pub const RPC_BLOCK_IP: u8 = 26; pub const RPC_UNBLOCK_IP: u8 = 27; pub const RPC_ADD_NETWORK_NODE: u8 = 28; -pub const RPC_DELETE_NETWORK_NODE: u8 = 29; pub const RPC_REQUEST_NODE_LIST: u8 = 30; pub const RPC_TOKEN_LIST: u8 = 31; pub const RPC_NFT_LIST: u8 = 32; diff --git a/src/rpc/commands/address_history.rs b/src/rpc/commands/address_history.rs index 602e0d4..2d2b109 100644 --- a/src/rpc/commands/address_history.rs +++ b/src/rpc/commands/address_history.rs @@ -1,7 +1,9 @@ +use crate::records::memory::mempool::latest_pending_txids_by_address; use crate::records::record_chain::wallet_tx_index::WALLET_TX_INDEX_TREE; use crate::rpc::responses::RpcResponse; use crate::sled::Db; use crate::wallets::structures::Wallet; +use std::collections::HashSet; use std::ops::Bound; const TXID_LENGTH: usize = 32; @@ -91,6 +93,10 @@ pub async fn latest(address_bytes: Vec, limit: u32, db: &Db) -> RpcResponse return RpcResponse::Binary(Vec::new()); } + let Some(address) = Wallet::bytes_to_short_address(&address_bytes) else { + return RpcResponse::Binary(b"error: Invalid wallet address bytes".to_vec()); + }; + let tree = match db.open_tree(WALLET_TX_INDEX_TREE) { Ok(tree) => tree, Err(err) => { @@ -103,6 +109,19 @@ pub async fn latest(address_bytes: Vec, limit: u32, db: &Db) -> RpcResponse let (start, end) = history_bounds(&address_bytes); let mut found = 0usize; let mut txids = Vec::with_capacity(limit * TXID_LENGTH); + let mut seen = HashSet::new(); + + for txid in latest_pending_txids_by_address(db, &address, limit).await { + if txid.len() != TXID_LENGTH || !seen.insert(txid.clone()) { + continue; + } + + txids.extend_from_slice(&txid); + found += 1; + if found >= limit { + return RpcResponse::Binary(txids); + } + } for entry in tree.range((start, end)).rev() { let (_key, value) = match entry { @@ -118,7 +137,12 @@ pub async fn latest(address_bytes: Vec, limit: u32, db: &Db) -> RpcResponse continue; } - txids.extend_from_slice(&value); + let txid = value.to_vec(); + if !seen.insert(txid.clone()) { + continue; + } + + txids.extend_from_slice(&txid); found += 1; if found >= limit { break; diff --git a/src/rpc/commands/mod.rs b/src/rpc/commands/mod.rs index 1b27ab9..46adca5 100644 --- a/src/rpc/commands/mod.rs +++ b/src/rpc/commands/mod.rs @@ -13,7 +13,6 @@ pub mod block_headers; pub mod block_height; pub mod block_peer_ip; pub mod contract; -pub mod delete_network_node; pub mod difficulty; pub mod largest_tx_fee; pub mod latest_block; diff --git a/src/rpc/commands/transaction_by_txid.rs b/src/rpc/commands/transaction_by_txid.rs index e8ca365..8382fea 100644 --- a/src/rpc/commands/transaction_by_txid.rs +++ b/src/rpc/commands/transaction_by_txid.rs @@ -2,6 +2,7 @@ use crate::blocks::block::VRF_BLOCK_BYTES; use crate::common::binary_conversions::binary_to_string; use crate::common::network_paths_and_settings::block_extension_and_paths; use crate::io; +use crate::records::memory::mempool::pending_transaction_by_txid; use crate::rpc::command_maps; use crate::rpc::responses::RpcResponse; use crate::sled::Db; @@ -32,7 +33,7 @@ pub async fn request_transaction_by_txid(db: &Db, txid: Vec) -> RpcResponse pub async fn request_transaction_by_txid_with_block(db: &Db, txid: Vec) -> RpcResponse { // Some callers need the block number alongside the raw transaction // bytes, so this variant prefixes the payload with the block height. - match lookup_transaction_location(db, txid).await { + match lookup_transaction_location(db, txid.clone()).await { Ok((block, position, block_filename)) => { let bytes = calculate_offset(&block_filename, position).await; match bytes { @@ -48,7 +49,16 @@ pub async fn request_transaction_by_txid_with_block(db: &Db, txid: Vec) -> R } } } - Err(msg) => RpcResponse::Binary(msg.into_bytes()), + Err(msg) => { + if let Some(bytes) = pending_transaction_by_txid(&txid).await { + let mut response = Vec::with_capacity(4 + bytes.len()); + response.extend_from_slice(&0u32.to_le_bytes()); + response.extend_from_slice(&bytes); + RpcResponse::Binary(response) + } else { + RpcResponse::Binary(msg.into_bytes()) + } + } } } diff --git a/src/rpc/server/rpc_command_loop.rs b/src/rpc/server/rpc_command_loop.rs index dfb1e3a..696d24e 100644 --- a/src/rpc/server/rpc_command_loop.rs +++ b/src/rpc/server/rpc_command_loop.rs @@ -567,20 +567,6 @@ pub async fn start_loop( .send(&stream_locked, Some(&connections_key), uid) .await; } - 29 => { - // delete a new network node - let (uid, result) = commands::delete_network_node::delete_network_node( - &connections_key, - stream_locked.clone(), - &db, - wallet.clone(), - map.clone(), - ) - .await?; - result - .send(&stream_locked, Some(&connections_key), uid) - .await; - } 30 => { // request node list let (uid, _) = read_bytes_from_stream::read_uid_from_stream(