文章内容
2019/12/9 17:28:56,作 者: 黄兵
RabbitMQ 在python中使用json传输数据
在互联网上,json可以很方便的传输数据,相比于xml语法简单。
在RabbitMQ中传输Json数据示例:
生产者:
# !/usr/bin/env python # -*- coding: utf-8 -*- import pika import json from config import conn_config config=conn_config.ConnConfig() # RabbitMQ认证 credentials = pika.PlainCredentials(config.rabbitmq_user, config.rabbitmq_password) conn_params = pika.ConnectionParameters(config.rabbitmq_ip, 5672, config.rabbitmq_vhost, credentials=credentials) # 建立到代理服务器的连接 conn_broker = pika.BlockingConnection(conn_params) # 获取信道 channel = conn_broker.channel() # 声明交换器 channel.exchange_declare(exchange=config.rabbitmq_vhost, exchange_type="direct", passive=False, durable=True, auto_delete=False) msg = {"shopping_carts_id": 1111, "build_status_id": 1, "user_id": 222} json_msg = json.dumps(msg) msg_props = pika.BasicProperties() msg_props.content_type = "application/json" channel.basic_publish(body=json_msg, exchange=config.rabbitmq_vhost, properties=msg_props, routing_key='hola')
首先msg是一个字典:
msg = {"shopping_carts_id": 1111, "build_status_id": 1, "user_id": 222}
之后序列化成json数据,在队列中传输:
json_msg = json.dumps(msg)
消费者:
# !/usr/bin/env python
# -*- coding: utf-8 -*-
from config import conn_config
from service_class import save_build_phone_number_service
conn = conn_config.ConnConfig()
channel = conn.conn_rabbitmq()
# 声明交换器
channel.exchange_declare(exchange=conn.rabbitmq_vhost, exchange_type='direct', passive=False, durable=True,
auto_delete=False)
# 声明队列
channel.queue_declare(queue="build_phone_number_queue")
# 通过键“hola”将队列和交换器绑定起来
channel.queue_bind(queue="build_phone_number_queue", exchange=conn.rabbitmq_vhost, routing_key="hola")
# 用户处理传入的消息函数
def msg_consumer(channel, method, header, body):
# 确认消息
channel.basic_ack(delivery_tag=method.delivery_tag)
if body == 'quit':
# 停止消息并退出
channel.basic_cancel(consumer_tag='build_phone_number_consumer')
channel.stop_consuming()
else:
# 保存消息队列数据到数据库,防止号码新建不成功,无据可查
save_consumer = save_build_phone_number_service.SaveBuildPhoneNumber()
save_consumer.save_consumer(body=body)
return
# 订阅消费者
channel.basic_consume(queue='build_phone_number_queue', on_message_callback=msg_consumer,
consumer_tag='build_phone_number_consumer')
# 开始消费
channel.start_consuming()
SaveBuildPhoneNumber.py
# !/usr/bin/env python # -*- coding: utf-8 -*- from contextlib import closing from datetime import datetime import logging import json from config.conn_config import ConnConfig class SaveBuildPhoneNumber: """ 保存消息队列传输过来的数据 """ def __init__(self): pass @staticmethod def save_consumer(body): body_json = json.loads(body) shopping_card_id = body_json['shopping_carts_id'] build_status_id = body_json['build_status_id'] user_id = body_json['user_id'] time_create = datetime.now()
接收到的数据,首先将已编码的 JSON 字符串解码为 Python 对象:
body_json = json.loads(body)
转换成字典后,获取相关内容:
shopping_card_id = body_json['shopping_carts_id'] build_status_id = body_json['build_status_id'] user_id = body_json['user_id']
之后就可以根据具体业务逻辑编写代码。
参考2代码示例:rabbit-queue-python
参考资料:
1、《RabbitMQ实战 高效部署分布式消息队列》[美] Alvaro Videla , Jason J.W. Williams 著,中国工信出版集团 2015.10
2、Install RabbitMQ and Send JSON data With Python on Ubuntu
3、Using body serialization with JSON
黄兵个人博客原创。
评论列表