文章内容
2019/12/9 17:28:56,作 者: 黄兵
RabbitMQ 在python中使用json传输数据

在互联网上,json可以很方便的传输数据,相比于xml语法简单。
在RabbitMQ中传输Json数据示例:
生产者:
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import json
from config import conn_config
config=conn_config.ConnConfig()
# RabbitMQ认证
credentials = pika.PlainCredentials(config.rabbitmq_user, config.rabbitmq_password)
conn_params = pika.ConnectionParameters(config.rabbitmq_ip, 5672, config.rabbitmq_vhost,
credentials=credentials)
# 建立到代理服务器的连接
conn_broker = pika.BlockingConnection(conn_params)
# 获取信道
channel = conn_broker.channel()
# 声明交换器
channel.exchange_declare(exchange=config.rabbitmq_vhost, exchange_type="direct", passive=False,
durable=True,
auto_delete=False)
msg = {"shopping_carts_id": 1111, "build_status_id": 1, "user_id": 222}
json_msg = json.dumps(msg)
msg_props = pika.BasicProperties()
msg_props.content_type = "application/json"
channel.basic_publish(body=json_msg, exchange=config.rabbitmq_vhost, properties=msg_props, routing_key='hola')首先msg是一个字典:
msg = {"shopping_carts_id": 1111, "build_status_id": 1, "user_id": 222}之后序列化成json数据,在队列中传输:
json_msg = json.dumps(msg)
消费者:
# !/usr/bin/env python
# -*- coding: utf-8 -*-
from config import conn_config
from service_class import save_build_phone_number_service
conn = conn_config.ConnConfig()
channel = conn.conn_rabbitmq()
# 声明交换器
channel.exchange_declare(exchange=conn.rabbitmq_vhost, exchange_type='direct', passive=False, durable=True,
auto_delete=False)
# 声明队列
channel.queue_declare(queue="build_phone_number_queue")
# 通过键“hola”将队列和交换器绑定起来
channel.queue_bind(queue="build_phone_number_queue", exchange=conn.rabbitmq_vhost, routing_key="hola")
# 用户处理传入的消息函数
def msg_consumer(channel, method, header, body):
# 确认消息
channel.basic_ack(delivery_tag=method.delivery_tag)
if body == 'quit':
# 停止消息并退出
channel.basic_cancel(consumer_tag='build_phone_number_consumer')
channel.stop_consuming()
else:
# 保存消息队列数据到数据库,防止号码新建不成功,无据可查
save_consumer = save_build_phone_number_service.SaveBuildPhoneNumber()
save_consumer.save_consumer(body=body)
return
# 订阅消费者
channel.basic_consume(queue='build_phone_number_queue', on_message_callback=msg_consumer,
consumer_tag='build_phone_number_consumer')
# 开始消费
channel.start_consuming()
SaveBuildPhoneNumber.py
# !/usr/bin/env python
# -*- coding: utf-8 -*-
from contextlib import closing
from datetime import datetime
import logging
import json
from config.conn_config import ConnConfig
class SaveBuildPhoneNumber:
"""
保存消息队列传输过来的数据
"""
def __init__(self):
pass
@staticmethod
def save_consumer(body):
body_json = json.loads(body)
shopping_card_id = body_json['shopping_carts_id']
build_status_id = body_json['build_status_id']
user_id = body_json['user_id']
time_create = datetime.now()接收到的数据,首先将已编码的 JSON 字符串解码为 Python 对象:
body_json = json.loads(body)
转换成字典后,获取相关内容:
shopping_card_id = body_json['shopping_carts_id'] build_status_id = body_json['build_status_id'] user_id = body_json['user_id']
之后就可以根据具体业务逻辑编写代码。
参考2代码示例:rabbit-queue-python
参考资料:
1、《RabbitMQ实战 高效部署分布式消息队列》[美] Alvaro Videla , Jason J.W. Williams 著,中国工信出版集团 2015.10
2、Install RabbitMQ and Send JSON data With Python on Ubuntu
3、Using body serialization with JSON
黄兵个人博客原创。
评论列表