-
Notifications
You must be signed in to change notification settings - Fork 2
feat: Cross-region internal download acceleration via inter-cloud connections #32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,10 +25,12 @@ | |
| import io | ||
| import os | ||
| import time | ||
| import re | ||
| import uuid | ||
| from concurrent.futures import ThreadPoolExecutor, as_completed | ||
| from pathlib import Path | ||
| from typing import TYPE_CHECKING | ||
| from urllib.parse import urlparse | ||
|
|
||
| import requests | ||
| from tqdm.auto import tqdm | ||
|
|
@@ -44,6 +46,7 @@ | |
| ENV_FILE_LOCK, | ||
| ENV_INTRA_CLOUD_ACCELERATION, | ||
| ENV_INTRA_CLOUD_REGION, | ||
| ENV_INTER_CLOUD_REGIONS, | ||
| ) | ||
| from .errors import ( | ||
| CacheNotFound, | ||
|
|
@@ -368,6 +371,98 @@ def _get(url: str, timeout: float): | |
| self._cached_region = region_id | ||
| return region_id | ||
|
|
||
| # OSS internal endpoint hostname pattern: | ||
| # <prefix>.oss<region>-internal.aliyuncs.com | ||
| # e.g. modelhub-cn-hangzhou.oss-cn-hangzhou-internal.aliyuncs.com | ||
| _OSS_INTERNAL_RE = re.compile( | ||
| r".*\.oss.*-internal\.aliyuncs\.com$" | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _is_oss_internal_url(url: str) -> bool: | ||
| """Check if a URL points to an OSS internal (intranet) endpoint. | ||
|
|
||
| Matches hostnames like: | ||
| modelhub-cn-hangzhou.oss-cn-hangzhou-internal.aliyuncs.com | ||
| """ | ||
| try: | ||
| host = urlparse(url).hostname or "" | ||
| except Exception: | ||
| return False | ||
| return bool(DownloadManager._OSS_INTERNAL_RE.match(host)) | ||
|
|
||
| @staticmethod | ||
| def _probe_redirect_url( | ||
| url: str, | ||
| headers: dict[str, str], | ||
| cookies: dict | None = None, | ||
| timeout: float = 5.0, | ||
| ) -> str: | ||
| """Send a HEAD request without following redirects to get the 302 Location.""" | ||
| try: | ||
| r = requests.head( | ||
| url, headers=headers, cookies=cookies, | ||
| allow_redirects=False, timeout=timeout, | ||
| ) | ||
| if r.status_code in (301, 302, 303, 307, 308): | ||
| return r.headers.get("Location", "") | ||
| except requests.exceptions.RequestException: | ||
| pass | ||
| return "" | ||
|
|
||
| def _get_inter_cloud_regions(self) -> list[str]: | ||
| """Read the inter-cloud peer region list from the environment.""" | ||
| from .constants import _env | ||
| raw = _env(ENV_INTER_CLOUD_REGIONS, "INTER_CLOUD_ACCELERATION_REGIONS") or "" | ||
| return [r.strip().lower() for r in raw.split(",") if r.strip()] | ||
|
|
||
| def _resolve_inter_region_headers( | ||
| self, | ||
| url: str, | ||
| headers: dict[str, str], | ||
| cookies: dict | None = None, | ||
| peer_regions: list[str] | None = None, | ||
| ) -> tuple[dict[str, str], str]: | ||
| """Probe peer regions and return headers with the best region for OSS internal download. | ||
|
|
||
| Steps: | ||
| 1. Probe with current (local) region — if already OSS internal, return as-is. | ||
| 2. Try each peer region in order (skip duplicates of local) — first OSS internal hit wins. | ||
| 3. If all miss, fall back to original headers (default path). | ||
|
|
||
| Returns: | ||
| (headers, source) where source is "local", "peer", or "default". | ||
| """ | ||
| if peer_regions is None: | ||
| peer_regions = self._get_inter_cloud_regions() | ||
| if not peer_regions: | ||
| return headers, "default" | ||
|
|
||
| current_region = headers.get("x-aliyun-region-id", "").lower() | ||
|
|
||
| # Step 1: Probe with current (local) region | ||
| redirect_url = self._probe_redirect_url(url, headers, cookies) | ||
| if self._is_oss_internal_url(redirect_url): | ||
| logger.debug("Inter-region: local region already yields OSS internal URL, skipping.") | ||
| return headers, "local" | ||
|
|
||
| # Step 2: Try each peer region in order | ||
| for peer_region in peer_regions: | ||
| if peer_region == current_region: | ||
| continue | ||
| probe_headers = {**headers, "x-aliyun-region-id": peer_region} | ||
| redirect_url = self._probe_redirect_url(url, probe_headers, cookies) | ||
| if self._is_oss_internal_url(redirect_url): | ||
| logger.debug( | ||
| 'Inter-region acceleration: using peer region "%s" (OSS internal).', | ||
| peer_region, | ||
| ) | ||
| return probe_headers, "peer" | ||
|
|
||
| # Step 3: All peers missed | ||
| logger.debug("Inter-region: no peer region yields OSS internal URL, using default path.") | ||
| return headers, "default" | ||
|
|
||
| def _build_download_headers( | ||
| self, | ||
| user_agent: dict | str | None = None, | ||
|
|
@@ -764,6 +859,28 @@ def _download_with_resume( | |
|
|
||
| download_headers = self._build_download_headers(user_agent) | ||
|
|
||
| # Inter-region acceleration: probe peer regions for OSS internal URL | ||
| # Progress bar prefix: "⚡ " = local OSS, "⇄ " = peer OSS, " " = CDN, "" = not configured | ||
| source_prefix = "" | ||
| peer_regions = self._get_inter_cloud_regions() | ||
| if peer_regions: | ||
| try: | ||
| probe_url = self._client.get_download_url( | ||
| repo_id, repo_type, file_path, revision, | ||
| ) | ||
| cookies = None | ||
| if self._client.token: | ||
| cookies = {"m_session_id": self._client.token} | ||
| download_headers, source = self._resolve_inter_region_headers( | ||
| probe_url, download_headers, cookies, | ||
| peer_regions=peer_regions, | ||
| ) | ||
| source_prefix = { | ||
| "local": "\u26a1 ", "peer": "\u21c4 ", "default": " ", | ||
| }[source] | ||
| except Exception as exc: | ||
| logger.warning("Failed to resolve inter-region acceleration: %s. Falling back to default.", exc) | ||
|
|
||
|
Comment on lines
+862
to
+883
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Performance Bottleneck: Missing Cache for Resolved RegionsWhen downloading a repository with many files (e.g., via Since all files in a repository are stored in the same OSS bucket and thus share the same optimal region, probing independently for each file is extremely redundant. It adds massive latency to the download start time of each file and can easily trigger rate limits or 5xx errors on the ModelScope server. SolutionCache the resolved region per # Inter-region acceleration: probe peer regions for OSS internal URL
# Progress bar prefix: "⚡ " = local OSS, "⇄ " = peer OSS, " " = CDN, "" = not configured
source_prefix = ""
peer_regions = self._get_inter_cloud_regions()
if peer_regions:
if not hasattr(self, "_resolved_repo_regions"):
with _RESOLVED_REGIONS_LOCK:
if not hasattr(self, "_resolved_repo_regions"):
self._resolved_repo_regions = {}
cache_key = (repo_id, repo_type)
with _RESOLVED_REGIONS_LOCK:
in_cache = cache_key in self._resolved_repo_regions
if in_cache:
cached_region, source = self._resolved_repo_regions[cache_key]
if in_cache:
if cached_region is not None:
download_headers["x-aliyun-region-id"] = cached_region
else:
download_headers.pop("x-aliyun-region-id", None)
source_prefix = {
"local": "⚡ ", "peer": "⇄ ", "default": " ",
}[source]
else:
try:
probe_url = self._client.get_download_url(
repo_id, repo_type, file_path, revision,
)
cookies = None
if self._client.token:
cookies = {"m_session_id": self._client.token}
download_headers, source = self._resolve_inter_region_headers(
probe_url, download_headers, cookies,
peer_regions=peer_regions,
)
resolved_region = download_headers.get("x-aliyun-region-id")
with _RESOLVED_REGIONS_LOCK:
self._resolved_repo_regions[cache_key] = (resolved_region, source)
source_prefix = {
"local": "⚡ ", "peer": "⇄ ", "default": " ",
}[source]
except Exception as exc:
logger.warning("Failed to resolve inter-region acceleration: %s. Falling back to default.", exc) |
||
| if use_parallel: | ||
| url = self._client.get_download_url( | ||
| repo_id, repo_type, file_path, revision, | ||
|
|
@@ -842,7 +959,7 @@ def _download_with_resume( | |
| initial=existing_size, | ||
| unit="B", | ||
| unit_scale=True, | ||
| desc=Path(file_path).name, | ||
| desc=f"{source_prefix}{Path(file_path).name}", | ||
| leave=False, | ||
| ) as pbar: | ||
| with open(tmp_path, mode) as fh: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import
threadingand define a global lock to protect the thread-safe caching of resolved repository regions.