2021/2/2 17:48:34, Author: 黄兵

Unexpected connection close detected: StreamLostError

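I have recently been using RabbitMQ as a message queue. Some of my consumers take a long time to process a message, and when they do, the following error is reported: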

ConnectionResetError: [WinError 10054] 远程主机强迫关闭了一个现有的连接。

01-23 09:35 pika.adapters.base_connection ERROR    connection_lost: StreamLostError: ("Stream connection lost: ConnectionResetError(10054, '远程主机强迫关闭了一个现有的连接。', None, 10054, None)",)

01-23 09:35 pika.adapters.blocking_connection ERROR    Unexpected connection close detected: StreamLostError: ("Stream connection lost: ConnectionResetError(10054, '远程主机强迫关闭了一个现有的连接。', None, 10054, None)",)

The article "pika.adapters.blocking_connection StreamLostError: ("Stream connection lost: ConnectionResetError(10054)" already explains in detail why this error occurs, so I will not repeat it here.

Following that article, I reworked part of my code, which solved the problem. The code is as follows:

config.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from retrying import retry
import pika

from unit import meta_singleton


class ConnConfig(metaclass=meta_singleton.MetaSingleton):

    def __init__(self):
        # RabbitMQ Config
        self.RABBITMQ_USER_IPCrawler = 'user'
        self.RABBITMQ_PASSWORD_IPCrawler = 'pass'
        self.RABBITMQ_VHOST_IPCrawler = 'vhost'
        self.RABBITMQ_ADDR = '1.1.1.1'
        self.RabbitMQ_queue_ip_crawler = 'queue'
        self.RabbitMQ_consumer_tag_ip_crawler = 'tag_consumer'

    # Retry up to 10 times. Note that retrying's wait_fixed is in milliseconds,
    # so use 2000 if a 2-second pause between attempts is wanted.
    @retry(stop_max_attempt_number=10, wait_fixed=2)
    def conn_rabbitmq_ip_crawler(self, vhost):
        credentials = pika.PlainCredentials(self.RABBITMQ_USER_IPCrawler, self.RABBITMQ_PASSWORD_IPCrawler)
        # Heartbeat timeout of 20 minutes (the value is in seconds)
        conn_params = pika.ConnectionParameters(self.RABBITMQ_ADDR, 5672, vhost, credentials=credentials,
                                                heartbeat=20 * 60)
        # Open the connection to the broker
        conn_broker = pika.BlockingConnection(conn_params)
        # Get a channel
        channel = conn_broker.channel()

        return conn_broker, channel
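
The `unit.meta_singleton` module imported above is not shown in this post. For readers who want to run the code, here is a minimal sketch of what such a metaclass might look like. The implementation and the `DemoConfig` class are my own assumptions for illustration, not the original module.

```python
import threading


class MetaSingleton(type):
    """Metaclass that returns one shared instance per class (hypothetical sketch)."""

    _instances = {}
    # Re-entrant lock, so one singleton may construct another inside __init__.
    _lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        # Create the instance on first use, then reuse it on every later call.
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class DemoConfig(metaclass=MetaSingleton):
    """Hypothetical class, used only to show the behaviour."""

    def __init__(self):
        self.addr = '1.1.1.1'


first = DemoConfig()
second = DemoConfig()
```

With a metaclass like this, every call to `ConnConfig()` would return the same object, so the settings are built only once.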

consumer.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import json
import functools
import threading
from config import rabbitmq_config
from unit import update_ip_location
from config import LoggingConfig


def ack_message(channel, delivery_tag):
    """Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        pass


def do_work(connection, channel, delivery_tag, body):
    logger_name = 'process_ip2location'
    _logger = LoggingConfig.init_logging(logger_name)
    # User function that processes the incoming message
    thread_id = threading.get_ident()
    fmt1 = 'Thread id: {} Delivery tag: {} Message body: {}'
    # Log the received message
    _logger.info(fmt1.format(thread_id, delivery_tag, body))
    # Parse the queue message so that its data can be saved to the database,
    # leaving a record to check if processing fails
    body_json = json.loads(body)
    start_ip = body_json['ip_start']
    end_ip = body_json['ip_end']
    country_code = body_json['country_code']
    region = body_json['region']
    city = body_json['city']
    # Update the database
    init_update_ip_location = update_ip_location.UpdateIPLocation()
    init_update_ip_location.process_ip_location(country_code=country_code, start_ip=start_ip, end_ip=end_ip,
                                                region=region, city=city)
    # Hand the ACK back to the connection's own thread
    cb = functools.partial(ack_message, channel, delivery_tag)
    connection.add_callback_threadsafe(cb)


def on_message(channel, method_frame, header_frame, body, args):
    (connection, threads) = args
    delivery_tag = method_frame.delivery_tag
    t = threading.Thread(target=do_work, args=(connection, channel, delivery_tag, body))
    t.start()
    threads.append(t)


init_rabbitmq = rabbitmq_config.ConnConfig()
RabbitMQ_vhost = init_rabbitmq.RABBITMQ_VHOST_IPCrawler

connection, channel = init_rabbitmq.conn_rabbitmq_ip_crawler(RabbitMQ_vhost)
# Declare the exchange
channel.exchange_declare(exchange=RabbitMQ_vhost, exchange_type='direct', passive=False, durable=True,
                         auto_delete=False)
# Declare the queue
channel.queue_declare(queue=init_rabbitmq.RabbitMQ_queue_ip_crawler, auto_delete=False)
# Bind the queue to the exchange with the routing key "hola"
channel.queue_bind(queue=init_rabbitmq.RabbitMQ_queue_ip_crawler, exchange=RabbitMQ_vhost, routing_key="hola")
channel.basic_qos(prefetch_count=1)
threads = []
on_message_callback = functools.partial(on_message, args=(connection, threads))
# Subscribe the consumer
channel.basic_consume(queue=init_rabbitmq.RabbitMQ_queue_ip_crawler, on_message_callback=on_message_callback,
                      consumer_tag=init_rabbitmq.RabbitMQ_consumer_tag_ip_crawler)
try:
    # Start consuming
    channel.start_consuming()
except KeyboardInterrupt:
    # Stop consuming and exit
    channel.stop_consuming()

# Wait for all to complete
for thread in threads:
    thread.join()

connection.close()
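
The key point of the consumer is that the slow work runs in a worker thread, while the ACK is handed back to the connection's own thread through `add_callback_threadsafe`, since a pika `BlockingConnection` is not otherwise safe to use from other threads. The sketch below imitates that hand-off with the standard library only. `FakeConnection` is a made-up stand-in and does not reflect how pika is implemented internally.

```python
import functools
import queue
import threading


class FakeConnection:
    """Hypothetical stand-in for a pika connection; NOT pika's real code."""

    def __init__(self):
        self._callbacks = queue.Queue()
        self.owner_thread = threading.get_ident()

    def add_callback_threadsafe(self, callback):
        # Safe to call from any thread: it only enqueues the callback.
        self._callbacks.put(callback)

    def process_callbacks(self, expected):
        # Runs on the owner thread, playing the role of the I/O loop.
        for _ in range(expected):
            self._callbacks.get(timeout=5)()


acked = []  # (delivery_tag, id of the thread that performed the ack)


def ack_message(delivery_tag):
    acked.append((delivery_tag, threading.get_ident()))


def do_work(connection, delivery_tag):
    # The long-running job would go here, off the connection's thread.
    connection.add_callback_threadsafe(functools.partial(ack_message, delivery_tag))


connection = FakeConnection()
workers = [threading.Thread(target=do_work, args=(connection, tag)) for tag in (1, 2, 3)]
for worker in workers:
    worker.start()
connection.process_callbacks(expected=3)
for worker in workers:
    worker.join()
```

Every entry in `acked` carries the owner thread's id, even though the work itself ran in three separate threads. Because the connection's thread is never blocked by the job, it stays free to service heartbeats.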

After these changes, the consumer has run for a long time without the error reappearing.
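
To try the consumer out, it needs a message whose JSON body contains the fields that `do_work` reads. Below is a rough sketch of how such a test message might be built and published. The helper names, the `RecordingChannel` stand-in and the sample values are all assumptions of mine; with a real broker, the channel returned by `conn_rabbitmq_ip_crawler` would take the stand-in's place.

```python
import json


def build_message(ip_start, ip_end, country_code, region, city):
    """Serialize the fields that do_work() reads from the message body."""
    return json.dumps({
        'ip_start': ip_start,
        'ip_end': ip_end,
        'country_code': country_code,
        'region': region,
        'city': city,
    })


def publish_test_message(channel, exchange, body, routing_key='hola'):
    # `channel` is expected to behave like a pika channel.
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)


class RecordingChannel:
    """Hypothetical stand-in so the sketch runs without a broker."""

    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


# Made-up sample values, for illustration only.
demo_channel = RecordingChannel()
demo_body = build_message('1.0.0.0', '1.0.0.255', 'AU', 'Queensland', 'Brisbane')
publish_test_message(demo_channel, 'vhost', demo_body)
```

The exchange name `'vhost'` and the routing key `'hola'` mirror the placeholder values used in the consumer above.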


Original content from 黄兵's personal blog.

Please credit the source when reposting: 黄兵个人博客 - Unexpected connection close detected: StreamLostError
