RabbitMQ消息队列 - downtiser/python-one GitHub Wiki

之前利用进程Queue实现了python进程间的简单通信,但它只能在python里实现进程间通信,如果要想在不同平台上实现进程通信,就需要一个中间人来帮助传递数据.RabbitMQ消息队列就是用来做一个中间人的,在使用RabbitMQ之前要先安装erlang语言环境,然后安装RabbitMQ服务,在python上使用rabbitmq要安装pika模块

代码实现:发送方:

#Downtiser
# 消息发送方
import pika
credentials = pika.PlainCredentials('downtiser', '密码')  #身份认证信息
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host= '/', credentials = credentials))
#端口号默认5672,是rabbitmq服务的端口,此时建立了一个socket
channel = connection.channel() #声明一个管道,数据传输在这个管道内进行

channel.queue_declare(queue='queue1')  #声明一个消息队列,传输具体的数据

channel.basic_publish(exchange='', routing_key='queue1', body='Hello world!')  
#指定了关键字为队列名,将body内的内容发送给队名为queue1的队列
print('传输完成')

connection.close() #消息传输完毕,关闭socket

接收方

#Downtiser
import pika
credentials = pika.PlainCredentials('downtiser', '密码')
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host= '/', credentials = credentials)) #创建连接
channel = conn.channel() #建立管道

channel.queue_declare(queue='queue1')
# 未避免接收方开始运行时发送方还未声明队列,所以在接收方也要声明队列,避免出错

def callback(ch, method, properties, body):
    print('ch>>>%s  method>>>%s  properties>>>%s'%(ch, method, properties))
    # 第一个为生成的管道实例,
    print('收到消息 %s'%body.decode())


channel.basic_consume(consumer_callback = callback, queue = 'queue1', no_ack = True )
# 声明消息接受,如果收到消息,就调用callback函数来处理收到的消息,queue指明从哪个队列中接受消息
print('----开始接受消息-----')
channel.start_consuming() #开始接受消息,能持续接受消息,接收不到就阻塞

另外,如果有多个接收方访问同一个消息队列,RabbitMQ将会采用 轮询 机制,即将队列中的信息取出,依次按顺序平均发送给每个接收方,循环进行

上面的接收方的接受声明中的no_ack = True代表当数据接受完成时,不向RabbitMQ发送确认消息,所以如果接受数据过程突然中断,数据就会丢失。所以应将这个参数去掉,默认为会发送确认消息,这样当有一个接收方中断时,数据会自动转给下一个接收方,如果全部中断,则会将数据存回队列中,所以在callback函数中还应该向发送方发送一个确认信息.修改接收方代码如下:

#Downtiser
import pika, time
credentials = pika.PlainCredentials('downtiser', '密码')
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host= '/', credentials = credentials)) #创建连接
channel = conn.channel() #建立管道

channel.queue_declare(queue='queue1')
# 未避免接收方开始运行时发送方还未声明队列,所以在接收方也要声明队列,避免出错

def callback(ch, method, properties, body):
    print('ch>>>%s  method>>>%s  properties>>>%s'%(ch, method, properties))
    time.sleep(5)
    # 第一个为生成的管道实例,
    print('收到消息 %s'%body.decode())
    ch.basic_ack(delivery_tag=method.delivery_tag) #向发送方发送确认信息


channel.basic_consume(consumer_callback = callback,
                      queue = 'queue1',
                      #no_ack = True
                    )
# 声明消息接受,如果收到消息,就调用callback函数来处理收到的消息,queue指明从哪个队列中接受消息,no_ack 代表任务处理完后是否向RabbitMQ
# 发送确认信息,默认会发送,如果不发生,当接受消息时若中断,消息就会丢失
print('----开始接受消息-----')
channel.start_consuming() #开始接受消息,能持续接受消息,接收不到就阻塞

此时接受端中断数据不会丢失,但如果RabbitMQ服务中断了,队列信息就会全部丢失,要解决这个问题,就需要将消息队列持久化.首先要在声明队列的时候要设置参数durable = Ture,这步仅将队列本身持久化,里面的信息还是会丢失,所以还要将队列里的信息持久化,所以发送方在设置发送属性是要添加参数properties = (pika.BasicProperties(delivery_mod = 2) 使得队列消息持久化. 代码实现如下:

  • 发送方
#Downtiser
# 消息发送方
import pika
credentials = pika.PlainCredentials('downtiser', '密码')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host= '/', credentials = credentials))
channel = connection.channel()

channel.queue_declare(queue = 'queue1', durable=True)  #durable 设置队列持久化

channel.basic_publish(exchange='',
                      routing_Key='queue1',
                      body='Hello world!',
                      properties=(pika.BasicProperties(delivery_mode=2))
                      )  #properties里设置队列消息持久化

print('传输完成')

connection.close()
  • 接收方:
#Downtiser
import pika, time
credentials = pika.PlainCredentials('downtiser', '密码')
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host= '/', credentials = credentials)) #创建连接
channel = conn.channel() #建立管道

channel.queue_declare(queue = 'queue1', durable = True)
# 接收方也要声明队列持久化

def callback(ch, method, properties, body):
    print('ch>>>%s  method>>>%s  properties>>>%s'%(ch, method, properties))
    print('收到消息 %s'%body.decode())
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(consumer_callback = callback,
                      queue = 'queue1',
                      #no_ack = True
                    )
print('----开始接受消息-----')
channel.start_consuming()

现在消息和队列都不会因为某一方的中断而消失了

在现实生活中,还存在着机器性能参差不齐的情况,所以接受数据的速度也有很多差异,为了能够根据机器性能的不同为接收方分发不同量的消息,就需要进行权重分配,不过在RabbitMQ中只需要在接收方设置一下,让发送方在接受方处理完消息后再对这个接收方发送消息即可,只需在接收方声明接受信息前加上一句channel.basic_qos(prefetch_count = 1)即可。

根据源码解释,basic_qos类方法为连接管道提供了 预取窗口 ,发送方每次都会将消息提前发到这个 预取窗口 ,接受方在完成处理消息后,就会自动到这个窗口中取消息,谁先完成处理,谁就能先取到消息,在双方都完成的情况下,在按顺序分配. 其中设置prefetch_count参数可以设置 预取窗口 的个数

  • 代码示例:
#Downtiser
import pika, time
credentials = pika.PlainCredentials('downtiser', '密码')
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host= '/', credentials = credentials)) #创建连接
channel = conn.channel() 

channel.queue_declare(queue = 'queue1', durable = True)
channel.basic_qos(prefetch_count=1)  #设置预取窗口
def callback(ch, method, properties, body):
    print('ch>>>%s  method>>>%s  properties>>>%s'%(ch, method, properties))
    time.sleep(5)
    print('收到消息 %s'%body.decode())

    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(consumer_callback = callback,
                      queue = 'queue1',
                      #no_ack = True
                    )
print('----开始接受消息-----')
channel.start_consuming()

现在已经实现了对特定的队列发送消息,但如果想要对一批消息队列发送消息,达到广播的效果,就需要用到exchange消息转发器。exchange有以下几种类型:

  • fanout : 所有bind到exchange的队列都能收到消息,跟广播一样
  • direct : 在发送方声明发送消息时参数routingKeyexchange所规定的队列能收到消息
  • topic : 所有符合routingKey表达式的队列能收到消息(类似于正则),#代表一个或多个字符,*代表任何字符,光一个#,效果和 fanout 一样
  • headers : 通过 headers 来决定把消息发给哪些队列

fanout型exchange:

  • 发布方:
#Downtiser
import pika

credentials = pika.PlainCredentials('downtiser', '密码')
conection = pika.BlockingConnection(pika.ConnectionParameters('localhost',virtual_host='/', credentials=credentials))
channel = conection.channel()
channel.exchange_declare(exchange='broadcast', exchange_type='fanout') #声明一个转发器和其类型

channel.basic_publish(exchange='broadcast', routing_key='', body='hello, world!') #指定转发器,因为是广播,所以不需要指定队列

print('完成广播!')
conection.close()
  • 接受方:
#Downtiser
import pika

credential = pika.PlainCredentials('downtiser', '密码')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',virtual_host='/',credentials=credential))

channel = connection.channel()
channel.exchange_declare(exchange='broadcast', exchange_type='fanout')
res = channel.queue_declare(exclusive=True)  #随机生成一个和已有队列名不同的队列
queue_name = res.method.queue  #获取队列名
print(queue_name)
channel.queue_bind(exchange='broadcast',queue=queue_name)  #将队列和转发器绑定
def callback(channel, method, properties, body):
    print('----收到消息----')
    print(body.decode())
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue=queue_name, )
print('等待消息中.....')
channel.start_consuming()

值得注意的是,接收方只有在运行状态下才能收到消息,一旦断开,就收不到消息了,类似于收音机

direct型exchange:通过指定routing_Key关键字,对于接收方,将队列和指定的关键字与exchange绑定,在发布方,也指定了关键字,具有关键字的队列才能收到消息,代码实现如下:

  • 发布方:
#Downtiser
import pika, sys
credential = pika.PlainCredentials('downtiser', '密码')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host='/', credentials=credential))
channel = connection.channel()
channel.exchange_declare(exchange='direct_type', exchange_type='direct')
severities = sys.argv[1] if len(sys.argv) > 1  else 'info'  #设置队列关键字,默认为info
#设置接受消息的队列的关键字
message = ''.join(sys.argv[2:]) or 'hello world!'  #如果cmd里写了后续参数就发送后续参数,否则发hello world!
channel.basic_publish(exchange='direct_type',
                      routing_key=severities,   #指定队列关键字
                      body=message
                    )
print('接受完成!')
connection.close()
  • 发布方:
#Downtiser
import pika, sys
credential = pika.PlainCredentials('downtiser', '密码')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host='/', credentials=credential))
channel = connection.channel()
channel.exchange_declare(exchange='direct_type', exchange_type='direct')
res = channel.queue_declare(exclusive=True)
queue_name = res.method.queue
print(queue_name)
severities = sys.argv[1:]  #接受关键字列表
if not severities: #如果用户不输入关键字,就报错并退出
    exit('请输入关键字!')

for severity in severities: #获取关键字并将其和队列与转发器绑定
    channel.queue_bind(exchange='direct_type',
                       queue=queue_name,
                       routing_key=severity
                       )
def callback(channel, method, properties, body):
    print('----收到消息----')
    print(body.decode())
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue=queue_name,
                      )
channel.start_consuming()

topic型exchange:代码跟direct型几乎一样,只是在声明exchange时将类型改为topic.主要区别在接收方可以使用*,#通配符作为关键字绑定exchange匹配消息,如*.info表示匹配以.info结尾的消息,apache.*表示以apache.开头的消息,#表示匹配所有信息.

Remote procedure call(RPC):远程过程调用。有时候发布方向订阅方发送一条消息后还期望订阅方能将处理消息后的结果返回,这时候就需要用到RPC.下面两段代码实现客户端向服务器端发送一个整数,服务器端判断该整数是否为fisher number,并将结果返回给客户端,实现双向通信

  • sever端:
#Downtiser
import pika

credential = pika.PlainCredentials('downtiser', '密码')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host='/', credentials=credential))
channel = connection.channel()
channel.queue_declare('rpc_queue')

def fisher_number(n):
    multipliers = []
    i = 2
    while i <= n / 2:
        if n % i == 0:
            multipliers.append(i)
        i += 1

    multipliers.append(n)
    sum = 1
    for multiplier in multipliers:
        sum *= multiplier

    if sum == n ** 3:
        for i, item in enumerate(multipliers):
            multipliers[i] = str(item)
        res = '%s is a fisher number: '%n+'*'.join(multipliers)+'='+'%s^3' % n
        return res
    else:
        return '%s is not a fisher number'%n

def on_request(channel, method, properties, body):
    print('有客户访问!')
    n = int(body)
    res = fisher_number(n)
    channel.basic_publish(exchange='',
                          routing_key=properties.reply_to,  #接受客户端随请求一同发送过来的接受结果的队列
                          properties=pika.BasicProperties(
                              correlation_id=properties.correlation_id
                              #将客户的请求时发送的唯一id标识符再发回给客户端确认,确保客户端收到的是所需的结果
                          ),
                          body=res
                          )
    channel.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == '__main__' :
    channel.basic_consume(on_request,
                          queue= 'rpc_queue'
                          )
    print('等待客户端请求.....')
    channel.start_consuming()
  • clinet端
#Downtiser
import pika
import uuid
class ClientRPC(object):
    def __init__(self):
        self.__credential = pika.PlainCredentials('downtiser', '密码')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            'localhost',
            virtual_host='/',
            credentials=self.__credential
        )
        )
        self.channel = self.connection.channel()
        self.random_queue = self.channel.queue_declare(exclusive=True)
        self.queue_name = self.random_queue.method.queue
        self.channel.basic_consume(
            self.get_res,
            queue=self.queue_name
        )
    def get_res(self, channel, method, properties, body):
        if properties.correlation_id == self.correlation_id:  #如果服务器端发送的标识符和本地一致,可以确定得到正确的结果
            self.response = body.decode()
    def judge_fisher(self, n):
        self.response = None
        self.correlation_id = str(uuid.uuid4())  #获取标识符
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.queue_name, #告诉服务端将结果放到哪个队列中
                correlation_id=self.correlation_id  #发送唯一标识符
            ),
            body=str(n)
        )
        print('waiting for result....')
        while self.response == None:
            self.connection.process_data_events() #不阻塞的接受消息
        print('Done!')
        print(self.response)

if __name__ == '__main__':
    rpc_client_obj = ClientRPC()
    while True:
        n = input('please input a number you want to judge>>>')
        rpc_client_obj.judge_fisher(int(n))