Fix Windows node RPC reply handling

This commit is contained in:
viraladmin 2026-05-24 22:53:51 -06:00
parent 118466d9cd
commit b5bbf348a6
3 changed files with 21 additions and 22 deletions

View File

@ -1,10 +1,10 @@
use crate::records::memory::connections::get_client_type_from_memory; use crate::records::memory::connections::get_client_type_from_memory;
use crate::records::memory::enums::ClientType; use crate::records::memory::enums::ClientType;
use crate::rpc::command_maps::RPC_REPLY; use crate::rpc::command_maps::RPC_REPLY;
use crate::rpc::read_bytes_from_stream::read_first_byte;
use crate::rpc::server::connection_memory_manager::remove_stream_from_memory; use crate::rpc::server::connection_memory_manager::remove_stream_from_memory;
use crate::rpc::server::flood_protection::check_request_frequency_with_client_type; use crate::rpc::server::flood_protection::check_request_frequency_with_client_type;
use crate::rpc::server::structs::IncomingCommand; use crate::rpc::server::structs::IncomingCommand;
use crate::io::ErrorKind;
use crate::log::warn; use crate::log::warn;
use crate::sled::Db; use crate::sled::Db;
use crate::Arc; use crate::Arc;
@ -12,33 +12,35 @@ use crate::Duration;
use crate::Mutex; use crate::Mutex;
use crate::sleep; use crate::sleep;
use crate::TcpStream; use crate::TcpStream;
use crate::timeout;
async fn wait_for_stream_data(stream_locked: &Arc<Mutex<TcpStream>>) -> Result<bool, String> { async fn read_next_command_byte(stream_locked: &Arc<Mutex<TcpStream>>) -> Result<Option<u8>, String> {
// Poll the socket with peek so the command byte stays in the stream // Poll for a command byte with a nonblocking read. This avoids holding
// until the normal byte reader consumes it. // the stream lock across an awaited read while still removing the need
// for peek-based readiness checks.
let timeout_duration = Duration::from_millis(100); let timeout_duration = Duration::from_millis(100);
loop { loop {
let stream = stream_locked.lock().await; let stream = stream_locked.lock().await;
let mut buffer = [0; 1]; let mut buffer = [0; 1];
match timeout(timeout_duration, async { stream.peek(&mut buffer).await }).await { match stream.try_read(&mut buffer) {
Ok(Ok(n)) => { Ok(1) => return Ok(Some(buffer[0])),
if n > 0 { Ok(0) => {
return Ok(true); drop(stream);
} remove_stream_from_memory(stream_locked).await;
if stream.peer_addr().is_err() { return Ok(None);
warn!("Dropped stream: {:?}", stream.peer_addr().unwrap_err());
drop(stream);
remove_stream_from_memory(stream_locked).await;
return Ok(false);
}
} }
Ok(Err(_)) | Err(_) => { Ok(_) => {}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
drop(stream); drop(stream);
sleep(timeout_duration).await; sleep(timeout_duration).await;
} }
Err(err) => {
warn!("Dropped stream: {err:?}");
drop(stream);
remove_stream_from_memory(stream_locked).await;
return Ok(None);
}
} }
} }
} }
@ -61,12 +63,9 @@ pub async fn next_incoming_command(
) -> Result<Option<IncomingCommand>, String> { ) -> Result<Option<IncomingCommand>, String> {
// A disconnected socket returns None so the caller can end the RPC // A disconnected socket returns None so the caller can end the RPC
// loop without treating a clean disconnect as a command failure. // loop without treating a clean disconnect as a command failure.
if !wait_for_stream_data(&stream_locked).await? { let Some(command) = read_next_command_byte(&stream_locked).await? else {
return Ok(None); return Ok(None);
} };
let command =
read_first_byte(connections_key, stream_locked.clone()).await?;
let ip = peer_ip(&stream_locked).await; let ip = peer_ip(&stream_locked).await;
// Connection memory is the source of truth for whether this stream // Connection memory is the source of truth for whether this stream