WheatField
WheatField

一个消息队列连接泄漏问题浅析

April 25, 20251055 words, 6 min read
Authors

在一个小项目中,笔者用到了 RabbitMQ 进行任务的异步处理。 RabbitMQ 服务搭建在一个 VPS 服务上,客户端使用了 Python 的 aio_pika 库进行消息的发送和接收。

但用着用着,发现 rabbitmq 服务连接数量越来越多,有时候甚至达到了 200 多个,但实际上这个项目中只有两个消费任务, 6 个消费者在消费。

一开始我还以为是 aio_pika 库的问题,心想这也太拉胯了,开源果然是最贵的,但研究了一下发现是我使用姿势不对。

项目中的使用方式

一开始的 publisher 及 consumer 实现如下:

class ConnectionConfig(BaseModel):
    pass

class BaseProducer:
    def __init__(self, config: ConnectionConfig):
        self.config = config
        self.connection: typing.Optional[aio_pika.Connection] = None
        self.channel: typing.Optional[aio_pika.Channel] = None
        self.queue: typing.Optional[aio_pika.Queue] = None
        self._is_connected: bool = False

    async def connect(self) -> None:
        try:
            if not self._is_connected:
                self.connection = await aio_pika.connect_robust(
                    self.config.url,
                    timeout=self.config.connection_timeout
                )
                self.channel = await self.connection.channel()
                self.queue = await self.channel.declare_queue(self.config.queue_name)
                self._is_connected = True
                logger.info(f"Connected to queue: {self.config.queue_name}")
        except Exception as e:
            self._is_connected = False
            logger.error(f"Connection error: {str(e)}")

class BaseConsumer:
    def __init__(self, config: ConnectionConfig):
        self.config = config
        self.connection: typing.Optional[aio_pika.Connection] = None
        self.channel: typing.Optional[aio_pika.Channel] = None
        self.queue: typing.Optional[aio_pika.Queue] = None
        self._is_connected: bool = False

    async def connect(self) -> None:
        try:
            if not self._is_connected:
                self.connection = await aio_pika.connect_robust(
                    self.config.url,
                    timeout=self.config.connection_timeout
                )
                self.channel = await self.connection.channel()
                await self.channel.set_qos(prefetch_count=self.config.prefetch_count)
                self.queue = await self.channel.declare_queue(self.config.queue_name)
                self._is_connected = True
                logger.info(f"Connected to queue: {self.config.queue_name}")
        except Exception as e:
            self._is_connected = False
            logger.error(f"Connection error: {str(e)}")

    async def process_queue(self) -> typing.AsyncGenerator[bytes, None]:
        try:
            await self.connect()
            async with self.queue.iterator() as queue_iter:
                async for message in queue_iter:
                    async with message.process() as processed_message:
                        yield processed_message.body
        except Exception as e:
            self._is_connected = False
            logger.error(f"Connection error: {str(e)}")

这里就有一个很明显的但笔者一开始没有意识到的问题,那就是 connectionchannelException 里没有主动关闭。 那在消费异常时,consumer 会不断创建新的连接,而旧的连接仍然保留着,从而导致连接数越来越多。

Q: 那为什么连接(connection)不自己关闭呢?

A: 一句话来说,应用层的异常不会影响连接层的正常运行,二者是解耦的,这是设计中的基操。

从资源管理的角度上讲,在很多编程实践中,特别是涉及网络、文件等外部资源时,资源的释放(关闭)通常需要显式地进行管理。 原因有以下几点:

  • 连接状态的复杂性:网络连接有多种状态(如 ESTABLISHED、CLOSE_WAIT 等),系统无法自动判断 app 何时真正"完成"了连接交互。
  • 资源管理的可预测性:显式关闭让开发者可以在最合适的时机释放资源,避免资源泄漏或过早释放。
  • 连接复用需求:在高性能应用中,连接通常被重复使用(如连接池),而不是每次通信后就关闭。
  • 错误处理的需要:允许开发者处理关闭过程中可能出现的错误,如文件锁或租约问题。

所以即使在消费时出现异常,比如拿到的数据不合法,也不会影响连接层的正常运行。也就是说,虽然你的代码崩了,但连接还在。\square

修复

因此正确的做法是,在应用层(消费时)出现问题时,把异常报出来了,然后继续处理下一个消息。

对于连接的创建,要么通过资源管理器(async with)自动关闭连接,要么使用连接池进行连接复用。

第一种适合短期、单次使用的连接需求。比如 publish 消息时,如果频率不是很高,消息发送后,连接就可以关闭了,下次使用时再重新创建,这样可以减少资源占用。

async def publish(self, message_list: list[bytes]) -> None:
    async with self.connection:
        async with self.channel:
            for message in message_list:
                await self.channel.default_exchange.publish(
                    aio_pika.Message(body=message),
                    routing_key=self.config.queue_name
                )

第二种适合适合需要频繁创建连接的高并发应用,可以显著减少连接建立的延迟和资源消耗。同时也可以限制最大连接数,防止资源耗尽。

connection_pool = Pool(get_connection, max_size=2)
channel_pool = Pool(get_channel, max_size=2)

async with connection_pool, channel_pool:
    async for message in self.queue.iterator():
        async with message.process() as processed_message:
            yield processed_message.body

小结

资源管理是一个容易被忽视的问题,特别是在异步编程中。在开发时,还是需要多注意一下的,不然就会像笔者一样,被坑了才发现。

参考

Comments