diff --git a/binaries/linux-contractless-testnet-0.1.0.tar.gz b/binaries/linux-contractless-testnet-0.1.0.tar.gz deleted file mode 100644 index e6044c9..0000000 Binary files a/binaries/linux-contractless-testnet-0.1.0.tar.gz and /dev/null differ diff --git a/binaries/linux-contractless-testnet-0.1.0.zip b/binaries/linux-contractless-testnet-0.1.0.zip deleted file mode 100644 index 96d2a33..0000000 Binary files a/binaries/linux-contractless-testnet-0.1.0.zip and /dev/null differ diff --git a/src/rpc/server/command_loop_state.rs b/src/rpc/server/command_loop_state.rs index aa8b59f..526ad50 100644 --- a/src/rpc/server/command_loop_state.rs +++ b/src/rpc/server/command_loop_state.rs @@ -1,10 +1,10 @@ use crate::records::memory::connections::get_client_type_from_memory; use crate::records::memory::enums::ClientType; use crate::rpc::command_maps::RPC_REPLY; -use crate::rpc::read_bytes_from_stream::read_first_byte; use crate::rpc::server::connection_memory_manager::remove_stream_from_memory; use crate::rpc::server::flood_protection::check_request_frequency_with_client_type; use crate::rpc::server::structs::IncomingCommand; +use crate::io::ErrorKind; use crate::log::warn; use crate::sled::Db; use crate::Arc; @@ -12,33 +12,35 @@ use crate::Duration; use crate::Mutex; use crate::sleep; use crate::TcpStream; -use crate::timeout; -async fn wait_for_stream_data(stream_locked: &Arc>) -> Result { - // Poll the socket with peek so the command byte stays in the stream - // until the normal byte reader consumes it. +async fn read_next_command_byte(stream_locked: &Arc>) -> Result, String> { + // Poll for a command byte with a nonblocking read. This avoids holding + // the stream lock across an awaited read while still removing the need + // for peek-based readiness checks. let timeout_duration = Duration::from_millis(100); loop { let stream = stream_locked.lock().await; let mut buffer = [0; 1]; - match timeout(timeout_duration, async { stream.peek(&mut buffer).await }).await { - Ok(Ok(n)) => { - if n > 0 { - return Ok(true); - } - if stream.peer_addr().is_err() { - warn!("Dropped stream: {:?}", stream.peer_addr().unwrap_err()); - drop(stream); - remove_stream_from_memory(stream_locked).await; - return Ok(false); - } + match stream.try_read(&mut buffer) { + Ok(1) => return Ok(Some(buffer[0])), + Ok(0) => { + drop(stream); + remove_stream_from_memory(stream_locked).await; + return Ok(None); } - Ok(Err(_)) | Err(_) => { + Ok(_) => {} + Err(err) if err.kind() == ErrorKind::WouldBlock => { drop(stream); sleep(timeout_duration).await; } + Err(err) => { + warn!("Dropped stream: {err:?}"); + drop(stream); + remove_stream_from_memory(stream_locked).await; + return Ok(None); + } } } } @@ -61,12 +63,9 @@ pub async fn next_incoming_command( ) -> Result, String> { // A disconnected socket returns None so the caller can end the RPC // loop without treating a clean disconnect as a command failure. - if !wait_for_stream_data(&stream_locked).await? { + let Some(command) = read_next_command_byte(&stream_locked).await? else { return Ok(None); - } - - let command = - read_first_byte(connections_key, stream_locked.clone()).await?; + }; let ip = peer_ip(&stream_locked).await; // Connection memory is the source of truth for whether this stream