
Preface: Octopress, an open-source blogging framework written in Ruby, keeps gaining popularity, and thanks to github.com many authors host their blogs on its subdomains. Plenty of people in China use it too, for example mrzhang.me, whose author's theme is quite nice. A recent job posting on the 华蟒 (python-cn) mailing list even listed as a requirement: "has a GitHub account, open-source projects and an Octopress blog" (the position was Python web development, which left me a bit amused).

1 Preparation before installation:

Assuming you already have a GitHub account, use the feature GitHub provides to set up trusted SSH access with a key:

ssh-keygen -t dsa
cat ~/.ssh/id_dsa.pub  # paste the contents into your GitHub account settings; I will not go into that here

2 Install rvm:

echo insecure >> ~/.curlrc
curl -k https://raw.github.com/wayneeseguin/rvm/master/binscripts/rvm-installer | bash -s stable  # a Ruby version manager, similar to Python's pythonbrew
export PATH=$PATH:/home/dongwm/.rvm/bin  # or write it straight into your profile
rvm list known  # list the known Rubies; I had been using Ruby 1.8, but this framework needs 1.9.2 or later
ruby 1.8.7 (2011-06-30 patchlevel 352) [i686-linux]
dongwm@dongwm ~ $ rvm install 1.9.3  # install Ruby 1.9.3

git clone git://github.com/imathis/octopress.git dongwm.github.com  # fetch the Octopress source
dongwm@dongwm ~ $ rvm --create use 1.9.3@dongwm.github.com  # use Ruby 1.9.3 with a dedicated gemset
Using /home/dongwm/.rvm/gems/ruby-1.9.3-p194 with gemset dongwm.github.com
Running /home/dongwm/.rvm/hooks/after_use

dongwm@dongwm ~ $ cd dongwm.github.com/  # the first time you enter the directory holding Octopress, rvm asks:
Do you wish to trust this .rvmrc file? (/home/dongwm/dongwm.github.com/.rvmrc)
y[es], n[o], v[iew], c[ancel]> y  # answer y and it will not ask again

3 Use bundler to manage all of the project's gem dependencies

dongwm@dongwm ~/dongwm.github.com $ gem install bundler

dongwm@dongwm ~/dongwm.github.com $ bundle install  # install the required gems

4 Install a theme, in case you do not want the default one

dongwm@dongwm ~/dongwm.github.com $ git clone git://github.com/bkutil/bootstrap-theme.git .themes/bootstrap-theme
# git clone git://github.com/sevenadrian/foxslide .themes/foxslide
# git clone git://github.com/barmstrong/octopress-bootstrap.git .themes/octopress-bootstrap
dongwm@dongwm ~/dongwm.github.com $ rake install['foxslide']  # install the theme; for the default theme plain rake install is enough
dongwm@dongwm ~/dongwm.github.com $ rake generate  # generate the site files

Note: switching themes is really just downloading the theme's git source + rake install + rake generate.

5 Optional: previewing the site

If you want to preview the result in a test environment, run rake preview.

6 Deploy to GitHub

dongwm@dongwm ~/dongwm.github.com $ rake setup_github_pages  # configure the remote
Enter the read/write url for your repository
(For example, 'git@github.com:your_username/your_username.github.com)
Repository url: git@github.com:dongweiming/dongweiming.github.com

Here dongweiming is my account name and dongweiming.github.com is the repository, which you have to create by hand on the GitHub site; it is also the hostname the site will be served from, so afterwards you visit http://dongweiming.github.com

dongwm@dongwm ~/dongwm.github.com $ rake deploy  # deploy to GitHub

Once you see "Github Pages deploy complete" you are done and the site can be visited.

7 Version control

Since this is GitHub, it would be a joke not to use git.

dongwm@dongwm ~/dongwm.github.com $ cd source/_posts/  # new posts are added as files under source/_posts, so that is the directory I want backed up, i.e. kept under version control
dongwm@dongwm ~/octopress/source/_posts $ git init  # initialize the repository
dongwm@dongwm ~/octopress/source/_posts $ touch README.md
dongwm@dongwm ~/octopress/source/_posts $ git add *
dongwm@dongwm ~/octopress/source/_posts $ git commit -m 'First version'

dongwm@dongwm ~/octopress/source/_posts $ git remote add dongwm git@github.com:dongweiming/dongweiming.github.com.git

This adds a remote shortcut named 'dongwm' pointing at the dongweiming.github.com repository under the dongweiming account.

dongwm@dongwm ~/octopress/source/_posts $ git checkout -b backup  # create a branch named backup
Switched to a new branch 'backup'
dongwm@dongwm ~/octopress/source/_posts $ git push dongwm backup  # push to the backup branch, which gives us version control over this directory
Counting objects: 3, done.
Writing objects: 100% (3/3), 213 bytes, done.
Total 3 (delta 0), reused 0 (delta 0)
To git@github.com:dongweiming/dongweiming.github.com.git
 * [new branch]      backup -> backup

 

Preface: Sphinx is a documentation tool that lets developers write documents in plain text using the reStructuredText markup and customize how they are rendered. Representative sites include the official Python docs and 张沈鹏's 42qu. In my spare time I set one up on our intranet to keep some work documentation.

Note: I am using Gentoo here.

1 Install Sphinx and Apache2

Because there is also the well-known Sphinx search engine with the same name, the package category has to be given explicitly:

sudo emerge dev-python/sphinx apache2

2 Configure a wiki site

dongwm@localhost ~ $ sphinx-quickstart  # use this command to create a project quickly

> Root path for the documentation [.]: test

The wizard asks a series of questions; customize them as you need (I answered yes to everything). Pay attention to this step: it asks for the root directory of the site being configured; here I used a new subdirectory named test under my home directory. When it finishes, the test directory contains the following:

dongwm@localhost ~/test $ ls -l
total 24
drwxr-xr-x 4 dongwm dongwm 4096 Sep 14 13:38 build     # the generated html ends up here
-rw-r--r-- 1 dongwm dongwm 5113 Sep 14 13:37 make.bat   # I assume this is make for Windows
-rw-r--r-- 1 dongwm dongwm 5589 Sep 14 13:37 Makefile   # everybody knows this one
drwxr-xr-x 4 dongwm dongwm 4096 Sep 14 13:38 source     # the source files we edit, i.e. the source of the wiki pages

3 Edit the source files. The idea: edit XXX.rst under source (or whatever suffix you configured); each file is turned into an html file of the same name, so editing index.rst produces index.html, which is the default landing page and links out to the other documents.

4 Generate the html files:

dongwm@localhost ~/test $ make html

This puts the html files under build/html; you can of course customize the location.

5 Configure Apache and .htaccess

Edit /etc/apache2/vhosts.d/default_vhost.include:

DocumentRoot "/home/dongwm/test/build/html"   # line 12; /home/dongwm/test/build/html is where the html was generated, I kept the default

<Directory "/home/dongwm/test/build/html">    # line 15

Then add a new file named .htaccess in /home/dongwm/test/build/html, i.e. the site's document root:

dongwm@localhost ~/test/build/html $ cat .htaccess
<Files ~ "^.(htaccess|htpasswd)$">
deny from all
</Files>
Options -Indexes
<FilesMatch ".(gif|jpg|jpeg|png|ico)$">
Header set Cache-Control "max-age=86400"   # cache static images for 24 hours
</FilesMatch>
AuthUserFile /home/dongwm/.htpasswd        # the password file, generated with htpasswd
AuthGroupFile /dev/null
AuthName "Please enter your ID and password"
AuthType Basic
require valid-user
order deny,allow
allow from 10.28.101.1/24                  # only allow access from this subnet
deny from all

dongwm@localhost ~/test $ sudo /etc/init.d/apache2 restart  # restart apache2

6 Displaying Chinese

Edit source/conf.py and set language = "zh_CN", then run make html again.
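For reference, this is the only line that needs to change in the quickstart-generated source/conf.py (a minimal sketch; everything else stays exactly as sphinx-quickstart wrote it):

# source/conf.py (excerpt)
# setting the locale makes Sphinx render its built-in UI strings in Simplified Chinese
language = 'zh_CN'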

7 Tips

1 Faster iteration

Running make html by hand after every change to an rst source file gets tedious. I use the Sublime Text 2 editor, so I wrote a custom build system and selected it under Tools -> Build System.

For example:

dongwm@localhost ~ $ cat ~/.config/sublime-text-2/Packages/User/makehtml.sublime-build
# it has to be saved in this directory; the build is then called makehtml (the file name minus the .sublime-build suffix)
{
    "cmd": ["make", "html"],                    # run make html; every argument is a separate quoted string
    "working_dir": "${project_path:${folder}}"  # see the official docs for the syntax; it means the command runs in my current working directory
}

Preface: gevent is a concurrency framework for Python built around the greenlet micro-thread, made fast by the epoll event-notification mechanism and many other optimizations. It also ships a monkey module that converts existing thread-based Python code directly to greenlets (much like applying a patch). Its performance is roughly 4x that of a thread-based framework (see the chart below comparing gevent and paste).

I have no immediate use for gevent at work, so here I simply translate some of the examples and content from http://sdiehl.github.com/gevent-tutorial:
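Before the examples, a quick illustration of the monkey module mentioned above. This sketch is mine, not from the tutorial; it assumes gevent on Python 2 and simply shows that after monkey.patch_all() ordinary blocking stdlib calls (urllib2 here) cooperate with the gevent hub, so the two downloads overlap inside a single OS thread:

from gevent import monkey
monkey.patch_all()   # patch the stdlib (socket, time, ...) before the blocking code runs

import gevent
import urllib2       # its sockets are now greenlet-aware

urls = ['http://www.python.org', 'http://www.gevent.org']  # sample URLs, pick any you like

def fetch(url):
    body = urllib2.urlopen(url).read()
    print url, len(body)

gevent.joinall([gevent.spawn(fetch, u) for u in urls])  # both fetches run concurrently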

1 Synchronous and asynchronous

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)  # put the current greenlet to sleep for N seconds; 0 means yield control to the other greenlets without putting any of them to sleep
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([           # wait on the gevent.Greenlet instances until they finish or time out
    gevent.spawn(foo),     # spawn creates a greenlet instance, adds it to the hub and starts it, roughly gevent.Greenlet(foo).start()
    gevent.spawn(bar),
])

The output:

dongwm@localhost ~ $ python test.py
Explicit context to bar
Running in foo
Explicit context switch to foo again
Implicit context switch back to bar

import time
import gevent
from gevent import select  # works like the built-in select.select() (see http://www.dongwm.com/archives/guanyuselectyanjiu/), except the thread operations are replaced with greenlets

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    print('Started Polling: ', tic())
    select.select([], [], [], 2)  # the arguments are: the read list, the write list, the exceptional-condition list, and the timeout (2 seconds here)
    print('Ended Polling: ', tic())

def gr2():
    print('Started Polling: ', tic())
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, at", tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

The output:

dongwm@localhost ~ $ python test.py
('Hey lets do some stuff while the greenlets poll, at', 'at 0.0 seconds')  # gr1 and gr2 block right away, so gr3 gets to print
('Started Polling: ', 'at 0.0 seconds')
('Started Polling: ', 'at 0.0 seconds')
('Ended Polling: ', 'at 2.0 seconds')
('Ended Polling: ', 'at 2.0 seconds')

import gevent
import random

def task(pid):
    gevent.sleep(random.randint(0, 2) * 0.001)
    print('Task', pid, 'done')

def synchronous():  # synchronous
    for i in range(1, 10):
        task(i)

def asynchronous():  # asynchronous
    threads = [gevent.spawn(task, i) for i in xrange(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

The output:

dongwm@localhost ~ $ python test.py
Synchronous:  # the tasks are called one after another, so they complete in order
('Task', 1, 'done')
('Task', 2, 'done')
('Task', 3, 'done')
('Task', 4, 'done')
('Task', 5, 'done')
('Task', 6, 'done')
('Task', 7, 'done')
('Task', 8, 'done')
('Task', 9, 'done')
Asynchronous:  # here they run in greenlets and the sleep time is random, so the completion order differs
('Task', 2, 'done')
('Task', 3, 'done')
('Task', 5, 'done')
('Task', 7, 'done')
('Task', 9, 'done')
('Task', 6, 'done')
('Task', 1, 'done')
('Task', 0, 'done')
('Task', 8, 'done')
('Task', 4, 'done')

import gevent
from gevent import Greenlet

def foo(message, n):
    gevent.sleep(n)
    print(message)

thread1 = Greenlet.spawn(foo, "Hello", 1)     # instantiate a Greenlet directly
thread2 = gevent.spawn(foo, "I live!", 2)     # gevent.spawn also builds a Greenlet instance, just wrapped for convenience
thread3 = gevent.spawn(lambda x: (x + 1), 2)  # a lambda expression works too

threads = [thread1, thread2, thread3]
gevent.joinall(threads)  # wait for all the greenlets to finish

The output:

dongwm@localhost ~ $ python test.py
Hello
I live!   # not very visible on paper: the first line actually appears after one second, the second after another second, then the program exits right away (the lambda prints nothing)

import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):  # subclass Greenlet

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):  # override the _run method
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started)  # started is a boolean telling whether the Greenlet has been started
print(loser.started)   # True

try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value)  # value holds the greenlet's return value: 'You win!'
print(loser.value)   # None

print(winner.ready())  # boolean telling whether the Greenlet has stopped: True
print(loser.ready())   # True

print(winner.successful())  # whether the Greenlet stopped successfully, i.e. without raising: True
print(loser.successful())   # False
print(loser.exception)      # print the exception the greenlet raised

The output:

dongwm@localhost ~ $ python test.py
True
True
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/gevent-1.0dev-py2.7-linux-i686.egg/gevent/greenlet.py", line 328, in run
    result = self._run(*self.args, **self.kwargs)
  File "test.py", line 7, in fail
    raise Exception('You fail at failing.')
Exception: You fail at failing.
<Greenlet at 0xb73cd39cL: fail> failed with Exception

You win!
None
True
True
True
False
You fail at failing.

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print 'Could not complete'

The example above runs to completion, but if you set seconds = 5, making the timeout shorter than the sleep, the timeout fires and is caught.
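To make that concrete, here is the same snippet with the shorter timeout (a minimal variation of the example above, not taken from the original tutorial):

import gevent
from gevent import Timeout

seconds = 5                    # now shorter than the 10-second sleep below

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()  # the Timeout fires after 5 seconds, before join() can return
except Timeout:
    print 'Could not complete' # this branch is reached this time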

The with keyword can also be used to handle it as a context manager:

import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

And there are a few other styles:

import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)  # the timeout style shown earlier

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Thread 1 timed out')

timer = Timeout.start_new(1)  # start_new is a shortcut
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)  # get returns the greenlet's result, including any exception it raised
except Timeout:
    print('Thread 2 timed out')

try:
    gevent.with_timeout(1, wait)  # cancel the call if it has not returned before the timeout
except Timeout:
    print('Thread 3 timed out')

2 Data structures

import gevent
from gevent.event import AsyncResult

a = AsyncResult()  # an event instance that carries a value or an exception

def setter():
    gevent.sleep(3)  # after 3 seconds, set a's value for every waiting greenlet
    a.set()          # store the value and wake the waiters

def waiter():
    a.get()  # after those 3 seconds get() stops blocking and returns the stored value or exception
    print 'I live!'

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

A clearer example:

import gevent
from gevent.event import AsyncResult
a = AsyncResult()

def setter():
    gevent.sleep(3)
    a.set('Hello!')

def waiter():
    print a.get()

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

import gevent
from gevent.queue import Queue  # similar to the built-in Queue

tasks = Queue()  # a queue instance

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')

def boss():
    for i in xrange(1, 25):
        tasks.put_nowait(i)  # put items on the queue without blocking

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

The output:

[root@248_STAT ~]# python test.py
Worker steve got task 1   # the three workers take items off the queue in turn
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!

A more complex example:

import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)  # cap the queue length

def worker(n):
    try:
        while True:
            task = tasks.get(timeout=1)  # take from the queue, with a 1-second timeout
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')

def boss():
    """
    Boss will wait to hand out work until a individual worker is
    free since the maxsize of the task queue is 3.
    """
    for i in xrange(1, 10):
        tasks.put(i)  # the boss no longer pushes data blindly: with only 3 slots, put() blocks until a worker frees one, so the boss fills three, the workers drain them, the boss refills, and so on
    print('Assigned all work in iteration 1')

    for i in xrange(10, 20):
        tasks.put(i)
    print('Assigned all work in iteration 2')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])

import gevent
from gevent.pool import Group

def talk(msg):
    for i in xrange(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()  # a Group keeps running greenlet instances together, joins each member and removes it once it completes
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()

A clearer example:

import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Size of group', len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))  # the id of the current greenlet instance

group.map(hello_from, xrange(3))  # map takes a function and its arguments and iterates over them

def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)):  # equivalent to itertools.imap: it returns an iterator that calls the function on the values of the input iterator and yields the results; unlike map() it stops as soon as the input iterator is exhausted instead of padding the rest with None
    print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
    print(i)

The output:

[root@248_STAT ~]# python test.py
('Size of group', 3)
Hello from Greenlet 314818960
('Size of group', 3)
Hello from Greenlet 314819280
('Size of group', 3)
Hello from Greenlet 314819440
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)

The size of a pool can also be limited:

import gevent
from gevent import getcurrent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool', len(pool))

pool.map(hello_from, xrange(3))

The output:

[root@248_STAT ~]# python test.py
('Size of pool', 2)
('Size of pool', 2)
('Size of pool', 1)  # the pool above cannot hold a third greenlet at once, so the third one runs in the next batch

Here is a pool-handling class written by the tutorial's author:

from gevent.pool import Pool

class SocketPool(object):

    def __init__(self):
        self.pool = Pool(1000)  # a pool with a capacity of 1000

    def listen(self, socket):
        while True:
            socket.recv()

    def add_handler(self, socket):
        if self.pool.full():    # raise if the pool is already full
            raise Exception("At maximum pool size")
        else:                   # otherwise run listen in a new greenlet inside the pool
            self.pool.spawn(self.listen, socket)

    def shutdown(self):
        self.pool.kill()        # shut the pool down

from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)  # limit the number of greenlets that may access the shared resource at once

def worker1(n):
    sem.acquire()  # acquire the semaphore
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()  # release it
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:      # the with keyword does the acquire/release for us
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, xrange(0, 2))
pool.map(worker2, xrange(3, 6))

The output:

[root@248_STAT ~]# python test.py
Worker 0 acquired semaphore
Worker 1 acquired semaphore   # the semaphore has room for both of these, so they acquire together and then release
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore   # only two may hold it at a time, so worker 5 waits for the next round
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

Another implementation of the ping-pong coroutine example from the gevent material, written as actors:

import gevent
from gevent.queue import Queue
from gevent import Greenlet

class Actor(gevent.Greenlet):  # a custom actor class

    def __init__(self):
        self.inbox = Queue()   # the inbox is a queue
        Greenlet.__init__(self)

    def receive(self, message):
        raise NotImplementedError()  # subclasses are expected to override this

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()  # take a message off the queue
            self.receive(message)

class Pinger(Actor):
    def receive(self, message):  # override receive
        print message
        pong.inbox.put('ping')   # when something arrives in the inbox, print it and drop 'ping' into the other actor's inbox; pong is the module-level Ponger instance below, and vice versa
        gevent.sleep(0)

class Ponger(Actor):
    def receive(self, message):
        print message
        ping.inbox.put('pong')
        gevent.sleep(0)

ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')  # both actors start out blocked, so give them an initial message
gevent.joinall([ping, pong])

 

3 Practical applications

1 zeromq and gevent:

For an introduction to zeromq see: http://www.infoq.com/cn/news/2010/09/introduction-zero-mq

Assume you already have zeromq, gevent_zeromq (https://github.com/traviscline/gevent-zeromq.git) and pyzmq installed.

A very basic example:

import gevent
from gevent_zeromq import zmq

# Global Context
context = zmq.Context()  # shorthand for GreenContext, which makes sure the sockets are greenlet-aware

def server():
    server_socket = context.socket(zmq.REQ)  # create a socket using the REQ/REP (request/reply) messaging pattern, with the server on the request side; other patterns include PUB/SUB (publish/subscribe) and PUSH/PULL
    server_socket.bind("tcp://127.0.0.1:5000")  # bind the socket

    for request in range(1, 10):
        server_socket.send("Hello")
        print('Switched to Server for ', request)
        server_socket.recv()  # a context switch happens here

def client():
    client_socket = context.socket(zmq.REP)  # the client is on the reply side
    client_socket.connect("tcp://127.0.0.1:5000")  # connect to the server's socket

    for request in range(1, 10):
        client_socket.recv()
        print('Switched to Client for ', request)
        client_socket.send("World")

publisher = gevent.spawn(server)
client = gevent.spawn(client)

gevent.joinall([publisher, client])

The output:

[root@248_STAT ~]# python test.py
('Switched to Server for ', 1)
('Switched to Client for ', 1)
('Switched to Server for ', 2)
('Switched to Client for ', 2)
('Switched to Server for ', 3)
('Switched to Client for ', 3)
('Switched to Server for ', 4)
('Switched to Client for ', 4)
('Switched to Server for ', 5)
('Switched to Client for ', 5)
('Switched to Server for ', 6)
('Switched to Client for ', 6)
('Switched to Server for ', 7)
('Switched to Client for ', 7)
('Switched to Server for ', 8)
('Switched to Client for ', 8)
('Switched to Server for ', 9)
('Switched to Client for ', 9)

 

2 A telnet server

from gevent.server import StreamServer  # StreamServer is a generic TCP server

def handle(socket, address):
    socket.send("Hello from a telnet!\n")
    for i in range(5):
        socket.send(str(i) + '\n')  # send data to the connected client
    socket.close()  # close the client connection

server = StreamServer(('127.0.0.1', 5000), handle)  # handle is called for every incoming connection
server.serve_forever()

The output:

dongwm@localhost ~ $ nc 127.0.0.1 5000
Hello from a telnet!
0
1
2
3
4
dongwm@localhost ~ $ telnet 127.0.0.1 5000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Hello from a telnet!
0
1
2
3
4
Connection closed by foreign host.

3 A wsgi server

from gevent.wsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'  # respond with status 200 OK
    body = '<p>Hello World</p>'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever()  # start a wsgi server listening on port 8000

 

from gevent.pywsgi import WSGIServer  # with pywsgi we can define our own handler that produces the response

def application(environ, start_response):
    status = '200 OK'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    yield "<p>Hello"   # yield the body piece by piece
    yield "World</p>"

WSGIServer(('', 8000), application).serve_forever()

Let's look at a benchmark with ab (Apache Benchmark; for more, see http://nichol.as/benchmark-of-python-web-servers). I only compared gevent against paste, with no system tuning, just to see the gap under identical conditions.

The paste wsgi program:

from gevent.wsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'
    body = '<p>Hello World</p>'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

# WSGIServer(('', 8000), application).serve_forever()
from paste import httpserver
httpserver.serve(application, '0.0.0.0', request_queue_size=500)

dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8000/  # gevent: 100 concurrent clients, 10000 requests
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests

Server Software:
Server Hostname:        127.0.0.1
Server Port:            8000

Document Path:          /
Document Length:        18 bytes

Concurrency Level:      100
Time taken for tests:   2.805 seconds
Complete requests:      10000
Failed requests:        0
Write errors:           0
Total transferred:      1380000 bytes
HTML transferred:       180000 bytes
Requests per second:    3564.90 [#/sec] (mean)
Time per request:       28.051 [ms] (mean)
Time per request:       0.281 [ms] (mean, across all concurrent requests)
Transfer rate:          480.43 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.2      0       2
Processing:     2   28  15.1     27      69
Waiting:        1   28  15.1     27      69
Total:          2   28  15.1     27      69

Percentage of the requests served within a certain time (ms)
  50%     27
  66%     35
  75%     40
  80%     42
  90%     48
  95%     54
  98%     59
  99%     62
 100%     69 (longest request)

dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8080/  # paste
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests

Server Software:        PasteWSGIServer/0.5
Server Hostname:        127.0.0.1
Server Port:            8080

Document Path:          /
Document Length:        18 bytes

Concurrency Level:      100
Time taken for tests:   4.119 seconds
Complete requests:      10000
Failed requests:        0
Write errors:           0
Total transferred:      1600000 bytes
HTML transferred:       180000 bytes
Requests per second:    2427.52 [#/sec] (mean)
Time per request:       41.194 [ms] (mean)
Time per request:       0.412 [ms] (mean, across all concurrent requests)
Transfer rate:          379.30 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.2      0       2
Processing:     2   41   5.4     41     107
Waiting:        1   41   5.2     40      97
Total:          4   41   5.3     41     107

Percentage of the requests served within a certain time (ms)
  50%     41
  66%     41
  75%     42
  80%     43
  90%     46
  95%     50
  98%     56
  99%     59
 100%    107 (longest request)

That is hard to digest, so let me pull the numbers out directly:

1 Time taken for the test:

Time taken for tests:   2.805 seconds  # gevent

Time taken for tests:   4.119 seconds  # paste takes longer

2 Requests per second:

Requests per second:    3564.90 [#/sec] (mean)  # gevent serves far more requests per second
Requests per second:    2427.52 [#/sec] (mean)  # paste

3 Time per request:

Time per request:       28.051 [ms] (mean)  # gevent takes less time per request
Time per request:       0.281 [ms] (mean, across all concurrent requests)  # gevent is also quicker across all concurrent requests
Time per request:       41.194 [ms] (mean)  # paste
Time per request:       0.412 [ms] (mean, across all concurrent requests)  # paste

4 Transfer rate:

Transfer rate:          448.26 [Kbytes/sec] received  # gevent has the higher transfer rate
Transfer rate:          379.30 [Kbytes/sec] received  # paste

5 Breakdown of the connection times:

Connection Times (ms)  # gevent
              min  mean[+/-sd] median   max
Connect:        0    0   0.2      0       2
Processing:     2   28  15.1     27      69
Waiting:        1   28  15.1     27      69
Total:          2   28  15.1     27      69

Connection Times (ms)  # paste
              min  mean[+/-sd] median   max
Connect:        0    0   0.2      0       2
Processing:     2   41   5.4     41     107
Waiting:        1   41   5.2     40      97
Total:          4   41   5.3     41     107
# the paste maxima, 107/97 ms, are clearly larger than gevent's 69 ms, and the minima slightly favour gevent too

6 The response times of all the requests in the run (every request in the scenario has one):

Percentage of the requests served within a certain time (ms)  # gevent
  50%     29
  66%     31
  75%     34
  80%     34
  90%     36
  95%     38
  98%     42
  99%     44
 100%     71 (longest request)

Read it like this: 50% of the requests were answered in under 29 ms, 66% in under 31 ms, and the slowest request took 71 ms.

Percentage of the requests served within a certain time (ms)  # paste
  50%     41
  66%     41
  75%     42
  80%     43
  90%     46
  95%     50
  98%     56
  99%     59
 100%    107 (longest request)   # clearly, paste is a bit worse in every bracket

4 Long polling

import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import json

data_source = Queue()

def producer():
    while True:
        data_source.put_nowait('Hello World')  # put data on the queue without blocking
        gevent.sleep(1)

def ajax_endpoint(environ, start_response):
    status = '200 OK'
    headers = [
        ('Content-Type', 'application/json')  # declare the response type as json
    ]
    try:
        datum = data_source.get(timeout=5)
    except Empty:
        datum = []  # if the gevent.sleep above is made longer (say 5s), repeatedly refreshing will sometimes get this empty list

    start_response(status, headers)
    return json.dumps(datum)  # return the data; what gets printed is a quoted string

gevent.spawn(producer)

WSGIServer(('', 8000), ajax_endpoint).serve_forever()
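To exercise the endpoint, a client only has to issue a plain HTTP request and wait for the answer. A minimal polling client might look like this (my own sketch, not part of the translated tutorial, assuming the server above is running on localhost:8000):

import json
import time
import urllib2

while True:
    # each request blocks until the server hands back a datum or its 5-second queue timeout expires
    response = urllib2.urlopen('http://127.0.0.1:8000/')
    print time.strftime('%H:%M:%S'), json.loads(response.read())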

5 A chat room (source here: https://github.com/sdiehl/minichat.git):

from gevent import monkey
monkey.patch_all()  # patch the standard library so its blocking calls cooperate with gevent
from flask import Flask, render_template, request, json  # the author uses the Flask framework here; you could of course use others such as Django, Tornado or Bottle

from gevent import queue
from gevent.pywsgi import WSGIServer

app = Flask(__name__)
app.debug = True

class Room(object):

    def __init__(self):
        self.users = set()
        self.messages = []

    def backlog(self, size=25):
        return self.messages[-size:]

    def subscribe(self, user):
        self.users.add(user)

    def add(self, message):
        for user in self.users:
            print user
            user.queue.put_nowait(message)
        self.messages.append(message)

class User(object):

    def __init__(self):
        self.queue = queue.Queue()

rooms = {
    'python': Room(),
    'django': Room(),
}

users = {}

@app.route('/')  # Flask dispatches urls through routes; a request for the site root runs choose_name
def choose_name():
    return render_template('choose.html')  # render the choose.html template, which ends up GETting a page with your uid (/<uid>)

@app.route('/<uid>')  # that request lands here
def main(uid):
    return render_template('main.html',  # render a template offering links to the rooms
        uid=uid,
        rooms=rooms.keys()  # depending on the link you pick, a GET takes you on to /<room>/<uid>
    )

@app.route('/<room>/<uid>')  # and that request lands here
def join(room, uid):
    user = users.get(uid, None)

    if not user:
        users[uid] = user = User()

    active_room = rooms[room]
    active_room.subscribe(user)
    print 'subscribe', active_room, user

    messages = active_room.backlog()

    return render_template('room.html',  # room.html contains a POST form that submits your chat message and keeps the page updated via a jQuery ajax call to /poll/<uid>
        room=room,
        uid=uid,
        messages=messages)

@app.route("/put/<room>/<uid>", methods=["POST"])  # messages are submitted through this url
def put(room, uid):
    user = users[uid]
    room = rooms[room]

    message = request.form['message']
    room.add(':'.join([uid, message]))

    return ''

@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):
    try:
        msg = users[uid].queue.get(timeout=10)
    except queue.Empty:
        msg = []
    return json.dumps(msg)  # return whatever chat messages are waiting in the queue

if __name__ == "__main__":
    http = WSGIServer(('', 5000), app)
    http.serve_forever()

Now a more complex model with a frontend and a backend (the example comes from http://blog.pythonisito.com/2011/07/gevent-zeromq-websockets-and-flot-ftw.html):

The source is at: http://dl.dropbox.com/u/24086834/blog/20110723/zmq_websocket.tar.gz

You need to change the second line of graph.js from:

var ws = new WebSocket("ws://localhost:9999/test");

to:

var ws = new MozWebSocket("ws://localhost:9999/test");  // because my Firefox exposes the websocket object under a different name

Here is my walkthrough of demo.py:

import os
import time
import math
import json
import webbrowser

import paste.urlparser  # paste is a WSGI toolkit that wraps a few layers on top of WSGI to make applications easier to manage and implement

import gevent
from gevent_zeromq import zmq
from geventwebsocket.handler import WebSocketHandler  # a WebSocket handler built on gevent's pywsgi

def main():  # the main function
    context = zmq.Context()
    gevent.spawn(zmq_server, context)  # the previous example used joinall; this one uses spawn + start; context, the instantiated GreenContext, is passed as the argument
    ws_server = gevent.pywsgi.WSGIServer(
        ('', 9999), WebSocketApp(context),
        handler_class=WebSocketHandler)
    http_server = gevent.pywsgi.WSGIServer(
        ('', 8000),
        paste.urlparser.StaticURLParser(os.path.dirname(__file__)))  # paste.urlparser handles the urls and the static files
    http_server.start()  # start the greenlet instances
    ws_server.start()
    webbrowser.open('http://localhost:8000/graph.html')  # open the page in a browser; once everything is up, the javascript draws the graph
    zmq_producer(context)

def zmq_server(context):
    sock_incoming = context.socket(zmq.SUB)
    sock_outgoing = context.socket(zmq.PUB)
    sock_incoming.bind('tcp://*:5000')            # the SUB socket binds the TCP endpoint
    sock_outgoing.bind('inproc://queue')          # the PUB socket binds the in-process transport (local, memory-based, between threads)
    sock_incoming.setsockopt(zmq.SUBSCRIBE, "")   # subscribe to everything that is published
    while True:
        msg = sock_incoming.recv()
        sock_outgoing.send(msg)

class WebSocketApp(object):

    def __init__(self, context):
        self.context = context

    def __call__(self, environ, start_response):
        ws = environ['wsgi.websocket']
        sock = self.context.socket(zmq.SUB)
        sock.setsockopt(zmq.SUBSCRIBE, "")  # subscribe to all messages
        sock.connect('inproc://queue')      # the websocket side connects to the subscribed in-process queue
        while True:
            msg = sock.recv()
            ws.send(msg)

def zmq_producer(context):  # the publishing side
    socket = context.socket(zmq.PUB)
    socket.connect('tcp://127.0.0.1:5000')  # connect to the publishing socket

    while True:
        x = time.time() * 1000
        y = 2.5 * (1 + math.sin(x / 500))
        socket.send(json.dumps(dict(x=x, y=y)))  # push data out on the publishing socket; it is relayed through inproc://queue, picked up by the websocket and plotted
        gevent.sleep(0.05)

if __name__ == '__main__':
    main()

This post is mainly about deletion.

metadata.drop_all(engine)  # drop every table the engine's metadata knows about

metadata.remove(test_table)  # remove a single Table from the metadata

clear_mappers()  # remove all class-to-table mappings
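For context, a minimal self-contained sketch of where those three calls sit (my own illustration; the table and class names are made up):

from sqlalchemy import create_engine, MetaData, Table, Column, Integer
from sqlalchemy.orm import mapper, clear_mappers

engine = create_engine('sqlite://')
metadata = MetaData(engine)

test_table = Table('test', metadata, Column('id', Integer, primary_key=True))

class Test(object):
    pass

mapper(Test, test_table)
metadata.create_all()

metadata.drop_all(engine)    # issue DROP TABLE for every table the metadata still knows about
metadata.remove(test_table)  # then forget this one Table object on the Python side
clear_mappers()              # and remove every class-to-table mapping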

relation() takes a cascade argument. It concerns session-level operations: putting objects into the session, deleting them from it, and so on. cascade="all" means every session operation on the parent also works nicely on the mapped children; the default is save-update, merge.

mapper(ParentClass, parent, properties=dict(
    children=relation(ChildClass, backref='parent', cascade='all,delete-orphan')))
# delete-orphan means: if an instance was once attached as a child (ChildClass) but has lost its link to the parent, delete it rather than leave it dangling from an empty parent reference.

An example makes it clear:

>>> photo = Table(
...     'photo', metadata,
...     Column('id', Integer, primary_key=True))
>>> tag = Table(
...     'tag', metadata,
...     Column('id', Integer, primary_key=True),
...     Column('photo_id', None, ForeignKey('photo.id')),
...     Column('tag', String(80)))
>>> class Photo(object):
...     pass
...
>>> class Tag(object):
...     def __init__(self, tag):
...         self.tag = tag
...
>>> mapper(Photo, photo, properties=dict(
...     tags=relation(Tag, backref='photo', cascade="all")))
<Mapper at 0x851504c; Photo>
>>> mapper(Tag, tag)
<Mapper at 0x8515dac; Tag>
>>> p1 = Photo()
>>> p2 = Photo()
>>> p1.tags = [Tag(tag='foo'), Tag(tag='bar'), Tag(tag='baz')]
>>> p2.tags = [Tag(tag='foo'), Tag(tag='bar'), Tag(tag='baz')]
>>> session.add(p1)
>>> session.add(p2)
>>> session.flush()
>>> for t in session.query(Tag):
...     print t.id, t.photo_id, t.tag
...
1 1 foo   # the related rows are all there
2 1 bar
3 1 baz
4 2 foo
5 2 bar
6 2 baz
>>> session.delete(session.query(Photo).get(1))  # delete one of the Photos
>>> session.flush()
>>> for t in session.query(Tag):
...     print t.id, t.photo_id, t.tag
...
4 2 foo   # every related Tag whose photo_id was 1 is deleted along with it; so far specifying delete-orphan in relation(ChildClass, backref='parent', cascade='all,delete-orphan') makes no difference -- the key part is below
5 2 bar
6 2 baz
>>> p3 = session.query(Photo).get(2)
>>> del p3.tags[0]   # if I only remove the link to the parent...
>>> session.flush()
>>> for t in session.query(Tag):
...     print t.id, t.photo_id, t.tag
...
4 None foo   # with plain cascade="all" the orphaned tag keeps its row, its photo_id simply becomes None -- the other related rows are untouched
5 2 bar
6 2 baz

>>> p3 = session.query(Photo).get(2)  # the same steps again, this time with the relation mapped using delete-orphan
>>> del p3.tags[0]
>>> session.flush()
>>> for t in session.query(Tag):
...     print t.id, t.photo_id, t.tag
5 2 bar   # the orphaned row in the related table is deleted automatically
6 2 baz

Note: the available cascade options are (a short sketch follows the list):

  • save-update - my understanding: objects passed to session.add() are automatically cascaded along the relationship, and this also covers items that have been removed from the relationship but not yet flushed
  • merge - the session.merge() behaviour: copy state onto a persistent instance with the same identifier; if no persistent instance is associated with the current session, return one; if the given instance is unsaved, save a copy and return that copy as the new persistent instance
  • expunge - remove the instance from the session
  • delete - mark an instance as deleted; the actual deletion happens on flush()
  • delete-orphan - if a child is detached from its parent, mark it for deletion without touching the parent
  • refresh-expire - expire the given instance's attributes so they are refreshed from the database on the next query
  • all - shorthand for the combination "save-update, merge, refresh-expire, expunge, delete"
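As a quick illustration (my own sketch, not from the original), the cascade argument is simply a comma-separated string built from the options above, attached when the relation is declared; Photo, photo and Tag here are the objects from the example earlier in this post:

from sqlalchemy.orm import mapper, relation

mapper(Photo, photo, properties=dict(
    # delete children together with their parent, and delete children that get detached from it
    tags=relation(Tag, backref='photo', cascade='save-update, merge, delete, delete-orphan')))
mapper(Tag, tag)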

This post is about mapping object-oriented inheritance onto relational database tables.

>>> class Product(object):
...     def __init__(self, sku, msrp):
...         self.sku = sku
...         self.msrp = msrp
...     def __repr__(self):
...         return '<%s %s>' % (
...             self.__class__.__name__, self.sku)
...
>>> class Clothing(Product):
...     def __init__(self, sku, msrp, clothing_info):
...         Product.__init__(self, sku, msrp)  # inherits from Product
...         self.clothing_info = clothing_info
...
>>> class Accessory(Product):
...     def __init__(self, sku, msrp, accessory_info):
...         Product.__init__(self, sku, msrp)  # inherits from Product
...         self.accessory_info = accessory_info

In other words:

With single table inheritance (the original post shows a diagram here, with the unmapped columns drawn in black):

Creating the table looks like this:

>>> product_table = Table(
...     'product', metadata,
...     Column('sku', String(20), primary_key=True),
...     Column('msrp', Numeric),
...     Column('clothing_info', String),
...     Column('accessory_info', String),
...     Column('product_type', String(1), nullable=False))  # one new column
>>> mapper(
...     Product, product_table,
...     polymorphic_on=product_table.c.product_type,  # polymorphic_on names the column that carries the inheritance discriminator; its values are the identifiers given by polymorphic_identity
...     polymorphic_identity='P')  # 'P' marks Product, the parent class
<Mapper at 0x85833ec; Product>
>>> mapper(Clothing, inherits=Product,
...        polymorphic_identity='C')  # 'C' marks Clothing
<Mapper at 0x858362c; Clothing>
>>>
>>> mapper(Accessory, inherits=Product,  # inherits from Product
...        polymorphic_identity='A')     # 'A' marks Accessory
<Mapper at 0x8587d8c; Accessory>
>>> products = [   # create some products
...     Product('123', 11.22),
...     Product('456', 33.44),
...     Clothing('789', 123.45, "Nice Pants"),
...     Clothing('111', 125.45, "Nicer Pants"),
...     Accessory('222', 24.99, "Wallet"),
...     Accessory('333', 14.99, "Belt") ]
>>> Session = sessionmaker()
>>> session = Session()
>>> for p in products:
...     session.add(p)
...
>>> session.flush()
>>> print session.query(Product).all()   # everything is there
[<Product 123>, <Product 456>, <Clothing 789>, <Clothing 111>, <Accessory 222>, <Accessory 333>]
>>> print session.query(Clothing).all()  # only the two Clothing rows
[<Clothing 789>, <Clothing 111>]
>>> print session.query(Accessory).all() # only the two Accessory rows -- isn't this mapping nicer than writing three classes and mapping each one separately?
[<Accessory 222>, <Accessory 333>]

>>> for row in product_table.select().execute():  # querying the parent table returns every row; only product_type differs
...     print row
...
(u'123', Decimal('11.2200000000'), None, None, u'P')
(u'456', Decimal('33.4400000000'), None, None, u'P')
(u'789', Decimal('123.4500000000'), u'Nice Pants', None, u'C')
(u'111', Decimal('125.4500000000'), u'Nicer Pants', None, u'C')
(u'222', Decimal('24.9900000000'), None, u'Wallet', u'A')
(u'333', Decimal('14.9900000000'), None, u'Belt', u'A')

The resulting mapping is illustrated by a diagram in the original post.

Querying a mapped column that the class does not actually have:

>>> print session.query(Accessory)[0].clothing_info
None

Concrete table inheritance

Each table holds exactly the data that its class needs; no space is wasted.

>>> metadata.remove(product_table)
>>> product_table = Table(
...     'product', metadata,
...     Column('sku', String(20), primary_key=True),
...     Column('msrp', Numeric))
>>> clothing_table = Table(
...     'clothing', metadata,
...     Column('sku', String(20), primary_key=True),
...     Column('msrp', Numeric),
...     Column('clothing_info', String))
>>>
>>> accessory_table = Table(
...     'accessory', metadata,
...     Column('sku', String(20), primary_key=True),
...     Column('msrp', Numeric),
...     Column('accessory_info', String))
>>>

Suppose we want the Product whose sku is '222' (with no extra work on our part); we would otherwise have to query every class in the hierarchy. Look at this example:

>>> punion = polymorphic_union(
...     dict(P=product_table,
...          C=clothing_table,
...          A=accessory_table),
...     'type_')
>>>
>>> print punion
SELECT accessory.sku, accessory.msrp, accessory.accessory_info, CAST(NULL AS VARCHAR) AS clothing_info, 'A' AS type_
FROM accessory
UNION ALL
SELECT product.sku, product.msrp, CAST(NULL AS VARCHAR) AS accessory_info, CAST(NULL AS VARCHAR) AS clothing_info, 'P' AS type_
FROM product
UNION ALL
SELECT clothing.sku, clothing.msrp, CAST(NULL AS VARCHAR) AS accessory_info, clothing.clothing_info, 'C' AS type_
FROM clothing

Now we have a nice discriminator (C, A, P):

>>> mapper(
...     Product, product_table, with_polymorphic=('*', punion),  # map the parent with with_polymorphic=('*', punion): different tables can be selected from, which gives us polymorphism and better performance (a single SELECT)
...     polymorphic_on=punion.c.type_,
...     polymorphic_identity='P')
<Mapper at 0x8605b6c; Product>
>>> mapper(Clothing, clothing_table, inherits=Product,
...        polymorphic_identity='C',
...        concrete=True)
<Mapper at 0x84f1bac; Clothing>
>>> mapper(Accessory, accessory_table, inherits=Product,
...        polymorphic_identity='A',
...        concrete=True)
<Mapper at 0x858770c; Accessory>

>>> session.query(Product).get('222')
<Accessory 222>

 

 

 

This post is about ORM session queries and updates.

The session is responsible for synchronizing in-memory objects with the database tables. Creating one looks like this:

Session = sessionmaker(bind=engine)  # the sqlalchemy.orm.session.Session class takes many parameters; sessionmaker exists to simplify the process

Or:

Session = sessionmaker()
Session.configure(bind=engine)

Note the sessionmaker parameters:

autoflush=True  # when True, the session calls flush() automatically before running any query, which guarantees the returned results reflect pending changes

transactional=False  # when True, the session automatically wraps its work in a transaction and commits it
twophase=False  # when several database engines are involved, give each one a transaction identifier once flush() has run but commit has not, so the whole transaction can be rolled back together

An example of creating a session and adding data (this has come up many times before):

dongwm@localhost ~ $ python
Python 2.7.3 (default, Jul 11 2012, 10:10:17)
[GCC 4.5.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from sqlalchemy import *
>>> from sqlalchemy.orm import *
>>> engine = create_engine('sqlite://')
>>> metadata = MetaData(engine)
>>> account_table = Table(
...     'account', metadata,
...     Column('id', Integer, primary_key=True),
...     Column('balance', Numeric))
>>> class Account(object): pass
...
>>> mapper(Account, account_table)
<Mapper at 0x84e6f2c; Account>
>>> account_table.create()
>>> a = Account()
>>> a.balance = 100.00
>>> Session = sessionmaker(bind=engine)
>>> session = Session()
>>> session.add(a)
>>> session.flush()
>>> session.delete(a)  # automatically deletes the matching account_table row, but in 1:N and M:N relationships the related rows are not deleted automatically
>>> session.flush()

Note the states a session object can be in:

Transient: exists only in memory

Pending: queued for the database; the INSERT happens once flush() runs

Persistent: present in the database

Detached: has a row in the database but does not belong to any session
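A short sketch of an object moving through those states (my own illustration, reusing the Account mapping and session defined just above):

a = Account()          # Transient: a plain Python object, unknown to any session
a.balance = 50.00

session.add(a)         # Pending: queued for INSERT on the next flush
session.flush()        # Persistent: it now has a row (and a primary key) in the database

session.expunge(a)     # Detached: the row still exists, but 'a' no longer belongs to the session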

>>> make_transient(a)  # a had been marked deleted; reset its state
>>> session.add(a)     # add it again
>>> session.flush()
>>> query = session.query(Account)
>>> print query
SELECT account.id AS account_id, account.balance AS account_balance
FROM account
>>> for obj in query:
...     print obj
...
<__main__.Account object at 0x84eef0c>

>>> query.all()  # fetch everything
[<__main__.Account object at 0x84eef0c>]
>>> query = query.filter(Account.balance > 10.00)  # filter
>>> for obj in query:
...     print obj.balance
...

100.00

>>> for i in session.query(Account).filter_by(balance=100.00):  # filter by keyword condition
...     print i
>>> query = session.query(Account)
>>> query = query.from_statement('select * from account where balance=:bac')  # a raw SQL statement with a placeholder; :bac names the parameter
>>> query = query.params(bac='100.00')  # look it up with the value bound to bac
>>> print query.all()
[<__main__.Account object at 0x84eef0c>]

Scoped (shared) sessions

>>> Session = scoped_session(sessionmaker(  # set up a locally shared session
...     bind=engine, autoflush=True))
>>> session = Session()
>>> session2 = Session()
>>> session is session2  # they are one and the same
True

>>> a = Account()
>>> a.balance = 100.00
>>> Session.add(a)   # note the capital 'S' here
>>> Session.flush()
>>> b = Account()
>>> a.balance = 200.00
>>> session.add(a)   # it really is one shared session, either name works
>>> session.flush()
>>> print session.query(Account).all()  # the query finds 2 objects
[<__main__.Account object at 0x851be0c>, <__main__.Account object at 0x84f7d6c>]

Note: with a scoped session the mapping can also be set up like this:

mapper(Product, product_table, properties=dict(
    categories=relation(Category, secondary=product_category_table, backref='products')))

Session.mapper(Product, product_table, properties=dict(
    categories=relation(Category, secondary=product_category_table, backref='products')))  # the advantage is that instances can then be initialized with keyword parameters

This post is about SQLAlchemy's extensions.

Extensions are external plugins: sqlsoup, associationproxy, declarative, horizontal_shard and so on.

1 declarative

If you want to map data, the old way is:

from sqlalchemy import create_engine
from sqlalchemy import Column, MetaData, Table
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy.orm import mapper, sessionmaker

class User(object):  # a plain class
    def __init__(self, name, fullname, password):
        self.name = name
        self.fullname = fullname
        self.password = password

    def __repr__(self):
        return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)

metadata = MetaData()
users_table = Table('users', metadata,
    Column('user_id', Integer, primary_key=True),
    Column('name', String),
    Column('fullname', String),
    Column('password', String)
)
email_table = Table('email', metadata,
    Column('email_id', Integer, primary_key=True),
    Column('email_address', String),
    Column('user_id', Integer, ForeignKey('users.user_id'))
)
metadata.create_all(engine)

mapper(User, users_table)  # the mapping

But we can change styles and write it like this instead:

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref, mapper, relation, sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = "users"                  # the table this class is mapped to
    id = Column(Integer, primary_key=True)   # the table structure moves into the class
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __init__(self, name, fullname, password):
        self.name = name
        self.fullname = fullname
        self.password = password

    def __repr__(self):
        return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)

class Address(Base):
    __tablename__ = "addresses"
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    user = relation(User, backref=backref('addresses', order_by=id))  # a bidirectional relationship joined on the user id: Address to User is many-to-one, User to Address is one-to-many

    def __init__(self, email_address):
        self.email_address = email_address

    def __repr__(self):
        return "<Address('%s')>" % self.email_address

engine = create_engine("sqlite:///tutorial.db", echo=True)

users_table = User.__table__  # a handle on the Table object behind User
metadata = Base.metadata      # a handle on the metadata
metadata.create_all(engine)

In more detail:

engine = create_engine('sqlite://')                        # create an engine
Base.metadata.create_all(engine)                           # create the tables
Base.metadata.bind = create_engine('sqlite://')            # bind the metadata to an engine
Base = declarative_base(bind=create_engine('sqlite://'))   # or bind the engine when creating the Base
mymetadata = MetaData()
Base = declarative_base(metadata=mymetadata)               # or supply your own metadata

Declaring a simple relationship:

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    addresses = relationship("Address", backref="user")  # relationship is simply the full name of relation

class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    email = Column(String(50))
    user_id = Column(Integer, ForeignKey('users.id'))

Declaring a many-to-many relationship:

keywords = Table(
    'keywords', Base.metadata,
    Column('author_id', Integer, ForeignKey('authors.id')),
    Column('keyword_id', Integer, ForeignKey('keywords.id'))
)

class Author(Base):
    __tablename__ = 'authors'
    id = Column(Integer, primary_key=True)
    keywords = relationship("Keyword", secondary=keywords)

Specifying extra table arguments:

class MyClass(Base):
    __tablename__ = 'sometable'
    __table_args__ = {'mysql_engine': 'InnoDB'}  # anything beyond the name, the mapped class and the metadata goes into __table_args__

Or:

class MyClass(Base):
    __tablename__ = 'sometable'
    __table_args__ = (
        ForeignKeyConstraint(['id'], ['remote_table.id']),  # as a tuple
        UniqueConstraint('foo'),
    )

Or:

class MyClass(Base):
    __tablename__ = 'sometable'
    __table_args__ = (
        ForeignKeyConstraint(['id'], ['remote_table.id']),
        UniqueConstraint('foo'),
        {'autoload': True}   # the last element may be a dict -- think of *args and **kwargs
    )

The hybrid style:

class MyClass(Base):
    __table__ = Table('my_table', Base.metadata,   # the table definition goes into __table__
        Column('id', Integer, primary_key=True),
        Column('name', String(50))
    )

2 sqlsoup (since SQLAlchemy 0.8 it has become a stand-alone project, http://pypi.python.org/pypi/sqlsoup; I am on the 0.7.8 version shipped by Gentoo, so the import below may not work on newer versions, where you import sqlsoup instead)

sqlsoup offers a convenient way to access a database without creating classes or mappings.

Again, compare by example.

Create a database and insert some data the old way:

>>> from sqlalchemy import *
>>> engine = create_engine('sqlite:///dongwm.db')
>>> metadata = MetaData(engine)
>>> product_table = Table(
...     'product', metadata,
...     Column('sku', String(20), primary_key=True),
...     Column('msrp', Numeric))
>>> store_table = Table(
...     'store', metadata,
...     Column('id', Integer, primary_key=True),
...     Column('name', Unicode(255)))
>>> product_price_table = Table(
...     'product_price', metadata,
...     Column('sku2', None, ForeignKey('product.sku'), primary_key=True),
...     Column('store_id', None, ForeignKey('store.id'), primary_key=True),
...     Column('price', Numeric, default=0))
>>> metadata.create_all()
>>> stmt = product_table.insert()
>>> stmt.execute([dict(sku="123", msrp=12.34),
...               dict(sku="456", msrp=22.12),
...               dict(sku="789", msrp=41.44)])
<sqlalchemy.engine.base.ResultProxy object at 0x84fbdcc>
>>> stmt = store_table.insert()
>>> stmt.execute([dict(name="Main Store"),
...               dict(name="Secondary Store")])
<sqlalchemy.engine.base.ResultProxy object at 0x850068c>
>>> stmt = product_price_table.insert()
>>> stmt.execute([dict(store_id=1, sku2="123"),
...               dict(store_id=1, sku2="456"),
...               dict(store_id=1, sku2="789"),
...               dict(store_id=2, sku2="123"),
...               dict(store_id=2, sku2="456"),
...               dict(store_id=2, sku2="789")])
<sqlalchemy.engine.base.ResultProxy object at 0x85008cc>

With the tables created and populated, we connect to them with sqlsoup:

>>> from sqlalchemy.ext.sqlsoup import SqlSoup
>>> db = SqlSoup('sqlite:///dongwm.db')  # connect to an existing database
>>> print db.product.all()  # print the rows
[MappedProduct(sku=u'123',msrp=Decimal('12.3400000000')), MappedProduct(sku=u'456',msrp=Decimal('22.1200000000')), MappedProduct(sku=u'789',msrp=Decimal('41.4400000000'))]
>>> print db.product.get('123')  # simpler than session.query(Product), isn't it?
MappedProduct(sku=u'123',msrp=Decimal('12.3400000000'))

Note: if you want to create a database instead: db = SqlSoup('sqlite:///:memory:')

>>> newprod = db.product.insert(sku='111', msrp=22.44)  # an insert without any declared mapping
>>> db.flush()
>>> db.clear()  # drop down to the session layer and expunge every instance; it is an alias for session.expunge_all
>>> db.product.all()
[MappedProduct(sku=u'123',msrp=Decimal('12.3400000000')), MappedProduct(sku=u'456',msrp=Decimal('22.1200000000')), MappedProduct(sku=u'789',msrp=Decimal('41.4400000000')), MappedProduct(sku=u'111',msrp=Decimal('22.4400000000'))]  # the new row is already there
# MappedProduct uses __getattr__ to forward unrecognised attribute and method access to its query attribute, and it also offers some data helpers for updates

>>> from sqlalchemy import or_, and_, desc
>>> where = or_(db.product.sku == '123', db.product.sku == '111')
>>> db.product.filter(where).order_by(desc(db.product.msrp)).all()  # filter on several conditions and sort in descending order
[MappedProduct(sku='111',msrp=22.44), MappedProduct(sku=u'123',msrp=Decimal('12.3400000000'))]

>>> join1 = db.join(db.product, db.product_price, isouter=True)  # join the two tables; isouter=True makes it a LEFT OUTER join (I have not fully grasped this yet)
>>> join1.all()
[MappedJoin(sku=u'123',msrp=Decimal('12.3400000000'),sku2=u'123',store_id=1,price=Decimal('0E-10')),   # each row carries the matching columns of both tables
 MappedJoin(sku=u'123',msrp=Decimal('12.3400000000'),sku2=u'123',store_id=2,price=Decimal('0E-10')),
 MappedJoin(sku=u'456',msrp=Decimal('22.1200000000'),sku2=u'456',store_id=1,price=Decimal('0E-10')),
 MappedJoin(sku=u'456',msrp=Decimal('22.1200000000'),sku2=u'456',store_id=2,price=Decimal('0E-10')),
 MappedJoin(sku=u'789',msrp=Decimal('41.4400000000'),sku2=u'789',store_id=1,price=Decimal('0E-10')),
 MappedJoin(sku=u'789',msrp=Decimal('41.4400000000'),sku2=u'789',store_id=2,price=Decimal('0E-10')),
 MappedJoin(sku=u'111',msrp=Decimal('22.4400000000'),sku2=None,store_id=None,price=None)]
>>> join2 = db.join(join1, db.store, isouter=True)  # also pull in the store table (there is a foreign key for it too), i.e. join all three tables
>>> join2.all()
[MappedJoin(sku=u'123',msrp=Decimal('12.3400000000'),sku2=u'123',store_id=1,price=Decimal('0E-10'),id=1,name=u'Main Store'),
 MappedJoin(sku=u'123',msrp=Decimal('12.3400000000'),sku2=u'123',store_id=2,price=Decimal('0E-10'),id=2,name=u'Secondary Store'),
 MappedJoin(sku=u'456',msrp=Decimal('22.1200000000'),sku2=u'456',store_id=1,price=Decimal('0E-10'),id=1,name=u'Main Store'),
 MappedJoin(sku=u'456',msrp=Decimal('22.1200000000'),sku2=u'456',store_id=2,price=Decimal('0E-10'),id=2,name=u'Secondary Store'),
 MappedJoin(sku=u'789',msrp=Decimal('41.4400000000'),sku2=u'789',store_id=1,price=Decimal('0E-10'),id=1,name=u'Main Store'),
 MappedJoin(sku=u'789',msrp=Decimal('41.4400000000'),sku2=u'789',store_id=2,price=Decimal('0E-10'),id=2,name=u'Secondary Store'),
 MappedJoin(sku=u'111',msrp=Decimal('22.4400000000'),sku2=None,store_id=None,price=None,id=None,name=None)]
>>> join3 = db.with_labels(join1)  # label each column with its table of origin, so sku for example becomes product_sku, telling you it came from the product table; since join1 was passed in, the store columns are not labelled
>>> join3.first()
MappedJoin(product_sku=u'123',product_msrp=Decimal('12.3400000000'),product_price_sku2=u'123',product_price_store_id=1,product_price_price=Decimal('0E-10'))
>>> db.with_labels(join2).first()
MappedJoin(product_sku=u'123',product_msrp=Decimal('12.3400000000'),product_price_sku2=u'123',product_price_store_id=1,product_price_price=Decimal('0E-10'),store_id=1,store_name=u'Main Store')
>>> labelled_product = db.with_labels(db.product)
>>> join4 = db.join(labelled_product, db.product_price, isouter=True)
>>> join4.first()
MappedJoin(product_sku=u'123',product_msrp=Decimal('12.3400000000'),sku2=u'123',store_id=1,price=Decimal('0E-10'))

>>> db.clear()
>>> join5 = db.join(db.product, db.product_price)
>>> s = select([db.product._table,
...     func.avg(join5.c.price).label('avg_price')],  # add a column computing the average price of each product, named avg_price
...     from_obj=[join5._table],
...     group_by=[join5.c.sku])
>>> s = s.alias('products_with_avg_price')  # shorthand for: from sqlalchemy import alias; a = alias(self, name=name)
>>> products_with_avg_price = db.map(s, primary_key=[join5.c.sku])  # since this is not mapped to a table or join, we have to say where to find the primary key
>>> products_with_avg_price.all()
[MappedJoin(sku=u'123',msrp=Decimal('12.3400000000'),avg_price=0.0), MappedJoin(sku=u'456',msrp=Decimal('22.1200000000'),avg_price=0.0), MappedJoin(sku=u'789',msrp=Decimal('41.4400000000'),avg_price=0.0)]
>>> db.product_price.first().price = 50.00
>>> db.flush()
>>> products_with_avg_price.all()
[MappedJoin(sku=u'123',msrp=Decimal('12.3400000000'),avg_price=0.0), MappedJoin(sku=u'456',msrp=Decimal('22.1200000000'),avg_price=0.0), MappedJoin(sku=u'789',msrp=Decimal('41.4400000000'),avg_price=0.0)]
>>> db.products_with_avg_price = products_with_avg_price  # stash the mapping on db so it can be reused
>>> msrp = select([db.product.c.msrp],
...     db.product.sku == db.product_price.sku2)  # the msrp where sku and sku2 match
>>> db.product_price.update(  # update the data
...     values=dict(price=msrp), synchronize_session=False)  # set the price column to the corresponding msrp above

>>> db.product_price.all()
[MappedProduct_price(sku2=u'123',store_id=1,price=Decimal('12.3400000000')), MappedProduct_price(sku2=u'456',store_id=1,price=Decimal('22.1200000000')), MappedProduct_price(sku2=u'789',store_id=1,price=Decimal('41.4400000000')), MappedProduct_price(sku2=u'123',store_id=2,price=Decimal('12.3400000000')), MappedProduct_price(sku2=u'456',store_id=2,price=Decimal('22.1200000000')), MappedProduct_price(sku2=u'789',store_id=2,price=Decimal('41.4400000000'))]

3 associationproxy

associationproxy creates an attribute that reads and writes a target attribute across an entire relationship.

One example makes it clear:

>>> user_table = Table(
...     'user', metadata,
...     Column('id', Integer, primary_key=True),
...     Column('user_name', String(255), unique=True),
...     Column('password', String(255)))
>>> brand_table = Table(
...     'brand', metadata,
...     Column('id', Integer, primary_key=True),
...     Column('name', String(255)))
>>> sales_rep_table = Table(
...     'sales_rep', metadata,
...     Column('brand_id', None, ForeignKey('brand.id'), primary_key=True),
...     Column('user_id', None, ForeignKey('user.id'), primary_key=True),
...     Column('commission_pct', Integer, default=0))
>>> class User(object): pass
...
>>> class Brand(object): pass
...
>>> class SalesRep(object): pass
...
>>> mapper(User, user_table, properties=dict(
...     sales_rep=relation(SalesRep, backref='user', uselist=False)))
<Mapper at 0x87472ec; User>
>>> mapper(Brand, brand_table, properties=dict(
...     sales_reps=relation(SalesRep, backref='brand')))
<Mapper at 0x874770c; Brand>
>>> mapper(SalesRep, sales_rep_table)
<Mapper at 0x874768c; SalesRep>

The ORM setup is done. But suppose we want Brand objects to have a users attribute listing the Users who are sales reps for that brand. We could do it like this:

class Brand(object):
    @property
    def users(self):
        return [sr.user for sr in self.sales_reps]

But adding and removing through that property is inconvenient. With association_proxy instead:

>>> from sqlalchemy.ext.associationproxy import association_proxy
>>> class Brand(object):
...     users = association_proxy('sales_reps', 'user')
...

Or:

mapper(Brand, brand_table, properties=dict(
    sales_reps=relation(SalesRep, backref='brand')))
Brand.users = association_proxy('sales_reps', 'user')  # the advantage is that the domain object is kept intact

We need to adjust the classes and give them initializers:

class User(object):
    def __init__(self, user_name=None, password=None):
        self.user_name = user_name
        self.password = password

class Brand(object):
    def __init__(self, name=None):
        self.name = name

class SalesRep(object):
    def __init__(self, user=None, brand=None, commission_pct=0):
        self.user = user
        self.brand = brand
        self.commission_pct = commission_pct

Now watch:

>>> b = Brand('Cool Clothing')
>>> session.add(b)
>>> u = User('rick', 'foo')
>>> session.add(u)
>>> session.flush()
2012-07-20 12:22:33,191 INFO sqlalchemy.engine.base.Engine INSERT INTO user (user_name, password) VALUES (?, ?)
2012-07-20 12:22:33,191 INFO sqlalchemy.engine.base.Engine ('rick', 'foo')
2012-07-20 12:22:33,191 INFO sqlalchemy.engine.base.Engine INSERT INTO brand (name) VALUES (?)
2012-07-20 12:22:33,191 INFO sqlalchemy.engine.base.Engine ('Cool Clothing',)
>>> b.users
2012-07-20 12:22:42,135 INFO sqlalchemy.engine.base.Engine SELECT sales_rep.brand_id AS sales_rep_brand_id, sales_rep.user_id AS sales_rep_user_id, sales_rep.commission_pct AS sales_rep_commission_pct FROM sales_rep WHERE ? = sales_rep.brand_id
2012-07-20 12:22:42,135 INFO sqlalchemy.engine.base.Engine (2,)
[]
>>> b.users.append(u)  # the proxy automatically creates the intermediate SalesRep object, calling it with a single positional argument
2012-07-20 12:22:46,782 INFO sqlalchemy.engine.base.Engine SELECT sales_rep.brand_id AS sales_rep_brand_id, sales_rep.user_id AS sales_rep_user_id, sales_rep.commission_pct AS sales_rep_commission_pct FROM sales_rep WHERE ? = sales_rep.user_id
2012-07-20 12:22:46,782 INFO sqlalchemy.engine.base.Engine (2,)
>>> b.users
[<__main__.User object at 0x87d7b6c>]
>>> b.sales_reps
[<__main__.SalesRep object at 0x87d7c4c>]
>>> b.sales_reps[0].commission_pct
0
>>> session.flush()
2012-07-20 12:23:14,215 INFO sqlalchemy.engine.base.Engine INSERT INTO sales_rep (brand_id, user_id, commission_pct) VALUES (?, ?, ?)
2012-07-20 12:23:14,215 INFO sqlalchemy.engine.base.Engine (2, 2, 0)

A fancier idea: give the sales rep a 10% commission:

Brand.users = association_proxy(
    'sales_reps', 'user',
    creator=lambda u: SalesRep(user=u, commission_pct=10))

Suppose instead we want the brand attribute to be a dictionary carrying the User and the commission_pct:

>>> from sqlalchemy.orm.collections import attribute_mapped_collection
>>> reps_by_user_class = attribute_mapped_collection('user')
>>> clear_mappers()
>>> mapper(Brand, brand_table, properties=dict(
...     sales_reps_by_user=relation(
...         SalesRep, backref='brand',
...         collection_class=reps_by_user_class)))
<Mapper at 0x862c5ec; Brand>

>>> Brand.commissions = association_proxy(
...     'sales_reps_by_user', 'commission_pct',
...     creator=lambda key, value: SalesRep(user=key, commission_pct=value))
>>> mapper(User, user_table, properties=dict(
...     sales_rep=relation(SalesRep, backref='user', uselist=False)))
<Mapper at 0x8764b2c; User>
>>> mapper(SalesRep, sales_rep_table)
<Mapper at 0x87bb4cc; SalesRep>
>>> b = session.query(Brand).get(1)
>>> u = session.query(User).get(1)
>>> b.commissions[u] = 20
>>> session.bind.echo = False
>>> session.flush()
>>> b = session.query(Brand).get(1)
>>> u = session.query(User).get(1)
>>> u.user_name
u'dongwm'
>>> print b.commissions[u]
20
>>> print b.sales_reps_by_user[u]  # the proxy and the original relation stay in sync automatically
<__main__.SalesRep object at 0x87e3dcc>
>>> print b.sales_reps_by_user[u].commission_pct
20

 

 

 

前言:最近开始学习SQLAlchemy,本教程是其官方文档以及在读英文版<Essential SQLAlchemy>的翻译加一些自己的理解和总结

1 什么是 SQLAlchemy?

它是给mysql, oracle,sqlite等关系型数据库的python接口,不需要大幅修改原有的python代码,它已经包含了SQL表达式语言和ORM,看一些例子:

sql=”INSERT INTO user(user_name, password) VALUES (%s, %s)” cursor = conn.cursor() cursor.execute(sql, (‘dongwm’, ‘testpass’))

以上是一个常用的mysql的SQL语句,但是冗长也容易出错,并且可能导致安全问题(因为是字符串的语句,会存在SQL注入),并且代码不跨平台,在不同数据库软件的语句不同(以下是一个 Oracle例子),不具备客移植性:

sql=”INSERT INTO user(user_name, password) VALUES (:1, :2)” cursor = conn.cursor() cursor.execute(sql, ‘dongwm’, ‘testpass’)

而在SQLAlchemy里只需要这样写:

statement = user_table.insert(user_name=’rick’, password=’parrot’) statement.execute()  #护略是什么数据库环境

SQLAlchemy还能让你写出很pythonic的语句:

statement = user_table.select(and_( user_table.c.created >= date(2007,1,1), user_table.c.created < date(2008,1,1)) result = statement.execute()  #检索所有在2007年创建的用户

metadata = MetaData('sqlite://')  # tell it which database to use, here an in-memory SQLite
user_table = Table(  # define a table
    'tf_user', metadata,
    Column('id', Integer, primary_key=True),  # some columns; if you know SQL these are easy to read
    Column('user_name', Unicode(16), unique=True, nullable=False),
    Column('email_address', Unicode(255), unique=True, nullable=False),
    Column('password', Unicode(40), nullable=False),
    Column('first_name', Unicode(255), default=''),
    Column('last_name', Unicode(255), default=''),
    Column('created', DateTime, default=datetime.now))
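As a small follow-up sketch (my own example, reusing only the bound MetaData and the tf_user table defined just above; the e-mail address is made up), creating the table and running an insert and a select through the expression language looks like this:

metadata.create_all()                 # emits CREATE TABLE against the bound in-memory SQLite
user_table.insert().execute(user_name=u'rick',
                            email_address=u'rick@example.com',
                            password=u'parrot')
for row in user_table.select().execute():
    print row['user_name'], row['created']   # 'created' was filled in by the column default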

users_table = Table('users', metadata, autoload=True)  # if the table already exists you can skip the column definitions and just pass autoload=True

class User(object): pass  # SQLAlchemy is powerful, but inserts and updates still have to be spelled out by hand; with the ORM you define a class, define a table, and map the table onto the class
mapper(User, user_table)

Below is a complete ORM example:

from sqlalchemy import create_engine
from sqlalchemy.orm import mapper, sessionmaker  # sessionmaker() is the usual way to build a top-level Session class for the whole application; the Session manages all conversations with the database
from datetime import datetime
from sqlalchemy import Table, MetaData, Column, ForeignKey, Integer, String, Unicode, DateTime  # anyone who knows SQL should recognise these
engine = create_engine("sqlite:///tutorial.db", echo=True)  # create the database connection; echo=True logs the generated SQL
metadata = MetaData()  # keeps track of the table definitions
user_table = Table(  # everything needed to create a table: columns, table name, and so on
    'tf_user', metadata,
    Column('id', Integer, primary_key=True),
    Column('user_name', Unicode(16), unique=True, nullable=False),
    Column('email_address', Unicode(255), unique=True, nullable=False),
    Column('password', Unicode(40), nullable=False),
    Column('first_name', Unicode(255), default=''),
    Column('last_name', Unicode(255), default=''),
    Column('created', DateTime, default=datetime.now))
metadata.create_all(engine)  # create the tables in the database
class User(object): pass  # the class we are going to map
mapper(User, user_table)  # map the table onto the class
Session = sessionmaker()  # build a customised Session class
Session.configure(bind=engine)  # bind the Session to our database connection
session = Session()
u = User()
u.user_name = 'dongwm'
u.email_address = 'dongwm@dongwm.com'
u.password = 'testpass'  # set the attributes that the table definition marks as NOT NULL
session.add(u)  # add the object to the session

session.flush()   # write the pending data to the database
session.commit()  # commit the transaction; the session expires its state automatically, no need to close it
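A small sketch of the difference between the two calls (my own example, assuming the session and User mapping from the block above; the user data is made up): flush() writes the pending SQL inside the current transaction, while commit() ends the transaction, so anything flushed but not yet committed can still be rolled back.

u2 = User()
u2.user_name = 'tmpuser'
u2.email_address = 'tmp@example.com'
u2.password = 'x'
session.add(u2)
session.flush()      # the INSERT is sent, but the transaction is still open
session.rollback()   # discards the flushed-but-uncommitted row
print session.query(User).filter_by(user_name='tmpuser').count()   # 0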

query = session.query(User)  # query() is roughly the ORM-aware counterpart of select(); it accepts any combination of class/column expressions
print list(query)  # list all users
print query.get(1)  # look one up by primary key
print query.filter_by(user_name='dongwm').first()  # like SQL's WHERE; print the first match
u = query.filter_by(user_name='dongwm').first()
u.password = 'newpass'  # change the password attribute
session.commit()  # commit the transaction
print query.get(1).password  # prints the new password

for instance in session.query(User).order_by(User.id):  # order by the id column and print user name and e-mail address
    print instance.user_name, instance.email_address

Since this is an ORM framework, let's try a more complex example involving relationships. First the SQL:

CREATE TABLE tf_user (
    id INTEGER NOT NULL,
    user_name VARCHAR(16) NOT NULL,
    email_address VARCHAR(255) NOT NULL,
    password VARCHAR(40) NOT NULL,
    first_name VARCHAR(255),
    last_name VARCHAR(255),
    created TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE (user_name),
    UNIQUE (email_address));
CREATE TABLE tf_group (
    id INTEGER NOT NULL,
    group_name VARCHAR(16) NOT NULL,
    PRIMARY KEY (id),
    UNIQUE (group_name));
CREATE TABLE tf_permission (
    id INTEGER NOT NULL,
    permission_name VARCHAR(16) NOT NULL,
    PRIMARY KEY (id),
    UNIQUE (permission_name));
CREATE TABLE user_group (
    user_id INTEGER,
    group_id INTEGER,
    PRIMARY KEY (user_id, group_id),
    FOREIGN KEY (user_id) REFERENCES tf_user (id),   -- user_group.user_id references tf_user.id
    FOREIGN KEY (group_id) REFERENCES tf_group (id)); -- group_id references tf_group.id

CREATE TABLE group_permission (
    group_id INTEGER,
    permission_id INTEGER,
    PRIMARY KEY (group_id, permission_id),
    FOREIGN KEY (group_id) REFERENCES tf_group (id),             -- group_permission.group_id references tf_group.id
    FOREIGN KEY (permission_id) REFERENCES tf_permission (id));  -- permission_id references tf_permission.id

This is a fairly involved many-to-many schema. To check, say, whether a user has the admin permission, the SQL has to look like this:

SELECT COUNT(*)
FROM tf_user, user_group, group_permission, tf_permission
WHERE tf_user.user_name = 'dongwm'
  AND tf_user.id = user_group.user_id
  AND user_group.group_id = group_permission.group_id
  AND group_permission.permission_id = tf_permission.id
  AND permission_name = 'admin';

which is far too long and complicated to write by hand every time.

In the object-oriented world, it looks like this instead:

class User(object):
    groups = []

class Group(object):
    users = []
    permissions = []

class Permission(object):
    groups = []

print 'Summary for %s' % user.user_name
for g in user.groups:
    print ' Member of group %s' % g.group_name
    for p in g.permissions:
        print '... which has permission %s' % p.permission_name


def user_has_permission(user, permission_name):  # check whether the user has the permission called permission_name
    for g in user.groups:
        for p in g.permissions:  # note the nested for loops
            if p.permission_name == permission_name:
                return True
    return False

In SQLAlchemy, you set it up like this:

mapper(User, user_table, properties=dict(
    groups=relation(Group, secondary=user_group, backref='users')))
# properties is a dict. We add a groups key whose value is a relation object mapping Group
# through the user_group association table, so through the groups attribute on User we can
# reach the related Group objects. The association table (user_group) is passed as secondary,
# and backref adds a users attribute on the Group side.
mapper(Group, group_table, properties=dict(
    permissions=relation(Permission, secondary=group_permission, backref='groups')))
mapper(Permission, permission_table)

q = session.query(Permission)
dongwm_is_admin = q.count_by(permission_name='admin', user_name='dongwm')

Suppose we want to count the users holding each permission, without counting a user twice when it appears in more than one of the permission's groups:

for p in permissions:
    users = set()
    for g in p.groups:
        for u in g.users:
            users.add(u)
    print 'Permission %s has %d users' % (p.permission_name, len(users))

In SQLAlchemy it can be written like this:

q = select([Permission.c.permission_name,
            func.count(user_group.c.user_id)],
           and_(Permission.c.id == group_permission.c.permission_id,
                Group.c.id == group_permission.c.group_id,
                Group.c.id == user_group.c.group_id),
           group_by=[Permission.c.permission_name],
           distinct=True)
rs = q.execute()
for permission_name, num_users in q.execute():
    print 'Permission %s has %d users' % (permission_name, num_users)
# still longish, but it runs in a single database query instead of many:
# simple things stay simple, and complex things become possible to solve simply

Now a more complete example:

class Group(object):  # these classes model the database entities

    def __init__(self, group_name=None, users=None, permissions=None):
        if users is None:
            users = []
        if permissions is None:
            permissions = []
        self.group_name = group_name
        self._users = users
        self._permissions = permissions

    def add_user(self, user):
        self._users.append(user)

    def del_user(self, user):
        self._users.remove(user)

    def add_permission(self, permission):
        self._permissions.append(permission)

    def del_permission(self, permission):
        self._permissions.remove(permission)

class Permission(object):

    def __init__(self, permission_name=None, groups=None):
        if groups is None:
            groups = []
        self.permission_name = permission_name
        self._groups = groups

    def join_group(self, group):
        self._groups.append(group)

    def leave_group(self, group):
        self._groups.remove(group)

With SQLAlchemy the mapping looks like this:

user_table = Table(
    'tf_user', metadata,
    Column('id', Integer, primary_key=True),
    Column('user_name', Unicode(16), unique=True, nullable=False),
    Column('password', Unicode(40), nullable=False))

group_table = Table(
    'tf_group', metadata,
    Column('id', Integer, primary_key=True),
    Column('group_name', Unicode(16), unique=True, nullable=False))

permission_table = Table(
    'tf_permission', metadata,
    Column('id', Integer, primary_key=True),
    Column('permission_name', Unicode(16), unique=True, nullable=False))

user_group = Table(
    'user_group', metadata,
    Column('user_id', None, ForeignKey('tf_user.id'), primary_key=True),
    Column('group_id', None, ForeignKey('tf_group.id'), primary_key=True))

group_permission = Table(
    'group_permission', metadata,
    Column('group_id', None, ForeignKey('tf_group.id'), primary_key=True),
    Column('permission_id', None, ForeignKey('tf_permission.id'), primary_key=True))

mapper(User, user_table, properties=dict(
    _groups=relation(Group, secondary=user_group, backref='_users')))
mapper(Group, group_table, properties=dict(
    _permissions=relation(Permission, secondary=group_permission, backref='_groups')))
mapper(Permission, permission_table)

No changes to the classes were needed: methods such as join_group and leave_group keep working, and SQLAlchemy tracks the changes and flushes them to the database automatically.
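As a minimal sketch of that change tracking (my own example, assuming the mapping above, that the tables have been created, and that a session is open; the group and permission names are made up):

g = Group(group_name=u'editors')
p = Permission(permission_name=u'publish')
g.add_permission(p)          # a plain Python method call; no SQL is emitted yet
session.add(g)
session.commit()             # SQLAlchemy flushes the new rows plus the group_permission link row

p.permission_name = u'can_publish'   # attribute changes are tracked as well
session.commit()                     # the UPDATE is emitted automatically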

That was a complete example. Connecting to the database can also be done like this:

engine = create_engine('sqlite://')
connection = engine.connect()  # use connect() explicitly
result = connection.execute("select user_name from tf_user")
for row in result:
    print 'user name: %s' % row['user_name']
result.close()

engine = create_engine('sqlite://', strategy='threadlocal')  # strategy='threadlocal' reuses one connection per thread, reducing the number of connections to the database
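A sketch of what the threadlocal strategy buys you (hedged: this is my own reading of the 0.7-era TLEngine usage, not something from the book): connectionless execute() calls from the same thread reuse one connection, and a transaction can be demarcated on the engine itself.

engine = create_engine('sqlite://', strategy='threadlocal')
engine.execute("CREATE TABLE t (x INTEGER)")
engine.begin()                                   # start a thread-local transaction
engine.execute("INSERT INTO t (x) VALUES (1)")
engine.rollback()                                # throw it away again
print engine.execute("SELECT COUNT(*) FROM t").scalar()   # 0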

from sqlalchemy.databases.mysql import MSEnum, MSBigInteger  # sqlalchemy.databases holds the "dialects" for specific database backends; these types only work on that platform
user_table = Table('tf_user', meta,
    Column('id', MSBigInteger),
    Column('honorific', MSEnum('Mr', 'Mrs', 'Ms', 'Miss', 'Dr', 'Prof')))

A few ways of using MetaData:

unbound_meta = MetaData()  # this MetaData is not bound to anything
db1 = create_engine('sqlite://')
unbound_meta.bind = db1  # bind it to an engine afterwards

db2 = create_engine('sqlite:///test1.db')  # create the engine explicitly
bound_meta1 = MetaData(db2)  # and bind the MetaData to it

# Create a bound MetaData with an implicitly created engine
bound_meta2 = MetaData('sqlite:///test2.db')  # the engine is created implicitly from the URL
meta = MetaData('sqlite://')  # a bound MetaData can talk to the database directly

user_table = Table(
    'tf_user', meta,
    Column('id', Integer, primary_key=True),
    Column('user_name', Unicode(16), unique=True, nullable=False),
    Column('password', Unicode(40), nullable=False))

group_table = Table(
    'tf_group', meta,
    Column('id', Integer, primary_key=True),
    Column('group_name', Unicode(16), unique=True, nullable=False))

meta.create_all()  # create all the tables defined above (both of them); no arguments needed because the MetaData is bound

result_set = group_table.select().execute()  # fetch all rows from group_table

Next, an example that ties one MetaData to more than one engine:

meta = MetaData()  # this time the MetaData cannot be bound directly
engine1 = create_engine('sqlite:///test1.db')  # two engines
engine2 = create_engine('sqlite:///test2.db')

# Use the autoload_with parameter to load the table definitions from the first engine
user_table = Table('tf_user', meta, autoload=True, autoload_with=engine1)
group_table = Table('tf_group', meta, autoload=True, autoload_with=engine1)
permission_table = Table('tf_permission', meta, autoload=True, autoload_with=engine1)
user_group_table = Table('user_group', meta, autoload=True, autoload_with=engine1)
group_permission_table = Table('group_permission', meta, autoload=True, autoload_with=engine1)

meta.create_all(engine2)  # create the same tables on the second engine

import sqlalchemy.types
import PIL.Image
from StringIO import StringIO

class ImageType(sqlalchemy.types.Binary):  # a custom column type for our tables: this one stores PIL images as JPEG bytes
    def convert_bind_param(self, value, engine):
        sfp = StringIO()
        value.save(sfp, 'JPEG')
        return sfp.getvalue()
    def convert_result_value(self, value, engine):
        sfp = StringIO(value)
        image = PIL.Image.open(sfp)
        return image
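The convert_bind_param/convert_result_value hooks above are the older type API that the book uses; in current SQLAlchemy the same idea is spelled as a TypeDecorator with process_bind_param/process_result_value. A hedged sketch of that equivalent (my own code, still assuming PIL is available):

import PIL.Image
from StringIO import StringIO
from sqlalchemy import types

class ImageJPEG(types.TypeDecorator):
    """Accepts PIL images and stores them as JPEG bytes."""
    impl = types.LargeBinary

    def process_bind_param(self, value, dialect):   # Python value -> database value
        if value is None:
            return None
        sfp = StringIO()
        value.save(sfp, 'JPEG')
        return sfp.getvalue()

    def process_result_value(self, value, dialect):  # database value -> Python value
        if value is None:
            return None
        return PIL.Image.open(StringIO(str(value)))  # str() in case the DBAPI hands back a buffer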

Once the metadata is defined, a special table.c object is generated automatically:

q = user_table.select(  # find users created before June 1st 2007 whose name starts with 'r'
    user_table.c.user_name.like('r%')  # the c attribute is that special object; you use it when building SQL expressions
    & (user_table.c.created < datetime(2007, 6, 1)))

or, equivalently:

q = user_table.select(and_(
    user_table.c.user_name.like('r%'),
    user_table.c.created < datetime(2007, 6, 1)))

The ORM mapping can be used as well:

q = session.query(User)
q = q.filter(User.c.user_name.like('r%')
             & (User.c.created > datetime(2007, 6, 1)))

Here is another ORM example:

user_table = Table(  # the table definition
    'tf_user', metadata,
    Column('id', Integer, primary_key=True),
    Column('user_name', Unicode(16), unique=True, nullable=False),
    Column('email_address', Unicode(255), unique=True, nullable=False),
    Column('password', Unicode(40), nullable=False),
    Column('first_name', Unicode(255), default=''),
    Column('last_name', Unicode(255), default=''),
    Column('created', DateTime, default=datetime.now))

group_table = Table(
    'tf_group', metadata,
    Column('id', Integer, primary_key=True),
    Column('group_name', Unicode(16), unique=True, nullable=False))

user_group = Table(
    'user_group', metadata,
    Column('user_id', None, ForeignKey('tf_user.id'), primary_key=True),
    Column('group_id', None, ForeignKey('tf_group.id'), primary_key=True))

import sha

class User(object):  # the mapped class

    def _get_password(self):
        return self._password
    def _set_password(self, value):
        self._password = sha.new(value).hexdigest()  # store only the hash of the user's password
    password = property(_get_password, _set_password)

    def password_matches(self, password):
        return sha.new(password).hexdigest() == self._password
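A quick usage sketch (my own example, using just the class above; sha is the Python 2 module the book uses, hashlib.sha1 would be the modern spelling):

u = User()
u.password = 'secret'                 # _set_password stores only the SHA-1 hex digest
print u.password                      # the 40-character digest, not 'secret'
print u.password_matches('secret')    # True
print u.password_matches('guess')     # False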

mapper(User, user_table, properties=dict(  # the mapping creates the id, user_name, email_address, password, first_name, last_name and created attributes
    _password=user_table.c.password))  # map the column onto _password, so the database only ever sees the hashed value; the substitution happens at the ORM level

mapper(User, user_table, properties=dict(
    _password=user_table.c.password,
    groups=relation(Group, secondary=user_group, backref='users')))  # now every user can reach all of its groups through a single attribute; user_group is the association table linking User and Group,
# and the backref gives Group a users attribute pointing back. In action:

group1.users.append(user1)   # add user1 to group1; the other side is updated automatically
user2.groups.append(group2)  # add user2 to group2; again both sides stay in sync

1  The simplicity of the ORM model simplifies querying: with the ORM query tools you can get at the data you want without having to understand the underlying database structure.

Here is the table definition:

region_table = Table(
    'region', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', Unicode(255)))

and the corresponding class:

class Region(object):

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return '<Region %s>' % self.name

Let's look at it in the interactive interpreter:

>>> dir(Region)
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__']
>>> mapper(Region, region_table)  # the ORM mapping
<Mapper at 0x84bdb2c; Region>
>>> dir(Region)
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_sa_class_manager', 'id', 'name']  # quite a few attributes have been added
>>> Region.id
<sqlalchemy.orm.attributes.InstrumentedAttribute object at 0x84c238c>
>>> Region.name
<sqlalchemy.orm.attributes.InstrumentedAttribute object at 0x84c254c>

>>> r0 = Region(name="Northeast")
>>> r1 = Region(name="Southwest")
>>> r0
<Region Northeast>  # the instances print like this because the class defines __repr__
>>> r1
<Region Southwest>
>>> from sqlalchemy.orm import clear_mappers
>>> clear_mappers()  # undo the mapping
>>> Region.name  # the attribute is gone again
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: type object 'Region' has no attribute 'name'
>>> dir(Region)  # back to the plain class attributes
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__']

>>> r0 = Region(name="Northeast")  # from here on, watch what the ORM actually does
>>> r1 = Region(name="Southwest")  # two instances of the class

>>> metadata.create_all(engine)  # create the table

>>> Session = sessionmaker()  # build a Session class with sessionmaker
>>> Session.configure(bind=engine)  # bind it to the database connection
>>> session = Session()  # the session instance lets objects be loaded from and saved to the database; you only touch the classes, never the database directly
>>> session.bind.echo = True  # log the generated SQL

>>> session.add(r1)  # add the two instances r0 and r1 to the session
>>> session.add(r0)
>>> print r0.id  # nothing has been saved yet, so there is no id
None
>>> session.flush()  # send the pending data to the database
2012-07-18 10:24:07,116 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2012-07-18 10:24:07,116 INFO sqlalchemy.engine.base.Engine INSERT INTO region (name) VALUES (?)
2012-07-18 10:24:07,116 INFO sqlalchemy.engine.base.Engine ('Southwest',)
2012-07-18 10:24:07,117 INFO sqlalchemy.engine.base.Engine INSERT INTO region (name) VALUES (?)
2012-07-18 10:24:07,117 INFO sqlalchemy.engine.base.Engine ('Northeast',)
>>> print r0.id  # the autoincrementing primary key has now been assigned
2
>>> r0.name = 'Northwest'
>>> session.flush()  # flush the modification
2012-07-18 10:24:50,644 INFO sqlalchemy.engine.base.Engine UPDATE region SET name=? WHERE region.id = ?
2012-07-18 10:24:50,644 INFO sqlalchemy.engine.base.Engine ('Northwest', 2)
>>> print r0.name  # the row in the database has been updated to the new value
Northwest
>>> dir(Region)
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__']
>>> mapper(Region, region_table, include_properties=['id'])  # include_properties maps only the listed columns; there is also exclude_properties
<Mapper at 0x84c26cc; Region>
>>> dir(Region)
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_sa_class_manager', 'id']  # only "id" was added

>>> clear_mappers()
>>> dir(Region)
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__']
>>> mapper(Region, region_table, column_prefix='_')  # give the mapped attribute names a custom prefix
<Mapper at 0x84f73ac; Region>
>>> dir(Region)
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_id', '_name', '_sa_class_manager']  # id and name now carry a leading "_"

>>> clear_mappers()
>>> dir(Region)
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__']
>>> mapper(Region, region_table, properties=dict(
...     region_name=region_table.c.name,  # expose the name column as region_name; region_table.c.name is the name column of the Table instance created above
...     region_id=region_table.c.id))
<Mapper at 0x8509d2c; Region>
>>> dir(Region)
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_sa_class_manager', 'region_id', 'region_name']  # id is now exposed as region_id, and name as region_name

>>> class Region(object):  # redefine the class
...     def __init__(self, name):
...         self.name = name
...     def __repr__(self):
...         return '<Region %s>' % self.name
...     def _get_name(self):  # _get_name and _set_name are used by the built-in property below
...         return self._name
...     def _set_name(self, value):
...         assert value.endswith('Region'), \
...             'Region names must end in "Region"'
...         self._name = value
...     name = property(_get_name, _set_name)  # with this property, reading the attribute calls _get_name (the first argument) and assigning to it calls _set_name (the second argument); a deleter (not set here) would be the third
...
>>> from sqlalchemy.orm import synonym
>>> mapper(Region, region_table, column_prefix='_', properties=dict(
...     name=synonym('_name')))  # the property validates the value before it lands in _name
<Mapper at 0x84f7acc; Region>
>>> s0 = Region('Southeast')  # does not end with the required suffix
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 4, in __init__
  File "/usr/lib/python2.7/site-packages/SQLAlchemy-0.7.8-py2.7-linux-i686.egg/sqlalchemy/orm/state.py", line 98, in initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "<stdin>", line 3, in __init__
  File "<string>", line 1, in __set__
  File "<stdin>", line 10, in _set_name
AssertionError: Region names must end in "Region"
>>> s0 = Region('Southeast Region')  # fine

>>> segment_table = Table(
...     'segment', metadata,
...     Column('id', Integer, primary_key=True),
...     Column('lat0', Float),
...     Column('long0', Float),
...     Column('lat1', Float),
...     Column('long1', Float))

>>> metadata.create_all(engine)  # create the table
>>> class RouteSegment(object):  # a class with a begin and an end
...     def __init__(self, begin, end):
...         self.begin = begin
...         self.end = end
...     def __repr__(self):
...         return '<Route %s to %s>' % (self.begin, self.end)
...
>>> class MapPoint(object):
...     def __init__(self, lat, long):
...         self.coords = lat, long
...     def __composite_values__(self):  # return the composite column values as a tuple or list
...         return self.coords
...     def __eq__(self, other):
...         return self.coords == other.coords
...     def __ne__(self, other):
...         return self.coords != other.coords
...     def __repr__(self):
...         return '(%s lat, %s long)' % self.coords
...

>>> from sqlalchemy.orm import composite
>>> mapper(RouteSegment, segment_table, properties=dict(
...     begin=composite(MapPoint,  # build one attribute out of several columns
...         segment_table.c.lat0,
...         segment_table.c.long0),
...     end=composite(MapPoint,
...         segment_table.c.lat1, segment_table.c.long1)))
<Mapper at 0x86203cc; RouteSegment>
>>> work = MapPoint(33.775562, -84.29478)
>>> library = MapPoint(34.004313, -84.452062)
>>> park = MapPoint(33.776868, -84.389785)
>>> routes = [
...     RouteSegment(work, library),
...     RouteSegment(work, park),
...     RouteSegment(library, work),
...     RouteSegment(library, park),
...     RouteSegment(park, library),
...     RouteSegment(park, work)]

>>> for rs in routes:
...     session.add(rs)
...
>>> session.flush()
>>> q = session.query(RouteSegment)
>>> print RouteSegment.begin==work
segment.lat0 = :lat0_1 AND segment.long0 = :long0_1
>>> q = q.filter(RouteSegment.begin==work)
>>> for rs in q:
...     print rs
...
2012-07-18 11:12:29,360 INFO sqlalchemy.engine.base.Engine SELECT segment.id AS segment_id, segment.lat0 AS segment_lat0, segment.long0 AS segment_long0, segment.lat1 AS segment_lat1, segment.long1 AS segment_long1 FROM segment WHERE segment.lat0 = ? AND segment.long0 = ?
2012-07-18 11:12:29,360 INFO sqlalchemy.engine.base.Engine (33.775562, -84.29478)
<Route (33.775562 lat, -84.29478 long) to (34.004313 lat, -84.452062 long)>
<Route (33.775562 lat, -84.29478 long) to (33.776868 lat, -84.389785 long)>

>>> from sqlalchemy.orm import PropComparator
>>> class MapPointComparator(PropComparator):  # custom comparison operators, inheriting from PropComparator
...     def __lt__(self, other):  # define what "less than" means for the composite
...         return and_(*[a<b for a, b in
...             zip(self.prop.columns,
...                 other.__composite_values__())])
...
>>> mapper(RouteSegment, segment_table, properties=dict(
...     begin=composite(MapPoint,
...                     segment_table.c.lat0, segment_table.c.long0,
...                     comparator=MapPointComparator),  # use the custom comparator class
...     end=composite(MapPoint,
...                   segment_table.c.lat1, segment_table.c.long1,
...                   comparator=MapPointComparator)))
<Mapper at 0x85b2bac; RouteSegment>
>>> product_table = Table(
...     'product', metadata,
...     Column('sku', String(20), primary_key=True),
...     Column('msrp', Numeric),
...     Column('image', BLOB))
>>> from sqlalchemy.orm import deferred
>>> mapper(Product, product_table, properties=dict(
...     image=deferred(product_table.c.image)))  # deferred means lazy loading of the column: when a row is loaded in the usual way, a deferred column is not actually fetched from the database; its value is only read when you access it, which saves memory and time. Here the BLOB is only loaded when image is touched.
<Mapper at 0x862a40c; Product>
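What deferral means in practice, as a hedged sketch (my own example, assuming the Product mapping just above and some rows already in the table): loading a Product does not pull the BLOB in; touching .image triggers a second SELECT, and the undefer() option lets a particular query load it eagerly anyway.

>>> from sqlalchemy.orm import undefer
>>> p = session.query(Product).first()        # SELECT only sku and msrp
>>> p.image                                   # a second SELECT now fetches the deferred BLOB
>>> p = session.query(Product).options(undefer('image')).first()   # or load it up front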

>>> metadata.remove(product_table)  # the table was already defined above, so remove it first
>>> product_table = Table(
...     'product', metadata,
...     Column('sku', String(20), primary_key=True),
...     Column('msrp', Numeric),
...     Column('image1', Binary),
...     Column('image2', Binary),
...     Column('image3', Binary))

>>> clear_mappers()  # the class was already mapped, so clear the mapping first
>>> mapper(Product, product_table, properties=dict(
...     image1=deferred(product_table.c.image1, group='images'),
...     image2=deferred(product_table.c.image2, group='images'),
...     image3=deferred(product_table.c.image3, group='images')))  # deferred columns can be grouped with the group parameter: as soon as one column of the group is loaded, the other columns in the same group are loaded along with it
<Mapper at 0x85b8c4c; Product>
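And the grouped version, again as a hedged sketch of my own under the same assumptions: touching any one image pulls in the whole 'images' group, and undefer_group() loads the group eagerly for a query.

>>> from sqlalchemy.orm import undefer_group
>>> p = session.query(Product).first()        # none of the image columns are loaded yet
>>> p.image1                                  # one SELECT brings in image1, image2 and image3 together
>>> p = session.query(Product).options(undefer_group('images')).first()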

>>> q = product_table.join(  # what gets mapped here is the result of joining product_summary_table to product_table
...     product_summary_table,
...     product_table.c.sku==product_summary_table.c.sku).alias('full_product')
>>> class FullProduct(object): pass
...
>>> mapper(FullProduct, q)
<Mapper at 0x86709cc; FullProduct>

Some of the parameters accepted by the mapper() function:

always_refresh=False: when enabled, objects returned by a query are refreshed, overwriting the values already held in memory; the populate_existing() query option takes precedence over this flag.

allow_column_override=False: allows a relation property to be defined with the same name as a mapped column (the relation then overrides the column); otherwise the name clash raises an exception.

2 ORM relations

1 1:N relations (one-to-many)

>>> mapper(Store, store_table)
<Mapper at 0x84fba4c; Store>
>>> from sqlalchemy.orm import relation
>>> mapper(Region, region_table, properties=dict(
...     stores=relation(Store)))  # link the two tables: Region gets a stores attribute, through which the related Store rows can be reached and modified
<Mapper at 0x84f76ac; Region>

>>> r0 = Region('test')

>>> session.add(r0)  # first create a row
>>> session.commit()
2012-07-18 13:56:26,858 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2012-07-18 13:56:26,859 INFO sqlalchemy.engine.base.Engine INSERT INTO region (name) VALUES (?)
2012-07-18 13:56:26,859 INFO sqlalchemy.engine.base.Engine ('test',)
2012-07-18 13:56:26,859 INFO sqlalchemy.engine.base.Engine COMMIT
>>> rgn = session.query(Region).get(1)  # load that row
2012-07-18 13:56:37,250 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2012-07-18 13:56:37,251 INFO sqlalchemy.engine.base.Engine SELECT region.id AS region_id, region.name AS region_name FROM region WHERE region.id = ?
2012-07-18 13:56:37,251 INFO sqlalchemy.engine.base.Engine (1,)
>>> s0 = Store(name='3rd and Juniper')  # create a Store instance
>>> rgn.stores.append(s0)  # attach the new Store to the Region; its region_id column is taken from the region's id
2012-07-18 13:56:51,611 INFO sqlalchemy.engine.base.Engine SELECT store.id AS store_id, store.region_id AS store_region_id, store.name AS store_name FROM store WHERE ? = store.region_id
2012-07-18 13:56:51,611 INFO sqlalchemy.engine.base.Engine (1,)
>>> session.flush()  # save to the database
2012-07-18 13:57:02,131 INFO sqlalchemy.engine.base.Engine INSERT INTO store (region_id, name) VALUES (?, ?)
2012-07-18 13:57:02,131 INFO sqlalchemy.engine.base.Engine (1, '3rd and Juniper')

Note: if there is more than one foreign-key path between the two tables, the join condition has to be spelled out with primaryjoin:

mapper(Region, region_table, properties=dict(
    stores=relation(Store,
        primaryjoin=(store_table.c.region_id == region_table.c.id))))  # the relationship follows store.region_id -> region.id

2 M:N relations (many-to-many)

The table definitions were shown earlier; copied here again:

category_table = Table(
    'category', metadata,
    Column('id', Integer, primary_key=True),
    Column('level_id', None, ForeignKey('level.id')),
    Column('parent_id', None, ForeignKey('category.id')),
    Column('name', String(20)))

product_table = Table(
    'product', metadata,
    Column('sku', String(20), primary_key=True),
    Column('msrp', Numeric))

product_category_table = Table(
    'product_category', metadata,
    Column('product_id', None, ForeignKey('product.sku'), primary_key=True),
    Column('category_id', None, ForeignKey('category.id'), primary_key=True))

As you can see, product_table and category_table are in a many-to-many relationship through product_category_table.

>>> mapper(Category, category_table, properties=dict(
...     products=relation(Product,
...     secondary=product_category_table)))
<Mapper at 0x859c8cc; Category>
>>> mapper(Product, product_table, properties=dict(
...     categories=relation(Category,
...     secondary=product_category_table)))
<Mapper at 0x859c5cc; Product>

>>> r0 = Product('123', '234')

>>> session.add(r0)
>>> session.flush()
2012-07-18 14:18:06,599 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2012-07-18 14:18:06,618 INFO sqlalchemy.engine.base.Engine INSERT INTO product (sku, msrp) VALUES (?, ?)
2012-07-18 14:18:06,618 INFO sqlalchemy.engine.base.Engine ('123', 234.0)
>>> session.query(Product).get('123').categories

>>> clear_mappers()
>>> mapper(Category, category_table, properties=dict(
...     products=relation(Product, secondary=product_category_table,
...         primaryjoin=(product_category_table.c.category_id  # primaryjoin is the condition between the mapped table and the association table
...                      == category_table.c.id),
...         secondaryjoin=(product_category_table.c.product_id  # secondaryjoin is the condition between the association table and the table being joined to
...                        == product_table.c.sku))))
<Mapper at 0x84ff7cc; Category>
>>> mapper(Product, product_table, properties=dict(
...     categories=relation(Category, secondary=product_category_table,
...         primaryjoin=(product_category_table.c.product_id
...                      == product_table.c.sku),
...         secondaryjoin=(product_category_table.c.category_id
...                        == category_table.c.id))))
<Mapper at 0x859cb8c; Product>

3 1:1 relations (one-to-one): a special case of 1:N. Again using the tables from above:

product_table = Table(
    'product', metadata,
    Column('sku', String(20), primary_key=True),
    Column('msrp', Numeric))

product_summary_table = Table(
    'product_summary', metadata,
    Column('sku', None, ForeignKey('product.sku'), primary_key=True),  # only a single foreign key, pointing at product
    Column('name', Unicode(255)),
    Column('description', Unicode))

>>> mapper(Product, product_table, properties=dict(
...     summary=relation(ProductSummary)))
KeyboardInterrupt
>>> mapper(ProductSummary, product_summary_table)
<Mapper at 0x84fbe6c; ProductSummary>
>>> mapper(Product, product_table, properties=dict(
...     summary=relation(ProductSummary)))
<Mapper at 0x85bee6c; Product>
>>> prod = session.query(Product).get('123')
>>> prod.summary
[]  # a product_summary row only exists because of its product row, so getting a whole list back here is wasteful

>>> mapper(ProductSummary, product_summary_table)
<Mapper at 0x84f7dec; ProductSummary>
>>> mapper(Product, product_table, properties=dict(
...     summary=relation(ProductSummary, uselist=False)))  # with uselist=False that no longer happens
<Mapper at 0x860584c; Product>
>>> prod = session.query(Product).get('123')
>>> print prod.summary
None
>>> mapper(ProductSummary, product_summary_table)
<Mapper at 0x859ca0c; ProductSummary>
>>> mapper(Product, product_table, properties=dict(
...     summary=relation(ProductSummary, uselist=False,
...     backref='product')))  # add an attribute pointing back at our own table
<Mapper at 0x860e90c; Product>
>>> prod = session.query(Product).get('123')
>>> prod.summary = ProductSummary(name="Fruit", description="Some Fruit")
>>> print prod.summary
<ProductSummary Fruit>
>>> print prod.summary.product  # its product attribute is prod itself, i.e. the row we started from
<Product 123>
>>> print prod.summary.product is prod
True

>>> mapper(Level, level_table, properties=dict(
...     categories=relation(Category, backref='level')))
<Mapper at 0x860590c; Level>
>>> mapper(Category, category_table, properties=dict(
...     products=relation(Product,
...         secondary=product_category_table)))
<Mapper at 0x860ec8c; Category>
>>> mapper(Product, product_table, properties=dict(
...     categories=relation(Category,
...         secondary=product_category_table)))
<Mapper at 0x860e7ec; Product>
>>> lvl = Level(name='Department')
>>> cat = Category(name='Produce', level=lvl)
>>> session.add(lvl)
>>> session.flush()
2012-07-18 14:44:02,005 INFO sqlalchemy.engine.base.Engine INSERT INTO level (parent_id, name) VALUES (?, ?)
2012-07-18 14:44:02,005 INFO sqlalchemy.engine.base.Engine (None, 'Department')
2012-07-18 14:44:02,020 INFO sqlalchemy.engine.base.Engine INSERT INTO category (level_id, parent_id, name) VALUES (?, ?, ?)
2012-07-18 14:44:02,020 INFO sqlalchemy.engine.base.Engine (1, None, 'Produce')
>>> prod = session.query(Product).get('123')
>>> print prod.categories
[]
>>> print cat.products
2012-07-18 14:44:25,517 INFO sqlalchemy.engine.base.Engine SELECT product.sku AS product_sku, product.msrp AS product_msrp FROM product, product_category WHERE ? = product_category.category_id AND product.sku = product_category.product_id
2012-07-18 14:44:25,517 INFO sqlalchemy.engine.base.Engine (1,)
[]
>>> prod.categories.append(cat)
>>> print prod.categories
[<Category Department.Produce>]
>>> print cat.products  # a backref keeps both sides up to date; here the many-to-many was mapped with two separate relation() calls, so the two attributes are not kept in sync
[]

The fix:

>>> mapper(Level, level_table, properties=dict(
...     categories=relation(Category, backref='level')))
>>> mapper(Category, category_table, properties=dict(
...     products=relation(Product, secondary=product_category_table,
...         backref='categories')))  # declaring the backref on one side keeps Product.categories and Category.products synchronised
>>> mapper(Product, product_table)
>>> lvl = Level(name='Department')
>>> cat = Category(name='Produce', level=lvl)
>>> session.save(lvl)
>>> prod = session.query(Product).get('123')
>>> print prod.categories
[]
>>> print cat.products
[]
>>> prod.categories.append(cat)
>>> print prod.categories
[<Category Department.Produce>]
>>> print cat.products
[<Product 123>]

>>> from sqlalchemy.orm import backref
>>> clear_mappers()
>>> mapper(ProductSummary, product_summary_table, properties=dict(
...     product=relation(Product,
...         backref=backref('summary', uselist=False))))  # the backref() function can do the same thing, with options of its own
<Mapper at 0x860aaec; ProductSummary>
>>> mapper(Product, product_table)
<Mapper at 0x85bee6c; Product>

4 Self-referential mappings

level_table = Table(
    'level', metadata,
    Column('id', Integer, primary_key=True),
    Column('parent_id', None, ForeignKey('level.id')),  # the foreign key points back at this table's own id, so the mapping refers to its own class
    Column('name', String(20)))

>>> mapper(Level, level_table, properties=dict(
...     children=relation(Level)))  # a parent/child relationship between levels; this gives us the "children" side
<Mapper at 0x860a66c; Level>
>>> mapper(Level, level_table, properties=dict(
...     children=relation(Level,
...     backref=backref('parent',
...     remote_side=[level_table.c.id]))))  # remote_side names the column on the "remote" (parent) side, i.e. id; the local side is the parent_id column
<Mapper at 0x860e42c; Level>
>>> l0 = Level('Gender')
>>> l1 = Level('Department', parent=l0)
>>> session.add(l0)
>>> session.flush()
2012-07-18 15:07:55,810 INFO sqlalchemy.engine.base.Engine INSERT INTO level (parent_id, name) VALUES (?, ?)
2012-07-18 15:07:55,810 INFO sqlalchemy.engine.base.Engine (None, 'Gender')  # l0 is inserted first, it has no parent
2012-07-18 15:07:55,810 INFO sqlalchemy.engine.base.Engine INSERT INTO level (parent_id, name) VALUES (?, ?)
2012-07-18 15:07:55,810 INFO sqlalchemy.engine.base.Engine (2, 'Department')

Note: the same mapping can also be written the other way round:

mapper(Level, level_table, properties=dict(
    parent=relation(Level,
        remote_side=[level_table.c.id],
        backref='children')))
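A brief usage sketch of the reversed mapping (my own example, assuming the Level class and level_table used above): parent is now the mapped relation and children comes from the backref, but both sides behave exactly as before.

l0 = Level('Gender')
l1 = Level('Department', parent=l0)
session.add(l0)
session.flush()
print l1.parent.name                     # Gender
print [c.name for c in l0.children]      # ['Department']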

Let's build a multi-engine example:

from sqlalchemy import create_engine
from sqlalchemy.orm import mapper, sessionmaker
from sqlalchemy import Numeric, Table, MetaData, Column, ForeignKey, Integer, String, Unicode
engine1 = create_engine('sqlite://')
engine2 = create_engine('sqlite://')
metadata = MetaData()
product_table = Table(
    'product', metadata,
    Column('sku', String(20), primary_key=True),
    Column('msrp', Numeric))
product_summary_table = Table(
    'product_summary', metadata,
    Column('sku', String(20), ForeignKey('product.sku'), primary_key=True),
    Column('name', Unicode(255)),
    Column('description', Unicode))
product_table.create(bind=engine1)
product_summary_table.create(bind=engine2)
stmt = product_table.insert()
engine1.execute(
    stmt,
    [dict(sku="123", msrp=12.34),
     dict(sku="456", msrp=22.12),
     dict(sku="789", msrp=41.44)])
stmt = product_summary_table.insert()
engine2.execute(
    stmt,
    [dict(sku="123", name="Shoes", description="Some Shoes"),
     dict(sku="456", name="Pants", description="Some Pants"),
     dict(sku="789", name="Shirts", description="Some Shirts")])


This creates the tables and inserts some sample data.

dongwm@localhost ~ $ python
Python 2.7.3 (default, Jul 11 2012, 10:10:17)
[GCC 4.5.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from sqlalchemy import create_engine
>>> from sqlalchemy.orm import mapper, sessionmaker
>>> from sqlalchemy import Numeric, Table, MetaData, Column, ForeignKey, Integer, String, Unicode
>>> engine1 = create_engine('sqlite://')
>>> engine2 = create_engine('sqlite://')  # create two engines
>>> metadata = MetaData()
>>> product_table = Table(
...     'product', metadata,
...     Column('sku', String(20), primary_key=True),
...     Column('msrp', Numeric))
>>> product_summary_table = Table(
...     'product_summary', metadata,
...     Column('sku', String(20), ForeignKey('product.sku'), primary_key=True),
...     Column('name', Unicode(255)),
...     Column('description', Unicode))
>>> product_table.create(bind=engine1)
>>> product_summary_table.create(bind=engine2)
>>> stmt = product_table.insert()
>>> engine1.execute(
...     stmt,
...     [dict(sku="123", msrp=12.34),
...      dict(sku="456", msrp=22.12),
...      dict(sku="789", msrp=41.44)])
<sqlalchemy.engine.base.ResultProxy object at 0x84ef9ec>
>>> stmt = product_summary_table.insert()
>>> engine2.execute(  # insert through engine2, so the product_summary data lives on that engine
...     stmt,
...     [dict(sku="123", name="Shoes", description="Some Shoes"),
...      dict(sku="456", name="Pants", description="Some Pants"),
...      dict(sku="789", name="Shirts", description="Some Shirts")])
/usr/lib/python2.7/site-packages/SQLAlchemy-0.7.8-py2.7-linux-i686.egg/sqlalchemy/engine/default.py:463: SAWarning: Unicode type received non-unicode bind param value.
  param.append(processors[key](compiled_params[key]))
<sqlalchemy.engine.base.ResultProxy object at 0x84e896c>
>>> class Product(object):
...     def __init__(self, sku, msrp, summary=None):
...         self.sku = sku
...         self.msrp = msrp
...         self.summary = summary
...     def __repr__(self):
...         return '<Product %s>' % self.sku
...
>>> class ProductSummary(object):
...     def __init__(self, name, description):
...         self.name = name
...         self.description = description
...     def __repr__(self):
...         return '<ProductSummary %s>' % self.name
...
>>> from sqlalchemy.orm import clear_mappers, backref, relation
>>> clear_mappers()
>>> mapper(ProductSummary, product_summary_table, properties=dict(
...     product=relation(Product,
...                      backref=backref('summary', uselist=False))))
<Mapper at 0x84efa4c; ProductSummary>
>>> mapper(Product, product_table)
<Mapper at 0x84efd0c; Product>
>>> Session = sessionmaker(binds={Product: engine1,  # both engines are bound here; each mapped class gets its own engine
...     ProductSummary: engine2})
>>> session = Session()
>>> engine1.echo = engine2.echo = True
>>> session.query(Product).all()  # query the product data
2012-07-18 19:00:59,514 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2012-07-18 19:00:59,514 INFO sqlalchemy.engine.base.Engine SELECT product.sku AS product_sku, product.msrp AS product_msrp FROM product
2012-07-18 19:00:59,514 INFO sqlalchemy.engine.base.Engine ()
/usr/lib/python2.7/site-packages/SQLAlchemy-0.7.8-py2.7-linux-i686.egg/sqlalchemy/types.py:215: SAWarning: Dialect sqlite+pysqlite does *not* support Decimal objects natively, and SQLAlchemy must convert from floating point - rounding errors and other issues may occur. Please consider storing Decimal numbers as strings or integers on this platform for lossless storage.
  d[coltype] = rp = d['impl'].result_processor(dialect, coltype)
[<Product 123>, <Product 456>, <Product 789>]
>>> session.query(ProductSummary).all()  # query ProductSummary
2012-07-18 19:01:07,510 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2012-07-18 19:01:07,510 INFO sqlalchemy.engine.base.Engine SELECT product_summary.sku AS product_summary_sku, product_summary.name AS product_summary_name, product_summary.description AS product_summary_description FROM product_summary
2012-07-18 19:01:07,510 INFO sqlalchemy.engine.base.Engine ()
[<ProductSummary Shoes>, <ProductSummary Pants>, <ProductSummary Shirts>]

>>> from sqlalchemy.orm.shard import ShardedSession  # ShardedSession partitions the session horizontally, splitting the data across engines as needed
>>> product_table = Table(
...     'product', metadata,
...     Column('sku', String(20), primary_key=True),
...     Column('msrp', Numeric))
>>> metadata.create_all(bind=engine1)
>>> metadata.create_all(bind=engine2)
>>> class Product(object):
...     def __init__(self, sku, msrp):
...         self.sku = sku
...         self.msrp = msrp
...     def __repr__(self):
...         return '<Product %s>' % self.sku
...
>>> clear_mappers()
>>> product_mapper = mapper(Product, product_table)
>>> def shard_chooser(mapper, instance, clause=None):  # return the shard id of the shard this mapper/instance belongs to
...     if mapper is not product_mapper:  # anything that is not our product mapper goes to the 'odd' shard
...         return 'odd'
...     if (instance.sku  # rows whose sku starts with an even digit go to 'even'
...         and instance.sku[0].isdigit()
...         and int(instance.sku[0]) % 2 == 0):
...         return 'even'
...     else:
...         return 'odd'  # everything else goes to 'odd'
...

>>> def id_chooser(query, ident):  # given a query and a primary key, return the list of shard ids the object might live in
...     if query.mapper is not product_mapper:
...         return ['odd']
...     if (ident \
...         and ident[0].isdigit()
...         and int(ident[0]) % 2 == 0):
...         return ['even']
...     return ['odd']
...
>>> def query_chooser(query):  # return the list of shard ids a query should run against
...     return ['even', 'odd']
...
>>> Session = sessionmaker(class_=ShardedSession)
>>> session = Session(
...     shard_chooser=shard_chooser,
...     id_chooser=id_chooser,
...     query_chooser=query_chooser,
...     shards=dict(even=engine1,
...                 odd=engine2))
>>> products = [ Product('%d%d%d' % (i,i,i), 0.0)
...     for i in range(10) ]
>>> for p in products:
...     session.add(p)
...
>>> session.flush()
>>> for row in engine1.execute(product_table.select()):
...     print row
...
2012-07-18 19:11:19,811 INFO sqlalchemy.engine.base.Engine SELECT product.sku, product.msrp FROM product
2012-07-18 19:11:19,811 INFO sqlalchemy.engine.base.Engine ()
(u'000', Decimal('0E-10'))  # the even-numbered rows went to engine1
(u'222', Decimal('0E-10'))
(u'444', Decimal('0E-10'))
(u'666', Decimal('0E-10'))
(u'888', Decimal('0E-10'))
>>> for row in engine2.execute(product_table.select()):
...     print row
...
2012-07-18 19:11:40,098 INFO sqlalchemy.engine.base.Engine SELECT product.sku, product.msrp FROM product
2012-07-18 19:11:40,099 INFO sqlalchemy.engine.base.Engine ()
(u'111', Decimal('0E-10'))  # the odd-numbered rows went to engine2
(u'333', Decimal('0E-10'))
(u'555', Decimal('0E-10'))
(u'777', Decimal('0E-10'))
(u'999', Decimal('0E-10'))
>>> session.query(Product).all()
2012-07-18 19:12:36,130 INFO sqlalchemy.engine.base.Engine SELECT product.sku AS product_sku, product.msrp AS product_msrp FROM product
2012-07-18 19:12:36,130 INFO sqlalchemy.engine.base.Engine ()
2012-07-18 19:12:36,131 INFO sqlalchemy.engine.base.Engine SELECT product.sku AS product_sku, product.msrp AS product_msrp FROM product
2012-07-18 19:12:36,131 INFO sqlalchemy.engine.base.Engine ()
[<Product 123>, <Product 456>, <Product 789>, <Product 000>, <Product 222>, <Product 444>, <Product 666>, <Product 888>, <Product 111>, <Product 333>, <Product 555>, <Product 777>, <Product 999>]

from sqlalchemy import create_engine
from sqlalchemy.orm import mapper, sessionmaker
from datetime import datetime
from sqlalchemy import Numeric, Table, MetaData, Column, ForeignKey, Integer, String, Unicode, DateTime
from sqlalchemy import types
from sqlalchemy.databases import sqlite
engine1 = create_engine('sqlite://')
engine2 = create_engine('sqlite://')
metadata = MetaData()
product_table = Table(
    'product', metadata,
    Column('sku', String(20), primary_key=True),
    Column('msrp', Numeric))
product_summary_table = Table(
    'product_summary', metadata,
    Column('sku', String(20), ForeignKey('product.sku'), primary_key=True),
    Column('name', Unicode(255)),
    Column('description', Unicode))
product_table.create(bind=engine1)
product_summary_table.create(bind=engine2)
stmt = product_table.insert()
engine1.execute(
    stmt,
    [dict(sku="123", msrp=12.34),
     dict(sku="456", msrp=22.12),
     dict(sku="789", msrp=41.44)])
stmt = product_summary_table.insert()
engine2.execute(
    stmt,
    [dict(sku="123", name="Shoes", description="Some Shoes"),
     dict(sku="456", name="Pants", description="Some Pants"),
     dict(sku="789", name="Shirts", description="Some Shirts")])