Contractless/src/orphans/save_blocks.rs

228 lines
8.6 KiB
Rust

use crate::orphans::replay_errors::staged_candidate_status_for_error;
use crate::orphans::structs::UndoTransactions;
use crate::orphans::torrent_candidates::hydrate_torrent_candidates;
use crate::records::block_height::get_block_height::get_height;
use crate::records::memory::response_channels::reserve_entry_with_context;
use crate::rpc::command_maps::RPC_TORRENT_BY_HEIGHT;
use crate::records::memory::torrent_status::{
get_torrent_status, set_torrent_status, TorrentStatus,
};
use crate::torrent::structs::Torrent;
use crate::torrent::torrenting_system::save_torrent::{
list_staged_torrents_for_height, read_staged_torrent,
};
use crate::torrent::torrenting_system::torrent_requests::{
handle_response_and_save_torrent, send_request_torrent_message,
};
use crate::wallets::structures::Wallet;
use crate::Arc;
use crate::{timeout, Duration};
use std::collections::HashSet;
pub async fn save_new_blocks(
params: &UndoTransactions,
replay_to_height: u32,
wallet: Arc<Wallet>,
mut true_start_height: u32,
) -> Result<(), String> {
// After rollback, save replacement blocks only up to the height
// proven by the peer/orphan context. The old local tip may include
// blocks mined on the losing branch while orphan correction was running.
let mut hydrated_heights = HashSet::new();
loop {
let mut resolved_from_staging = false;
let staged_candidates = list_staged_torrents_for_height(true_start_height).await?;
let mut ordered_candidates = Vec::new();
for staged_path in staged_candidates {
// Try already-staged torrents before asking the peer again.
// Bad staged entries are ignored here and marked below if tested.
let torrent_bytes = match read_staged_torrent(&staged_path).await {
Ok(bytes) => bytes,
Err(_) => continue,
};
let torrent = match Torrent::from_bytes(&torrent_bytes).await {
Ok(torrent) => torrent,
Err(_) => continue,
};
if matches!(
get_torrent_status(true_start_height, &torrent.info.info_hash).await,
TorrentStatus::Invalid
) {
continue;
}
ordered_candidates.push(torrent);
}
// Status only filters out known-bad candidates. Among every candidate
// still eligible for replay, use the normal block-fight ordering.
ordered_candidates.sort_by(|a, b| {
a.info
.timestamp
.cmp(&b.info.timestamp)
.then(a.info.nonce.cmp(&b.info.nonce))
.then(a.info.vrf.cmp(&b.info.vrf))
});
for torrent in &ordered_candidates {
let torrent_info_hash = torrent.info.info_hash.clone();
// Height advancement is the proof that the candidate actually
// extended the chain rather than merely parsing successfully.
let local_height_before = get_height(&params.db);
match handle_response_and_save_torrent(
true_start_height,
&params.db,
torrent.clone(),
wallet.clone(),
params.map.clone(),
true,
params.node_syncing,
true,
)
.await
{
Ok(()) => {
if get_height(&params.db) > local_height_before {
set_torrent_status(
true_start_height,
&torrent_info_hash,
TorrentStatus::Valid,
)
.await;
resolved_from_staging = true;
break;
} else {
set_torrent_status(
true_start_height,
&torrent_info_hash,
TorrentStatus::Invalid,
)
.await;
}
}
Err(err) => {
let status = staged_candidate_status_for_error(&err);
set_torrent_status(true_start_height, &torrent_info_hash, status).await;
}
}
}
if resolved_from_staging {
// Continue replaying if staged data supplied this height and more
// replacement heights remain.
match replay_to_height.cmp(&true_start_height) {
std::cmp::Ordering::Equal => break,
std::cmp::Ordering::Greater => {
true_start_height += 1;
continue;
}
std::cmp::Ordering::Less => break,
}
}
if hydrated_heights.insert(true_start_height) {
let imported = hydrate_torrent_candidates(
params.stream.clone(),
params.map.clone(),
params.connections_key.clone(),
)
.await?;
if imported > 0 {
// Peer candidate hydration can add staged torrents that
// are not canonical on the peer yet. Restart this height
// so those candidates are tried before canonical fallback.
continue;
}
}
// No staged candidate worked, so request the replacement torrent
// directly from the connected peer.
let (hashmap_key, _save_tx, save_rx) = reserve_entry_with_context(
params.map.clone(),
Some(RPC_TORRENT_BY_HEIGHT),
Some(params.connections_key.clone()),
)
.await;
send_request_torrent_message(
params.stream.clone(),
true_start_height,
hashmap_key,
params.connections_key.clone(),
)
.await
.map_err(|e| e.to_string())?;
let mut rx = save_rx.lock().await;
if let Some(remote_torrent) =
timeout(Duration::from_secs(30), rx.recv())
.await
.map_err(|_| {
format!(
"Timed out waiting for replacement torrent at height {true_start_height}"
)
})?
{
if let Ok(text) = String::from_utf8(remote_torrent.clone()) {
let trimmed = text.trim();
if !trimmed.is_empty() {
// Torrent replies should be raw torrent bytes. Text here
// usually means the peer returned an error response.
return Err(format!(
"Unexpected textual torrent response while replaying height {true_start_height}: {trimmed}"
));
}
}
let torrent = match Torrent::from_bytes(&remote_torrent).await {
Ok(torrent) => torrent,
Err(err) => {
return Err(err.to_string());
}
};
let torrent_info_hash = torrent.info.info_hash.clone();
let local_height_before = get_height(&params.db);
// Save through the normal torrent path so all validation and
// record updates stay identical to a live broadcast.
handle_response_and_save_torrent(
true_start_height,
&params.db,
torrent,
wallet.clone(),
params.map.clone(),
true,
params.node_syncing,
true,
)
.await?;
if get_height(&params.db) <= local_height_before {
set_torrent_status(
true_start_height,
&torrent_info_hash,
TorrentStatus::Invalid,
)
.await;
return Err(format!(
"Replacement torrent at height {true_start_height} did not advance the chain"
));
}
set_torrent_status(true_start_height, &torrent_info_hash, TorrentStatus::Valid).await;
} else {
return Err(format!(
"No replacement torrent received while replaying height {true_start_height}"
));
}
// continue until the requested replacement range
// has been fully saved locally
match replay_to_height.cmp(&true_start_height) {
std::cmp::Ordering::Equal => break,
std::cmp::Ordering::Greater => true_start_height += 1,
std::cmp::Ordering::Less => break,
}
}
Ok(())
}