气象数据 API 接入工程:限流、缓存、重试与批量取数

做新能源功率预测、电量评估或资源评估,绕不开「从一个气象数据 API 把大量经纬度、长时间窗的数据可靠地拉下来」这件事。单点拉一段数据的代码谁都会写:拼请求体、POST、解析 JSON。但当你要回填三百个场站、十年小时级序列,或者要在每天清晨的预报刷新时刻并发拉一批点,这段看似简单的代码就会暴露出一堆工程问题:被限流打回 429、网络抖动导致整批任务半路夭折、同一个点被不同脚本重复拉了五遍、一条慢请求把整个 pipeline 卡住。这些都不是算法问题,而是数据接入层的工程问题。本文把生产里反复打磨的四套手段——限流、缓存、重试、批量取数——系统讲一遍,并给出可直接复用的代码骨架。
关键要点
- 气象数据 API 接入的瓶颈通常不在算法而在工程:限流、超时、重复取数、批量调度,任何一处没做好都会让数据接入层成为整个功率预测系统的隐形短板。
- 客户端要先做自我限流(令牌桶/漏桶),把请求速率压在配额线以内,而不是等服务端返回 429 再被动退避;429 是兜底,不是常态。
- 气象数据「同一格点同一时间窗的历史值不会变」,天然适合缓存;缓存键要由
dataSourceId + 量化后的经纬度 + 时间窗 + 字段集 + 时区共同决定,并对历史与预报采用不同 TTL。 - 重试只对幂等且可恢复的错误生效(429、5xx、超时、连接重置),必须用指数退避 + 随机抖动避免重试风暴,且要区分「可重试」与「不可重试」(4xx 参数错、鉴权失败直接放弃)。
- 批量取数靠「请求分片 + 有界并发 + 断点续传」三件套:把大任务切成幂等子任务,用有限并发池消费,失败子任务可单独重跑而不影响整体。
一、为什么接入层会成为瓶颈
先把问题摊开。一个典型的新能源数据接入需求长这样:N 个场站点位、每个点要拉 M 个气象字段、时间跨度从几个月到十几年小时级。哪怕单个 API 调用只要一两秒,N×(时间分片数)一乘,请求数轻松上万。这时候三个矛盾同时出现。
第一是速率矛盾。任何对外开放的 API 都有配额(按小时、按天、按并发),你一旦无脑开多线程猛打,要么触发限流被拒,要么把对方服务拖慢连累自己。第二是重复矛盾。多个脚本、多个同事、甚至同一个 pipeline 的不同 stage,常常在重复拉同一段数据——而历史再分析数据「同一格点同一时段」是不变的,这些重复请求纯属浪费配额和时间。第三是可靠性矛盾。请求一多,网络抖动、服务端瞬时 5xx、TCP 连接重置就成了必然事件而非偶然事件,没有重试机制的批量任务几乎一定会半路失败,而失败后从头再来又会放大前两个矛盾。
这三个矛盾分别对应限流、缓存、重试三套手段,再加上把它们串起来的批量调度。下面逐一拆解。
二、限流:先做客户端自我限流
很多人对限流的第一反应是「服务端返回 429 我再退避」。这是被动且低效的——等被拒再退避,意味着每一波请求都要先撞墙再回弹,配额在撞墙的瞬间已经浪费,吞吐曲线锯齿状抖动。正确的做法是客户端主动把发出速率压在配额线以内,让 429 成为极少出现的兜底信号。
经典实现是令牌桶(token bucket):以固定速率往桶里放令牌(比如每秒放 1 个,对应「每秒 1 次」的配额),每发一个请求消耗一个令牌,桶空了就阻塞等待。令牌桶相比固定窗口计数的好处是允许短时突发——桶里攒了几个令牌时可以瞬间打出几个请求,又不会突破长期平均速率。如果你要的是更严格的平滑(不允许突发),则用漏桶(leaky bucket),请求以恒定速率「漏」出去。
一个不依赖第三方库的极简令牌桶,足够应付大多数批量取数场景:
import threading
import time
class TokenBucket:
"""线程安全的令牌桶限流器。
rate: 每秒补充的令牌数(= 允许的平均 QPS)
capacity: 桶容量(= 允许的最大突发量)
"""
def __init__(self, rate: float, capacity: float):
self.rate = rate
self.capacity = capacity
self._tokens = capacity
self._last = time.monotonic()
self._lock = threading.Lock()
def acquire(self, tokens: float = 1.0):
while True:
with self._lock:
now = time.monotonic()
# 按经过的时间补充令牌,但不超过桶容量
self._tokens = min(
self.capacity,
self._tokens + (now - self._last) * self.rate,
)
self._last = now
if self._tokens >= tokens:
self._tokens -= tokens
return
# 令牌不够,算出还要等多久
wait = (tokens - self._tokens) / self.rate
time.sleep(wait)
用的时候在每次发请求前 bucket.acquire() 即可。把 rate 设成略低于你实际拿到的配额(举个假想的数:若某接口给你 60 次/小时,就设成 60/3600 ≈ 0.0166,再留 10–20% 余量),整个 pipeline 的请求就会被自动节流到安全速率,不需要在业务代码里到处 sleep。具体配额以你账号的实际限额为准,这里的数字只是演示折算方法。
一个常被忽略的点:限流要按真正的配额维度来切。如果配额是「每账号每小时」,那么所有线程、所有子任务必须共享同一个令牌桶实例;如果你给每个线程各开一个桶,N 个线程就会打出 N 倍速率,照样被限流。配额是「每 IP」还是「每 Key」也要看清——多机分布式拉取时,每台机器一个本地桶只能保证单机不超速,跨机总速率仍需在调度层统一收口。
三、缓存:吃透气象数据的不变性
气象数据有一个对缓存极其友好的性质:历史再分析数据,同一格点、同一时间窗、同一字段集的返回值是不变的。ERA5 的 2024-06-01 这一格点的 100m 风速,今天查和明年查应该是同一个数(除非数据集重新发布版本)。这意味着历史数据的缓存命中率可以做得非常高,缓存就是省配额、省时间最直接的杠杆。
3.1 缓存键怎么设计
缓存的成败几乎全在键的设计上。气象取数的缓存键必须由所有影响返回值的请求参数共同决定,缺一个都会导致「键碰撞」(不同请求命中同一缓存,拿到错数据)或「永不命中」(相同请求算出不同键,缓存形同虚设)。对运梦气象 API 这类「点 + 时间窗 + 字段」的接口,键至少要包含:
dataSourceId(era5 / nasa / zg1 / ger,不同源同一点的值不同)- 经纬度——必须量化。浮点经纬度
32.032531和32.032530在业务上是同一个点,但作为字符串拼进键就是两个键。统一按数据源原生分辨率取整(如 ERA5 0.25° 就把经纬度对齐到 0.25 的整数倍,或固定保留小数位),是缓存命中的前提。 - 时间窗
stime/etime - 字段集——要排序后再拼,
["tas","rsds"]和["rsds","tas"]应算同一个键 timezone——时区不同,返回的 timeList 和对齐方式都不同
把这些归一化后拼成字符串再做一次哈希(如 SHA-1)当作键,既稳定又不会因为字段顺序、浮点尾数而漏命中。
3.2 历史与预报要用不同 TTL
缓存的过期策略(TTL)要按数据语义分层,绝不能一刀切:
- 历史再分析(era5/nasa/zg1):值基本不变,TTL 可以设很长(数天到数月),甚至对「已经过了数据滞后期、确定不再变」的时间窗做永久缓存。唯一要留意的是 ERA5 约 T-5 天的滞后边界——靠近当前时刻的那几天可能从 ERA5T 临时值被稳定版覆盖,这部分时间窗 TTL 要短一些或干脆不缓存。
- 预报(德国气象局,dataSourceId=
ger,覆盖未来约 7 天):预报每天会刷新,缓存 TTL 要短(按预报刷新周期,通常几小时),过期就重取,否则你会拿着昨天的预报当今天用。
实现上,本地批量任务用一个磁盘 KV(如 diskcache、SQLite,甚至按键名落 parquet 文件)就够;多进程/多机共享则上 Redis。关键不是用什么存储,而是键的归一化和按语义分层的 TTL这两件事做对。
四、重试:指数退避 + 抖动,且只重试该重试的
请求一多,瞬时失败就是必然。但重试是把双刃剑——盲目重试会在服务端已经过载时火上浇油,制造重试风暴。可靠的重试要回答三个问题:哪些错该重试、隔多久重试、最多重试几次。
哪些该重试。 只重试幂等且可恢复的错误:HTTP 429(被限流)、500/502/503/504(服务端瞬时故障)、连接超时、读超时、连接重置。而 400(参数错)、401/403(鉴权失败)、404 这类是确定性失败,重试一万次结果都一样,必须立即放弃并把错误抛出来——气象取数全是 POST,但 downloadSync 这类查询语义上是幂等的(同样的参数查同样的数据),所以网络层重试是安全的。
隔多久。 用指数退避(exponential backoff):第 n 次重试等待约 base × 2ⁿ,让服务端有喘息时间。但纯指数退避有个陷阱——如果一批请求同时失败、同时按相同节奏退避,它们会在同一时刻同时再次涌入,制造新的尖峰。解决办法是叠加随机抖动(jitter),把每个请求的等待时间随机打散。另外,429 响应常带 Retry-After 头,有它就优先听它的。
import random
import time
import requests
RETRYABLE_STATUS = {429, 500, 502, 503, 504}
MAX_RETRIES = 5
BASE_DELAY = 1.0 # 秒
MAX_DELAY = 60.0
def post_with_retry(url, *, headers, json, bucket, timeout=600):
last_exc = None
for attempt in range(MAX_RETRIES + 1):
bucket.acquire() # 每次尝试都过限流器
try:
resp = requests.post(url, headers=headers, json=json, timeout=timeout)
except (requests.Timeout, requests.ConnectionError) as e:
last_exc = e # 网络层错误:可重试
else:
if resp.status_code < 400:
return resp
if resp.status_code not in RETRYABLE_STATUS:
resp.raise_for_status() # 4xx 参数/鉴权错:直接放弃
last_exc = requests.HTTPError(f"HTTP {resp.status_code}")
# 优先听服务端的 Retry-After
ra = resp.headers.get("Retry-After")
if ra and ra.isdigit():
time.sleep(min(int(ra), MAX_DELAY))
continue
if attempt == MAX_RETRIES:
break
# 指数退避 + 全抖动(full jitter)
delay = min(MAX_DELAY, BASE_DELAY * (2 ** attempt))
time.sleep(random.uniform(0, delay))
raise RuntimeError(f"重试 {MAX_RETRIES} 次仍失败") from last_exc
这里有几个生产细节:每次重试前都重新过一遍限流器(重试本身也是请求,不能绕过配额);raise_for_status 只在确认是不可重试的 4xx 时调用,避免把可重试错误当成致命错误;全抖动(uniform(0, delay))比固定退避更能打散尖峰。设置一个总预算上限(最大重试次数 + 单次超时)也很重要,否则一个永远 503 的接口会让单个子任务无限挂起,拖垮整个批次。
五、批量取数:分片 + 有界并发 + 断点续传
前面三件事都备齐了,最后要把它们组织成一个能稳定跑完上万请求的批量管线。核心是三件套。
请求分片(sharding)。 运梦气象 API v1 是单点查询、不支持 bbox,时间窗过大单次请求也会变慢甚至超时。所以把大任务切成幂等的子任务:每个子任务 = 一个点 + 一个时间分片(比如按年或按月切)+ 一组字段。切片的好处是粒度可控、失败可单独重跑、天然适配缓存键。一个经验值是把单次请求的时间窗控制在「几千到几万条记录」量级,既不会因太小而请求数爆炸,也不会因太大而单请求超时。
有界并发(bounded concurrency)。 不要无脑 for 串行(太慢),也不要无脑开几百个线程(限流器会把它们全堵在 acquire 上,徒增内存和上下文切换)。合理做法是用一个有界线程池,并发度设成略高于「限流速率 × 平均单请求耗时」即可——再多的线程也只会排队等令牌。concurrent.futures.ThreadPoolExecutor 配上前面的令牌桶就是标准组合:
import os
import hashlib
import json as _json
from concurrent.futures import ThreadPoolExecutor, as_completed
import diskcache
API = "https://console.yun-meng.top/api/energy-weather/search/weather/action/downloadSync"
API_KEY = os.environ["YUNMENG_API_KEY"] # 控制台创建的 API Key,形如 sk-xxxx
HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
bucket = TokenBucket(rate=0.9, capacity=5) # 略低于配额,留余量
cache = diskcache.Cache("./weather_cache") # 本地磁盘缓存
def cache_key(p: dict) -> str:
# 归一化:经纬度量化、字段排序,确保相同语义算出同一键
norm = {
"src": p["dataSourceId"],
"lat": round(p["lat"], 2), # 对齐到 0.01°(按需调)
"lon": round(p["lon"], 2),
"stime": p["stime"], "etime": p["etime"],
"fields": sorted(p["fields"]),
"tz": p["timezone"],
}
raw = _json.dumps(norm, sort_keys=True, ensure_ascii=False)
return hashlib.sha1(raw.encode("utf-8")).hexdigest()
def fetch_one(payload: dict) -> dict:
key = cache_key(payload)
cached = cache.get(key)
if cached is not None: # 命中缓存,不耗配额
return cached
resp = post_with_retry(API, headers=HEADERS, json=payload, bucket=bucket)
result = resp.json()
if not result.get("success"): # 业务层状态校验
raise RuntimeError(result.get("msg", "查询失败"))
data = result["data"]
time_list = data["timeList"]
for f in payload["fields"]: # 数组等长校验,杜绝错位
assert len(data[f]) == len(time_list), f"字段 {f} 与 timeList 长度不一致"
# 历史源缓存久、预报源缓存短
ttl = 3600 if payload["dataSourceId"] == "ger" else 30 * 86400
cache.set(key, data, expire=ttl)
return data
def run_batch(payloads: list[dict], max_workers: int = 8) -> dict:
results, failed = {}, []
with ThreadPoolExecutor(max_workers=max_workers) as pool:
future_map = {pool.submit(fetch_one, p): cache_key(p) for p in payloads}
for fut in as_completed(future_map):
key = future_map[fut]
try:
results[key] = fut.result()
except Exception as e: # 失败子任务记下来,不中断整体
failed.append((key, str(e)))
if failed:
print(f"{len(failed)} 个子任务失败,可单独重跑:{failed[:3]} ...")
return results
断点续传 / 失败隔离。 上面的 run_batch 体现了关键纪律:单个子任务失败不能炸掉整批。把失败的子任务收集起来单独重跑,而不是 raise 让整个 executor 崩溃——上万请求跑到 99% 时因为一个点失败而全部丢弃,是最让人崩溃的事故。再加上缓存层天然就是断点续传:重跑时已成功的子任务直接命中缓存,不会重复消耗配额,只有真正没拿到的才会重新发请求。这让「跑一半挂了,改完重跑」变成零成本操作。
六、把四件事串起来
这四套手段不是孤立的,而是层层嵌套的同心圆:最外层是批量调度(分片 + 有界并发 + 失败隔离),它调用带重试的请求函数,重试函数每次都过限流器,而在发请求之前先查缓存。一个请求的完整生命周期是:分片 → 查缓存(命中即返回)→ 过限流器拿令牌 → 发请求 →(失败则指数退避重试)→ 校验状态与数组等长 → 写缓存 → 返回。把这套骨架沉淀成一个内部的「气象取数客户端」,所有业务脚本都走它,就能彻底告别「这个脚本被限流了」「那批数据拉了一半丢了」「同一段数据被拉了好几遍」这类反复出现的接入层事故。
七、落地:在运梦气象 API 上的实践
上面的代码骨架是直接针对运梦气象 API 的同步取数接口(downloadSync)写的,几个适配点值得说明。其一,配额维度要先看清:免登录试用按「IP + 浏览器标识 + User-Agent」综合识别、额度有限(用于先试后买),注册正式账号后改为按调用扣减账户额度、单个 API Key 有默认并发上限并可按需申请上调——所以令牌桶要全局共享一个实例,rate 按你实际拿到的配额折算并留余量,而不要把代码里的示例数值当成真实限额。其二,它的返回是统一 JSON envelope(外层 code / success / data / msg,data 里是 timeList 加各字段平行数组),所以 fetch_one 里务必先判 success 再校验各字段数组与 timeList 等长,这两道关和缓存、重试是正交的、缺一不可。其三,缓存的 TTL 要按源分层:历史源(era5/nasa/zg1,运梦产品可取 1950 年至今)的值稳定、可长缓存;预报源(ger,德国气象局,未来约 7 天)每日刷新、只能短缓存。把限流、缓存、重试、批量这四层和这三个适配点对齐,一个能稳定回填几百场站、十年小时级序列的气象数据接入层,就不再是难事,而数据接入也不再是功率预测系统里那个最容易出事的隐形短板。
常见问题
客户端限流和服务端 429 退避,到底要不要都做?
都要,但优先级不同。客户端令牌桶是主动把发出速率压在配额线以内,让请求平滑、配额不浪费,这是常态机制;服务端 429 退避是被动兜底——当估算速率不准、或多机汇聚导致瞬时超速时,靠 429 + 指数退避把溢出的请求拦回来。只做后者会导致吞吐锯齿抖动且浪费配额,只做前者则在配额估算偏差时缺少兜底,所以两者配合。
历史气象数据可以永久缓存吗?
绝大部分可以。同一格点、同一历史时间窗、同一字段集的再分析值是不变的,命中后可长期复用。唯一要注意的是靠近当前时刻、还在数据滞后期内的时间窗(如 ERA5 约 T-5 天边界),其值可能从临时版被稳定版覆盖,这部分要短 TTL 或不缓存;预报数据每日刷新,更要短 TTL。
指数退避为什么一定要加随机抖动?
因为一批请求往往同时失败、同时按相同的指数节奏退避,于是会在同一时刻同时再次涌入服务端,制造出新的请求尖峰,反而加重过载。加入随机抖动(如 full jitter,等待时间在 [0, base×2ⁿ] 间随机取)能把这些重试请求在时间上打散,避免重试风暴。
批量取数应该开多少并发?
并发度大约等于「限流速率 × 平均单请求耗时」即可,再多的线程也只会全部堵在限流器的 acquire 上空等,徒增内存和上下文切换开销。比如限流约 1 QPS、单请求平均耗时 5 秒,理论上 5 个左右的在途请求就能把速率吃满,开 8 个留点余量足矣。真正的吞吐上限由配额决定,不由线程数决定。
收尾
气象数据 API 接入的难点从来不在「怎么发一个请求」,而在「怎么稳定地发一万个请求」。限流让你不被配额拒之门外,缓存让你不为同一份数据重复付费,重试让你扛得住必然出现的瞬时故障,批量调度把它们组织成可断点续传、失败隔离的管线。这四件事都不依赖高深算法,但每一件没做好都足以让数据接入层成为整个功率预测系统的隐形短板。把它们沉淀成一个统一的取数客户端,数据这一层就稳了。
参考与延伸阅读
- Amazon Web Services Architecture Blog. Exponential Backoff And Jitter. https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
- Google Cloud. API rate limiting / Designing for resilience (Cloud Architecture Center). https://cloud.google.com/architecture/api-design
- ECMWF / Copernicus Climate Change Service. ERA5: data documentation (Copernicus Knowledge Base). https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation
- MDN Web Docs. HTTP 429 Too Many Requests / Retry-After. https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429