在处理并发请求时,一个常见的挑战来自于外部服务所设置的速率限制。以 Google Maps API 为例,在本书编写期间,其对未认证用户的免费调用额度为每秒最多 10 次请求,每日上限为 2,500 次。当我们使用多线程机制进行高频访问时,很容易触达这一限制。
更复杂的是,当前的代码结构尚未涵盖异常情况的处理逻辑,而在多线程环境下捕获和响应异常本身就比单线程更加困难。当客户端超出 Google 的调用频率限制时,api.geocode() 方法会抛出异常——这一点是积极的信号,说明系统能够反馈问题。[此处为图片1]
然而,该异常仅在工作线程内部触发,并不会直接导致整个程序终止。尽管出错的工作线程会立即退出,但主线程仍会通过 work_queue.join() 持续等待队列中所有任务完成。这意味着如果不对异常妥善处理,可能出现部分线程崩溃而主程序无法正常退出的情况,造成死锁或挂起。
为了避免此类问题,我们需要对代码做出适当调整,使其具备更强的容错能力。具体做法是:当工作线程捕获到异常时,将异常对象放入 results_queue 中,并像正常完成一样调用 task_done() 标记当前任务结束。这样可以确保主线程不会因等待未完成的任务而永久阻塞。随后,主线程可以从结果队列中读取数据,检查是否存在异常实例,并在必要时重新抛出这些异常,以便进一步处理或记录。
以下是改进后的 worker() 和 main() 函数实现:
def worker(work_queue, results_queue):
while True:
try:
item = work_queue.get(block=False)
except Empty:
break
else:
try:
result = fetch_place(item)
except Exception as err:
results_queue.put(err)
else:
results_queue.put(result)
finally:
work_queue.task_done()
def main():
work_queue = Queue()
results_queue = Queue()
for place in PLACES:
work_queue.put(place)
threads = [
Thread(target=worker, args=(work_queue, results_queue))
for _ in range(THREAD_POOL_SIZE)
]
for thread in threads:
thread.start()
work_queue.join()
while threads:
threads.pop().join()
while not results_queue.empty():
result = results_queue.get()
if isinstance(result, Exception):
raise result
present_result(result)
现在我们的程序已经具备基本的异常防护机制,接下来可以主动测试极限场景,例如人为突破速率限制来验证系统的健壮性。我们可以通过调整初始参数来快速达到此目的:增加待查询地点的数量以及扩大线程池规模,如下所示:
PLACES = (
'Reykjavik', 'Vienna', 'Zadar', 'Venice',
'Wrocaw', 'Bologna', 'Berlin', 'Subice',
'New York', 'Delhi'
) * 10
THREAD_POOL_SIZE = 10
如果你的运行环境执行速度较快,很快就会看到类似以下的输出:
$ python3 threadpool_with_errors.py
New York, NY, USA, 40.71, -74.01
Berlin, Germany, 52.52, 13.40
Wrocaw, Poland, 51.11, 17.04
Zadar, Croatia, 44.12, 15.23
Vienna, Austria, 48.21, 16.37
Bologna, Italy, 44.49, 11.34
Reykjavík, Iceland, 64.13, -21.82
Venice, Italy, 45.44, 12.32
Delhi, Gujarat, India, 21.57, 73.22
Slubice, Poland, 52.35, 14.56
Vienna, Austria, 48.21, 16.37
Zadar, Croatia, 44.12, 15.23
Venice, Italy, 45.44, 12.32
Reykjavík, Iceland, 64.13, -21.82
Traceback (most recent call last):
上述输出表明,前几项请求成功返回,但在达到服务端限制后开始出现异常,最终由主线程捕获并中断流程,从而验证了异常处理机制的有效性。
from threading import Lock
import time
class Throttle:
"""
实现令牌桶算法的节流类,用于控制请求速率。
"""
def __init__(self, rate):
self._consume_lock = Lock()
self.rate = rate # 每秒允许的请求数(即最大令牌数)
self.tokens = 0 # 当前可用令牌数量
self.last = 0 # 上次操作的时间戳
def consume(self, amount=1):
"""
请求使用指定数量的令牌。如果当前令牌不足,则返回0。
"""
with self._consume_lock:
now = time.time()
# 初始化首次调用的时间戳,防止初始突发请求
if self.last == 0:
self.last = now
# 计算自上次填充以来经过的时间
elapsed = now - self.last
# 根据经过的时间按速率补充令牌(仅当有足够时间产生新令牌时)
if int(elapsed * self.rate) > 0:
self.tokens += int(elapsed * self.rate)
self.last = now # 更新最后填充时间
# 确保令牌数不超过设定的速率上限(避免桶溢出)
if self.tokens > self.rate:
self.tokens = self.rate
# 检查是否有足够的令牌满足本次请求
if self.tokens >= amount:
self.tokens -= amount
return amount
else:
return 0 # 令牌不足,拒绝请求
程序运行过程中出现的异常并非由代码逻辑错误导致,而是由于对API接口的请求频率超出了服务限制所引发。具体堆栈信息如下:
File “threadpool_with_errors.py”, line 83, in
main()File “threadpool_with_errors.py”, line 76, in main
raise resultFile “threadpool_with_errors.py”, line 43, in worker
result = fetch_place(item)File “threadpool_with_errors.py”, line 23, in fetch_place
return api.geocode(place)[0]File “…\site-packages\gmaps\geocoding.py”, line 37, in geocode
return self._make_request(self.GEOCODE_URL, parameters, “results”)File “…\site-packages\gmaps\client.py”, line 89, in _make_request
)(response)gmaps.errors.RateLimitExceeded: {‘status’: ‘OVER_QUERY_LIMIT’, ‘results’: [],
‘error_message’: ‘You have exceeded your rate-limit for this API.’, ‘url’:
‘https://maps.googleapis.com/maps/api/geocode/json?address=Wroc%C5%82aw&sens
or=false’}
该错误表明已超过Google Maps Geocoding API的查询配额限制。虽然此服务免费提供,但其对单位时间内的请求数量设有严格限制。当前程序并发请求过多且速度过快,导致触发限流机制。
为了使程序能够稳定运行,必须引入速率控制机制,以降低请求频率。这种控制工作执行速度的技术通常被称为“节流”(Throttling)。尽管PyPI上存在多个可用于实现速率限制的第三方库,但本例中将不依赖外部包,而是通过线程安全的方式自行实现,借此介绍多线程环境下的锁机制应用。
节流算法设计:令牌桶(Token Bucket)
本文采用一种经典的节流算法——令牌桶算法,其实现原理简洁高效:
- 系统维护一个虚拟的“桶”,其中存放有限数量的“令牌”。
- 每个令牌代表一次执行任务的权限。
- 每当工作线程需要执行任务时,需先向桶申请相应数量的令牌:
- 首先计算自上次填充桶以来经过的时间;
- 根据预设速率和时间差,向桶中添加对应数量的新令牌;
- 若桶中现有令牌数大于或等于请求的数量,则扣除相应令牌并允许执行;
- 否则,拒绝请求,返回零。
关键注意事项包括:令牌桶初始状态应为零令牌;任何时候都不得让桶中令牌数超过设定的最大值(即每秒允许的请求数)。如果不加以约束,可能导致短时间内释放过多令牌,从而突破速率限制。
鉴于速率限制通常以“每秒请求数”为单位,我们可直接以秒为时间基准进行计量,无需处理复杂的时间单位转换。因此,桶的最大容量即为设定的rate值,确保不会存储超出该时间段内允许的请求数量。
使用方式说明
在实际使用中,建议在主线程中创建一个Throttle实例(例如:Throttle(10) 表示每秒最多10个请求),并将该实例作为参数传递给各个工作线程。由于类内部使用了threading.Lock来保护共享状态,因此在多线程环境下访问是线程安全的。
[此处为图片1]
通过将throttle.consume()调用置于每次请求之前,可以有效控制请求频率,避免因超出API限制而导致服务中断。例如,在调用fetch_place前加入throttle.consume(1),即可实现精准的速率调控。
def worker(work_queue, results_queue, throttle):
while True:
try:
item = work_queue.get(block=False)
except Empty:
break
else:
# 等待令牌释放,确保请求频率受控
while not throttle.consume():
pass
try:
result = fetch_place(item)
except Exception as err:
results_queue.put(err)
else:
results_queue.put(result)
finally:
work_queue.task_done()
现在我们可以对 worker() 函数的实现进行更新,使其在处理每个任务项时,先等待 throttle 组件释放一个可用令牌后再继续执行。这种机制有效控制了操作的频率,避免过于频繁的请求。
该函数持续从工作队列中获取任务,若队列为空则退出循环。每当取得一个项目后,会阻塞式地等待 throttle 消耗一个令牌,以符合速率限制要求。随后尝试调用 fetch_place() 来获取对应位置的数据,并将结果或可能发生的异常放入结果队列中。无论成功或失败,最后都会标记该任务已完成,以保证队列的任务跟踪机制正常运作。
[此处为图片1]

雷达卡


京公网安备 11010802022788号







