fixed network ync startup process

This commit is contained in:
viraladmin 2026-06-13 19:36:03 -06:00
parent b8c8e47b69
commit 0e852e0e69
11 changed files with 215 additions and 97 deletions

View File

@ -148,6 +148,124 @@ pub async fn transactions_by_address(db: &Db, address: &str) -> RpcResponse {
RpcResponse::Binary(bytes)
}
pub async fn latest_pending_txids_by_address(db: &Db, address: &str, limit: usize) -> Vec<Vec<u8>> {
if limit == 0 {
return Vec::new();
}
let client_handle = match db_client().await {
Ok(client) => client,
Err(_) => return Vec::new(),
};
let client = client_handle.as_ref();
let addresses = canonical_mempool_addresses(db, address);
let limit = i64::try_from(limit).unwrap_or(i64::MAX);
let rows = match client
.query(
r#"
SELECT hash FROM (
SELECT DISTINCT ON (hash) hash, time, source_id FROM (
SELECT hash, time, id AS source_id FROM transfer
WHERE (sender = ANY($1) OR receiver = ANY($1)) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM token
WHERE creator = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM issue_token
WHERE creator = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM burn
WHERE address = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM nft
WHERE creator = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM marketing
WHERE advertiser = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM vanity_address
WHERE address = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM swap
WHERE (sender1 = ANY($1) OR sender2 = ANY($1)) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM loan_contract
WHERE (lender = ANY($1) OR borrower = ANY($1)) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM loan_payment
WHERE address = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM collateral_claim
WHERE address = ANY($1) AND processed = false
) AS pending
ORDER BY hash, time DESC, source_id DESC
) AS deduped
ORDER BY time DESC, source_id DESC
LIMIT $2
"#,
&[&addresses, &limit],
)
.await
{
Ok(rows) => rows,
Err(_) => return Vec::new(),
};
let mut txids = rows
.into_iter()
.filter_map(|row| {
let hash: String = row.get("hash");
decode(&hash).ok().filter(|bytes| bytes.len() == 32)
})
.collect::<Vec<_>>();
txids.truncate(limit as usize);
txids
}
pub async fn pending_transaction_by_txid(txid: &[u8]) -> Option<Vec<u8>> {
if txid.len() != 32 {
return None;
}
let client_handle = db_client().await.ok()?;
let client = client_handle.as_ref();
let hash = crate::encode(txid);
let row = client
.query_opt(
r#"
SELECT original FROM (
SELECT original FROM transfer WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM token WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM issue_token WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM burn WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM nft WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM marketing WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM vanity_address WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM swap WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM loan_contract WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM loan_payment WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM collateral_claim WHERE hash = $1 AND processed = false
) AS subquery LIMIT 1
"#,
&[&hash],
)
.await
.ok()?;
row.map(|row| row.get("original"))
}
pub async fn largest_fee() -> RpcResponse {
let client_handle = match db_client().await {
Ok(client) => client,

View File

@ -207,7 +207,8 @@ mod selection;
pub use lookups::{
get_basecoin_balance, get_coin_balance, get_pending_payments_for_contract, largest_fee,
signature_exists, total_transactions, transaction_by_signature, transactions_by_address,
latest_pending_txids_by_address, pending_transaction_by_txid, signature_exists,
total_transactions, transaction_by_signature, transactions_by_address,
};
pub use processing::{
delete_by_signatures, mark_processed_by_signatures, mark_selected_transactions_processed,

View File

@ -12,7 +12,7 @@ impl NodeInfo {
db: &Db,
edit: SignedNodeEdit,
monitors: Vec<String>,
blocks_mined: u8,
_blocks_mined: u8,
) -> Result<(), String> {
if !is_public_network_address(&edit.ip) {
return Err("invalid network address".to_string());
@ -43,29 +43,24 @@ impl NodeInfo {
let mut address_map = ADDRESS_MAP.lock().await;
if let Some(existing_node) = address_map.get_mut(&edit.address) {
existing_node.monitoring = monitors.clone();
existing_node.blocks_mined = blocks_mined;
if existing_node.deleted_timestamp > 0 {
if existing_node.ip == edit.ip {
existing_node.ip = edit.ip;
existing_node.added_by = edit.modified_by;
existing_node.added_timestamp = edit.modified_timestamp;
existing_node.added_signature = edit.modified_signature;
existing_node.deleted_timestamp = 0_u64;
existing_node.deleted_block = 0_u32;
drop(address_map);
Self::persist_recovery_snapshot("import revive").await;
return Ok(());
} else {
if existing_node.ip != edit.ip {
return Err("active node must be deleted before changing IP".to_string());
}
if edit.modified_timestamp > existing_node.added_timestamp {
existing_node.added_by = edit.modified_by;
existing_node.added_timestamp = edit.modified_timestamp;
existing_node.added_signature = edit.modified_signature;
existing_node.deleted_timestamp = 0_u64;
existing_node.deleted_block = 0_u32;
drop(address_map);
Self::persist_recovery_snapshot("import revive").await;
return Ok(());
}
address_map.remove(&edit.address);
} else {
if edit.modified_timestamp > existing_node.added_timestamp {
*existing_node = NodeInfo::new(
edit.ip,
blocks_mined,
edit.modified_by,
edit.modified_timestamp,
edit.modified_signature,
);
existing_node.monitoring = monitors;
}
drop(address_map);
@ -74,17 +69,18 @@ impl NodeInfo {
}
}
if address_map
.values()
.any(|node| node.ip == edit.ip && node.deleted_timestamp == 0 && edit.ip != GENESIS_IP)
{
return Err("ip already exists".to_string());
}
let inherited_blocks_mined = match address_map.values().find(|node| node.ip == edit.ip) {
Some(node) if node.deleted_timestamp == 0 && edit.ip != GENESIS_IP => {
return Err("ip already exists".to_string());
}
Some(node) => node.blocks_mined,
None => 0,
};
address_map.insert(edit.address, {
let mut node = NodeInfo::new(
edit.ip,
blocks_mined,
inherited_blocks_mined,
edit.modified_by,
edit.modified_timestamp,
edit.modified_signature,
@ -209,7 +205,7 @@ impl NodeInfo {
map,
mut edit,
monitors,
mut blocks_mined,
blocks_mined: _blocks_mined,
remote_ip,
db,
wallet,
@ -235,10 +231,6 @@ impl NodeInfo {
Self::added_signature(&edit.address, &edit.ip, current_timestamp, &wallet).await;
}
if !remote_ip.is_empty() {
blocks_mined = 0;
}
let data = format!(
"{}{}{}{}",
edit.address, edit.ip, edit.modified_by, edit.modified_timestamp
@ -318,26 +310,23 @@ impl NodeInfo {
if let Some(existing_node) = address_map.get_mut(&edit.address) {
existing_node.monitoring = monitors.clone();
if existing_node.deleted_timestamp > 0 {
if existing_node.ip == edit.ip {
existing_node.ip = edit.ip.clone();
existing_node.added_by = edit.modified_by.clone();
existing_node.added_timestamp = edit.modified_timestamp;
existing_node.added_signature = edit.modified_signature.clone();
existing_node.deleted_timestamp = 0_u64;
existing_node.deleted_block = 0_u32;
state_changed = true;
} else {
if existing_node.ip != edit.ip {
return RpcResponse::Binary(
b"Error: Active node must be deleted before changing IP".to_vec(),
);
}
if edit.modified_timestamp > existing_node.added_timestamp {
existing_node.added_by = edit.modified_by.clone();
existing_node.added_timestamp = edit.modified_timestamp;
existing_node.added_signature = edit.modified_signature.clone();
existing_node.blocks_mined = blocks_mined;
existing_node.deleted_timestamp = 0_u64;
existing_node.deleted_block = 0_u32;
state_changed = true;
} else {
address_map.remove(&edit.address);
}
} else {
if edit.modified_timestamp > existing_node.added_timestamp {
*existing_node = NodeInfo::new(
edit.ip.clone(),
blocks_mined,
edit.modified_by.clone(),
edit.modified_timestamp,
edit.modified_signature.clone(),
);
existing_node.monitoring = monitors.clone();
state_changed = true;
}
@ -345,11 +334,14 @@ impl NodeInfo {
}
if !address_map.contains_key(&edit.address) {
let mut inherited_blocks_mined = 0;
if let Some(existing_node) =
address_map.values_mut().find(|node| node.ip == edit.ip)
{
if existing_node.deleted_timestamp == 0 && edit.ip != GENESIS_IP {
penalize_duplicate_ip = true;
} else {
inherited_blocks_mined = existing_node.blocks_mined;
}
}
@ -359,7 +351,7 @@ impl NodeInfo {
address_map.insert(edit.address.clone(), {
let mut node = NodeInfo::new(
edit.ip.clone(),
blocks_mined,
inherited_blocks_mined,
edit.modified_by.clone(),
edit.modified_timestamp,
edit.modified_signature.clone(),

View File

@ -2,6 +2,13 @@ use super::*;
use crate::common::check_genesis::genesis_checkup;
use crate::records::unpack_block::unpack_header::load_block_header;
fn increment_count(counts: &mut HashMap<String, u8>, address: &str) {
let entry = counts.entry(address.to_string()).or_insert(0);
if *entry < 250 {
*entry += 1;
}
}
impl NodeInfo {
pub async fn increment_mined(address: &str) {
{
@ -48,8 +55,7 @@ impl NodeInfo {
}
let mut mined_count = 0_u8;
let start_height = if through_height > 0 { 1 } else { 0 };
for block_number in start_height..=through_height {
for block_number in 1..=through_height {
let header = load_block_header(block_number).await?;
if header.unmined_block.miner == address {
mined_count = mined_count.saturating_add(1);
@ -102,15 +108,10 @@ impl NodeInfo {
return Ok(());
}
let start_height = if current_height > 0 { 1 } else { 0 };
for block_number in start_height..=current_height {
for block_number in 1..=current_height {
let header = load_block_header(block_number).await?;
let miner = header.unmined_block.miner;
let entry = mined_counts.entry(miner).or_insert(0);
// Keep the rebuilt value under the same cap as live increments.
if *entry < 250 {
*entry += 1;
}
increment_count(&mut mined_counts, &miner);
}
{

View File

@ -11,7 +11,7 @@ use crate::records::ip_score::score::update_ip_score;
use crate::records::memory::connections::CONNECTIONS;
use crate::records::memory::network_mapping::enums::NodeEditType;
use crate::records::memory::network_mapping::structs::{
AddAddressParams, DeleteAddressParams, MonitorAddressParams, SignedMonitorEdit, SignedNodeEdit,
AddAddressParams, MonitorAddressParams, SignedMonitorEdit, SignedNodeEdit,
NODE_RECORD_FIXED_BYTES,
};
use crate::records::memory::response_channels::Command;
@ -61,7 +61,6 @@ impl NodeInfo {
}
mod add;
mod delete;
pub mod enums;
mod mined_counts;
pub(crate) mod monitor;

View File

@ -56,17 +56,6 @@ pub struct AddAddressParams {
pub connections_key: String,
}
// DeleteAddressParams groups the shared context needed to remove a node from the network map.
#[derive(Clone)]
pub struct DeleteAddressParams {
pub map: Arc<Mutex<Command>>,
pub edit: SignedNodeEdit,
pub remote_ip: String,
pub db: Db,
pub wallet: Arc<Wallet>,
pub connections_key: String,
}
#[derive(Clone)]
pub struct MonitorAddressParams {
pub map: Arc<Mutex<Command>>,

View File

@ -40,7 +40,6 @@ pub const RPC_VALIDATE_MESSAGE: u8 = 25;
pub const RPC_BLOCK_IP: u8 = 26;
pub const RPC_UNBLOCK_IP: u8 = 27;
pub const RPC_ADD_NETWORK_NODE: u8 = 28;
pub const RPC_DELETE_NETWORK_NODE: u8 = 29;
pub const RPC_REQUEST_NODE_LIST: u8 = 30;
pub const RPC_TOKEN_LIST: u8 = 31;
pub const RPC_NFT_LIST: u8 = 32;

View File

@ -1,7 +1,9 @@
use crate::records::memory::mempool::latest_pending_txids_by_address;
use crate::records::record_chain::wallet_tx_index::WALLET_TX_INDEX_TREE;
use crate::rpc::responses::RpcResponse;
use crate::sled::Db;
use crate::wallets::structures::Wallet;
use std::collections::HashSet;
use std::ops::Bound;
const TXID_LENGTH: usize = 32;
@ -91,6 +93,10 @@ pub async fn latest(address_bytes: Vec<u8>, limit: u32, db: &Db) -> RpcResponse
return RpcResponse::Binary(Vec::new());
}
let Some(address) = Wallet::bytes_to_short_address(&address_bytes) else {
return RpcResponse::Binary(b"error: Invalid wallet address bytes".to_vec());
};
let tree = match db.open_tree(WALLET_TX_INDEX_TREE) {
Ok(tree) => tree,
Err(err) => {
@ -103,6 +109,19 @@ pub async fn latest(address_bytes: Vec<u8>, limit: u32, db: &Db) -> RpcResponse
let (start, end) = history_bounds(&address_bytes);
let mut found = 0usize;
let mut txids = Vec::with_capacity(limit * TXID_LENGTH);
let mut seen = HashSet::new();
for txid in latest_pending_txids_by_address(db, &address, limit).await {
if txid.len() != TXID_LENGTH || !seen.insert(txid.clone()) {
continue;
}
txids.extend_from_slice(&txid);
found += 1;
if found >= limit {
return RpcResponse::Binary(txids);
}
}
for entry in tree.range((start, end)).rev() {
let (_key, value) = match entry {
@ -118,7 +137,12 @@ pub async fn latest(address_bytes: Vec<u8>, limit: u32, db: &Db) -> RpcResponse
continue;
}
txids.extend_from_slice(&value);
let txid = value.to_vec();
if !seen.insert(txid.clone()) {
continue;
}
txids.extend_from_slice(&txid);
found += 1;
if found >= limit {
break;

View File

@ -13,7 +13,6 @@ pub mod block_headers;
pub mod block_height;
pub mod block_peer_ip;
pub mod contract;
pub mod delete_network_node;
pub mod difficulty;
pub mod largest_tx_fee;
pub mod latest_block;

View File

@ -2,6 +2,7 @@ use crate::blocks::block::VRF_BLOCK_BYTES;
use crate::common::binary_conversions::binary_to_string;
use crate::common::network_paths_and_settings::block_extension_and_paths;
use crate::io;
use crate::records::memory::mempool::pending_transaction_by_txid;
use crate::rpc::command_maps;
use crate::rpc::responses::RpcResponse;
use crate::sled::Db;
@ -32,7 +33,7 @@ pub async fn request_transaction_by_txid(db: &Db, txid: Vec<u8>) -> RpcResponse
pub async fn request_transaction_by_txid_with_block(db: &Db, txid: Vec<u8>) -> RpcResponse {
// Some callers need the block number alongside the raw transaction
// bytes, so this variant prefixes the payload with the block height.
match lookup_transaction_location(db, txid).await {
match lookup_transaction_location(db, txid.clone()).await {
Ok((block, position, block_filename)) => {
let bytes = calculate_offset(&block_filename, position).await;
match bytes {
@ -48,7 +49,16 @@ pub async fn request_transaction_by_txid_with_block(db: &Db, txid: Vec<u8>) -> R
}
}
}
Err(msg) => RpcResponse::Binary(msg.into_bytes()),
Err(msg) => {
if let Some(bytes) = pending_transaction_by_txid(&txid).await {
let mut response = Vec::with_capacity(4 + bytes.len());
response.extend_from_slice(&0u32.to_le_bytes());
response.extend_from_slice(&bytes);
RpcResponse::Binary(response)
} else {
RpcResponse::Binary(msg.into_bytes())
}
}
}
}

View File

@ -567,20 +567,6 @@ pub async fn start_loop(
.send(&stream_locked, Some(&connections_key), uid)
.await;
}
29 => {
// delete a new network node
let (uid, result) = commands::delete_network_node::delete_network_node(
&connections_key,
stream_locked.clone(),
&db,
wallet.clone(),
map.clone(),
)
.await?;
result
.send(&stream_locked, Some(&connections_key), uid)
.await;
}
30 => {
// request node list
let (uid, _) = read_bytes_from_stream::read_uid_from_stream(