From cb92ff10eb71f03902fb8f59d7ed908df849f418 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Sat, 27 Jun 2026 15:11:15 -0700 Subject: [PATCH 1/7] feat(uploads): add transparent presigned direct-to-storage upload --- .openapi-generator-ignore | 4 +- CHANGELOG.md | 13 + README.md | 39 +++ hotdata/__init__.py | 6 +- hotdata/uploads.py | 589 +++++++++++++++++++++++++++++++++ scripts/patch_query_exports.py | 21 +- tests/test_uploads.py | 553 +++++++++++++++++++++++++++++++ 7 files changed, 1213 insertions(+), 12 deletions(-) create mode 100644 hotdata/uploads.py create mode 100644 tests/test_uploads.py diff --git a/.openapi-generator-ignore b/.openapi-generator-ignore index 3f045e6..e2329d5 100644 --- a/.openapi-generator-ignore +++ b/.openapi-generator-ignore @@ -8,11 +8,13 @@ setup.py # never emits or overwrites them, but they are listed here as the source of # truth for "hand-maintained, don't touch": _auth.py (JWT exchange), arrow.py # (Arrow IPC result fetch), query.py (429 retry + truncation auto-follow, #688), -# _retry.py (pre-response connection-reset retry on all methods, #118). +# _retry.py (pre-response connection-reset retry on all methods, #118), +# uploads.py (transparent presigned direct-to-storage upload flow). hotdata/_auth.py hotdata/arrow.py hotdata/query.py hotdata/_retry.py +hotdata/uploads.py # Hand-written test for the patched ApiClient.close()/context-manager behavior # (re-applied by scripts/patch_api_client_close.py). It lives in the generated diff --git a/CHANGELOG.md b/CHANGELOG.md index fd55d6d..9f22722 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `hotdata.uploads.UploadsApi` (the default `hotdata.UploadsApi`) gains + `upload_file(path, ...)`, a transparent direct-to-storage upload that + orchestrates the presigned flow end to end: it opens a session + (`POST /v1/uploads`), `PUT`s the bytes **straight to object storage** (a single + `PUT` for a small file, concurrent part `PUT`s for a large one), and finalizes + (`POST /v1/uploads/{id}/finalize`), returning the `FinalizeUploadResponse`. The + bytes never round-trip through the API. Supports a progress callback, an + auto-scaled (or caller-set) part size, bounded concurrency with a peak-memory + budget, and idempotent per-part retry. Storage `PUT`s go through a dedicated, + header-isolated pool so no SDK auth/workspace headers reach object storage. + ### Changed - feat(uploads): add file upload endpoints diff --git a/README.md b/README.md index ada8de8..9063c43 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,45 @@ with ApiClient(Configuration(api_key="...", workspace_id="...")) as client: Both methods accept `offset` and `limit` for pagination. They raise `hotdata.arrow.ResultNotReadyError` if the result is still pending or processing — poll `results.get_result(result_id)` until `status == "ready"` first. +## File uploads + +`hotdata.uploads.UploadsApi` (also the default `hotdata.UploadsApi`) adds +`upload_file`, which uploads a local file **directly to object storage** and +finalizes it in one call. It opens an upload session, `PUT`s the bytes straight +to storage — a single `PUT` for a small file, concurrent part `PUT`s for a large +one — then finalizes. The bytes never round-trip through the API. + +```python +from hotdata import ApiClient, Configuration, UploadsApi + +with ApiClient(Configuration(api_key="...", workspace_id="...")) as client: + uploads = UploadsApi(client) + + finalized = uploads.upload_file( + "data.parquet", + content_type="application/parquet", + progress=lambda done, total: print(f"{done}/{total} bytes"), + ) + + # Pass finalized.upload_id to the managed-table load endpoint. + print(finalized.upload_id) +``` + +The SDK picks single vs. multipart from the file size, auto-scales the part size, +and bounds part concurrency to a peak-memory budget (override with `part_size` / +`max_concurrency`). Storage `PUT`s go through a dedicated, header-isolated +connection pool, so the SDK's auth and workspace headers never reach object +storage (which would otherwise reject the upload). + +It raises `hotdata.uploads.StorageError` if storage rejects a `PUT`, +`MalformedSessionError` if the session response is inconsistent, and the usual +`hotdata.exceptions.ApiException` if opening the session or finalizing fails — +for example a `501` `PRESIGN_UNSUPPORTED`, meaning the backend cannot issue +upload URLs. In that case send the raw bytes to the legacy `POST /v1/files` +endpoint, which the generated client still exposes as +`hotdata.api.uploads_api.UploadsApi.upload_file(body=...)` (the enhanced +`upload_file` above takes a file path and shadows it). + ## API reference Generated Markdown for every operation and model is in [`docs/`](https://github.com/hotdata-dev/sdk-python/tree/main/docs): diff --git a/hotdata/__init__.py b/hotdata/__init__.py index ab65616..b992a24 100644 --- a/hotdata/__init__.py +++ b/hotdata/__init__.py @@ -322,7 +322,9 @@ # --- hand-applied: prefer the enhanced clients over the generated ones # (re-applied by scripts/patch_query_exports.py after regeneration). # hotdata.query.QueryApi adds 429 retry + truncation auto-follow; -# hotdata.arrow.ResultsApi adds Arrow IPC result fetch. The raw generated -# classes remain importable from hotdata.api.query_api / hotdata.api.results_api. +# hotdata.arrow.ResultsApi adds Arrow IPC result fetch; +# hotdata.uploads.UploadsApi adds transparent presigned direct-to-storage +# uploads. The raw generated classes remain importable from hotdata.api.*. from hotdata.query import QueryApi as QueryApi # noqa: E402,F811 from hotdata.arrow import ResultsApi as ResultsApi # noqa: E402,F811 +from hotdata.uploads import UploadsApi as UploadsApi # noqa: E402,F811 diff --git a/hotdata/uploads.py b/hotdata/uploads.py new file mode 100644 index 0000000..4e397ef --- /dev/null +++ b/hotdata/uploads.py @@ -0,0 +1,589 @@ +"""Ergonomic, hand-written direct-to-storage (presigned) file uploads. + +The auto-generated :class:`hotdata.api.uploads_api.UploadsApi` exposes the +presigned-upload flow only as raw building blocks. This module wraps it with a +thin subclass that orchestrates the whole flow transparently, so a caller +uploads a file with a single call: + +1. ``POST /v1/uploads`` (:meth:`create_upload_session_handler`) opens a session + and returns either a single ``url`` (``mode == "single"``) or a set of + ``part_urls`` plus a ``part_size`` (``mode == "multipart"``), along with a + one-time ``finalize_token``. +2. The client ``PUT`` s the bytes **directly to object storage** — never back + through the API. Single uploads stream the whole file to ``url``; multipart + uploads slice the file into ``part_size``-byte chunks and ``PUT`` each chunk + to its ``part_urls[i - 1]``, collecting the storage ``ETag`` per part. +3. ``POST /v1/uploads/{upload_id}/finalize`` (:meth:`finalize_upload_handler`) + confirms the upload with the finalize token in the ``X-Upload-Finalize-Token`` + header (empty object ``{}`` for single; the ascending ``{part_number, e_tag}`` + list for multipart) and returns a + :class:`~hotdata.models.finalize_upload_response.FinalizeUploadResponse`. + +Read ``upload_id`` from that response to load the upload into a managed table. + +Storage PUT header isolation +---------------------------- + +A presigned storage URL already carries its authorization in the query string +(or in the server-provided ``headers`` map). Object stores (S3 and compatible) +reject a ``PUT`` carrying extra signed-ish headers with ``403 +SignatureDoesNotMatch``, so storage ``PUT`` s go through a *dedicated, bare* +:class:`urllib3.PoolManager` that carries NONE of the SDK's bearer / workspace +headers — only an explicit ``Content-Length`` and whatever the server placed in +the session ``headers`` map (currently always empty). The SDK's own +``ApiClient`` (with its auth headers and proxy/TLS config) is deliberately not +reused for storage transfers. + +This ``UploadsApi`` is a drop-in replacement for +:class:`hotdata.api.uploads_api.UploadsApi`; every generated method works +unchanged, and :meth:`UploadsApi.upload_file` adds the orchestrated flow. +""" + +from __future__ import annotations + +import io +import logging +import os +import threading +from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait +from dataclasses import dataclass +from typing import Any, BinaryIO, Callable, Dict, List, Optional, Union + +import urllib3 +from urllib3.util.retry import Retry + +from hotdata.api.uploads_api import UploadsApi as _GeneratedUploadsApi +from hotdata.models.create_upload_request import CreateUploadRequest +from hotdata.models.finalize_upload_part import FinalizeUploadPart +from hotdata.models.finalize_upload_request import FinalizeUploadRequest +from hotdata.models.finalize_upload_response import FinalizeUploadResponse +from hotdata.models.upload_session_response import UploadSessionResponse + +_log = logging.getLogger("hotdata.uploads") + +#: One mebibyte, the unit the storage part-size range is expressed in. +MIB = 1024 * 1024 + +#: Default cap on concurrent part ``PUT`` s when the caller doesn't set +#: ``max_concurrency``. Matches the boto3 / AWS CLI default of 10. The effective +#: in-flight count is the MIN of this and a memory budget (see +#: :func:`effective_in_flight`). +DEFAULT_MAX_CONCURRENCY = 10 + +#: Default part-size hint, in bytes (8 MiB), sent when the caller doesn't set +#: ``part_size``. The server clamps the hint to its own range and returns the +#: actual size. See :func:`auto_part_size_hint`. +DEFAULT_PART_SIZE = 8 * MIB + +#: Target ceiling on part count when auto-scaling the part-size hint for very +#: large files, with headroom under S3's hard 10,000-part limit. +TARGET_MAX_PARTS = 9000 + +#: Minimum part size storage accepts (5 MiB). The hint is clamped to at least +#: this; the server enforces it too. +MIN_PART_SIZE = 5 * MIB + +#: Maximum part size storage accepts (5 GiB). The hint is clamped to at most this. +MAX_PART_SIZE = 5 * 1024 * MIB + +#: Target peak-memory budget for in-flight part buffers (256 MiB). Each in-flight +#: part buffers up to ``part_size`` bytes, so :func:`effective_in_flight` derives +#: the in-flight count as ``budget / part_size``. This is a TARGET, not a hard +#: ceiling: when the server returns a very large ``part_size`` a single in-flight +#: part may already exceed the budget, so it caps *concurrency*, not part size. +UPLOAD_MEMORY_BUDGET = 256 * MIB + +#: Progress callback: ``(bytes_done_total, total)``, where ``total`` is the full +#: declared file size. ``bytes_done_total`` is monotonically non-decreasing and +#: reaches exactly ``total`` when the transfer completes. For a multipart upload +#: it is invoked from worker threads, so a callback that touches shared state +#: must do its own locking. +UploadProgress = Callable[[int, int], None] + + +def auto_part_size_hint(declared_size: int) -> int: + """Compute the part-size HINT to send in ``CreateUploadRequest.part_size`` + when the caller did not specify one. + + Starts from :data:`DEFAULT_PART_SIZE` (8 MiB) and grows only for files large + enough that 8 MiB parts would exceed :data:`TARGET_MAX_PARTS` — so the common + case is unchanged and only very large files (beyond ~72 GiB) get a larger + hint to keep the part count bounded. The result is rounded UP to a whole MiB + and clamped to ``[MIN_PART_SIZE, MAX_PART_SIZE]``. The server still has the + final say and clamps to its own range. Pure and total: + ``declared_size == 0`` yields :data:`DEFAULT_PART_SIZE`. + """ + # Smallest part size that keeps the count at or under the target. + by_count = -(-declared_size // TARGET_MAX_PARTS) # ceil division + raw = max(DEFAULT_PART_SIZE, by_count) + # Round up to a whole MiB so the hint is a clean multiple. + rounded = -(-raw // MIB) * MIB + return max(MIN_PART_SIZE, min(rounded, MAX_PART_SIZE)) + + +def effective_in_flight(max_concurrency: int, part_size: int) -> int: + """Compute how many part ``PUT`` s to keep in flight, given the caller's + ``max_concurrency`` and the SERVER's actual returned ``part_size``. + + Peak buffered memory is ``in_flight * part_size``, so in-flight is capped at + ``UPLOAD_MEMORY_BUDGET / part_size``, then at ``max_concurrency``. Normal + 8 MiB parts give ``256/8 = 32``, capped to ``max_concurrency``; a 64 MiB part + gives ``4``. ``max_concurrency`` is honored as an explicit floor: a caller + asking for ``1`` (or ``0``) gets serial uploads (``1``). The result is always + ``>= 1``. Pure and total: a zero ``part_size`` is treated as 1. + """ + cap = max(max_concurrency, 1) + by_budget = max(UPLOAD_MEMORY_BUDGET // max(part_size, 1), 1) + return min(by_budget, cap) + + +@dataclass +class UploadOptions: + """Options for :meth:`UploadsApi.upload_file`. + + All fields are optional. ``content_type`` / ``content_encoding`` / + ``filename`` are recorded with the upload (advisory metadata; they do not + change where the bytes are stored). ``part_size`` is a hint the server clamps + to its allowed range and ignores for single-``PUT`` uploads. ``progress``, + when set, is invoked as bytes flow. + """ + + #: Content type to record for the uploaded file (e.g. a Parquet/CSV/JSON MIME + #: type). Advisory. + content_type: Optional[str] = None + #: Content encoding to record for the uploaded file (e.g. ``gzip``). + content_encoding: Optional[str] = None + #: Original file name, recorded for bookkeeping. Defaults to the source + #: path's file name when not set. + filename: Optional[str] = None + #: Preferred part size, in bytes, for a large (multipart) upload. A hint the + #: server clamps; ignored for single-``PUT`` uploads. When unset, the SDK + #: auto-scales a hint via :func:`auto_part_size_hint`. + part_size: Optional[int] = None + #: Maximum number of part ``PUT`` s to keep in flight for a multipart upload. + #: ``None`` uses :data:`DEFAULT_MAX_CONCURRENCY`. The effective in-flight + #: count is the MIN of this and a peak-memory budget (see + #: :func:`effective_in_flight`). + max_concurrency: Optional[int] = None + #: Optional progress callback invoked with ``(bytes_done_total, total)``. + progress: Optional[UploadProgress] = None + + +# --- Errors --------------------------------------------------------------- + + +class UploadError(Exception): + """Base class for errors raised by :meth:`UploadsApi.upload_file` while + driving the direct-to-storage flow. + + Opening the session or finalizing fails with the generated client's + :class:`~hotdata.exceptions.ApiException` instead (e.g. a ``501`` + ``PRESIGN_UNSUPPORTED`` when the storage backend cannot issue upload URLs); + only the storage transfer and session-consistency failures raise + ``UploadError`` subclasses. + """ + + +class MalformedSessionError(UploadError): + """The create-session response was internally inconsistent for its declared + ``mode`` (e.g. ``single`` without a ``url``, ``multipart`` without + ``part_urls`` / ``part_size``, or a ``part_urls`` count that does not match + how the file slices into parts). + """ + + +class StorageError(UploadError): + """A storage ``PUT`` returned a non-2xx status. + + Carries the HTTP ``status`` and the response ``body`` (often XML for S3-style + errors), plus the 1-based ``part_number`` for a multipart ``PUT`` (``None`` + for the single-``PUT`` path). + """ + + def __init__(self, *, status: int, part_number: Optional[int], body: str) -> None: + self.status = status + self.part_number = part_number + self.body = body + if part_number is None: + msg = f"storage rejected the upload with status {status}: {body}" + else: + msg = f"storage rejected part {part_number} with status {status}: {body}" + super().__init__(msg) + + +class MissingETagError(UploadError): + """Storage accepted a part ``PUT`` but returned no ``ETag`` header, so the + part cannot be finalized. + """ + + def __init__(self, *, part_number: int) -> None: + self.part_number = part_number + super().__init__( + f"storage returned no ETag for part {part_number}; cannot finalize" + ) + + +# --- Storage transport ---------------------------------------------------- + +# A dedicated, process-wide, *bare* pool for storage PUTs. Deliberately NOT the +# SDK's ApiClient pool: a host app may have installed auth / workspace headers +# there, which storage would reject. Built lazily and reused. No request +# timeout: a large upload legitimately takes minutes. +_STORAGE_POOL: Optional[urllib3.PoolManager] = None +_STORAGE_POOL_LOCK = threading.Lock() + + +def _storage_pool() -> urllib3.PoolManager: + global _STORAGE_POOL + if _STORAGE_POOL is None: + with _STORAGE_POOL_LOCK: + if _STORAGE_POOL is None: + _STORAGE_POOL = urllib3.PoolManager() + return _STORAGE_POOL + + +def _part_retry() -> Retry: + """Retry policy for multipart part ``PUT`` s. + + A part ``PUT`` is idempotent — storage overwrites a part by number — so a + retried part cannot corrupt the upload. Retry connection errors and the + transient 429/5xx statuses storage may return under load. The single-``PUT`` + path streams an un-replayable body and so is sent with ``retries=False``. + """ + return Retry( + total=3, + backoff_factor=0.2, + status_forcelist=(429, 500, 502, 503, 504), + allowed_methods=frozenset({"PUT"}), + raise_on_status=False, + ) + + +def _storage_headers(session_headers: Dict[str, str], content_length: int) -> Dict[str, str]: + """Build the bare PUT headers: an explicit ``Content-Length`` plus the + server-provided session headers replayed verbatim (currently always empty; + the only place a ``Content-Type`` may legitimately be set). + """ + headers = {"Content-Length": str(content_length)} + headers.update(session_headers) + return headers + + +def _check_storage_status(response: Any, part_number: Optional[int]) -> None: + status = int(response.status) + if status >= 400: + data = getattr(response, "data", b"") or b"" + if isinstance(data, (bytes, bytearray)): + body = bytes(data).decode("utf-8", errors="replace") + else: + body = str(data) + raise StorageError(status=status, part_number=part_number, body=body) + + +class _ProgressReader(io.RawIOBase): + """Wrap a binary file object so each read advances a byte counter and + invokes the progress callback. Used for the single-``PUT`` path so progress + is byte-granular rather than jumping 0% -> 100%. Monotonic non-decreasing; + the running total never exceeds ``total``. + + A :class:`io.RawIOBase` so it satisfies the file-like body type that + ``urllib3`` ``PUT`` s; ``http.client`` reads it in blocks via + :meth:`readinto`, which is where progress is reported. + """ + + def __init__( + self, + fileobj: BinaryIO, + total: int, + progress: Optional[UploadProgress], + ) -> None: + super().__init__() + self._file = fileobj + self._total = total + self._progress = progress + self._done = 0 + + def readable(self) -> bool: + return True + + def readinto(self, b: Any) -> int: + chunk = self._file.read(len(b)) + if not chunk: + return 0 + n = len(chunk) + b[:n] = chunk + self._done = min(self._done + n, self._total) + if self._progress is not None: + self._progress(self._done, self._total) + return n + + +class UploadsApi(_GeneratedUploadsApi): + """Drop-in replacement for :class:`hotdata.api.uploads_api.UploadsApi` that + adds :meth:`upload_file`, an orchestrated direct-to-storage upload. All + base-class methods work unchanged. + """ + + def upload_file( # type: ignore[override] + self, + path: Union[str, "os.PathLike[str]"], + *, + content_type: Optional[str] = None, + content_encoding: Optional[str] = None, + filename: Optional[str] = None, + part_size: Optional[int] = None, + max_concurrency: Optional[int] = None, + progress: Optional[UploadProgress] = None, + options: Optional[UploadOptions] = None, + _request_timeout: Any = None, + ) -> FinalizeUploadResponse: + """Upload a local file directly to object storage and finalize it. + + Opens an upload session, ``PUT`` s the bytes straight to storage (one + ``PUT`` for a small file, concurrent part ``PUT`` s for a large one), + then finalizes and returns the + :class:`~hotdata.models.finalize_upload_response.FinalizeUploadResponse`. + Read ``upload_id`` from it to load the upload into a managed table. The + bytes never round-trip through the API. + + Per-call keyword arguments override the matching :class:`UploadOptions` + field when both are given. + + :param path: Path to the local file to upload. + :param content_type: Content type to record (advisory). + :param content_encoding: Content encoding to record (advisory). + :param filename: Original file name to record; defaults to the path's + base name. + :param part_size: Preferred multipart part-size hint, in bytes; the + server clamps it and ignores it for single-``PUT`` uploads. + :param max_concurrency: Max in-flight part ``PUT`` s for a multipart + upload (default :data:`DEFAULT_MAX_CONCURRENCY`, further bounded by a + peak-memory budget). + :param progress: Callback invoked with ``(bytes_done_total, total)``. + :param options: An :class:`UploadOptions` bundle; individual keyword + arguments take precedence over its fields. + :raises MalformedSessionError: the session response was inconsistent. + :raises StorageError: a storage ``PUT`` returned a non-2xx status. + :raises MissingETagError: a part ``PUT`` returned no ``ETag``. + :raises hotdata.exceptions.ApiException: opening the session or + finalizing failed (e.g. ``501`` ``PRESIGN_UNSUPPORTED`` — the storage + backend cannot issue upload URLs; use ``POST /v1/files`` instead). + """ + opts = options or UploadOptions() + content_type = content_type if content_type is not None else opts.content_type + content_encoding = ( + content_encoding if content_encoding is not None else opts.content_encoding + ) + filename = filename if filename is not None else opts.filename + part_size = part_size if part_size is not None else opts.part_size + max_concurrency = ( + max_concurrency if max_concurrency is not None else opts.max_concurrency + ) + progress = progress if progress is not None else opts.progress + + fspath = os.fspath(path) + total = os.stat(fspath).st_size + if filename is None: + filename = os.path.basename(fspath) + + # Part-size hint: honor an explicit value, else auto-scale from the + # declared size. The server clamps it regardless. + part_size_hint = part_size if part_size is not None else auto_part_size_hint(total) + + session = self.create_upload_session_handler( + CreateUploadRequest( + content_type=content_type, + content_encoding=content_encoding, + filename=filename, + declared_size_bytes=total, + part_size=part_size_hint, + ), + _request_timeout=_request_timeout, + ) + + # Report initial progress so a 0-byte file (or an instant single PUT) + # still emits a terminal tick. + if progress is not None: + progress(0, total) + + if session.mode == "single": + self._upload_single(session, fspath, total, progress) + parts: Optional[List[FinalizeUploadPart]] = None + elif session.mode == "multipart": + cap = max_concurrency if max_concurrency is not None else DEFAULT_MAX_CONCURRENCY + parts = self._upload_multipart(session, fspath, total, cap, progress) + else: + raise MalformedSessionError(f"unknown upload mode {session.mode!r}") + + # Finalize: single sends an empty object `{}`; multipart sends the + # ascending parts list. The token rides the X-Upload-Finalize-Token + # header. + # + # The body MUST be a JSON object, never `{"parts": null}`: the server + # rejects a null `parts`. Passing `parts=None` explicitly would put the + # field in pydantic's `model_fields_set`, serializing to `{"parts": + # null}`; constructing WITHOUT the field leaves it unset so it drops out + # and the body is `{}`. So build the single-mode body field-free. + # + # Finalize is exactly-once on the server, but it is safe under the SDK's + # default retry: that policy only retries a POST when the connection was + # reset *before* the request reached the server (so the server did no + # work) — it never replays a finalize the server actually processed. + if parts is None: + finalize_body = FinalizeUploadRequest() + else: + finalize_body = FinalizeUploadRequest(parts=parts) + return self.finalize_upload_handler( + session.upload_id, + session.finalize_token, + finalize_body, + _request_timeout=_request_timeout, + ) + + # -- internals --------------------------------------------------------- + + def _upload_single( + self, + session: UploadSessionResponse, + path: str, + total: int, + progress: Optional[UploadProgress], + ) -> None: + """Single-``PUT`` path: stream the whole file to ``session.url``, + invoking the progress callback incrementally as chunks are sent. + + A streaming body cannot be replayed, so this ``PUT`` is sent once with no + retry — an intentional trade for smooth progress on the large, common + single-``PUT`` path. + """ + url = session.url + if not url: + raise MalformedSessionError("single upload missing `url`") + + with open(path, "rb") as fileobj: + body = _ProgressReader(fileobj, total, progress) + response = _storage_pool().request( + "PUT", + url, + body=body, + headers=_storage_headers(session.headers, total), + retries=False, + ) + _check_storage_status(response, part_number=None) + + # Guarantee a terminal tick at exactly `total`, even if the last chunk + # boundary or an empty file left the counter short. + if progress is not None: + progress(total, total) + + def _upload_multipart( + self, + session: UploadSessionResponse, + path: str, + total: int, + max_concurrency: int, + progress: Optional[UploadProgress], + ) -> List[FinalizeUploadPart]: + """Multipart path: slice the file into ``part_size``-byte chunks (the + last is the remainder), ``PUT`` each chunk to its ``part_urls[i - 1]`` + with bounded concurrency, and collect ``(part_number, e_tag)`` per part. + + Returns the parts sorted ascending by part number, ready for finalize. + """ + part_urls = session.part_urls + part_size = session.part_size + if not part_urls: + raise MalformedSessionError("multipart upload missing `part_urls`") + if part_size is None or part_size <= 0: + raise MalformedSessionError( + f"multipart upload has invalid `part_size` {part_size!r}" + ) + + # The URL count must match the number of `part_size`-byte chunks the file + # splits into (last is the remainder). A mismatch means a session + # inconsistent with our declared size, so fail loudly. + expected_parts = max(-(-total // part_size), 1) # ceil, at least 1 + if len(part_urls) != expected_parts: + raise MalformedSessionError( + f"multipart upload returned {len(part_urls)} part URLs but the " + f"file ({total} bytes) splits into {expected_parts} parts of " + f"{part_size} bytes" + ) + + in_flight = effective_in_flight(max_concurrency, part_size) + retry = _part_retry() + done = [0] # boxed running byte total; guarded by `lock` + lock = threading.Lock() + + def upload_part(index: int) -> FinalizeUploadPart: + part_number = index + 1 + offset = index * part_size + length = min(part_size, total - offset) + chunk = _read_range(path, offset, length) + response = _storage_pool().request( + "PUT", + part_urls[index], + body=chunk, + headers=_storage_headers(session.headers, length), + retries=retry, + ) + _check_storage_status(response, part_number=part_number) + etag = response.headers.get("ETag") + if not etag: + raise MissingETagError(part_number=part_number) + if progress is not None: + with lock: + done[0] = min(done[0] + length, total) + current = done[0] + progress(current, total) + return FinalizeUploadPart(e_tag=etag, part_number=part_number) + + results: List[Optional[FinalizeUploadPart]] = [None] * len(part_urls) + with ThreadPoolExecutor(max_workers=in_flight) as executor: + futures = { + executor.submit(upload_part, i): i for i in range(len(part_urls)) + } + done_set, _ = wait(futures, return_when=FIRST_EXCEPTION) + # Surface the first failure (and let the context manager join the + # rest). A part PUT is idempotent, so an in-flight straggler that + # also fails or succeeds does no harm. + for future in done_set: + exc = future.exception() + if exc is not None: + raise exc + for future, index in futures.items(): + results[index] = future.result() + + # results is indexed by 0-based part position, so it is already ascending + # by part_number with no duplicates. + return [part for part in results if part is not None] + + +def _read_range(path: str, offset: int, length: int) -> bytes: + """Read exactly ``length`` bytes starting at ``offset``. Each multipart part + task opens its own handle so tasks never share a cursor and a retry re-reads + the same range cleanly. + """ + with open(path, "rb") as fileobj: + fileobj.seek(offset) + return fileobj.read(length) + + +__all__ = [ + "DEFAULT_MAX_CONCURRENCY", + "DEFAULT_PART_SIZE", + "MAX_PART_SIZE", + "MIB", + "MIN_PART_SIZE", + "TARGET_MAX_PARTS", + "UPLOAD_MEMORY_BUDGET", + "MalformedSessionError", + "MissingETagError", + "StorageError", + "UploadError", + "UploadOptions", + "UploadProgress", + "UploadsApi", + "auto_part_size_hint", + "effective_in_flight", +] diff --git a/scripts/patch_query_exports.py b/scripts/patch_query_exports.py index 22c534b..1335e13 100644 --- a/scripts/patch_query_exports.py +++ b/scripts/patch_query_exports.py @@ -1,13 +1,14 @@ #!/usr/bin/env python3 """Re-apply enhanced default client exports after OpenAPI regeneration. -The generated ``hotdata/__init__.py`` binds the top-level ``QueryApi`` and -``ResultsApi`` to the raw generated classes. We want ``from hotdata import -QueryApi`` / ``ResultsApi`` to resolve to the hand-written enhanced clients -instead — ``hotdata.query.QueryApi`` (429 retry + truncation auto-follow) and -``hotdata.arrow.ResultsApi`` (Arrow IPC fetch) — so the obvious happy path gets -the safe behavior the #640/#688 query contract needs, rather than the bare -client. The raw generated classes stay importable from ``hotdata.api.*``. +The generated ``hotdata/__init__.py`` binds the top-level ``QueryApi``, +``ResultsApi`` and ``UploadsApi`` to the raw generated classes. We want ``from +hotdata import QueryApi`` / ``ResultsApi`` / ``UploadsApi`` to resolve to the +hand-written enhanced clients instead — ``hotdata.query.QueryApi`` (429 retry + +truncation auto-follow), ``hotdata.arrow.ResultsApi`` (Arrow IPC fetch) and +``hotdata.uploads.UploadsApi`` (transparent presigned direct-to-storage upload) +— so the obvious happy path gets the safe, ergonomic behavior rather than the +bare client. The raw generated classes stay importable from ``hotdata.api.*``. ``hotdata/__init__.py`` is regenerated, so this override is appended (after all generated imports, so every name it references is defined) and re-applied by @@ -28,10 +29,12 @@ "# --- hand-applied: prefer the enhanced clients over the generated ones\n" "# (re-applied by scripts/patch_query_exports.py after regeneration).\n" "# hotdata.query.QueryApi adds 429 retry + truncation auto-follow;\n" - "# hotdata.arrow.ResultsApi adds Arrow IPC result fetch. The raw generated\n" - "# classes remain importable from hotdata.api.query_api / hotdata.api.results_api.\n" + "# hotdata.arrow.ResultsApi adds Arrow IPC result fetch;\n" + "# hotdata.uploads.UploadsApi adds transparent presigned direct-to-storage\n" + "# uploads. The raw generated classes remain importable from hotdata.api.*.\n" "from hotdata.query import QueryApi as QueryApi # noqa: E402,F811\n" "from hotdata.arrow import ResultsApi as ResultsApi # noqa: E402,F811\n" + "from hotdata.uploads import UploadsApi as UploadsApi # noqa: E402,F811\n" ) diff --git a/tests/test_uploads.py b/tests/test_uploads.py new file mode 100644 index 0000000..e92ef35 --- /dev/null +++ b/tests/test_uploads.py @@ -0,0 +1,553 @@ +"""Unit tests for hotdata.uploads. + +The ergonomic upload flow has two collaborators: the generated session +create/finalize ops (which go through the SDK ApiClient) and the direct-to- +storage ``PUT`` s (which go through a separate bare urllib3 pool). These tests +stub both seams so no server or object store is needed: + +* the generated ``create_upload_session_handler`` / ``finalize_upload_handler`` + are patched to return canned models and capture the finalize body; +* ``hotdata.uploads._STORAGE_POOL`` is replaced with a fake pool that records + every ``PUT`` (url, body, headers) and returns a canned status + ETag. + +They verify the single and multipart paths end to end: correct bytes per part, +ascending part list with ETags, header isolation on storage PUTs, the finalize +body shape, progress reporting, and the error paths. +""" + +from __future__ import annotations + +import threading +from typing import Any, Dict, List, Optional, Tuple + +import pytest + +import hotdata.uploads as uploads_mod +from hotdata import UploadsApi as ExportedUploadsApi +from hotdata.uploads import ( + DEFAULT_PART_SIZE, + MAX_PART_SIZE, + MIB, + MIN_PART_SIZE, + TARGET_MAX_PARTS, + UPLOAD_MEMORY_BUDGET, + MalformedSessionError, + MissingETagError, + StorageError, + UploadOptions, + UploadsApi, + auto_part_size_hint, + effective_in_flight, +) +from hotdata.models.finalize_upload_response import FinalizeUploadResponse +from hotdata.models.upload_session_response import UploadSessionResponse + + +# --- Fakes ---------------------------------------------------------------- + + +class _FakeHeaders: + """Case-insensitive header map, like urllib3's HTTPHeaderDict.""" + + def __init__(self, data: Dict[str, str]): + self._data = {k.lower(): v for k, v in data.items()} + + def get(self, key: str, default: Any = None) -> Any: + return self._data.get(key.lower(), default) + + +class _FakeResponse: + def __init__(self, status: int, *, etag: Optional[str] = None, body: bytes = b""): + self.status = status + headers: Dict[str, str] = {} + if etag is not None: + headers["ETag"] = etag + self.headers = _FakeHeaders(headers) + self.data = body + + +class _FakeStoragePool: + """Stand-in for the bare storage urllib3.PoolManager. + + Records each PUT and returns a per-URL canned response. ``body`` may be raw + bytes (multipart) or a file-like progress reader (single PUT) — file-like + bodies are drained so the single-PUT progress callback fires. + """ + + def __init__(self) -> None: + self.calls: List[Dict[str, Any]] = [] + # url -> _FakeResponse, or a default for any url. + self.responses: Dict[str, _FakeResponse] = {} + self.default_response = _FakeResponse(200, etag='"default-etag"') + self.lock = threading.Lock() + + def request( + self, + method: str, + url: str, + *, + body: Any = None, + headers: Optional[Dict[str, str]] = None, + retries: Any = None, + **kwargs: Any, + ) -> _FakeResponse: + # Drain a file-like body (single PUT) so its progress reader runs; keep + # raw bytes verbatim (multipart parts). + if hasattr(body, "read"): + collected = b"" + while True: + chunk = body.read(64 * 1024) + if not chunk: + break + collected += chunk + recorded_body = collected + else: + recorded_body = body + with self.lock: + self.calls.append( + { + "method": method, + "url": url, + "body": recorded_body, + "headers": dict(headers or {}), + "retries": retries, + } + ) + return self.responses.get(url, self.default_response) + + +@pytest.fixture +def fake_pool(monkeypatch: pytest.MonkeyPatch) -> _FakeStoragePool: + pool = _FakeStoragePool() + monkeypatch.setattr(uploads_mod, "_STORAGE_POOL", pool) + return pool + + +def _patch_session( + monkeypatch: pytest.MonkeyPatch, + api: UploadsApi, + session: UploadSessionResponse, + finalize_capture: List[Tuple[Any, ...]], +) -> None: + """Stub the generated create/finalize ops on the instance.""" + + def fake_create(create_upload_request: Any, **kwargs: Any) -> UploadSessionResponse: + finalize_capture.append(("create", create_upload_request)) + return session + + def fake_finalize( + upload_id: str, + x_upload_finalize_token: str, + finalize_upload_request: Any = None, + **kwargs: Any, + ) -> FinalizeUploadResponse: + finalize_capture.append( + ("finalize", upload_id, x_upload_finalize_token, finalize_upload_request) + ) + from datetime import datetime, timezone + + return FinalizeUploadResponse( + content_type=None, + created_at=datetime(2026, 1, 1, tzinfo=timezone.utc), + size_bytes=0, + status="ready", + upload_id=upload_id, + ) + + monkeypatch.setattr(api, "create_upload_session_handler", fake_create) + monkeypatch.setattr(api, "finalize_upload_handler", fake_finalize) + + +def _make_api() -> UploadsApi: + from hotdata import ApiClient, Configuration + + config = Configuration( + host="https://api.hotdata.test", + api_key="test-key", + workspace_id="ws_test", + ) + return UploadsApi(ApiClient(config)) + + +# --- Pure helpers (ported from the Rust unit tests) ----------------------- + + +def test_auto_part_size_keeps_8mib_for_normal_files() -> None: + assert auto_part_size_hint(0) == DEFAULT_PART_SIZE + assert auto_part_size_hint(1) == DEFAULT_PART_SIZE + assert auto_part_size_hint(100 * MIB) == DEFAULT_PART_SIZE + assert auto_part_size_hint(1024 * MIB) == DEFAULT_PART_SIZE + assert auto_part_size_hint(DEFAULT_PART_SIZE * TARGET_MAX_PARTS) == DEFAULT_PART_SIZE + + +def test_auto_part_size_scales_up_for_very_large_files() -> None: + big = 200 * 1024 * MIB # 200 GiB + hint = auto_part_size_hint(big) + assert hint > DEFAULT_PART_SIZE + assert hint % MIB == 0 + assert -(-big // hint) <= TARGET_MAX_PARTS + assert MIN_PART_SIZE <= hint <= MAX_PART_SIZE + + +def test_auto_part_size_clamps_to_max_for_enormous_files() -> None: + assert auto_part_size_hint(100 * 1024 * 1024 * MIB) == MAX_PART_SIZE + + +def test_effective_in_flight_capped_by_max_concurrency() -> None: + assert effective_in_flight(12, 8 * MIB) == 12 + assert effective_in_flight(10, 8 * MIB) == 10 + assert effective_in_flight(12, MIB) == 12 + + +def test_effective_in_flight_reduced_by_memory_budget() -> None: + assert effective_in_flight(12, 64 * MIB) == 4 + assert effective_in_flight(12, 128 * MIB) == 2 + + +def test_effective_in_flight_honors_explicit_low_concurrency() -> None: + assert effective_in_flight(1, 8 * MIB) == 1 + assert effective_in_flight(0, 8 * MIB) == 1 + assert effective_in_flight(2, 8 * MIB) == 2 + + +def test_effective_in_flight_floors_at_1_and_handles_zero() -> None: + assert effective_in_flight(12, UPLOAD_MEMORY_BUDGET * 4) == 1 + assert effective_in_flight(12, 0) == 12 + + +# --- Single-PUT path ------------------------------------------------------ + + +def test_single_upload_puts_bytes_and_finalizes( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + content = b"hello, hotdata" * 100 + src = tmp_path / "data.parquet" + src.write_bytes(content) + + session = UploadSessionResponse( + finalize_token="tok-123", + headers={}, + mode="single", + upload_id="up_1", + url="https://storage.test/put/up_1?sig=abc", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + + ticks: List[Tuple[int, int]] = [] + result = api.upload_file( + str(src), + content_type="application/parquet", + progress=lambda done, total: ticks.append((done, total)), + ) + + # One PUT carrying the whole file, to the single URL. + assert len(fake_pool.calls) == 1 + call = fake_pool.calls[0] + assert call["method"] == "PUT" + assert call["url"] == session.url + assert call["body"] == content + # Header isolation: only Content-Length, no SDK auth/workspace headers. + assert call["headers"]["Content-Length"] == str(len(content)) + assert "Authorization" not in call["headers"] + assert "X-Workspace-Id" not in call["headers"] + # Single PUT is sent without retry (un-replayable streaming body). + assert call["retries"] is False + + # create carried the declared size + content type; finalize sent an empty + # parts body (single uploads have no parts) with the token. + kind, create_req = capture[0] + assert kind == "create" + assert create_req.declared_size_bytes == len(content) + assert create_req.content_type == "application/parquet" + assert create_req.filename == "data.parquet" + fin = capture[1] + assert fin[0] == "finalize" and fin[1] == "up_1" and fin[2] == "tok-123" + assert fin[3].parts is None + # Finalize body serializes to `{}` (no parts), which the server requires. + assert fin[3].to_dict() == {} + + assert isinstance(result, FinalizeUploadResponse) + # Progress is monotonic and ends exactly at total. + assert ticks[0] == (0, len(content)) + assert ticks[-1] == (len(content), len(content)) + assert all(b <= len(content) for b, _ in ticks) + assert [b for b, _ in ticks] == sorted(b for b, _ in ticks) + + +def test_single_upload_missing_url_raises( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + session = UploadSessionResponse( + finalize_token="t", headers={}, mode="single", upload_id="u", url=None + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(MalformedSessionError, match="missing `url`"): + api.upload_file(str(src)) + + +def test_single_upload_storage_error( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + url = "https://storage.test/put" + session = UploadSessionResponse( + finalize_token="t", headers={}, mode="single", upload_id="u", url=url + ) + fake_pool.responses[url] = _FakeResponse(403, body=b"SignatureDoesNotMatch") + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(StorageError) as ei: + api.upload_file(str(src)) + assert ei.value.status == 403 + assert ei.value.part_number is None + assert "SignatureDoesNotMatch" in ei.value.body + + +def test_zero_byte_single_upload( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "empty.bin" + src.write_bytes(b"") + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + ticks: List[Tuple[int, int]] = [] + api.upload_file(str(src), progress=lambda d, t: ticks.append((d, t))) + assert fake_pool.calls[0]["body"] == b"" + assert fake_pool.calls[0]["headers"]["Content-Length"] == "0" + # Terminal tick at (0, 0). + assert ticks[-1] == (0, 0) + + +# --- Multipart path ------------------------------------------------------- + + +def test_multipart_upload_slices_and_collects_etags( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + part_size = 5 * MIB + # 2 full parts + a remainder => 3 parts. + content = bytes((i % 256) for i in range(2 * part_size + 1234)) + src = tmp_path / "big.parquet" + src.write_bytes(content) + + part_urls = [ + "https://storage.test/p1", + "https://storage.test/p2", + "https://storage.test/p3", + ] + for i, url in enumerate(part_urls, start=1): + fake_pool.responses[url] = _FakeResponse(200, etag=f'"etag-{i}"') + + session = UploadSessionResponse( + finalize_token="mtok", + headers={}, + mode="multipart", + part_size=part_size, + part_urls=part_urls, + upload_id="up_mp", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + + ticks: List[Tuple[int, int]] = [] + api.upload_file( + str(src), + max_concurrency=2, + progress=lambda d, t: ticks.append((d, t)), + ) + + # Three PUTs, one per part. Order may vary (concurrent), so index by URL. + assert len(fake_pool.calls) == 3 + by_url = {c["url"]: c for c in fake_pool.calls} + assert by_url[part_urls[0]]["body"] == content[0:part_size] + assert by_url[part_urls[1]]["body"] == content[part_size:(2 * part_size)] + assert by_url[part_urls[2]]["body"] == content[(2 * part_size):] + # Each part PUT declares its own Content-Length and carries no auth headers. + assert by_url[part_urls[2]]["headers"]["Content-Length"] == str(len(content) - 2 * part_size) + assert "Authorization" not in by_url[part_urls[0]]["headers"] + # Part PUTs are retryable (idempotent), so a Retry object is passed. + assert by_url[part_urls[0]]["retries"] is not False + assert by_url[part_urls[0]]["retries"] is not None + + # Finalize body has the ascending part list with the storage ETags. + fin = capture[-1] + assert fin[0] == "finalize" + parts = fin[3].parts + assert [p.part_number for p in parts] == [1, 2, 3] + assert [p.e_tag for p in parts] == ['"etag-1"', '"etag-2"', '"etag-3"'] + + # Progress reaches total exactly and is non-decreasing. + assert ticks[-1] == (len(content), len(content)) + assert [b for b, _ in ticks] == sorted(b for b, _ in ticks) + + +def test_multipart_part_url_count_mismatch_raises( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + part_size = 5 * MIB + content = b"a" * (2 * part_size) # splits into exactly 2 parts + src = tmp_path / "f.bin" + src.write_bytes(content) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="multipart", + part_size=part_size, + part_urls=["https://storage.test/only-one"], # too few + upload_id="u", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(MalformedSessionError, match="part URLs"): + api.upload_file(str(src)) + + +def test_multipart_missing_etag_raises( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + part_size = 5 * MIB + content = b"a" * part_size + src = tmp_path / "f.bin" + src.write_bytes(content) + url = "https://storage.test/p1" + fake_pool.responses[url] = _FakeResponse(200, etag=None) # accepted, no ETag + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="multipart", + part_size=part_size, + part_urls=[url], + upload_id="u", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(MissingETagError) as ei: + api.upload_file(str(src)) + assert ei.value.part_number == 1 + + +def test_multipart_storage_error_includes_part_number( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + part_size = 5 * MIB + content = b"a" * part_size + src = tmp_path / "f.bin" + src.write_bytes(content) + url = "https://storage.test/p1" + fake_pool.responses[url] = _FakeResponse(500, body=b"boom") + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="multipart", + part_size=part_size, + part_urls=[url], + upload_id="u", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(StorageError) as ei: + api.upload_file(str(src)) + assert ei.value.status == 500 + assert ei.value.part_number == 1 + + +# --- Options / session shape ---------------------------------------------- + + +def test_unknown_mode_raises( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x") + session = UploadSessionResponse( + finalize_token="t", headers={}, mode="teleport", upload_id="u" + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(MalformedSessionError, match="unknown upload mode"): + api.upload_file(str(src)) + + +def test_options_bundle_and_kwarg_precedence( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + + opts = UploadOptions(content_type="text/csv", filename="from-opts.csv") + # Keyword filename overrides the options bundle; content_type comes from opts. + api.upload_file(str(src), filename="from-kwarg.csv", options=opts) + + create_req = capture[0][1] + assert create_req.content_type == "text/csv" + assert create_req.filename == "from-kwarg.csv" + + +def test_explicit_part_size_hint_forwarded( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + api.upload_file(str(src), part_size=16 * MIB) + assert capture[0][1].part_size == 16 * MIB + + +def test_session_headers_replayed_on_storage_put( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + session = UploadSessionResponse( + finalize_token="t", + headers={"x-amz-server-side-encryption": "AES256"}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + api.upload_file(str(src)) + hdrs = fake_pool.calls[0]["headers"] + assert hdrs["x-amz-server-side-encryption"] == "AES256" + assert hdrs["Content-Length"] == "10" + + +def test_exported_uploads_api_is_enhanced() -> None: + # `from hotdata import UploadsApi` resolves to the ergonomic subclass. + assert ExportedUploadsApi is UploadsApi + assert hasattr(ExportedUploadsApi, "upload_file") From c122e28f4fab1bd59f117218dd0e99f080113c44 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Sat, 27 Jun 2026 15:23:15 -0700 Subject: [PATCH 2/7] fix(uploads): harden failure path, error model, and concurrency tests --- CHANGELOG.md | 8 +- hotdata/uploads.py | 151 +++++++++++++---- tests/test_uploads.py | 386 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 510 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f22722..2655440 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,8 +17,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 (`POST /v1/uploads/{id}/finalize`), returning the `FinalizeUploadResponse`. The bytes never round-trip through the API. Supports a progress callback, an auto-scaled (or caller-set) part size, bounded concurrency with a peak-memory - budget, and idempotent per-part retry. Storage `PUT`s go through a dedicated, - header-isolated pool so no SDK auth/workspace headers reach object storage. + budget, and idempotent per-part retry (tunable via `part_retry`). Storage + `PUT`s go through a dedicated, header-isolated pool so no SDK auth/workspace + headers reach object storage. Failures surface as a typed hierarchy under + `UploadError`: `StorageError` (non-2xx from storage), `StorageTransportError` + (transport failure before any response), `MissingETagError`, + `MalformedSessionError`, and `SizeLimitError`. ### Changed diff --git a/hotdata/uploads.py b/hotdata/uploads.py index 4e397ef..5198b25 100644 --- a/hotdata/uploads.py +++ b/hotdata/uploads.py @@ -45,7 +45,7 @@ import logging import os import threading -from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait +from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from typing import Any, BinaryIO, Callable, Dict, List, Optional, Union @@ -93,6 +93,11 @@ #: part may already exceed the budget, so it caps *concurrency*, not part size. UPLOAD_MEMORY_BUDGET = 256 * MIB +#: The wire models sizes as a signed 64-bit integer. A declared size or part-size +#: hint beyond this is rejected up front rather than silently sent. Only reachable +#: for pathological sizes (~8 EiB), mirroring the Rust SDK's ``SizeOverflow``. +_MAX_WIRE_INT = 2**63 - 1 + #: Progress callback: ``(bytes_done_total, total)``, where ``total`` is the full #: declared file size. ``bytes_done_total`` is monotonically non-decreasing and #: reaches exactly ``total`` when the transfer completes. For a multipart upload @@ -167,6 +172,10 @@ class UploadOptions: max_concurrency: Optional[int] = None #: Optional progress callback invoked with ``(bytes_done_total, total)``. progress: Optional[UploadProgress] = None + #: ``urllib3`` retry policy for multipart part ``PUT`` s (idempotent, so safe + #: to retry). ``None`` uses :func:`default_part_retry`. The single-``PUT`` + #: path streams an un-replayable body and is always sent without retry. + part_retry: Optional[Retry] = None # --- Errors --------------------------------------------------------------- @@ -192,6 +201,37 @@ class MalformedSessionError(UploadError): """ +class SizeLimitError(UploadError): + """A size (the file's declared size, or the part-size hint) does not fit the + wire's signed 64-bit field. Only reachable for pathological sizes beyond + ~8 EiB. + """ + + def __init__(self, *, what: str, value: int) -> None: + self.what = what + self.value = value + super().__init__(f"{what} ({value} bytes) exceeds the maximum supported size") + + +class StorageTransportError(UploadError): + """A storage ``PUT`` failed at the transport layer (connection refused, + reset, TLS error, DNS failure, or retries exhausted) before any HTTP status + was returned. The underlying ``urllib3`` error is the exception ``__cause__``. + + Distinct from :class:`StorageError`, which is a completed request that + returned a non-2xx HTTP status. + """ + + def __init__(self, *, url: str, part_number: Optional[int]) -> None: + self.url = url + self.part_number = part_number + if part_number is None: + msg = "the upload transfer to storage failed before any response" + else: + msg = f"the transfer of part {part_number} to storage failed before any response" + super().__init__(msg) + + class StorageError(UploadError): """A storage ``PUT`` returned a non-2xx status. @@ -242,13 +282,14 @@ def _storage_pool() -> urllib3.PoolManager: return _STORAGE_POOL -def _part_retry() -> Retry: - """Retry policy for multipart part ``PUT`` s. +def default_part_retry() -> Retry: + """The default retry policy for multipart part ``PUT`` s. A part ``PUT`` is idempotent — storage overwrites a part by number — so a retried part cannot corrupt the upload. Retry connection errors and the transient 429/5xx statuses storage may return under load. The single-``PUT`` - path streams an un-replayable body and so is sent with ``retries=False``. + path streams an un-replayable body and so is always sent without retry. + Override per upload via :attr:`UploadOptions.part_retry`. """ return Retry( total=3, @@ -269,15 +310,34 @@ def _storage_headers(session_headers: Dict[str, str], content_length: int) -> Di return headers -def _check_storage_status(response: Any, part_number: Optional[int]) -> None: +def _put_to_storage( + url: str, + body: Any, + headers: Dict[str, str], + retries: Any, + part_number: Optional[int], +) -> Any: + """``PUT`` ``body`` to a presigned storage ``url`` on the bare, header-isolated + pool, returning the response. Wraps a transport failure as + :class:`StorageTransportError` and a non-2xx status as :class:`StorageError`. + """ + _log.debug("storage PUT %s (content-length=%s)", url, headers.get("Content-Length")) + try: + response = _storage_pool().request( + "PUT", url, body=body, headers=headers, retries=retries + ) + except urllib3.exceptions.HTTPError as exc: + raise StorageTransportError(url=url, part_number=part_number) from exc status = int(response.status) + _log.debug("storage PUT %s -> %s", url, status) if status >= 400: data = getattr(response, "data", b"") or b"" if isinstance(data, (bytes, bytearray)): - body = bytes(data).decode("utf-8", errors="replace") + error_body = bytes(data).decode("utf-8", errors="replace") else: - body = str(data) - raise StorageError(status=status, part_number=part_number, body=body) + error_body = str(data) + raise StorageError(status=status, part_number=part_number, body=error_body) + return response class _ProgressReader(io.RawIOBase): @@ -334,6 +394,7 @@ def upload_file( # type: ignore[override] part_size: Optional[int] = None, max_concurrency: Optional[int] = None, progress: Optional[UploadProgress] = None, + part_retry: Optional[Retry] = None, options: Optional[UploadOptions] = None, _request_timeout: Any = None, ) -> FinalizeUploadResponse: @@ -360,11 +421,17 @@ def upload_file( # type: ignore[override] upload (default :data:`DEFAULT_MAX_CONCURRENCY`, further bounded by a peak-memory budget). :param progress: Callback invoked with ``(bytes_done_total, total)``. + :param part_retry: ``urllib3`` retry policy for multipart part ``PUT`` s; + defaults to :func:`default_part_retry`. :param options: An :class:`UploadOptions` bundle; individual keyword arguments take precedence over its fields. + :raises SizeLimitError: the file is larger than the wire's 64-bit field. :raises MalformedSessionError: the session response was inconsistent. :raises StorageError: a storage ``PUT`` returned a non-2xx status. + :raises StorageTransportError: a storage ``PUT`` failed before any + response (connection/TLS/DNS error, or retries exhausted). :raises MissingETagError: a part ``PUT`` returned no ``ETag``. + :raises OSError: the local file could not be opened or read. :raises hotdata.exceptions.ApiException: opening the session or finalizing failed (e.g. ``501`` ``PRESIGN_UNSUPPORTED`` — the storage backend cannot issue upload URLs; use ``POST /v1/files`` instead). @@ -380,6 +447,7 @@ def upload_file( # type: ignore[override] max_concurrency if max_concurrency is not None else opts.max_concurrency ) progress = progress if progress is not None else opts.progress + part_retry = part_retry if part_retry is not None else opts.part_retry fspath = os.fspath(path) total = os.stat(fspath).st_size @@ -390,6 +458,13 @@ def upload_file( # type: ignore[override] # declared size. The server clamps it regardless. part_size_hint = part_size if part_size is not None else auto_part_size_hint(total) + # The wire models sizes as a signed i64; reject a pathological size up + # front rather than silently sending an out-of-range value. + if total > _MAX_WIRE_INT: + raise SizeLimitError(what="declared_size_bytes", value=total) + if part_size_hint > _MAX_WIRE_INT: + raise SizeLimitError(what="part_size", value=part_size_hint) + session = self.create_upload_session_handler( CreateUploadRequest( content_type=content_type, @@ -411,7 +486,8 @@ def upload_file( # type: ignore[override] parts: Optional[List[FinalizeUploadPart]] = None elif session.mode == "multipart": cap = max_concurrency if max_concurrency is not None else DEFAULT_MAX_CONCURRENCY - parts = self._upload_multipart(session, fspath, total, cap, progress) + retry = part_retry if part_retry is not None else default_part_retry() + parts = self._upload_multipart(session, fspath, total, cap, retry, progress) else: raise MalformedSessionError(f"unknown upload mode {session.mode!r}") @@ -425,10 +501,15 @@ def upload_file( # type: ignore[override] # null}`; constructing WITHOUT the field leaves it unset so it drops out # and the body is `{}`. So build the single-mode body field-free. # - # Finalize is exactly-once on the server, but it is safe under the SDK's - # default retry: that policy only retries a POST when the connection was - # reset *before* the request reached the server (so the server did no - # work) — it never replays a finalize the server actually processed. + # Finalize is exactly-once on the server. It is safe under the SDK's + # DEFAULT retry policy: that policy does no status retries (status=0), and + # urllib3 gates read-error retries (which include any ProtocolError, e.g. + # a post-response reset) by allowed_methods, which excludes POST — so a + # finalize the server already processed is never replayed. Only a + # pre-response reset (server did no work) is retried, which is safe. NOTE: + # a caller who overrides Configuration.retries with a policy that retries + # POST on a status/read error could double-finalize and see a spurious + # "already finalized" error; the default is the safe path. if parts is None: finalize_body = FinalizeUploadRequest() else: @@ -462,14 +543,13 @@ def _upload_single( with open(path, "rb") as fileobj: body = _ProgressReader(fileobj, total, progress) - response = _storage_pool().request( - "PUT", + _put_to_storage( url, - body=body, - headers=_storage_headers(session.headers, total), + body, + _storage_headers(session.headers, total), retries=False, + part_number=None, ) - _check_storage_status(response, part_number=None) # Guarantee a terminal tick at exactly `total`, even if the last chunk # boundary or an empty file left the counter short. @@ -482,6 +562,7 @@ def _upload_multipart( path: str, total: int, max_concurrency: int, + part_retry: Retry, progress: Optional[UploadProgress], ) -> List[FinalizeUploadPart]: """Multipart path: slice the file into ``part_size``-byte chunks (the @@ -511,7 +592,6 @@ def _upload_multipart( ) in_flight = effective_in_flight(max_concurrency, part_size) - retry = _part_retry() done = [0] # boxed running byte total; guarded by `lock` lock = threading.Lock() @@ -520,14 +600,13 @@ def upload_part(index: int) -> FinalizeUploadPart: offset = index * part_size length = min(part_size, total - offset) chunk = _read_range(path, offset, length) - response = _storage_pool().request( - "PUT", + response = _put_to_storage( part_urls[index], - body=chunk, - headers=_storage_headers(session.headers, length), - retries=retry, + chunk, + _storage_headers(session.headers, length), + retries=part_retry, + part_number=part_number, ) - _check_storage_status(response, part_number=part_number) etag = response.headers.get("ETag") if not etag: raise MissingETagError(part_number=part_number) @@ -539,20 +618,23 @@ def upload_part(index: int) -> FinalizeUploadPart: return FinalizeUploadPart(e_tag=etag, part_number=part_number) results: List[Optional[FinalizeUploadPart]] = [None] * len(part_urls) - with ThreadPoolExecutor(max_workers=in_flight) as executor: + executor = ThreadPoolExecutor(max_workers=in_flight) + try: futures = { executor.submit(upload_part, i): i for i in range(len(part_urls)) } - done_set, _ = wait(futures, return_when=FIRST_EXCEPTION) - # Surface the first failure (and let the context manager join the - # rest). A part PUT is idempotent, so an in-flight straggler that - # also fails or succeeds does no harm. - for future in done_set: + # Surface the first failure as soon as it lands rather than waiting + # for in-flight stragglers to drain. A part PUT is idempotent, so any + # straggler still running when we bail does no harm. + for future in as_completed(futures): exc = future.exception() if exc is not None: raise exc - for future, index in futures.items(): - results[index] = future.result() + results[futures[future]] = future.result() + finally: + # cancel_futures drops parts that have not started; already-running + # ones can't be cancelled (threads) but we no longer wait on them. + executor.shutdown(wait=False, cancel_futures=True) # results is indexed by 0-based part position, so it is already ascending # by part_number with no duplicates. @@ -579,11 +661,14 @@ def _read_range(path: str, offset: int, length: int) -> bytes: "UPLOAD_MEMORY_BUDGET", "MalformedSessionError", "MissingETagError", + "SizeLimitError", "StorageError", + "StorageTransportError", "UploadError", "UploadOptions", "UploadProgress", "UploadsApi", "auto_part_size_hint", + "default_part_retry", "effective_in_flight", ] diff --git a/tests/test_uploads.py b/tests/test_uploads.py index e92ef35..98c0e5b 100644 --- a/tests/test_uploads.py +++ b/tests/test_uploads.py @@ -17,13 +17,18 @@ from __future__ import annotations +import os import threading from typing import Any, Dict, List, Optional, Tuple import pytest +import urllib3 +from urllib3.util.retry import Retry + import hotdata.uploads as uploads_mod from hotdata import UploadsApi as ExportedUploadsApi +from hotdata.exceptions import ApiException from hotdata.uploads import ( DEFAULT_PART_SIZE, MAX_PART_SIZE, @@ -33,7 +38,9 @@ UPLOAD_MEMORY_BUDGET, MalformedSessionError, MissingETagError, + SizeLimitError, StorageError, + StorageTransportError, UploadOptions, UploadsApi, auto_part_size_hint, @@ -551,3 +558,382 @@ def test_exported_uploads_api_is_enhanced() -> None: # `from hotdata import UploadsApi` resolves to the ergonomic subclass. assert ExportedUploadsApi is UploadsApi assert hasattr(ExportedUploadsApi, "upload_file") + + +# --- Session/finalize API error paths ------------------------------------- + + +def test_create_session_501_propagates_as_api_exception( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + # A 501 PRESIGN_UNSUPPORTED from POST /v1/uploads surfaces as ApiException + # (the documented fallback signal), and no storage PUT is attempted. + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + api = _make_api() + + def boom(create_upload_request: Any, **kwargs: Any) -> Any: + raise ApiException(status=501, reason="PRESIGN_UNSUPPORTED") + + monkeypatch.setattr(api, "create_upload_session_handler", boom) + with pytest.raises(ApiException) as ei: + api.upload_file(str(src)) + assert ei.value.status == 501 + assert fake_pool.calls == [] # never reached storage + + +def test_finalize_is_called_exactly_once( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + api.upload_file(str(src)) + finalize_calls = [c for c in capture if c[0] == "finalize"] + assert len(finalize_calls) == 1 + + +# --- Transport / size / retry plumbing ------------------------------------ + + +def test_storage_transport_error_is_wrapped( + monkeypatch: pytest.MonkeyPatch, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + + class _ExplodingPool: + def request(self, *a: Any, **k: Any) -> Any: + raise urllib3.exceptions.ProtocolError("connection aborted") + + monkeypatch.setattr(uploads_mod, "_STORAGE_POOL", _ExplodingPool()) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(StorageTransportError) as ei: + api.upload_file(str(src)) + assert isinstance(ei.value.__cause__, urllib3.exceptions.HTTPError) + assert ei.value.part_number is None + + +def test_size_limit_guard_on_part_size_hint( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + api = _make_api() + # create should never be reached — the guard fires first. + monkeypatch.setattr( + api, + "create_upload_session_handler", + lambda *a, **k: pytest.fail("create should not be called"), + ) + with pytest.raises(SizeLimitError) as ei: + api.upload_file(str(src), part_size=2**63) + assert ei.value.what == "part_size" + + +def test_size_limit_guard_on_declared_size( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + api = _make_api() + + real_stat = os.stat + + class _HugeStat: + st_size = 2**63 + + monkeypatch.setattr( + uploads_mod.os, + "stat", + lambda p: _HugeStat() if str(p) == str(src) else real_stat(p), + ) + with pytest.raises(SizeLimitError) as ei: + api.upload_file(str(src)) + assert ei.value.what == "declared_size_bytes" + + +def test_custom_part_retry_is_forwarded( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + part_size = 5 * MIB + content = b"a" * part_size + src = tmp_path / "f.bin" + src.write_bytes(content) + url = "https://storage.test/p1" + fake_pool.responses[url] = _FakeResponse(200, etag='"e1"') + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="multipart", + part_size=part_size, + part_urls=[url], + upload_id="u", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + + custom = Retry(total=7) + api.upload_file(str(src), part_retry=custom) + assert fake_pool.calls[0]["retries"] is custom + + +def test_default_part_retry_used_when_unset( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + part_size = 5 * MIB + src = tmp_path / "f.bin" + src.write_bytes(b"a" * part_size) + url = "https://storage.test/p1" + fake_pool.responses[url] = _FakeResponse(200, etag='"e1"') + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="multipart", + part_size=part_size, + part_urls=[url], + upload_id="u", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + api.upload_file(str(src)) + retries = fake_pool.calls[0]["retries"] + assert isinstance(retries, Retry) + # The default policy retries idempotent PUTs on transient statuses. + assert 429 in retries.status_forcelist + + +# --- Byte-granular progress (single PUT) ---------------------------------- + + +def test_single_put_progress_is_byte_granular( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + # A body larger than the read block size produces multiple intermediate + # progress ticks rather than jumping 0 -> total. + content = b"z" * (300 * 1024) + src = tmp_path / "big_single.bin" + src.write_bytes(content) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + ticks: List[Tuple[int, int]] = [] + api.upload_file(str(src), progress=lambda d, t: ticks.append((d, t))) + + intermediate = [d for d, t in ticks if 0 < d < t] + assert len(intermediate) >= 2, ticks + assert ticks[-1] == (len(content), len(content)) + assert [d for d, _ in ticks] == sorted(d for d, _ in ticks) + + +# --- Concurrency is bounded and actually reached -------------------------- + + +class _ConcurrencyProbePool: + """Fake pool that measures peak simultaneous in-flight PUTs via a barrier. + + Sized so each wave has exactly ``parties`` threads; the barrier forces all + ``parties`` to be in flight at once, and the executor's worker cap forbids + more — so the recorded peak equals the effective concurrency. + """ + + def __init__(self, parties: int) -> None: + self.barrier = threading.Barrier(parties, timeout=5) + self.active = 0 + self.peak = 0 + self.lock = threading.Lock() + + def request(self, method: str, url: str, **kwargs: Any) -> _FakeResponse: + with self.lock: + self.active += 1 + self.peak = max(self.peak, self.active) + try: + self.barrier.wait() + finally: + with self.lock: + self.active -= 1 + return _FakeResponse(200, etag='"etag"') + + +def _multipart_session(part_urls: List[str], part_size: int) -> UploadSessionResponse: + return UploadSessionResponse( + finalize_token="t", + headers={}, + mode="multipart", + part_size=part_size, + part_urls=part_urls, + upload_id="u", + ) + + +def test_multipart_concurrency_is_bounded_and_reached( + monkeypatch: pytest.MonkeyPatch, tmp_path: Any +) -> None: + part_size = 5 * MIB + # 4 parts, max_concurrency=2 -> effective in-flight is 2; two waves of 2. + content = b"a" * (4 * part_size) + src = tmp_path / "mp.bin" + src.write_bytes(content) + part_urls = [f"https://storage.test/p{i}" for i in range(1, 5)] + probe = _ConcurrencyProbePool(parties=2) + monkeypatch.setattr(uploads_mod, "_STORAGE_POOL", probe) + api = _make_api() + _patch_session(monkeypatch, api, _multipart_session(part_urls, part_size), []) + + assert effective_in_flight(2, part_size) == 2 + api.upload_file(str(src), max_concurrency=2) + assert probe.peak == 2 + + +def test_multipart_serial_when_max_concurrency_one( + monkeypatch: pytest.MonkeyPatch, tmp_path: Any +) -> None: + part_size = 5 * MIB + content = b"a" * (3 * part_size) + src = tmp_path / "mp1.bin" + src.write_bytes(content) + part_urls = [f"https://storage.test/p{i}" for i in range(1, 4)] + probe = _ConcurrencyProbePool(parties=1) # barrier of 1 -> never blocks + monkeypatch.setattr(uploads_mod, "_STORAGE_POOL", probe) + api = _make_api() + _patch_session(monkeypatch, api, _multipart_session(part_urls, part_size), []) + api.upload_file(str(src), max_concurrency=1) + assert probe.peak == 1 + + +# --- More malformed-session cases ----------------------------------------- + + +def test_multipart_empty_part_urls_raises( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"a" * (5 * MIB)) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="multipart", + part_size=5 * MIB, + part_urls=[], + upload_id="u", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(MalformedSessionError, match="part_urls"): + api.upload_file(str(src)) + + +def test_multipart_nonpositive_part_size_raises( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"a" * 10) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="multipart", + part_size=0, + part_urls=["https://storage.test/p1"], + upload_id="u", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(MalformedSessionError, match="part_size"): + api.upload_file(str(src)) + + +# --- Header isolation under a poisoned main client ------------------------ + + +def test_poisoned_main_client_headers_do_not_reach_storage( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin" + src.write_bytes(b"x" * 10) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + api = _make_api() + # Install auth/scope/UA defaults on the SDK client — they must NOT leak onto + # the storage PUT, which goes through the dedicated bare pool. + api.api_client.set_default_header("Authorization", "Bearer SECRET") + api.api_client.set_default_header("X-Workspace-Id", "ws_secret") + api.api_client.set_default_header("User-Agent", "hotdata-sdk/test") + _patch_session(monkeypatch, api, session, []) + api.upload_file(str(src)) + hdrs = fake_pool.calls[0]["headers"] + assert "Authorization" not in hdrs + assert "X-Workspace-Id" not in hdrs + assert "User-Agent" not in hdrs + assert hdrs["Content-Length"] == "10" + + +# --- Wire-level create body ----------------------------------------------- + + +def test_auto_part_size_hint_lands_on_wire( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + content = b"x" * (200 * 1024) + src = tmp_path / "f.bin" + src.write_bytes(content) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + api.upload_file(str(src)) + create_req = capture[0][1] + assert create_req.part_size == auto_part_size_hint(len(content)) + assert create_req.part_size == DEFAULT_PART_SIZE + + +def test_content_encoding_forwarded( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool, tmp_path: Any +) -> None: + src = tmp_path / "f.bin.gz" + src.write_bytes(b"x" * 10) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + api.upload_file(str(src), content_encoding="gzip") + assert capture[0][1].content_encoding == "gzip" From 9687df028ab29f1090e9aa7093477a700eeef833 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Sat, 27 Jun 2026 16:36:31 -0700 Subject: [PATCH 3/7] feat(uploads): accept bytes/file objects and add upload_stream --- CHANGELOG.md | 7 +- README.md | 42 ++++--- hotdata/uploads.py | 274 ++++++++++++++++++++++++++++++++++++++---- tests/test_uploads.py | 205 +++++++++++++++++++++++++++++++ 4 files changed, 490 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2655440..a9e0cde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 headers reach object storage. Failures surface as a typed hierarchy under `UploadError`: `StorageError` (non-2xx from storage), `StorageTransportError` (transport failure before any response), `MissingETagError`, - `MalformedSessionError`, and `SizeLimitError`. + `MalformedSessionError`, and `SizeLimitError`. `upload_file` accepts a path, + raw `bytes`, or a seekable binary file object. +- `hotdata.uploads.UploadsApi.upload_stream` uploads an arbitrary byte source + (`bytes` or a binary file object, streamed without buffering) to the legacy + `POST /v1/files` endpoint — the fallback when presigned uploads are + unsupported or the source is a non-seekable stream. ### Changed diff --git a/README.md b/README.md index 9063c43..f6f7f70 100644 --- a/README.md +++ b/README.md @@ -113,20 +113,34 @@ with ApiClient(Configuration(api_key="...", workspace_id="...")) as client: print(finalized.upload_id) ``` -The SDK picks single vs. multipart from the file size, auto-scales the part size, -and bounds part concurrency to a peak-memory budget (override with `part_size` / -`max_concurrency`). Storage `PUT`s go through a dedicated, header-isolated -connection pool, so the SDK's auth and workspace headers never reach object -storage (which would otherwise reject the upload). - -It raises `hotdata.uploads.StorageError` if storage rejects a `PUT`, -`MalformedSessionError` if the session response is inconsistent, and the usual -`hotdata.exceptions.ApiException` if opening the session or finalizing fails — -for example a `501` `PRESIGN_UNSUPPORTED`, meaning the backend cannot issue -upload URLs. In that case send the raw bytes to the legacy `POST /v1/files` -endpoint, which the generated client still exposes as -`hotdata.api.uploads_api.UploadsApi.upload_file(body=...)` (the enhanced -`upload_file` above takes a file path and shadows it). +`upload_file` accepts a path, raw `bytes`, or a seekable binary file object +(`size` is inferred for all three). The SDK picks single vs. multipart from the +size, auto-scales the part size, and bounds part concurrency to a peak-memory +budget (override with `part_size` / `max_concurrency` / `part_retry`). Storage +`PUT`s go through a dedicated, header-isolated connection pool, so the SDK's auth +and workspace headers never reach object storage (which would otherwise reject +the upload). + +Failures surface as a typed hierarchy under `hotdata.uploads.UploadError`: +`StorageError` (storage returned a non-2xx), `StorageTransportError` (the PUT +failed before any response), `MissingETagError`, `MalformedSessionError`, and +`SizeLimitError`. Opening the session or finalizing raises the usual +`hotdata.exceptions.ApiException` — for example a `501` `PRESIGN_UNSUPPORTED`, +meaning the backend cannot issue upload URLs. + +For that fallback (or to upload from a non-seekable stream), use `upload_stream`, +which sends the bytes to the legacy `POST /v1/files` endpoint in one request, +streaming a file object without buffering it in memory: + +```python +with open("data.parquet", "rb") as f: + resp = uploads.upload_stream(f, content_type="application/parquet") +print(resp.id) +``` + +Note `upload_file` shadows the generated raw-body `upload_file(body=...)`; that +raw operation is still reachable at +`hotdata.api.uploads_api.UploadsApi.upload_file`. ## API reference diff --git a/hotdata/uploads.py b/hotdata/uploads.py index 5198b25..b74b3e7 100644 --- a/hotdata/uploads.py +++ b/hotdata/uploads.py @@ -46,8 +46,9 @@ import os import threading from concurrent.futures import ThreadPoolExecutor, as_completed +from contextlib import contextmanager from dataclasses import dataclass -from typing import Any, BinaryIO, Callable, Dict, List, Optional, Union +from typing import Any, BinaryIO, Callable, Dict, Iterator, List, Optional, Union import urllib3 from urllib3.util.retry import Retry @@ -57,7 +58,20 @@ from hotdata.models.finalize_upload_part import FinalizeUploadPart from hotdata.models.finalize_upload_request import FinalizeUploadRequest from hotdata.models.finalize_upload_response import FinalizeUploadResponse +from hotdata.models.upload_response import UploadResponse from hotdata.models.upload_session_response import UploadSessionResponse +from hotdata.rest import RESTResponse + +#: Response-type map for the legacy ``POST /v1/files`` op, mirroring the +#: generated ``upload_file`` handler. +_UPLOAD_FILE_RESPONSE_TYPES: Dict[str, Optional[str]] = { + "201": "UploadResponse", + "400": "ApiErrorResponse", +} + +#: The accepted input types for :meth:`UploadsApi.upload_file`: a filesystem +#: path, raw bytes, or a seekable binary file object. +UploadSource = Union[str, "os.PathLike[str]", bytes, bytearray, BinaryIO] _log = logging.getLogger("hotdata.uploads") @@ -300,6 +314,20 @@ def default_part_retry() -> Retry: ) +def _to_timeout(request_timeout: Any) -> Any: + """Convert the SDK's ``_request_timeout`` (a number or ``(connect, read)`` + tuple) into a :class:`urllib3.Timeout`, or ``None`` for the pool default — + matching the generated REST client's conversion. + """ + if not request_timeout: + return None + if isinstance(request_timeout, (int, float)): + return urllib3.Timeout(total=request_timeout) + if isinstance(request_timeout, tuple) and len(request_timeout) == 2: + return urllib3.Timeout(connect=request_timeout[0], read=request_timeout[1]) + return None + + def _storage_headers(session_headers: Dict[str, str], content_length: int) -> Dict[str, str]: """Build the bare PUT headers: an explicit ``Content-Length`` plus the server-provided session headers replayed verbatim (currently always empty; @@ -386,8 +414,9 @@ class UploadsApi(_GeneratedUploadsApi): def upload_file( # type: ignore[override] self, - path: Union[str, "os.PathLike[str]"], + source: UploadSource, *, + size: Optional[int] = None, content_type: Optional[str] = None, content_encoding: Optional[str] = None, filename: Optional[str] = None, @@ -398,7 +427,7 @@ def upload_file( # type: ignore[override] options: Optional[UploadOptions] = None, _request_timeout: Any = None, ) -> FinalizeUploadResponse: - """Upload a local file directly to object storage and finalize it. + """Upload a file directly to object storage and finalize it. Opens an upload session, ``PUT`` s the bytes straight to storage (one ``PUT`` for a small file, concurrent part ``PUT`` s for a large one), @@ -407,14 +436,20 @@ def upload_file( # type: ignore[override] Read ``upload_id`` from it to load the upload into a managed table. The bytes never round-trip through the API. + ``source`` may be a filesystem path, raw ``bytes`` / ``bytearray``, or a + seekable binary file object. A non-seekable stream is not accepted here + (multipart needs positioned reads) — use :meth:`upload_stream` for that. + Per-call keyword arguments override the matching :class:`UploadOptions` field when both are given. - :param path: Path to the local file to upload. + :param source: A path, ``bytes``, or seekable binary file object. + :param size: Byte size of ``source`` when it is a file object whose size + cannot be determined (otherwise inferred from the path/bytes/seek). :param content_type: Content type to record (advisory). :param content_encoding: Content encoding to record (advisory). :param filename: Original file name to record; defaults to the path's - base name. + base name (no default for bytes / file-object sources). :param part_size: Preferred multipart part-size hint, in bytes; the server clamps it and ignores it for single-``PUT`` uploads. :param max_concurrency: Max in-flight part ``PUT`` s for a multipart @@ -449,10 +484,10 @@ def upload_file( # type: ignore[override] progress = progress if progress is not None else opts.progress part_retry = part_retry if part_retry is not None else opts.part_retry - fspath = os.fspath(path) - total = os.stat(fspath).st_size - if filename is None: - filename = os.path.basename(fspath) + src = _make_source(source, size) + total = src.size + if filename is None and isinstance(src, _PathSource): + filename = src.default_filename # Part-size hint: honor an explicit value, else auto-scale from the # declared size. The server clamps it regardless. @@ -482,12 +517,12 @@ def upload_file( # type: ignore[override] progress(0, total) if session.mode == "single": - self._upload_single(session, fspath, total, progress) + self._upload_single(session, src, total, progress) parts: Optional[List[FinalizeUploadPart]] = None elif session.mode == "multipart": cap = max_concurrency if max_concurrency is not None else DEFAULT_MAX_CONCURRENCY retry = part_retry if part_retry is not None else default_part_retry() - parts = self._upload_multipart(session, fspath, total, cap, retry, progress) + parts = self._upload_multipart(session, src, total, cap, retry, progress) else: raise MalformedSessionError(f"unknown upload mode {session.mode!r}") @@ -521,16 +556,101 @@ def upload_file( # type: ignore[override] _request_timeout=_request_timeout, ) + def upload_stream( + self, + body: Union[bytes, bytearray, BinaryIO], + *, + content_type: Optional[str] = None, + content_length: Optional[int] = None, + _request_timeout: Any = None, + ) -> UploadResponse: + """Stream an arbitrary byte source to the legacy ``POST /v1/files`` raw + upload endpoint, returning the + :class:`~hotdata.models.upload_response.UploadResponse`. + + Use this when the presigned :meth:`upload_file` path is unavailable + (e.g. a ``501`` ``PRESIGN_UNSUPPORTED``) or when the bytes come from a + non-seekable stream that :meth:`upload_file` cannot use. The body is sent + through the SDK's authenticated client (workspace + bearer headers) — not + the bare storage pool — because this request goes to the API, not object + storage. + + ``body`` may be ``bytes`` / ``bytearray`` or a readable binary file + object. A file object is streamed without being buffered into memory; set + ``content_length`` (or pass a seekable file, whose length is inferred) so + the server can reject an oversized upload before reading the body. + + :param body: The bytes or binary stream to upload. + :param content_type: Content type for the body; defaults to + ``application/octet-stream``. + :param content_length: Explicit body length in bytes. Inferred from + ``bytes`` length or a seekable file; required for a non-seekable + stream to avoid chunked transfer. + :raises hotdata.exceptions.ApiException: the upload was rejected. + """ + if isinstance(body, (bytes, bytearray)): + # bytes go through the generated serialize + transport unchanged; + # urllib3 sets Content-Length from the buffer length automatically. + data = bytes(body) + params = self._upload_file_serialize( + body=data, + _request_auth=None, + _content_type=content_type, + _headers=None, + _host_index=0, + ) + response_data = self.api_client.call_api(*params, _request_timeout=_request_timeout) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_UPLOAD_FILE_RESPONSE_TYPES, + ).data + + # A file object: infer its length when seekable so the request is framed + # (not chunked), then stream it through the SDK's authenticated pool. + if content_length is None: + seekable = getattr(body, "seekable", None) + if callable(seekable) and body.seekable(): + current = body.tell() + content_length = body.seek(0, io.SEEK_END) - current + body.seek(current) + headers: Optional[Dict[str, str]] = ( + {"Content-Length": str(content_length)} if content_length is not None else None + ) + method, url, header_params, _body, _post = self._upload_file_serialize( + body=None, + _request_auth=None, + _content_type=content_type, + _headers=headers, + _host_index=0, + ) + # Stream via the SDK's configured pool (auth + TLS/proxy), bypassing the + # generated rest layer, which buffers and rejects file-like bodies. + raw = self.api_client.rest_client.pool_manager.request( + method, + url, + body=body, + headers=header_params, + preload_content=False, + timeout=_to_timeout(_request_timeout), + ) + rest_response = RESTResponse(raw) + rest_response.read() + return self.api_client.response_deserialize( + response_data=rest_response, + response_types_map=_UPLOAD_FILE_RESPONSE_TYPES, + ).data + # -- internals --------------------------------------------------------- def _upload_single( self, session: UploadSessionResponse, - path: str, + src: _Source, total: int, progress: Optional[UploadProgress], ) -> None: - """Single-``PUT`` path: stream the whole file to ``session.url``, + """Single-``PUT`` path: stream the whole source to ``session.url``, invoking the progress callback incrementally as chunks are sent. A streaming body cannot be replayed, so this ``PUT`` is sent once with no @@ -541,7 +661,7 @@ def _upload_single( if not url: raise MalformedSessionError("single upload missing `url`") - with open(path, "rb") as fileobj: + with src.reader() as fileobj: body = _ProgressReader(fileobj, total, progress) _put_to_storage( url, @@ -559,7 +679,7 @@ def _upload_single( def _upload_multipart( self, session: UploadSessionResponse, - path: str, + src: _Source, total: int, max_concurrency: int, part_retry: Retry, @@ -599,7 +719,7 @@ def upload_part(index: int) -> FinalizeUploadPart: part_number = index + 1 offset = index * part_size length = min(part_size, total - offset) - chunk = _read_range(path, offset, length) + chunk = src.read_range(offset, length) response = _put_to_storage( part_urls[index], chunk, @@ -641,14 +761,121 @@ def upload_part(index: int) -> FinalizeUploadPart: return [part for part in results if part is not None] -def _read_range(path: str, offset: int, length: int) -> bytes: - """Read exactly ``length`` bytes starting at ``offset``. Each multipart part - task opens its own handle so tasks never share a cursor and a retry re-reads - the same range cleanly. +class _Source: + """Random-access view over the bytes to upload, plus the total ``size``. + + Abstracts the three accepted inputs (a path, raw bytes, or a seekable file + object) behind :meth:`read_range` (positioned read of one part, used + concurrently for multipart) and :meth:`reader` (a fresh stream over the whole + content, used for the single-``PUT`` path). + """ + + size: int + + def read_range(self, offset: int, length: int) -> bytes: + raise NotImplementedError + + @contextmanager + def reader(self) -> Iterator[BinaryIO]: + raise NotImplementedError + yield # pragma: no cover - makes this a generator for the type checker + + +class _PathSource(_Source): + """A filesystem path. Each read opens its own handle so concurrent part + reads never share a cursor and a retry re-reads its range cleanly. """ - with open(path, "rb") as fileobj: - fileobj.seek(offset) - return fileobj.read(length) + + def __init__(self, path: "os.PathLike[str] | str") -> None: + self._path = os.fspath(path) + self.size = os.stat(self._path).st_size + + @property + def default_filename(self) -> str: + return os.path.basename(self._path) + + def read_range(self, offset: int, length: int) -> bytes: + with open(self._path, "rb") as fileobj: + fileobj.seek(offset) + return fileobj.read(length) + + @contextmanager + def reader(self) -> Iterator[BinaryIO]: + fileobj = open(self._path, "rb") + try: + yield fileobj + finally: + fileobj.close() + + +class _BytesSource(_Source): + """Raw in-memory bytes. Slicing is cheap and thread-safe.""" + + def __init__(self, data: bytes) -> None: + self._data = data + self.size = len(data) + + def read_range(self, offset: int, length: int) -> bytes: + return self._data[offset:offset + length] + + @contextmanager + def reader(self) -> Iterator[BinaryIO]: + yield io.BytesIO(self._data) + + +class _FileObjSource(_Source): + """A user-owned, seekable binary file object. A lock serializes the + seek+read of each part (cheap, in-memory) so concurrent part tasks never + corrupt the shared cursor; the slow ``PUT`` s still run concurrently. The + file object is never closed (the caller owns it). + """ + + def __init__(self, fileobj: BinaryIO, size: int) -> None: + self._file = fileobj + self.size = size + self._lock = threading.Lock() + + def read_range(self, offset: int, length: int) -> bytes: + with self._lock: + self._file.seek(offset) + return self._file.read(length) + + @contextmanager + def reader(self) -> Iterator[BinaryIO]: + self._file.seek(0) + yield self._file + + +def _make_source(source: UploadSource, size: Optional[int]) -> _Source: + """Normalize an upload input into a :class:`_Source`. + + Accepts a path, ``bytes`` / ``bytearray``, or a seekable binary file object. + A non-seekable stream (or any other type) raises ``TypeError`` pointing at + :meth:`UploadsApi.upload_stream`, which streams to the legacy endpoint. + """ + if isinstance(source, (bytes, bytearray)): + return _BytesSource(bytes(source)) + if isinstance(source, (str, os.PathLike)): + return _PathSource(source) + if hasattr(source, "read"): + seekable = getattr(source, "seekable", None) + if not (callable(seekable) and source.seekable()): + raise TypeError( + "upload_file needs a seekable file object (it does positioned " + "reads for multipart). For a non-seekable stream use " + "upload_stream, which sends to POST /v1/files in one request." + ) + if size is None: + current = source.tell() + size = source.seek(0, io.SEEK_END) - current + source.seek(current) + return _FileObjSource(source, size) + raise TypeError( + f"upload_file accepts a path, bytes, or a seekable binary file object, " + f"not {type(source).__name__}. (Did you mean the generated " + f"hotdata.api.uploads_api.UploadsApi.upload_file(body=...) raw op, or " + f"upload_stream?)" + ) __all__ = [ @@ -667,6 +894,7 @@ def _read_range(path: str, offset: int, length: int) -> bytes: "UploadError", "UploadOptions", "UploadProgress", + "UploadSource", "UploadsApi", "auto_part_size_hint", "default_part_retry", diff --git a/tests/test_uploads.py b/tests/test_uploads.py index 98c0e5b..f1506b8 100644 --- a/tests/test_uploads.py +++ b/tests/test_uploads.py @@ -17,6 +17,8 @@ from __future__ import annotations +import io +import json import os import threading from typing import Any, Dict, List, Optional, Tuple @@ -47,6 +49,7 @@ effective_in_flight, ) from hotdata.models.finalize_upload_response import FinalizeUploadResponse +from hotdata.models.upload_response import UploadResponse from hotdata.models.upload_session_response import UploadSessionResponse @@ -73,6 +76,20 @@ def __init__(self, status: int, *, etag: Optional[str] = None, body: bytes = b"" self.data = body +class _FakeUrllib3Response: + """Minimal urllib3.HTTPResponse stand-in for the API transport (not storage): + enough for rest.RESTResponse + ApiClient.response_deserialize. + """ + + def __init__(self, status: int, data: bytes, headers: Dict[str, str]): + self.status = status + self.reason = "OK" if 200 <= status < 300 else "Error" + self.data = data + # A plain dict (lowercased keys): ApiResponse validates headers as a dict, + # and response_deserialize looks up 'content-type'. + self.headers = {k.lower(): v for k, v in headers.items()} + + class _FakeStoragePool: """Stand-in for the bare storage urllib3.PoolManager. @@ -937,3 +954,191 @@ def test_content_encoding_forwarded( _patch_session(monkeypatch, api, session, capture) api.upload_file(str(src), content_encoding="gzip") assert capture[0][1].content_encoding == "gzip" + + +# --- upload_file from bytes / file objects -------------------------------- + + +def test_upload_file_from_bytes_single( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + content = b"in-memory parquet bytes" * 50 + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + api.upload_file(content, content_type="application/parquet") + assert fake_pool.calls[0]["body"] == content + # No filename is inferred for a bytes source. + assert capture[0][1].filename is None + assert capture[0][1].declared_size_bytes == len(content) + + +def test_upload_file_from_bytes_multipart( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + part_size = 5 * MIB + content = bytes((i % 251) for i in range(2 * part_size + 99)) + part_urls = ["https://storage.test/p1", "https://storage.test/p2", "https://storage.test/p3"] + for i, url in enumerate(part_urls, start=1): + fake_pool.responses[url] = _FakeResponse(200, etag=f'"e{i}"') + session = _multipart_session(part_urls, part_size) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + api.upload_file(content, max_concurrency=2) + by_url = {c["url"]: c for c in fake_pool.calls} + assert by_url[part_urls[0]]["body"] == content[0:part_size] + assert by_url[part_urls[2]]["body"] == content[2 * part_size:] + + +def test_upload_file_from_file_object( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + content = b"seekable file object content" * 100 + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + # Size is inferred from the seekable stream; the user's file is not closed. + fileobj = io.BytesIO(content) + api.upload_file(fileobj, filename="data.parquet") + assert fake_pool.calls[0]["body"] == content + assert capture[0][1].declared_size_bytes == len(content) + assert capture[0][1].filename == "data.parquet" + assert not fileobj.closed + + +def test_upload_file_file_object_multipart_concurrent_reads( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + part_size = 5 * MIB + content = bytes((i % 251) for i in range(3 * part_size)) + part_urls = [f"https://storage.test/p{i}" for i in range(1, 4)] + for i, url in enumerate(part_urls, start=1): + fake_pool.responses[url] = _FakeResponse(200, etag=f'"e{i}"') + session = _multipart_session(part_urls, part_size) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + api.upload_file(io.BytesIO(content), max_concurrency=3) + by_url = {c["url"]: c for c in fake_pool.calls} + # The lock-guarded positioned reads must still slice each part correctly. + assert by_url[part_urls[0]]["body"] == content[0:part_size] + assert by_url[part_urls[1]]["body"] == content[part_size:2 * part_size] + assert by_url[part_urls[2]]["body"] == content[2 * part_size:] + + +def test_upload_file_non_seekable_stream_raises(fake_pool: _FakeStoragePool) -> None: + class _NonSeekable: + def read(self, n: int = -1) -> bytes: + return b"" + + def seekable(self) -> bool: + return False + + api = _make_api() + with pytest.raises(TypeError, match="upload_stream"): + api.upload_file(_NonSeekable()) # type: ignore[arg-type] + + +def test_upload_file_bad_type_raises(fake_pool: _FakeStoragePool) -> None: + api = _make_api() + with pytest.raises(TypeError, match="path, bytes, or a seekable"): + api.upload_file(12345) # type: ignore[arg-type] + + +# --- upload_stream (legacy POST /v1/files) -------------------------------- + + +def _upload_response_json() -> bytes: + return json.dumps( + { + "content_type": "application/parquet", + "created_at": "2026-01-01T00:00:00Z", + "id": "file_123", + "size_bytes": 5, + "status": "ready", + } + ).encode() + + +def test_upload_stream_bytes_posts_to_v1_files( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HOTDATA_DISABLE_JWT_EXCHANGE", "1") + from hotdata import rest + + captured: Dict[str, Any] = {} + + def fake_request( + self: Any, + method: str, + url: str, + headers: Any = None, + body: Any = None, + post_params: Any = None, + _request_timeout: Any = None, + ) -> Any: + captured.update(method=method, url=url, body=body, headers=dict(headers or {})) + resp = _FakeUrllib3Response( + 201, _upload_response_json(), {"content-type": "application/json"} + ) + return rest.RESTResponse(resp) + + monkeypatch.setattr(rest.RESTClientObject, "request", fake_request) + api = _make_api() + out = api.upload_stream(b"hello", content_type="application/parquet") + assert captured["method"] == "POST" + assert captured["url"].endswith("/v1/files") + assert captured["body"] == b"hello" + assert captured["headers"]["Content-Type"] == "application/parquet" + assert isinstance(out, UploadResponse) + assert out.id == "file_123" + + +def test_upload_stream_file_object_streams_with_inferred_length( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HOTDATA_DISABLE_JWT_EXCHANGE", "1") + api = _make_api() + + captured: Dict[str, Any] = {} + + class _FakePoolManager: + def request(self, method: str, url: str, **kwargs: Any) -> Any: + body = kwargs["body"] + captured.update( + method=method, + url=url, + headers=dict(kwargs.get("headers") or {}), + streamed=body.read(), + preload_content=kwargs.get("preload_content"), + ) + return _FakeUrllib3Response( + 201, _upload_response_json(), {"content-type": "application/json"} + ) + + monkeypatch.setattr(api.api_client.rest_client, "pool_manager", _FakePoolManager()) + + payload = b"streamed-from-a-file-object" + out = api.upload_stream(io.BytesIO(payload)) + assert captured["method"] == "POST" + assert captured["url"].endswith("/v1/files") + # Length inferred from the seekable stream -> framed (not chunked). + assert captured["headers"]["Content-Length"] == str(len(payload)) + assert captured["streamed"] == payload + assert captured["preload_content"] is False + # Default content type when unset. + assert captured["headers"]["Content-Type"] == "application/octet-stream" + assert isinstance(out, UploadResponse) From 4fabe0a58a5def6159dbc1b2a724893550e2b245 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Sat, 27 Jun 2026 16:52:48 -0700 Subject: [PATCH 4/7] fix(uploads): file-object cursor, finalize no-retry, session scope, short-read guard --- README.md | 14 ++-- hotdata/uploads.py | 117 +++++++++++++++++++-------- tests/test_uploads.py | 180 ++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 267 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index f6f7f70..91abc4d 100644 --- a/README.md +++ b/README.md @@ -114,12 +114,14 @@ with ApiClient(Configuration(api_key="...", workspace_id="...")) as client: ``` `upload_file` accepts a path, raw `bytes`, or a seekable binary file object -(`size` is inferred for all three). The SDK picks single vs. multipart from the -size, auto-scales the part size, and bounds part concurrency to a peak-memory -budget (override with `part_size` / `max_concurrency` / `part_retry`). Storage -`PUT`s go through a dedicated, header-isolated connection pool, so the SDK's auth -and workspace headers never reach object storage (which would otherwise reject -the upload). +(`size` is inferred for all three; a file object is read from its current +position to the end). The SDK picks single vs. multipart from the size, +auto-scales the part size, and bounds part concurrency to a peak-memory budget +(override with `part_size` / `max_concurrency` / `part_retry`). Storage `PUT`s go +through a dedicated, header-isolated connection pool, so the SDK's auth and +workspace headers never reach object storage (which would otherwise reject the +upload). Finalize is sent with retries disabled so the exactly-once call is never +accidentally replayed. Failures surface as a typed hierarchy under `hotdata.uploads.UploadError`: `StorageError` (storage returned a non-2xx), `StorageTransportError` (the PUT diff --git a/hotdata/uploads.py b/hotdata/uploads.py index b74b3e7..8843296 100644 --- a/hotdata/uploads.py +++ b/hotdata/uploads.py @@ -41,6 +41,7 @@ from __future__ import annotations +import copy import io import logging import os @@ -54,6 +55,7 @@ from urllib3.util.retry import Retry from hotdata.api.uploads_api import UploadsApi as _GeneratedUploadsApi +from hotdata.api_client import ApiClient from hotdata.models.create_upload_request import CreateUploadRequest from hotdata.models.finalize_upload_part import FinalizeUploadPart from hotdata.models.finalize_upload_request import FinalizeUploadRequest @@ -439,6 +441,9 @@ def upload_file( # type: ignore[override] ``source`` may be a filesystem path, raw ``bytes`` / ``bytearray``, or a seekable binary file object. A non-seekable stream is not accepted here (multipart needs positioned reads) — use :meth:`upload_stream` for that. + A file object is uploaded from its current position to the end (its + cursor is consumed, not restored); pass ``size`` only if its length + cannot be inferred by seeking. Per-call keyword arguments override the matching :class:`UploadOptions` field when both are given. @@ -535,25 +540,15 @@ def upload_file( # type: ignore[override] # field in pydantic's `model_fields_set`, serializing to `{"parts": # null}`; constructing WITHOUT the field leaves it unset so it drops out # and the body is `{}`. So build the single-mode body field-free. - # - # Finalize is exactly-once on the server. It is safe under the SDK's - # DEFAULT retry policy: that policy does no status retries (status=0), and - # urllib3 gates read-error retries (which include any ProtocolError, e.g. - # a post-response reset) by allowed_methods, which excludes POST — so a - # finalize the server already processed is never replayed. Only a - # pre-response reset (server did no work) is retried, which is safe. NOTE: - # a caller who overrides Configuration.retries with a policy that retries - # POST on a status/read error could double-finalize and see a spurious - # "already finalized" error; the default is the safe path. if parts is None: finalize_body = FinalizeUploadRequest() else: finalize_body = FinalizeUploadRequest(parts=parts) - return self.finalize_upload_handler( + return self._finalize_handler( session.upload_id, session.finalize_token, finalize_body, - _request_timeout=_request_timeout, + _request_timeout, ) def upload_stream( @@ -588,6 +583,20 @@ def upload_stream( stream to avoid chunked transfer. :raises hotdata.exceptions.ApiException: the upload was rejected. """ + if not isinstance(body, (bytes, bytearray)) and not hasattr(body, "read"): + raise TypeError( + f"upload_stream accepts bytes or a readable binary file object, " + f"not {type(body).__name__}" + ) + + # The generated /v1/files op carries only WorkspaceId + BearerAuth; add + # the X-Session-Id scope header here when a session is configured so + # sandbox-scoped uploads keep their session context (matches the Rust SDK). + scope_headers: Dict[str, str] = {} + session_id = self.api_client.configuration.session_id + if session_id: + scope_headers["X-Session-Id"] = session_id + if isinstance(body, (bytes, bytearray)): # bytes go through the generated serialize + transport unchanged; # urllib3 sets Content-Length from the buffer length automatically. @@ -596,7 +605,7 @@ def upload_stream( body=data, _request_auth=None, _content_type=content_type, - _headers=None, + _headers=scope_headers or None, _host_index=0, ) response_data = self.api_client.call_api(*params, _request_timeout=_request_timeout) @@ -614,14 +623,14 @@ def upload_stream( current = body.tell() content_length = body.seek(0, io.SEEK_END) - current body.seek(current) - headers: Optional[Dict[str, str]] = ( - {"Content-Length": str(content_length)} if content_length is not None else None - ) + headers: Dict[str, str] = dict(scope_headers) + if content_length is not None: + headers["Content-Length"] = str(content_length) method, url, header_params, _body, _post = self._upload_file_serialize( body=None, _request_auth=None, _content_type=content_type, - _headers=headers, + _headers=headers or None, _host_index=0, ) # Stream via the SDK's configured pool (auth + TLS/proxy), bypassing the @@ -643,6 +652,36 @@ def upload_stream( # -- internals --------------------------------------------------------- + def _finalize_handler( + self, + upload_id: str, + finalize_token: str, + body: FinalizeUploadRequest, + _request_timeout: Any, + ) -> FinalizeUploadResponse: + """Finalize the upload with retries disabled. + + Finalize is exactly-once on the server: a retry of an ambiguous transient + (a 429 the server actually processed, or a lost response) would turn a + finalize that SUCCEEDED into a spurious "already finalized" error. So, + regardless of the SDK's configured retry policy, this runs through a + one-off client whose transport does no retries — mirroring the Rust SDK, + which clones its config with ``max_retries = 0`` for finalize. Auth and + scope settings are inherited via the shallow-copied configuration. + """ + finalize_config = copy.copy(self.api_client.configuration) + finalize_config.retries = False + finalize_client = ApiClient(finalize_config) + try: + return _GeneratedUploadsApi(finalize_client).finalize_upload_handler( + upload_id, + finalize_token, + body, + _request_timeout=_request_timeout, + ) + finally: + finalize_client.close() + def _upload_single( self, session: UploadSessionResponse, @@ -720,6 +759,15 @@ def upload_part(index: int) -> FinalizeUploadPart: offset = index * part_size length = min(part_size, total - offset) chunk = src.read_range(offset, length) + if len(chunk) != length: + # The source returned fewer bytes than its declared size implies + # (a truncated file, or a wrong explicit `size`). Sending it would + # mismatch Content-Length, so fail loudly before the PUT. + raise UploadError( + f"source returned {len(chunk)} bytes for part {part_number} " + f"(expected {length} at offset {offset}); it may have changed " + f"size or the declared size is wrong" + ) response = _put_to_storage( part_urls[index], chunk, @@ -752,9 +800,11 @@ def upload_part(index: int) -> FinalizeUploadPart: raise exc results[futures[future]] = future.result() finally: - # cancel_futures drops parts that have not started; already-running - # ones can't be cancelled (threads) but we no longer wait on them. - executor.shutdown(wait=False, cancel_futures=True) + # cancel_futures drops parts that have not started; wait=True then + # blocks for the few already-running workers so that, by the time + # this returns (or re-raises), no background thread is still reading + # the (possibly caller-owned) source or firing the progress callback. + executor.shutdown(wait=True, cancel_futures=True) # results is indexed by 0-based part position, so it is already ascending # by part_number with no duplicates. @@ -824,25 +874,34 @@ def reader(self) -> Iterator[BinaryIO]: class _FileObjSource(_Source): - """A user-owned, seekable binary file object. A lock serializes the - seek+read of each part (cheap, in-memory) so concurrent part tasks never - corrupt the shared cursor; the slow ``PUT`` s still run concurrently. The - file object is never closed (the caller owns it). + """A user-owned, seekable binary file object. The content uploaded is from + the object's position *at call time* to its end (the Python convention for + file-object uploads, like ``requests`` / ``boto3``); ``base`` records that + position so every read is relative to it. + + A lock serializes the seek+read of each part (cheap, in-memory) so concurrent + part tasks never corrupt the shared cursor; the slow ``PUT`` s still run + concurrently. The file object is never closed (the caller owns it), and its + cursor is not restored afterwards. """ - def __init__(self, fileobj: BinaryIO, size: int) -> None: + def __init__(self, fileobj: BinaryIO, size: Optional[int]) -> None: self._file = fileobj + self._base = fileobj.tell() + if size is None: + size = fileobj.seek(0, io.SEEK_END) - self._base + fileobj.seek(self._base) self.size = size self._lock = threading.Lock() def read_range(self, offset: int, length: int) -> bytes: with self._lock: - self._file.seek(offset) + self._file.seek(self._base + offset) return self._file.read(length) @contextmanager def reader(self) -> Iterator[BinaryIO]: - self._file.seek(0) + self._file.seek(self._base) yield self._file @@ -865,10 +924,6 @@ def _make_source(source: UploadSource, size: Optional[int]) -> _Source: "reads for multipart). For a non-seekable stream use " "upload_stream, which sends to POST /v1/files in one request." ) - if size is None: - current = source.tell() - size = source.seek(0, io.SEEK_END) - current - source.seek(current) return _FileObjSource(source, size) raise TypeError( f"upload_file accepts a path, bytes, or a seekable binary file object, " diff --git a/tests/test_uploads.py b/tests/test_uploads.py index f1506b8..577c0a3 100644 --- a/tests/test_uploads.py +++ b/tests/test_uploads.py @@ -43,6 +43,7 @@ SizeLimitError, StorageError, StorageTransportError, + UploadError, UploadOptions, UploadsApi, auto_part_size_hint, @@ -161,13 +162,11 @@ def fake_create(create_upload_request: Any, **kwargs: Any) -> UploadSessionRespo def fake_finalize( upload_id: str, - x_upload_finalize_token: str, - finalize_upload_request: Any = None, - **kwargs: Any, + finalize_token: str, + body: Any = None, + _request_timeout: Any = None, ) -> FinalizeUploadResponse: - finalize_capture.append( - ("finalize", upload_id, x_upload_finalize_token, finalize_upload_request) - ) + finalize_capture.append(("finalize", upload_id, finalize_token, body)) from datetime import datetime, timezone return FinalizeUploadResponse( @@ -179,7 +178,10 @@ def fake_finalize( ) monkeypatch.setattr(api, "create_upload_session_handler", fake_create) - monkeypatch.setattr(api, "finalize_upload_handler", fake_finalize) + # Stub the internal finalize seam (upload_file calls _finalize_handler, which + # routes through a one-off no-retry client); see test_finalize_uses_no_retry_client + # for the real seam's behavior. + monkeypatch.setattr(api, "_finalize_handler", fake_finalize) def _make_api() -> UploadsApi: @@ -1141,4 +1143,168 @@ def request(self, method: str, url: str, **kwargs: Any) -> Any: assert captured["preload_content"] is False # Default content type when unset. assert captured["headers"]["Content-Type"] == "application/octet-stream" + # The SDK's User-Agent and scope auth ride the streamed request too. + assert "User-Agent" in captured["headers"] + assert captured["headers"]["X-Workspace-Id"] == "ws_test" assert isinstance(out, UploadResponse) + + +def test_upload_stream_bad_type_raises() -> None: + api = _make_api() + with pytest.raises(TypeError, match="bytes or a readable"): + api.upload_stream(12345) # type: ignore[arg-type] + + +def _make_api_with_session(session_id: str) -> UploadsApi: + from hotdata import ApiClient, Configuration + + config = Configuration( + host="https://api.hotdata.test", + api_key="test-key", + workspace_id="ws_test", + session_id=session_id, + ) + return UploadsApi(ApiClient(config)) + + +def test_upload_stream_sends_session_and_workspace_scope( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HOTDATA_DISABLE_JWT_EXCHANGE", "1") + from hotdata import rest + + captured: Dict[str, Any] = {} + + def fake_request( + self: Any, + method: str, + url: str, + headers: Any = None, + body: Any = None, + post_params: Any = None, + _request_timeout: Any = None, + ) -> Any: + captured["headers"] = dict(headers or {}) + resp = _FakeUrllib3Response( + 201, _upload_response_json(), {"content-type": "application/json"} + ) + return rest.RESTResponse(resp) + + monkeypatch.setattr(rest.RESTClientObject, "request", fake_request) + api = _make_api_with_session("sb_xyz") + api.upload_stream(b"hi") + assert captured["headers"]["X-Session-Id"] == "sb_xyz" + assert captured["headers"]["X-Workspace-Id"] == "ws_test" + + +# --- Finalize hardening ---------------------------------------------------- + + +def test_finalize_uses_no_retry_client( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + # Don't stub _finalize_handler: let the real one run and route through its + # one-off no-retry client. Capture the retries policy the generated finalize + # sees. + from datetime import datetime, timezone + + from hotdata.api.uploads_api import UploadsApi as Gen + + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + api = _make_api() + monkeypatch.setattr( + api, "create_upload_session_handler", lambda *a, **k: session + ) + + captured: Dict[str, Any] = {} + + def fake_gen_finalize( + self: Any, + upload_id: str, + x_upload_finalize_token: str, + finalize_upload_request: Any = None, + **kwargs: Any, + ) -> FinalizeUploadResponse: + captured["retries"] = self.api_client.configuration.retries + return FinalizeUploadResponse( + content_type=None, + created_at=datetime(2026, 1, 1, tzinfo=timezone.utc), + size_bytes=0, + status="ready", + upload_id=upload_id, + ) + + monkeypatch.setattr(Gen, "finalize_upload_handler", fake_gen_finalize) + api.upload_file(b"hello world") + # The exactly-once finalize must run with retries disabled regardless of the + # SDK's configured policy. + assert captured["retries"] is False + + +# --- File-object cursor + short-read correctness -------------------------- + + +def test_upload_file_file_object_mid_cursor_single( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + # A file object positioned mid-stream uploads from the cursor to the end + # (the tail), not from byte 0. + content = b"H" * 100 + b"T" * 200 + fobj = io.BytesIO(content) + fobj.seek(100) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + api.upload_file(fobj) + assert fake_pool.calls[0]["body"] == b"T" * 200 + assert capture[0][1].declared_size_bytes == 200 + + +def test_upload_file_file_object_mid_cursor_multipart( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + part_size = 5 * MIB + head = b"H" * 10 + tail = bytes((i % 251) for i in range(2 * part_size + 77)) + fobj = io.BytesIO(head + tail) + fobj.seek(len(head)) + part_urls = ["https://storage.test/p1", "https://storage.test/p2", "https://storage.test/p3"] + for i, url in enumerate(part_urls, start=1): + fake_pool.responses[url] = _FakeResponse(200, etag=f'"e{i}"') + session = _multipart_session(part_urls, part_size) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + api.upload_file(fobj, max_concurrency=3) + by_url = {c["url"]: c for c in fake_pool.calls} + assert by_url[part_urls[0]]["body"] == tail[0:part_size] + assert by_url[part_urls[1]]["body"] == tail[part_size:2 * part_size] + assert by_url[part_urls[2]]["body"] == tail[2 * part_size:] + + +def test_multipart_short_read_raises( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + part_size = 5 * MIB + # Only one part's worth of bytes, but we declare two parts' worth via `size`. + fobj = io.BytesIO(b"a" * part_size) + part_urls = ["https://storage.test/p1", "https://storage.test/p2"] + for url in part_urls: + fake_pool.responses[url] = _FakeResponse(200, etag='"e"') + session = _multipart_session(part_urls, part_size) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + with pytest.raises(UploadError, match="returned 0 bytes"): + api.upload_file(fobj, size=2 * part_size) From f9b8ee117fde135036ccd4a95fec3314879bb9a3 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Sat, 27 Jun 2026 20:07:11 -0700 Subject: [PATCH 5/7] fix(uploads): guard single-PUT byte count against size mismatch --- hotdata/uploads.py | 24 ++++++++++++++++++++-- tests/test_uploads.py | 47 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/hotdata/uploads.py b/hotdata/uploads.py index 8843296..b71be61 100644 --- a/hotdata/uploads.py +++ b/hotdata/uploads.py @@ -393,16 +393,27 @@ def __init__( self._progress = progress self._done = 0 + @property + def bytes_read(self) -> int: + """Total bytes actually pulled from the source so far.""" + return self._done + def readable(self) -> bool: return True def readinto(self, b: Any) -> int: - chunk = self._file.read(len(b)) + # Never read past the declared size: a source longer than `total` (a + # growing file, or a wrong explicit `size`) must not spill extra bytes + # past `Content-Length` and corrupt the pooled connection. + remaining = self._total - self._done + if remaining <= 0: + return 0 + chunk = self._file.read(min(len(b), remaining)) if not chunk: return 0 n = len(chunk) b[:n] = chunk - self._done = min(self._done + n, self._total) + self._done += n if self._progress is not None: self._progress(self._done, self._total) return n @@ -709,6 +720,15 @@ def _upload_single( retries=False, part_number=None, ) + # The source ran out before its declared size: we PUT a body shorter + # than `Content-Length`. Fail loudly before finalizing rather than + # commit a truncated object (mirrors the multipart short-read guard). + if body.bytes_read != total: + raise UploadError( + f"source provided {body.bytes_read} bytes but the declared " + f"size is {total}; it may have changed size or the declared " + f"size is wrong" + ) # Guarantee a terminal tick at exactly `total`, even if the last chunk # boundary or an empty file left the counter short. diff --git a/tests/test_uploads.py b/tests/test_uploads.py index 577c0a3..cd95253 100644 --- a/tests/test_uploads.py +++ b/tests/test_uploads.py @@ -1308,3 +1308,50 @@ def test_multipart_short_read_raises( _patch_session(monkeypatch, api, session, []) with pytest.raises(UploadError, match="returned 0 bytes"): api.upload_file(fobj, size=2 * part_size) + + +def test_single_short_read_raises( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + # Declare more bytes than the source actually has on the single-PUT path: + # the source runs out before `Content-Length`, so it must fail loudly + # before finalizing rather than PUT a body short of its declared length. + content = b"a" * 100 + fobj = io.BytesIO(content) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + capture: List[Tuple[Any, ...]] = [] + api = _make_api() + _patch_session(monkeypatch, api, session, capture) + with pytest.raises(UploadError, match="100 bytes.*declared size is 200"): + api.upload_file(fobj, size=200) + # The short body must not have been finalized. + assert not any(c[0] == "finalize" for c in capture) + + +def test_single_over_read_caps_at_declared_size( + monkeypatch: pytest.MonkeyPatch, fake_pool: _FakeStoragePool +) -> None: + # Declare fewer bytes than the source has on the single-PUT path: the PUT + # must send exactly the declared count, never spilling extra bytes past + # `Content-Length` (which would corrupt the pooled connection). + content = b"b" * 200 + fobj = io.BytesIO(content) + session = UploadSessionResponse( + finalize_token="t", + headers={}, + mode="single", + upload_id="u", + url="https://storage.test/put", + ) + api = _make_api() + _patch_session(monkeypatch, api, session, []) + api.upload_file(fobj, size=50) + call = fake_pool.calls[0] + assert call["body"] == b"b" * 50 + assert call["headers"]["Content-Length"] == "50" From e09eda40f9b9ef2fbfa876cfec890c5f2afc68f6 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Sat, 27 Jun 2026 20:18:39 -0700 Subject: [PATCH 6/7] fix(uploads): fire multipart progress callback under the lock --- hotdata/uploads.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hotdata/uploads.py b/hotdata/uploads.py index b71be61..1090af1 100644 --- a/hotdata/uploads.py +++ b/hotdata/uploads.py @@ -799,10 +799,14 @@ def upload_part(index: int) -> FinalizeUploadPart: if not etag: raise MissingETagError(part_number=part_number) if progress is not None: + # Fire the callback under the lock so delivery order matches the + # computed order: releasing the lock first would let two workers + # race and deliver ticks out of order, breaking the documented + # monotonically-non-decreasing guarantee. Callbacks are expected + # to be cheap (and do their own locking), so serializing is fine. with lock: done[0] = min(done[0] + length, total) - current = done[0] - progress(current, total) + progress(done[0], total) return FinalizeUploadPart(e_tag=etag, part_number=part_number) results: List[Optional[FinalizeUploadPart]] = [None] * len(part_urls) From c89c39dfc53241f0a601b0bb2a0abc037173094f Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Sat, 27 Jun 2026 20:18:39 -0700 Subject: [PATCH 7/7] docs(uploads): make changelog upload notes user-facing --- CHANGELOG.md | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9e0cde..21f69e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,25 +9,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- `hotdata.uploads.UploadsApi` (the default `hotdata.UploadsApi`) gains - `upload_file(path, ...)`, a transparent direct-to-storage upload that - orchestrates the presigned flow end to end: it opens a session - (`POST /v1/uploads`), `PUT`s the bytes **straight to object storage** (a single - `PUT` for a small file, concurrent part `PUT`s for a large one), and finalizes - (`POST /v1/uploads/{id}/finalize`), returning the `FinalizeUploadResponse`. The +- `hotdata.UploadsApi` gains `upload_file(source, ...)`, a transparent + direct-to-storage upload. Give it a file path, raw `bytes`, or a seekable + binary file object and it opens an upload session, sends the data **straight + to object storage** (a single request for a small file, concurrent multipart + for a large one), and finalizes — returning the `FinalizeUploadResponse`. Your bytes never round-trip through the API. Supports a progress callback, an auto-scaled (or caller-set) part size, bounded concurrency with a peak-memory - budget, and idempotent per-part retry (tunable via `part_retry`). Storage - `PUT`s go through a dedicated, header-isolated pool so no SDK auth/workspace - headers reach object storage. Failures surface as a typed hierarchy under - `UploadError`: `StorageError` (non-2xx from storage), `StorageTransportError` - (transport failure before any response), `MissingETagError`, - `MalformedSessionError`, and `SizeLimitError`. `upload_file` accepts a path, - raw `bytes`, or a seekable binary file object. -- `hotdata.uploads.UploadsApi.upload_stream` uploads an arbitrary byte source - (`bytes` or a binary file object, streamed without buffering) to the legacy - `POST /v1/files` endpoint — the fallback when presigned uploads are - unsupported or the source is a non-seekable stream. + budget, and idempotent per-part retry (tunable via `part_retry`). Failures + raise a typed hierarchy under `UploadError`: `StorageError`, + `StorageTransportError`, `MissingETagError`, `MalformedSessionError`, and + `SizeLimitError`. +- `hotdata.UploadsApi.upload_stream` uploads `bytes` or a binary stream + (streamed without buffering) in a single request — the fallback for when + direct-to-storage uploads aren't available or the source isn't seekable. ### Changed