Contractless/src/orphans/sync_check.rs

321 lines
12 KiB
Rust

use crate::common::check_genesis::genesis_checkup;
use crate::log::{error, info, warn};
use crate::miner::flag::end_reorg_lock;
use crate::orphans::add_genesis::create_genesis_block;
use crate::orphans::checkup_state::take_orphan_recheck_height;
use crate::orphans::deep_sync_rollback::deep_sync_rollback;
use crate::orphans::orphan_window_check::orphan_window_check;
use crate::orphans::replay_errors::{
should_retry_staged_candidate, staged_candidate_status_for_error,
};
use crate::orphans::snapshot_check::snapshot_verified;
use crate::orphans::structs::CheckUp;
use crate::orphans::structs::OrphanCheckup2;
use crate::orphans::structs::UndoTransactions;
use crate::records::block_height::get_block_height::get_height;
use crate::records::memory::torrent_status::{
get_torrent_status, set_torrent_status, TorrentStatus,
};
use crate::startup::remote_height::request_remote_height;
use crate::torrent::structs::Torrent;
use crate::torrent::torrenting_system::save_torrent::{
list_staged_torrents, read_staged_torrent, remove_staged_torrent,
};
use crate::torrent::torrenting_system::torrent_requests::handle_response_and_save_torrent;
use crate::wallets::structures::Wallet;
use crate::Arc;
async fn replay_staged_torrents(
params: &OrphanCheckup2,
wallet: Arc<Wallet>,
) -> Result<(), String> {
// staged torrents are replayed after orphan correction so
// any valid deferred candidates can be reconsidered in order.
// Replay is height-based: all candidates for the current expected
// height are tested before giving up on that height. Valid staged
// torrents are retained inside the orphan window even if they do
// not advance the chain on this replay pass.
loop {
let staged_torrents = list_staged_torrents().await?;
if staged_torrents.is_empty() {
return Ok(());
}
// Replay only the next expected height. Later staged torrents have to
// wait until their parent height exists locally.
let local_height = get_height(&params.db);
let expected_height = if local_height > 0 || genesis_checkup().await {
local_height + 1
} else {
local_height
};
let mut candidates = Vec::new();
for (height, staged_path) in staged_torrents {
// collect all candidates for the current expected height.
// anything beyond that height must wait until a winner is found.
if height == expected_height {
candidates.push(staged_path);
continue;
}
if height > expected_height {
break;
}
}
if candidates.is_empty() {
return Ok(());
}
let mut ordered_candidates = Vec::new();
for staged_path in candidates {
// Corrupt staged files are removed so they do not keep blocking
// future replay attempts for this height.
let torrent_bytes = match read_staged_torrent(&staged_path).await {
Ok(bytes) => bytes,
Err(err) => {
error!("[orphan] failed to read staged torrent {expected_height}: {err}");
remove_staged_torrent(&staged_path).await?;
continue;
}
};
let torrent = match Torrent::from_bytes(&torrent_bytes).await {
Ok(torrent) => torrent,
Err(err) => {
error!("[orphan] failed to parse staged torrent {expected_height}: {err}");
remove_staged_torrent(&staged_path).await?;
continue;
}
};
if matches!(
get_torrent_status(expected_height, &torrent.info.info_hash).await,
TorrentStatus::Invalid
) {
continue;
}
ordered_candidates.push(torrent);
}
// Status only filters out known-bad candidates. Among every candidate
// still eligible for replay, use the normal block-fight ordering.
ordered_candidates.sort_by(|a, b| {
a.info
.timestamp
.cmp(&b.info.timestamp)
.then(a.info.nonce.cmp(&b.info.nonce))
.then(a.info.vrf.cmp(&b.info.vrf))
});
if ordered_candidates.is_empty() {
return Ok(());
}
let mut advanced_height = false;
let mut retryable_pending = false;
for torrent in ordered_candidates {
let torrent_info_hash = torrent.info.info_hash.clone();
// Reuse the normal torrent save/verify pipeline; staged replay
// should behave exactly like receiving the torrent live.
match handle_response_and_save_torrent(
expected_height,
&params.db,
torrent,
wallet.clone(),
params.map.clone(),
true,
true,
)
.await
{
Ok(()) => {
// Mark the candidate according to whether it actually
// advanced the local chain height.
advanced_height = get_height(&params.db) >= expected_height;
let status = if advanced_height {
TorrentStatus::Valid
} else {
TorrentStatus::Invalid
};
set_torrent_status(expected_height, &torrent_info_hash, status).await;
if advanced_height {
break;
}
}
Err(err) => {
let status = staged_candidate_status_for_error(&err);
if status != TorrentStatus::Invalid {
retryable_pending = true;
// Piece availability is not proof that the candidate
// lost the block fight; leave it pending so a later
// orphan pass can retry after more peers stage it.
set_torrent_status(expected_height, &torrent_info_hash, status).await;
} else {
set_torrent_status(
expected_height,
&torrent_info_hash,
TorrentStatus::Invalid,
)
.await;
}
error!("[orphan] staged torrent replay candidate failed: height={expected_height} err={err}");
continue;
}
}
}
if !advanced_height {
// Every staged candidate for the current expected height was
// exhausted without extending the chain, so stop replay here.
if retryable_pending {
warn!(
"[orphan] replay paused at height {expected_height}; candidate pieces are not currently available"
);
return Ok(());
}
return Ok(());
}
}
}
async fn sync_checkup_pass(params: &OrphanCheckup2, wallet: Arc<Wallet>) -> Result<(), String> {
// bootstrap missing genesis first so the normal orphan
// correction logic can operate against a valid local chain
if params.local_height == 0 && !genesis_checkup().await {
warn!("[orphan] local genesis missing, creating genesis block");
create_genesis_block(
params.local_height,
params.map.clone(),
params.stream.clone(),
params.db.clone(),
wallet.clone(),
params.connections_key.clone(),
)
.await;
}
let undo_transactions_params = UndoTransactions {
start_height: params.local_height,
replay_to_height: params.remote_height,
db: params.db.clone(),
stream: params.stream.clone(),
map: params.map.clone(),
node_syncing: params.node_syncing,
connections_key: params.connections_key.clone(),
};
// snapshot verification can trigger an immediate rollback
// if a trusted checkpoint no longer matches local state
if !snapshot_verified(undo_transactions_params, wallet.clone()).await {
// A snapshot rollback already happened, so replay staged torrents and
// exit instead of running the near-tip rules against stale heights.
let mut replay_waiting = false;
match replay_staged_torrents(params, wallet.clone()).await {
Ok(()) => {}
Err(err) => {
replay_waiting = should_retry_staged_candidate(&err);
error!("[orphan] staged torrent replay error: {err}");
}
}
if replay_waiting {
warn!(
"[orphan] replay is waiting for block data; leaving candidates pending for a later pass"
);
}
return Ok(());
}
// run the two orphan rules in order, then replay any staged
// torrents that were deferred while reorganization was happening
let checkup_params = CheckUp {
local_height: params.local_height,
remote_height: params.remote_height,
recheck_from_height: params.recheck_from_height,
db: params.db.clone(),
stream: params.stream.clone(),
map: params.map.clone(),
node_syncing: params.node_syncing,
connections_key: params.connections_key.clone(),
};
deep_sync_rollback(checkup_params.clone(), wallet.clone()).await;
let mut replay_waiting = false;
let height_before_window_check = get_height(&params.db);
match orphan_window_check(checkup_params, wallet.clone()).await {
Ok(()) => {}
Err(err) => {
if should_retry_staged_candidate(&err)
&& get_height(&params.db) < height_before_window_check
{
replay_waiting = true;
}
error!("[orphan] orphan window check error: {err}");
}
}
let height_before_replay = get_height(&params.db);
match replay_staged_torrents(params, wallet.clone()).await {
Ok(()) => {}
Err(err) => {
replay_waiting |= should_retry_staged_candidate(&err);
error!("[orphan] staged torrent replay error: {err}");
}
}
if get_height(&params.db) > height_before_replay {
replay_waiting = false;
}
if replay_waiting {
warn!(
"[orphan] replay is waiting for block data; leaving candidates pending for a later pass"
);
}
info!("[orphan] orphan check pass completed");
Ok(())
}
pub async fn sync_checkup(mut params: OrphanCheckup2, wallet: Arc<Wallet>) -> Result<(), String> {
let result = loop {
match sync_checkup_pass(&params, wallet.clone()).await {
Ok(()) => {}
Err(err) => break Err(err),
}
let Some(recheck_height) = take_orphan_recheck_height() else {
break Ok(());
};
let local_height = get_height(&params.db);
let remote_height = match request_remote_height(
params.stream.clone(),
params.map.clone(),
params.connections_key.clone(),
)
.await
{
Ok(height) => height,
Err(err) => {
warn!("[orphan] failed to refresh remote height before queued recheck: {err}");
params.remote_height
}
};
params.local_height = local_height;
params.remote_height = remote_height.max(recheck_height).max(local_height);
params.recheck_from_height = Some(recheck_height);
warn!(
"[orphan] running queued orphan recheck: local_height={} remote_height={} queued_height={}",
params.local_height, params.remote_height, recheck_height
);
};
if !params.node_syncing {
end_reorg_lock();
}
if result.is_ok() {
info!("[orphan] orphan check completed");
}
result
}