diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..93d7517 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +target/ +logs/ +blocks/ +torrents/ +state/ +balance_sheet/ +wallets/ +*.wallet +*.key +*.pdb diff --git a/src/orphans/orphan_checkup.rs b/src/orphans/orphan_checkup.rs index 7d89c32..e21968c 100644 --- a/src/orphans/orphan_checkup.rs +++ b/src/orphans/orphan_checkup.rs @@ -11,6 +11,7 @@ use crate::records::unpack_block::load_by_binary_data::load_block_from_binary; use crate::records::unpack_block::unpack_header::load_block_header; use crate::torrent::structs::{DownloadSave, Torrent}; use crate::torrent::torrenting_system::create_file::combine_pieces; +use crate::torrent::torrenting_system::download_locks::acquire_candidate_download; use crate::torrent::torrenting_system::download_pieces::download_block_pieces; use crate::torrent::torrenting_system::save_torrent::{ list_staged_torrents_for_height, read_staged_torrent, @@ -101,6 +102,7 @@ async fn candidate_attaches_before_rollback( // Metadata may choose a candidate, but only downloaded block bytes can // prove the rollback is safe. 
torrent.verify(height, &params.db, wallet_key).await?; + let _download_guard = acquire_candidate_download(height, &torrent.info.info_hash, true).await?; let verification_service = global_verification_service() .ok_or_else(|| "Verification service not initialized".to_string())?; diff --git a/src/orphans/replay_errors.rs b/src/orphans/replay_errors.rs index 9110411..d811eb1 100644 --- a/src/orphans/replay_errors.rs +++ b/src/orphans/replay_errors.rs @@ -11,4 +11,7 @@ pub fn should_retry_staged_candidate(error: &str) -> bool { || error.contains("No replacement torrent received") || error.contains("Piece reply channel closed") || error.contains("Replay waiting for block pieces") + || error.contains("Candidate download already active") + || error.contains("Timed out waiting for active candidate download") + || error.contains("Downloaded candidate length does not match torrent metadata") } diff --git a/src/torrent/torrenting_system/download_locks.rs b/src/torrent/torrenting_system/download_locks.rs new file mode 100644 index 0000000..6bc3e8f --- /dev/null +++ b/src/torrent/torrenting_system/download_locks.rs @@ -0,0 +1,71 @@ +use crate::lazy_static; +use crate::{sleep, Duration}; +use std::collections::HashSet; +use std::sync::Mutex; + +const DOWNLOAD_LOCK_WAIT_MS: u64 = 30_000; +const DOWNLOAD_LOCK_POLL_MS: u64 = 50; + +lazy_static! 
{ + static ref ACTIVE_DOWNLOADS: Mutex<HashSet<String>> = Mutex::new(HashSet::new()); +} + +pub struct CandidateDownloadGuard { + key: String, +} + +impl Drop for CandidateDownloadGuard { + fn drop(&mut self) { + let mut active = ACTIVE_DOWNLOADS + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + active.remove(&self.key); + } +} + +fn download_key(height: u32, info_hash: &str) -> String { + format!("{height}:{info_hash}") +} + +fn try_acquire(height: u32, info_hash: &str) -> Option<CandidateDownloadGuard> { + let key = download_key(height, info_hash); + let mut active = ACTIVE_DOWNLOADS + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + + if active.insert(key.clone()) { + Some(CandidateDownloadGuard { key }) + } else { + None + } +} + +pub async fn acquire_candidate_download( + height: u32, + info_hash: &str, + wait_for_active_download: bool, +) -> Result<CandidateDownloadGuard, String> { + if let Some(guard) = try_acquire(height, info_hash) { + return Ok(guard); + } + + if !wait_for_active_download { + return Err(format!( + "Candidate download already active for block {height}" + )); + } + + let mut waited_ms = 0; + while waited_ms < DOWNLOAD_LOCK_WAIT_MS { + sleep(Duration::from_millis(DOWNLOAD_LOCK_POLL_MS)).await; + waited_ms += DOWNLOAD_LOCK_POLL_MS; + + if let Some(guard) = try_acquire(height, info_hash) { + return Ok(guard); + } + } + + Err(format!( + "Timed out waiting for active candidate download at block {height}" + )) +} diff --git a/src/torrent/torrenting_system/download_pieces.rs b/src/torrent/torrenting_system/download_pieces.rs index bd052d3..50520f3 100644 --- a/src/torrent/torrenting_system/download_pieces.rs +++ b/src/torrent/torrenting_system/download_pieces.rs @@ -101,13 +101,8 @@ async fn handle_downloaded_piece(job: DownloadedPieceJob, data: Vec) { }; if received_hash == expected_hash { - // A hash match makes the piece available for final block assembly. 
- { - let mut torrent_map = job.torrent_map.lock().await; - let _ = torrent_map.mark_piece_complete(job.piece); - } if let Err(err) = save_piece_to_db( - job.db, + job.db.clone(), job.block_number, &job.torrent.info.info_hash, job.piece, @@ -119,6 +114,14 @@ async fn handle_downloaded_piece(job: DownloadedPieceJob, data: Vec) { "[download] failed to stage piece data: block_number={} piece={} err={}", job.block_number, job.piece, err ); + mark_piece_failed(job.torrent_map, job.piece, &job.ip).await; + return; + } + + // A hash match is only complete after the piece bytes are staged for final assembly. + { + let mut torrent_map = job.torrent_map.lock().await; + let _ = torrent_map.mark_piece_complete(job.piece); } } else { // A bad hash is treated like a failed peer response so another node can be tried. diff --git a/src/torrent/torrenting_system/mod.rs b/src/torrent/torrenting_system/mod.rs index 499be71..1bc6601 100644 --- a/src/torrent/torrenting_system/mod.rs +++ b/src/torrent/torrenting_system/mod.rs @@ -1,5 +1,6 @@ // The torrenting_system module coordinates peer discovery, piece downloads, staging, and save flow. 
pub mod create_file; +pub mod download_locks; pub mod download_pieces; pub mod get_nodes; pub mod request_piece; diff --git a/src/torrent/torrenting_system/save_block.rs b/src/torrent/torrenting_system/save_block.rs index ed35f8d..ce75354 100644 --- a/src/torrent/torrenting_system/save_block.rs +++ b/src/torrent/torrenting_system/save_block.rs @@ -29,7 +29,6 @@ pub async fn verify_and_save_block(params: DownloadSave) -> Result<(), String> { "[download] deferring verify/save during reorg: block_number={}", params.block_number ); - cleanup_download_pieces(&params).await; return Err("Block download/save deferred while reorganizing.".to_string()); } @@ -62,6 +61,10 @@ pub async fn verify_and_save_block(params: DownloadSave) -> Result<(), String> { ) .await?; + if result.len() != params.torrent.info.length as usize { + return Err("Downloaded candidate length does not match torrent metadata.".to_string()); + } + // The combined block bytes must hash to the torrent's advertised info hash. if skein_128_hash_bytes(&result) != params.torrent.info.info_hash { let err_msg = "Hash validation failed for complete block".to_string(); diff --git a/src/torrent/torrenting_system/setup_block_download.rs b/src/torrent/torrenting_system/setup_block_download.rs index 782ae4e..8015cae 100644 --- a/src/torrent/torrenting_system/setup_block_download.rs +++ b/src/torrent/torrenting_system/setup_block_download.rs @@ -1,6 +1,7 @@ use crate::records::memory::response_channels::Command; use crate::sled::Db; use crate::torrent::structs::{DownloadSave, Torrent}; +use crate::torrent::torrenting_system::download_locks::acquire_candidate_download; use crate::torrent::torrenting_system::download_pieces::download_block_pieces; use crate::torrent::torrenting_system::save_block::verify_and_save_block; use crate::torrent::torrenting_system::torrent_map::create_torrent_map; @@ -19,6 +20,10 @@ pub async fn setup_download( verification_service: Arc, map: Arc>, ) -> Result<(), String> { + let _download_guard = + 
acquire_candidate_download(block_number, &torrent.info.info_hash, allow_during_reorg) + .await?; + // Initialize in-memory status tracking for each piece in the torrent. let torrent_map = create_torrent_map(&torrent).await?;