Contractless/src/records/record_chain/save.rs

602 lines
21 KiB
Rust

use crate::common::check_genesis::genesis_checkup;
use crate::common::network_paths_and_settings::block_extension_and_paths;
use crate::decode;
use crate::fs;
use crate::log::{error, info};
use crate::miner::flag::{
is_mining_running, is_normal_mode, is_reorganizing_mode, is_syncing_mode,
};
use crate::orphans::snapshot_check::{snapshot_height, update_snapshot};
use crate::records::block_height::get_block_height::get_height;
use crate::records::block_height::increase_block_height::increase_height;
use crate::records::memory::averages::asert_genesis_anchor;
use crate::records::memory::mempool::{
apply_selected_transaction_math, mark_processed_by_signatures,
mark_selected_transactions_processed, restore_processed_by_signatures,
restore_selected_transactions_processed, select_transactions_for_block,
spawn_processed_cleanup, stream_selected_transaction_originals,
};
use crate::records::memory::network_mapping::NodeInfo;
use crate::records::memory::torrent_status::prune_torrent_statuses_through_height;
use crate::records::record_chain::parse_transactions::handle_transactions;
use crate::records::record_chain::pending_effects::PendingEffects;
use crate::records::record_chain::previous_difficulty::previous_block_difficulty;
use crate::records::record_chain::rewards_tx::finalize_rewards_through_height;
use crate::records::record_chain::save_flags::SAVE_FLAG;
use crate::records::record_chain::structs::{
SaveBinaryDataParams, SaveBinaryDataWithMempoolStreamParams, SaveBlockParams, SaveType,
};
use crate::records::unpack_block::unpack_header::load_block_header;
use crate::torrent::create_metadata::{broadcast_new_torrent_to_peers, metadata_from_file};
use crate::torrent::torrenting_system::save_torrent::prune_staged_torrents;
use crate::torrent::torrenting_system::torrent_cache::prune_recent_torrents;
use crate::Arc;
use crate::Mutex;
use crate::PathBuf;
use crate::Utc;
use crate::{sled::Db, TimeZone};
pub async fn save_block(params: SaveBlockParams) -> Result<(), String> {
// Serialize all block saves through a single async lock so mining,
// sync, and orphan-driven writes cannot interleave.
let _lock = SAVE_FLAG.lock().await;
let SaveBlockParams {
block,
db,
header_hash,
timestamp,
signatures,
save_type,
allow_during_reorg,
map,
} = params;
let genesis_missing = !genesis_checkup().await;
if save_type.is_updating() {
if is_reorganizing_mode() && !allow_during_reorg {
return Err("Cannot save discovered block while reorganizing.".to_string());
}
} else {
if is_reorganizing_mode() {
return Err("Cannot save mined block while reorganizing.".to_string());
}
if is_syncing_mode() {
return Err("Cannot save mined block while syncing.".to_string());
}
if !is_mining_running() {
return Err("Cannot save mined block when mining is not running.".to_string());
}
if !genesis_missing && !is_normal_mode() {
return Err("Cannot save mined block outside normal node mode.".to_string());
}
}
let header_bytes = block
.vrf_block
.to_bytes()
.await
.map_err(|e| e.to_string())?;
let mut binary_data = header_bytes.clone();
let previous_hash = &block.vrf_block.unmined_block.previous_hash;
let miner = &block.vrf_block.unmined_block.miner;
// Ensure the block being saved really extends the current chain tip
// before any headers, files, or mempool effects are written.
let current_height = get_height(&db);
if current_height > 0 {
let current_block = load_block_header(current_height).await?;
let current_hash = current_block.hash().await;
if current_hash != *previous_hash {
if save_type == SaveType::Mining {
return Err(format!(
"Stale mining candidate: current chain tip changed before save. current_height={current_height}"
));
}
error!("Discovered block rejected: previous hash mismatch. current_height={current_height} current_tip_hash={current_hash} candidate_previous_hash={previous_hash}");
return Err("Incorrect previous_block_hash.".to_string());
}
}
let block_header_number = if !genesis_missing {
get_height(&db) + 1
} else {
0
};
// Capture the current difficulty context before the block is written
// so the saved-block diagnostic log reflects the live adjustment input.
let mut previous_difficulty = 0_u64;
if block_header_number > 0 {
previous_difficulty = previous_block_difficulty(block_header_number).await?;
}
log_saved_block_difficulty(
block_header_number,
timestamp,
previous_difficulty,
block.vrf_block.unmined_block.next_block_difficulty,
)
.await;
let mut index_counter = 0;
let index_mutex = Arc::new(Mutex::new(&mut index_counter));
let mut pending_effects = PendingEffects::default();
// Append transaction-derived record data to the block binary while
// tracking the index offset where mempool-derived records begin.
binary_data = handle_transactions(
&block,
binary_data,
&db,
index_mutex.clone(),
miner.clone(),
block_header_number,
&mut pending_effects,
)
.await?;
let start_index = {
let index = index_mutex.lock().await;
**index
};
// Locally mined blocks stream the original selected mempool records
// into the saved block file, while synced/discovered blocks already
// carry their transaction data in the downloaded payload.
if !save_type.is_updating() {
let selected = select_transactions_for_block(10_000_000)
.await
.map_err(|e| e.to_string())?;
apply_selected_transaction_math(
&selected,
&db,
miner.clone(),
block_header_number,
start_index,
&mut pending_effects,
)
.await
.map_err(|e| e.to_string())?;
save_binary_data_with_mempool_stream(SaveBinaryDataWithMempoolStreamParams {
data: &binary_data,
header_bytes: &header_bytes,
selected: &selected,
pending_effects: &pending_effects,
db: &db,
previous_height: current_height,
block_header_number,
difficulty: previous_difficulty,
timestamp,
header_hash: &header_hash,
previous_hash,
save_type: save_type.clone(),
miner: miner.clone(),
map,
})
.await?;
spawn_processed_cleanup(block_header_number);
} else {
save_binary_data(SaveBinaryDataParams {
data: &binary_data,
header_bytes: &header_bytes,
pending_effects: &pending_effects,
signatures: &signatures,
db: &db,
previous_height: current_height,
block_header_number,
difficulty: previous_difficulty,
timestamp,
header_hash: &header_hash,
previous_hash,
save_type,
miner: miner.clone(),
map,
})
.await?;
spawn_processed_cleanup(block_header_number);
}
Ok(())
}
async fn log_saved_block_difficulty(
block_number: u32,
timestamp: u32,
current_difficulty: u64,
new_difficulty: u64,
) {
// Skip genesis because it is the ASERT anchor itself.
if block_number == 0 {
return;
}
let Some((anchor_height, anchor_timestamp, anchor_difficulty)) = asert_genesis_anchor().await
else {
info!(
"[difficulty] saved_block={block_number} timestamp={timestamp} current_difficulty={current_difficulty} new_difficulty={new_difficulty} asert_anchor=missing"
);
return;
};
let elapsed_seconds = timestamp.saturating_sub(anchor_timestamp);
let expected_seconds = block_number
.saturating_sub(anchor_height)
.saturating_mul(15);
let error_seconds = elapsed_seconds as i64 - expected_seconds as i64;
info!(
"[difficulty] saved_block={block_number} timestamp={timestamp} target_seconds=15 anchor_height={anchor_height} elapsed_seconds={elapsed_seconds} expected_seconds={expected_seconds} error_seconds={error_seconds} anchor_difficulty={anchor_difficulty} current_difficulty={current_difficulty} new_difficulty={new_difficulty}"
);
}
async fn save_binary_data_with_mempool_stream(
params: SaveBinaryDataWithMempoolStreamParams<'_>,
) -> Result<(), String> {
let SaveBinaryDataWithMempoolStreamParams {
data,
header_bytes,
selected,
pending_effects,
db,
previous_height,
block_header_number,
difficulty,
timestamp,
header_hash,
previous_hash,
save_type,
miner,
map,
} = params;
// Build the on-disk block context and announce whether the save came
// from local mining or an updating path.
let (file_name, next_number, difficulty) = block_file_context(db, difficulty).await;
let current_time = format_block_time(timestamp);
if next_number != 0 {
info!("New block mined {next_number} at {current_time}");
} else if !save_type.is_updating() {
info!("Genesis block mined {next_number} at {current_time}");
} else {
info!("Genesis block discovered {next_number} at {current_time}");
}
let temp_file_name = format!("{file_name}.tmp");
let _ = fs::remove_file(&temp_file_name);
let mut file = fs::File::create(temp_file_name.clone()).map_err(|e| e.to_string())?;
std::io::Write::write_all(&mut file, data).map_err(|e| e.to_string())?;
// Preserve the original selected mempool records after the block
// payload so later unpacking can reconstruct the full transaction set.
stream_selected_transaction_originals(&mut file, selected)
.await
.map_err(|e| e.to_string())?;
drop(file);
fs::rename(&temp_file_name, &file_name).map_err(|e| e.to_string())?;
let mut applied_effects = match pending_effects.apply(db) {
Ok(applied_effects) => applied_effects,
Err(err) => {
cleanup_block_file(&file_name);
return Err(err);
}
};
if let Err(err) = mark_selected_transactions_processed(selected, block_header_number).await {
let _ = restore_selected_transactions_processed(selected).await;
if let Err(rollback_err) = applied_effects.rollback(db) {
error!("Failed to roll back block effects: {rollback_err}");
}
cleanup_block_file(&file_name);
return Err(err.to_string());
}
if let Err(err) = commit_block_indexes(db, block_header_number, header_bytes, header_hash) {
let _ = restore_selected_transactions_processed(selected).await;
if let Err(rollback_err) = applied_effects.rollback(db) {
error!("Failed to roll back block effects: {rollback_err}");
}
cleanup_block_file(&file_name);
return Err(err);
}
let torrent_bytes = match metadata_from_file(
&file_name,
next_number,
difficulty,
timestamp,
header_hash,
previous_hash,
miner.clone(),
)
.await
{
Ok(torrent_bytes) => torrent_bytes,
Err(err) => {
cleanup_block_indexes(db, block_header_number, header_hash);
cleanup_torrent_file(next_number);
let _ = restore_selected_transactions_processed(selected).await;
if let Err(rollback_err) = applied_effects.rollback(db) {
error!("Failed to roll back block effects: {rollback_err}");
}
cleanup_block_file(&file_name);
return Err(err);
}
};
if next_number != 0 {
if let Err(err) = increase_height(db) {
cleanup_block_indexes(db, block_header_number, header_hash);
cleanup_torrent_file(next_number);
let _ = restore_selected_transactions_processed(selected).await;
if let Err(rollback_err) = applied_effects.rollback(db) {
error!("Failed to roll back block effects: {rollback_err}");
}
cleanup_block_file(&file_name);
return Err(err);
}
let _ = update_snapshot(db, next_number, map.clone()).await;
if let Some(snapshot_height) = snapshot_height(db).await {
if let Err(err) = finalize_rewards_through_height(db, snapshot_height).await {
error!(
"Failed to finalize rewards through snapshot height {snapshot_height}: {err}"
);
}
prune_recent_torrents(snapshot_height).await;
prune_torrent_statuses_through_height(snapshot_height).await;
let _ = prune_staged_torrents(snapshot_height).await;
}
} else {
let _ = update_snapshot(db, next_number, map.clone()).await;
}
if !is_syncing_mode() {
broadcast_new_torrent_to_peers(next_number, &torrent_bytes, map).await;
}
// Only advance mined-count tracking when this save actually moved
// the persisted chain height forward.
if get_height(db) > previous_height {
NodeInfo::increment_mined(&miner).await;
}
Ok(())
}
async fn save_binary_data(params: SaveBinaryDataParams<'_>) -> Result<(), String> {
let SaveBinaryDataParams {
data,
header_bytes,
pending_effects,
signatures,
db,
previous_height,
block_header_number,
difficulty,
timestamp,
header_hash,
previous_hash,
save_type,
miner,
map,
} = params;
// Build the on-disk block context and announce whether the save came
// from local mining or an updating path.
let (file_name, next_number, difficulty) = block_file_context(db, difficulty).await;
let current_time = format_block_time(timestamp);
if next_number != 0 {
if !save_type.is_updating() {
info!("New block mined {next_number} at {current_time}");
} else {
info!("New block discovered {next_number} at {current_time}");
}
} else if !save_type.is_updating() {
info!("Genesis block mined {next_number} at {current_time}");
} else {
info!("Genesis block discovered {next_number} at {current_time}");
}
let temp_file_name = format!("{file_name}.tmp");
let _ = fs::remove_file(&temp_file_name);
fs::write(&temp_file_name, data).map_err(|e| e.to_string())?;
fs::rename(&temp_file_name, &file_name).map_err(|e| e.to_string())?;
let mut applied_effects = match pending_effects.apply(db) {
Ok(applied_effects) => applied_effects,
Err(err) => {
cleanup_block_file(&file_name);
return Err(err);
}
};
let chunk_size = 1000;
for chunk in signatures.chunks(chunk_size) {
if let Err(err) = mark_processed_by_signatures(chunk, block_header_number).await {
let _ = restore_processed_by_signatures(signatures).await;
if let Err(rollback_err) = applied_effects.rollback(db) {
error!("Failed to roll back block effects: {rollback_err}");
}
cleanup_block_file(&file_name);
return Err(err.to_string());
}
}
if let Err(err) = commit_block_indexes(db, block_header_number, header_bytes, header_hash) {
let _ = restore_processed_by_signatures(signatures).await;
if let Err(rollback_err) = applied_effects.rollback(db) {
error!("Failed to roll back block effects: {rollback_err}");
}
cleanup_block_file(&file_name);
return Err(err);
}
let torrent_bytes = match metadata_from_file(
&file_name,
next_number,
difficulty,
timestamp,
header_hash,
previous_hash,
miner.clone(),
)
.await
{
Ok(torrent_bytes) => torrent_bytes,
Err(err) => {
cleanup_block_indexes(db, block_header_number, header_hash);
cleanup_torrent_file(next_number);
let _ = restore_processed_by_signatures(signatures).await;
if let Err(rollback_err) = applied_effects.rollback(db) {
error!("Failed to roll back block effects: {rollback_err}");
}
cleanup_block_file(&file_name);
return Err(err);
}
};
if next_number != 0 {
if let Err(err) = increase_height(db) {
cleanup_block_indexes(db, block_header_number, header_hash);
cleanup_torrent_file(next_number);
let _ = restore_processed_by_signatures(signatures).await;
if let Err(rollback_err) = applied_effects.rollback(db) {
error!("Failed to roll back block effects: {rollback_err}");
}
cleanup_block_file(&file_name);
return Err(err);
}
let _ = update_snapshot(db, next_number, map.clone()).await;
if let Some(snapshot_height) = snapshot_height(db).await {
if let Err(err) = finalize_rewards_through_height(db, snapshot_height).await {
error!(
"Failed to finalize rewards through snapshot height {snapshot_height}: {err}"
);
}
prune_recent_torrents(snapshot_height).await;
prune_torrent_statuses_through_height(snapshot_height).await;
let _ = prune_staged_torrents(snapshot_height).await;
}
} else {
let _ = update_snapshot(db, next_number, map.clone()).await;
}
if !is_syncing_mode() {
broadcast_new_torrent_to_peers(next_number, &torrent_bytes, map).await;
}
// Only advance mined-count tracking when this save actually moved
// the persisted chain height forward.
if get_height(db) > previous_height {
NodeInfo::increment_mined(&miner).await;
}
Ok(())
}
async fn block_file_context(db: &Db, mut difficulty: u64) -> (String, u32, u64) {
// Genesis uses block number 0 and a fixed starting difficulty,
// while all later saves append to the current chain height.
let current_height = get_height(db);
let next_number = if genesis_checkup().await {
current_height + 1
} else {
difficulty = 1200000000000000_u64;
0
};
let (
_network_name,
_padded_base_coin,
block_ext,
_torrent_path,
_wallet_path,
block_path,
_db_path,
_balance_path,
_log_path,
) = block_extension_and_paths();
let file_name = PathBuf::from(block_path)
.join(format!("{next_number}.{block_ext}"))
.to_string_lossy()
.into_owned();
(file_name, next_number, difficulty)
}
fn commit_block_indexes(
db: &Db,
block_header_number: u32,
header_bytes: &[u8],
header_hash: &str,
) -> Result<(), String> {
// Commit the height->header and hash->height indexes together after
// the block file exists but before height marks the block current.
let tree = db
.open_tree("block_headers")
.map_err(|e| format!("Failed to open block_headers tree: {e}"))?;
tree.insert(block_header_number.to_le_bytes(), header_bytes)
.map_err(|e| format!("Failed to write block header index: {e}"))?;
let hkey = decode(header_hash).map_err(|e| format!("Failed to decode header hash: {e}"))?;
let htree = db
.open_tree("block_hashes")
.map_err(|e| format!("Failed to open block_hashes tree: {e}"))?;
let hvalue = block_header_number.to_le_bytes();
htree
.insert(hkey, &hvalue)
.map_err(|e| format!("Failed to write block hash index: {e}"))?;
Ok(())
}
fn cleanup_block_indexes(db: &Db, block_header_number: u32, header_hash: &str) {
// If the final height commit fails, remove the indexes that would
// otherwise point at a block the chain height does not acknowledge.
if let Ok(tree) = db.open_tree("block_headers") {
let _ = tree.remove(block_header_number.to_le_bytes());
}
if let Ok(hkey) = decode(header_hash) {
if let Ok(htree) = db.open_tree("block_hashes") {
let _ = htree.remove(hkey);
}
}
}
fn cleanup_block_file(file_name: &str) {
let _ = fs::remove_file(file_name);
}
fn cleanup_torrent_file(block_number: u32) {
let (
_network_name,
_padded_base_coin,
_block_ext,
torrent_path,
_wallet_path,
_block_path,
_db_path,
_balance_path,
_log_path,
) = block_extension_and_paths();
let torrent_file = PathBuf::from(&torrent_path).join(format!("{block_number}.torrent"));
let temp_torrent_file = PathBuf::from(torrent_path).join(format!("{block_number}.torrent.tmp"));
let _ = fs::remove_file(torrent_file);
let _ = fs::remove_file(temp_torrent_file);
}
fn format_block_time(timestamp: u32) -> String {
match Utc.timestamp_opt(timestamp as i64, 0).single() {
Some(datetime) => datetime.format("%H:%M:%S").to_string(),
None => "invalid-time".to_string(),
}
}