文章内容

2019/12/9 17:28:56,作 者: 黄兵

RabbitMQ 在python中使用json传输数据


在互联网上,json可以很方便的传输数据,相比于xml语法简单。

在RabbitMQ中传输Json数据示例:

生产者:

# !/usr/bin/env python
# -*- coding: utf-8 -*-

import pika
import json

from config import conn_config

config=conn_config.ConnConfig()
# RabbitMQ认证
credentials = pika.PlainCredentials(config.rabbitmq_user, config.rabbitmq_password)
conn_params = pika.ConnectionParameters(config.rabbitmq_ip, 5672, config.rabbitmq_vhost,
                                        credentials=credentials)
# 建立到代理服务器的连接
conn_broker = pika.BlockingConnection(conn_params)
# 获取信道
channel = conn_broker.channel()
# 声明交换器
channel.exchange_declare(exchange=config.rabbitmq_vhost, exchange_type="direct", passive=False,
                         durable=True,
                         auto_delete=False)
msg = {"shopping_carts_id": 1111, "build_status_id": 1, "user_id": 222}
json_msg = json.dumps(msg)
msg_props = pika.BasicProperties()
msg_props.content_type = "application/json"
channel.basic_publish(body=json_msg, exchange=config.rabbitmq_vhost, properties=msg_props, routing_key='hola')

首先msg是一个字典:

msg = {"shopping_carts_id": 1111, "build_status_id": 1, "user_id": 222}

之后序列化成json数据,在队列中传输:

json_msg = json.dumps(msg)


消费者:

# !/usr/bin/env python
# -*- coding: utf-8 -*-

from config import conn_config
from service_class import save_build_phone_number_service

conn = conn_config.ConnConfig()
channel = conn.conn_rabbitmq()
# 声明交换器
channel.exchange_declare(exchange=conn.rabbitmq_vhost, exchange_type='direct', passive=False, durable=True,
                         auto_delete=False)
# 声明队列
channel.queue_declare(queue="build_phone_number_queue")
# 通过键“hola”将队列和交换器绑定起来
channel.queue_bind(queue="build_phone_number_queue", exchange=conn.rabbitmq_vhost, routing_key="hola")


# 用户处理传入的消息函数
def msg_consumer(channel, method, header, body):
    # 确认消息
    channel.basic_ack(delivery_tag=method.delivery_tag)
    if body == 'quit':
        # 停止消息并退出
        channel.basic_cancel(consumer_tag='build_phone_number_consumer')
        channel.stop_consuming()
    else:
        # 保存消息队列数据到数据库,防止号码新建不成功,无据可查
        save_consumer = save_build_phone_number_service.SaveBuildPhoneNumber()
        save_consumer.save_consumer(body=body)
    return


# 订阅消费者
channel.basic_consume(queue='build_phone_number_queue', on_message_callback=msg_consumer,
                      consumer_tag='build_phone_number_consumer')
# 开始消费
channel.start_consuming()

SaveBuildPhoneNumber.py

# !/usr/bin/env python
# -*- coding: utf-8 -*-

from contextlib import closing
from datetime import datetime
import logging
import json

from config.conn_config import ConnConfig


class SaveBuildPhoneNumber:
    """
    保存消息队列传输过来的数据
    """

    def __init__(self):
        pass

    @staticmethod
    def save_consumer(body):
        body_json = json.loads(body)
        shopping_card_id = body_json['shopping_carts_id']
        build_status_id = body_json['build_status_id']
        user_id = body_json['user_id']
        time_create = datetime.now()

接收到的数据,首先将已编码的 JSON 字符串解码为 Python 对象:

body_json = json.loads(body)

转换成字典后,获取相关内容:

shopping_card_id = body_json['shopping_carts_id']
build_status_id = body_json['build_status_id']
user_id = body_json['user_id']

之后就可以根据具体业务逻辑编写代码。

参考2代码示例:rabbit-queue-python


参考资料:

1、《RabbitMQ实战 高效部署分布式消息队列》[美] Alvaro Videla , Jason J.W. Williams 著,中国工信出版集团 2015.10

2、Install RabbitMQ and Send JSON data With Python on Ubuntu

3、Using body serialization with JSON

4、python RabbitMQ队列使用(入门篇)


黄兵个人博客原创。

转载请注明出处:黄兵个人博客 - RabbitMQ 在python中使用json传输数据

分享到:

发表评论

评论列表