use crate::orphans::replay_errors::should_retry_staged_candidate; use crate::orphans::structs::UndoTransactions; use crate::orphans::torrent_candidates::hydrate_torrent_candidates; use crate::records::block_height::get_block_height::get_height; use crate::records::memory::response_channels::reserve_entry; use crate::records::memory::torrent_status::{ get_torrent_status, mark_other_torrent_statuses_invalid, set_torrent_status, TorrentStatus, }; use crate::torrent::structs::Torrent; use crate::torrent::torrenting_system::save_torrent::{ list_staged_torrents_for_height, read_staged_torrent, }; use crate::torrent::torrenting_system::torrent_requests::{ handle_response_and_save_torrent, send_request_torrent_message, }; use crate::{timeout, Duration}; use std::collections::HashSet; pub async fn save_new_blocks( params: &UndoTransactions, max_height: u32, wallet_key: &str, mut true_start_height: u32, ) -> Result<(), String> { // after rollback, request and save each remote block from the // divergence point up to the height we need to restore let mut hydrated_heights = HashSet::new(); loop { let mut resolved_from_staging = false; let staged_candidates = list_staged_torrents_for_height(true_start_height).await?; let mut ordered_candidates = Vec::new(); for staged_path in staged_candidates { // Try already-staged torrents before asking the peer again. // Bad staged entries are ignored here and marked below if tested. let torrent_bytes = match read_staged_torrent(&staged_path).await { Ok(bytes) => bytes, Err(_) => continue, }; let torrent = match Torrent::from_bytes(&torrent_bytes).await { Ok(torrent) => torrent, Err(_) => continue, }; if matches!( get_torrent_status(true_start_height, &torrent.info.info_hash).await, TorrentStatus::Invalid ) { continue; } ordered_candidates.push(torrent); } // Status only filters out known-bad candidates. Among every candidate // still eligible for replay, use the normal block-fight ordering. ordered_candidates.sort_by(|a, b| { a.info .timestamp .cmp(&b.info.timestamp) .then(a.info.nonce.cmp(&b.info.nonce)) .then(a.info.vrf.cmp(&b.info.vrf)) }); for torrent in &ordered_candidates { let torrent_info_hash = torrent.info.info_hash.clone(); // Height advancement is the proof that the candidate actually // extended the chain rather than merely parsing successfully. let local_height_before = get_height(¶ms.db); match handle_response_and_save_torrent( true_start_height, ¶ms.db, torrent.clone(), wallet_key, params.map.clone(), true, ) .await { Ok(()) => { if get_height(¶ms.db) > local_height_before { set_torrent_status( true_start_height, &torrent_info_hash, TorrentStatus::Valid, ) .await; mark_other_torrent_statuses_invalid(true_start_height, &torrent_info_hash) .await; resolved_from_staging = true; break; } else { set_torrent_status( true_start_height, &torrent_info_hash, TorrentStatus::Invalid, ) .await; } } Err(err) => { let status = if should_retry_staged_candidate(&err) { // Missing pieces mean the candidate has not been // tested yet, so keep it eligible for a later replay. TorrentStatus::Pending } else { TorrentStatus::Invalid }; set_torrent_status(true_start_height, &torrent_info_hash, status).await; } } } if resolved_from_staging { // Continue replaying if staged data supplied this height and more // replacement heights remain. match max_height.cmp(&true_start_height) { std::cmp::Ordering::Equal => break, std::cmp::Ordering::Greater => { true_start_height += 1; continue; } std::cmp::Ordering::Less => break, } } if hydrated_heights.insert(true_start_height) { let imported = hydrate_torrent_candidates( params.stream.clone(), params.map.clone(), params.connections_key.clone(), ) .await?; if imported > 0 { // Peer candidate hydration can add staged torrents that // are not canonical on the peer yet. Restart this height // so those candidates are tried before canonical fallback. continue; } } // No staged candidate worked, so request the replacement torrent // directly from the connected peer. let (hashmap_key, _save_tx, save_rx) = reserve_entry(params.map.clone()).await; send_request_torrent_message( params.stream.clone(), true_start_height, hashmap_key, params.connections_key.clone(), ) .await .map_err(|e| e.to_string())?; let mut rx = save_rx.lock().await; if let Some(remote_torrent) = timeout(Duration::from_secs(30), rx.recv()) .await .map_err(|_| { format!( "Timed out waiting for replacement torrent at height {true_start_height}" ) })? { if let Ok(text) = String::from_utf8(remote_torrent.clone()) { let trimmed = text.trim(); if !trimmed.is_empty() { // Torrent replies should be raw torrent bytes. Text here // usually means the peer returned an error response. return Err(format!( "Unexpected textual torrent response while replaying height {true_start_height}: {trimmed}" )); } } let torrent = match Torrent::from_bytes(&remote_torrent).await { Ok(torrent) => torrent, Err(err) => { return Err(err.to_string()); } }; let torrent_info_hash = torrent.info.info_hash.clone(); let local_height_before = get_height(¶ms.db); // Save through the normal torrent path so all validation and // record updates stay identical to a live broadcast. handle_response_and_save_torrent( true_start_height, ¶ms.db, torrent, wallet_key, params.map.clone(), true, ) .await?; if get_height(¶ms.db) <= local_height_before { set_torrent_status( true_start_height, &torrent_info_hash, TorrentStatus::Invalid, ) .await; return Err(format!( "Replacement torrent at height {true_start_height} did not advance the chain" )); } set_torrent_status(true_start_height, &torrent_info_hash, TorrentStatus::Valid).await; mark_other_torrent_statuses_invalid(true_start_height, &torrent_info_hash).await; } else { return Err(format!( "No replacement torrent received while replaying height {true_start_height}" )); } // continue until the requested replacement range // has been fully saved locally match max_height.cmp(&true_start_height) { std::cmp::Ordering::Equal => break, std::cmp::Ordering::Greater => true_start_height += 1, std::cmp::Ordering::Less => break, } } Ok(()) }