initial release

This commit is contained in:
viraladmin 2026-05-26 14:19:47 -06:00
parent b23949d2d8
commit 8f9c9cb266
8 changed files with 105 additions and 7 deletions

10
.gitignore vendored Normal file
View File

@ -0,0 +1,10 @@
target/
logs/
blocks/
torrents/
state/
balance_sheet/
wallets/
*.wallet
*.key
*.pdb

View File

@ -11,6 +11,7 @@ use crate::records::unpack_block::load_by_binary_data::load_block_from_binary;
use crate::records::unpack_block::unpack_header::load_block_header; use crate::records::unpack_block::unpack_header::load_block_header;
use crate::torrent::structs::{DownloadSave, Torrent}; use crate::torrent::structs::{DownloadSave, Torrent};
use crate::torrent::torrenting_system::create_file::combine_pieces; use crate::torrent::torrenting_system::create_file::combine_pieces;
use crate::torrent::torrenting_system::download_locks::acquire_candidate_download;
use crate::torrent::torrenting_system::download_pieces::download_block_pieces; use crate::torrent::torrenting_system::download_pieces::download_block_pieces;
use crate::torrent::torrenting_system::save_torrent::{ use crate::torrent::torrenting_system::save_torrent::{
list_staged_torrents_for_height, read_staged_torrent, list_staged_torrents_for_height, read_staged_torrent,
@ -101,6 +102,7 @@ async fn candidate_attaches_before_rollback(
// Metadata may choose a candidate, but only downloaded block bytes can // Metadata may choose a candidate, but only downloaded block bytes can
// prove the rollback is safe. // prove the rollback is safe.
torrent.verify(height, &params.db, wallet_key).await?; torrent.verify(height, &params.db, wallet_key).await?;
let _download_guard = acquire_candidate_download(height, &torrent.info.info_hash, true).await?;
let verification_service = global_verification_service() let verification_service = global_verification_service()
.ok_or_else(|| "Verification service not initialized".to_string())?; .ok_or_else(|| "Verification service not initialized".to_string())?;

View File

@ -11,4 +11,7 @@ pub fn should_retry_staged_candidate(error: &str) -> bool {
|| error.contains("No replacement torrent received") || error.contains("No replacement torrent received")
|| error.contains("Piece reply channel closed") || error.contains("Piece reply channel closed")
|| error.contains("Replay waiting for block pieces") || error.contains("Replay waiting for block pieces")
|| error.contains("Candidate download already active")
|| error.contains("Timed out waiting for active candidate download")
|| error.contains("Downloaded candidate length does not match torrent metadata")
} }

View File

@ -0,0 +1,71 @@
use crate::lazy_static;
use crate::{sleep, Duration};
use std::collections::HashSet;
use std::sync::Mutex;
const DOWNLOAD_LOCK_WAIT_MS: u64 = 30_000;
const DOWNLOAD_LOCK_POLL_MS: u64 = 50;
lazy_static! {
static ref ACTIVE_DOWNLOADS: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
}
pub struct CandidateDownloadGuard {
key: String,
}
impl Drop for CandidateDownloadGuard {
fn drop(&mut self) {
let mut active = ACTIVE_DOWNLOADS
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
active.remove(&self.key);
}
}
fn download_key(height: u32, info_hash: &str) -> String {
format!("{height}:{info_hash}")
}
fn try_acquire(height: u32, info_hash: &str) -> Option<CandidateDownloadGuard> {
let key = download_key(height, info_hash);
let mut active = ACTIVE_DOWNLOADS
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if active.insert(key.clone()) {
Some(CandidateDownloadGuard { key })
} else {
None
}
}
pub async fn acquire_candidate_download(
height: u32,
info_hash: &str,
wait_for_active_download: bool,
) -> Result<CandidateDownloadGuard, String> {
if let Some(guard) = try_acquire(height, info_hash) {
return Ok(guard);
}
if !wait_for_active_download {
return Err(format!(
"Candidate download already active for block {height}"
));
}
let mut waited_ms = 0;
while waited_ms < DOWNLOAD_LOCK_WAIT_MS {
sleep(Duration::from_millis(DOWNLOAD_LOCK_POLL_MS)).await;
waited_ms += DOWNLOAD_LOCK_POLL_MS;
if let Some(guard) = try_acquire(height, info_hash) {
return Ok(guard);
}
}
Err(format!(
"Timed out waiting for active candidate download at block {height}"
))
}

View File

@ -101,13 +101,8 @@ async fn handle_downloaded_piece(job: DownloadedPieceJob, data: Vec<u8>) {
}; };
if received_hash == expected_hash { if received_hash == expected_hash {
// A hash match makes the piece available for final block assembly.
{
let mut torrent_map = job.torrent_map.lock().await;
let _ = torrent_map.mark_piece_complete(job.piece);
}
if let Err(err) = save_piece_to_db( if let Err(err) = save_piece_to_db(
job.db, job.db.clone(),
job.block_number, job.block_number,
&job.torrent.info.info_hash, &job.torrent.info.info_hash,
job.piece, job.piece,
@ -119,6 +114,14 @@ async fn handle_downloaded_piece(job: DownloadedPieceJob, data: Vec<u8>) {
"[download] failed to stage piece data: block_number={} piece={} err={}", "[download] failed to stage piece data: block_number={} piece={} err={}",
job.block_number, job.piece, err job.block_number, job.piece, err
); );
mark_piece_failed(job.torrent_map, job.piece, &job.ip).await;
return;
}
// A hash match is only complete after the piece bytes are staged for final assembly.
{
let mut torrent_map = job.torrent_map.lock().await;
let _ = torrent_map.mark_piece_complete(job.piece);
} }
} else { } else {
// A bad hash is treated like a failed peer response so another node can be tried. // A bad hash is treated like a failed peer response so another node can be tried.

View File

@ -1,5 +1,6 @@
// The torrenting_system module coordinates peer discovery, piece downloads, staging, and save flow. // The torrenting_system module coordinates peer discovery, piece downloads, staging, and save flow.
pub mod create_file; pub mod create_file;
pub mod download_locks;
pub mod download_pieces; pub mod download_pieces;
pub mod get_nodes; pub mod get_nodes;
pub mod request_piece; pub mod request_piece;

View File

@ -29,7 +29,6 @@ pub async fn verify_and_save_block(params: DownloadSave) -> Result<(), String> {
"[download] deferring verify/save during reorg: block_number={}", "[download] deferring verify/save during reorg: block_number={}",
params.block_number params.block_number
); );
cleanup_download_pieces(&params).await;
return Err("Block download/save deferred while reorganizing.".to_string()); return Err("Block download/save deferred while reorganizing.".to_string());
} }
@ -62,6 +61,10 @@ pub async fn verify_and_save_block(params: DownloadSave) -> Result<(), String> {
) )
.await?; .await?;
if result.len() != params.torrent.info.length as usize {
return Err("Downloaded candidate length does not match torrent metadata.".to_string());
}
// The combined block bytes must hash to the torrent's advertised info hash. // The combined block bytes must hash to the torrent's advertised info hash.
if skein_128_hash_bytes(&result) != params.torrent.info.info_hash { if skein_128_hash_bytes(&result) != params.torrent.info.info_hash {
let err_msg = "Hash validation failed for complete block".to_string(); let err_msg = "Hash validation failed for complete block".to_string();

View File

@ -1,6 +1,7 @@
use crate::records::memory::response_channels::Command; use crate::records::memory::response_channels::Command;
use crate::sled::Db; use crate::sled::Db;
use crate::torrent::structs::{DownloadSave, Torrent}; use crate::torrent::structs::{DownloadSave, Torrent};
use crate::torrent::torrenting_system::download_locks::acquire_candidate_download;
use crate::torrent::torrenting_system::download_pieces::download_block_pieces; use crate::torrent::torrenting_system::download_pieces::download_block_pieces;
use crate::torrent::torrenting_system::save_block::verify_and_save_block; use crate::torrent::torrenting_system::save_block::verify_and_save_block;
use crate::torrent::torrenting_system::torrent_map::create_torrent_map; use crate::torrent::torrenting_system::torrent_map::create_torrent_map;
@ -19,6 +20,10 @@ pub async fn setup_download(
verification_service: Arc<VerificationService>, verification_service: Arc<VerificationService>,
map: Arc<Mutex<Command>>, map: Arc<Mutex<Command>>,
) -> Result<(), String> { ) -> Result<(), String> {
let _download_guard =
acquire_candidate_download(block_number, &torrent.info.info_hash, allow_during_reorg)
.await?;
// Initialize in-memory status tracking for each piece in the torrent. // Initialize in-memory status tracking for each piece in the torrent.
let torrent_map = create_torrent_map(&torrent).await?; let torrent_map = create_torrent_map(&torrent).await?;