Contractless/src/records/memory/mempool/lookups.rs

593 lines
26 KiB
Rust

use super::*;
pub async fn signature_exists(signature: &str, hash: &str) -> Result<bool> {
let client_handle = db_client().await?;
let client = client_handle.as_ref();
// Check every mempool table because the signature column names differ by
// transaction type, especially for two-party swaps and loans.
let row = client
.query_one(
r#"
SELECT
CASE
WHEN EXISTS (SELECT 1 FROM transfer a WHERE a.signature = $1 AND a.hash = $2 AND a.processed = false)
OR EXISTS (SELECT 1 FROM token b WHERE b.signature = $1 AND b.hash = $2 AND b.processed = false)
OR EXISTS (SELECT 1 FROM issue_token c WHERE c.signature = $1 AND c.hash = $2 AND c.processed = false)
OR EXISTS (SELECT 1 FROM burn d WHERE d.signature = $1 AND d.hash = $2 AND d.processed = false)
OR EXISTS (SELECT 1 FROM nft e WHERE e.signature = $1 AND e.hash = $2 AND e.processed = false)
OR EXISTS (SELECT 1 FROM marketing f WHERE f.signature = $1 AND f.hash = $2 AND f.processed = false)
OR EXISTS (SELECT 1 FROM vanity_address va WHERE va.signature = $1 AND va.hash = $2 AND va.processed = false)
OR EXISTS (SELECT 1 FROM swap g WHERE g.signature1 = $1 AND g.hash = $2 AND g.processed = false)
OR EXISTS (SELECT 1 FROM swap h WHERE h.signature2 = $1 AND h.hash = $2 AND h.processed = false)
OR EXISTS (SELECT 1 FROM loan_contract i WHERE i.signature1 = $1 AND i.hash = $2 AND i.processed = false)
OR EXISTS (SELECT 1 FROM loan_contract j WHERE j.signature2 = $1 AND j.hash = $2 AND j.processed = false)
OR EXISTS (SELECT 1 FROM loan_payment k WHERE k.signature = $1 AND k.hash = $2 AND k.processed = false)
OR EXISTS (SELECT 1 FROM collateral_claim l WHERE l.signature = $1 AND l.hash = $2 AND l.processed = false)
THEN 1
ELSE 0
END AS signature_found;
"#,
&[&signature, &hash],
)
.await?;
let found: i32 = row.get(0);
Ok(found == 1)
}
pub async fn transaction_by_signature(signature: &str) -> RpcResponse {
let client_handle = match db_client().await {
Ok(client) => client,
Err(_) => return RpcResponse::Binary(Vec::new()),
};
let client = client_handle.as_ref();
// Return the original serialized transaction bytes, not a reconstructed
// row, so RPC callers receive the same payload that would enter a block.
let result = client
.query_opt(
r#"
SELECT original FROM (
SELECT original FROM transfer WHERE signature = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM token WHERE signature = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM issue_token WHERE signature = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM burn WHERE signature = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM nft WHERE signature = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM marketing WHERE signature = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM vanity_address WHERE signature = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM swap WHERE signature1 = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM swap WHERE signature2 = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM loan_contract WHERE signature1 = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM loan_contract WHERE signature2 = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM loan_payment WHERE signature = $1 AND processed = false LIMIT 1
UNION ALL
SELECT original FROM collateral_claim WHERE signature = $1 AND processed = false LIMIT 1
) AS subquery LIMIT 1
"#,
&[&signature],
)
.await;
match result {
Ok(Some(row)) => {
let bytes: Vec<u8> = row.get(0);
RpcResponse::Binary(bytes)
}
_ => RpcResponse::Binary(Vec::new()),
}
}
pub async fn transactions_by_address(db: &Db, address: &str) -> RpcResponse {
let client_handle = match db_client().await {
Ok(client) => client,
Err(_) => return RpcResponse::Binary(Vec::new()),
};
let client = client_handle.as_ref();
// Canonicalize vanity aliases before querying pending rows.
let addresses = canonical_mempool_addresses(db, address);
// Concatenate original transaction bytes; the RPC/bin caller can split the
// stream by transaction type and fixed byte length.
let rows = match client
.query(
r#"
SELECT original FROM (
SELECT original FROM transfer WHERE receiver = ANY($1) AND processed = false
UNION ALL
SELECT original FROM token WHERE creator = ANY($1) AND processed = false
UNION ALL
SELECT original FROM issue_token WHERE creator = ANY($1) AND processed = false
UNION ALL
SELECT original FROM burn WHERE address = ANY($1) AND processed = false
UNION ALL
SELECT original FROM nft WHERE creator = ANY($1) AND processed = false
UNION ALL
SELECT original FROM marketing WHERE advertiser = ANY($1) AND processed = false
UNION ALL
SELECT original FROM vanity_address WHERE address = ANY($1) AND processed = false
UNION ALL
SELECT original FROM swap WHERE sender1 = ANY($1) AND processed = false
UNION ALL
SELECT original FROM swap WHERE sender2 = ANY($1) AND processed = false
UNION ALL
SELECT original FROM loan_contract WHERE lender = ANY($1) AND processed = false
UNION ALL
SELECT original FROM loan_contract WHERE borrower = ANY($1) AND processed = false
UNION ALL
SELECT original FROM loan_payment WHERE address = ANY($1) AND processed = false
UNION ALL
SELECT original FROM collateral_claim WHERE address = ANY($1) AND processed = false
) AS subquery;
"#,
&[&addresses],
)
.await
{
Ok(r) => r,
Err(_) => return RpcResponse::Binary(Vec::new()),
};
let mut bytes = Vec::new();
for row in rows {
let chunk: Vec<u8> = row.get(0);
bytes.extend(chunk);
}
RpcResponse::Binary(bytes)
}
pub async fn latest_pending_txids_by_address(db: &Db, address: &str, limit: usize) -> Vec<Vec<u8>> {
if limit == 0 {
return Vec::new();
}
let client_handle = match db_client().await {
Ok(client) => client,
Err(_) => return Vec::new(),
};
let client = client_handle.as_ref();
let addresses = canonical_mempool_addresses(db, address);
let limit = i64::try_from(limit).unwrap_or(i64::MAX);
let rows = match client
.query(
r#"
SELECT hash FROM (
SELECT DISTINCT ON (hash) hash, time, source_id FROM (
SELECT hash, time, id AS source_id FROM transfer
WHERE (sender = ANY($1) OR receiver = ANY($1)) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM token
WHERE creator = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM issue_token
WHERE creator = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM burn
WHERE address = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM nft
WHERE creator = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM marketing
WHERE advertiser = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM vanity_address
WHERE address = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM swap
WHERE (sender1 = ANY($1) OR sender2 = ANY($1)) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM loan_contract
WHERE (lender = ANY($1) OR borrower = ANY($1)) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM loan_payment
WHERE address = ANY($1) AND processed = false
UNION ALL
SELECT hash, time, id AS source_id FROM collateral_claim
WHERE address = ANY($1) AND processed = false
) AS pending
ORDER BY hash, time DESC, source_id DESC
) AS deduped
ORDER BY time DESC, source_id DESC
LIMIT $2
"#,
&[&addresses, &limit],
)
.await
{
Ok(rows) => rows,
Err(_) => return Vec::new(),
};
let mut txids = rows
.into_iter()
.filter_map(|row| {
let hash: String = row.get("hash");
decode(&hash).ok().filter(|bytes| bytes.len() == 32)
})
.collect::<Vec<_>>();
txids.truncate(limit as usize);
txids
}
pub async fn pending_transaction_by_txid(txid: &[u8]) -> Option<Vec<u8>> {
if txid.len() != 32 {
return None;
}
let client_handle = db_client().await.ok()?;
let client = client_handle.as_ref();
let hash = crate::encode(txid);
let row = client
.query_opt(
r#"
SELECT original FROM (
SELECT original FROM transfer WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM token WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM issue_token WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM burn WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM nft WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM marketing WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM vanity_address WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM swap WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM loan_contract WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM loan_payment WHERE hash = $1 AND processed = false
UNION ALL
SELECT original FROM collateral_claim WHERE hash = $1 AND processed = false
) AS subquery LIMIT 1
"#,
&[&hash],
)
.await
.ok()?;
row.map(|row| row.get("original"))
}
pub async fn largest_fee() -> RpcResponse {
let client_handle = match db_client().await {
Ok(client) => client,
Err(_) => return RpcResponse::Binary(0u32.to_le_bytes().to_vec()),
};
let client = client_handle.as_ref();
// Swaps have two possible fees, so both sides are included in the max.
let row = match client
.query_one(
r#"
SELECT MAX(fee) AS largest_txid FROM (
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM transfer
UNION ALL
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM token
UNION ALL
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM issue_token
UNION ALL
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM burn
UNION ALL
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM nft
UNION ALL
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM marketing
UNION ALL
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM vanity_address
UNION ALL
SELECT CAST(MAX(fee1) AS BIGINT) AS fee FROM swap
UNION ALL
SELECT CAST(MAX(fee2) AS BIGINT) AS fee FROM swap
UNION ALL
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM loan_contract
UNION ALL
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM loan_payment
UNION ALL
SELECT CAST(MAX(fee) AS BIGINT) AS fee FROM collateral_claim
) AS combined_max_txids;
"#,
&[],
)
.await
{
Ok(r) => r,
Err(_) => return RpcResponse::Binary(0u32.to_le_bytes().to_vec()),
};
let max_fee: Option<i64> = row.get(0);
let fee = (max_fee.unwrap_or(0) as u64).to_le_bytes().to_vec();
RpcResponse::Binary(fee)
}
async fn pending_saved_loan_payment_balance(
db: &Db,
addresses: &[String],
coin: &str,
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let client_handle = db_client().await?;
let client = client_handle.as_ref();
let rows = client
.query(
r#"
SELECT payback_amount, tip, contract_hash
FROM loan_payment
WHERE address = ANY($1)
AND processed = false
"#,
&[&addresses],
)
.await?;
let mut total = 0u64;
for row in rows {
let contract_hash: String = row.get("contract_hash");
let Ok(contract_key) = decode(&contract_hash) else {
continue;
};
let RpcResponse::Binary(bytes) = request_transaction_by_txid(db, contract_key).await;
if bytes.is_empty() || bytes[0] != 7 {
continue;
}
let Ok(contract) = LoanContractTransaction::from_bytes(7, &bytes[1..]).await else {
continue;
};
if !contract
.unsigned_loan_contract
.loan_coin
.trim()
.eq_ignore_ascii_case(coin.trim())
{
continue;
}
let payback_amount: i64 = row.get("payback_amount");
let tip: i64 = row.get("tip");
let payment_total = (payback_amount.max(0) as u64)
.checked_add(tip.max(0) as u64)
.ok_or_else(|| std::io::Error::other("Pending loan payment reservation overflowed"))?;
total = total
.checked_add(payment_total)
.ok_or_else(|| std::io::Error::other("Pending loan payment reservation overflowed"))?;
}
Ok(total)
}
pub async fn get_coin_balance(
db: &Db,
address: &str,
coin: &str,
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let client_handle = db_client().await?;
let client = client_handle.as_ref();
// Pending-balance checks use canonical addresses so vanity and short
// address inputs see the same outgoing obligations.
let addresses = canonical_mempool_addresses(db, address);
let (asset_name, nft_series) = nft_asset_parts(coin);
let nft_series = nft_series as i32;
let row = client
.query_one(
r#"
SELECT CAST((
COALESCE((SELECT SUM(t.value)
FROM transfer t
WHERE t.sender = ANY($1) AND t.coin = $2 AND t.nft_series = $3 AND t.processed = false), 0)
+ COALESCE((SELECT SUM(tok.number)
FROM token tok
WHERE tok.creator = ANY($1) AND tok.ticker = $2 AND tok.processed = false), 0)
+ COALESCE((SELECT SUM(it.number)
FROM issue_token it
WHERE it.creator = ANY($1) AND it.ticker = $2 AND it.processed = false), 0)
+ COALESCE((SELECT SUM(b.value)
FROM burn b
WHERE b.address = ANY($1) AND b.coin = $2 AND b.nft_series = $3 AND b.processed = false), 0)
+ COALESCE((SELECT SUM(s.value1)
FROM swap s
WHERE s.sender1 = ANY($1) AND s.ticker1 = $2 AND s.nft_series1 = $3 AND s.processed = false), 0)
+ COALESCE((SELECT SUM(s.tip1)
FROM swap s
WHERE s.sender1 = ANY($1) AND s.ticker1 = $2 AND s.nft_series1 = $3 AND s.processed = false), 0)
+ COALESCE((SELECT SUM(s.value2)
FROM swap s
WHERE s.sender2 = ANY($1) AND s.ticker2 = $2 AND s.nft_series2 = $3 AND s.processed = false), 0)
+ COALESCE((SELECT SUM(s.tip2)
FROM swap s
WHERE s.sender2 = ANY($1) AND s.ticker2 = $2 AND s.nft_series2 = $3 AND s.processed = false), 0)
+ COALESCE((SELECT SUM(lc.loan_amount)
FROM loan_contract lc
WHERE lc.lender = ANY($1) AND lc.loan_coin = $2 AND lc.processed = false), 0)
+ COALESCE((SELECT SUM(lc.collateral_amount)
FROM loan_contract lc
WHERE lc.borrower = ANY($1) AND lc.collateral = $2 AND lc.processed = false), 0)
+ COALESCE((
SELECT SUM(lp.payback_amount)
FROM loan_payment lp
JOIN loan_contract lc ON lc.txid = lp.contract_hash
WHERE lp.address = ANY($1) AND lc.loan_coin = $2 AND lp.processed = false AND lc.processed = false
), 0)
+ COALESCE((
SELECT SUM(lp.tip)
FROM loan_payment lp
JOIN loan_contract lc ON lc.txid = lp.contract_hash
WHERE lp.address = ANY($1) AND lc.loan_coin = $2 AND lp.processed = false AND lc.processed = false
), 0)
) AS BIGINT) AS total
"#,
&[&addresses, &asset_name, &nft_series],
)
.await?;
// Negative projections are clamped because callers only need the amount
// already reserved by pending mempool activity.
let total: i64 = row.get(0);
let chain_loan_payments = pending_saved_loan_payment_balance(db, &addresses, coin).await?;
Ok((total.max(0) as u64).saturating_add(chain_loan_payments))
}
pub async fn get_basecoin_balance(
db: &Db,
address: &str,
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let client_handle = db_client().await?;
let client = client_handle.as_ref();
let addresses = canonical_mempool_addresses(db, address);
// Base coin projection includes direct base transfers plus all fees and
// any pending loan/swap movements denominated in the base coin.
let row = client
.query_one(
r#"
SELECT CAST((
COALESCE((SELECT SUM(t.value)
FROM transfer t
WHERE t.sender = ANY($1) AND t.coin = $2 AND t.processed = false), 0)
+ COALESCE((SELECT SUM(t.fee) FROM transfer t WHERE t.sender = ANY($1) AND t.processed = false), 0)
+ COALESCE((SELECT SUM(b.value)
FROM burn b
WHERE b.address = ANY($1) AND b.coin = $2 AND b.processed = false), 0)
+ COALESCE((SELECT SUM(x.fee) FROM token x WHERE x.creator = ANY($1) AND x.processed = false), 0)
+ COALESCE((SELECT SUM(it.fee) FROM issue_token it WHERE it.creator = ANY($1) AND it.processed = false), 0)
+ COALESCE((SELECT SUM(b.fee) FROM burn b WHERE b.address = ANY($1) AND b.processed = false), 0)
+ COALESCE((SELECT SUM(n.fee) FROM nft n WHERE n.creator = ANY($1) AND n.processed = false), 0)
+ COALESCE((SELECT SUM(m.fee) FROM marketing m WHERE m.advertiser = ANY($1) AND m.processed = false), 0)
+ COALESCE((SELECT SUM(v.fee) FROM vanity_address v WHERE v.address = ANY($1) AND v.processed = false), 0)
+ COALESCE((SELECT SUM(cc.fee) FROM collateral_claim cc WHERE cc.address = ANY($1) AND cc.processed = false), 0)
+ COALESCE((SELECT SUM(lc.fee) FROM loan_contract lc WHERE lc.lender = ANY($1) AND lc.processed = false), 0)
+ COALESCE((SELECT SUM(lp.fee) FROM loan_payment lp WHERE lp.address = ANY($1) AND lp.processed = false), 0)
+ COALESCE((SELECT SUM(lc.loan_amount)
FROM loan_contract lc
WHERE lc.lender = ANY($1) AND lc.loan_coin = $2 AND lc.processed = false), 0)
+ COALESCE((SELECT SUM(lc.collateral_amount)
FROM loan_contract lc
WHERE lc.borrower = ANY($1) AND lc.collateral = $2 AND lc.processed = false), 0)
+ COALESCE((
SELECT SUM(lp.payback_amount)
FROM loan_payment lp
JOIN loan_contract lc ON lc.txid = lp.contract_hash
WHERE lp.address = ANY($1) AND lc.loan_coin = $2 AND lp.processed = false AND lc.processed = false
), 0)
+ COALESCE((
SELECT SUM(lp.tip)
FROM loan_payment lp
JOIN loan_contract lc ON lc.txid = lp.contract_hash
WHERE lp.address = ANY($1) AND lc.loan_coin = $2 AND lp.processed = false AND lc.processed = false
), 0)
+ COALESCE((SELECT SUM(s.fee1) FROM swap s WHERE s.sender1 = ANY($1) AND s.processed = false), 0)
+ COALESCE((SELECT SUM(s.fee2) FROM swap s WHERE s.sender2 = ANY($1) AND s.processed = false), 0)
+ COALESCE((SELECT SUM(s.value1) FROM swap s WHERE s.sender1 = ANY($1) AND s.ticker1 = $2 AND s.processed = false), 0)
+ COALESCE((SELECT SUM(s.tip1) FROM swap s WHERE s.sender1 = ANY($1) AND s.ticker1 = $2 AND s.processed = false), 0)
+ COALESCE((SELECT SUM(s.value2) FROM swap s WHERE s.sender2 = ANY($1) AND s.ticker2 = $2 AND s.processed = false), 0)
+ COALESCE((SELECT SUM(s.tip2) FROM swap s WHERE s.sender2 = ANY($1) AND s.ticker2 = $2 AND s.processed = false), 0)
) AS BIGINT) AS total
"#,
&[&addresses, &*BASECOIN],
)
.await?;
let total: i64 = row.get(0);
let chain_loan_payments = pending_saved_loan_payment_balance(db, &addresses, &BASECOIN).await?;
Ok((total.max(0) as u64).saturating_add(chain_loan_payments))
}
pub async fn get_pending_payments_for_contract(
contract_hash: &str,
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let client_handle = db_client().await?;
let client = client_handle.as_ref();
// Mempool/UI callers can use this for unconfirmed payment visibility.
// Consensus validation must use confirmed contract payments only.
let row = client
.query_one(
r#"
SELECT CAST(COALESCE(SUM(lp.payback_amount), 0) AS BIGINT) AS total
FROM loan_payment lp
WHERE lp.contract_hash = $1
AND lp.processed = false
"#,
&[&contract_hash],
)
.await?;
let total: i64 = row.get(0);
Ok(total.max(0) as u64)
}
pub async fn total_transactions() -> RpcResponse {
let client_handle = match db_client().await {
Ok(client) => client,
Err(_) => return RpcResponse::Binary(vec![0; 8]),
};
let client = client_handle.as_ref();
// Count rows across all mempool tables, including processed rows that may
// still be retained briefly for orphan rollback.
let row = match client
.query_one(
r#"
SELECT CAST(SUM(row_count) AS BIGINT) AS total_rows FROM (
SELECT COUNT(*) AS row_count FROM transfer
UNION ALL
SELECT COUNT(*) AS row_count FROM token
UNION ALL
SELECT COUNT(*) AS row_count FROM issue_token
UNION ALL
SELECT COUNT(*) AS row_count FROM burn
UNION ALL
SELECT COUNT(*) AS row_count FROM nft
UNION ALL
SELECT COUNT(*) AS row_count FROM marketing
UNION ALL
SELECT COUNT(*) AS row_count FROM vanity_address
UNION ALL
SELECT COUNT(*) AS row_count FROM swap
UNION ALL
SELECT COUNT(*) AS row_count FROM loan_contract
UNION ALL
SELECT COUNT(*) AS row_count FROM loan_payment
UNION ALL
SELECT COUNT(*) AS row_count FROM collateral_claim
) AS combined;
"#,
&[],
)
.await
{
Ok(r) => r,
Err(_) => return RpcResponse::Binary(vec![0; 8]),
};
let total: Option<i64> = row.get(0);
let result = (total.unwrap_or(0) as u32).to_le_bytes().to_vec();
RpcResponse::Binary(result)
}