Follower me on GitHub

3 实际应用

1 zeromq和gevent:

zeromq的介绍请参看:http://www.infoq.com/cn/news/2010/09/introduction-zero-mq

假设你已经安装了zeromq,gevent_zeromq(https://github.com/traviscline/gevent-zeromq.git)和pyzmq

一个很基础的例子:

import gevent
from gevent_zeromq import zmq

# Global Context context = zmq.Context() #它是GreenContext的一个简写,确保greenlet化socket

def server(): server_socket = context.socket(zmq.REQ) #创建一个socket,使用mq类型模式REQ/REP(请求/回复,服务器是请求),还有PUB/SUB(发布/订阅),push/pull等 server_socket.bind("tcp://127.0.0.1:5000") #绑定socket

for request in range(1,10): server_socket.send("Hello") print('Switched to Server for ', request) server_socket.recv() #这里发生上下文切换

def client(): client_socket = context.socket(zmq.REP) (客户端是回复) client_socket.connect("tcp://127.0.0.1:5000") #连接server的socket端口

for request in range(1,10):

client_socket.recv() print('Switched to Client for ', request) client_socket.send("World")

publisher = gevent.spawn(server) client = gevent.spawn(client)

gevent.joinall([publisher, client])

执行结果:

[root@248_STAT ~]# python test.py (‘Switched to Server for ‘, 1) (‘Switched to Client for ‘, 1) (‘Switched to Server for ‘, 2) (‘Switched to Client for ‘, 2) (‘Switched to Server for ‘, 3) (‘Switched to Client for ‘, 3) (‘Switched to Server for ‘, 4) (‘Switched to Client for ‘, 4) (‘Switched to Server for ‘, 5) (‘Switched to Client for ‘, 5) (‘Switched to Server for ‘, 6) (‘Switched to Client for ‘, 6) (‘Switched to Server for ‘, 7) (‘Switched to Client for ‘, 7) (‘Switched to Server for ‘, 8) (‘Switched to Client for ‘, 8) (‘Switched to Server for ‘, 9) (‘Switched to Client for ‘, 9)

 

2 telnet 服务器

from gevent.server import StreamServer #StreamServer是一个通用的TCP服务器

def handle(socket, address): socket.send("Hello from a telnet!\n") for i in range(5): socket.send(str(i) + '\n') #给socket客户端发送数据 socket.close() #关闭客户端连接

server = StreamServer(('127.0.0.1', 5000), handle) #当出现连接调用定义的方法handle server.serve_forever()

执行结果:

dongwm@localhost ~ $ nc 127.0.0.1 5000 Hello from a telnet! 0 1 2 3 4 dongwm@localhost ~ $ telnet 127.0.0.1 5000 Trying 127.0.0.1… Connected to 127.0.0.1. Escape character is ‘^]’. Hello from a telnet! 0 1 2 3 4 Connection closed by foreign host. 3 wsgi服务器

from gevent.wsgi import WSGIServer

def application(environ, start_response): status = '200 OK' #页面状态指定为200 ok body = '<p>Hello World</p>'

headers = [ ('Content-Type', 'text/html') ]

start_response(status, headers) return [body]

WSGIServer(('', 8000), application).serve_forever() #启动一个占用8000端口的wsgi服务器

 

from gevent.pywsgi import WSGIServer #使用pywsgi可以我们自己定义产生结果的处理引擎

def application(environ, start_response): status = '200 OK'

headers = [ ('Content-Type', 'text/html') ]

start_response(status, headers) yield "<p>Hello" #yield出数据 yield "World</p>"

WSGIServer(('', 8000), application).serve_forever()

我们看一个用ab(Apache Benchmark)的性能测试(更多信息请查看http://nichol.as/benchmark-of-python-web-servers),我这里只

对比了gevent和paste的性能比(没做系统优化,只是在同样条件下看性能差距):

paste的wsgi程序:

from gevent.wsgi import WSGIServer

def application(environ, start_response): status = '200 OK' body = '<p>Hello World</p>'

headers = [ ('Content-Type', 'text/html') ]

start_response(status, headers) return [body]

#WSGIServer(('', 8000), application).serve_forever() from paste import httpserver httpserver.serve(application, '0.0.0.0', request_queue_size=500)

dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8000/ #gevent的性能,条件是:并发100,请求1W This is ApacheBench, Version 2.3 <$Revision: 655654 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient) Completed 1000 requests Completed 2000 requests Completed 3000 requests Completed 4000 requests Completed 5000 requests Completed 6000 requests Completed 7000 requests Completed 8000 requests Completed 9000 requests Completed 10000 requests Finished 10000 requests

Server Software: Server Hostname:        127.0.0.1 Server Port:            8000

Document Path:          / Document Length:        18 bytes

Concurrency Level:      100 Time taken for tests:   2.805 seconds Complete requests:      10000 Failed requests:        0 Write errors:           0 Total transferred:      1380000 bytes HTML transferred:       180000 bytes Requests per second:    3564.90 [#/sec] (mean) Time per request:       28.051 [ms] (mean) Time per request:       0.281 [ms] (mean, across all concurrent requests) Transfer rate:          480.43 [Kbytes/sec] received

Connection Times (ms) min  mean[+/-sd] median   max Connect:        0    0   0.2      0       2 Processing:     2   28  15.1     27      69 Waiting:        1   28  15.1     27      69 Total:          2   28  15.1     27      69

Percentage of the requests served within a certain time (ms) 50%     27 66%     35 75%     40 80%     42 90%     48 95%     54 98%     59 99%     62 100%     69 (longest request)

dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8080/  #paste的性能 This is ApacheBench, Version 2.3 <$Revision: 655654 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient) Completed 1000 requests Completed 2000 requests Completed 3000 requests Completed 4000 requests Completed 5000 requests Completed 6000 requests Completed 7000 requests Completed 8000 requests Completed 9000 requests Completed 10000 requests Finished 10000 requests

Server Software:        PasteWSGIServer/0.5 Server Hostname:        127.0.0.1 Server Port:            8080

Document Path:          / Document Length:        18 bytes

Concurrency Level:      100 Time taken for tests:   4.119 seconds Complete requests:      10000 Failed requests:        0 Write errors:           0 Total transferred:      1600000 bytes HTML transferred:       180000 bytes Requests per second:    2427.52 [#/sec] (mean) Time per request:       41.194 [ms] (mean) Time per request:       0.412 [ms] (mean, across all concurrent requests) Transfer rate:          379.30 [Kbytes/sec] received

Connection Times (ms) min  mean[+/-sd] median   max Connect:        0    0   0.2      0       2 Processing:     2   41   5.4     41     107 Waiting:        1   41   5.2     40      97 Total:          4   41   5.3     41     107

Percentage of the requests served within a certain time (ms) 50%     41 66%     41 75%     42 80%     43 90%     46 95%     50 98%     56 99%     59 100%    107 (longest request)

很不好理解吧,那我把数据直接整理下:

1 测试用时:

Time taken for tests:   2.805 seconds #gevent

Time taken for tests:   4.119 seconds #paste 花费时间更长 2 每秒请求数:

Requests per second:    3564.90 [#/sec] (mean) #gevent的嘛,每秒请求数大的多 Requests per second:    2427.52 [#/sec] (mean) #paste

3 每请求数耗时:

Time per request:       28.051 [ms] (mean) #gevent耗时少 Time per request:       0.281 [ms] (mean, across all concurrent requests) #gevent并发请求时耗时少 Time per request:       41.194 [ms] (mean) #paste Time per request:       0.412 [ms] (mean, across all concurrent requests) #paste

4 传输效率:

Transfer rate:          448.26 [Kbytes/sec] received #gevent的效率更高 Transfer rate:          379.30 [Kbytes/sec] received #paste

5 连接消耗的时间的分解:

Connection Times (ms) min  mean[+/-sd] median   max Connect:        0    0   0.2      0       2 Processing:     2   28  15.1     27      69 Waiting:        1   28  15.1     27      69 Total:          2   28  15.1     27      69

Connection Times (ms) #paste min  mean[+/-sd] median   max Connect:        0    0   0.2      0       2 Processing:     2   41   5.4     41     107 Waiting:        1   41   5.2     40      97 Total:          4   41   5.3     41     107 #明显其中最大用时107/97都大于gevent的69ms,最小用时gevent略强

6 整个场景中所有请求的响应情况。在场景中每个请求都有一个响应时间

Percentage of the requests served within a certain time (ms) #gevent 50%     29 66%     31 75%     34 80%     34 90%     36 95%     38 98%     42 99%     44 100%     71 (longest request)

可以这样理解:50%用户效应小于29ms,60%用户响应小于31ms,最长的访问响应为71ms Percentage of the requests served within a certain time (ms) #paste 50%     41 66%     41 75%     42 80%     43 90%     46 95%     50 98%     56 99%     59 100%    107 (longest request)  #很明显,无论那个区间,paste性能都略差

4 长轮询

import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import json

data_source = Queue()

def producer(): while True: data_source.put_nowait('Hello World') #往队列非阻塞的放入数据 gevent.sleep(1)

def ajax_endpoint(environ, start_response): status = '200 OK' headers = [ ('Content-Type', 'application/json') #设定网络文件的类型是json ] try: datum = data_source.get(timeout=5) except Empty: datum = [] #假如gevent.sleep的时间设置的长一些(比如5s),在不停刷新过程中会获得空列表

start_response(status, headers) return json.dumps(datum) #返回数据,打印出来的数据是一个带引号的字符串

gevent.spawn(producer)

WSGIServer(('', 8000), ajax_endpoint).serve_forever()

4 聊天室(源码在这里https://github.com/sdiehl/minichat.git):

from gevent import monkey
monkey.patch_all() #给模块打包
from flask import Flask, render_template, request, json #作者在这里使用了flask框架,当然你也可以用其它比如django.tornado,bottle等

from gevent import queue from gevent.pywsgi import WSGIServer

app = Flask(__name__) app.debug = True

class Room(object):

def __init__(self): self.users = set() self.messages = []

def backlog(self, size=25): return self.messages[-size:]

def subscribe(self, user): self.users.add(user)

def add(self, message): for user in self.users: print user user.queue.put_nowait(message) self.messages.append(message)

class User(object):

def __init__(self): self.queue = queue.Queue()

rooms = { 'python': Room(), 'django': Room(), }

users = {}

@app.route('/') #flask指定url的处理使用路由的方式,访问页面地址根目录就会执行choose_name def choose_name(): return render_template('choose.html') #然后调用模板choose.html,这个html文件最后使用了GET方法提交了一个uid页面(/<uid>)

@app.route('/<uid>') #请求被转到了这里 def main(uid): return render_template('main.html', #调用模板提供几个room的连接 uid=uid, rooms=rooms.keys() #格局选择的连接,通过GET方法转到那个相应url:/<room>/<uid> )

@app.route('/<room>/<uid>') #请求被转到了这里 def join(room, uid): user = users.get(uid, None)

if not user: users[uid] = user = User()

active_room = rooms[room] active_room.subscribe(user) print 'subscribe', active_room, user

messages = active_room.backlog()

return render_template('room.html', #room.html包含一个POST提交方式,把你的聊天数据提交,并且更新页面(通过jquery的ajax调用url/poll/<uid>) room=room, uid=uid, messages=messages)

@app.route("/put/<room>/<uid>", methods=["POST"]) #通过这个url def put(room, uid): user = users[uid] room = rooms[room]

message = request.form['message'] room.add(':'.join([uid, message]))

return ''

@app.route("/poll/<uid>", methods=["POST"]) def poll(uid): try: msg = users[uid].queue.get(timeout=10) except queue.Empty: msg = [] return json.dumps(msg) #返回队列中包含的聊天记录

if __name__ == "__main__": http = WSGIServer(('', 5000), app) http.serve_forever()

来一个更复杂带有前台后端的模型(例子来自http://blog.pythonisito.com/2011/07/gevent-zeromq-websockets-and-flot-ftw.html):

源码在:http://dl.dropbox.com/u/24086834/blog/20110723/zmq_websocket.tar.gz

其中需要修改graph.js第二行:

var ws = new WebSocket(“ws://localhost:9999/test”);

为:

var ws = new MozWebSocket(“ws://localhost:9999/test”);  #因为我的火狐用的websocket不同

这个demo.py,我来解析下:

import os
import time
import math
import json
import webbrowser

import paste.urlparser #paste是一个WSGI工具包,在WSGI的基础上包装了几层,让应用管理和实现变得方便

import gevent from gevent_zeromq import zmq from geventwebsocket.handler import WebSocketHandler #基于gevent的pywsgi的WebSocket的处理程序

def main(): #主方法 context = zmq.Context() gevent.spawn(zmq_server, context) #上个例子使用joinall,这个例子是spawn+start,context是参数,也就是实例化的GreenContext ws_server = gevent.pywsgi.WSGIServer( ('', 9999), WebSocketApp(context), handler_class=WebSocketHandler) http_server = gevent.pywsgi.WSGIServer( ('', 8000), paste.urlparser.StaticURLParser(os.path.dirname(__file__))) # paste.urlparser用来处理url和静态文件 http_server.start() #启动grennlet实例 ws_server.start() webbrowser.open('http://localhost:8000/graph.html') #启动浏览器看这个页面,当正常启动后js会画图 zmq_producer(context)

def zmq_server(context): sock_incoming = context.socket(zmq.SUB) sock_outgoing = context.socket(zmq.PUB) sock_incoming.bind('tcp://*:5000') #发布绑定 sock_outgoing.bind('inproc://queue') #订阅绑定,本地(通过内存)进程(线程间)通信传输 sock_incoming.setsockopt(zmq.SUBSCRIBE, "") #这里表示对发布的所有信息都订阅 while True: msg = sock_incoming.recv() sock_outgoing.send(msg)

class WebSocketApp(object):

def __init__(self, context): self.context = context

def __call__(self, environ, start_response): ws = environ['wsgi.websocket'] sock = self.context.socket(zmq.SUB) sock.setsockopt(zmq.SUBSCRIBE, "") #订阅所有信息 sock.connect('inproc://queue') #websocket连接到订阅的地址 while True: msg = sock.recv() ws.send(msg)

def zmq_producer(context): #发布的方法 socket = context.socket(zmq.PUB) socket.connect('tcp://127.0.0.1:5000') #绑定到发布的socket

while True: x = time.time() * 1000 y = 2.5 * (1 + math.sin(x / 500)) socket.send(json.dumps(dict(x=x, y=y))) #往发布socket发送数据,这样,数据会被inproc://queue订阅,而被websocket获取,根据数据展示 gevent.sleep(0.05)

if __name__ == '__main__': main()