Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 54 additions & 68 deletions crates/vite_task/src/session/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ pub mod fingerprint;
pub mod glob_inputs;
pub mod spawn;

use std::{collections::BTreeMap, process::Stdio, sync::Arc};
use std::{collections::BTreeMap, io::Write as _, process::Stdio, sync::Arc};

use futures_util::FutureExt;
use tokio::io::AsyncWriteExt as _;
use vite_path::AbsolutePath;
use vite_task_plan::{
ExecutionGraph, ExecutionItemDisplay, ExecutionItemKind, LeafExecutionKind, SpawnCommand,
Expand Down Expand Up @@ -144,21 +143,18 @@ impl ExecutionContext<'_> {
LeafExecutionKind::InProcess(in_process_execution) => {
// In-process (built-in) commands: caching is disabled, execute synchronously
let mut stdio_config = leaf_reporter
.start(CacheStatus::Disabled(CacheDisabledReason::InProcessExecution))
.await;
.start(CacheStatus::Disabled(CacheDisabledReason::InProcessExecution));

let execution_output = in_process_execution.execute();
// Write output to the stdout writer from StdioConfig
let _ = stdio_config.stdout_writer.write_all(&execution_output.stdout).await;
let _ = stdio_config.stdout_writer.flush().await;
let _ = stdio_config.stdout_writer.write_all(&execution_output.stdout);
let _ = stdio_config.stdout_writer.flush();

leaf_reporter
.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
None,
)
.await;
leaf_reporter.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
None,
);
false
}
LeafExecutionKind::Spawn(spawn_execution) => {
Expand Down Expand Up @@ -220,13 +216,11 @@ pub async fn execute_spawn(
) {
Ok(inputs) => inputs,
Err(err) => {
leaf_reporter
.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::Cache { kind: CacheErrorKind::Lookup, source: err }),
)
.await;
leaf_reporter.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::Cache { kind: CacheErrorKind::Lookup, source: err }),
);
return SpawnOutcome::Failed;
}
};
Expand All @@ -247,13 +241,11 @@ pub async fn execute_spawn(
Err(err) => {
// Cache lookup error — report through finish.
// Note: start() is NOT called because we don't have a valid cache status.
leaf_reporter
.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::Cache { kind: CacheErrorKind::Lookup, source: err }),
)
.await;
leaf_reporter.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::Cache { kind: CacheErrorKind::Lookup, source: err }),
);
return SpawnOutcome::Failed;
}
}
Expand All @@ -263,23 +255,25 @@ pub async fn execute_spawn(
};

// 2. Report execution start with the determined cache status.
// Returns StdioConfig with the reporter's suggestion and async writers.
let mut stdio_config = leaf_reporter.start(cache_status).await;
// Returns StdioConfig with the reporter's suggestion and writers.
let mut stdio_config = leaf_reporter.start(cache_status);

// 3. If cache hit, replay outputs via the StdioConfig writers and finish early.
// No need to actually execute the command — just replay what was cached.
if let Some(cached) = cached_value {
for output in cached.std_outputs.iter() {
let writer: &mut (dyn tokio::io::AsyncWrite + Unpin) = match output.kind {
let writer: &mut dyn std::io::Write = match output.kind {
spawn::OutputKind::StdOut => &mut stdio_config.stdout_writer,
spawn::OutputKind::StdErr => &mut stdio_config.stderr_writer,
};
let _ = writer.write_all(&output.content).await;
let _ = writer.flush().await;
let _ = writer.write_all(&output.content);
let _ = writer.flush();
}
leaf_reporter
.finish(None, CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheHit), None)
.await;
leaf_reporter.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheHit),
None,
);
return SpawnOutcome::CacheHit;
}

Expand All @@ -293,29 +287,25 @@ pub async fn execute_spawn(
if use_inherited {
// Inherited mode: all three stdio FDs (stdin, stdout, stderr) are inherited
// from the parent process. No fspy tracking, no output capture.
// Drop the StdioConfig writers before spawning to avoid holding tokio::io::Stdout
// Drop the StdioConfig writers before spawning to avoid holding std::io::Stdout
// while the child also writes to the same FD.
drop(stdio_config);

match spawn_inherited(&spawn_execution.spawn_command).await {
Ok(result) => {
leaf_reporter
.finish(
Some(result.exit_status),
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
None,
)
.await;
leaf_reporter.finish(
Some(result.exit_status),
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
None,
);
return SpawnOutcome::Spawned(result.exit_status);
}
Err(err) => {
leaf_reporter
.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::Spawn(err)),
)
.await;
leaf_reporter.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::Spawn(err)),
);
return SpawnOutcome::Failed;
}
}
Expand Down Expand Up @@ -346,13 +336,11 @@ pub async fn execute_spawn(
{
Ok(negs) => negs,
Err(err) => {
leaf_reporter
.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::PostRunFingerprint(err)),
)
.await;
leaf_reporter.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::PostRunFingerprint(err)),
);
return SpawnOutcome::Failed;
}
}
Expand All @@ -367,8 +355,8 @@ pub async fn execute_spawn(
let result = match spawn_with_tracking(
&spawn_execution.spawn_command,
cache_base_path,
&mut stdio_config.stdout_writer,
&mut stdio_config.stderr_writer,
&mut *stdio_config.stdout_writer,
&mut *stdio_config.stderr_writer,
std_outputs.as_mut(),
path_accesses.as_mut(),
&resolved_negatives,
Expand All @@ -377,13 +365,11 @@ pub async fn execute_spawn(
{
Ok(result) => result,
Err(err) => {
leaf_reporter
.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::Spawn(err)),
)
.await;
leaf_reporter.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::Spawn(err)),
);
return SpawnOutcome::Failed;
}
};
Expand Down Expand Up @@ -456,7 +442,7 @@ pub async fn execute_spawn(
// 7. Finish the leaf execution with the result and optional cache error.
// Cache update/fingerprint failures are reported but do not affect the outcome —
// the process ran, so we return its actual exit status.
leaf_reporter.finish(Some(result.exit_status), cache_update_status, cache_error).await;
leaf_reporter.finish(Some(result.exit_status), cache_update_status, cache_error);

SpawnOutcome::Spawned(result.exit_status)
}
Expand Down Expand Up @@ -560,6 +546,6 @@ impl Session<'_> {

// Leaf-level errors and non-zero exit statuses are tracked internally
// by the reporter.
reporter.finish().await
reporter.finish()
}
}
19 changes: 10 additions & 9 deletions crates/vite_task/src/session/execute/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::{
collections::hash_map::Entry,
io::Write,
process::{ExitStatus, Stdio},
time::{Duration, Instant},
};
Expand All @@ -10,7 +11,7 @@ use bincode::{Decode, Encode};
use fspy::AccessMode;
use rustc_hash::FxHashSet;
use serde::Serialize;
use tokio::io::{AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
use tokio::io::AsyncReadExt as _;
use vite_path::{AbsolutePath, RelativePathBuf};
use vite_task_plan::SpawnCommand;
use wax::Program as _;
Expand Down Expand Up @@ -72,8 +73,8 @@ pub struct TrackedPathAccesses {
pub async fn spawn_with_tracking(
spawn_command: &SpawnCommand,
workspace_root: &AbsolutePath,
stdout_writer: &mut (dyn AsyncWrite + Unpin),
stderr_writer: &mut (dyn AsyncWrite + Unpin),
stdout_writer: &mut dyn Write,
stderr_writer: &mut dyn Write,
std_outputs: Option<&mut Vec<StdOutput>>,
path_accesses: Option<&mut TrackedPathAccesses>,
resolved_negatives: &[wax::Glob<'static>],
Expand Down Expand Up @@ -128,9 +129,9 @@ pub async fn spawn_with_tracking(
0 => stdout_done = true,
n => {
let content = stdout_buf[..n].to_vec();
// Write to the async writer immediately
stdout_writer.write_all(&content).await?;
stdout_writer.flush().await?;
// Write to the sync writer immediately
stdout_writer.write_all(&content)?;
stdout_writer.flush()?;
// Store outputs for caching
if let Some(outputs) = &mut outputs {
if let Some(last) = outputs.last_mut()
Expand All @@ -149,9 +150,9 @@ pub async fn spawn_with_tracking(
0 => stderr_done = true,
n => {
let content = stderr_buf[..n].to_vec();
// Write to the async writer immediately
stderr_writer.write_all(&content).await?;
stderr_writer.flush().await?;
// Write to the sync writer immediately
stderr_writer.write_all(&content)?;
stderr_writer.flush()?;
// Store outputs for caching
if let Some(outputs) = &mut outputs {
if let Some(last) = outputs.last_mut()
Expand Down
4 changes: 2 additions & 2 deletions crates/vite_task/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl<'a> Session<'a> {

let builder = LabeledReporterBuilder::new(
self.workspace_path(),
Box::new(tokio::io::stdout()),
Box::new(std::io::stdout()),
run_command.flags.verbose,
Some(self.make_summary_writer()),
self.program_name.clone(),
Expand Down Expand Up @@ -602,7 +602,7 @@ impl<'a> Session<'a> {

// Create a plain (standalone) reporter — no graph awareness, no summary
let plain_reporter =
reporter::PlainReporter::new(silent_if_cache_hit, Box::new(tokio::io::stdout()));
reporter::PlainReporter::new(silent_if_cache_hit, Box::new(std::io::stdout()));

// Execute the spawn directly using the free function, bypassing the graph pipeline
let outcome = execute::execute_spawn(
Expand Down
Loading
Loading