本节内容
- RabbitMQ——消息队列
- Memcached & Redis使用
1、RabbitMQ——消息队列
RabbitMQ与Queue的关系
1、做的事情是一样的,两者都是队列。
2、Queue分两种,线程Q和进程Q,线程Q是用于同一进程下的多线程交互的,进程Q只能用于父进程与子进程之间的交互,或者同属于同一父进程下的多个子进程进行交互。
3、当遇到不同语言的进程交互或同是python下但不相关的进程之间交互时,python自带的Queue就不能使用了,此时需要一个中间代理,即RabbitMQ。
windows下两个独立的进程之间通讯方式:
1、走硬盘,json,pickle
2、走网卡,socket
3、走broker,比如RabbitMQ,每个软件连接RabbitMQ,也是使用的封装好的socket
RabbitMQ这类的消息队列broker,不仅可以为2个独立进程服务,也可以为任意的,任意数量的进程服务。
安装 http://www.rabbitmq.com/install-standalone-mac.html
安装python rabbitMQ module
1 2 3 4 5 6 7 | pip install pika or easy_install pika or 源码 https: / / pypi.python.org / pypi / pika |
实现最简单的队列通信
send端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | #!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) #这里相当于建立一个最基本的socket channel = connection.channel() #声明一个管道 #声明queue channel.queue_declare(queue = 'hello' ) #在管道中,再声明一个队列 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange = '', routing_key = 'hello' , #routing_key要被传入队列名称 body = 'Hello World!' ) #body是要发送的消息内容 print ( " [x] Sent 'Hello World!'" ) connection.close() #关闭连接 |
receive端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | #_*_coding:utf-8_*_ __author__ = 'Alex Li' import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) #建立连接 channel = connection.channel() #建立管道 #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue = 'hello' ) #声明队列,声明从哪个队列里收消息 def callback(ch, method, properties, body): #这是callback函数的标准格式。 print ( " [x] Received %r" % body) channel.basic_consume(callback, #如果收到消息,就调用callback函数来处理消息 queue = 'hello' , #这是要从哪个队列里接受消息。 no_ack = True ) #这句的语法就是声明了要消费消息 print ( ' [*] Waiting for messages. To exit press CTRL+C' ) channel.start_consuming() #这里是正式接收消费消息。启动后,一直运行,没有消息就阻塞。 |
channel.queue_declare(queue
=
'hello'
) #声明队列,声明从哪个队列里收消息
在我们确定‘hello’这个队列确实存在的情况下,这句语法可以不写,因为在生产者代码中已经声明过了。此处写的原因是,我们不确定是生产者还是消费者先运行。
def
callback(ch, method, properties, body): #这是callback函数的标准格式。
默认ch是管道的内存对象;method包含了把信息发给哪个queue,使用哪种分发模式;
channel.basic_consume(callback,
queue
=
'hello'
,
no_ack
=
True
),这里no_ack=no_acknowledgement,意思是客户端无论处理完了还是没处理完,都不用给服务端发确认消息。这个no_ack默认值为False。这种情况下,RabbitMQ会自动检测socket连接,只要正常处理消息的客户端宕机,socket连接也会断开。此时RMQ会把刚刚正在处理的消息发给下一个客户端,重新处理。这种机制可以保证消息一定被处理掉。
Work Queues
在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多
消息提供者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = connection.channel() # 声明queue channel.queue_declare(queue = 'task_queue' ) # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. import sys message = ' ' .join(sys.argv[ 1 :]) or "Hello World! %s" % time.time() channel.basic_publish(exchange = '', routing_key = 'task_queue' , body = message, properties = pika.BasicProperties( delivery_mode = 2 , # make message persistent ) ) print ( " [x] Sent %r" % message) connection.close() |
消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | #_*_coding:utf-8_*_ import pika, time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = connection.channel() def callback(ch, method, properties, body): print ( " [x] Received %r" % body) time.sleep( 20 ) print ( " [x] Done" ) print ( "method.delivery_tag" ,method.delivery_tag) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue = 'task_queue' , no_ack = True ) print ( ' [*] Waiting for messages. To exit press CTRL+C' ) channel.start_consuming() |
此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,就会发现,这几条消息会被依次分配到各个消费者身上
消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
1 | channel.basic_qos(prefetch_count = 1 ) |
带消息持久化+公平分发的完整代码
生产者端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.queue_declare(queue = 'task_queue' , durable = True ) message = ' ' .join(sys.argv[ 1 :]) or "Hello World!" channel.basic_publish(exchange = '', routing_key = 'task_queue' , body = message, properties = pika.BasicProperties( delivery_mode = 2 , # make message persistent )) print ( " [x] Sent %r" % message) connection.close() |
消费者端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | #!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.queue_declare(queue = 'task_queue' , durable = True ) print ( ' [*] Waiting for messages. To exit press CTRL+C' ) def callback(ch, method, properties, body): print ( " [x] Received %r" % body) time.sleep(body.count(b '.' )) print ( " [x] Done" ) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count = 1 ) channel.basic_consume(callback, queue = 'task_queue' ) channel.start_consuming() |
Publish\Subscribe(消息发布\订阅)
之前的例子都基本是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息 表达式符号说明:#代表一个或多个字符,*代表任何字符 例:#.a会匹配a.a,aa.a,aaa.a等 *.a会匹配a.a,b.a,c.a等 注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanoutheaders: 通过headers 来决定把消息发给哪些queue
消息publisher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange = 'logs' , type = 'fanout' ) message = ' ' .join(sys.argv[ 1 :]) or "info: Hello World!" channel.basic_publish(exchange = 'logs' , routing_key = '', body = message) print ( " [x] Sent %r" % message) connection.close() |
消息subscriber
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | #_*_coding:utf-8_*_ __author__ = 'Alex Li' import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange = 'logs' , type = 'fanout' ) result = channel.queue_declare(exclusive = True ) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue channel.queue_bind(exchange = 'logs' , queue = queue_name) print ( ' [*] Waiting for logs. To exit press CTRL+C' ) def callback(ch, method, properties, body): print ( " [x] %r" % body) channel.basic_consume(callback, queue = queue_name, no_ack = True ) channel.start_consuming() |
有选择的接收消息(exchange type=direct)
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
publisher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange = 'direct_logs' , type = 'direct' ) severity = sys.argv[ 1 ] if len (sys.argv) > 1 else 'info' message = ' ' .join(sys.argv[ 2 :]) or 'Hello World!' channel.basic_publish(exchange = 'direct_logs' , routing_key = severity, body = message) print ( " [x] Sent %r:%r" % (severity, message)) connection.close() |
subscriber
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host = 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange = 'direct_logs' , type = 'direct' ) result = channel.queue_declare(exclusive = True ) queue_name = result.method.queue severities = sys.argv[ 1 :] if not severities: sys.stderr.write( "Usage: %s [info] [warning] [error]\n" % sys.argv[ 0 ]) sys.exit( 1 ) for severity in severities: channel.queue_bind(exchange = 'direct_logs' , queue = queue_name, routing_key = severity) print ( ' [*] Waiting for logs. To exit press CTRL+C' ) def callback(ch, method, properties, body): print ( " [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue = queue_name, no_ack = True ) channel.start_consuming() |
2、Memcached & Redis使用
Memcached
http://www.cnblogs.com/wupeiqi/articles/5132791.html
它相对简单,具体使用方法参见上述文档,这里不赘述了。
Redis
http://www.cnblogs.com/alex3714/articles/6217453.html
介绍
NoSQL(NoSQL = Not Only SQL ),意即“不仅仅是SQL”,泛指非关系型的数据库。
redis是业界主流的key-value nosql 数据库之一。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list()、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
Redis优点
-
异常快速 : Redis是非常快的,每秒可以执行大约110000设置操作,81000个/每秒的读取操作。
-
支持丰富的数据类型 : Redis支持最大多数开发人员已经知道如列表,集合,可排序集合,哈希等数据类型。
这使得在应用中很容易解决的各种问题,因为我们知道哪些问题处理使用哪种数据类型更好解决。 -
操作都是原子的 : 所有 Redis 的操作都是原子,从而确保当两个客户同时访问 Redis 服务器得到的是更新后的值(最新值)。
- MultiUtility工具:Redis是一个多功能实用工具,可以在很多如:缓存,消息传递队列中使用(Redis原生支持发布/订阅),在应用程序中,如:Web应用程序会话,网站页面点击数等任何短暂的数据;
Redis API使用
redis-py 的API的使用可以分类为:
- 连接方式
- 连接池
- 操作
- String 操作
- Hash 操作
- List 操作
- Set 操作
- Sort Set 操作
- 管道
- 发布订阅
连接方式
1、操作模式
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
1 2 3 4 5 | import redis r = redis.Redis(host = '10.211.55.4' , port = 6379 ) r. set ( 'foo' , 'Bar' ) print r.get( 'foo' ) |
2、连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。