diff --git a/src/bin/ui.rs b/src/bin/ui.rs index e0f8f6d1..9c6799b7 100644 --- a/src/bin/ui.rs +++ b/src/bin/ui.rs @@ -431,7 +431,7 @@ fn load_form() -> (FormState, Option) { youtube_via_relay: false, passthrough_hosts: Vec::new(), block_quic: true, - block_stun: true, + block_stun: false, disable_padding: false, force_http1: false, tunnel_doh: true, @@ -695,8 +695,8 @@ struct ConfigWire<'a> { /// emit only when the user has explicitly disabled the block. #[serde(skip_serializing_if = "is_true")] block_doh: bool, - /// Default true. Emit only when the user disables STUN/TURN blocking. - #[serde(skip_serializing_if = "is_true")] + /// Default false. Emit only when the user enables STUN/TURN blocking. + #[serde(skip_serializing_if = "is_false")] block_stun: bool, #[serde(skip_serializing_if = "Vec::is_empty")] fronting_groups: &'a Vec, diff --git a/src/config.rs b/src/config.rs index d4251aa8..132b73b0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -504,7 +504,7 @@ fn default_tunnel_doh() -> bool { true } /// Default for `block_quic`: `true`. QUIC over the TCP-based tunnel /// causes TCP-over-TCP meltdown (<1 Mbps). Browsers fall back to /// HTTPS/TCP within seconds of the silent UDP drop. Issue #793. -fn default_block_stun() -> bool { true } +fn default_block_stun() -> bool { false } fn default_block_quic() -> bool { true } /// Default for `block_doh`: `true` (browser DoH is rejected so the diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index 887561a3..54c21ffa 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -78,8 +78,8 @@ const INFLIGHT_ACTIVE: usize = 4; /// How many consecutive empty replies before dropping from active to idle depth. const INFLIGHT_COOLDOWN: u32 = 3; -/// Max sessions that can run at elevated pipeline depth per deployment. -const MAX_ELEVATED_PER_DEPLOYMENT: u64 = 30; +/// Max sessions that can run at elevated pipeline depth (total, not per deployment). +const MAX_ELEVATED_TOTAL: u64 = 10; /// Adaptive coalesce defaults: after each new op arrives, wait another /// step for more ops. Resets on every arrival, up to max from the first @@ -442,7 +442,7 @@ impl TunnelMux { .batch_timeout() .saturating_add(REPLY_TIMEOUT_SLACK); pipeline_debug::set_limits( - MAX_ELEVATED_PER_DEPLOYMENT * unique_n as u64, + MAX_ELEVATED_TOTAL, (CONCURRENCY_PER_DEPLOYMENT * unique_n) as u64, ); let (tx, rx) = mpsc::unbounded_channel(); @@ -462,7 +462,7 @@ impl TunnelMux { unreachable_cache: Mutex::new(HashMap::new()), reply_timeout, elevated_sessions: AtomicU64::new(0), - max_elevated: MAX_ELEVATED_PER_DEPLOYMENT * unique_n as u64, + max_elevated: MAX_ELEVATED_TOTAL, }) } @@ -1439,7 +1439,7 @@ async fn tunnel_loop( let mut next_data_write_seq: u64 = 0; let mut eof_seen = false; let mut client_closed = false; - let mut pending_writes: BTreeMap = BTreeMap::new(); + let mut pending_writes: BTreeMap = BTreeMap::new(); // Buffered upload data waiting to be sent (when pipeline is full). let mut buffered_upload: Option = None; @@ -1597,12 +1597,12 @@ async fn tunnel_loop( next_write_seq += 1; while let Some(entry) = pending_writes.first_entry() { if *entry.key() != next_write_seq { break; } - let (_, (buffered_resp, _)) = entry.remove_entry(); + let (_, (buffered_resp, _, _)) = entry.remove_entry(); let _ = write_tunnel_response(&mut writer, &buffered_resp).await; next_write_seq += 1; } } else { - pending_writes.insert(meta.seq, (resp, script_id)); + pending_writes.insert(meta.seq, (resp, script_id, meta.was_empty_poll)); } continue; } @@ -1632,6 +1632,41 @@ async fn tunnel_loop( } } + // Escalating backoff: avoid flooding empty polls on idle + // sessions. Mirrors the pre-pipelining cadence. + let keepalive_delay = match consecutive_empty { + 0 => Duration::from_millis(20), + 1 => Duration::from_millis(80), + 2 => Duration::from_millis(200), + 3 => Duration::from_millis(500), + _ => Duration::from_secs(2), + }; + if consecutive_empty > 0 { + // Wait for either the backoff timer or client data. + if !client_closed { + read_buf.reserve(65536); + tokio::select! { + biased; + result = reader.read_buf(&mut read_buf) => { + match result { + Ok(0) => break, + Ok(n) => { + consecutive_empty = 0; + let data = extract_bytes(&mut read_buf, n); + let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux); + inflight.push(wrap_reply(meta, reply_rx)); + continue; + } + Err(_) => break, + } + } + _ = tokio::time::sleep(keepalive_delay) => {} + } + } else { + tokio::time::sleep(keepalive_delay).await; + } + } + let (meta, reply_rx) = send_empty_poll(sid, &mut next_send_seq, mux); tracing::debug!( "sess {}: keepalive poll seq={}", &sid[..sid.len().min(8)], meta.seq @@ -1640,8 +1675,9 @@ async fn tunnel_loop( } // Can we read from the client? Yes if not closed, not eof, and - // we have room for more inflight ops (fast-path allows +4 extra). - let can_read = !client_closed && !eof_seen && inflight.len() < max_inflight + 4; + // we have room for more inflight ops (+2 extra for upload data + // so it doesn't wait for a full pipeline drain). + let can_read = !client_closed && !eof_seen && inflight.len() < max_inflight + 2; tokio::select! { biased; @@ -1712,8 +1748,14 @@ async fn tunnel_loop( consecutive_data = consecutive_data.saturating_add(1); let bytes = resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0); total_download_bytes += bytes; + } else if meta.was_empty_poll && consecutive_data > 0 { + // Stale empty-poll reply during an active data + // streak — don't penalise the streak. The poll + // was queued before data started flowing; the + // empty result is expected. } else { consecutive_empty = consecutive_empty.saturating_add(1); + consecutive_data = 0; } if is_eof { eof_seen = true; @@ -1722,7 +1764,7 @@ async fn tunnel_loop( // Flush buffered out-of-order writes. while let Some(entry) = pending_writes.first_entry() { if *entry.key() != next_write_seq { break; } - let (_, (buffered_resp, _)) = entry.remove_entry(); + let (_, (buffered_resp, _, buf_was_empty_poll)) = entry.remove_entry(); let buf_eof = buffered_resp.eof.unwrap_or(false); match write_tunnel_response(&mut writer, &buffered_resp).await? { WriteOutcome::Wrote => { @@ -1732,7 +1774,12 @@ async fn tunnel_loop( total_download_bytes += bytes; } WriteOutcome::NoData => { - consecutive_empty = consecutive_empty.saturating_add(1); + if buf_was_empty_poll && consecutive_data > 0 { + // Stale empty poll — don't break data streak. + } else { + consecutive_empty = consecutive_empty.saturating_add(1); + consecutive_data = 0; + } } WriteOutcome::BadBase64 => break, } @@ -1742,7 +1789,7 @@ async fn tunnel_loop( } } } else { - pending_writes.insert(meta.seq, (resp, script_id)); + pending_writes.insert(meta.seq, (resp, script_id, meta.was_empty_poll)); } // Send buffered upload data now that a slot freed up. @@ -1810,9 +1857,12 @@ async fn tunnel_loop( } // Schedule refill if pipeline needs more polls. + // Skip refill at IDLE depth with consecutive empties — + // the keepalive path handles that with proper backoff. if !eof_seen && inflight.len() < max_inflight && refill_at.is_none() + && !(max_inflight <= INFLIGHT_IDLE && consecutive_empty >= 2) { refill_at = Some(Box::pin(tokio::time::sleep( if max_inflight > INFLIGHT_IDLE { Duration::from_millis(100) } else { Duration::ZERO } @@ -1854,8 +1904,9 @@ async fn tunnel_loop( let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux); consecutive_empty = 0; inflight.push(wrap_reply(meta, reply_rx)); - } else if inflight.len() < max_inflight + 4 { - // Fast-path: pipeline full but under +4 extra. + } else if inflight.len() < max_inflight + 2 { + // Two extra slots for upload data so it doesn't + // wait for a full pipeline drain. let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux); consecutive_empty = 0; inflight.push(wrap_reply(meta, reply_rx)); @@ -2200,7 +2251,7 @@ mod tests { // `fronter.batch_timeout()` (see `TunnelMux::start`). reply_timeout: Duration::from_secs(35), elevated_sessions: AtomicU64::new(0), - max_elevated: MAX_ELEVATED_PER_DEPLOYMENT * num_scripts as u64, + max_elevated: MAX_ELEVATED_TOTAL, }); (mux, rx) }