楼主: 18615735432
32 0

[其他] Python 高手编程系列五百九十五:处理错误与速率限制 [推广有奖]

  • 0关注
  • 0粉丝

等待验证会员

学前班

40%

还不是VIP/贵宾

-

威望
0
论坛币
0 个
通用积分
0
学术水平
0 点
热心指数
0 点
信用等级
0 点
经验
20 点
帖子
1
精华
0
在线时间
0 小时
注册时间
2018-12-27
最后登录
2018-12-27

楼主
18615735432 发表于 2025-12-3 18:47:43 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

在处理并发请求时,一个常见的挑战来自于外部服务所设置的速率限制。以 Google Maps API 为例,在本书编写期间,其对未认证用户的免费调用额度为每秒最多 10 次请求,每日上限为 2,500 次。当我们使用多线程机制进行高频访问时,很容易触达这一限制。

更复杂的是,当前的代码结构尚未涵盖异常情况的处理逻辑,而在多线程环境下捕获和响应异常本身就比单线程更加困难。当客户端超出 Google 的调用频率限制时,api.geocode() 方法会抛出异常——这一点是积极的信号,说明系统能够反馈问题。[此处为图片1]

然而,该异常仅在工作线程内部触发,并不会直接导致整个程序终止。尽管出错的工作线程会立即退出,但主线程仍会通过 work_queue.join() 持续等待队列中所有任务完成。这意味着如果不对异常妥善处理,可能出现部分线程崩溃而主程序无法正常退出的情况,造成死锁或挂起。

为了避免此类问题,我们需要对代码做出适当调整,使其具备更强的容错能力。具体做法是:当工作线程捕获到异常时,将异常对象放入 results_queue 中,并像正常完成一样调用 task_done() 标记当前任务结束。这样可以确保主线程不会因等待未完成的任务而永久阻塞。随后,主线程可以从结果队列中读取数据,检查是否存在异常实例,并在必要时重新抛出这些异常,以便进一步处理或记录。

以下是改进后的 worker()main() 函数实现:

def worker(work_queue, results_queue):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            try:
                result = fetch_place(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
        finally:
            work_queue.task_done()

def main():
    work_queue = Queue()
    results_queue = Queue()

    for place in PLACES:
        work_queue.put(place)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

    while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            raise result
        present_result(result)

现在我们的程序已经具备基本的异常防护机制,接下来可以主动测试极限场景,例如人为突破速率限制来验证系统的健壮性。我们可以通过调整初始参数来快速达到此目的:增加待查询地点的数量以及扩大线程池规模,如下所示:

PLACES = (
    'Reykjavik', 'Vienna', 'Zadar', 'Venice',
    'Wrocaw', 'Bologna', 'Berlin', 'Subice',
    'New York', 'Delhi'
) * 10

THREAD_POOL_SIZE = 10

如果你的运行环境执行速度较快,很快就会看到类似以下的输出:

$ python3 threadpool_with_errors.py
New York, NY, USA, 40.71, -74.01
Berlin, Germany, 52.52, 13.40
Wrocaw, Poland, 51.11, 17.04
Zadar, Croatia, 44.12, 15.23
Vienna, Austria, 48.21, 16.37
Bologna, Italy, 44.49, 11.34
Reykjavík, Iceland, 64.13, -21.82
Venice, Italy, 45.44, 12.32
Delhi, Gujarat, India, 21.57, 73.22
Slubice, Poland, 52.35, 14.56
Vienna, Austria, 48.21, 16.37
Zadar, Croatia, 44.12, 15.23
Venice, Italy, 45.44, 12.32
Reykjavík, Iceland, 64.13, -21.82
Traceback (most recent call last):

上述输出表明,前几项请求成功返回,但在达到服务端限制后开始出现异常,最终由主线程捕获并中断流程,从而验证了异常处理机制的有效性。

from threading import Lock
import time


class Throttle:
    """
    实现令牌桶算法的节流类,用于控制请求速率。
    """
    def __init__(self, rate):
        self._consume_lock = Lock()
        self.rate = rate  # 每秒允许的请求数(即最大令牌数)
        self.tokens = 0   # 当前可用令牌数量
        self.last = 0     # 上次操作的时间戳

    def consume(self, amount=1):
        """
        请求使用指定数量的令牌。如果当前令牌不足,则返回0。
        """
        with self._consume_lock:
            now = time.time()

            # 初始化首次调用的时间戳,防止初始突发请求
            if self.last == 0:
                self.last = now

            # 计算自上次填充以来经过的时间
            elapsed = now - self.last

            # 根据经过的时间按速率补充令牌(仅当有足够时间产生新令牌时)
            if int(elapsed * self.rate) > 0:
                self.tokens += int(elapsed * self.rate)
                self.last = now  # 更新最后填充时间

            # 确保令牌数不超过设定的速率上限(避免桶溢出)
            if self.tokens > self.rate:
                self.tokens = self.rate

            # 检查是否有足够的令牌满足本次请求
            if self.tokens >= amount:
                self.tokens -= amount
                return amount
            else:
                return 0  # 令牌不足,拒绝请求

程序运行过程中出现的异常并非由代码逻辑错误导致,而是由于对API接口的请求频率超出了服务限制所引发。具体堆栈信息如下:

File “threadpool_with_errors.py”, line 83, in
main()

File “threadpool_with_errors.py”, line 76, in main
raise result

File “threadpool_with_errors.py”, line 43, in worker
result = fetch_place(item)

File “threadpool_with_errors.py”, line 23, in fetch_place
return api.geocode(place)[0]

File “…\site-packages\gmaps\geocoding.py”, line 37, in geocode
return self._make_request(self.GEOCODE_URL, parameters, “results”)

File “…\site-packages\gmaps\client.py”, line 89, in _make_request
)(response)

gmaps.errors.RateLimitExceeded: {‘status’: ‘OVER_QUERY_LIMIT’, ‘results’: [],
‘error_message’: ‘You have exceeded your rate-limit for this API.’, ‘url’:
‘https://maps.googleapis.com/maps/api/geocode/json?address=Wroc%C5%82aw&sens
or=false’}

该错误表明已超过Google Maps Geocoding API的查询配额限制。虽然此服务免费提供,但其对单位时间内的请求数量设有严格限制。当前程序并发请求过多且速度过快,导致触发限流机制。

为了使程序能够稳定运行,必须引入速率控制机制,以降低请求频率。这种控制工作执行速度的技术通常被称为“节流”(Throttling)。尽管PyPI上存在多个可用于实现速率限制的第三方库,但本例中将不依赖外部包,而是通过线程安全的方式自行实现,借此介绍多线程环境下的锁机制应用。

节流算法设计:令牌桶(Token Bucket)

本文采用一种经典的节流算法——令牌桶算法,其实现原理简洁高效:

  • 系统维护一个虚拟的“桶”,其中存放有限数量的“令牌”。
  • 每个令牌代表一次执行任务的权限。
  • 每当工作线程需要执行任务时,需先向桶申请相应数量的令牌:
  1. 首先计算自上次填充桶以来经过的时间;
  2. 根据预设速率和时间差,向桶中添加对应数量的新令牌;
  3. 若桶中现有令牌数大于或等于请求的数量,则扣除相应令牌并允许执行;
  4. 否则,拒绝请求,返回零。

关键注意事项包括:令牌桶初始状态应为零令牌;任何时候都不得让桶中令牌数超过设定的最大值(即每秒允许的请求数)。如果不加以约束,可能导致短时间内释放过多令牌,从而突破速率限制。

鉴于速率限制通常以“每秒请求数”为单位,我们可直接以秒为时间基准进行计量,无需处理复杂的时间单位转换。因此,桶的最大容量即为设定的rate值,确保不会存储超出该时间段内允许的请求数量。

使用方式说明

在实际使用中,建议在主线程中创建一个Throttle实例(例如:Throttle(10) 表示每秒最多10个请求),并将该实例作为参数传递给各个工作线程。由于类内部使用了threading.Lock来保护共享状态,因此在多线程环境下访问是线程安全的。

[此处为图片1]

通过将throttle.consume()调用置于每次请求之前,可以有效控制请求频率,避免因超出API限制而导致服务中断。例如,在调用fetch_place前加入throttle.consume(1),即可实现精准的速率调控。

def worker(work_queue, results_queue, throttle):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            # 等待令牌释放,确保请求频率受控
            while not throttle.consume():
                pass
            try:
                result = fetch_place(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
            finally:
                work_queue.task_done()

现在我们可以对 worker() 函数的实现进行更新,使其在处理每个任务项时,先等待 throttle 组件释放一个可用令牌后再继续执行。这种机制有效控制了操作的频率,避免过于频繁的请求。

该函数持续从工作队列中获取任务,若队列为空则退出循环。每当取得一个项目后,会阻塞式地等待 throttle 消耗一个令牌,以符合速率限制要求。随后尝试调用 fetch_place() 来获取对应位置的数据,并将结果或可能发生的异常放入结果队列中。无论成功或失败,最后都会标记该任务已完成,以保证队列的任务跟踪机制正常运作。

[此处为图片1]
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:python parameters exception Parameter Packages

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群
GMT+8, 2025-12-5 20:23