
feat: add chunked file upload support Streaming Upload API (rx.upload_files_chunk) #6190

Open
FarhanAliRaza wants to merge 12 commits into reflex-dev:main from FarhanAliRaza:chunked-upload

Conversation


@FarhanAliRaza FarhanAliRaza commented Mar 18, 2026

Implement chunked/streaming file uploads to handle large files without loading them entirely into memory. Moves upload handling logic from app.py to event.py, adds chunked upload JS helpers, and updates the upload component to support the new upload_files_chunk API. Includes unit and integration tests for chunked upload, cancel, and streaming.

All Submissions:

  • Have you followed the guidelines stated in CONTRIBUTING.md file?
  • Have you checked to ensure there aren't any other open Pull Requests for the desired change?

Type of change

Please delete options that are not relevant.

  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

New Feature Submission:

  • Does your submission pass the tests?
  • Have you linted your code locally prior to submission?

Changes To Core Features:

  • Have you added an explanation of what your changes do and why you'd like us to include them?
  • Have you written new tests for your core changes, as applicable?
  • Have you successfully run tests with your changes locally?

closes #6184


codspeed-hq bot commented Mar 18, 2026

Merging this PR will improve performance by 3.48%

⚡ 1 improved benchmark
✅ 7 untouched benchmarks

Performance Changes

Benchmark | BASE | HEAD | Efficiency
test_compile_stateful[_stateful_page] | 150.9 µs | 145.8 µs | +3.48%

Comparing FarhanAliRaza:chunked-upload (c4e6690) with main (7ee3026)

Open in CodSpeed


greptile-apps bot commented Mar 18, 2026

Greptile Summary

This PR introduces a streaming/chunked file upload API (rx.upload_files_chunk) that avoids loading entire files into memory. Upload handling is extracted from app.py into a new reflex/uploads.py module, a new /_upload_chunk endpoint is registered, and a custom async multipart parser (_UploadChunkMultipartParser) pushes raw bytes incrementally to a background event handler via the new UploadChunkIterator. The frontend gains a parallel uploadFilesChunk JS helper that posts to the new endpoint. Test coverage includes both unit and integration tests for streaming, cancellation, and progress tracking.
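The push-based iterator pattern described above can be sketched with an `asyncio.Queue`. The classes below are hypothetical stand-ins for the PR's `UploadChunk` and `UploadChunkIterator`, not Reflex's actual implementation; they only illustrate how a parser can push chunks to a concurrently running consumer task:

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Chunk:
    """Stand-in for the PR's UploadChunk (filename plus raw bytes)."""

    filename: str
    data: bytes


class ChunkIterator:
    """Hypothetical sketch of a push-based async iterator like UploadChunkIterator."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def push(self, chunk: Chunk) -> None:
        await self._queue.put(chunk)  # parser side: feed parsed bytes in

    async def finish(self) -> None:
        await self._queue.put(None)  # sentinel: no more chunks

    def __aiter__(self) -> "ChunkIterator":
        return self

    async def __anext__(self) -> Chunk:
        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration
        return item


async def demo() -> int:
    it = ChunkIterator()

    async def consumer() -> int:
        total = 0
        async for chunk in it:  # background handler side
            total += len(chunk.data)
        return total

    task = asyncio.create_task(consumer())
    await it.push(Chunk("foo.txt", b"hello "))
    await it.push(Chunk("foo.txt", b"world"))
    await it.finish()
    return await task


print(asyncio.run(demo()))  # 11
```

The consumer never holds more than the queued chunks in memory, which is the property the PR is after for large files.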

Key issues found:

  • on_drop_rejected incorrectly typed with _on_drop_args_spec — rejected-file callbacks should never receive an UploadChunkIterator; using the chunk spec here allows users to accidentally wire a streaming handler to on_drop_rejected. The on_drop_rejected field in both GhostUpload and Upload should keep the original _on_drop_spec.
  • No timeout when awaiting a cancelled background task — on ClientDisconnect, task.cancel() is followed by an unbounded await task. A handler with a slow finally block will stall the route indefinitely.
  • O(n) list.pop(0) in _flush_emitted_chunks — high-throughput small-chunk uploads repeatedly shift a list; use a deque or swap-and-iterate pattern instead.
  • UploadFile imported via reflex.app re-export in event.py — since UploadFile now lives in reflex.uploads, the deferred import in resolve_upload_handler_param should reference reflex.uploads directly.
  • Hardcoded handler name strings "uploadFiles" / "uploadFilesChunk" — used as identifiers in both upload.py and state.js; should be extracted into shared constants.

Confidence Score: 2/5

  • Two logic bugs (unbounded await on disconnect and incorrect on_drop_rejected spec) need to be fixed before merging.
  • The architecture is sound and tests are thorough, but the unbounded await task on client disconnect can permanently stall the upload route, and the on_drop_rejected spec change can silently allow users to wire an incompatible streaming handler — both are observable runtime issues that should be addressed before shipping.
  • Pay close attention to reflex/uploads.py (disconnect handling) and reflex/components/core/upload.py (on_drop_rejected spec).

Important Files Changed

Filename | Overview
reflex/uploads.py | New module housing upload route handlers and the streaming multipart parser; contains an unbounded await task on disconnect and an O(n) pop(0) in the hot flush path.
reflex/event.py | Adds UploadChunk, UploadChunkIterator, and UploadFilesChunk; imports UploadFile via the reflex.app re-export instead of directly from reflex.uploads.
reflex/components/core/upload.py | Introduces _on_drop_args_spec to support chunk uploads on on_drop, but incorrectly applies it to on_drop_rejected as well; also uses hardcoded handler name string literals.
reflex/.templates/web/utils/helpers/upload.js | Refactors upload logic into a shared sendUploadRequest helper and adds uploadFilesChunk for the new _upload_chunk endpoint; env.UPLOAD_CHUNK is populated automatically via set_env_json.
reflex/.templates/web/utils/state.js | Routes the uploadFilesChunk handler name to the new JS function; uses hardcoded handler name strings that should be constants.
reflex/app.py | Moves upload route logic to reflex/uploads.py and registers the new _upload_chunk endpoint; a clean refactor with no new issues.
reflex/constants/event.py | Adds UPLOAD_CHUNK = "_upload_chunk" to the Endpoint enum; this is automatically picked up by set_env_json() so env.UPLOAD_CHUNK is correctly populated in the frontend.
tests/units/test_app.py | Adds thorough unit tests for chunk streaming, invalid multipart data (returns 400), and background task draining.
tests/integration/test_upload.py | Adds integration tests for streaming upload (chunk verification), cancel-on-disconnect, and progress tracking.

Sequence Diagram

sequenceDiagram
    participant Browser
    participant UploadChunkRoute as POST /_upload_chunk
    participant StateManager
    participant BackgroundTask as Background Handler
    participant UploadChunkIterator

    Browser->>UploadChunkRoute: POST multipart/form-data (headers: token, handler)
    UploadChunkRoute->>StateManager: get_state + _process_background(event)
    StateManager-->>UploadChunkRoute: asyncio.Task
    UploadChunkRoute->>UploadChunkIterator: set_consumer_task(task)
    Note over BackgroundTask: Task scheduled, awaiting chunks

    loop Stream parsing
        Browser->>UploadChunkRoute: HTTP body bytes (streamed)
        UploadChunkRoute->>UploadChunkIterator: push(UploadChunk)
        UploadChunkIterator-->>BackgroundTask: yield UploadChunk
        BackgroundTask->>StateManager: async with self (emit state update)
        StateManager-->>Browser: WebSocket state delta
    end

    UploadChunkRoute->>UploadChunkIterator: finish()
    BackgroundTask-->>StateManager: (completes, final state update)
    UploadChunkRoute-->>Browser: 202 Accepted

    alt Client Disconnects Mid-Upload
        Browser->>UploadChunkRoute: TCP close / ClientDisconnect
        UploadChunkRoute->>BackgroundTask: task.cancel()
        UploadChunkRoute->>BackgroundTask: await task (no timeout ⚠️)
        UploadChunkRoute-->>Browser: 200 (empty)
    end

Last reviewed commit: "feat: add chunked fi..."

reflex/upload.py Outdated
Comment on lines +522 to +526
except ClientDisconnect:
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return Response()

P1 Unbounded await task on client disconnect

When the client disconnects, task.cancel() is called and then await task waits for it to complete. However, if the background handler has a long finally block or ignores cancellation (e.g., due to a blocking I/O call), this await will block indefinitely, leaving the upload route handler permanently stalled.

A timeout should be added so that the handler eventually returns regardless:

except ClientDisconnect:
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError, asyncio.TimeoutError):
        await asyncio.wait_for(task, timeout=5.0)
    return Response()
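The failure mode is easy to reproduce with plain asyncio: a task whose `finally` block does slow cleanup keeps a bare `await task` stuck, while `asyncio.wait_for` bounds the wait. A minimal sketch (not the PR's code; the handler name and timings are made up for illustration):

```python
import asyncio
import contextlib


async def stubborn_handler() -> None:
    try:
        await asyncio.sleep(3600)  # simulates a long-running upload handler
    finally:
        # Slow cleanup: a bare `await task` would sit here for an hour.
        await asyncio.sleep(3600)


async def disconnect() -> str:
    task = asyncio.create_task(stubborn_handler())
    await asyncio.sleep(0)  # let the handler start running
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError, asyncio.TimeoutError):
        # Bounded wait: wait_for re-cancels the task at the deadline,
        # so the route returns even if cleanup is stuck.
        await asyncio.wait_for(task, timeout=0.1)
    return "route returned despite stuck handler"


print(asyncio.run(disconnect()))
```

Here the first `cancel()` lands in the `try` block, cancellation is delayed by the `finally` sleep, and `wait_for` delivers a second cancellation at the timeout, which is exactly why the suggested fix returns promptly.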

Comment on lines +220 to +223
     on_drop: EventHandler[_on_drop_args_spec]

     # Fired when dropped files do not meet the specified criteria.
-    on_drop_rejected: EventHandler[_on_drop_spec]
+    on_drop_rejected: EventHandler[_on_drop_args_spec]

P1 on_drop_rejected should not use _on_drop_args_spec

on_drop_rejected fires when dropped files fail validation (wrong type, too large, etc.) — it is called with a list of rejected file records, never with an UploadChunkIterator. Changing its spec to _on_drop_args_spec (which includes passthrough_event_spec(UploadChunkIterator)) allows users to accidentally attach a streaming-upload handler to on_drop_rejected, which is semantically wrong and will never work as intended.

This is also true for the Upload class at line 274.

Suggested change
     on_drop: EventHandler[_on_drop_args_spec]

     # Fired when dropped files do not meet the specified criteria.
-    on_drop_rejected: EventHandler[_on_drop_args_spec]
+    on_drop_rejected: EventHandler[_on_drop_spec]

Comment on lines +216 to +219
async def _flush_emitted_chunks(self) -> None:
    """Push parsed upload chunks into the handler iterator."""
    while self._chunks_to_emit:
        await self.chunk_iter.push(self._chunks_to_emit.pop(0))

P2 O(n) list.pop(0) in hot upload path

self._chunks_to_emit is declared as list[UploadChunk]. Calling pop(0) on a list is O(n) because it shifts all elements left. For a high-throughput streaming upload that emits many small chunks, this degrades performance.

Either change _chunks_to_emit to a collections.deque and use popleft(), or simply iterate and clear:

async def _flush_emitted_chunks(self) -> None:
    """Push parsed upload chunks into the handler iterator."""
    chunks, self._chunks_to_emit = self._chunks_to_emit, []
    for chunk in chunks:
        await self.chunk_iter.push(chunk)
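The asymptotic difference is easy to see in isolation; the drain helpers below are a hypothetical micro-comparison, not the PR's parser code:

```python
from collections import deque


def drain_list(chunks: list) -> int:
    """O(n^2) total: each pop(0) shifts every remaining element left."""
    total = 0
    while chunks:
        total += len(chunks.pop(0))
    return total


def drain_deque(chunks: deque) -> int:
    """O(n) total: popleft() is O(1) on a deque."""
    total = 0
    while chunks:
        total += len(chunks.popleft())
    return total


data = [b"x" * 10 for _ in range(1000)]
print(drain_list(list(data)) == drain_deque(deque(data)) == 10_000)  # True
```

Both produce identical results; only the per-pop cost differs, which is what matters for uploads that emit thousands of small chunks.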

reflex/event.py Outdated
UploadTypeError: If the handler is a background task.
UploadValueError: If the handler does not accept ``list[rx.UploadFile]``.
"""
from reflex.app import UploadFile

P2 Indirect import of UploadFile via reflex.app re-export

UploadFile was moved from reflex.app to reflex.uploads in this PR, but this function still imports it through reflex.app's re-export. reflex.app itself imports from reflex.uploads, so this creates an unnecessary level of indirection. The deferred import should reference the defining module directly:

Suggested change
-from reflex.app import UploadFile
+from reflex.uploads import UploadFile

Comment on lines +318 to +321
if event.client_handler_name not in {
    "uploadFiles",
    "uploadFilesChunk",
}:

P2 Hardcoded string identifiers should be extracted into constants

The string literals "uploadFiles" and "uploadFilesChunk" are used as identifiers in multiple places — here in upload.py and also inside state.js (applyRestEvent). Per the project's style guidelines, string literals used as identifiers or keys should be extracted into named constants to avoid typos and make future renaming easier.

Consider defining them as module-level constants (e.g., in reflex/constants/event.py or a dedicated location):

_UPLOAD_FILES_HANDLER = "uploadFiles"
_UPLOAD_FILES_CHUNK_HANDLER = "uploadFilesChunk"

and referencing them from both upload.py and state.js.


@masenf masenf left a comment

If possible, the frontend code should be consolidated. I don't think there's a need to change the frontend code at all: you should be able to detect which type of upload is used in the backend and dispatch to the correct upload type based on the resolved handler arg type.
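That kind of backend detection can be sketched with stdlib introspection alone. Everything here is hypothetical (the `UploadChunkIterator` stand-in, the handler names, and `wants_chunks` are illustrative, not Reflex's API):

```python
import inspect
import typing


class UploadChunkIterator:
    """Stand-in for the PR's streaming iterator type."""


async def buffered_handler(self, files: list):
    """Handler that expects the buffered list-of-files upload."""


async def streaming_handler(self, chunks: UploadChunkIterator):
    """Handler that expects the streaming chunk iterator."""


def wants_chunks(handler) -> bool:
    """True if the first non-self parameter is annotated as UploadChunkIterator."""
    hints = typing.get_type_hints(handler)
    for name in inspect.signature(handler).parameters:
        if name == "self":
            continue
        return hints.get(name) is UploadChunkIterator
    return False


print(wants_chunks(streaming_handler), wants_chunks(buffered_handler))  # True False
```

With a check like this, a single frontend upload path could be routed server-side to either the buffered or the streaming implementation.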

)

assert chunk.filename == "foo.txt"
assert isinstance(rx.UploadChunkIterator(), rx.UploadChunkIterator)

🤔

up_comp_5 = Upload.create(
    id="foo_id",
    on_drop=StreamingUploadStateTest.chunk_drop_handler(
        cast(Any, rx.upload_files_chunk(upload_id="foo_id"))

use # pyright: ignore[...] comments so if we eventually fix the typing down the road, we can detect "useless type ignore" and remove them

Move upload helpers from reflex/upload.py to reflex/_upload.py, unify
the frontend to use a single uploadFiles function instead of separate
uploadFiles/uploadFilesChunk paths, and normalize upload payload keys
server-side in state.py instead of branching in the JS client.
@FarhanAliRaza

Don't know why pre-commit is failing.


masenf commented Mar 18, 2026

Don't know why pre-commit is failing.

The pyi_generator script created files that differ from the last known hash, i.e. "something" in the pyi output changed, and it looks like it was something that most components inherit, so probably in the base component class.

EDIT: actually I see that you added UploadFile as a default import, so every pyi file got its hash changed. Do we need UploadFile as a default import?

@FarhanAliRaza

EDIT: actually i see that you added UploadFile as default import, so every pyi file got its hash changed. do we need UploadFile as a default import?

No, we don't need it in the default imports.
But there is another issue: it was generating verbose pyi output like this:
[screenshot of the verbose generated pyi]

Even when we have a top-level UploadFile import in upload.pyi, it still tries to do an absolute import, but this can be made better.
I fixed this in pyi_generator.

@FarhanAliRaza FarhanAliRaza requested a review from masenf March 19, 2026 09:03
    on_drop: EventHandler[_on_drop_args_spec]

    # Fired when dropped files do not meet the specified criteria.
    on_drop_rejected: EventHandler[_on_drop_spec]

This is a grey-line comment, so I'll probably make it into a separate issue, but I don't think this is the right signature for the reject trigger: it shouldn't have anything to do with the files arg, and I don't think the JS object resembles a list[rx.UploadFile].

Comment on lines +92 to +113
class _UploadChunkMultipartParser:
"""Streaming multipart parser for streamed upload files."""

def __init__(
self,
headers: Headers,
stream: AsyncGenerator[bytes, None],
chunk_iter: UploadChunkIterator,
) -> None:
self.headers = headers
self.stream = stream
self.chunk_iter = chunk_iter
self._charset = ""
self._current_partial_header_name = b""
self._current_partial_header_value = b""
self._current_part = _UploadChunkPart()
self._chunks_to_emit: deque[UploadChunk] = deque()
self._seen_upload_chunk = False
self._part_count = 0
self._emitted_chunk_count = 0
self._emitted_bytes = 0
self._stream_chunk_count = 0

This should be a dataclass.

return None


@dataclasses.dataclass

Suggested change
-@dataclasses.dataclass
+@dataclasses.dataclass(kw_only=True, slots=True)

These flags should be considered for all dataclasses:

  1. kw_only increases modularity by allowing subclasses to have non-default arguments and not worry about argument ordering if the class is reorganized later
  2. slots increases performance by allocating a specific place for each field, particularly useful for classes that get instantiated or accessed frequently

I think we could probably add these to the UploadFile class as well.

reflex/event.py Outdated
Comment on lines +98 to +102
@dataclasses.dataclass(
    init=True,
    frozen=True,
)
class UploadChunk:

Suggested change
-@dataclasses.dataclass(
-    init=True,
-    frozen=True,
-)
-class UploadChunk:
+@dataclasses.dataclass(
+    init=True,
+    frozen=True,
+    kw_only=True,
+    slots=True,
+)
+class UploadChunk:

Can this class also move to the _upload module?

reflex/event.py Outdated
data: bytes


class UploadChunkIterator(AsyncIterator[UploadChunk]):

Can this also move to the _upload module? I'm in the process of refactoring event.py and don't want to add additional implementations here (changes to existing stuff are fine).

reflex/state.py Outdated
except Exception:
    type_hints = {}

await _normalize_upload_payload(handler, payload)

I don't really like this being added to the hot path that all event processing goes through. Where in the frontend are we hardcoding "files"? If it's easy enough to resolve on the frontend, or maybe in the upload handler itself if possible, that would be better.

…hrough upload endpoint

Move UploadChunk and UploadChunkIterator from reflex.event to reflex._upload,
use lazy imports to break circular dependencies, and remove early-return guards
for empty file lists. Empty uploads now flow through the normal upload path
instead of being short-circuited on the frontend or normalized via websocket
fallback (_normalize_upload_payload removed). Adds tests for empty buffered
and chunk uploads with aliased handler parameters.
reflex/event.py Outdated
Comment on lines +1104 to +1108
_upload_module = import_module("reflex._upload")
UploadChunk = _upload_module.UploadChunk
UploadChunkIterator = _upload_module.UploadChunkIterator



I think these can be removed from this module as well.

To expose them at the top-level reflex namespace, add them to the lazy import spec in reflex/__init__.py.

Collaborator Author

Oh, it remained by mistake.

Re-export UploadChunk and UploadChunkIterator directly from
reflex._upload instead of re-importing them through reflex.event,
removing the eager import_module call at module load time.
