Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 110 additions & 19 deletions hotdata/_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
* **Refresh, then re-mint** -- prefer the refresh token when available; on
refresh failure, re-mint from the held API token (always possible since the
SDK holds it). Matches the CLI.
* **Transient-failure retry** -- a momentary ``5xx`` or a transport error on the
token endpoint is retried with bounded exponential backoff + jitter
(``_MAX_ATTEMPTS`` total) before giving up, so a brief server-side blip does
not fail the caller (#113). A ``4xx`` is never retried -- a bad/expired
credential is not transient.
* **TLS/proxy reuse** -- the exchange call reuses the SDK's configured TLS,
client cert and proxy settings (see :func:`_pool_from_config`) so it behaves
like every other SDK request, with a bounded timeout so a stalled token
Expand All @@ -36,6 +41,7 @@

import json
import os
import random
import ssl
import threading
import time
Expand All @@ -47,6 +53,15 @@
_TIMEOUT = 30.0 # seconds -- never let a stalled token endpoint hang every request
_CLIENT_ID = "hotdata-python-sdk"

# Bounded retry of *transient* token-exchange failures (#113). A momentary 5xx
# or a transport error (connection/read failure) on the token endpoint should
# not fail the caller outright -- an immediate re-attempt typically succeeds.
# We retry those, but never a 4xx (a bad/expired credential is not transient).
_MAX_ATTEMPTS = 3 # one initial attempt + up to two retries
_BACKOFF_BASE = 0.1 # seconds -- first retry waits ~this, doubling thereafter
_BACKOFF_MAX = 2.0 # cap on a single backoff so a flapping host can't stall us
_BACKOFF_JITTER = 0.5 # additive jitter fraction (delay in [base, 1.5*base]) to spread retries out

# Env var that disables exchange entirely. Used as a hard escape hatch during
# the rollout window and for local/dev setups. Only affirmative values opt out
# (see _DISABLE_VALUES) so that ``=0`` / ``=false`` do NOT silently disable it.
Expand Down Expand Up @@ -109,8 +124,11 @@ def _pool_from_config(configuration):
pool_args["server_hostname"] = configuration.tls_server_name
if configuration.socket_options is not None:
pool_args["socket_options"] = configuration.socket_options
# `retries`/`maxsize` are intentionally not mirrored: the exchange is a
# single bounded-timeout request that fails fast rather than retrying.
# `retries`/`maxsize` are intentionally not mirrored: urllib3's own Retry is
# left at urllib3's default (it does not retry POST status codes anyway), so
# we do not inherit the SDK's connection-reset retry here. Transient-failure
# retry for the exchange is handled explicitly in `_TokenManager._exchange`
# (5xx + transport errors, bounded backoff), where we can keep 4xx fatal.

if configuration.proxy:
if _is_socks_proxy_url(configuration.proxy):
Expand All @@ -125,6 +143,29 @@ def _pool_from_config(configuration):
return urllib3.PoolManager(**pool_args)


def _is_transient_status(status):
"""True for HTTP statuses worth retrying (server-side, likely momentary).

Only 5xx is transient: the request reached the server but it failed to
handle it (e.g. a brief ``500``/``503``). A 4xx -- including ``400``/``401``
from a bad or expired credential -- is a definitive rejection that a retry
will not fix, so it is never retried.
"""
return 500 <= status < 600


def _backoff_delay(attempt):
"""Seconds to sleep before retry number ``attempt`` (0 = first retry).

Exponential growth from ``_BACKOFF_BASE`` (doubling per attempt) capped at
``_BACKOFF_MAX``, plus additive jitter in ``[0, _BACKOFF_JITTER * base]`` so
concurrent clients retrying the same blip don't resynchronize into a thundering
herd. Mirrors the Rust SDK's ``backoff_delay``.
"""
base = min(_BACKOFF_BASE * (2 ** attempt), _BACKOFF_MAX)
return base * (1 + _BACKOFF_JITTER * random.random())


class _TokenManager:
"""Exchanges an API token for short-lived JWTs and keeps them fresh.

Expand All @@ -134,10 +175,11 @@ class _TokenManager:
exchanged.
"""

def __init__(self, credential, configuration, pool=None):
def __init__(self, credential, configuration, pool=None, sleep=None):
self._credential = credential
self._config = configuration # read host + TLS lazily at mint time
self._pool = pool # injected in tests; else built from config TLS
self._sleep = sleep or time.sleep # injected in tests so retry backoff is instant
self._lock = threading.Lock()
self._jwt = None
self._exp = 0.0
Expand Down Expand Up @@ -168,8 +210,18 @@ def bearer_value(self):
"""
if not self._needs_exchange:
return self._credential # already a JWT (or opt-out) -> unchanged
# Lock-free fast path: a still-valid cached JWT needs no mint and must
# not block behind an in-flight (possibly retrying) mint that holds the
# lock for up to several timeouts. Attribute reads are atomic under the
# GIL; the worst a benign jwt/exp race can do is fall through to re-check
# under the lock, and the _LEEWAY margin keeps a token read here valid on
# the wire even if it is about to be rotated.
jwt = self._jwt
if jwt and time.time() < self._exp - _LEEWAY:
return jwt
with self._lock:
# Fast path: a still-valid cached JWT, no network call.
# Re-check under the lock: another thread may have minted a fresh JWT
# while we waited (double-checked locking).
if self._jwt and time.time() < self._exp - _LEEWAY:
return self._jwt
# Prefer the refresh token; on failure, drop it and re-mint below.
Expand All @@ -187,24 +239,12 @@ def _mint(self, params):
# -- a non-200, a transport error, or a malformed/missing-token body --
# returns False so the caller re-mints from the held API token. An
# api_token mint instead raises TokenExchangeError on any failure, since
# there is no further fallback.
# there is no further fallback. Transient failures (5xx + transport
# errors) are retried inside _exchange before either outcome.
params["client_id"] = _CLIENT_ID
is_refresh = params["grant_type"] == "refresh_token"
try:
pool = self._pool or _pool_from_config(self._config) # reuses ssl_ca_cert/cert/proxy
host = self._config.host.rstrip("/") # read host lazily -- may be set post-construct
resp = pool.request(
"POST",
f"{host}/v1/auth/jwt",
body=urlencode(params),
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=_TIMEOUT,
)
if resp.status != 200:
raise TokenExchangeError(
f"token exchange failed: {resp.status} {resp.data[:200]!r}"
)
data = json.loads(resp.data)
data = self._exchange(params)
token = data["access_token"]
expires_in = float(data.get("expires_in", 300))
except (
Expand All @@ -224,6 +264,57 @@ def _mint(self, params):
self._refresh = data.get("refresh_token") or self._refresh
return True

def _exchange(self, params):
# POST the token-exchange request, retrying transient failures, and
# return the parsed JSON body of the 200 response.
#
# Retries 5xx responses and transport errors (urllib3 HTTPError, e.g. a
# connection/read failure) with bounded exponential backoff + jitter, up
# to _MAX_ATTEMPTS total. A 4xx is returned immediately as a fatal
# TokenExchangeError -- bad/expired credentials are not transient. Once
# the budget is exhausted, the last failure is surfaced: a 5xx as a
# TokenExchangeError preserving the status/body, a transport error as the
# raised HTTPError (which _mint wraps). JSON/missing-token errors from
# parsing a 200 body propagate unretried.
pool = self._pool or _pool_from_config(self._config) # reuses ssl_ca_cert/cert/proxy
host = self._config.host.rstrip("/") # read host lazily -- may be set post-construct
url = f"{host}/v1/auth/jwt"
body = urlencode(params)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
for attempt in range(_MAX_ATTEMPTS):
last = attempt == _MAX_ATTEMPTS - 1
try:
resp = pool.request(
"POST",
url,
body=body,
headers=headers,
timeout=_TIMEOUT,
# Disable urllib3's own per-request retries so this loop is
# the sole arbiter of the attempt budget. Otherwise urllib3's
# default (Retry(3)) would retry connection errors *inside*
# each attempt, multiplying the effective transport-attempt
# count well past _MAX_ATTEMPTS.
retries=False,
)
except urllib3.exceptions.HTTPError:
# Transport-level failure (connection/read error): transient, but
# don't retry past the budget -- re-raise for _mint to handle.
if last:
raise
self._sleep(_backoff_delay(attempt))
continue
if resp.status == 200:
return json.loads(resp.data)
if _is_transient_status(resp.status) and not last:
self._sleep(_backoff_delay(attempt))
continue
# A 4xx, or a 5xx with the retry budget exhausted: fatal, surfacing
# the last status/body.
raise TokenExchangeError(
f"token exchange failed: {resp.status} {resp.data[:200]!r}"
)


__all__ = [
"TokenExchangeError",
Expand Down
Loading