Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 118 additions & 1 deletion src/modelscope_hub/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import io
import os
import time
import re

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Import threading and define a global lock to protect the thread-safe caching of resolved repository regions.

Suggested change
import re
import re
import threading
_RESOLVED_REGIONS_LOCK = threading.Lock()

import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlparse

import requests
from tqdm.auto import tqdm
Expand All @@ -44,6 +46,7 @@
ENV_FILE_LOCK,
ENV_INTRA_CLOUD_ACCELERATION,
ENV_INTRA_CLOUD_REGION,
ENV_INTER_CLOUD_REGIONS,
)
from .errors import (
CacheNotFound,
Expand Down Expand Up @@ -368,6 +371,98 @@ def _get(url: str, timeout: float):
self._cached_region = region_id
return region_id

# OSS internal endpoint hostname pattern:
# <prefix>.oss<region>-internal.aliyuncs.com
# e.g. modelhub-cn-hangzhou.oss-cn-hangzhou-internal.aliyuncs.com
_OSS_INTERNAL_RE = re.compile(
r".*\.oss.*-internal\.aliyuncs\.com$"
)

@staticmethod
def _is_oss_internal_url(url: str) -> bool:
"""Check if a URL points to an OSS internal (intranet) endpoint.

Matches hostnames like:
modelhub-cn-hangzhou.oss-cn-hangzhou-internal.aliyuncs.com
"""
try:
host = urlparse(url).hostname or ""
except Exception:
return False
return bool(DownloadManager._OSS_INTERNAL_RE.match(host))

@staticmethod
def _probe_redirect_url(
url: str,
headers: dict[str, str],
cookies: dict | None = None,
timeout: float = 5.0,
) -> str:
"""Send a HEAD request without following redirects to get the 302 Location."""
try:
r = requests.head(
url, headers=headers, cookies=cookies,
allow_redirects=False, timeout=timeout,
)
if r.status_code in (301, 302, 303, 307, 308):
return r.headers.get("Location", "")
except requests.exceptions.RequestException:
pass
return ""

def _get_inter_cloud_regions(self) -> list[str]:
"""Read the inter-cloud peer region list from the environment."""
from .constants import _env
raw = _env(ENV_INTER_CLOUD_REGIONS, "INTER_CLOUD_ACCELERATION_REGIONS") or ""
return [r.strip().lower() for r in raw.split(",") if r.strip()]

def _resolve_inter_region_headers(
self,
url: str,
headers: dict[str, str],
cookies: dict | None = None,
peer_regions: list[str] | None = None,
) -> tuple[dict[str, str], str]:
"""Probe peer regions and return headers with the best region for OSS internal download.

Steps:
1. Probe with current (local) region — if already OSS internal, return as-is.
2. Try each peer region in order (skip duplicates of local) — first OSS internal hit wins.
3. If all miss, fall back to original headers (default path).

Returns:
(headers, source) where source is "local", "peer", or "default".
"""
if peer_regions is None:
peer_regions = self._get_inter_cloud_regions()
if not peer_regions:
return headers, "default"

current_region = headers.get("x-aliyun-region-id", "").lower()

# Step 1: Probe with current (local) region
redirect_url = self._probe_redirect_url(url, headers, cookies)
if self._is_oss_internal_url(redirect_url):
logger.debug("Inter-region: local region already yields OSS internal URL, skipping.")
return headers, "local"

# Step 2: Try each peer region in order
for peer_region in peer_regions:
if peer_region == current_region:
continue
probe_headers = {**headers, "x-aliyun-region-id": peer_region}
redirect_url = self._probe_redirect_url(url, probe_headers, cookies)
if self._is_oss_internal_url(redirect_url):
logger.debug(
'Inter-region acceleration: using peer region "%s" (OSS internal).',
peer_region,
)
return probe_headers, "peer"

# Step 3: All peers missed
logger.debug("Inter-region: no peer region yields OSS internal URL, using default path.")
return headers, "default"

def _build_download_headers(
self,
user_agent: dict | str | None = None,
Expand Down Expand Up @@ -764,6 +859,28 @@ def _download_with_resume(

download_headers = self._build_download_headers(user_agent)

# Inter-region acceleration: probe peer regions for OSS internal URL
# Progress bar prefix: "⚡ " = local OSS, "⇄ " = peer OSS, " " = CDN, "" = not configured
source_prefix = ""
peer_regions = self._get_inter_cloud_regions()
if peer_regions:
try:
probe_url = self._client.get_download_url(
repo_id, repo_type, file_path, revision,
)
cookies = None
if self._client.token:
cookies = {"m_session_id": self._client.token}
download_headers, source = self._resolve_inter_region_headers(
probe_url, download_headers, cookies,
peer_regions=peer_regions,
)
source_prefix = {
"local": "\u26a1 ", "peer": "\u21c4 ", "default": " ",
}[source]
except Exception as exc:
logger.warning("Failed to resolve inter-region acceleration: %s. Falling back to default.", exc)

Comment on lines +862 to +883

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Performance Bottleneck: Missing Cache for Resolved Regions

When downloading a repository with many files (e.g., via download_repo), the files are downloaded in parallel. Currently, the SDK performs sequential HEAD requests (probing) for every single file in the repository.

Since all files in a repository are stored in the same OSS bucket and thus share the same optimal region, probing independently for each file is extremely redundant. It adds massive latency to the download start time of each file and can easily trigger rate limits or 5xx errors on the ModelScope server.

Solution

Cache the resolved region per (repo_id, repo_type) using a thread-safe global lock. This reduces the number of probes from N * (1 + P) to just 1 * (1 + P) (where N is the number of files and P is the number of peer regions).

        # Inter-region acceleration: probe peer regions for OSS internal URL
        # Progress bar prefix: "⚡ " = local OSS, "⇄ " = peer OSS, "  " = CDN, "" = not configured
        source_prefix = ""
        peer_regions = self._get_inter_cloud_regions()
        if peer_regions:
            if not hasattr(self, "_resolved_repo_regions"):
                with _RESOLVED_REGIONS_LOCK:
                    if not hasattr(self, "_resolved_repo_regions"):
                        self._resolved_repo_regions = {}

            cache_key = (repo_id, repo_type)
            with _RESOLVED_REGIONS_LOCK:
                in_cache = cache_key in self._resolved_repo_regions
                if in_cache:
                    cached_region, source = self._resolved_repo_regions[cache_key]

            if in_cache:
                if cached_region is not None:
                    download_headers["x-aliyun-region-id"] = cached_region
                else:
                    download_headers.pop("x-aliyun-region-id", None)
                source_prefix = {
                    "local": "⚡ ", "peer": "⇄ ", "default": "  ",
                }[source]
            else:
                try:
                    probe_url = self._client.get_download_url(
                        repo_id, repo_type, file_path, revision,
                    )
                    cookies = None
                    if self._client.token:
                        cookies = {"m_session_id": self._client.token}
                    download_headers, source = self._resolve_inter_region_headers(
                        probe_url, download_headers, cookies,
                        peer_regions=peer_regions,
                    )
                    resolved_region = download_headers.get("x-aliyun-region-id")
                    with _RESOLVED_REGIONS_LOCK:
                        self._resolved_repo_regions[cache_key] = (resolved_region, source)
                    source_prefix = {
                        "local": "⚡ ", "peer": "⇄ ", "default": "  ",
                    }[source]
                except Exception as exc:
                    logger.warning("Failed to resolve inter-region acceleration: %s. Falling back to default.", exc)

if use_parallel:
url = self._client.get_download_url(
repo_id, repo_type, file_path, revision,
Expand Down Expand Up @@ -842,7 +959,7 @@ def _download_with_resume(
initial=existing_size,
unit="B",
unit_scale=True,
desc=Path(file_path).name,
desc=f"{source_prefix}{Path(file_path).name}",
leave=False,
) as pbar:
with open(tmp_path, mode) as fh:
Expand Down
14 changes: 14 additions & 0 deletions src/modelscope_hub/cli/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import os
import sys
from argparse import Action
from concurrent.futures import ThreadPoolExecutor, as_completed
Expand Down Expand Up @@ -85,6 +86,15 @@ def register(subparsers: Action) -> None:
help="Glob to exclude (snapshot mode). Repeatable.",
)
p.add_argument("--force", action="store_true", help="Re-download even if cached.")
p.add_argument(
"--inter-regions",
type=str,
default=None,
help="Comma-separated list of peer regions reachable via CEN/VPC "
"peering for cross-region internal acceleration. "
"Overrides env MODELSCOPE_DOWNLOAD_INTER_CLOUD_REGIONS. "
"Example: cn-hangzhou,cn-zhangjiakou,cn-wulanchabu",
)

# Legacy compat (hidden from --help)
add_legacy_download_args(p)
Expand All @@ -95,6 +105,10 @@ def register(subparsers: Action) -> None:
def execute(self) -> None:
normalize_download_args(self.args)

# Apply --inter-regions to env before download
if getattr(self.args, "inter_regions", None) is not None:
os.environ["MODELSCOPE_DOWNLOAD_INTER_CLOUD_REGIONS"] = self.args.inter_regions

# Handle collection download separately
if self.args.repo_type == "collection":
self._download_collection()
Expand Down
5 changes: 5 additions & 0 deletions src/modelscope_hub/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ def _env_register(
_env_register(ENV_INTRA_CLOUD_REGION, "(auto)", "Override intra-cloud region ID", "Download",
deprecated_names=("INTRA_CLOUD_ACCELERATION_REGION",))

ENV_INTER_CLOUD_REGIONS: str = "MODELSCOPE_DOWNLOAD_INTER_CLOUD_REGIONS"
_env_register(ENV_INTER_CLOUD_REGIONS, "",
"Comma-separated peer regions for cross-region internal acceleration", "Download")

UPLOAD_LFS_THRESHOLD: int = _env_int("UPLOAD_LFS_THRESHOLD", 5 * 1024 * 1024)
UPLOAD_LFS_ENFORCE_THRESHOLD: int = _env_int("UPLOAD_LFS_ENFORCE_THRESHOLD", 1 * 1024 * 1024)

Expand Down Expand Up @@ -520,6 +524,7 @@ def _env_register(
"ENV_CACHE",
"ENV_INTRA_CLOUD_ACCELERATION",
"ENV_INTRA_CLOUD_REGION",
"ENV_INTER_CLOUD_REGIONS",
"ENV_MODELSCOPE_DOMAIN",
"ENV_PREFER_AI_SITE",
"ENV_REGISTRY",
Expand Down