Follow me on GitHub

本文主要是讲关于sqlalchemy的扩展

扩展其实就是一些外部的插件,比如sqlsoup,associationproxy,declarative,horizontal_shard等等

1 declarative

假如想要数据映射,以前的做法是:

from sqlalchemy import create_engine
from sqlalchemy import Column, MetaData, Table
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy.orm import mapper, sessionmaker

# Classic (pre-declarative) mapping: a plain class, separately defined tables,
# and an explicit mapper() call that ties the two together.
class User(object):
    """Plain Python class; it carries no schema information of its own."""

    def __init__(self, name, fullname, password):
        self.name = name
        self.fullname = fullname
        self.password = password

    def __repr__(self):
        return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)


metadata = MetaData()

users_table = Table(
    'users', metadata,
    Column('user_id', Integer, primary_key=True),
    Column('name', String),
    Column('fullname', String),
    Column('password', String),
)

email_table = Table(
    'email', metadata,
    Column('email_id', Integer, primary_key=True),
    Column('email_address', String),
    Column('user_id', Integer, ForeignKey('users.user_id')),
)

# create_all() needs an engine to emit the DDL against, so create one first.
engine = create_engine("sqlite:///tutorial.db", echo=True)
metadata.create_all(engine)

mapper(User, users_table)  # map the User class onto users_table

但是我们可以改换风格,使用这样的方法:

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref, mapper, relation, sessionmaker

Base = declarative_base()

class User(Base):
    """Declarative model: the table definition lives inside the class itself."""

    __tablename__ = "users"  # name of the table this class is mapped to

    # The table structure is written directly as class attributes.
    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __init__(self, name, fullname, password):
        self.name = name
        self.fullname = fullname
        self.password = password

    def __repr__(self):
        return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)

class Address(Base):
    """Declarative model for an e-mail address that belongs to a User."""

    __tablename__ = "addresses"

    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))

    # Bidirectional relationship joined through users.id: Address -> User is
    # many-to-one, and the backref exposes User.addresses (one-to-many).
    user = relation(User, backref=backref('addresses', order_by=id))

    def __init__(self, email_address):
        self.email_address = email_address

    def __repr__(self):
        return "<Address('%s')>" % self.email_address


engine = create_engine("sqlite:///tutorial.db", echo=True)

users_table = User.__table__  # handle to the Table object behind User
metadata = Base.metadata  # handle to the MetaData shared by all Base subclasses
metadata.create_all(engine)

下面具体说:

engine = create_engine('sqlite://')  # 创建引擎
Base.metadata.create_all(engine)  # 创建表
Base.metadata.bind = create_engine('sqlite://')  # 绑定
Base = declarative_base(bind=create_engine('sqlite://'))  # 绑定引擎
mymetadata = MetaData()
Base = declarative_base(metadata=mymetadata)  # 设定元数据

设定简单关系:

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    addresses = relationship("Address", backref="user")  # relationship其实就是relation的全称

class Address(Base):
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email = Column(String(50))
    user_id = Column(Integer, ForeignKey('users.id'))

设定多对多关系:

keywords = Table(
    'keywords', Base.metadata,
    Column('author_id', Integer, ForeignKey('authors.id')),
    Column('keyword_id', Integer, ForeignKey('keywords.id'))
)

class Author(Base):
    __tablename__ = 'authors'

    id = Column(Integer, primary_key=True)
    keywords = relationship("Keyword", secondary=keywords)

定义SQL表达式:

class MyClass(Base):
    __tablename__ = 'sometable'
    __table_args__ = {'mysql_engine': 'InnoDB'}  # 名字,映射类,元数据之外的指定需要使用__table_args__

或者:

class MyClass(Base):
    __tablename__ = 'sometable'
    __table_args__ = (
        ForeignKeyConstraint(['id'], ['remote_table.id']),  # 元组方式
        UniqueConstraint('foo'),
    )

或者:

class MyClass(Base):
    __tablename__ = 'sometable'
    __table_args__ = (
        ForeignKeyConstraint(['id'], ['remote_table.id']),
        UniqueConstraint('foo'),
        {'autoload': True}  # 最后的参数可以用字典,想想 *args 和 **kwargs
    )

使用混合式:

class MyClass(Base):
    __table__ = Table('my_table', Base.metadata,  # 在__table__里指定表结构
        Column('id', Integer, primary_key=True),
        Column('name', String(50))
    )

2 sqlsoup(在sqlalchemy0.8版本后它变成了一个独立的项目,http://pypi.python.org/pypi/sqlsoup,

而我使用gentoo提供的0.7.8版本,以下的程序import部分可能不适用更高版本,而需要import sqlsoup)

sqlsoup提供一个方便的访问数据库的接口方式,而无需创建类、映射数据库

还是看例子的对比:

用以前的方式创建一个数据库并且插入一些数据:

>>> from sqlalchemy import * >>> engine = create_engine(‘sqlite:///dongwm.db’) >>> metadata = MetaData(engine) >>> product_table = Table( …     ‘product’, metadata, …     Column(‘sku’, String(20), primary_key=True), …     Column(‘msrp’, Numeric)) >>> store_table = Table( …     ‘store’, metadata, …     Column(‘id’, Integer, primary_key=True), …     Column(‘name’, Unicode(255))) >>> product_price_table = Table( …     ‘product_price’, metadata, … Column(‘sku2’, None, ForeignKey(‘product.sku’), primary_key=True), … Column(‘store_id’, None, ForeignKey(‘store.id’), primary_key=True), …     Column(‘price’, Numeric, default=0)) >>> metadata.create_all() >>> stmt = product_table.insert() >>> stmt.execute([dict(sku=”123”, msrp=12.34), …               dict(sku=”456”, msrp=22.12), …               dict(sku=”789”, msrp=41.44)]) <sqlalchemy.engine.base.ResultProxy object at 0x84fbdcc> >>> stmt = store_table.insert() >>> stmt.execute([dict(name=”Main Store”), …               dict(name=”Secondary Store”)]) <sqlalchemy.engine.base.ResultProxy object at 0x850068c> >>> stmt = product_price_table.insert() >>> stmt.execute([dict(store_id=1, sku=”123”), …               dict(store_id=1, sku2=”456”), …               dict(store_id=1, sku2=”789”), …               dict(store_id=2, sku2=”123”), …               dict(store_id=2, sku2=”456”), …               dict(store_id=2, sku2=”789”)]) <sqlalchemy.engine.base.ResultProxy object at 0x85008cc> 创建插入完毕,然后我们用sqlsoup连接操作:

>>> from sqlalchemy.ext.sqlsoup import SqlSoup >>> db = SqlSoup(‘sqlite:///dongwm.db’)  #连接一个存在的数据库 >>> print db.product.all() #打印结果 [MappedProduct(sku=u’123’,msrp=Decimal(‘12.3400000000’)), MappedProduct(sku=u’456’,msrp=Decimal(‘22.1200000000’)), MappedProduct(sku=u’789’,msrp=Decimal(‘41.4400000000’))] >>> print db.product.get(‘123’) #是不是比session.query(Product)简单呢? MappedProduct(sku=u’123’,msrp=Decimal(‘12.3400000000’))

注:假如想创建一个新的(内存)数据库: db = SqlSoup('sqlite:///:memory:')

>>> newprod = db.product.insert(sku=’111’, msrp=22.44) #没有使用数据映射的插入 >>> db.flush() >>> db.clear() #调用底层,清除所有session实例,它是session.expunge_all的别名 >>> db.product.all() [MappedProduct(sku=u’123’,msrp=Decimal(‘12.3400000000’)), MappedProduct(sku=u’456’,msrp=Decimal(‘22.1200000000’)), MappedProduct(sku=u’789’,msrp=Decimal(‘41.4400000000’)), MappedProduct(sku=u’111’,msrp=Decimal(‘22.4400000000’))] #新条目已经存在了 #MappedProduct使用__getattr__将无法识别的属性和访问方法转发到它的query属性,它还提供了一些数据处理功能用于更新

>>> from sqlalchemy import or_, and_, desc >>> where = or_(db.product.sku==’123’, db.product.sku==’111’) >>> db.product.filter(where).order_by(desc(db.product.msrp)).all() #这样使用多条件过滤,降序排练 [MappedProduct(sku=’111’,msrp=22.44), MappedProduct(sku=u’123’,msrp=Decimal(‘12.3400000000’))]

>>> join1 = db.join(db.product, db.product_price, isouter=True) #关联2个表, isouter=True确保LEFT OUTER(还没理解) >>> join1.all() [MappedJoin(sku=u’123’,msrp=Decimal(‘12.3400000000’),sku2=u’123’,store_id=1,price=Decimal(‘0E-10’)),  #这个字段包含了2个表的相应字段 MappedJoin(sku=u’123’,msrp=Decimal(‘12.3400000000’),sku2=u’123’,store_id=2,price=Decimal(‘0E-10’)), MappedJoin(sku=u’456’,msrp=Decimal(‘22.1200000000’),sku2=u’456’,store_id=1,price=Decimal(‘0E-10’)), MappedJoin(sku=u’456’,msrp=Decimal(‘22.1200000000’),sku2=u’456’,store_id=2,price=Decimal(‘0E-10’)), MappedJoin(sku=u’789’,msrp=Decimal(‘41.4400000000’),sku2=u’789’,store_id=1,price=Decimal(‘0E-10’)), MappedJoin(sku=u’789’,msrp=Decimal(‘41.4400000000’),sku2=u’789’,store_id=2,price=Decimal(‘0E-10’)), MappedJoin(sku=u’111’,msrp=Decimal(‘22.4400000000’),sku2=None,store_id=None,price=None)] >>> join2 = db.join(join1, db.store, isouter=True) #将store表也关联进来(因为也有一个外键),就是关联三个表 >>> join2.all() [MappedJoin(sku=u’123’,msrp=Decimal(‘12.3400000000’),sku2=u’123’,store_id=1,price=Decimal(‘0E-10’),id=1,name=u’Main Store’), MappedJoin(sku=u’123’,msrp=Decimal(‘12.3400000000’),sku2=u’123’,store_id=2,price=Decimal(‘0E-10’),id=2,name=u’Secondary Store’), MappedJoin(sku=u’456’,msrp=Decimal(‘22.1200000000’),sku2=u’456’,store_id=1,price=Decimal(‘0E-10’),id=1,name=u’Main Store’), MappedJoin(sku=u’456’,msrp=Decimal(‘22.1200000000’),sku2=u’456’,store_id=2,price=Decimal(‘0E-10’),id=2,name=u’Secondary Store’), MappedJoin(sku=u’789’,msrp=Decimal(‘41.4400000000’),sku2=u’789’,store_id=1,price=Decimal(‘0E-10’),id=1,name=u’Main Store’), MappedJoin(sku=u’789’,msrp=Decimal(‘41.4400000000’),sku2=u’789’,store_id=2,price=Decimal(‘0E-10’),id=2,name=u’Secondary Store’), MappedJoin(sku=u’111’,msrp=Decimal(‘22.4400000000’),sku2=None,store_id=None,price=None,id=None,name=None)] >>> join3 = db.with_labels(join1) #根据原籍标记,比如sku会说出:product_sku,告诉你它来着product表,但是指定了jion1,就不会标识关于store的表 >>> join3.first() 
MappedJoin(product_sku=u’123’,product_msrp=Decimal(‘12.3400000000’),product_price_sku2=u’123’,product_price_store_id=1,product_price_price=Decimal(‘0E-10’)) >>> db.with_labels(join2).first() MappedJoin(product_sku=u’123’,product_msrp=Decimal(‘12.3400000000’),product_price_sku2=u’123’,product_price_store_id=1,product_price_price=Decimal(‘0E-10’),store_id=1,store_name=u’Main Store’) >>> labelled_product = db.with_labels(db.product) >>> join4 = db.join(labelled_product, db.product_price,  isouter=True) >>> join4.first() MappedJoin(product_sku=u’123’,product_msrp=Decimal(‘12.3400000000’),sku2=u’123’,store_id=1,price=Decimal(‘0E-10’))

>>> db.clear() >>> join5 = db.join(db.product, db.product_price) >>> s = select([db.product._table, …     func.avg(join5.c.price).label(‘avg_price’)], #添加一个字段计算产品(product)的price平均值,字段名为avg_price …     from_obj=[join5._table], …     group_by=[join5.c.sku]) >>> s = s.alias(‘products_with_avg_price’) #它是from sqlalchemy import alias; a = alias(self, name=name)的简写 >>> products_with_avg_price = db.map(s, primary_key=[join5.c.sku]) #因为没有映射到表或者join,需要指定如何找到主键 >>> products_with_avg_price.all() [MappedJoin(sku=u’123’,msrp=Decimal(‘12.3400000000’),avg_price=0.0), MappedJoin(sku=u’456’,msrp=Decimal(‘22.1200000000’),avg_price=0.0), MappedJoin(sku=u’789’,msrp=Decimal(‘41.4400000000’),avg_price=0.0)] >>> db.product_price.first().price = 50.00 >>> db.flush() >>> products_with_avg_price.all() [MappedJoin(sku=u’123’,msrp=Decimal(‘12.3400000000’),avg_price=0.0), MappedJoin(sku=u’456’,msrp=Decimal(‘22.1200000000’),avg_price=0.0), MappedJoin(sku=u’789’,msrp=Decimal(‘41.4400000000’),avg_price=0.0)] >>> db.products_with_avg_price = products_with_avg_price #保存映射到db,方便重用 >>> msrp=select([db.product.c.msrp], …     db.product.sku==db.product_price.sku2) #获取sku和sku2相等时候msrp的值 >>> db.product_price.update(  #更新数据 …     values=dict(price=msrp),synchronize_session=False) #设置price这个字段值为上面对应的msrp

6 >>> db.product_price.all() [MappedProduct_price(sku2=u’123’,store_id=1,price=Decimal(‘12.3400000000’)), MappedProduct_price(sku2=u’456’,store_id=1,price=Decimal(‘22.1200000000’)), MappedProduct_price(sku2=u’789’,store_id=1,price=Decimal(‘41.4400000000’)), MappedProduct_price(sku2=u’123’,store_id=2,price=Decimal(‘12.3400000000’)), MappedProduct_price(sku2=u’456’,store_id=2,price=Decimal(‘22.1200000000’)), MappedProduct_price(sku2=u’789’,store_id=2,price=Decimal(‘41.4400000000’))]

3 associationproxy

associationproxy用于创建一个跨关系读/写目标属性的代理视图

看一个例子就懂了:

>>> user_table = Table( …     ‘user’, metadata, …     Column(‘id’, Integer, primary_key=True), …     Column(‘user_name’, String(255), unique=True), …     Column(‘password’, String(255))) >>> brand_table = Table( …     ‘brand’, metadata, …     Column(‘id’, Integer, primary_key=True), …     Column(‘name’, String(255))) >>> sales_rep_table = Table( …     ‘sales_rep’, metadata, … Column(‘brand_id’, None, ForeignKey(‘brand.id’), primary_key=True), … Column(‘user_id’, None, ForeignKey(‘user.id’), primary_key=True), …     Column(‘commission_pct’, Integer, default=0)) >>> class User(object): pass … >>> class Brand(object): pass … >>> class SalesRep(object): pass … >>> mapper(User, user_table, properties=dict( …     sales_rep=relation(SalesRep, backref=’user’, uselist=False))) <Mapper at 0x87472ec; User> >>> mapper(Brand, brand_table, properties=dict( …     sales_reps=relation(SalesRep, backref=’brand’))) <Mapper at 0x874770c; Brand> >>> mapper(SalesRep, sales_rep_table) <Mapper at 0x874768c; SalesRep>

ORM映射完成。但是假如我们想给Brand(品牌)类增加一个属性,用来列出该品牌所有SalesRep(销售代表)对应的User,可以这样:

class Brand(object):
    """Brand that exposes the users behind its sales representatives."""

    @property
    def users(self):
        """Return a new list holding the ``user`` of each item in ``self.sales_reps``.

        The list is rebuilt on every access, so appending to or removing from
        it does not change ``self.sales_reps``.
        """
        return [sr.user for sr in self.sales_reps]

但是不方便增加删除,而使用association_proxy:

>>> from sqlalchemy.ext.associationproxy import association_proxy >>> class Brand(object): …         users=association_proxy(‘sales_reps’, ‘user’) …

或者:

mapper(Brand, brand_table, properties=dict(
    sales_reps=relation(SalesRep, backref='brand')))
# Attaching the proxy after mapping keeps the domain class itself untouched.
Brand.users = association_proxy('sales_reps', 'user')

我们需要修改类,增加属性:

class User(object):
    """Domain object for a user; every constructor argument is optional."""

    def __init__(self, user_name=None, password=None):
        self.user_name = user_name
        self.password = password

class Brand(object):
    """Domain object for a brand; ``name`` is optional."""

    def __init__(self, name=None):
        self.name = name

class SalesRep(object):
    """Association object linking a user to a brand with a commission rate."""

    def __init__(self, user=None, brand=None, commission_pct=0):
        self.user = user
        self.brand = brand
        self.commission_pct = commission_pct

看下面的效果:

>>> b = Brand(‘Cool Clothing’) >>> session.add(b) >>> u = User(‘rick’, ‘foo’) >>> session.add(u) >>> session.flush() 2012-07-20 12:22:33,191 INFO sqlalchemy.engine.base.Engine INSERT INTO user (user_name, password) VALUES (?, ?) 2012-07-20 12:22:33,191 INFO sqlalchemy.engine.base.Engine (‘rick’, ‘foo’) 2012-07-20 12:22:33,191 INFO sqlalchemy.engine.base.Engine INSERT INTO brand (name) VALUES (?) 2012-07-20 12:22:33,191 INFO sqlalchemy.engine.base.Engine (‘Cool Clothing’,) >>> b.users 2012-07-20 12:22:42,135 INFO sqlalchemy.engine.base.Engine SELECT sales_rep.brand_id AS sales_rep_brand_id, sales_rep.user_id AS sales_rep_user_id, sales_rep.commission_pct AS sales_rep_commission_pct FROM sales_rep WHERE ? = sales_rep.brand_id 2012-07-20 12:22:42,135 INFO sqlalchemy.engine.base.Engine (2,) [] >>> b.users.append(u) #自动创建一个单一的位置参数调用其中介(SalesRep)对象 2012-07-20 12:22:46,782 INFO sqlalchemy.engine.base.Engine SELECT sales_rep.brand_id AS sales_rep_brand_id, sales_rep.user_id AS sales_rep_user_id, sales_rep.commission_pct AS sales_rep_commission_pct FROM sales_rep WHERE ? = sales_rep.user_id 2012-07-20 12:22:46,782 INFO sqlalchemy.engine.base.Engine (2,) >>> b.users [<__main__.User object at 0x87d7b6c>] >>> b.sales_reps [<__main__.SalesRep object at 0x87d7c4c>] >>> b.sales_reps[0].commission_pct 0 >>> session.flush() 2012-07-20 12:23:14,215 INFO sqlalchemy.engine.base.Engine INSERT INTO sales_rep (brand_id, user_id, commission_pct) VALUES (?, ?, ?) 2012-07-20 12:23:14,215 INFO sqlalchemy.engine.base.Engine (2, 2, 0)

更复杂一点的需求:通过代理添加销售人员时,默认给他10%的提成:

# creator builds the intermediary SalesRep whenever a user is added through
# the proxy; here every such SalesRep gets a 10% commission.
Brand.users = association_proxy(
    'sales_reps', 'user',
    creator=lambda u: SalesRep(user=u, commission_pct=10))

假设我们想要的品牌属性是一个附带User和佣金commission_pct的字典:

from sqlalchemy.orm.collections import attribute_mapped_collection >>> from sqlalchemy.orm.collections import attribute_mapped_collection >>> reps_by_user_class=attribute_mapped_collection(‘user’) >>> clear_mappers() >>> mapper(Brand, brand_table, properties=dict( …     sales_reps_by_user=relation( …         SalesRep, backref=’brand’, …         collection_class=reps_by_user_class))) <Mapper at 0x862c5ec; Brand>

>>> Brand.commissions=association_proxy( …     ‘sales_reps_by_user’, ‘commission_pct’, …     creator=lambda key,value: SalesRep(user=key, commission_pct=value)) >>> mapper(User, user_table, properties=dict( …     sales_rep=relation(SalesRep, backref=’user’, uselist=False))) <Mapper at 0x8764b2c; User> >>> mapper(SalesRep, sales_rep_table) <Mapper at 0x87bb4cc; SalesRep> >>> b = session.query(Brand).get(1) >>> u = session.query(User).get(1) >>> b.commissions[u] = 20 >>> session.bind.echo = False >>> session.flush() >>> b = session.query(Brand).get(1) >>> u = session.query(User).get(1) >>> u.user_name u’dongwm’ >>> print b.commissions[u] 20 >>> print b.sales_reps_by_user[u] #代理和原来的关系是自动同步的 <__main__.SalesRep object at 0x87e3dcc> >>> print b.sales_reps_by_user[u].commission_pct 20