diff --git a/src/common/check_genesis.rs b/src/common/check_genesis.rs index bca9e18..2e55f4a 100644 --- a/src/common/check_genesis.rs +++ b/src/common/check_genesis.rs @@ -1,24 +1,24 @@ -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::metadata; -use crate::PathBuf; - -// Check whether the local chain already has the active network's genesis block. -pub async fn genesis_checkup() -> bool { - // Resolve the active network suffix and block directory from the - // shared path helper so mainnet/testnet never duplicate path logic. - let ( - _network_name, - _padded_base_coin, - suffix, - _torrent_path, - _wallet_path, - block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - - // Genesis is always block zero using the current network's block suffix. - let genesis_location = PathBuf::from(block_path).join(format!("0.{suffix}")); - (metadata(genesis_location).await).is_ok() -} +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::metadata; +use crate::PathBuf; + +// Check whether the local chain already has the active network's genesis block. +pub async fn genesis_checkup() -> bool { + // Resolve the active network suffix and block directory from the + // shared path helper so mainnet/testnet never duplicate path logic. + let ( + _network_name, + _padded_base_coin, + suffix, + _torrent_path, + _wallet_path, + block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + + // Genesis is always block zero using the current network's block suffix. + let genesis_location = PathBuf::from(block_path).join(format!("0.{suffix}")); + (metadata(genesis_location).await).is_ok() +} diff --git a/src/orphans/add_genesis.rs b/src/orphans/add_genesis.rs index 05a1eaa..4489e0c 100644 --- a/src/orphans/add_genesis.rs +++ b/src/orphans/add_genesis.rs @@ -39,9 +39,18 @@ pub async fn create_genesis_block( return; } }; - handle_response_and_save_torrent(0, &db, torrent, wallet, map.clone(), false, false) - .await - .ok(); + handle_response_and_save_torrent( + 0, + &db, + torrent, + wallet, + map.clone(), + false, + true, + false, + ) + .await + .ok(); } } } diff --git a/src/orphans/get_path_names.rs b/src/orphans/get_path_names.rs index 4880f98..a2c111c 100644 --- a/src/orphans/get_path_names.rs +++ b/src/orphans/get_path_names.rs @@ -1,28 +1,28 @@ -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::PathBuf; - -pub async fn get_file_names(start_height: u32) -> (String, String) { - // build the canonical block and torrent filenames - // for the height currently being undone - let ( - _network_name, - _padded_base_coin, - block_ext, - torrent_path, - _wallet_path, - block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - - let torrent_name = PathBuf::from(torrent_path) - .join(format!("{start_height}.torrent")) - .to_string_lossy() - .into_owned(); - let block_name = PathBuf::from(block_path) - .join(format!("{start_height}.{block_ext}")) - .to_string_lossy() - .into_owned(); - (torrent_name, block_name) -} +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::PathBuf; + +pub async fn get_file_names(start_height: u32) -> (String, String) { + // build the canonical block and torrent filenames + // for the height currently being undone + let ( + _network_name, + _padded_base_coin, + block_ext, + torrent_path, + _wallet_path, + block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + + let torrent_name = PathBuf::from(torrent_path) + .join(format!("{start_height}.torrent")) + .to_string_lossy() + .into_owned(); + let block_name = PathBuf::from(block_path) + .join(format!("{start_height}.{block_ext}")) + .to_string_lossy() + .into_owned(); + (torrent_name, block_name) +} diff --git a/src/orphans/orphan_checkup.rs b/src/orphans/orphan_checkup.rs index 46b26ad..996d161 100644 --- a/src/orphans/orphan_checkup.rs +++ b/src/orphans/orphan_checkup.rs @@ -116,6 +116,7 @@ async fn candidate_attaches_before_rollback( block_number: height, allow_during_reorg: true, allow_historical: true, + allow_startup_peers: params.node_syncing, db: params.db.clone(), verification_service: std::sync::Arc::new(verification_service), map: params.map.clone(), diff --git a/src/orphans/save_blocks.rs b/src/orphans/save_blocks.rs index d0e99cd..039cc6b 100644 --- a/src/orphans/save_blocks.rs +++ b/src/orphans/save_blocks.rs @@ -75,6 +75,7 @@ pub async fn save_new_blocks( wallet.clone(), params.map.clone(), true, + params.node_syncing, true, ) .await @@ -186,6 +187,7 @@ pub async fn save_new_blocks( wallet.clone(), params.map.clone(), true, + params.node_syncing, true, ) .await?; diff --git a/src/orphans/sync_check.rs b/src/orphans/sync_check.rs index e1ffa51..3c9d445 100644 --- a/src/orphans/sync_check.rs +++ b/src/orphans/sync_check.rs @@ -126,6 +126,7 @@ async fn replay_staged_torrents( wallet.clone(), params.map.clone(), true, + params.node_syncing, true, ) .await diff --git a/src/orphans/undo_transactions/undo_borrower.rs b/src/orphans/undo_transactions/undo_borrower.rs index b7538b4..eec5f0f 100644 --- a/src/orphans/undo_transactions/undo_borrower.rs +++ b/src/orphans/undo_transactions/undo_borrower.rs @@ -1,105 +1,105 @@ -use crate::blocks::loan_payment::ContractPaymentTransaction; -use crate::blocks::loans::LoanContractTransaction; +use crate::blocks::loan_payment::ContractPaymentTransaction; +use crate::blocks::loans::LoanContractTransaction; use crate::common::network_paths_and_settings::block_extension_and_paths; use crate::decode; use crate::records::balance_sheet::operations::balance_sheet_operation_with_db; -use crate::records::record_chain::add_payments_db::remove_payment; -use crate::records::record_chain::nft_provenance::remove_nft_history_entry; -use crate::rpc::commands::transaction_by_txid::request_transaction_by_txid; -use crate::rpc::responses::RpcResponse; -use crate::sled::Db; - -pub async fn undo_borrower_transaction( - transaction: ContractPaymentTransaction, - mining_receiver: &str, - db: &Db, -) -> Result<(), String> { - // restore balances and database state for a contract payment - // that is being removed during orphan rollback - let operand_subtraction = "subtraction"; - let operand_addition = "addition"; - let ( - _network_name, - _padded_base_coin, - type_str, - _torrentpath, - _wallet_path, - _blockpath, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - - // reload the original loan contract so the rollback uses the - // same lender and asset information the payment was based on - let contract_hash = decode(&transaction.unsigned_contract_payment.contract_hash) - .map_err(|e| format!("Error decoding contract hash: {e}"))?; - let contract = request_transaction_by_txid(db, contract_hash.clone()).await; - - let loan_txtype = 7; - let loan_tx = match contract { - RpcResponse::Binary(contract_bytes) => { - if contract_bytes.is_empty() { - return Err("Invalid loan contract: empty transaction bytes".to_string()); - } - if contract_bytes[0] != loan_txtype { - return Err( - "Invalid loan contract: referenced transaction is not a loan contract" - .to_string(), - ); - } - LoanContractTransaction::from_bytes(loan_txtype, &contract_bytes[1..]) - .await - .map_err(|e| e.to_string())? - } - }; - - let lender = loan_tx.unsigned_loan_contract.lender; - let loan_coin = loan_tx.unsigned_loan_contract.loan_coin; - let borrower = &transaction.unsigned_contract_payment.address; - let payback_amount = transaction.unsigned_contract_payment.payback_amount; - let tip = transaction.unsigned_contract_payment.tip; - let txfee = transaction.unsigned_contract_payment.txfee; - - // reverse the fee, tip, and repayment movements that were - // applied when the borrower payment was originally saved - let _ = - balance_sheet_operation_with_db(db, mining_receiver, txfee, &type_str, operand_subtraction); - let _ = balance_sheet_operation_with_db(db, borrower, txfee, &type_str, operand_addition); - let _ = - balance_sheet_operation_with_db(db, mining_receiver, tip, &loan_coin, operand_subtraction); - let _ = balance_sheet_operation_with_db(db, borrower, tip, &loan_coin, operand_addition); - let _ = balance_sheet_operation_with_db( - db, - &lender, - payback_amount, - &loan_coin, - operand_subtraction, - ); - let _ = - balance_sheet_operation_with_db(db, borrower, payback_amount, &loan_coin, operand_addition); - - // Remove the payment transaction lookup from the txid tree. - let txid_tree = db - .open_tree("txid") - .map_err(|e| format!("Failed to open txid tree: {e}"))?; - let tx_hash = transaction.unsigned_contract_payment.hash().await; - txid_tree - .remove(decode(&tx_hash).map_err(|e| format!("Error decoding txid: {e}"))?) - .map_err(|e| format!("Failed to remove borrower txid: {e}"))?; - - // Loan payments involving NFTs also add a provenance entry for the loan - // asset, so remove it if this loan coin is tracked as an NFT. - let nft_tree = db - .open_tree("nfts") - .map_err(|e| format!("Failed to open nfts tree: {e}"))?; - let tx_hash_bytes = decode(&tx_hash).map_err(|e| format!("Error decoding txid: {e}"))?; - if nft_tree.contains_key(loan_coin.as_bytes()).unwrap_or(false) { - let _ = remove_nft_history_entry(db, &loan_coin, &tx_hash_bytes); - } - - // The aggregate payment record is reduced by the payment being undone. - let _ = remove_payment(db, contract_hash, payback_amount); - +use crate::records::record_chain::add_payments_db::remove_payment; +use crate::records::record_chain::nft_provenance::remove_nft_history_entry; +use crate::rpc::commands::transaction_by_txid::request_transaction_by_txid; +use crate::rpc::responses::RpcResponse; +use crate::sled::Db; + +pub async fn undo_borrower_transaction( + transaction: ContractPaymentTransaction, + mining_receiver: &str, + db: &Db, +) -> Result<(), String> { + // restore balances and database state for a contract payment + // that is being removed during orphan rollback + let operand_subtraction = "subtraction"; + let operand_addition = "addition"; + let ( + _network_name, + _padded_base_coin, + type_str, + _torrentpath, + _wallet_path, + _blockpath, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + + // reload the original loan contract so the rollback uses the + // same lender and asset information the payment was based on + let contract_hash = decode(&transaction.unsigned_contract_payment.contract_hash) + .map_err(|e| format!("Error decoding contract hash: {e}"))?; + let contract = request_transaction_by_txid(db, contract_hash.clone()).await; + + let loan_txtype = 7; + let loan_tx = match contract { + RpcResponse::Binary(contract_bytes) => { + if contract_bytes.is_empty() { + return Err("Invalid loan contract: empty transaction bytes".to_string()); + } + if contract_bytes[0] != loan_txtype { + return Err( + "Invalid loan contract: referenced transaction is not a loan contract" + .to_string(), + ); + } + LoanContractTransaction::from_bytes(loan_txtype, &contract_bytes[1..]) + .await + .map_err(|e| e.to_string())? + } + }; + + let lender = loan_tx.unsigned_loan_contract.lender; + let loan_coin = loan_tx.unsigned_loan_contract.loan_coin; + let borrower = &transaction.unsigned_contract_payment.address; + let payback_amount = transaction.unsigned_contract_payment.payback_amount; + let tip = transaction.unsigned_contract_payment.tip; + let txfee = transaction.unsigned_contract_payment.txfee; + + // reverse the fee, tip, and repayment movements that were + // applied when the borrower payment was originally saved + let _ = + balance_sheet_operation_with_db(db, mining_receiver, txfee, &type_str, operand_subtraction); + let _ = balance_sheet_operation_with_db(db, borrower, txfee, &type_str, operand_addition); + let _ = + balance_sheet_operation_with_db(db, mining_receiver, tip, &loan_coin, operand_subtraction); + let _ = balance_sheet_operation_with_db(db, borrower, tip, &loan_coin, operand_addition); + let _ = balance_sheet_operation_with_db( + db, + &lender, + payback_amount, + &loan_coin, + operand_subtraction, + ); + let _ = + balance_sheet_operation_with_db(db, borrower, payback_amount, &loan_coin, operand_addition); + + // Remove the payment transaction lookup from the txid tree. + let txid_tree = db + .open_tree("txid") + .map_err(|e| format!("Failed to open txid tree: {e}"))?; + let tx_hash = transaction.unsigned_contract_payment.hash().await; + txid_tree + .remove(decode(&tx_hash).map_err(|e| format!("Error decoding txid: {e}"))?) + .map_err(|e| format!("Failed to remove borrower txid: {e}"))?; + + // Loan payments involving NFTs also add a provenance entry for the loan + // asset, so remove it if this loan coin is tracked as an NFT. + let nft_tree = db + .open_tree("nfts") + .map_err(|e| format!("Failed to open nfts tree: {e}"))?; + let tx_hash_bytes = decode(&tx_hash).map_err(|e| format!("Error decoding txid: {e}"))?; + if nft_tree.contains_key(loan_coin.as_bytes()).unwrap_or(false) { + let _ = remove_nft_history_entry(db, &loan_coin, &tx_hash_bytes); + } + + // The aggregate payment record is reduced by the payment being undone. + let _ = remove_payment(db, contract_hash, payback_amount); + Ok(()) } diff --git a/src/orphans/undo_transactions/undo_collateral.rs b/src/orphans/undo_transactions/undo_collateral.rs index 7e11707..bb727ae 100644 --- a/src/orphans/undo_transactions/undo_collateral.rs +++ b/src/orphans/undo_transactions/undo_collateral.rs @@ -1,108 +1,108 @@ -use crate::blocks::collateral::CollateralClaimTransaction; -use crate::blocks::loans::LoanContractTransaction; +use crate::blocks::collateral::CollateralClaimTransaction; +use crate::blocks::loans::LoanContractTransaction; use crate::common::network_paths_and_settings::block_extension_and_paths; use crate::decode; use crate::records::balance_sheet::operations::balance_sheet_operation_with_db; -use crate::records::record_chain::nft_provenance::remove_nft_history_entry; -use crate::rpc::commands::transaction_by_txid::request_transaction_by_txid; -use crate::rpc::responses::RpcResponse; -use crate::sled::Db; - -pub async fn undo_collateral_transaction( - transaction: CollateralClaimTransaction, - mining_receiver: &str, - db: &Db, -) -> Result<(), String> { - // restore balances and contract state for a collateral - // claim that is being removed during orphan rollback - let operand_subtraction = "subtraction"; - let operand_addition = "addition"; - let ( - _network_name, - _padded_base_coin, - type_str, - _torrentpath, - _wallet_path, - _blockpath, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - - // reload the original loan contract so the collateral - // asset and amount can be restored correctly - let contract_hash = decode(&transaction.unsigned_collateral_claim.contract_hash) - .map_err(|e| format!("Error decoding contract hash: {e}"))?; - let contract = request_transaction_by_txid(db, contract_hash.clone()).await; - - let loan_txtype = 7; - let loan_tx = match contract { - RpcResponse::Binary(contract_bytes) => { - if contract_bytes.is_empty() { - return Err("Invalid loan contract: empty transaction bytes".to_string()); - } - if contract_bytes[0] != loan_txtype { - return Err( - "Invalid loan contract: referenced transaction is not a loan contract" - .to_string(), - ); - } - LoanContractTransaction::from_bytes(loan_txtype, &contract_bytes[1..]) - .await - .map_err(|e| e.to_string())? - } - }; - - let collateral = loan_tx.unsigned_loan_contract.collateral; - let collateral_amount = loan_tx.unsigned_loan_contract.collateral_amount; - let collateral_holding = format!( - "collateral_{}", - transaction.unsigned_collateral_claim.contract_hash - ); - let claimer = &transaction.unsigned_collateral_claim.address; - let txfee = transaction.unsigned_collateral_claim.txfee; - - // reverse the fee and move the collateral back into the - // contract holding wallet until the claim exists again - let _ = - balance_sheet_operation_with_db(db, mining_receiver, txfee, &type_str, operand_subtraction); - let _ = balance_sheet_operation_with_db(db, claimer, txfee, &type_str, operand_addition); - let _ = balance_sheet_operation_with_db( - db, - claimer, - collateral_amount, - &collateral, - operand_subtraction, - ); - let _ = balance_sheet_operation_with_db( - db, - &collateral_holding, - collateral_amount, - &collateral, - operand_addition, - ); - - // Remove the collateral-claim transaction lookup from the txid tree. - let txid_tree = db.open_tree("txid").unwrap(); - let tx_hash = transaction.unsigned_collateral_claim.hash().await; - txid_tree - .remove(decode(&tx_hash).map_err(|e| format!("Error decoding txid: {e}"))?) - .unwrap(); - - // NFT collateral claims write provenance for the collateral asset. - let nft_tree = db.open_tree("nfts").unwrap(); - let tx_hash_bytes = decode(&tx_hash).map_err(|e| format!("Error decoding txid: {e}"))?; - if nft_tree - .contains_key(collateral.as_bytes()) - .unwrap_or(false) - { - let _ = remove_nft_history_entry(db, &collateral, &tx_hash_bytes); - } - - // Mark the loan contract active again because the collateral claim no - // longer exists after rollback. - let loan_tree = db.open_tree("loan").unwrap(); - loan_tree.insert(contract_hash, "true".as_bytes()).unwrap(); - +use crate::records::record_chain::nft_provenance::remove_nft_history_entry; +use crate::rpc::commands::transaction_by_txid::request_transaction_by_txid; +use crate::rpc::responses::RpcResponse; +use crate::sled::Db; + +pub async fn undo_collateral_transaction( + transaction: CollateralClaimTransaction, + mining_receiver: &str, + db: &Db, +) -> Result<(), String> { + // restore balances and contract state for a collateral + // claim that is being removed during orphan rollback + let operand_subtraction = "subtraction"; + let operand_addition = "addition"; + let ( + _network_name, + _padded_base_coin, + type_str, + _torrentpath, + _wallet_path, + _blockpath, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + + // reload the original loan contract so the collateral + // asset and amount can be restored correctly + let contract_hash = decode(&transaction.unsigned_collateral_claim.contract_hash) + .map_err(|e| format!("Error decoding contract hash: {e}"))?; + let contract = request_transaction_by_txid(db, contract_hash.clone()).await; + + let loan_txtype = 7; + let loan_tx = match contract { + RpcResponse::Binary(contract_bytes) => { + if contract_bytes.is_empty() { + return Err("Invalid loan contract: empty transaction bytes".to_string()); + } + if contract_bytes[0] != loan_txtype { + return Err( + "Invalid loan contract: referenced transaction is not a loan contract" + .to_string(), + ); + } + LoanContractTransaction::from_bytes(loan_txtype, &contract_bytes[1..]) + .await + .map_err(|e| e.to_string())? + } + }; + + let collateral = loan_tx.unsigned_loan_contract.collateral; + let collateral_amount = loan_tx.unsigned_loan_contract.collateral_amount; + let collateral_holding = format!( + "collateral_{}", + transaction.unsigned_collateral_claim.contract_hash + ); + let claimer = &transaction.unsigned_collateral_claim.address; + let txfee = transaction.unsigned_collateral_claim.txfee; + + // reverse the fee and move the collateral back into the + // contract holding wallet until the claim exists again + let _ = + balance_sheet_operation_with_db(db, mining_receiver, txfee, &type_str, operand_subtraction); + let _ = balance_sheet_operation_with_db(db, claimer, txfee, &type_str, operand_addition); + let _ = balance_sheet_operation_with_db( + db, + claimer, + collateral_amount, + &collateral, + operand_subtraction, + ); + let _ = balance_sheet_operation_with_db( + db, + &collateral_holding, + collateral_amount, + &collateral, + operand_addition, + ); + + // Remove the collateral-claim transaction lookup from the txid tree. + let txid_tree = db.open_tree("txid").unwrap(); + let tx_hash = transaction.unsigned_collateral_claim.hash().await; + txid_tree + .remove(decode(&tx_hash).map_err(|e| format!("Error decoding txid: {e}"))?) + .unwrap(); + + // NFT collateral claims write provenance for the collateral asset. + let nft_tree = db.open_tree("nfts").unwrap(); + let tx_hash_bytes = decode(&tx_hash).map_err(|e| format!("Error decoding txid: {e}"))?; + if nft_tree + .contains_key(collateral.as_bytes()) + .unwrap_or(false) + { + let _ = remove_nft_history_entry(db, &collateral, &tx_hash_bytes); + } + + // Mark the loan contract active again because the collateral claim no + // longer exists after rollback. + let loan_tree = db.open_tree("loan").unwrap(); + loan_tree.insert(contract_hash, "true".as_bytes()).unwrap(); + Ok(()) } diff --git a/src/records/balance_sheet/pathing.rs b/src/records/balance_sheet/pathing.rs index 902b033..f53e07f 100644 --- a/src/records/balance_sheet/pathing.rs +++ b/src/records/balance_sheet/pathing.rs @@ -1,92 +1,92 @@ -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::records::balance_sheet::tokens_to_lower::strip_spaces_and_lowercase; -use crate::wallets::structures::Wallet; -use crate::PathBuf; - -pub fn network_name() -> &'static str { - let ( - network_name, - _padded_base_coin, - _suffix, - _torrent_path, - _wallet_path, - _block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - network_name -} - -pub fn balance_root_path() -> PathBuf { - // The balance root is the configured balance-sheet directory scoped - // to the active network name. - let ( - _network_name, - _padded_base_coin, - _suffix, - _torrent_path, - _wallet_path, - _block_path, - _db_path, - balance_path, - _log_path, - ) = block_extension_and_paths(); - PathBuf::from(balance_path) -} - -pub fn canonical_balance_address(address: &str) -> Option { - // Balance storage is normalized to the deterministic short address, - // regardless of whether callers still pass a long or short address. - Wallet::normalize_to_short_address(address) -} - -pub fn address_root_path(address: &str) -> Option { - let canonical_address = canonical_balance_address(address)?; - Some(balance_root_path().join(canonical_address)) -} - -pub fn balance_asset_segments(coin_type: &str) -> Vec { - // NFT balances use a nested path of asset name plus item/series - // number, while normal coins and tokens stay as a single segment. - let coin = strip_spaces_and_lowercase(coin_type); - - if let Some((series_name, item_number)) = coin.rsplit_once('_') { - if !series_name.is_empty() - && !item_number.is_empty() - && item_number.chars().all(|c| c.is_ascii_digit()) - { - return vec![series_name.to_string(), item_number.to_string()]; - } - } - - vec![coin] -} - -pub fn balance_file_path(address: &str, coin_type: &str) -> PathBuf { - // Build the canonical wallet balance path for an address and asset - // using the current hierarchical balance-sheet layout. - let mut path = address_root_path(address).unwrap_or_else(balance_root_path); - for segment in balance_asset_segments(coin_type) { - path.push(segment); - } - path.push("wallet.bal"); - path -} - -pub fn asset_name_from_relative_path(relative_path: &std::path::Path) -> Option { - // Convert a relative balance-sheet file path back into the logical - // asset name used by wallet and balance queries. - let segments: Vec = relative_path - .iter() - .map(|part| part.to_string_lossy().into_owned()) - .collect(); - - match segments.as_slice() { - [token_dir, wallet_file] if wallet_file == "wallet.bal" => Some(token_dir.clone()), - [token_dir, item_number, wallet_file] if wallet_file == "wallet.bal" => { - Some(format!("{token_dir}_{item_number}")) - } - _ => None, - } -} +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::records::balance_sheet::tokens_to_lower::strip_spaces_and_lowercase; +use crate::wallets::structures::Wallet; +use crate::PathBuf; + +pub fn network_name() -> &'static str { + let ( + network_name, + _padded_base_coin, + _suffix, + _torrent_path, + _wallet_path, + _block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + network_name +} + +pub fn balance_root_path() -> PathBuf { + // The balance root is the configured balance-sheet directory scoped + // to the active network name. + let ( + _network_name, + _padded_base_coin, + _suffix, + _torrent_path, + _wallet_path, + _block_path, + _db_path, + balance_path, + _log_path, + ) = block_extension_and_paths(); + PathBuf::from(balance_path) +} + +pub fn canonical_balance_address(address: &str) -> Option { + // Balance storage is normalized to the deterministic short address, + // regardless of whether callers still pass a long or short address. + Wallet::normalize_to_short_address(address) +} + +pub fn address_root_path(address: &str) -> Option { + let canonical_address = canonical_balance_address(address)?; + Some(balance_root_path().join(canonical_address)) +} + +pub fn balance_asset_segments(coin_type: &str) -> Vec { + // NFT balances use a nested path of asset name plus item/series + // number, while normal coins and tokens stay as a single segment. + let coin = strip_spaces_and_lowercase(coin_type); + + if let Some((series_name, item_number)) = coin.rsplit_once('_') { + if !series_name.is_empty() + && !item_number.is_empty() + && item_number.chars().all(|c| c.is_ascii_digit()) + { + return vec![series_name.to_string(), item_number.to_string()]; + } + } + + vec![coin] +} + +pub fn balance_file_path(address: &str, coin_type: &str) -> PathBuf { + // Build the canonical wallet balance path for an address and asset + // using the current hierarchical balance-sheet layout. + let mut path = address_root_path(address).unwrap_or_else(balance_root_path); + for segment in balance_asset_segments(coin_type) { + path.push(segment); + } + path.push("wallet.bal"); + path +} + +pub fn asset_name_from_relative_path(relative_path: &std::path::Path) -> Option { + // Convert a relative balance-sheet file path back into the logical + // asset name used by wallet and balance queries. + let segments: Vec = relative_path + .iter() + .map(|part| part.to_string_lossy().into_owned()) + .collect(); + + match segments.as_slice() { + [token_dir, wallet_file] if wallet_file == "wallet.bal" => Some(token_dir.clone()), + [token_dir, item_number, wallet_file] if wallet_file == "wallet.bal" => { + Some(format!("{token_dir}_{item_number}")) + } + _ => None, + } +} diff --git a/src/records/unpack_block/load_by_block_number.rs b/src/records/unpack_block/load_by_block_number.rs index 1bd6239..ea15820 100644 --- a/src/records/unpack_block/load_by_block_number.rs +++ b/src/records/unpack_block/load_by_block_number.rs @@ -1,225 +1,225 @@ -use crate::blocks::block::{Block, VrfBlock, VRF_BLOCK_BYTES}; -use crate::blocks::burn::BurnTransaction; -use crate::blocks::collateral::CollateralClaimTransaction; -use crate::blocks::genesis::GenesisTransaction; -use crate::blocks::issue_token::IssueTokenTransaction; -use crate::blocks::loan_payment::ContractPaymentTransaction; -use crate::blocks::loans::LoanContractTransaction; -use crate::blocks::marketing::MarketingTransaction; -use crate::blocks::nft::CreateNftTransaction; -use crate::blocks::rewards::RewardsTransaction; -use crate::blocks::swap::SwapTransaction; -use crate::blocks::token::CreateTokenTransaction; -use crate::blocks::transfer::TransferTransaction; -use crate::blocks::vanity::VanityAddressTransaction; -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::common::types::{ - Transaction, BORROWER_TYPE, BURN_TYPE, COLLATERAL_TYPE, CREATE_NFT_TYPE, CREATE_TOKEN_TYPE, - GENESIS_TYPE, ISSUE_TOKEN_TYPE, LENDER_TYPE, MARKETING_TYPE, REWARDS_TYPE, SWAP_TYPE, - TRANSFER_TYPE, VANITY_ADDRESS_TYPE, -}; -use crate::fs; -use crate::rpc::command_maps::get_bytes; -use crate::PathBuf; - -// The transaction body helpers keep the block parser aligned with the command map sizes. -fn transaction_body_len(txtype: u8) -> Result { - let total_len = get_bytes(txtype); - if total_len <= 1 { - return Err(format!("Unknown transaction type: {txtype}")); - } - - // get_bytes includes the transaction type byte; parser bodies start after - // that byte has already been consumed. - Ok(total_len - 1) -} - -fn transaction_body_slice( - binary_data: &[u8], - start: usize, - body_len: usize, -) -> Result<&[u8], String> { - // Slice with bounds checking so truncated block files fail cleanly instead - // of panicking during transaction parsing. - binary_data - .get(start..start + body_len) - .ok_or_else(|| format!("Truncated transaction body at offset {start}")) -} - -pub async fn load_block(block_number: u32) -> Result { - // Blocks are loaded from disk by height, then split back into the header and - // variable-length transaction payloads. - let ( - _network_name, - _padded_base_coin, - block_ext, - _torrent_path, - _wallet_path, - block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - let file_name = PathBuf::from(block_path) - .join(format!("{block_number}.{block_ext}")) - .to_string_lossy() - .into_owned(); - - // Load the full block because this path reconstructs both the header and - // every transaction for validation or inspection. +use crate::blocks::block::{Block, VrfBlock, VRF_BLOCK_BYTES}; +use crate::blocks::burn::BurnTransaction; +use crate::blocks::collateral::CollateralClaimTransaction; +use crate::blocks::genesis::GenesisTransaction; +use crate::blocks::issue_token::IssueTokenTransaction; +use crate::blocks::loan_payment::ContractPaymentTransaction; +use crate::blocks::loans::LoanContractTransaction; +use crate::blocks::marketing::MarketingTransaction; +use crate::blocks::nft::CreateNftTransaction; +use crate::blocks::rewards::RewardsTransaction; +use crate::blocks::swap::SwapTransaction; +use crate::blocks::token::CreateTokenTransaction; +use crate::blocks::transfer::TransferTransaction; +use crate::blocks::vanity::VanityAddressTransaction; +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::common::types::{ + Transaction, BORROWER_TYPE, BURN_TYPE, COLLATERAL_TYPE, CREATE_NFT_TYPE, CREATE_TOKEN_TYPE, + GENESIS_TYPE, ISSUE_TOKEN_TYPE, LENDER_TYPE, MARKETING_TYPE, REWARDS_TYPE, SWAP_TYPE, + TRANSFER_TYPE, VANITY_ADDRESS_TYPE, +}; +use crate::fs; +use crate::rpc::command_maps::get_bytes; +use crate::PathBuf; + +// The transaction body helpers keep the block parser aligned with the command map sizes. +fn transaction_body_len(txtype: u8) -> Result { + let total_len = get_bytes(txtype); + if total_len <= 1 { + return Err(format!("Unknown transaction type: {txtype}")); + } + + // get_bytes includes the transaction type byte; parser bodies start after + // that byte has already been consumed. + Ok(total_len - 1) +} + +fn transaction_body_slice( + binary_data: &[u8], + start: usize, + body_len: usize, +) -> Result<&[u8], String> { + // Slice with bounds checking so truncated block files fail cleanly instead + // of panicking during transaction parsing. + binary_data + .get(start..start + body_len) + .ok_or_else(|| format!("Truncated transaction body at offset {start}")) +} + +pub async fn load_block(block_number: u32) -> Result { + // Blocks are loaded from disk by height, then split back into the header and + // variable-length transaction payloads. + let ( + _network_name, + _padded_base_coin, + block_ext, + _torrent_path, + _wallet_path, + block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + let file_name = PathBuf::from(block_path) + .join(format!("{block_number}.{block_ext}")) + .to_string_lossy() + .into_owned(); + + // Load the full block because this path reconstructs both the header and + // every transaction for validation or inspection. let binary_data = match fs::read(&file_name) { Ok(data) => data, Err(err) => { return Err(format!("Unable to read block {block_number}: {err:?}")); } }; - - if binary_data.len() < VRF_BLOCK_BYTES { - return Err("Unable to load block: binary data shorter than VrfBlock header".to_string()); - } - - let vrf_block = VrfBlock::from_bytes(&binary_data[0..VRF_BLOCK_BYTES]) - .await - .map_err(|e| e.to_string())?; - let mut i = VRF_BLOCK_BYTES; - let mut transactions: Vec = Vec::new(); - - while i < binary_data.len() { - // Each stored transaction begins with its type byte, followed by the fixed-size - // body for that transaction family. - let txtype = binary_data[i]; - i += 1; - let body_len = transaction_body_len(txtype)?; - let body = transaction_body_slice(&binary_data, i, body_len)?; - let transaction = match txtype { - GENESIS_TYPE => { - let genesis = Transaction::Genesis( - GenesisTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - genesis - } - REWARDS_TYPE => { - let rewards = Transaction::Rewards( - RewardsTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - rewards - } - TRANSFER_TYPE => { - let transfer = Transaction::Transfer( - TransferTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - transfer - } - BURN_TYPE => { - let burn = Transaction::Burn( - BurnTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - burn - } - CREATE_TOKEN_TYPE => { - let create_token = Transaction::Token( - CreateTokenTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - create_token - } - ISSUE_TOKEN_TYPE => { - let issue_token = Transaction::IssueToken( - IssueTokenTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - issue_token - } - CREATE_NFT_TYPE => { - let create_nft = Transaction::Nft( - CreateNftTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - create_nft - } - MARKETING_TYPE => { - let marketing = Transaction::Marketing( - MarketingTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - marketing - } - SWAP_TYPE => { - let swap = Transaction::Swap( - SwapTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - swap - } - LENDER_TYPE => { - let loan = Transaction::Lender( - LoanContractTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - loan - } - BORROWER_TYPE => { - let payment = Transaction::Borrower( - ContractPaymentTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - payment - } - COLLATERAL_TYPE => { - let collateral = Transaction::Collateral( - CollateralClaimTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - collateral - } - VANITY_ADDRESS_TYPE => { - let vanity = Transaction::Vanity( - VanityAddressTransaction::from_bytes(txtype, body) - .await - .map_err(|e| e.to_string())?, - ); - i += body_len; - vanity - } - _ => { - return Err(format!("Unsupported transaction type: {txtype}")); - } - }; - transactions.push(transaction); - } - - let block = Block { - vrf_block, - transactions, - }; - - Ok(block) -} + + if binary_data.len() < VRF_BLOCK_BYTES { + return Err("Unable to load block: binary data shorter than VrfBlock header".to_string()); + } + + let vrf_block = VrfBlock::from_bytes(&binary_data[0..VRF_BLOCK_BYTES]) + .await + .map_err(|e| e.to_string())?; + let mut i = VRF_BLOCK_BYTES; + let mut transactions: Vec = Vec::new(); + + while i < binary_data.len() { + // Each stored transaction begins with its type byte, followed by the fixed-size + // body for that transaction family. + let txtype = binary_data[i]; + i += 1; + let body_len = transaction_body_len(txtype)?; + let body = transaction_body_slice(&binary_data, i, body_len)?; + let transaction = match txtype { + GENESIS_TYPE => { + let genesis = Transaction::Genesis( + GenesisTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + genesis + } + REWARDS_TYPE => { + let rewards = Transaction::Rewards( + RewardsTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + rewards + } + TRANSFER_TYPE => { + let transfer = Transaction::Transfer( + TransferTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + transfer + } + BURN_TYPE => { + let burn = Transaction::Burn( + BurnTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + burn + } + CREATE_TOKEN_TYPE => { + let create_token = Transaction::Token( + CreateTokenTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + create_token + } + ISSUE_TOKEN_TYPE => { + let issue_token = Transaction::IssueToken( + IssueTokenTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + issue_token + } + CREATE_NFT_TYPE => { + let create_nft = Transaction::Nft( + CreateNftTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + create_nft + } + MARKETING_TYPE => { + let marketing = Transaction::Marketing( + MarketingTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + marketing + } + SWAP_TYPE => { + let swap = Transaction::Swap( + SwapTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + swap + } + LENDER_TYPE => { + let loan = Transaction::Lender( + LoanContractTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + loan + } + BORROWER_TYPE => { + let payment = Transaction::Borrower( + ContractPaymentTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + payment + } + COLLATERAL_TYPE => { + let collateral = Transaction::Collateral( + CollateralClaimTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + collateral + } + VANITY_ADDRESS_TYPE => { + let vanity = Transaction::Vanity( + VanityAddressTransaction::from_bytes(txtype, body) + .await + .map_err(|e| e.to_string())?, + ); + i += body_len; + vanity + } + _ => { + return Err(format!("Unsupported transaction type: {txtype}")); + } + }; + transactions.push(transaction); + } + + let block = Block { + vrf_block, + transactions, + }; + + Ok(block) +} diff --git a/src/records/unpack_block/unpack_header.rs b/src/records/unpack_block/unpack_header.rs index 8aa2f35..e4247f9 100644 --- a/src/records/unpack_block/unpack_header.rs +++ b/src/records/unpack_block/unpack_header.rs @@ -1,27 +1,27 @@ -use crate::blocks::block::{VrfBlock, VRF_BLOCK_BYTES}; -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::AsyncReadExt; -use crate::File; -use crate::PathBuf; - -pub async fn load_block_header(block_number: u32) -> Result { - // Header-only loads avoid reading the full block when only chain metadata - // is needed. - let ( - _network_name, - _padded_base_coin, - block_ext, - _torrent_path, - _wallet_path, - block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - let file_name = PathBuf::from(block_path) - .join(format!("{block_number}.{block_ext}")) - .to_string_lossy() - .into_owned(); +use crate::blocks::block::{VrfBlock, VRF_BLOCK_BYTES}; +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::AsyncReadExt; +use crate::File; +use crate::PathBuf; + +pub async fn load_block_header(block_number: u32) -> Result { + // Header-only loads avoid reading the full block when only chain metadata + // is needed. + let ( + _network_name, + _padded_base_coin, + block_ext, + _torrent_path, + _wallet_path, + block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + let file_name = PathBuf::from(block_path) + .join(format!("{block_number}.{block_ext}")) + .to_string_lossy() + .into_owned(); let file = match File::open(&file_name).await { Ok(file) => file, Err(err) => { @@ -30,23 +30,23 @@ pub async fn load_block_header(block_number: u32) -> Result { )); } }; - - let mut binary_data = Vec::with_capacity(VRF_BLOCK_BYTES); - // Only read the fixed VrfBlock prefix from the block file. - if let Err(err) = file - .take(VRF_BLOCK_BYTES as u64) - .read_to_end(&mut binary_data) - .await + + let mut binary_data = Vec::with_capacity(VRF_BLOCK_BYTES); + // Only read the fixed VrfBlock prefix from the block file. + if let Err(err) = file + .take(VRF_BLOCK_BYTES as u64) + .read_to_end(&mut binary_data) + .await { return Err(format!( "Error reading block header for height {block_number}: {err:?}" )); } - - // The stored header format is the same VrfBlock prefix used at the - // beginning of every block file. - match VrfBlock::from_bytes(&binary_data).await { - Ok(block) => Ok(block), - Err(err) => Err(format!("Error parsing block: {err:?}")), - } -} + + // The stored header format is the same VrfBlock prefix used at the + // beginning of every block file. + match VrfBlock::from_bytes(&binary_data).await { + Ok(block) => Ok(block), + Err(err) => Err(format!("Error parsing block: {err:?}")), + } +} diff --git a/src/records/wallet_registry/helpers.rs b/src/records/wallet_registry/helpers.rs index 657137f..cfcb220 100644 --- a/src/records/wallet_registry/helpers.rs +++ b/src/records/wallet_registry/helpers.rs @@ -1,5 +1,5 @@ -use super::*; - +use super::*; + pub fn get_registered_pubkey(db: &Db, short_address: &[u8]) -> sled::Result>> { // The primary registry maps canonical short-address bytes to long public keys. let tree = db.open_tree(WALLET_REGISTRY_TREE)?; diff --git a/src/records/wallet_registry/mappings.rs b/src/records/wallet_registry/mappings.rs index c4b3c19..b1d0ddb 100644 --- a/src/records/wallet_registry/mappings.rs +++ b/src/records/wallet_registry/mappings.rs @@ -1,79 +1,79 @@ -use super::*; - -pub fn resolve_canonical_registered_short_address( - db: &Db, - address: &str, -) -> sled::Result> { - // Normalize long addresses, normal short addresses, and vanity-shaped - // addresses into the current-network short address format first. - let normalized = match Wallet::normalize_to_short_address(address) { - Some(address) => address, - None => return Ok(None), - }; - - // A direct registry hit means the input was already the canonical short - // address for a registered wallet. - let normalized_bytes = match Wallet::short_address_to_bytes(&normalized) { - Some(bytes) => bytes, - None => return Ok(None), - }; - - if short_address_exists(db, &normalized_bytes)? { - return Ok(Some(normalized)); - } - - // If the normalized text is a vanity address, resolve it through the - // vanity-to-owner tree and return the owner's canonical short address. - if let Some(owner_short_address) = resolve_owner_from_vanity_address(db, &normalized)? { - return Ok(Some(owner_short_address)); - } - - Ok(None) -} - -pub fn resolve_pubkey_from_short_address( - db: &Db, - short_address: &str, -) -> sled::Result>> { - // Resolve vanity aliases before loading the public key so signature checks - // always use the registered owner address. - let canonical_short_address = - match resolve_canonical_registered_short_address(db, short_address)? { - Some(address) => address, - None => return Ok(None), - }; - - let short_address_bytes = match Wallet::short_address_to_bytes(&canonical_short_address) { - Some(bytes) => bytes, - None => return Ok(None), - }; - - get_registered_pubkey(db, &short_address_bytes) -} - -pub fn require_canonical_registered_short_address( - db: &Db, - address: &str, - label: &str, -) -> Result { - // This path is used where vanity aliases are not allowed. The input may be - // normalized, but it must already equal the canonical registered address. - let normalized = Wallet::normalize_to_short_address(address) - .ok_or_else(|| format!("{label} is invalid."))?; - - let canonical = resolve_canonical_registered_short_address(db, &normalized) - .map_err(|err| format!("{label} lookup failed: {err}"))? - .ok_or_else(|| format!("{label} is not registered."))?; - - if canonical != normalized { - return Err(format!( - "{label} must use the canonical short address instead of a vanity alias." - )); - } - - Ok(canonical) -} - +use super::*; + +pub fn resolve_canonical_registered_short_address( + db: &Db, + address: &str, +) -> sled::Result> { + // Normalize long addresses, normal short addresses, and vanity-shaped + // addresses into the current-network short address format first. + let normalized = match Wallet::normalize_to_short_address(address) { + Some(address) => address, + None => return Ok(None), + }; + + // A direct registry hit means the input was already the canonical short + // address for a registered wallet. + let normalized_bytes = match Wallet::short_address_to_bytes(&normalized) { + Some(bytes) => bytes, + None => return Ok(None), + }; + + if short_address_exists(db, &normalized_bytes)? { + return Ok(Some(normalized)); + } + + // If the normalized text is a vanity address, resolve it through the + // vanity-to-owner tree and return the owner's canonical short address. + if let Some(owner_short_address) = resolve_owner_from_vanity_address(db, &normalized)? { + return Ok(Some(owner_short_address)); + } + + Ok(None) +} + +pub fn resolve_pubkey_from_short_address( + db: &Db, + short_address: &str, +) -> sled::Result>> { + // Resolve vanity aliases before loading the public key so signature checks + // always use the registered owner address. + let canonical_short_address = + match resolve_canonical_registered_short_address(db, short_address)? { + Some(address) => address, + None => return Ok(None), + }; + + let short_address_bytes = match Wallet::short_address_to_bytes(&canonical_short_address) { + Some(bytes) => bytes, + None => return Ok(None), + }; + + get_registered_pubkey(db, &short_address_bytes) +} + +pub fn require_canonical_registered_short_address( + db: &Db, + address: &str, + label: &str, +) -> Result { + // This path is used where vanity aliases are not allowed. The input may be + // normalized, but it must already equal the canonical registered address. + let normalized = Wallet::normalize_to_short_address(address) + .ok_or_else(|| format!("{label} is invalid."))?; + + let canonical = resolve_canonical_registered_short_address(db, &normalized) + .map_err(|err| format!("{label} lookup failed: {err}"))? + .ok_or_else(|| format!("{label} is not registered."))?; + + if canonical != normalized { + return Err(format!( + "{label} must use the canonical short address instead of a vanity alias." + )); + } + + Ok(canonical) +} + pub fn resolve_local_input_short_address(address: &str) -> Result { // CLI tools may receive long, short, or vanity addresses, so normalize the // user input before opening the local registry. @@ -96,100 +96,100 @@ pub fn resolve_local_input_short_address(address: &str) -> Result sled::Result, Vec)>> { - let tree = db.open_tree(WALLET_REGISTRY_TREE)?; - let mut wallets = Vec::new(); - - // Registry sync ships raw short-address/public-key pairs across peers. - for entry in tree.iter() { - let (short_address, public_key) = entry?; - wallets.push((short_address.to_vec(), public_key.to_vec())); - } - - Ok(wallets) -} - -pub fn get_registered_vanity_for_owner( - db: &Db, - owner_short_address: &str, -) -> sled::Result> { - // Owner lookups are canonicalized first so callers can pass either long or - // short owner addresses. - let canonical_owner = match resolve_canonical_registered_short_address(db, owner_short_address)? - { - Some(address) => address, - None => return Ok(None), - }; - - let owner_bytes = match Wallet::short_address_to_bytes(&canonical_owner) { - Some(bytes) => bytes, - None => return Ok(None), - }; - - let tree = db.open_tree(WALLET_VANITY_OWNER_TREE)?; - // Owner tree stores owner short-address bytes -> vanity-address bytes. - let Some(vanity_bytes) = tree.get(owner_bytes)?.map(|value| value.to_vec()) else { - return Ok(None); - }; - - Ok(Wallet::bytes_to_vanity_address(&vanity_bytes)) -} - -pub fn resolve_owner_from_vanity_address( - db: &Db, - vanity_address: &str, -) -> sled::Result> { - // Vanity text is normalized to the fixed byte payload before sled lookup. - let vanity_bytes = match Wallet::vanity_address_to_bytes(vanity_address) { - Some(bytes) => bytes, - None => return Ok(None), - }; - - let tree = db.open_tree(WALLET_VANITY_ADDRESS_TREE)?; - // Vanity tree stores vanity-address bytes -> owner short-address bytes. - let Some(owner_bytes) = tree.get(vanity_bytes)?.map(|value| value.to_vec()) else { - return Ok(None); - }; - - Ok(Wallet::bytes_to_short_address(&owner_bytes)) -} - -pub fn take_previous_vanity_for_txid( - db: &Db, - txid_hex: &str, -) -> sled::Result>> { - let tree = db.open_tree(WALLET_VANITY_ROLLBACK_TREE)?; - let key = decode(txid_hex).unwrap_or_default(); - // Taking removes the rollback marker so the same vanity undo cannot be - // replayed twice. - let Some(value) = tree.remove(key)?.map(|value| value.to_vec()) else { - return Ok(None); - }; - - if value.is_empty() { - // Empty bytes mean the owner had no prior vanity mapping. - return Ok(Some(None)); - } - - Ok(Some(Wallet::bytes_to_vanity_address(&value))) -} + +pub fn list_registered_wallets(db: &Db) -> sled::Result, Vec)>> { + let tree = db.open_tree(WALLET_REGISTRY_TREE)?; + let mut wallets = Vec::new(); + + // Registry sync ships raw short-address/public-key pairs across peers. + for entry in tree.iter() { + let (short_address, public_key) = entry?; + wallets.push((short_address.to_vec(), public_key.to_vec())); + } + + Ok(wallets) +} + +pub fn get_registered_vanity_for_owner( + db: &Db, + owner_short_address: &str, +) -> sled::Result> { + // Owner lookups are canonicalized first so callers can pass either long or + // short owner addresses. + let canonical_owner = match resolve_canonical_registered_short_address(db, owner_short_address)? + { + Some(address) => address, + None => return Ok(None), + }; + + let owner_bytes = match Wallet::short_address_to_bytes(&canonical_owner) { + Some(bytes) => bytes, + None => return Ok(None), + }; + + let tree = db.open_tree(WALLET_VANITY_OWNER_TREE)?; + // Owner tree stores owner short-address bytes -> vanity-address bytes. + let Some(vanity_bytes) = tree.get(owner_bytes)?.map(|value| value.to_vec()) else { + return Ok(None); + }; + + Ok(Wallet::bytes_to_vanity_address(&vanity_bytes)) +} + +pub fn resolve_owner_from_vanity_address( + db: &Db, + vanity_address: &str, +) -> sled::Result> { + // Vanity text is normalized to the fixed byte payload before sled lookup. + let vanity_bytes = match Wallet::vanity_address_to_bytes(vanity_address) { + Some(bytes) => bytes, + None => return Ok(None), + }; + + let tree = db.open_tree(WALLET_VANITY_ADDRESS_TREE)?; + // Vanity tree stores vanity-address bytes -> owner short-address bytes. + let Some(owner_bytes) = tree.get(vanity_bytes)?.map(|value| value.to_vec()) else { + return Ok(None); + }; + + Ok(Wallet::bytes_to_short_address(&owner_bytes)) +} + +pub fn take_previous_vanity_for_txid( + db: &Db, + txid_hex: &str, +) -> sled::Result>> { + let tree = db.open_tree(WALLET_VANITY_ROLLBACK_TREE)?; + let key = decode(txid_hex).unwrap_or_default(); + // Taking removes the rollback marker so the same vanity undo cannot be + // replayed twice. + let Some(value) = tree.remove(key)?.map(|value| value.to_vec()) else { + return Ok(None); + }; + + if value.is_empty() { + // Empty bytes mean the owner had no prior vanity mapping. + return Ok(Some(None)); + } + + Ok(Some(Wallet::bytes_to_vanity_address(&value))) +} diff --git a/src/records/wallet_registry/storage.rs b/src/records/wallet_registry/storage.rs index ab29f98..9da43fc 100644 --- a/src/records/wallet_registry/storage.rs +++ b/src/records/wallet_registry/storage.rs @@ -1,76 +1,76 @@ -use super::*; - -pub fn register_short_address( - db: &Db, - short_address: &[u8], - public_key: &[u8], +use super::*; + +pub fn register_short_address( + db: &Db, + short_address: &[u8], + public_key: &[u8], ) -> sled::Result { let tree = db.open_tree(WALLET_REGISTRY_TREE)?; // Re-registering the same public key is harmless, but a different public // key for the same short address is a real conflict. if let Some(existing) = tree.get(short_address)? { - if existing.as_ref() == public_key { - return Ok(WalletRegistrationResult::AlreadyRegistered); - } - return Ok(WalletRegistrationResult::Conflict); - } - - tree.insert(short_address, public_key)?; - Ok(WalletRegistrationResult::Inserted) -} - -pub fn register_or_update_vanity_address( - db: &Db, - owner_short_address: &str, - vanity_address: &str, + if existing.as_ref() == public_key { + return Ok(WalletRegistrationResult::AlreadyRegistered); + } + return Ok(WalletRegistrationResult::Conflict); + } + + tree.insert(short_address, public_key)?; + Ok(WalletRegistrationResult::Inserted) +} + +pub fn register_or_update_vanity_address( + db: &Db, + owner_short_address: &str, + vanity_address: &str, ) -> sled::Result { // Vanity ownership is only valid for an already registered canonical owner. let normalized_owner = match resolve_canonical_registered_short_address(db, owner_short_address)? { - Some(address) => address, - None => return Ok(VanityRegistrationResult::OwnerNotRegistered), - }; + Some(address) => address, + None => return Ok(VanityRegistrationResult::OwnerNotRegistered), + }; let normalized_vanity = match Wallet::vanity_address_to_bytes(vanity_address) .and_then(|bytes| Wallet::bytes_to_vanity_address(&bytes)) { - Some(address) => address, - None => return Ok(VanityRegistrationResult::InvalidVanity), - }; - - let owner_bytes = match Wallet::short_address_to_bytes(&normalized_owner) { - Some(bytes) => bytes, - None => return Ok(VanityRegistrationResult::InvalidOwner), - }; - let vanity_bytes = match Wallet::vanity_address_to_bytes(&normalized_vanity) { - Some(bytes) => bytes, - None => return Ok(VanityRegistrationResult::InvalidVanity), - }; - + Some(address) => address, + None => return Ok(VanityRegistrationResult::InvalidVanity), + }; + + let owner_bytes = match Wallet::short_address_to_bytes(&normalized_owner) { + Some(bytes) => bytes, + None => return Ok(VanityRegistrationResult::InvalidOwner), + }; + let vanity_bytes = match Wallet::vanity_address_to_bytes(&normalized_vanity) { + Some(bytes) => bytes, + None => return Ok(VanityRegistrationResult::InvalidVanity), + }; + let owner_tree = db.open_tree(WALLET_VANITY_OWNER_TREE)?; let vanity_tree = db.open_tree(WALLET_VANITY_ADDRESS_TREE)?; // Check the owner tree first to see whether this is an insert, update, or // no-op re-registration of the same vanity. let existing_vanity_bytes = owner_tree.get(&owner_bytes)?.map(|value| value.to_vec()); - if let Some(existing_vanity_bytes) = &existing_vanity_bytes { - if *existing_vanity_bytes == vanity_bytes { - if let Some(existing_owner_bytes) = - vanity_tree.get(&vanity_bytes)?.map(|value| value.to_vec()) - { - if existing_owner_bytes == owner_bytes { - return Ok(VanityRegistrationResult::AlreadyRegistered); - } - } - } - } - - let result = if existing_vanity_bytes.is_some() { - VanityRegistrationResult::Updated - } else { - VanityRegistrationResult::Inserted - }; - + if let Some(existing_vanity_bytes) = &existing_vanity_bytes { + if *existing_vanity_bytes == vanity_bytes { + if let Some(existing_owner_bytes) = + vanity_tree.get(&vanity_bytes)?.map(|value| value.to_vec()) + { + if existing_owner_bytes == owner_bytes { + return Ok(VanityRegistrationResult::AlreadyRegistered); + } + } + } + } + + let result = if existing_vanity_bytes.is_some() { + VanityRegistrationResult::Updated + } else { + VanityRegistrationResult::Inserted + }; + if let Some(old_vanity_bytes) = existing_vanity_bytes { // Updating an owner removes the old reverse vanity -> owner mapping // before writing the new pair. @@ -80,29 +80,29 @@ pub fn register_or_update_vanity_address( // Keep both directions in sync so vanity resolution and wallet restoration // can each use the efficient lookup direction they need. vanity_tree.insert(&vanity_bytes, owner_bytes.clone())?; - owner_tree.insert(&owner_bytes, vanity_bytes.clone())?; - - Ok(result) -} - -pub fn remove_registered_vanity_for_owner( - db: &Db, - owner_short_address: &str, + owner_tree.insert(&owner_bytes, vanity_bytes.clone())?; + + Ok(result) +} + +pub fn remove_registered_vanity_for_owner( + db: &Db, + owner_short_address: &str, ) -> sled::Result { // Removing by owner canonicalizes first so long/short owner inputs remove // the same vanity mapping. let normalized_owner = - match resolve_canonical_registered_short_address(db, owner_short_address)? { - Some(address) => address, - None => return Ok(false), - }; - let owner_bytes = match Wallet::short_address_to_bytes(&normalized_owner) { - Some(bytes) => bytes, - None => return Ok(false), - }; - - let owner_tree = db.open_tree(WALLET_VANITY_OWNER_TREE)?; - let vanity_tree = db.open_tree(WALLET_VANITY_ADDRESS_TREE)?; + match resolve_canonical_registered_short_address(db, owner_short_address)? { + Some(address) => address, + None => return Ok(false), + }; + let owner_bytes = match Wallet::short_address_to_bytes(&normalized_owner) { + Some(bytes) => bytes, + None => return Ok(false), + }; + + let owner_tree = db.open_tree(WALLET_VANITY_OWNER_TREE)?; + let vanity_tree = db.open_tree(WALLET_VANITY_ADDRESS_TREE)?; let Some(vanity_bytes) = owner_tree.remove(&owner_bytes)?.map(|value| value.to_vec()) else { return Ok(false); @@ -110,23 +110,23 @@ pub fn remove_registered_vanity_for_owner( // The reverse vanity lookup must be removed with the owner mapping. vanity_tree.remove(&vanity_bytes)?; - - Ok(true) -} - -pub fn store_previous_vanity_for_txid( - db: &Db, - txid_hex: &str, - previous_vanity: Option<&str>, + + Ok(true) +} + +pub fn store_previous_vanity_for_txid( + db: &Db, + txid_hex: &str, + previous_vanity: Option<&str>, ) -> sled::Result<()> { let tree = db.open_tree(WALLET_VANITY_ROLLBACK_TREE)?; let key = decode(txid_hex).unwrap_or_default(); // A missing previous vanity is stored as an empty value so undo can // distinguish "known none" from "no rollback record exists". let value = match previous_vanity { - Some(vanity) => Wallet::vanity_address_to_bytes(vanity).unwrap_or_default(), - None => Vec::new(), - }; - let _ = tree.insert(key, value)?; - Ok(()) -} + Some(vanity) => Wallet::vanity_address_to_bytes(vanity).unwrap_or_default(), + None => Vec::new(), + }; + let _ = tree.insert(key, value)?; + Ok(()) +} diff --git a/src/records/wallet_registry/structs.rs b/src/records/wallet_registry/structs.rs index 2ab1902..fdf2ab9 100644 --- a/src/records/wallet_registry/structs.rs +++ b/src/records/wallet_registry/structs.rs @@ -1,17 +1,17 @@ #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum WalletRegistrationResult { - Inserted, - AlreadyRegistered, - Conflict, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum VanityRegistrationResult { - Inserted, - Updated, - AlreadyRegistered, - Conflict, - OwnerNotRegistered, - InvalidOwner, - InvalidVanity, -} + Inserted, + AlreadyRegistered, + Conflict, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum VanityRegistrationResult { + Inserted, + Updated, + AlreadyRegistered, + Conflict, + OwnerNotRegistered, + InvalidOwner, + InvalidVanity, +} diff --git a/src/rpc/client/handshake_processing.rs b/src/rpc/client/handshake_processing.rs index 1ac2c3f..89c881f 100644 --- a/src/rpc/client/handshake_processing.rs +++ b/src/rpc/client/handshake_processing.rs @@ -190,18 +190,25 @@ pub async fn bootstrap_peer_discovery(mut params: BootstrapParams) -> Result<(), let post_sync_remote_height = request_remote_height(stream.clone(), params.map.clone(), current_key.clone()).await?; - if post_sync_remote_height != post_sync_local_height { + let imported_candidates = match hydrate_torrent_candidates(stream.clone(), params.map.clone(), current_key.clone()) .await { Ok(imported) => { if imported > 0 { - warn!("[sync] hydrated {imported} torrent candidates before post-sync orphan check"); + warn!( + "[sync] hydrated {imported} torrent candidates before post-sync orphan check" + ); } + imported } - Err(err) => warn!("[sync] failed to hydrate torrent candidates: {err}"), - } + Err(err) => { + warn!("[sync] failed to hydrate torrent candidates: {err}"); + 0 + } + }; + if post_sync_remote_height != post_sync_local_height || imported_candidates > 0 { let orphan_checkup_params = OrphanCheckup2 { stream: stream.clone(), db: params.db.clone(), diff --git a/src/rpc/client/syncing.rs b/src/rpc/client/syncing.rs index 43c9bae..3ea6064 100644 --- a/src/rpc/client/syncing.rs +++ b/src/rpc/client/syncing.rs @@ -80,6 +80,7 @@ pub async fn node_syncing( wallet.clone(), map.clone(), false, + node_syncing, false, ) .await diff --git a/src/rpc/commands/network_info.rs b/src/rpc/commands/network_info.rs index ffa893b..4af373f 100644 --- a/src/rpc/commands/network_info.rs +++ b/src/rpc/commands/network_info.rs @@ -1,97 +1,97 @@ -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::records::block_height::get_block_height::get_height; -use crate::records::memory::mempool::{largest_fee, total_transactions}; -use crate::records::unpack_block::unpack_header::load_block_header; -use crate::rpc::commands::structs::NetworkInfo; -use crate::rpc::responses::RpcResponse; -use crate::sled::Db; -use crate::Utc; - -async fn network_info_to_bytes(info: NetworkInfo) -> Vec { - // Serialize the network-info snapshot into the fixed RPC payload - // layout expected by peers and external clients. - let mut bytes = Vec::new(); - - let version_bytes = info.version.to_le_bytes(); - let network_bytes = info.network.as_bytes(); - let time_bytes = info.time.to_le_bytes(); - let prefix_bytes = info.wallet_prefix.as_bytes(); - let block_height_bytes = info.height.to_le_bytes(); - let next_block_difficulty_bytes = info.next_block_difficulty.to_le_bytes(); - let total_block_transactions_bytes = info.total_block_transactions.to_le_bytes(); - let transaction_response = total_transactions().await; - let RpcResponse::Binary(total_mempool_transactions_bytes) = transaction_response; - let fee_response = largest_fee().await; - let RpcResponse::Binary(largest_tx_fee_bytes) = fee_response; - - bytes.extend(version_bytes); - bytes.extend(network_bytes); - bytes.extend(time_bytes); - bytes.extend(prefix_bytes); - bytes.extend(block_height_bytes); - bytes.extend(next_block_difficulty_bytes); - bytes.extend(total_block_transactions_bytes); - bytes.extend(total_mempool_transactions_bytes); - bytes.extend(largest_tx_fee_bytes); - - bytes -} - -pub async fn request_network_info(db: &Db) -> RpcResponse { - // Build a point-in-time network snapshot from local chain state, - // mempool counts, fee stats, and network naming configuration. - let version = 1; - - let ( - network_name, - _wallet_type, - suffix, - _torrentpath, - _wallet_path, - _blockpath, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - let network = network_name.to_string(); - let wallet_prefix = suffix.to_uppercase(); - - let height = get_height(db); - - let block = match load_block_header(height).await { - Ok(data) => data, - Err(_) => { - let error = "Error: Calcaulting Network Info" - .to_string() - .as_bytes() - .to_vec(); - return RpcResponse::Binary(error); - } - }; - - let tree = db.open_tree("txid").unwrap(); - let total_block_transactions = tree.len() as u32; - - let time = Utc::now().timestamp() as u32; - - // These fields are filled inside `to_bytes` from their live RPC - // helpers so the serialization step stays self-contained. - let total_mempool_transactions = 0_u32; - let largest_tx_fee = 0_u64; - - let network_info = NetworkInfo { - version, - network, - time, - wallet_prefix, - height, - next_block_difficulty: block.unmined_block.next_block_difficulty, - total_block_transactions, - total_mempool_transactions, - largest_tx_fee, - }; - - let info_bytes = network_info_to_bytes(network_info).await; - - RpcResponse::Binary(info_bytes) -} +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::records::block_height::get_block_height::get_height; +use crate::records::memory::mempool::{largest_fee, total_transactions}; +use crate::records::unpack_block::unpack_header::load_block_header; +use crate::rpc::commands::structs::NetworkInfo; +use crate::rpc::responses::RpcResponse; +use crate::sled::Db; +use crate::Utc; + +async fn network_info_to_bytes(info: NetworkInfo) -> Vec { + // Serialize the network-info snapshot into the fixed RPC payload + // layout expected by peers and external clients. + let mut bytes = Vec::new(); + + let version_bytes = info.version.to_le_bytes(); + let network_bytes = info.network.as_bytes(); + let time_bytes = info.time.to_le_bytes(); + let prefix_bytes = info.wallet_prefix.as_bytes(); + let block_height_bytes = info.height.to_le_bytes(); + let next_block_difficulty_bytes = info.next_block_difficulty.to_le_bytes(); + let total_block_transactions_bytes = info.total_block_transactions.to_le_bytes(); + let transaction_response = total_transactions().await; + let RpcResponse::Binary(total_mempool_transactions_bytes) = transaction_response; + let fee_response = largest_fee().await; + let RpcResponse::Binary(largest_tx_fee_bytes) = fee_response; + + bytes.extend(version_bytes); + bytes.extend(network_bytes); + bytes.extend(time_bytes); + bytes.extend(prefix_bytes); + bytes.extend(block_height_bytes); + bytes.extend(next_block_difficulty_bytes); + bytes.extend(total_block_transactions_bytes); + bytes.extend(total_mempool_transactions_bytes); + bytes.extend(largest_tx_fee_bytes); + + bytes +} + +pub async fn request_network_info(db: &Db) -> RpcResponse { + // Build a point-in-time network snapshot from local chain state, + // mempool counts, fee stats, and network naming configuration. + let version = 1; + + let ( + network_name, + _wallet_type, + suffix, + _torrentpath, + _wallet_path, + _blockpath, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + let network = network_name.to_string(); + let wallet_prefix = suffix.to_uppercase(); + + let height = get_height(db); + + let block = match load_block_header(height).await { + Ok(data) => data, + Err(_) => { + let error = "Error: Calcaulting Network Info" + .to_string() + .as_bytes() + .to_vec(); + return RpcResponse::Binary(error); + } + }; + + let tree = db.open_tree("txid").unwrap(); + let total_block_transactions = tree.len() as u32; + + let time = Utc::now().timestamp() as u32; + + // These fields are filled inside `to_bytes` from their live RPC + // helpers so the serialization step stays self-contained. + let total_mempool_transactions = 0_u32; + let largest_tx_fee = 0_u64; + + let network_info = NetworkInfo { + version, + network, + time, + wallet_prefix, + height, + next_block_difficulty: block.unmined_block.next_block_difficulty, + total_block_transactions, + total_mempool_transactions, + largest_tx_fee, + }; + + let info_bytes = network_info_to_bytes(network_info).await; + + RpcResponse::Binary(info_bytes) +} diff --git a/src/rpc/commands/receive_torrent.rs b/src/rpc/commands/receive_torrent.rs index 653054b..e6e4ded 100644 --- a/src/rpc/commands/receive_torrent.rs +++ b/src/rpc/commands/receive_torrent.rs @@ -188,6 +188,7 @@ pub async fn torrent_submission( torrent, staged_path, false, + false, db_clone.clone(), map_for_download, ) diff --git a/src/rpc/commands/torrent_candidates.rs b/src/rpc/commands/torrent_candidates.rs index 703ed82..7d37f42 100644 --- a/src/rpc/commands/torrent_candidates.rs +++ b/src/rpc/commands/torrent_candidates.rs @@ -1,64 +1,64 @@ -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::orphans::snapshot_check::snapshot_height; -use crate::records::block_height::get_block_height::get_height; -use crate::rpc::responses::RpcResponse; -use crate::sled::Db; -use crate::torrent::torrenting_system::save_torrent::{list_staged_torrents, read_staged_torrent}; -use crate::{read, Path}; - -async fn canonical_torrent_bytes(height: u32) -> Option> { - // Canonical torrents are the saved `.torrent` files that match - // blocks already accepted into the local chain. - let ( - _network_name, - _padded_base_coin, - _block_ext, - torrent_path, - _wallet_path, - _block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - let file_path = Path::new(&torrent_path).join(format!("{height}.torrent")); - read(&file_path).await.ok() -} - -pub async fn request_torrent_candidates(db: &Db) -> RpcResponse { - // Send both canonical and staged torrent files from the last saved - // snapshot onward so a freshly connected peer can fill its staging area. - let start_height = snapshot_height(db).await.unwrap_or(0); - let current_height = get_height(db); - let mut candidates = Vec::new(); - - for height in start_height..=current_height { - if let Some(torrent_bytes) = canonical_torrent_bytes(height).await { - candidates.push((height, torrent_bytes)); - } - } - - if let Ok(staged_torrents) = list_staged_torrents().await { - for (height, staged_path) in staged_torrents { - if height < start_height { - continue; - } - // Staged torrents may not yet be canonical on this node, but - // peers still need them when evaluating short-range reorgs. - if let Ok(torrent_bytes) = read_staged_torrent(&staged_path).await { - candidates.push((height, torrent_bytes)); - } - } - } - - // Response layout: candidate count, then repeated height, byte - // length, and raw torrent bytes. - let mut response = Vec::new(); - response.extend_from_slice(&(candidates.len() as u32).to_le_bytes()); - for (height, torrent_bytes) in candidates { - response.extend_from_slice(&height.to_le_bytes()); - response.extend_from_slice(&(torrent_bytes.len() as u32).to_le_bytes()); - response.extend_from_slice(&torrent_bytes); - } - - RpcResponse::Binary(response) -} +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::orphans::snapshot_check::snapshot_height; +use crate::records::block_height::get_block_height::get_height; +use crate::rpc::responses::RpcResponse; +use crate::sled::Db; +use crate::torrent::torrenting_system::save_torrent::{list_staged_torrents, read_staged_torrent}; +use crate::{read, Path}; + +async fn canonical_torrent_bytes(height: u32) -> Option> { + // Canonical torrents are the saved `.torrent` files that match + // blocks already accepted into the local chain. + let ( + _network_name, + _padded_base_coin, + _block_ext, + torrent_path, + _wallet_path, + _block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + let file_path = Path::new(&torrent_path).join(format!("{height}.torrent")); + read(&file_path).await.ok() +} + +pub async fn request_torrent_candidates(db: &Db) -> RpcResponse { + // Send both canonical and staged torrent files from the last saved + // snapshot onward so a freshly connected peer can fill its staging area. + let start_height = snapshot_height(db).await.unwrap_or(0); + let current_height = get_height(db); + let mut candidates = Vec::new(); + + for height in start_height..=current_height { + if let Some(torrent_bytes) = canonical_torrent_bytes(height).await { + candidates.push((height, torrent_bytes)); + } + } + + if let Ok(staged_torrents) = list_staged_torrents().await { + for (height, staged_path) in staged_torrents { + if height < start_height { + continue; + } + // Staged torrents may not yet be canonical on this node, but + // peers still need them when evaluating short-range reorgs. + if let Ok(torrent_bytes) = read_staged_torrent(&staged_path).await { + candidates.push((height, torrent_bytes)); + } + } + } + + // Response layout: candidate count, then repeated height, byte + // length, and raw torrent bytes. + let mut response = Vec::new(); + response.extend_from_slice(&(candidates.len() as u32).to_le_bytes()); + for (height, torrent_bytes) in candidates { + response.extend_from_slice(&height.to_le_bytes()); + response.extend_from_slice(&(torrent_bytes.len() as u32).to_le_bytes()); + response.extend_from_slice(&torrent_bytes); + } + + RpcResponse::Binary(response) +} diff --git a/src/rpc/commands/tx_count.rs b/src/rpc/commands/tx_count.rs index ed89f94..3f5c520 100644 --- a/src/rpc/commands/tx_count.rs +++ b/src/rpc/commands/tx_count.rs @@ -1,150 +1,150 @@ -use crate::blocks::block::VRF_BLOCK_BYTES; -use crate::common::binary_conversions::binary_to_string; -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::common::types::{GENESIS_TYPE, REWARDS_TYPE, VANITY_ADDRESS_TYPE}; -use crate::rpc::command_maps; -use crate::rpc::responses::RpcResponse; -use crate::sled::Db; -use crate::PathBuf; -use crate::{AsyncReadExt, AsyncSeekExt, File, SeekFrom}; - -const HEADER_SIZE: u64 = VRF_BLOCK_BYTES as u64; - -async fn lookup_transaction_location(db: &Db, txid: Vec) -> Result<(u64, u32, String), String> { - // The txid tree stores `block:index`, which is enough to locate the - // transaction inside the saved block file on disk. - let tree = db.open_tree("txid").unwrap(); - let value = match tree.get(txid) { - Ok(Some(result)) => result.to_vec(), - Ok(None) => { - return Err("error: Key not found".to_string()); - } - Err(_) => { - return Err("error: Error retrieving value".to_string()); - } - }; - - let value_str = binary_to_string(value); - // Stored txid locations are saved as ASCII `height:index`. - let parts: Vec<&str> = value_str.split(':').collect(); - - let block: u64 = parts[0].parse().unwrap_or_default(); - let position: u32 = parts[1].parse().unwrap_or_default(); - - let ( - _network_name, - _padded_base_coin, - block_ext, - _torrent_path, - _wallet_path, - block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - let block_filename = PathBuf::from(block_path) - .join(format!("{block}.{block_ext}")) - .to_string_lossy() - .into_owned(); - Ok((block, position, block_filename)) -} - -async fn read_transaction_type(file_path: &str, position: u64) -> Option { - // Transaction offsets are located by repeatedly reading the type byte - // so the fixed encoded size for each saved transaction can be applied. - let mut file = File::open(file_path).await.ok()?; - file.seek(SeekFrom::Start(position)).await.ok()?; - - let mut transaction_type_byte = [0u8; 1]; - file.read_exact(&mut transaction_type_byte).await.ok()?; - - Some(transaction_type_byte[0]) -} - -async fn calculate_offset(file_path: &str, position: u32) -> Option> { - // Walk forward through the serialized block body until the requested - // transaction index is reached, then read exactly that transaction. - let mut total_bytes_to_skip: u64 = HEADER_SIZE; - let mut current_position: u32 = 1; - - let mut transaction_type = read_transaction_type(file_path, HEADER_SIZE).await?; - - while current_position < position { - // Transaction records are fixed-size by type, so the type byte - // determines how far to skip to reach the requested index. - let size = command_maps::get_bytes(transaction_type) as u64; - total_bytes_to_skip += size; - transaction_type = read_transaction_type(file_path, total_bytes_to_skip).await?; - current_position += 1; - } - - let size = command_maps::get_bytes(transaction_type) as u64; - let mut file = File::open(file_path).await.ok()?; - file.seek(SeekFrom::Start(total_bytes_to_skip)).await.ok()?; - - let mut transaction_bytes = vec![0u8; size as usize]; - file.read_exact(&mut transaction_bytes).await.ok()?; - - Some(transaction_bytes) -} - -fn reward_value(tx_bytes: &[u8]) -> u64 { - // Rewards are counted twice: total rewards and non-zero rewards. - if tx_bytes.len() < 13 { - return 0; - } - - let mut value_bytes = [0u8; 8]; - value_bytes.copy_from_slice(&tx_bytes[5..13]); - u64::from_le_bytes(value_bytes) -} - -pub async fn request_tx_count(db: &Db) -> RpcResponse { - // Count saved transactions by type directly from the txid index and - // block files so the response reflects the committed chain state. - let tree = db.open_tree("txid").unwrap(); - let mut totals = [0u64; (VANITY_ADDRESS_TYPE as usize) + 1]; - let mut non_zero = [0u64; (VANITY_ADDRESS_TYPE as usize) + 1]; - - for entry in tree.iter() { - let Ok((txid, _)) = entry else { - continue; - }; - - let Ok((_block, position, block_filename)) = - lookup_transaction_location(db, txid.to_vec()).await - else { - continue; - }; - - let Some(tx_bytes) = calculate_offset(&block_filename, position).await else { - continue; - }; - - let Some(&txtype) = tx_bytes.first() else { - continue; - }; - - if txtype == GENESIS_TYPE || txtype > VANITY_ADDRESS_TYPE { - continue; - } - - // Genesis is excluded from the public count table; every other - // known transaction type gets total and non-zero columns. - totals[txtype as usize] = totals[txtype as usize].saturating_add(1); - - if txtype == REWARDS_TYPE && reward_value(&tx_bytes) > 0 { - non_zero[txtype as usize] = non_zero[txtype as usize].saturating_add(1); - } - } - - let mut response = Vec::with_capacity(VANITY_ADDRESS_TYPE as usize * 17); - for txtype in 1u8..=VANITY_ADDRESS_TYPE { - // Response rows are type byte, total count, non-zero count. - response.push(txtype); - response.extend_from_slice(&totals[txtype as usize].to_le_bytes()); - response.extend_from_slice(&non_zero[txtype as usize].to_le_bytes()); - } - - RpcResponse::Binary(response) -} +use crate::blocks::block::VRF_BLOCK_BYTES; +use crate::common::binary_conversions::binary_to_string; +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::common::types::{GENESIS_TYPE, REWARDS_TYPE, VANITY_ADDRESS_TYPE}; +use crate::rpc::command_maps; +use crate::rpc::responses::RpcResponse; +use crate::sled::Db; +use crate::PathBuf; +use crate::{AsyncReadExt, AsyncSeekExt, File, SeekFrom}; + +const HEADER_SIZE: u64 = VRF_BLOCK_BYTES as u64; + +async fn lookup_transaction_location(db: &Db, txid: Vec) -> Result<(u64, u32, String), String> { + // The txid tree stores `block:index`, which is enough to locate the + // transaction inside the saved block file on disk. + let tree = db.open_tree("txid").unwrap(); + let value = match tree.get(txid) { + Ok(Some(result)) => result.to_vec(), + Ok(None) => { + return Err("error: Key not found".to_string()); + } + Err(_) => { + return Err("error: Error retrieving value".to_string()); + } + }; + + let value_str = binary_to_string(value); + // Stored txid locations are saved as ASCII `height:index`. + let parts: Vec<&str> = value_str.split(':').collect(); + + let block: u64 = parts[0].parse().unwrap_or_default(); + let position: u32 = parts[1].parse().unwrap_or_default(); + + let ( + _network_name, + _padded_base_coin, + block_ext, + _torrent_path, + _wallet_path, + block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + let block_filename = PathBuf::from(block_path) + .join(format!("{block}.{block_ext}")) + .to_string_lossy() + .into_owned(); + Ok((block, position, block_filename)) +} + +async fn read_transaction_type(file_path: &str, position: u64) -> Option { + // Transaction offsets are located by repeatedly reading the type byte + // so the fixed encoded size for each saved transaction can be applied. + let mut file = File::open(file_path).await.ok()?; + file.seek(SeekFrom::Start(position)).await.ok()?; + + let mut transaction_type_byte = [0u8; 1]; + file.read_exact(&mut transaction_type_byte).await.ok()?; + + Some(transaction_type_byte[0]) +} + +async fn calculate_offset(file_path: &str, position: u32) -> Option> { + // Walk forward through the serialized block body until the requested + // transaction index is reached, then read exactly that transaction. + let mut total_bytes_to_skip: u64 = HEADER_SIZE; + let mut current_position: u32 = 1; + + let mut transaction_type = read_transaction_type(file_path, HEADER_SIZE).await?; + + while current_position < position { + // Transaction records are fixed-size by type, so the type byte + // determines how far to skip to reach the requested index. + let size = command_maps::get_bytes(transaction_type) as u64; + total_bytes_to_skip += size; + transaction_type = read_transaction_type(file_path, total_bytes_to_skip).await?; + current_position += 1; + } + + let size = command_maps::get_bytes(transaction_type) as u64; + let mut file = File::open(file_path).await.ok()?; + file.seek(SeekFrom::Start(total_bytes_to_skip)).await.ok()?; + + let mut transaction_bytes = vec![0u8; size as usize]; + file.read_exact(&mut transaction_bytes).await.ok()?; + + Some(transaction_bytes) +} + +fn reward_value(tx_bytes: &[u8]) -> u64 { + // Rewards are counted twice: total rewards and non-zero rewards. + if tx_bytes.len() < 13 { + return 0; + } + + let mut value_bytes = [0u8; 8]; + value_bytes.copy_from_slice(&tx_bytes[5..13]); + u64::from_le_bytes(value_bytes) +} + +pub async fn request_tx_count(db: &Db) -> RpcResponse { + // Count saved transactions by type directly from the txid index and + // block files so the response reflects the committed chain state. + let tree = db.open_tree("txid").unwrap(); + let mut totals = [0u64; (VANITY_ADDRESS_TYPE as usize) + 1]; + let mut non_zero = [0u64; (VANITY_ADDRESS_TYPE as usize) + 1]; + + for entry in tree.iter() { + let Ok((txid, _)) = entry else { + continue; + }; + + let Ok((_block, position, block_filename)) = + lookup_transaction_location(db, txid.to_vec()).await + else { + continue; + }; + + let Some(tx_bytes) = calculate_offset(&block_filename, position).await else { + continue; + }; + + let Some(&txtype) = tx_bytes.first() else { + continue; + }; + + if txtype == GENESIS_TYPE || txtype > VANITY_ADDRESS_TYPE { + continue; + } + + // Genesis is excluded from the public count table; every other + // known transaction type gets total and non-zero columns. + totals[txtype as usize] = totals[txtype as usize].saturating_add(1); + + if txtype == REWARDS_TYPE && reward_value(&tx_bytes) > 0 { + non_zero[txtype as usize] = non_zero[txtype as usize].saturating_add(1); + } + } + + let mut response = Vec::with_capacity(VANITY_ADDRESS_TYPE as usize * 17); + for txtype in 1u8..=VANITY_ADDRESS_TYPE { + // Response rows are type byte, total count, non-zero count. + response.push(txtype); + response.extend_from_slice(&totals[txtype as usize].to_le_bytes()); + response.extend_from_slice(&non_zero[txtype as usize].to_le_bytes()); + } + + RpcResponse::Binary(response) +} diff --git a/src/rpc/server/handshake.rs b/src/rpc/server/handshake.rs index dec97b6..d4e2cde 100644 --- a/src/rpc/server/handshake.rs +++ b/src/rpc/server/handshake.rs @@ -14,6 +14,7 @@ use crate::records::memory::connections::{ use crate::records::memory::network_mapping::NodeInfo; use crate::records::memory::response_channels::generate_uid; use crate::records::memory::response_channels::Command; +use crate::records::memory::structs::Connection; use crate::records::wallet_registry::{register_short_address, WalletRegistrationResult}; use crate::rpc::client::syncing::node_syncing; use crate::rpc::client::wallet_registry_sync::sync_wallet_registry_with_retries; @@ -24,11 +25,13 @@ use crate::rpc::server::handshake_verifications::{connection_count, perform_hand use crate::rpc::server::structs::{CombineAndSendDataParams, HandshakeTestParams}; use crate::rpc::server::tests::{endpoint_port, is_port_open}; use crate::sled::Db; +use crate::sleep; use crate::startup::network_broadcast::announce_self_to_network; use crate::startup::remote_height::request_remote_height; use crate::wallets::structures::Wallet; use crate::Arc; use crate::AsyncWriteExt; +use crate::Duration; use crate::Mutex; use crate::Settings; use crate::TcpStream; @@ -96,20 +99,28 @@ async fn sync_incoming_peer_before_operational( let post_sync_remote_height = request_remote_height(stream.clone(), map.clone(), connections_key.to_string()).await?; - if post_sync_remote_height > post_sync_local_height { - match hydrate_torrent_candidates(stream.clone(), map.clone(), connections_key.to_string()) - .await - { - Ok(imported) => { - if imported > 0 { - warn!( + let imported_candidates = match hydrate_torrent_candidates( + stream.clone(), + map.clone(), + connections_key.to_string(), + ) + .await + { + Ok(imported) => { + if imported > 0 { + warn!( "[sync] hydrated {imported} torrent candidates before incoming post-sync orphan check" ); - } } - Err(err) => warn!("[sync] failed to hydrate incoming torrent candidates: {err}"), + imported } + Err(err) => { + warn!("[sync] failed to hydrate incoming torrent candidates: {err}"); + 0 + } + }; + if post_sync_remote_height > post_sync_local_height || imported_candidates > 0 { let orphan_checkup_params = OrphanCheckup2 { stream: stream.clone(), db: db.clone(), @@ -128,6 +139,40 @@ async fn sync_incoming_peer_before_operational( Ok(true) } +fn spawn_incoming_peer_promotion_watcher( + stream: Arc>, + db: Db, + map: Arc>, + connections_key: String, +) { + tokio::spawn(async move { + loop { + sleep(Duration::from_secs(5)).await; + + if Connection::get_stream_from_memory(&connections_key) + .await + .is_none() + { + break; + } + + let local_height = get_height(&db); + let remote_height = + match request_remote_height(stream.clone(), map.clone(), connections_key.clone()) + .await + { + Ok(height) => height, + Err(_) => continue, + }; + + if remote_height >= local_height { + mark_peer_operational(&connections_key, map.clone()).await; + break; + } + } + }); +} + async fn complete_incoming_miner_setup( stream: Arc>, db: &Db, @@ -191,6 +236,13 @@ async fn complete_incoming_miner_setup( set_node_mode(NodeMode::Normal); clear_mining_stop_request(); set_mining_state(MiningState::Idle); + } else { + spawn_incoming_peer_promotion_watcher( + stream.clone(), + db.clone(), + map.clone(), + connections_key.to_string(), + ); } if let Err(err) = NodeInfo::rebuild_mined_counts_from_chain(db).await { diff --git a/src/startup/connections.rs b/src/startup/connections.rs index 179c6d4..19ee910 100644 --- a/src/startup/connections.rs +++ b/src/startup/connections.rs @@ -1,18 +1,18 @@ -use crate::common::network_startup::get_connections; -use crate::log::{error, info}; -use crate::miner::flag::{ - clear_mining_stop_request, set_mining_state, set_node_mode, MiningState, NodeMode, -}; -use crate::records::memory::response_channels::Command; -use crate::rpc::client::handshake::connect_and_handshake; -use crate::rpc::client::structs::Connect; +use crate::common::network_startup::get_connections; +use crate::log::{error, info}; +use crate::miner::flag::{ + clear_mining_stop_request, set_mining_state, set_node_mode, MiningState, NodeMode, +}; +use crate::records::memory::response_channels::Command; +use crate::rpc::client::handshake::connect_and_handshake; +use crate::rpc::client::structs::Connect; use crate::sled::Db; use crate::sleep; use crate::wallets::structures::Wallet; use crate::Arc; -use crate::Duration; -use crate::Mutex; - +use crate::Duration; +use crate::Mutex; + pub async fn handle_connections( db: Db, wallet: Arc, @@ -34,22 +34,22 @@ pub async fn handle_connections( // try the configured bootstrap peers one by one until a // handshake succeeds or the list is exhausted let filtered_servers = get_connections().await; - let mut last_error: Option = None; - - for server in filtered_servers { - // build the outbound handshake request using cloned - // shared state so each attempt can run independently - let db_clone = db.clone(); - + let mut last_error: Option = None; + + for server in filtered_servers { + // build the outbound handshake request using cloned + // shared state so each attempt can run independently + let db_clone = db.clone(); + // parse the configured peer string once before spawning - // the outbound connection attempt - let socket_address = server.parse().expect("Failed to parse the socket address"); - - // Clone the Arc for use in other async functions - let map_clone = Arc::clone(&map); - - let first: bool = true; - let connect_params = Connect { + // the outbound connection attempt + let socket_address = server.parse().expect("Failed to parse the socket address"); + + // Clone the Arc for use in other async functions + let map_clone = Arc::clone(&map); + + let first: bool = true; + let connect_params = Connect { addr: socket_address, db: db_clone, node_ip: server.to_string(), @@ -57,33 +57,33 @@ pub async fn handle_connections( map: map_clone, first, }; - - let err_string = match connect_and_handshake(connect_params).await { - Ok(()) => { - info!("Connected to {server}"); - return Ok(()); - } - Err(err) => err.to_string(), - }; - - // A peer can reject us because it already has this connection recorded. - // In that case retrying other bootstrap peers would not fix the local duplicate state. - if err_string.contains("The connection is already in the connection manager Please wait 10 minutes and try again") { - return Err(err_string); - } + + let err_string = match connect_and_handshake(connect_params).await { + Ok(()) => { + info!("Connected to {server}"); + return Ok(()); + } + Err(err) => err.to_string(), + }; + + // A peer can reject us because it already has this connection recorded. + // In that case retrying other bootstrap peers would not fix the local duplicate state. + if err_string.contains("The connection is already in the connection manager Please wait 10 minutes and try again") { + return Err(err_string); + } error!("Error connecting to {server}: {err_string}"); last_error = Some(err_string.clone()); - sleep(Duration::from_secs(5)).await; - } - - if let Some(err) = last_error { - info!("No bootstrap peers connected during startup: {err}"); - } else { - info!("No bootstrap peers connected during startup."); - } - // Startup can continue as a standalone node even if no bootstrap peer is reachable. - set_node_mode(NodeMode::Normal); - clear_mining_stop_request(); - set_mining_state(MiningState::Idle); - Ok(()) -} + sleep(Duration::from_secs(5)).await; + } + + if let Some(err) = last_error { + info!("No bootstrap peers connected during startup: {err}"); + } else { + info!("No bootstrap peers connected during startup."); + } + // Startup can continue as a standalone node even if no bootstrap peer is reachable. + set_node_mode(NodeMode::Normal); + clear_mining_stop_request(); + set_mining_state(MiningState::Idle); + Ok(()) +} diff --git a/src/startup/daemonize.rs b/src/startup/daemonize.rs index f20c8ac..50f3c25 100644 --- a/src/startup/daemonize.rs +++ b/src/startup/daemonize.rs @@ -1,236 +1,236 @@ -#[cfg(unix)] -use crate::common::network_paths_and_settings::block_extension_and_paths; -#[cfg(unix)] -use crate::env; -#[cfg(unix)] -use crate::lazy_static; -#[cfg(unix)] -use crate::log::{error, info, warn}; -#[cfg(unix)] -use crate::sled::Db; -#[cfg(unix)] -use crate::task; -#[cfg(unix)] -use crate::PathBuf; -#[cfg(unix)] -use nix::sys::signal::{kill, Signal}; -#[cfg(unix)] -use nix::unistd::{daemon, Pid}; -#[cfg(unix)] -use std::sync::Mutex as StdMutex; - -#[cfg(unix)] -lazy_static! { - static ref PID_FILE_PATH: StdMutex> = StdMutex::new(None); -} - -#[cfg(unix)] -fn should_daemonize() -> bool { - // --foreground keeps Linux startup attached to the terminal for debugging. - !env::args().any(|arg| arg == "--foreground") -} - -#[cfg(unix)] -fn read_pid(pid_path: &PathBuf) -> Result> { - // PID files are plain text so shell tools can inspect them too. - let pid_contents = std::fs::read_to_string(pid_path)?; - Ok(pid_contents.trim().parse::()?) -} - -#[cfg(unix)] -fn pid_file_path() -> PathBuf { - // PID files live under the network-scoped db path so testnet/mainnet never collide. - let ( - _network_name, - _padded_base_coin, - _suffix, - _torrent_path, - _wallet_path, - _block_path, - db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - PathBuf::from(db_path).join("contractless.pid") -} - -#[cfg(unix)] -fn existing_process_running(pid_path: &PathBuf) -> Result> { - // The PID file is treated as the source of truth for duplicate-start checks. - if !pid_path.exists() { - return Ok(false); - } - - let pid = read_pid(pid_path)?; - - match kill(Pid::from_raw(pid), None) { - Ok(_) => Ok(true), - Err(_) => Ok(false), - } -} - -#[cfg(unix)] -fn remove_pid_file_if_present(pid_path: &PathBuf) { - // Missing PID files are harmless during cleanup. - let _ = std::fs::remove_file(pid_path); -} - -#[cfg(unix)] -fn write_pid_file(pid_path: &PathBuf) -> Result<(), Box> { - // The PID file is written only after the process has detached so later control - // commands point at the background daemon rather than the original shell session. - if let Some(parent) = pid_path.parent() { - std::fs::create_dir_all(parent)?; - } - - let pid = std::process::id().to_string(); - std::fs::write(pid_path, pid)?; - - let mut slot = PID_FILE_PATH.lock().expect("failed to lock pid file slot"); - *slot = Some(pid_path.clone()); - - Ok(()) -} - -#[cfg(unix)] -pub fn remove_registered_pid_file() { - let pid_path = { - let slot = PID_FILE_PATH.lock().expect("failed to lock pid file slot"); - slot.clone() - }; - - if let Some(path) = pid_path { - remove_pid_file_if_present(&path); - } -} - -#[cfg(unix)] -pub fn daemonize_after_wallet_prompt() -> Result> { - // Linux waits until after the wallet prompt to detach so the encryption key never - // needs to be handed off to a second process. - if !should_daemonize() { - return Ok(false); - } - - let pid_path = pid_file_path(); - // Refuse to detach if an active process already owns the network-scoped PID file. - if existing_process_running(&pid_path)? { - return Err(format!( - "Contractless is already running. Remove stale PID file if needed: {}", - pid_path.display() - ) - .into()); - } - - remove_pid_file_if_present(&pid_path); - // daemon(true, false) keeps the working directory but redirects stdio for background mode. - daemon(true, false)?; - write_pid_file(&pid_path)?; - info!("Daemonized node process with pid {}", std::process::id()); - - Ok(true) -} - -#[cfg(unix)] -pub fn handle_control_command() -> Result> { - let args: Vec = env::args().skip(1).collect(); - let pid_path = pid_file_path(); - - // The control commands operate through the PID file rather than by interacting - // with a terminal session, so they work even after the node detaches. - if args.iter().any(|arg| arg == "--status") { - if existing_process_running(&pid_path)? { - let pid = read_pid(&pid_path)?; - println!("Contractless is running with pid {pid}."); - } else if pid_path.exists() { - println!( - "Contractless is not running, but a stale PID file exists at {}.", - pid_path.display() - ); - } else { - println!("Contractless is not running."); - } - return Ok(true); - } - - if args.iter().any(|arg| arg == "--stop") { - if !pid_path.exists() { - println!("Contractless is not running."); - return Ok(true); - } - - let pid = read_pid(&pid_path)?; - // SIGTERM triggers install_shutdown_cleanup in the daemon process. - match kill(Pid::from_raw(pid), Some(Signal::SIGTERM)) { - Ok(_) => { - println!("Sent SIGTERM to Contractless process {pid}."); +#[cfg(unix)] +use crate::common::network_paths_and_settings::block_extension_and_paths; +#[cfg(unix)] +use crate::env; +#[cfg(unix)] +use crate::lazy_static; +#[cfg(unix)] +use crate::log::{error, info, warn}; +#[cfg(unix)] +use crate::sled::Db; +#[cfg(unix)] +use crate::task; +#[cfg(unix)] +use crate::PathBuf; +#[cfg(unix)] +use nix::sys::signal::{kill, Signal}; +#[cfg(unix)] +use nix::unistd::{daemon, Pid}; +#[cfg(unix)] +use std::sync::Mutex as StdMutex; + +#[cfg(unix)] +lazy_static! { + static ref PID_FILE_PATH: StdMutex> = StdMutex::new(None); +} + +#[cfg(unix)] +fn should_daemonize() -> bool { + // --foreground keeps Linux startup attached to the terminal for debugging. + !env::args().any(|arg| arg == "--foreground") +} + +#[cfg(unix)] +fn read_pid(pid_path: &PathBuf) -> Result> { + // PID files are plain text so shell tools can inspect them too. + let pid_contents = std::fs::read_to_string(pid_path)?; + Ok(pid_contents.trim().parse::()?) +} + +#[cfg(unix)] +fn pid_file_path() -> PathBuf { + // PID files live under the network-scoped db path so testnet/mainnet never collide. + let ( + _network_name, + _padded_base_coin, + _suffix, + _torrent_path, + _wallet_path, + _block_path, + db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + PathBuf::from(db_path).join("contractless.pid") +} + +#[cfg(unix)] +fn existing_process_running(pid_path: &PathBuf) -> Result> { + // The PID file is treated as the source of truth for duplicate-start checks. + if !pid_path.exists() { + return Ok(false); + } + + let pid = read_pid(pid_path)?; + + match kill(Pid::from_raw(pid), None) { + Ok(_) => Ok(true), + Err(_) => Ok(false), + } +} + +#[cfg(unix)] +fn remove_pid_file_if_present(pid_path: &PathBuf) { + // Missing PID files are harmless during cleanup. + let _ = std::fs::remove_file(pid_path); +} + +#[cfg(unix)] +fn write_pid_file(pid_path: &PathBuf) -> Result<(), Box> { + // The PID file is written only after the process has detached so later control + // commands point at the background daemon rather than the original shell session. + if let Some(parent) = pid_path.parent() { + std::fs::create_dir_all(parent)?; + } + + let pid = std::process::id().to_string(); + std::fs::write(pid_path, pid)?; + + let mut slot = PID_FILE_PATH.lock().expect("failed to lock pid file slot"); + *slot = Some(pid_path.clone()); + + Ok(()) +} + +#[cfg(unix)] +pub fn remove_registered_pid_file() { + let pid_path = { + let slot = PID_FILE_PATH.lock().expect("failed to lock pid file slot"); + slot.clone() + }; + + if let Some(path) = pid_path { + remove_pid_file_if_present(&path); + } +} + +#[cfg(unix)] +pub fn daemonize_after_wallet_prompt() -> Result> { + // Linux waits until after the wallet prompt to detach so the encryption key never + // needs to be handed off to a second process. + if !should_daemonize() { + return Ok(false); + } + + let pid_path = pid_file_path(); + // Refuse to detach if an active process already owns the network-scoped PID file. + if existing_process_running(&pid_path)? { + return Err(format!( + "Contractless is already running. Remove stale PID file if needed: {}", + pid_path.display() + ) + .into()); + } + + remove_pid_file_if_present(&pid_path); + // daemon(true, false) keeps the working directory but redirects stdio for background mode. + daemon(true, false)?; + write_pid_file(&pid_path)?; + info!("Daemonized node process with pid {}", std::process::id()); + + Ok(true) +} + +#[cfg(unix)] +pub fn handle_control_command() -> Result> { + let args: Vec = env::args().skip(1).collect(); + let pid_path = pid_file_path(); + + // The control commands operate through the PID file rather than by interacting + // with a terminal session, so they work even after the node detaches. + if args.iter().any(|arg| arg == "--status") { + if existing_process_running(&pid_path)? { + let pid = read_pid(&pid_path)?; + println!("Contractless is running with pid {pid}."); + } else if pid_path.exists() { + println!( + "Contractless is not running, but a stale PID file exists at {}.", + pid_path.display() + ); + } else { + println!("Contractless is not running."); + } + return Ok(true); + } + + if args.iter().any(|arg| arg == "--stop") { + if !pid_path.exists() { + println!("Contractless is not running."); + return Ok(true); + } + + let pid = read_pid(&pid_path)?; + // SIGTERM triggers install_shutdown_cleanup in the daemon process. + match kill(Pid::from_raw(pid), Some(Signal::SIGTERM)) { + Ok(_) => { + println!("Sent SIGTERM to Contractless process {pid}."); } Err(err) => { remove_pid_file_if_present(&pid_path); return Err(format!("Failed to stop Contractless process {pid}: {err}").into()); } } - - return Ok(true); - } - - Ok(false) -} - -#[cfg(unix)] -pub fn install_shutdown_cleanup(db: Db) { - task::spawn(async move { - use tokio::signal::unix::{signal, SignalKind}; - - // The Unix shutdown handler flushes sled and removes the PID file so stop, - // reboot, and signal-driven exits leave the node in a clean state. - let mut sigterm = match signal(SignalKind::terminate()) { - Ok(stream) => stream, - Err(err) => { - error!("Failed to register SIGTERM handler: {err}"); - return; - } - }; - - let mut sigint = match signal(SignalKind::interrupt()) { - Ok(stream) => stream, - Err(err) => { - error!("Failed to register SIGINT handler: {err}"); - return; - } - }; - - tokio::select! { - _ = sigterm.recv() => { - warn!("Received SIGTERM, shutting down."); - } - _ = sigint.recv() => { - warn!("Received SIGINT, shutting down."); - } - } - - if let Err(err) = db.flush_async().await { - error!("Failed to flush sled during shutdown: {err}"); - } - - // Removing the PID file here lets the next startup proceed without manual cleanup. - remove_registered_pid_file(); - std::process::exit(0); - }); -} - -#[cfg(not(unix))] -pub fn daemonize_after_wallet_prompt() -> Result> { - Ok(false) -} - -#[cfg(not(unix))] -pub fn handle_control_command() -> Result> { - Ok(false) -} - -#[cfg(not(unix))] -pub fn install_shutdown_cleanup(_db: crate::sled::Db) {} - -#[cfg(not(unix))] -pub fn remove_registered_pid_file() {} + + return Ok(true); + } + + Ok(false) +} + +#[cfg(unix)] +pub fn install_shutdown_cleanup(db: Db) { + task::spawn(async move { + use tokio::signal::unix::{signal, SignalKind}; + + // The Unix shutdown handler flushes sled and removes the PID file so stop, + // reboot, and signal-driven exits leave the node in a clean state. + let mut sigterm = match signal(SignalKind::terminate()) { + Ok(stream) => stream, + Err(err) => { + error!("Failed to register SIGTERM handler: {err}"); + return; + } + }; + + let mut sigint = match signal(SignalKind::interrupt()) { + Ok(stream) => stream, + Err(err) => { + error!("Failed to register SIGINT handler: {err}"); + return; + } + }; + + tokio::select! { + _ = sigterm.recv() => { + warn!("Received SIGTERM, shutting down."); + } + _ = sigint.recv() => { + warn!("Received SIGINT, shutting down."); + } + } + + if let Err(err) = db.flush_async().await { + error!("Failed to flush sled during shutdown: {err}"); + } + + // Removing the PID file here lets the next startup proceed without manual cleanup. + remove_registered_pid_file(); + std::process::exit(0); + }); +} + +#[cfg(not(unix))] +pub fn daemonize_after_wallet_prompt() -> Result> { + Ok(false) +} + +#[cfg(not(unix))] +pub fn handle_control_command() -> Result> { + Ok(false) +} + +#[cfg(not(unix))] +pub fn install_shutdown_cleanup(_db: crate::sled::Db) {} + +#[cfg(not(unix))] +pub fn remove_registered_pid_file() {} diff --git a/src/startup/unlock_pipe.rs b/src/startup/unlock_pipe.rs index 97476df..fc7973b 100644 --- a/src/startup/unlock_pipe.rs +++ b/src/startup/unlock_pipe.rs @@ -1,205 +1,205 @@ -#[cfg(windows)] -use crate::common::network_paths_and_settings::block_extension_and_paths; -#[cfg(windows)] -use crate::from_slice; -#[cfg(windows)] -use crate::log::{error, info, warn}; -#[cfg(windows)] -use crate::startup::unlock_structs::{ServiceWaitState, UnlockPipeRequest, UnlockPipeResponse}; -#[cfg(windows)] -use crate::to_string; -#[cfg(windows)] -use crate::wallets::structures::Wallet; -#[cfg(windows)] -use crate::Arc; -#[cfg(windows)] -use crate::{sleep, timeout, AsyncReadExt, AsyncWriteExt, Duration, RwLock}; -#[cfg(windows)] -use crate::{AtomicBool, AtomicOrdering}; -#[cfg(windows)] -use tokio::net::windows::named_pipe::ServerOptions; -#[cfg(windows)] -use tokio::sync::mpsc; -#[cfg(windows)] -use windows_sys::Win32::Foundation::LocalFree; -#[cfg(windows)] -use windows_sys::Win32::Security::Authorization::ConvertStringSecurityDescriptorToSecurityDescriptorW; -#[cfg(windows)] -use windows_sys::Win32::Security::{PSECURITY_DESCRIPTOR, SECURITY_ATTRIBUTES}; - -#[cfg(windows)] -pub fn pipe_name() -> String { - // Include the active network name so testnet and mainnet services never share a pipe. - let ( - network_name, - _padded_base_coin, - _suffix, - _torrent_path, - _wallet_path, - _block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - format!(r"\\.\pipe\contractless_{network_name}_unlock") -} - -#[cfg(windows)] -fn create_pipe_server( - pipe_name: &str, - first_instance: bool, -) -> std::io::Result { - // The unlock pipe is local-only IPC between the Windows service and the helper - // tool that submits the wallet key after the service has already started. - let mut options = ServerOptions::new(); - options.reject_remote_clients(true); - if first_instance { - options.first_pipe_instance(true); - } - - // Allow the service, local administrators, interactive users, and normal local users - // to talk to the unlock pipe while still rejecting remote clients at the pipe layer. - let security_descriptor = wide_null("D:P(A;;GA;;;SY)(A;;GA;;;BA)(A;;GRGW;;;IU)(A;;GRGW;;;BU)"); - let mut raw_sd: PSECURITY_DESCRIPTOR = std::ptr::null_mut(); - - let converted = unsafe { - ConvertStringSecurityDescriptorToSecurityDescriptorW( - security_descriptor.as_ptr(), - 1, - &mut raw_sd, - std::ptr::null_mut(), - ) - }; - - if converted == 0 || raw_sd.is_null() { - return Err(std::io::Error::last_os_error()); - } - - let mut attrs = SECURITY_ATTRIBUTES { - nLength: std::mem::size_of::() as u32, - lpSecurityDescriptor: raw_sd as *mut _, - bInheritHandle: 0, - }; - - let server = unsafe { - options.create_with_security_attributes_raw(pipe_name, &mut attrs as *mut _ as *mut _) - }; - - unsafe { - LocalFree(raw_sd as *mut _); - } - - server -} - -#[cfg(windows)] -fn wide_null(value: &str) -> Vec { - // Windows security APIs expect null-terminated UTF-16 strings. - value.encode_utf16().chain(std::iter::once(0)).collect() -} - -#[cfg(windows)] +#[cfg(windows)] +use crate::common::network_paths_and_settings::block_extension_and_paths; +#[cfg(windows)] +use crate::from_slice; +#[cfg(windows)] +use crate::log::{error, info, warn}; +#[cfg(windows)] +use crate::startup::unlock_structs::{ServiceWaitState, UnlockPipeRequest, UnlockPipeResponse}; +#[cfg(windows)] +use crate::to_string; +#[cfg(windows)] +use crate::wallets::structures::Wallet; +#[cfg(windows)] +use crate::Arc; +#[cfg(windows)] +use crate::{sleep, timeout, AsyncReadExt, AsyncWriteExt, Duration, RwLock}; +#[cfg(windows)] +use crate::{AtomicBool, AtomicOrdering}; +#[cfg(windows)] +use tokio::net::windows::named_pipe::ServerOptions; +#[cfg(windows)] +use tokio::sync::mpsc; +#[cfg(windows)] +use windows_sys::Win32::Foundation::LocalFree; +#[cfg(windows)] +use windows_sys::Win32::Security::Authorization::ConvertStringSecurityDescriptorToSecurityDescriptorW; +#[cfg(windows)] +use windows_sys::Win32::Security::{PSECURITY_DESCRIPTOR, SECURITY_ATTRIBUTES}; + +#[cfg(windows)] +pub fn pipe_name() -> String { + // Include the active network name so testnet and mainnet services never share a pipe. + let ( + network_name, + _padded_base_coin, + _suffix, + _torrent_path, + _wallet_path, + _block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + format!(r"\\.\pipe\contractless_{network_name}_unlock") +} + +#[cfg(windows)] +fn create_pipe_server( + pipe_name: &str, + first_instance: bool, +) -> std::io::Result { + // The unlock pipe is local-only IPC between the Windows service and the helper + // tool that submits the wallet key after the service has already started. + let mut options = ServerOptions::new(); + options.reject_remote_clients(true); + if first_instance { + options.first_pipe_instance(true); + } + + // Allow the service, local administrators, interactive users, and normal local users + // to talk to the unlock pipe while still rejecting remote clients at the pipe layer. + let security_descriptor = wide_null("D:P(A;;GA;;;SY)(A;;GA;;;BA)(A;;GRGW;;;IU)(A;;GRGW;;;BU)"); + let mut raw_sd: PSECURITY_DESCRIPTOR = std::ptr::null_mut(); + + let converted = unsafe { + ConvertStringSecurityDescriptorToSecurityDescriptorW( + security_descriptor.as_ptr(), + 1, + &mut raw_sd, + std::ptr::null_mut(), + ) + }; + + if converted == 0 || raw_sd.is_null() { + return Err(std::io::Error::last_os_error()); + } + + let mut attrs = SECURITY_ATTRIBUTES { + nLength: std::mem::size_of::() as u32, + lpSecurityDescriptor: raw_sd as *mut _, + bInheritHandle: 0, + }; + + let server = unsafe { + options.create_with_security_attributes_raw(pipe_name, &mut attrs as *mut _ as *mut _) + }; + + unsafe { + LocalFree(raw_sd as *mut _); + } + + server +} + +#[cfg(windows)] +fn wide_null(value: &str) -> Vec { + // Windows security APIs expect null-terminated UTF-16 strings. + value.encode_utf16().chain(std::iter::once(0)).collect() +} + +#[cfg(windows)] pub async fn run_unlock_pipe_server( service_state: Arc>, shutdown: Arc, unlock_sender: mpsc::UnboundedSender>, ) { - let pipe_name = pipe_name(); - let mut first_instance = true; - - info!("Named pipe listener started at {pipe_name}"); - - // A new pipe instance is created for each request so the service can accept - // repeated status checks and a later wallet submission through the same name. - while !shutdown.load(AtomicOrdering::SeqCst) { - let mut server = match create_pipe_server(&pipe_name, first_instance) { + let pipe_name = pipe_name(); + let mut first_instance = true; + + info!("Named pipe listener started at {pipe_name}"); + + // A new pipe instance is created for each request so the service can accept + // repeated status checks and a later wallet submission through the same name. + while !shutdown.load(AtomicOrdering::SeqCst) { + let mut server = match create_pipe_server(&pipe_name, first_instance) { Ok(server) => server, Err(err) => { error!("Failed to create named pipe {pipe_name}: {err}"); sleep(Duration::from_secs(1)).await; continue; } - }; - - first_instance = false; - - // Use a short connect timeout so shutdown checks are not blocked by an idle pipe. - match timeout(Duration::from_secs(1), server.connect()).await { - Ok(Ok(())) => {} - Ok(Err(err)) => { - warn!("Named pipe connect failed: {err}"); - continue; - } - Err(_) => { - continue; - } - } - - // Requests are length-prefixed JSON so the service can read exactly one command. - let request_len = match server.read_u32_le().await { - Ok(len) => len as usize, - Err(err) => { - warn!("Named pipe length read failed: {err}"); - continue; - } - }; - - let mut request_bytes = vec![0u8; request_len]; - if let Err(err) = server.read_exact(&mut request_bytes).await { - warn!("Named pipe read failed: {err}"); - continue; - } - - let response = - handle_request(&request_bytes, service_state.clone(), unlock_sender.clone()).await; - - // Responses use the same length-prefixed JSON shape as requests. - let response_bytes = match to_string(&response) { - Ok(json) => json.into_bytes(), - Err(err) => { - error!("Failed to serialize named pipe response: {err}"); - continue; - } - }; - - if let Err(err) = server.write_u32_le(response_bytes.len() as u32).await { - warn!("Named pipe response length write failed: {err}"); - continue; - } - - if let Err(err) = server.write_all(&response_bytes).await { - warn!("Named pipe write failed: {err}"); - } - } - - info!("Named pipe listener stopped."); -} - -#[cfg(windows)] + }; + + first_instance = false; + + // Use a short connect timeout so shutdown checks are not blocked by an idle pipe. + match timeout(Duration::from_secs(1), server.connect()).await { + Ok(Ok(())) => {} + Ok(Err(err)) => { + warn!("Named pipe connect failed: {err}"); + continue; + } + Err(_) => { + continue; + } + } + + // Requests are length-prefixed JSON so the service can read exactly one command. + let request_len = match server.read_u32_le().await { + Ok(len) => len as usize, + Err(err) => { + warn!("Named pipe length read failed: {err}"); + continue; + } + }; + + let mut request_bytes = vec![0u8; request_len]; + if let Err(err) = server.read_exact(&mut request_bytes).await { + warn!("Named pipe read failed: {err}"); + continue; + } + + let response = + handle_request(&request_bytes, service_state.clone(), unlock_sender.clone()).await; + + // Responses use the same length-prefixed JSON shape as requests. + let response_bytes = match to_string(&response) { + Ok(json) => json.into_bytes(), + Err(err) => { + error!("Failed to serialize named pipe response: {err}"); + continue; + } + }; + + if let Err(err) = server.write_u32_le(response_bytes.len() as u32).await { + warn!("Named pipe response length write failed: {err}"); + continue; + } + + if let Err(err) = server.write_all(&response_bytes).await { + warn!("Named pipe write failed: {err}"); + } + } + + info!("Named pipe listener stopped."); +} + +#[cfg(windows)] async fn handle_request( request_bytes: &[u8], service_state: Arc>, unlock_sender: mpsc::UnboundedSender>, ) -> UnlockPipeResponse { - // Malformed helper requests are reported back through the pipe instead of panicking the service. - let request = match from_slice::(request_bytes) { - Ok(request) => request, - Err(err) => { - return UnlockPipeResponse::Error { - message: format!("Invalid request payload: {err}"), - }; - } - }; - - match request { - UnlockPipeRequest::Ping => UnlockPipeResponse::Pong, - UnlockPipeRequest::Status => { - let state = *service_state.read().await; - UnlockPipeResponse::Status { state } - } - UnlockPipeRequest::SubmitKey { wallet_key } => { - // The service only accepts a wallet key while it is still in the locked - // waiting state, and it validates the key before allowing normal startup. + // Malformed helper requests are reported back through the pipe instead of panicking the service. + let request = match from_slice::(request_bytes) { + Ok(request) => request, + Err(err) => { + return UnlockPipeResponse::Error { + message: format!("Invalid request payload: {err}"), + }; + } + }; + + match request { + UnlockPipeRequest::Ping => UnlockPipeResponse::Pong, + UnlockPipeRequest::Status => { + let state = *service_state.read().await; + UnlockPipeResponse::Status { state } + } + UnlockPipeRequest::SubmitKey { wallet_key } => { + // The service only accepts a wallet key while it is still in the locked + // waiting state, and it validates the key before allowing normal startup. let current_state = *service_state.read().await; if !matches!(current_state, ServiceWaitState::WaitingForUnlock) { return UnlockPipeResponse::Error { @@ -208,7 +208,7 @@ async fn handle_request( ), }; } - + match Wallet::try_obtain_wallet(wallet_key, None).await { Ok(wallet) => { // Mark unlocked before sending the wallet so status checks immediately reflect progress. @@ -222,19 +222,19 @@ async fn handle_request( let mut state = service_state.write().await; *state = ServiceWaitState::WaitingForUnlock; return UnlockPipeResponse::Error { - message: "Service failed to accept the unlock request.".to_string(), - }; - } - - UnlockPipeResponse::KeyAccepted - } - Err(err) => UnlockPipeResponse::Error { message: err }, - } - } - } -} - -#[cfg(not(windows))] -pub fn pipe_name() -> String { - String::new() -} + message: "Service failed to accept the unlock request.".to_string(), + }; + } + + UnlockPipeResponse::KeyAccepted + } + Err(err) => UnlockPipeResponse::Error { message: err }, + } + } + } +} + +#[cfg(not(windows))] +pub fn pipe_name() -> String { + String::new() +} diff --git a/src/torrent/structs.rs b/src/torrent/structs.rs index ef07341..f42706f 100644 --- a/src/torrent/structs.rs +++ b/src/torrent/structs.rs @@ -52,6 +52,7 @@ pub struct DownloadSave { pub block_number: u32, pub allow_during_reorg: bool, pub allow_historical: bool, + pub allow_startup_peers: bool, pub db: Db, pub verification_service: Arc, pub map: Arc>, diff --git a/src/torrent/torrenting_system/download_pieces.rs b/src/torrent/torrenting_system/download_pieces.rs index 50520f3..4c7b02a 100644 --- a/src/torrent/torrenting_system/download_pieces.rs +++ b/src/torrent/torrenting_system/download_pieces.rs @@ -7,7 +7,9 @@ use crate::records::memory::torrentmap::{PieceReservation, TorrentMap}; use crate::sleep; use crate::torrent::structs::PieceStatus; use crate::torrent::structs::{DownloadSave, DownloadedPieceJob, PieceDownloadJob, RequestPiece}; -use crate::torrent::torrenting_system::get_nodes::get_nodes_from_memory; +use crate::torrent::torrenting_system::get_nodes::{ + get_nodes_from_memory, get_sync_nodes_from_memory, +}; use crate::torrent::torrenting_system::request_piece::request_piece_from_node; use crate::torrent::torrenting_system::temp_database_storage::remove_block_pieces_from_db; use crate::torrent::torrenting_system::temp_database_storage::save_piece_to_db; @@ -210,7 +212,11 @@ pub async fn download_block_pieces(params: DownloadSave) -> Result<(), String> { // Re-scan connected peers for any piece that is still pending or // has failed and needs another download attempt. - let connected_nodes = get_nodes_from_memory().await; + let connected_nodes = if params.allow_startup_peers { + get_sync_nodes_from_memory().await + } else { + get_nodes_from_memory().await + }; if connected_nodes.is_empty() { return Err(format!( "No connected miner peers available for block {}", diff --git a/src/torrent/torrenting_system/get_nodes.rs b/src/torrent/torrenting_system/get_nodes.rs index b0a5fbd..02722da 100644 --- a/src/torrent/torrenting_system/get_nodes.rs +++ b/src/torrent/torrenting_system/get_nodes.rs @@ -31,6 +31,31 @@ pub async fn get_nodes_from_memory() -> Vec<(String, Arc>)> { nodes } +pub async fn get_sync_nodes_from_memory() -> Vec<(String, Arc>)> { + // Startup chain sync happens before a peer is marked fully ready for + // mining/consensus, but after registry and network-map sync completed. + let connection_storage = CONNECTIONS.read().await; + let mut nodes = Vec::new(); + if let Some(connection) = &*connection_storage { + for connection_info in connection.connection_map.values() { + if ClientType::from_bytes(&connection_info.client_type) != Some(ClientType::Miner) { + continue; + } + if !connection_info.ready + && !(connection_info.wallet_registry_synced && connection_info.network_map_synced) + { + continue; + } + let ip = binary_to_ip(connection_info.ip.clone()); + let port = connection_info.port; + let key = format!("{ip}:{port}"); + let stream_arc = Arc::clone(&connection_info.stream); + nodes.push((key, stream_arc)); + } + } + nodes +} + pub async fn get_torrent_broadcast_nodes_from_memory() -> Vec<(String, Arc>)> { // Torrent announcements are allowed to reach miner peers that are // still starting/syncing so they can stage new candidates while diff --git a/src/torrent/torrenting_system/save_torrent.rs b/src/torrent/torrenting_system/save_torrent.rs index c6fa492..e6f0ecc 100644 --- a/src/torrent/torrenting_system/save_torrent.rs +++ b/src/torrent/torrenting_system/save_torrent.rs @@ -2,70 +2,70 @@ use crate::common::network_paths_and_settings::block_extension_and_paths; use crate::torrent::structs::Torrent; use crate::{create_dir_all, fs, read, read_dir, remove_file, AsyncWriteExt, Path}; use std::io::ErrorKind; - -fn staged_torrent_dir() -> Result { - // Keep staged torrents under a dedicated subdirectory so incoming - // network data is separated from canonical saved torrent files. - let ( - _network_name, - _padded_base_coin, - _block_ext, - out_path, - _wallet_path, - _block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - let staging_dir = Path::new(&out_path).join("staging"); - Ok(staging_dir) -} - + +fn staged_torrent_dir() -> Result { + // Keep staged torrents under a dedicated subdirectory so incoming + // network data is separated from canonical saved torrent files. + let ( + _network_name, + _padded_base_coin, + _block_ext, + out_path, + _wallet_path, + _block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + let staging_dir = Path::new(&out_path).join("staging"); + Ok(staging_dir) +} + fn staged_torrent_path(height: u32, info_hash: &str) -> std::path::PathBuf { // Each candidate gets its own deterministic staging path, so competing // torrents at the same height cannot race over a shared suffix. let ( _network_name, _padded_base_coin, - _block_ext, - out_path, - _wallet_path, - _block_path, - _db_path, - _balance_path, - _log_path, + _block_ext, + out_path, + _wallet_path, + _block_path, + _db_path, + _balance_path, + _log_path, ) = block_extension_and_paths(); Path::new(&out_path) .join("staging") .join(format!("{height}.{info_hash}.torrent")) } - -fn canonical_torrent_path(height: u32) -> std::path::PathBuf { - let ( - _network_name, - _padded_base_coin, - _block_ext, - out_path, - _wallet_path, - _block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - Path::new(&out_path).join(format!("{height}.torrent")) -} - -async fn torrent_bytes_match(path: &Path, torrent_bytes: &[u8]) -> Result { - if !path.exists() { - return Ok(false); - } - - let existing = read(path) - .await - .map_err(|e| format!("Failed to read torrent {}: {}", path.display(), e))?; - Ok(existing == torrent_bytes) -} - + +fn canonical_torrent_path(height: u32) -> std::path::PathBuf { + let ( + _network_name, + _padded_base_coin, + _block_ext, + out_path, + _wallet_path, + _block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + Path::new(&out_path).join(format!("{height}.torrent")) +} + +async fn torrent_bytes_match(path: &Path, torrent_bytes: &[u8]) -> Result { + if !path.exists() { + return Ok(false); + } + + let existing = read(path) + .await + .map_err(|e| format!("Failed to read torrent {}: {}", path.display(), e))?; + Ok(existing == torrent_bytes) +} + fn parse_staged_torrent_file_name(file_name: &str) -> Option<(u32, String)> { // New staged names encode both height and candidate hash: // `..torrent`. @@ -84,58 +84,58 @@ fn parse_staged_torrent_file_name(file_name: &str) -> Option<(u32, String)> { let suffix = suffix_str.parse::().ok()?; Some((height, format!("legacy-{suffix:010}"))) } - -pub async fn list_staged_torrents() -> Result, String> { + +pub async fn list_staged_torrents() -> Result, String> { // Enumerate staged torrents in height/candidate order so replay and // cleanup logic handle them in a stable sequence. - let staging_dir = staged_torrent_dir()?; - - if !staging_dir.exists() { - return Ok(Vec::new()); - } - - let mut entries = read_dir(&staging_dir) - .await - .map_err(|e| format!("Failed to read staging directory: {e}"))?; - let mut staged = Vec::new(); - - while let Some(entry) = entries - .next_entry() - .await - .map_err(|e| format!("Failed to iterate staging directory: {e}"))? - { - let path = entry.path(); - let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { - continue; - }; + let staging_dir = staged_torrent_dir()?; + + if !staging_dir.exists() { + return Ok(Vec::new()); + } + + let mut entries = read_dir(&staging_dir) + .await + .map_err(|e| format!("Failed to read staging directory: {e}"))?; + let mut staged = Vec::new(); + + while let Some(entry) = entries + .next_entry() + .await + .map_err(|e| format!("Failed to iterate staging directory: {e}"))? + { + let path = entry.path(); + let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { + continue; + }; if let Some((height, candidate_key)) = parse_staged_torrent_file_name(file_name) { staged.push((height, candidate_key, path)); } } - - staged.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1))); - Ok(staged - .into_iter() - .map(|(height, _suffix, path)| (height, path)) - .collect()) -} - -pub async fn list_staged_torrents_for_height( - height: u32, -) -> Result, String> { - let staged = list_staged_torrents().await?; - Ok(staged - .into_iter() - .filter_map(|(staged_height, path)| { - if staged_height == height { - Some(path) - } else { - None - } - }) - .collect()) -} - + + staged.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1))); + Ok(staged + .into_iter() + .map(|(height, _suffix, path)| (height, path)) + .collect()) +} + +pub async fn list_staged_torrents_for_height( + height: u32, +) -> Result, String> { + let staged = list_staged_torrents().await?; + Ok(staged + .into_iter() + .filter_map(|(staged_height, path)| { + if staged_height == height { + Some(path) + } else { + None + } + }) + .collect()) +} + pub async fn save_staged_torrent(height: u32, torrent_bytes: &[u8]) -> Result { // Store this candidate by its advertised block info hash. That makes // staging idempotent per candidate and prevents same-height races from @@ -183,9 +183,9 @@ pub async fn save_staged_torrent(height: u32, torrent_bytes: &[u8]) -> Result return Err(format!("Failed to create staged torrent file: {err}")), }; - - // Keep torrent staging separate from the canonical torrent file so - // validation and orphan logic can decide when promotion is safe. + + // Keep torrent staging separate from the canonical torrent file so + // validation and orphan logic can decide when promotion is safe. torrent_file .write_all(torrent_bytes) .await @@ -194,55 +194,55 @@ pub async fn save_staged_torrent(height: u32, torrent_bytes: &[u8]) -> Result Result, String> { - read(path) - .await - .map_err(|e| format!("Failed to read staged torrent {}: {}", path.display(), e)) -} - -pub async fn remove_staged_torrent(path: &Path) -> Result<(), String> { - if path.exists() { - remove_file(path) - .await - .map_err(|e| format!("Failed to remove staged torrent {}: {}", path.display(), e))?; - } - Ok(()) -} - -pub async fn remove_staged_torrents_for_height(height: u32) -> Result<(), String> { - // Remove every staged copy for the requested height so stale - // torrents do not interfere with later replay or promotion. - let staged = list_staged_torrents().await?; - for (staged_height, path) in staged { - if staged_height == height { - remove_staged_torrent(&path).await?; - } - } - Ok(()) -} - -pub async fn prune_staged_torrents(current_height: u32) -> Result<(), String> { - // Snapshot-based pruning keeps recent orphan evidence available - // until the trusted rollback floor advances past it. - let cutoff = current_height; - let staged = list_staged_torrents().await?; - for (staged_height, path) in staged { - if staged_height <= cutoff { - remove_staged_torrent(&path).await?; - } - } - Ok(()) -} - -pub fn promote_staged_torrent(staged_path: &Path, height: u32) -> Result { - // Promotion moves the selected staged torrent into the canonical - // filename once the corresponding block has been saved. - let canonical_path = canonical_torrent_path(height); - + + Ok(torrent_file_path.to_string_lossy().to_string()) +} + +pub async fn read_staged_torrent(path: &Path) -> Result, String> { + read(path) + .await + .map_err(|e| format!("Failed to read staged torrent {}: {}", path.display(), e)) +} + +pub async fn remove_staged_torrent(path: &Path) -> Result<(), String> { + if path.exists() { + remove_file(path) + .await + .map_err(|e| format!("Failed to remove staged torrent {}: {}", path.display(), e))?; + } + Ok(()) +} + +pub async fn remove_staged_torrents_for_height(height: u32) -> Result<(), String> { + // Remove every staged copy for the requested height so stale + // torrents do not interfere with later replay or promotion. + let staged = list_staged_torrents().await?; + for (staged_height, path) in staged { + if staged_height == height { + remove_staged_torrent(&path).await?; + } + } + Ok(()) +} + +pub async fn prune_staged_torrents(current_height: u32) -> Result<(), String> { + // Snapshot-based pruning keeps recent orphan evidence available + // until the trusted rollback floor advances past it. + let cutoff = current_height; + let staged = list_staged_torrents().await?; + for (staged_height, path) in staged { + if staged_height <= cutoff { + remove_staged_torrent(&path).await?; + } + } + Ok(()) +} + +pub fn promote_staged_torrent(staged_path: &Path, height: u32) -> Result { + // Promotion moves the selected staged torrent into the canonical + // filename once the corresponding block has been saved. + let canonical_path = canonical_torrent_path(height); + if !staged_path.exists() { return Err(format!( "Staged torrent does not exist for block {} at {}", @@ -257,6 +257,6 @@ pub fn promote_staged_torrent(staged_path: &Path, height: u32) -> Result, map: Arc>, @@ -36,6 +37,7 @@ pub async fn setup_download( block_number, allow_during_reorg, allow_historical: false, + allow_startup_peers, db: db.clone(), verification_service, map, diff --git a/src/torrent/torrenting_system/torrent_requests.rs b/src/torrent/torrenting_system/torrent_requests.rs index c3d118b..9d64eb9 100644 --- a/src/torrent/torrenting_system/torrent_requests.rs +++ b/src/torrent/torrenting_system/torrent_requests.rs @@ -41,6 +41,7 @@ pub async fn handle_response_and_save_torrent( wallet: Arc, map: Arc>, allow_during_reorg: bool, + allow_startup_peers: bool, rebroadcast: bool, ) -> Result<(), String> { let Some((torrent, staged_path)) = @@ -55,6 +56,7 @@ pub async fn handle_response_and_save_torrent( torrent, staged_path, allow_during_reorg, + allow_startup_peers, db.clone(), map.clone(), ) @@ -98,6 +100,7 @@ pub async fn process_torrent_response(params: ProcessTorrentResponse) -> Result< torrent, staged_path, params.allow_during_reorg, + false, params.db, params.map.clone(), ) @@ -143,6 +146,7 @@ pub async fn setup_download_for_torrent( torrent: Torrent, staged_path: String, allow_during_reorg: bool, + allow_startup_peers: bool, db: Db, map: Arc>, ) -> Result<(), String> { @@ -156,6 +160,7 @@ pub async fn setup_download_for_torrent( torrent, staged_path, allow_during_reorg, + allow_startup_peers, db, Arc::new(verification_service), map, diff --git a/src/torrent/unpack_local_torrent.rs b/src/torrent/unpack_local_torrent.rs index 2537c7d..c3a0dce 100644 --- a/src/torrent/unpack_local_torrent.rs +++ b/src/torrent/unpack_local_torrent.rs @@ -1,68 +1,68 @@ -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::records::block_height::decrease_block_height::decrease_height; -use crate::records::block_height::get_block_height::get_height; -use crate::sled::Db; -use crate::torrent::structs::Torrent; -use crate::AsyncReadExt; -use crate::File; -use crate::Path; -use crate::PathBuf; - -async fn get_or_correct_local_torrent(db: &Db, height: u32) -> Result { - // Look for the canonical torrent file for this network and height. - let ( - _network_name, - _padded_base_coin, - _file_ext, - torrent_path, - _wallet_path, - _block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - - let torrent_file = PathBuf::from(&torrent_path) - .join(format!("{height}.torrent")) - .to_string_lossy() - .into_owned(); - let file_exists = Path::new(&torrent_file).exists(); - if file_exists { - // Existing canonical torrent path can be loaded by the caller. - Ok(torrent_file) - } else { - // If the chain height points past available torrent data, step the recorded height back. - let check_height = get_height(db); - if check_height > 0 { - let torrent_file = PathBuf::from(torrent_path) - .join(format!("{height}.torrent")) - .to_string_lossy() - .into_owned(); - - let file_exists = Path::new(&torrent_file).exists(); - if !file_exists { - decrease_height(check_height - 1, db); - } - } - Err("Failed to load torrent".to_string()) - } -} - -pub async fn load_torrent(db: &Db, height: u32) -> Result { - let torrent = get_or_correct_local_torrent(db, height).await?; - - if let Ok(mut torrent_file) = File::open(&torrent).await { - // Torrent files are stored in the compact binary format from Torrent::to_bytes. - let mut torrent_contents = Vec::new(); - torrent_file - .read_to_end(&mut torrent_contents) - .await - .map_err(|e| e.to_string())?; - // Convert the saved bytes back into the in-memory metadata struct. - Torrent::from_bytes(&torrent_contents) - .await - .map_err(|e| e.to_string()) - } else { - Err("Could not open torrent".to_string()) - } -} +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::records::block_height::decrease_block_height::decrease_height; +use crate::records::block_height::get_block_height::get_height; +use crate::sled::Db; +use crate::torrent::structs::Torrent; +use crate::AsyncReadExt; +use crate::File; +use crate::Path; +use crate::PathBuf; + +async fn get_or_correct_local_torrent(db: &Db, height: u32) -> Result { + // Look for the canonical torrent file for this network and height. + let ( + _network_name, + _padded_base_coin, + _file_ext, + torrent_path, + _wallet_path, + _block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + + let torrent_file = PathBuf::from(&torrent_path) + .join(format!("{height}.torrent")) + .to_string_lossy() + .into_owned(); + let file_exists = Path::new(&torrent_file).exists(); + if file_exists { + // Existing canonical torrent path can be loaded by the caller. + Ok(torrent_file) + } else { + // If the chain height points past available torrent data, step the recorded height back. + let check_height = get_height(db); + if check_height > 0 { + let torrent_file = PathBuf::from(torrent_path) + .join(format!("{height}.torrent")) + .to_string_lossy() + .into_owned(); + + let file_exists = Path::new(&torrent_file).exists(); + if !file_exists { + decrease_height(check_height - 1, db); + } + } + Err("Failed to load torrent".to_string()) + } +} + +pub async fn load_torrent(db: &Db, height: u32) -> Result { + let torrent = get_or_correct_local_torrent(db, height).await?; + + if let Ok(mut torrent_file) = File::open(&torrent).await { + // Torrent files are stored in the compact binary format from Torrent::to_bytes. + let mut torrent_contents = Vec::new(); + torrent_file + .read_to_end(&mut torrent_contents) + .await + .map_err(|e| e.to_string())?; + // Convert the saved bytes back into the in-memory metadata struct. + Torrent::from_bytes(&torrent_contents) + .await + .map_err(|e| e.to_string()) + } else { + Err("Could not open torrent".to_string()) + } +} diff --git a/src/wallets/load_wallets.rs b/src/wallets/load_wallets.rs index 75def39..9b7f93e 100644 --- a/src/wallets/load_wallets.rs +++ b/src/wallets/load_wallets.rs @@ -1,90 +1,90 @@ -use crate::common::network_paths_and_settings::block_extension_and_paths; -use crate::decode_image_and_extract_text; -use crate::decrypts; -use crate::from_slice; -use crate::log::error; -use crate::metadata; -use crate::read; -use crate::wallets::structures::{SavedWallet, Wallet}; -use crate::Path; - -impl Wallet { - pub async fn get_wallet_path() -> String { - // Load the active network paths and keep only the wallet file path. - let ( - _network_name, - _padded_base_coin, - _suffix, - _torrent_path, - wallet_path, - _block_path, - _db_path, - _balance_path, - _log_path, - ) = block_extension_and_paths(); - wallet_path - } - - pub async fn private_key_from_wallet( - wallet_path: &Path, - wallet_key: String, - ) -> Result { - // Read the wallet JSON file from disk. - if let Ok(wallet_content) = read(wallet_path).await { - // Deserialize the saved wallet before extracting the encrypted image payload. - let mut wallet: SavedWallet = from_slice(&wallet_content) - .map_err(|e| format!("Deserialization of wallet failed: {e}"))?; - - // Extract the encrypted private key text from the wallet image. - if let Some(encrypted_text) = decode_image_and_extract_text(&wallet.private_key) { - // Decrypt the private key with the user-provided wallet key. - if let Some(decrypted_private_key) = decrypts(&encrypted_text, Some(&wallet_key)) { - // Replace the saved encrypted image payload with the decrypted private key in memory. - wallet.private_key = decrypted_private_key; - - // Attach the encryption key so the full wallet can be reused by callers. - let full_wallet = Wallet { - saved: wallet, - encryption_key: wallet_key.clone(), - }; - - // Return the loaded wallet when image extraction and decryption both succeed. - Ok(full_wallet) - } else { - error!("Decryption of private key failed."); - Err("Decryption of private key failed.".into()) - } - } else { - error!("Decryption of image failed."); - Err("Decryption of image failed.".into()) - } - } else { - error!("Wallet path did not exist"); - Err("Wallet path did not exist".into()) - } - } - - async fn load_wallet(wallet_path: &Path, wallet_key: String) -> Result { - // Load an existing wallet when the file exists. - if metadata(wallet_path).await.is_ok() { - Self::private_key_from_wallet(wallet_path, wallet_key).await - } else { - // Create, save, and load a new wallet when no wallet file exists. - Self::generate_saved_struct(wallet_path, wallet_key).await - } - } - - pub async fn try_obtain_wallet( - wallet_key: String, - path: Option<&str>, - ) -> Result { - // Use a caller-provided path when supplied, otherwise use the active network wallet path. - let wallet_path = match path { - Some(p) => p.to_string(), - None => Wallet::get_wallet_path().await, - }; - - // Load or create the wallet and return any failure to the caller. - Self::load_wallet(Path::new(&wallet_path), wallet_key).await - } -} +use crate::common::network_paths_and_settings::block_extension_and_paths; +use crate::decode_image_and_extract_text; +use crate::decrypts; +use crate::from_slice; +use crate::log::error; +use crate::metadata; +use crate::read; +use crate::wallets::structures::{SavedWallet, Wallet}; +use crate::Path; + +impl Wallet { + pub async fn get_wallet_path() -> String { + // Load the active network paths and keep only the wallet file path. + let ( + _network_name, + _padded_base_coin, + _suffix, + _torrent_path, + wallet_path, + _block_path, + _db_path, + _balance_path, + _log_path, + ) = block_extension_and_paths(); + wallet_path + } + + pub async fn private_key_from_wallet( + wallet_path: &Path, + wallet_key: String, + ) -> Result { + // Read the wallet JSON file from disk. + if let Ok(wallet_content) = read(wallet_path).await { + // Deserialize the saved wallet before extracting the encrypted image payload. + let mut wallet: SavedWallet = from_slice(&wallet_content) + .map_err(|e| format!("Deserialization of wallet failed: {e}"))?; + + // Extract the encrypted private key text from the wallet image. + if let Some(encrypted_text) = decode_image_and_extract_text(&wallet.private_key) { + // Decrypt the private key with the user-provided wallet key. + if let Some(decrypted_private_key) = decrypts(&encrypted_text, Some(&wallet_key)) { + // Replace the saved encrypted image payload with the decrypted private key in memory. + wallet.private_key = decrypted_private_key; + + // Attach the encryption key so the full wallet can be reused by callers. + let full_wallet = Wallet { + saved: wallet, + encryption_key: wallet_key.clone(), + }; + + // Return the loaded wallet when image extraction and decryption both succeed. + Ok(full_wallet) + } else { + error!("Decryption of private key failed."); + Err("Decryption of private key failed.".into()) + } + } else { + error!("Decryption of image failed."); + Err("Decryption of image failed.".into()) + } + } else { + error!("Wallet path did not exist"); + Err("Wallet path did not exist".into()) + } + } + + async fn load_wallet(wallet_path: &Path, wallet_key: String) -> Result { + // Load an existing wallet when the file exists. + if metadata(wallet_path).await.is_ok() { + Self::private_key_from_wallet(wallet_path, wallet_key).await + } else { + // Create, save, and load a new wallet when no wallet file exists. + Self::generate_saved_struct(wallet_path, wallet_key).await + } + } + + pub async fn try_obtain_wallet( + wallet_key: String, + path: Option<&str>, + ) -> Result { + // Use a caller-provided path when supplied, otherwise use the active network wallet path. + let wallet_path = match path { + Some(p) => p.to_string(), + None => Wallet::get_wallet_path().await, + }; + + // Load or create the wallet and return any failure to the caller. + Self::load_wallet(Path::new(&wallet_path), wallet_key).await + } +}