diff --git a/_test_unstructured_client/unit/test_pptx_split.py b/_test_unstructured_client/unit/test_pptx_split.py new file mode 100644 index 00000000..6138f3bd --- /dev/null +++ b/_test_unstructured_client/unit/test_pptx_split.py @@ -0,0 +1,421 @@ +from __future__ import annotations + +import io +import math +import zipfile +from pathlib import Path +from typing import Iterable, Optional +from unittest.mock import patch + +import httpx +import pytest + +from unstructured_client._hooks.custom import pptx_utils +from unstructured_client._hooks.custom.split_pdf_hook import ( + DEFAULT_CONCURRENCY_LEVEL, + HI_RES_STRATEGY, + SplitPdfHook, + get_optimal_split_size, +) +from unstructured_client._hooks.types import BeforeRequestContext + + +# --------------------------------------------------------------------------- +# Synthetic .pptx builder +# +# The splitter only reads the OOXML package structure (presentation.xml, +# its rels, the per-slide rels, and [Content_Types].xml) — it never parses +# slide *content*. So a structurally-faithful package with placeholder XML +# bodies is enough to exercise every code path hermetically, without depending +# on a real PowerPoint file checked into the repo. +# --------------------------------------------------------------------------- + +_REL_NS = "http://schemas.openxmlformats.org/officeDocument/2006/relationships" + + +def build_synthetic_pptx( + num_slides: int, + notes_for: Optional[Iterable[int]] = None, + rels_overrides: bool = False, +) -> bytes: + """Build a minimal but structurally valid .pptx with ``num_slides`` slides. + + Args: + num_slides: Number of slides to create. + notes_for: 1-based slide numbers that should reference a notes slide. + rels_overrides: If True, declare a ```` content-type for every + ``.rels`` part too (mirroring decks that don't rely solely on the + ```` rule). Exercises the override pruning. + """ + notes = set(notes_for or ()) + parts: dict[str, bytes] = {} + + # Shared parts. 
+ parts["_rels/.rels"] = ( + f'' + f'' + "" + ).encode() + parts["ppt/theme/theme1.xml"] = b"" + parts["ppt/media/image1.png"] = b"\x89PNG\r\n\x1a\nshared-image" + parts["ppt/slideMasters/slideMaster1.xml"] = b"" + parts["ppt/slideMasters/_rels/slideMaster1.xml.rels"] = ( + f'' + f'' + f'' + "" + ).encode() + parts["ppt/slideLayouts/slideLayout1.xml"] = b"" + parts["ppt/slideLayouts/_rels/slideLayout1.xml.rels"] = ( + f'' + f'' + "" + ).encode() + parts["ppt/notesMasters/notesMaster1.xml"] = b"" + + # Slides + their rels (+ optional notes). + for n in range(1, num_slides + 1): + parts[f"ppt/slides/slide{n}.xml"] = f"".encode() + slide_rels = [ + f'', + f'', + ] + if n in notes: + slide_rels.append( + f'' + ) + parts[f"ppt/notesSlides/notesSlide{n}.xml"] = f"".encode() + parts[f"ppt/notesSlides/_rels/notesSlide{n}.xml.rels"] = ( + f'' + f'' + f'' + "" + ).encode() + parts[f"ppt/slides/_rels/slide{n}.xml.rels"] = ( + f'' + + "".join(slide_rels) + + "" + ).encode() + + # presentation.xml: master id list + slide id list (rId14.. for slides). + sld_ids = "".join( + f'' for n in range(1, num_slides + 1) + ) + parts["ppt/presentation.xml"] = ( + '' + '' + '' + f"{sld_ids}" + '' + "" + ).encode() + + pres_rels = [ + f'', + f'', + f'', + ] + for n in range(1, num_slides + 1): + pres_rels.append( + f'' + ) + parts["ppt/_rels/presentation.xml.rels"] = ( + f'' + + "".join(pres_rels) + + "" + ).encode() + + # [Content_Types].xml + overrides = [ + '', + '', + '', + '', + ] + for n in range(1, num_slides + 1): + overrides.append( + f'' + ) + if rels_overrides: + overrides.append( + f'' + ) + if n in notes: + overrides.append( + f'' + ) + parts["[Content_Types].xml"] = ( + '' + '' + '' + '' + '' + + "".join(overrides) + + "" + ).encode() + + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as out: + # [Content_Types].xml must be first per OPC convention. 
+ out.writestr("[Content_Types].xml", parts.pop("[Content_Types].xml")) + for name, data in parts.items(): + out.writestr(name, data) + return buffer.getvalue() + + +def _names(chunk: bytes) -> set[str]: + with zipfile.ZipFile(io.BytesIO(chunk)) as z: + return set(z.namelist()) + + +def _override_partnames(chunk: bytes) -> list[str]: + import re + + with zipfile.ZipFile(io.BytesIO(chunk)) as z: + ct = z.read("[Content_Types].xml").decode() + return [p.lstrip("/") for p in re.findall(r'") + assert pptx_utils.is_pptx(buf.getvalue()) is False + + +def test_is_pptx_accepts_binaryio(): + stream = io.BytesIO(build_synthetic_pptx(2)) + assert pptx_utils.is_pptx(stream) is True + + +def test_get_pptx_slide_count(): + assert pptx_utils.get_pptx_slide_count(build_synthetic_pptx(7)) == 7 + + +# --------------------------------------------------------------------------- +# Chunking +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + ("num_slides", "split_size"), + [(10, 3), (10, 5), (1, 2), (20, 20), (368, 20)], +) +def test_chunk_count_and_slide_totals(num_slides, split_size): + content = build_synthetic_pptx(num_slides) + chunks = pptx_utils.get_pptx_chunks_in_memory(content, split_size=split_size) + + assert len(chunks) == math.ceil(num_slides / split_size) + + total = 0 + for index, (buf, offset) in enumerate(chunks): + chunk_bytes = buf.getvalue() + count = pptx_utils.get_pptx_slide_count(chunk_bytes) + expected = min(split_size, num_slides - index * split_size) + assert count == expected + assert offset == index * split_size + total += count + assert total == num_slides + + +def test_chunks_are_valid_zip_with_no_dangling_overrides(): + content = build_synthetic_pptx(10, rels_overrides=True) + chunks = pptx_utils.get_pptx_chunks_in_memory(content, split_size=4) + + for buf, _ in chunks: + chunk_bytes = buf.getvalue() + names = _names(chunk_bytes) + # Every content-type Override must point at a part that exists. 
+ for partname in _override_partnames(chunk_bytes): + assert partname in names, f"dangling override: {partname}" + + +def test_chunks_retain_shared_parts_and_drop_other_slides(): + content = build_synthetic_pptx(10) + chunks = pptx_utils.get_pptx_chunks_in_memory(content, split_size=4) + + first = chunks[0][0].getvalue() + names = _names(first) + # Shared parts retained. + for shared_part in ( + "ppt/slideMasters/slideMaster1.xml", + "ppt/slideLayouts/slideLayout1.xml", + "ppt/theme/theme1.xml", + "ppt/media/image1.png", + "ppt/notesMasters/notesMaster1.xml", + ): + assert shared_part in names + # Only this chunk's slides (1-4) are present. + assert "ppt/slides/slide1.xml" in names + assert "ppt/slides/slide4.xml" in names + assert "ppt/slides/slide5.xml" not in names + assert "ppt/slides/_rels/slide5.xml.rels" not in names + + +def test_notes_slides_follow_their_slides(): + # Slides 2 and 9 have notes. + content = build_synthetic_pptx(10, notes_for={2, 9}) + chunks = pptx_utils.get_pptx_chunks_in_memory(content, split_size=4) + + # Chunk 0 = slides 1-4 -> keeps notesSlide2, drops notesSlide9. + chunk0 = _names(chunks[0][0].getvalue()) + assert "ppt/notesSlides/notesSlide2.xml" in chunk0 + assert "ppt/notesSlides/notesSlide9.xml" not in chunk0 + + # Chunk 2 = slides 9-10 -> keeps notesSlide9, not notesSlide2. + chunk2 = _names(chunks[2][0].getvalue()) + assert "ppt/notesSlides/notesSlide9.xml" in chunk2 + assert "ppt/notesSlides/notesSlide2.xml" not in chunk2 + + +def test_page_range_subsetting(): + content = build_synthetic_pptx(10) + chunks = pptx_utils.get_pptx_chunks_in_memory( + content, split_size=2, page_start=3, page_end=6 + ) + # Slides 3..6 inclusive -> 4 slides, 2 chunks of 2. 
def _is_split(result) -> bool:
    """Tell whether the hook replaced the request with its split placeholder.

    A split is recognised by the hook returning an ``httpx.Request`` whose URL
    path ends with ``/general/docs``; anything else counts as "not split".
    """
    if not isinstance(result, httpx.Request):
        return False
    return result.url.path.endswith("/general/docs")
+ assert _is_split(result) + operation_id = result.headers["operation_id"] + coros = hook.coroutines_to_execute[operation_id] + + # The hook reuses the PDF default split sizing. + split_size = get_optimal_split_size(num_slides, DEFAULT_CONCURRENCY_LEVEL) + assert len(coros) == math.ceil(num_slides / split_size) + # Page numbers continue contiguously across chunks, starting at 1. + page_numbers = [c.keywords["page_number"] for c in coros] + assert page_numbers == list(range(1, num_slides + 1, split_size)) + hook._clear_operation(operation_id) + + +def test_hook_does_not_split_pptx_for_fast(): + _hook, result = _run_before_request("fast", "true") + assert not _is_split(result) + + +def test_hook_does_not_split_pptx_when_flag_false(): + _hook, result = _run_before_request(HI_RES_STRATEGY, "false") + assert not _is_split(result) + + +def test_hook_falls_back_to_whole_doc_when_slide_count_fails(): + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.pptx_utils.get_pptx_slide_count", + side_effect=ValueError("boom"), + ): + hook, result = _run_before_request(HI_RES_STRATEGY, "true", num_slides=30) + # The original request is returned unchanged (whole deck sent unsplit). + assert not _is_split(result) + assert hook.coroutines_to_execute == {} + + +def test_hook_falls_back_to_whole_doc_when_chunking_fails(): + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.pptx_utils.get_pptx_chunks_in_memory", + side_effect=RuntimeError("bad package"), + ): + hook, result = _run_before_request(HI_RES_STRATEGY, "true", num_slides=30) + assert not _is_split(result) + # Operation state is cleaned up after the fallback. 
+ assert hook.coroutines_to_execute == {} + assert hook.allow_failed == {} + + +def test_hook_chunk_request_uses_pptx_content_type(): + hook, result = _run_before_request(HI_RES_STRATEGY, "true", num_slides=30) + operation_id = result.headers["operation_id"] + chunk_request = hook.coroutines_to_execute[operation_id][0].keywords[ + "pdf_chunk_request" + ] + body = chunk_request.read().decode("latin1") + assert pptx_utils.PPTX_CONTENT_TYPE in body + assert "deck.pptx" in body + assert 'name="split_pdf_page"' in body + hook._clear_operation(operation_id) diff --git a/src/unstructured_client/_hooks/custom/pptx_utils.py b/src/unstructured_client/_hooks/custom/pptx_utils.py new file mode 100644 index 00000000..1c16ba8a --- /dev/null +++ b/src/unstructured_client/_hooks/custom/pptx_utils.py @@ -0,0 +1,335 @@ +from __future__ import annotations + +import io +import logging +import posixpath +import re +import tempfile +import zipfile +from pathlib import Path +from typing import BinaryIO, Generator, Optional, Tuple, Union + +from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME +from unstructured_client._hooks.custom.validation_errors import FileValidationError + +logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) + +PPTX_CONTENT_TYPE = ( + "application/vnd.openxmlformats-officedocument.presentationml.presentation" +) + +# Canonical OOXML package part names for a PresentationML package. +_PRESENTATION_PART = "ppt/presentation.xml" +_PRESENTATION_RELS_PART = "ppt/_rels/presentation.xml.rels" +_CONTENT_TYPES_PART = "[Content_Types].xml" + +# A slide id entry in presentation.xml, e.g. +_SLD_ID_RE = re.compile(rb"]*?/>") +# A relationship entry in a .rels part, e.g. +# +_RELATIONSHIP_RE = re.compile(rb"]*?/>") +# A content-types Override entry, e.g. 
+# +_OVERRIDE_RE = re.compile(rb"]*?/>") + +_SLIDE_PART_RE = re.compile(r"^ppt/slides/slide\d+\.xml$") +_SLIDE_RELS_PART_RE = re.compile(r"^ppt/slides/_rels/slide\d+\.xml\.rels$") +_NOTES_PART_RE = re.compile(r"^ppt/notesSlides/notesSlide\d+\.xml$") +_NOTES_RELS_PART_RE = re.compile(r"^ppt/notesSlides/_rels/notesSlide\d+\.xml\.rels$") + + +class PPTXValidationError(FileValidationError): + """Exception for PPTX validation errors.""" + + def __init__(self, message: str): + super().__init__(message, file_type="PPTX") + + +def _as_bytes(pptx_file: Union[BinaryIO, bytes]) -> bytes: + if isinstance(pptx_file, bytes): + return pptx_file + pptx_file.seek(0) + return pptx_file.read() + + +def is_pptx(pptx_file: Union[BinaryIO, bytes]) -> bool: + """Return True if the given file is an OOXML PresentationML package (.pptx). + + Detection is based on the package structure (a zip containing + ``ppt/presentation.xml``) rather than the filename or declared content type, + so it correctly rejects legacy ``.ppt`` (OLE) files, which are not zips. 
+ """ + try: + content = _as_bytes(pptx_file) + with zipfile.ZipFile(io.BytesIO(content)) as archive: + return _PRESENTATION_PART in archive.namelist() + except (zipfile.BadZipFile, OSError): + return False + + +def _attr(entry: bytes, name: str) -> Optional[str]: + match = re.search(rb'\b' + name.encode() + rb'="([^"]*)"', entry) + return match.group(1).decode() if match else None + + +def _slide_id_entries(presentation_xml: bytes) -> list[tuple[bytes, str]]: + """Ordered ``(raw element, r:id)`` pairs from presentation.xml.""" + entries: list[tuple[bytes, str]] = [] + for match in _SLD_ID_RE.finditer(presentation_xml): + entry = match.group(0) + r_id = _attr(entry, "r:id") + if r_id is not None: + entries.append((entry, r_id)) + return entries + + +def get_pptx_slide_count(pptx_file: Union[BinaryIO, bytes]) -> int: + """Return the number of slides referenced in the presentation.""" + content = _as_bytes(pptx_file) + with zipfile.ZipFile(io.BytesIO(content)) as archive: + presentation_xml = archive.read(_PRESENTATION_PART) + return len(_slide_id_entries(presentation_xml)) + + +def _rid_to_target(rels_xml: bytes, type_suffix: str) -> dict[str, str]: + """Map relationship ``Id`` -> ``Target`` for relationships whose Type ends + with ``type_suffix`` (e.g. ``"/slide"``).""" + mapping: dict[str, str] = {} + for match in _RELATIONSHIP_RE.finditer(rels_xml): + entry = match.group(0) + rel_type = _attr(entry, "Type") + if rel_type is None or not rel_type.endswith(type_suffix): + continue + r_id = _attr(entry, "Id") + target = _attr(entry, "Target") + if r_id is not None and target is not None: + mapping[r_id] = target + return mapping + + +def _normalize_target(target: str, base_dir: str) -> str: + """Resolve a relationship Target (relative to ``base_dir``) into a package + part name (posix, no leading slash). Collapses ``..`` segments so e.g. 
class _PptxPackage:
    """Reads a pptx package once and produces minimal per-chunk sub-decks.

    Each chunk keeps every shared part (masters, layouts, themes, notes master,
    media, docProps, ...) unchanged and retains only the slides in the chunk plus
    the notes slides they reference. presentation.xml, its rels, and
    [Content_Types].xml are rewritten so they reference only the kept parts.

    The instance owns an open ``zipfile.ZipFile``; call ``close()`` (or use the
    instance as a context manager) when done.
    """

    def __init__(self, content: bytes):
        """Open the package held in ``content`` and index its slides.

        Raises:
            zipfile.BadZipFile: ``content`` is not a zip archive.
            KeyError: A required part (presentation.xml, its rels, or
                [Content_Types].xml) is missing from the archive.
        """
        self._content = content
        self._archive = zipfile.ZipFile(io.BytesIO(content))
        try:
            self._index_package()
        except BaseException:
            # Don't leak the open archive when a required part is missing or
            # the package cannot be indexed.
            self._archive.close()
            raise

    def __enter__(self) -> "_PptxPackage":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def _index_package(self) -> None:
        """Read the structural parts and build the slide / notes lookup tables."""
        archive = self._archive
        self._names = set(archive.namelist())

        self._presentation_xml = archive.read(_PRESENTATION_PART)
        self._presentation_rels = archive.read(_PRESENTATION_RELS_PART)
        self._content_types = archive.read(_CONTENT_TYPES_PART)

        self._slide_ids = _slide_id_entries(self._presentation_xml)
        # r:id -> slide part name; presentation.xml's id list defines the order.
        rid_to_slide = _rid_to_target(self._presentation_rels, "/slide")
        # (r:id, slide part) pairs; ids without a matching relationship are skipped.
        self._ordered_slides: list[tuple[str, str]] = [
            (r_id, _normalize_target(rid_to_slide[r_id], "ppt"))
            for _entry, r_id in self._slide_ids
            if r_id in rid_to_slide
        ]

        # slide part -> notes part it references (if any)
        self._slide_to_notes: dict[str, str] = {}
        for _r_id, slide_part in self._ordered_slides:
            rels_part = _rels_part_for(slide_part)
            if rels_part not in self._names:
                continue
            notes = _rid_to_target(archive.read(rels_part), "/notesSlide")
            # Only the first notes relationship of a slide is tracked.
            notes_target = next(iter(notes.values()), None)
            if notes_target is not None:
                self._slide_to_notes[slide_part] = _normalize_target(
                    notes_target, posixpath.dirname(slide_part)
                )

    @property
    def slide_count(self) -> int:
        """Number of slides that resolved to a slide part."""
        return len(self._ordered_slides)

    def close(self) -> None:
        """Close the underlying source archive."""
        self._archive.close()

    def _build_chunk(self, start: int, end: int) -> bytes:
        """Build a sub-deck containing slides ``[start, end)`` (zero-based).

        Returns the bytes of a new zip package holding every non-slide part,
        the selected slides (with their rels), and the notes slides those
        slides reference.
        """
        chunk_slides = self._ordered_slides[start:end]
        kept_rids = {r_id for r_id, _ in chunk_slides}
        kept_slide_parts = {part for _, part in chunk_slides}
        kept_notes_parts = {
            self._slide_to_notes[part]
            for part in kept_slide_parts
            if part in self._slide_to_notes
        }

        def is_dropped(name: str) -> bool:
            # Only slide / notes-slide parts (and their rels) are ever dropped;
            # every other part is shared and always kept.
            if _SLIDE_PART_RE.match(name):
                return name not in kept_slide_parts
            if _SLIDE_RELS_PART_RE.match(name):
                return _slide_part_for_rels(name) not in kept_slide_parts
            if _NOTES_PART_RE.match(name):
                return name not in kept_notes_parts
            if _NOTES_RELS_PART_RE.match(name):
                return _slide_part_for_rels(name) not in kept_notes_parts
            return False

        # Decide once per part; reused for both the content-types pruning and
        # the copy loop below.
        dropped_part_names = {name for name in self._names if is_dropped(name)}

        new_presentation_xml = _filter_xml_elements(
            self._presentation_xml,
            _SLD_ID_RE,
            lambda entry: _attr(entry, "r:id") not in kept_rids,
        )
        new_presentation_rels = _filter_xml_elements(
            self._presentation_rels,
            _RELATIONSHIP_RE,
            lambda entry: (
                (_attr(entry, "Type") or "").endswith("/slide")
                and _attr(entry, "Id") not in kept_rids
            ),
        )
        # Some packages declare an Override for every part (even .rels parts)
        # rather than relying on a Default rule. Drop the override for any
        # part we are not writing so nothing dangles.
        new_content_types = _filter_xml_elements(
            self._content_types,
            _OVERRIDE_RE,
            lambda entry: (_attr(entry, "PartName") or "").lstrip("/")
            in dropped_part_names,
        )

        rewrites = {
            _PRESENTATION_PART: new_presentation_xml,
            _PRESENTATION_RELS_PART: new_presentation_rels,
            _CONTENT_TYPES_PART: new_content_types,
        }

        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as out:
            for item in self._archive.infolist():
                name = item.filename
                if name in dropped_part_names:
                    continue
                data = rewrites.get(name)
                if data is None:
                    data = self._archive.read(name)
                # Build a fresh ZipInfo per entry: writestr mutates the
                # passed ZipInfo's header_offset, and reusing the source
                # archive's ZipInfo objects would corrupt its central
                # directory for the next chunk.
                zinfo = zipfile.ZipInfo(filename=name, date_time=item.date_time)
                zinfo.compress_type = item.compress_type
                zinfo.external_attr = item.external_attr
                out.writestr(zinfo, data)
        return buffer.getvalue()

    def iter_chunks(
        self, split_size: int, page_start: int, page_end: Optional[int]
    ) -> Generator[Tuple[bytes, int], None, None]:
        """Yield ``(chunk_bytes, zero_based_slide_offset)`` for each sub-deck.

        ``page_start``/``page_end`` are 1-based, inclusive slide numbers matching
        the PDF page-range semantics. The range is clamped to the slides that
        actually exist, so an over-long ``page_end`` never produces slide-less
        chunks and a ``page_start`` below 1 starts at the first slide.

        Raises:
            ValueError: ``split_size`` is less than 1 (raised when iteration
                starts; a non-positive size would otherwise never advance).
        """
        if split_size < 1:
            raise ValueError(f"split_size must be >= 1, got {split_size}")
        offset = max(page_start - 1, 0)
        offset_end = (
            min(page_end, self.slide_count) if page_end else self.slide_count
        )
        while offset < offset_end:
            end = min(offset + split_size, offset_end)
            yield self._build_chunk(offset, end), offset
            offset = end
+ """ + tempdir = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with + dir=cache_tmp_data_dir, prefix="unstructured_client_" + ) + tempdir_path = Path(tempdir.name) + + package = _PptxPackage(pptx_content) + chunk_paths: list[Tuple[Path, int]] = [] + try: + for chunk_no, (chunk_bytes, offset) in enumerate( + package.iter_chunks(split_size, page_start, page_end), start=1 + ): + chunk_path = tempdir_path / f"chunk_{chunk_no}.pptx" + chunk_path.write_bytes(chunk_bytes) + chunk_paths.append((chunk_path, offset)) + finally: + package.close() + + return tempdir, chunk_paths diff --git a/src/unstructured_client/_hooks/custom/request_utils.py b/src/unstructured_client/_hooks/custom/request_utils.py index bfc9cb0f..993b6dbc 100644 --- a/src/unstructured_client/_hooks/custom/request_utils.py +++ b/src/unstructured_client/_hooks/custom/request_utils.py @@ -136,39 +136,63 @@ def create_pdf_chunk_request( pdf_chunk: Tuple[BinaryIO, int], original_request: httpx.Request, filename: str, +) -> httpx.Request: + """Creates a new PDF-chunk request for the partition API. + + Thin wrapper over `create_document_chunk_request` that sends the chunk as a + PDF payload. + """ + return create_document_chunk_request( + form_data=form_data, + document_chunk=pdf_chunk, + original_request=original_request, + filename=filename, + content_type="application/pdf", + ) + + +def create_document_chunk_request( + form_data: FormData, + document_chunk: Tuple[BinaryIO, int], + original_request: httpx.Request, + filename: str, + content_type: str, ) -> httpx.Request: """Creates a new request object with the updated payload for the partition API. Args: form_data: The form data. - pdf_chunk: Tuple of pdf chunk contents (can be both io.BytesIO or + document_chunk: Tuple of chunk contents (can be both io.BytesIO or a file object created with e.g. open()) and the page number. original_request: The original request. - filename: The filename. + filename: The filename. 
The original filename (and therefore its + extension) is preserved so the server detects the chunk's file type. + content_type: The MIME type to declare for the chunk file (e.g. + "application/pdf" for PDF chunks). Returns: The updated request object. """ - pdf_chunk_file, page_number = pdf_chunk + chunk_file, page_number = document_chunk data = create_pdf_chunk_request_params(form_data, page_number) original_headers = prepare_request_headers(original_request.headers) - pdf_chunk_content: BinaryIO | bytes = ( - pdf_chunk_file.getvalue() - if isinstance(pdf_chunk_file, io.BytesIO) - else pdf_chunk_file + chunk_content: BinaryIO | bytes = ( + chunk_file.getvalue() + if isinstance(chunk_file, io.BytesIO) + else chunk_file ) - pdf_chunk_partition_params = shared.PartitionParameters( + chunk_partition_params = shared.PartitionParameters( files=shared.Files( - content=pdf_chunk_content, + content=chunk_content, file_name=filename, - content_type="application/pdf", + content_type=content_type, ), **data, ) serialized_body = serialize_request_body( - pdf_chunk_partition_params, + chunk_partition_params, False, False, "multipart", diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 912da332..7a7490b8 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -23,7 +23,7 @@ from pypdf import PdfReader, PdfWriter import pypdfium2 as pdfium # type: ignore[import-untyped] -from unstructured_client._hooks.custom import form_utils, pdf_utils, request_utils +from unstructured_client._hooks.custom import form_utils, pdf_utils, pptx_utils, request_utils from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME from unstructured_client._hooks.custom.form_utils import ( PARTITION_FORM_CONCURRENCY_LEVEL_KEY, @@ -720,21 +720,43 @@ def _before_request_unlocked( if split_pdf_page is None or split_pdf_page == 
"false": return request - pdf_file_meta = form_data.get(PARTITION_FORM_FILES_KEY) + file_meta = form_data.get(PARTITION_FORM_FILES_KEY) if ( - pdf_file_meta is None or not all(metadata in pdf_file_meta for metadata in + file_meta is None or not all(metadata in file_meta for metadata in ["filename", "content_type", "file"]) ): return request - pdf_file = pdf_file_meta.get("file") - if pdf_file is None: + input_file = file_meta.get("file") + if input_file is None: return request - pdf = pdf_utils.read_pdf(pdf_file) - if pdf is None: - return request - - pdf = pdf_utils.check_pdf(pdf) + # Determine the document type. PDFs split for any strategy; PPTX decks + # only split for hi_res (where per-slide OCR makes the round trips worth + # it). Anything else continues unsplit. + pdf = pdf_utils.read_pdf(input_file) + pptx_bytes: Optional[bytes] = None + if pdf is not None: + pdf = pdf_utils.check_pdf(pdf) + file_type = "pdf" + num_pages = pdf.get_num_pages() + else: + pptx_bytes = self._read_input_file_bytes(input_file) + if ( + form_data.get("strategy") != HI_RES_STRATEGY + or not pptx_utils.is_pptx(pptx_bytes) + ): + return request + file_type = "pptx" + try: + num_pages = pptx_utils.get_pptx_slide_count(pptx_bytes) + except Exception: # pylint: disable=broad-except + # Never let a splitting problem fail the request: if the deck + # can't be read, send the whole document unsplit. 
+ logger.warning( + "split_pptx event=slide_count_failed reason=fallback_whole_document", + exc_info=True, + ) + return request starting_page_number = form_utils.get_starting_page_number( form_data, @@ -770,7 +792,7 @@ def _before_request_unlocked( page_range_start, page_range_end = form_utils.get_page_range( form_data, key=PARTITION_FORM_PAGE_RANGE_KEY.replace("[]", ""), - max_pages=pdf.get_num_pages(), + max_pages=num_pages, ) page_count = page_range_end - page_range_start + 1 @@ -781,7 +803,7 @@ def _before_request_unlocked( # If the doc is small enough, and we aren't slicing it with a page range: # do not split, just continue with the original request - if split_size >= page_count and page_count == len(pdf.pages): + if split_size >= page_count and page_count == num_pages: return request if operation_id in self.coroutines_to_execute: @@ -803,45 +825,78 @@ def _before_request_unlocked( ) try: - pdf = self._trim_large_pages(pdf, form_data) - - pdf.stream.seek(0) - pdf_bytes = pdf.stream.read() - temp_dir_path = None - pdf_chunks: Iterable[Tuple[BinaryIO, int]] - if cache_tmp_data_feature: - pdf_chunk_paths = self._get_pdf_chunk_paths( - pdf_bytes, - operation_id=operation_id, - cache_tmp_data_dir=cache_tmp_data_dir, - split_size=split_size, - page_start=page_range_start, - page_end=page_range_end - ) - temp_dir = self.tempdirs.get(operation_id) - temp_dir_path = temp_dir.name if temp_dir is not None else None - # force free PDF object memory - del pdf - pdf_chunks = self._get_pdf_chunk_files(pdf_chunk_paths) + doc_chunks: Iterable[Tuple[BinaryIO, int]] + if file_type == "pptx": + # PPTX splitting uses stdlib zip/xml, so it doesn't need the + # pypdfium memory dance below. 
+ pptx_content = cast(bytes, pptx_bytes) + if cache_tmp_data_feature: + tempdir, pptx_chunk_paths = pptx_utils.get_pptx_chunk_paths( + pptx_content, + cache_tmp_data_dir=cache_tmp_data_dir, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end, + ) + self.tempdirs[operation_id] = tempdir + temp_dir_path = tempdir.name + doc_chunks = self._get_pdf_chunk_files(pptx_chunk_paths) + else: + doc_chunks = pptx_utils.get_pptx_chunks_in_memory( + pptx_content, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end, + ) else: - pdf_chunks = self._get_pdf_chunks_in_memory( - pdf_bytes, - split_size=split_size, - page_start=page_range_start, - page_end=page_range_end - ) + assert pdf is not None # narrowed by file_type == "pdf" + pdf = self._trim_large_pages(pdf, form_data) + + pdf.stream.seek(0) + pdf_bytes = pdf.stream.read() + + if cache_tmp_data_feature: + pdf_chunk_paths = self._get_pdf_chunk_paths( + pdf_bytes, + operation_id=operation_id, + cache_tmp_data_dir=cache_tmp_data_dir, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end + ) + temp_dir = self.tempdirs.get(operation_id) + temp_dir_path = temp_dir.name if temp_dir is not None else None + # force free PDF object memory + del pdf + doc_chunks = self._get_pdf_chunk_files(pdf_chunk_paths) + else: + doc_chunks = self._get_pdf_chunks_in_memory( + pdf_bytes, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end + ) self.coroutines_to_execute[operation_id] = [] - for pdf_chunk_file, page_index in pdf_chunks: + for chunk_file, page_index in doc_chunks: chunk_index = len(self.coroutines_to_execute[operation_id]) + 1 page_number = page_index + starting_page_number - pdf_chunk_request = request_utils.create_pdf_chunk_request( - form_data=form_data, - pdf_chunk=(pdf_chunk_file, page_number), - filename=pdf_file_meta["filename"], - original_request=request, - ) + if file_type == "pptx": + chunk_request = 
request_utils.create_document_chunk_request( + form_data=form_data, + document_chunk=(chunk_file, page_number), + original_request=request, + filename=file_meta["filename"], + content_type=pptx_utils.PPTX_CONTENT_TYPE, + ) + else: + chunk_request = request_utils.create_pdf_chunk_request( + form_data=form_data, + pdf_chunk=(chunk_file, page_number), + filename=file_meta["filename"], + original_request=request, + ) # using partial as the shared client parameter must be passed in `run_tasks` function # in `after_success`. coroutine = partial( @@ -849,8 +904,8 @@ def _before_request_unlocked( _operation_id=operation_id, chunk_index=chunk_index, page_number=page_number, - pdf_chunk_request=pdf_chunk_request, - pdf_chunk_file=pdf_chunk_file, + pdf_chunk_request=chunk_request, + pdf_chunk_file=chunk_file, retry_config=self.operation_retry_configs.get(operation_id), cache_tmp_data_feature=cache_tmp_data_feature, temp_dir_path=temp_dir_path, @@ -862,7 +917,7 @@ def _before_request_unlocked( logger.info( "split_pdf event=plan_created operation_id=%s filename=%s strategy=%s page_range=%s-%s page_count=%d split_size=%d chunk_count=%d concurrency=%d allow_failed=%s cache_mode=%s timeout_seconds=%s retry_config_mode=%s pool_max_connections=%d pool_max_keepalive=%d pool_keepalive_expiry=%.1fs tls=%s", operation_id, - Path(pdf_file_meta["filename"]).name, + Path(file_meta["filename"]).name, form_data.get("strategy"), page_range_start, page_range_end, @@ -897,6 +952,16 @@ def _before_request_unlocked( ) except Exception: self._clear_operation(operation_id) + if file_type == "pptx": + # PPTX splitting is best-effort: if building the slide-chunks + # fails for any reason, fall back to sending the whole deck + # unsplit rather than failing the partition request. 
+ logger.warning( + "split_pptx event=split_failed operation_id=%s reason=fallback_whole_document", + operation_id, + exc_info=True, + ) + return request raise def before_request( @@ -1049,6 +1114,17 @@ async def call_api_partial( return response + @staticmethod + def _read_input_file_bytes(input_file: Union[BinaryIO, bytes]) -> bytes: + """Return the full bytes of the uploaded file, restoring the read + position so downstream consumers see an unconsumed stream.""" + if isinstance(input_file, bytes): + return input_file + input_file.seek(0) + content = input_file.read() + input_file.seek(0) + return content + def _trim_large_pages(self, pdf: PdfReader, form_data: dict[str, Any]) -> PdfReader: if form_data.get("strategy") != HI_RES_STRATEGY: return pdf