
1 Custom column types:

from sqlalchemy import types

class MyCustomEnum(types.TypeDecorator):  # a custom type subclasses types.TypeDecorator

    impl = types.Integer  # the underlying type being decorated: Integer

    def __init__(self, enum_values, *l, **kw):
        types.TypeDecorator.__init__(self, *l, **kw)
        self._enum_values = enum_values

    def convert_bind_param(self, value, engine):  # required: converts the Python value into the value bound into the SQL
        result = self.impl.convert_bind_param(value, engine)
        if result not in self._enum_values:
            raise TypeError, (
                "Value %s must be one of %s" % (result, self._enum_values))
        return result

    def convert_result_value(self, value, engine):  # required: converts the value returned by the DB-API back into a Python value
        'Do nothing here'
        return self.impl.convert_result_value(value, engine)

An example:

from sqlalchemy import types
from sqlalchemy.databases import sqlite
class MyCustomEnum(types.TypeDecorator):
    impl = types.Integer
    def __init__(self, enum_values, *l, **kw):
        types.TypeDecorator.__init__(self, *l, **kw)
        self._enum_values = enum_values
    def bind_processor(self, dialect):  # when defined, this takes over from convert_bind_param() (result_processor() is the counterpart for convert_result_value())
        impl_processor = self.impl.bind_processor(dialect)
        if impl_processor:
            def processor(value):
                result = impl_processor(value)
                assert value in self._enum_values, \
                    "Value %s must be one of %s" % (result,
                    self._enum_values)
                return result
        else:
            def processor(value):
                assert value in self._enum_values, \
                    "Value %s must be one of %s" % (value,
                     self._enum_values)
                return value
        return processor
mce=MyCustomEnum([1,2,3])
processor = mce.bind_processor(sqlite.dialect())
print processor(1)  # prints 1
print processor(5)  # fails the assertion, because 5 is not one of 1, 2, 3

You can even define an entirely new type by subclassing TypeEngine directly:

class NewType(types.TypeEngine):  # TypeDecorator itself derives from types.TypeEngine

    def __init__(self, *args):
        self._args = args

    def get_col_spec(self):  # used when the table is created, to emit the column type
        return 'NEWTYPE(%s)' % ','.join(self._args)

    def convert_bind_param(self, value, engine):  # required
        return value

    def convert_result_value(self, value, engine):  # also required
        return value

2 SQL statements in an interactive session:

dongwm@localhost ~ $ python
Python 2.7.3 (default, Jul 11 2012, 10:10:17)
[GCC 4.5.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from sqlalchemy import Table, MetaData, Column, ForeignKey, Integer, String, Unicode, DateTime
>>> metadata = MetaData()
>>> simple_table = Table(  # a simple table definition
...     'simple', metadata,
...     Column('id', Integer, primary_key=True),
...     Column('col1', Unicode(20)))
>>>
>>> stmt = simple_table.insert()  # an INSERT statement object
>>> print stmt  # print the statement
INSERT INTO simple (id, col1) VALUES (:id, :col1)  # contains bind parameters that still have to be filled in
>>> compiled_stmt = stmt.compile()  # compile the statement
>>> print compiled_stmt.params  # the parameters as a dictionary
{'id': None, 'col1': None}
>>> from sqlalchemy import create_engine
>>> engine = create_engine('sqlite://')
>>> simple_table.create(bind=engine)  # create the table
>>> engine.execute(stmt, col1="Foo")  # supply values for the statement
/usr/lib/python2.7/site-packages/SQLAlchemy-0.7.8-py2.7-linux-i686.egg/sqlalchemy/engine/default.py:463: SAWarning: Unicode type received non-unicode bind param value.
  param.append(processors[key](compiled_params[key]))
<sqlalchemy.engine.base.ResultProxy object at 0x8376c8c>
>>> metadata.bind = engine  # with the metadata bound, the statement can execute itself; the effect is the same as above
>>> stmt.execute(col1="Bar")
<sqlalchemy.engine.base.ResultProxy object at 0x8376f4c>
>>> stmt = simple_table.insert(values=dict(col1="Initial value"))  # this insert already carries its values
>>> print stmt
INSERT INTO simple (col1) VALUES (?)
>>> compiled_stmt = stmt.compile()
>>> print compiled_stmt.params
{'col1': 'Initial value'}
>>> stmt = simple_table.insert()
>>> stmt.execute(col1="First value")
<sqlalchemy.engine.base.ResultProxy object at 0x838832c>
>>>
>>> stmt.execute(col1="Second value")
<sqlalchemy.engine.base.ResultProxy object at 0x838844c>
>>> stmt.execute(col1="Third value")  # inserting one row at a time is tedious
<sqlalchemy.engine.base.ResultProxy object at 0x838856c>
>>> stmt.execute([dict(col1="Fourth Value"),  # several rows can be inserted in one call
...     dict(col1="Fifth Value"),
...     dict(col1="Sixth Value")])
<sqlalchemy.engine.base.ResultProxy object at 0x83886ac>
>>> from sqlalchemy import text
>>> stmt = simple_table.update(
...     whereclause=text("col1='First value'"),
...     values=dict(col1='1st Value'))  # set col1 to '1st Value' on rows where col1 is 'First value'
>>> stmt.execute()
<sqlalchemy.engine.base.ResultProxy object at 0x838878c>
>>> stmt = simple_table.update(text("col1='Second value'"))  # match rows where col1 is 'Second value'
>>> stmt.execute(col1='2nd Value')  # the new value is supplied at execution time; the effect is the same as above
<sqlalchemy.engine.base.ResultProxy object at 0x8376f4c>
>>> stmt = simple_table.update(text("col1='Third value'"))
>>> print stmt
UPDATE simple SET id=?, col1=? WHERE col1='Third value'
>>> engine.echo = True  # turn on statement logging
>>> stmt.execute(col1='3rd value')
2012-07-17 15:16:59,231 INFO sqlalchemy.engine.base.Engine UPDATE simple SET col1=? WHERE col1='Third value'
2012-07-17 15:16:59,245 INFO sqlalchemy.engine.base.Engine ('3rd value',)
2012-07-17 15:16:59,245 INFO sqlalchemy.engine.base.Engine COMMIT
<sqlalchemy.engine.base.ResultProxy object at 0x83767ec>

>>> stmt = simple_table.delete(  # delete
...     text("col1='Second value'"))
>>> stmt.execute()
2012-07-17 15:21:03,806 INFO sqlalchemy.engine.base.Engine DELETE FROM simple WHERE col1='Second value'
2012-07-17 15:21:03,806 INFO sqlalchemy.engine.base.Engine ()
2012-07-17 15:21:03,806 INFO sqlalchemy.engine.base.Engine COMMIT
<sqlalchemy.engine.base.ResultProxy object at 0x8376a0c>
>>> from sqlalchemy import select
>>> stmt = select([simple_table.c.col1])  # select only the col1 column
>>> for row in stmt.execute():
...     print row
(u'Foo',)
(u'Bar',)
(u'1st Value',)
(u'2nd Value',)
(u'3rd value',)
(u'Fourth Value',)
(u'Fifth Value',)
(u'Sixth Value',)

>>> stmt = simple_table.select()  # unlike the query above, this returns every column
>>> for row in stmt.execute():  # these two lines could also start from stmt = select([simple_table])
...     print row
...
(1, u'Foo')
(2, u'Bar')
(3, u'1st Value')
(4, u'2nd Value')
(5, u'3rd value')
(6, u'Fourth Value')
(7, u'Fifth Value')
(8, u'Sixth Value')
>>> x = simple_table.c.col1=="Foo"
>>> print type(x)
<class 'sqlalchemy.sql.expression._BinaryExpression'>
>>> print x
simple.col1 = :col1_1
>>> expr = simple_table.c.col1 + "-col1"  # operators are supported too
>>> print expr
simple.col1 || :col1_1
>>> from sqlalchemy.databases import mysql
>>> print expr.compile(dialect=mysql.MySQLDialect())
concat(simple.col1, %s)  # the generated SQL differs from one database dialect to another

>>> from sqlalchemy import func
>>> print func.now()
now()
>>> print func.current_timestamp
<sqlalchemy.sql.expression._FunctionGenerator object at 0x83888cc>
>>> print func._(text('a=b'))
(a=b)

Note: SQLAlchemy column expressions also support in_, op, startswith, endswith, between, like and similar operators.

>>> from sqlalchemy import bindparam  # give a bind parameter a name of your own
>>> stmt = select([simple_table.c.col1],
...     whereclause=simple_table.c.col1==bindparam('test'))  # the parameter is named 'test' instead of the default derived from col1
>>> print stmt
SELECT simple.col1 FROM simple WHERE simple.col1 = ?  # the column being compared is still col1
>>> print stmt.execute(test='Foo').fetchall()
[(u'Foo',)]
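The `:id`/`:col1` placeholders and the `?` markers seen in the sessions above are two spellings of the same idea, bind parameters. As a rough illustration outside SQLAlchemy, the standard-library sqlite3 module accepts both styles directly. This is only a sketch: it is written for Python 3 (the sessions in this post are Python 2), and the table merely mirrors `simple`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE simple (id INTEGER PRIMARY KEY, col1 VARCHAR(20))")

# Named style, like the :id / :col1 placeholders printed for the generic statement.
conn.execute("INSERT INTO simple (col1) VALUES (:col1)", {"col1": "Foo"})

# Positional "qmark" style, the form shown once the statement is compiled for SQLite.
conn.execute("INSERT INTO simple (col1) VALUES (?)", ("Bar",))

# Several rows in one call, comparable to passing a list of dicts to execute().
conn.executemany(
    "INSERT INTO simple (col1) VALUES (:col1)",
    [{"col1": "Fourth Value"}, {"col1": "Fifth Value"}],
)

# A parameter named 'test', in the spirit of bindparam('test').
rows = conn.execute(
    "SELECT col1 FROM simple WHERE col1 = :test", {"test": "Foo"}
).fetchall()
print(rows)
conn.close()
```

In both styles the value travels separately from the SQL text, which is why the printed statements never contain the literal values.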

>>> stmt = simple_table.select(order_by=[simple_table.c.col1])  # sort by col1, ascending
>>> print stmt
SELECT simple.id, simple.col1 FROM simple ORDER BY simple.col1
>>> print stmt.execute().fetchall()
[(3, u'1st Value'), (4, u'2nd Value'), (5, u'3rd value'), (2, u'Bar'), (7, u'Fifth Value'), (1, u'Foo'), (6, u'Fourth Value'), (8, u'Sixth Value')]
>>> from sqlalchemy import desc
>>> stmt = simple_table.select(order_by=[desc(simple_table.c.col1)])  # sort by col1, descending
>>> print stmt
SELECT simple.id, simple.col1 FROM simple ORDER BY simple.col1 DESC
>>> print stmt.execute().fetchall()
[(8, u'Sixth Value'), (6, u'Fourth Value'), (1, u'Foo'), (7, u'Fifth Value'), (2, u'Bar'), (5, u'3rd value'), (4, u'2nd Value'), (3, u'1st Value')]

Note: distinct=True removes duplicate rows, much like SELECT DISTINCT.

>>> stmt = simple_table.select(offset=1, limit=1)  # offset skips rows (here the first one, so the second is returned); limit caps how many rows come back
>>> print stmt
SELECT simple.id, simple.col1 FROM simple LIMIT ? OFFSET ?
>>> print stmt.execute().fetchall()
[(2, u'Bar')]

Consider the following example:

"Persons" table:

Id_P  LastName  FirstName  Address         City
1     Adams     John       Oxford Street   London
2     Bush      George     Fifth Avenue    New York
3     Carter    Thomas     Changan Street  Beijing

"Orders" table:

Id_O  OrderNo  Id_P
1     77895    3
2     44678    3
3     22456    1
4     24562    1
5     34764    65

Now we want to list every person together with any order numbers they have:

SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
FROM Persons
LEFT JOIN Orders                 -- join the Orders table in
ON Persons.Id_P=Orders.Id_P      -- the join condition
ORDER BY Persons.LastName        -- sort order

The example in the book looks like this:

SELECT store.name
FROM store
JOIN product_price ON store.id=product_price.store_id
JOIN product ON product_price.sku=product.sku
WHERE product.msrp != product_price.price;

Translated into SQLAlchemy:

>>> from_obj = store_table.join(product_price_table).join(product_table)
>>> query = store_table.select()
>>> query = query.select_from(from_obj)
>>> query = query.where(product_table.c.msrp != product_price_table.c.price)
>>> print query
SELECT store.id, store.name FROM store JOIN product_price ON store.id = product_price.store_id JOIN product ON product.sku = product_price.sku WHERE product.msrp != product_price.price
>>> print query.column('product.sku')
SELECT store.id, store.name, product.sku FROM store JOIN product_price ON store.id = product_price.store_id JOIN product ON product.sku = product_price.sku WHERE product.msrp != product_price.price
>>> query2 = select([store_table, product_table.c.sku], from_obj=[from_obj], whereclause=(product_table.c.msrp != product_price_table.c.price))
>>> print query2
SELECT store.id, store.name, product.sku FROM store JOIN product_price ON store.id = product_price.store_id JOIN product ON product.sku = product_price.sku WHERE product.msrp != product_price.price
>>> from sqlalchemy import and_
>>> query = product_table.select(and_(product_table.c.msrp > 10.00, product_table.c.msrp < 20.00))  # range query
>>> print query
SELECT product.sku, product.msrp FROM product WHERE product.msrp > ? AND product.msrp < ?
>>> for r in query.execute():
...     print r
(u'123', Decimal("12.34"))
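To see what the Persons/Orders LEFT JOIN above actually returns, here is a small sketch that loads the same rows into an in-memory SQLite database with the standard-library sqlite3 module. It is Python 3, the Address and City columns are left out for brevity, and `Orders.OrderNo` is added to the ORDER BY only to make the output order deterministic; both are choices made for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Persons (Id_P INTEGER PRIMARY KEY, LastName TEXT, FirstName TEXT)")
conn.execute("CREATE TABLE Orders (Id_O INTEGER PRIMARY KEY, OrderNo INTEGER, Id_P INTEGER)")
conn.executemany(
    "INSERT INTO Persons VALUES (?, ?, ?)",
    [(1, "Adams", "John"), (2, "Bush", "George"), (3, "Carter", "Thomas")],
)
conn.executemany(
    "INSERT INTO Orders VALUES (?, ?, ?)",
    [(1, 77895, 3), (2, 44678, 3), (3, 22456, 1), (4, 24562, 1), (5, 34764, 65)],
)

result = conn.execute("""
    SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
    FROM Persons
    LEFT JOIN Orders ON Persons.Id_P = Orders.Id_P
    ORDER BY Persons.LastName, Orders.OrderNo
""").fetchall()

for row in result:
    print(row)
conn.close()
```

Bush has no orders and still appears, with `None` as the order number, while order 34764 is dropped because its Id_P (65) matches no person. That asymmetry is what distinguishes a LEFT JOIN from an inner JOIN.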

>>> from sqlalchemy import intersect

>>> query0 = product_table.select(product_table.c.msrp > 10.00)
>>> query1 = product_table.select(product_table.c.msrp < 20.00)
>>> query = intersect(query0, query1)  # intersect() combines several queries
>>> print query
SELECT product.sku, product.msrp

employee_table = Table(
    'employee', metadata,
    Column('id', Integer, primary_key=True),
    Column('manager_id', None, ForeignKey('employee.id')),
    Column('name', String(255)))

Setting an alias:

Suppose we want to produce the following SQL:

SELECT employee.name FROM employee, employee AS manager WHERE employee.manager_id = manager.id AND manager.name = 'Fred'

>>> manager = employee_table.alias('mgr')
>>> stmt = select([employee_table.c.name],
...     and_(employee_table.c.manager_id==manager.c.id,
...          manager.c.name=='Fred'))
>>> print stmt
SELECT employee.name FROM employee, employee AS mgr WHERE employee.manager_id = mgr.id AND mgr.name = ?

>>> manager = employee_table.alias()  # let SQLAlchemy pick the alias name
>>> stmt = select([employee_table.c.name],
...     and_(employee_table.c.manager_id==manager.c.id,
...          manager.c.name=='Fred'))
>>> print stmt
SELECT employee.name FROM employee, employee AS employee_1 WHERE employee.manager_id = employee_1.id AND employee_1.name = ?
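The aliased self-join above can be tried directly against SQLite. The sketch below uses the standard-library sqlite3 module under Python 3; the employee rows (Fred, Alice, Bob, Carol) are invented for the illustration and do not come from the original example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE employee (
        id INTEGER PRIMARY KEY,
        manager_id INTEGER REFERENCES employee(id),
        name VARCHAR(255)
    )
""")
# Hypothetical data: Fred manages Alice and Bob, Alice manages Carol.
conn.executemany(
    "INSERT INTO employee (id, manager_id, name) VALUES (?, ?, ?)",
    [(1, None, "Fred"), (2, 1, "Alice"), (3, 1, "Bob"), (4, 2, "Carol")],
)

# The same table appears twice; the alias 'mgr' tells the two roles apart.
names = [
    row[0]
    for row in conn.execute(
        """
        SELECT employee.name
        FROM employee, employee AS mgr
        WHERE employee.manager_id = mgr.id AND mgr.name = ?
        ORDER BY employee.name
        """,
        ("Fred",),
    )
]
print(names)
conn.close()
```

Without the alias the database could not tell which copy of `employee` a column refers to, which is why SQLAlchemy generates a name such as `employee_1` when none is given.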

 

 


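Some summary notes on SQLAlchemy:

1 metadata.create_all()

This creates all the tables in one call. It also checks whether each table already exists before creating it (the equivalent of "IF NOT EXISTS"), so it is safe to run against a database that already contains the tables.

2 A complete walkthrough in interactive mode: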

dongwm@localhost ~ $ python
Python 2.7.3 (default, Jul 11 2012, 10:10:17) 
[GCC 4.5.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from sqlalchemy import create_engine
>>> from sqlalchemy import Table, MetaData, Column, ForeignKey, Integer, String, Unicode, DateTime
>>> from datetime import datetime
>>> metadata = MetaData('sqlite:///tutorial.db')
>>> user_table = Table(
...     'tf_user', metadata,
...     Column('id', Integer, primary_key=True),
...     Column('user_name', Unicode(16),
...            unique=True, nullable=False),
...     Column('password', Unicode(40), nullable=False),
...     Column('display_name', Unicode(255), default=''),
...     Column('created', DateTime, default=datetime.now))
__main__:7: SAWarning: Unicode column received non-unicode default value.
>>> stmt = user_table.insert()  # insert data
>>> stmt.execute(user_name='dongwm1', password='secret',display_name='testdongwm1')
/usr/lib/python2.7/site-packages/SQLAlchemy-0.7.8-py2.7-linux-i686.egg/sqlalchemy/engine/default.py:463: SAWarning: Unicode type received non-unicode bind param value.
  param.append(processors[key](compiled_params[key]))
<sqlalchemy.engine.base.ResultProxy object at 0x8377fcc>
>>> stmt.execute(user_name='dongwm2', password='secret',display_name='testdongwm2')  # the same statement object can be executed again and again, unlike a literal SQL string
<sqlalchemy.engine.base.ResultProxy object at 0x837e4ec>
>>> stmt = user_table.select()  # a SELECT query
>>> result = stmt.execute()
>>> for row in result:
...     print row 
... 
(1, u'dongwm1', u'secret', u'testdongwm1', datetime.datetime(2012, 7, 17, 11, 57, 48, 515953))
(2, u'dongwm2', u'secret', u'testdongwm2', datetime.datetime(2012, 7, 17, 11, 58, 5, 226977))
>>> result = stmt.execute()
>>> row = result.fetchone()  # fetch only the first matching row
>>> print  row['user_name']
dongwm1
>>> print row.password
secret
>>> print row.items()
[(u'id', 1), (u'user_name', u'dongwm1'), (u'password', u'secret'), (u'display_name', u'testdongwm1'), (u'created', datetime.datetime(2012, 7, 17, 11, 57, 48, 515953))]
>>> stmt = user_table.select(user_table.c.user_name=='dongwm1')  # keep only the rows where user_name == 'dongwm1'
>>> print stmt.execute().fetchall()  # fetch all matching rows
[(1, u'dongwm1', u'secret', u'testdongwm1', datetime.datetime(2012, 7, 17, 11, 57, 48, 515953))]
>>> stmt = user_table.update(user_table.c.user_name=='dongwm1')  # update rows
>>> stmt.execute(password='secret123')  # change the password
<sqlalchemy.engine.base.ResultProxy object at 0x8377f6c>
>>> stmt = user_table.delete(user_table.c.user_name != 'dongwm1')  # delete the rows whose user_name is not dongwm1
>>> stmt.execute()
<sqlalchemy.engine.base.ResultProxy object at 0x837f3ac>
>>> user_table.select().execute().fetchall()  # only one row is left
[(1, u'dongwm1', u'secret123', u'testdongwm1', datetime.datetime(2012, 7, 17, 11, 57, 48, 515953))]

3 Sessions were covered above; one addition:

session.delete(u)  # mark the mapped object u for deletion; its row is removed when the session is flushed

4 Engines

An engine is the means of connecting to a database, using the dialect appropriate to that database.

Some examples (the URL format is driver://username:password@host:port/database):

engine = create_engine('sqlite://')  # in-memory SQLite
engine = create_engine('sqlite:///data.sqlite')  # SQLite stored in a file on disk
engine = create_engine('postgres://dongwm:foo@localhost:5432/pg_db')  # PostgreSQL
engine = create_engine('mysql://localhost/mysql_db')  # MySQL
engine = create_engine('oracle://dongwm:foo@oracle_tns')  # Oracle through a TNS name
engine = create_engine('oracle://dongwm:foo@localhost:1521/oracle_sid')  # Oracle without a TNS name

Extra arguments can be passed along as well:

url = 'postgres://dongwm:foo@localhost/pg_db?arg1=foo&arg2=bar'
engine = create_engine(url)

Or:

engine = create_engine('postgres://dongwm:foo@localhost/pg_db', connect_args=dict(arg1='foo', arg2='bar'))
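To make the driver://username:password@host:port/database layout concrete, the sketch below pulls one of the URLs above apart with the standard library's urllib.parse (Python 3). This is only an illustration of the format; SQLAlchemy has its own URL parser, and the `components` dictionary is a name invented here.

```python
from urllib.parse import parse_qs, urlsplit

url = "postgres://dongwm:foo@localhost:5432/pg_db?arg1=foo&arg2=bar"
parts = urlsplit(url)

# Map each piece of the URL onto the role it plays in the connection.
components = {
    "driver": parts.scheme,
    "username": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,
    "database": parts.path.lstrip("/"),
    # Query-string arguments, the same information connect_args would carry.
    "query": {key: values[0] for key, values in parse_qs(parts.query).items()},
}

for key, value in components.items():
    print("%-8s %r" % (key, value))
```

Everything after the `?` ends up as plain key/value pairs, which is why the query-string form and the `connect_args` form can express the same settings.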

You can also take full control of how connections are made by supplying a function:

import psycopg

def connect_pg():
    return psycopg.connect(user='rick', host='localhost')

engine = create_engine('postgres://', creator=connect_pg)

import logging

handler = logging.FileHandler('sqlalchemy.engine.log')  # a file handler can be attached to the engine's logger
handler.level = logging.DEBUG
logging.getLogger('sqlalchemy.engine').addHandler(handler)

Besides working through table objects as shown above, you can also run SQL against the database directly:

conn = engine.connect()
result = conn.execute('select user_name, email_address from tf_user')  # the result is a sqlalchemy.engine.ResultProxy instance
for row in result:
    print 'User name: %s Email address: %s' % (
        row['user_name'], row['email_address'])
conn.close()

from sqlalchemy import pool  # connections are pooled automatically, but the pool can also be managed by hand
import psycopg2

psycopg = pool.manage(psycopg2)  # the result is a sqlalchemy.pool.DBProxy instance
connection = psycopg.connect(database='mydb', username='rick', password='foo')

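5 Metadata

MetaData collects the objects that describe tables and related schema items; it is required when using the ORM and similar features.

If the metadata is bound, table.create() creates the table. If it is not bound, pass the engine or connection explicitly: table.create(bind=some_engine_or_connection).

A few options control loading the definitions of tables that already exist:

autoload: defaults to False; a Table option that loads the column definitions from a table that already exists in the database

autoload_with: a Table option naming the engine or connection to load the column definitions from

reflect: defaults to False; a MetaData option that loads the structure of the tables in the source database

brand_table = Table('brand', metadata,
    Column('name', Unicode(255)),  # overrides the type that would be loaded
    autoload=True)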

6 Table definitions:

A primary key can be declared like this:

product_table = Table(
    'product', metadata,
    Column('brand_id', Integer, ForeignKey('brand.id'), primary_key=True),  # with primary_key=True
    Column('sku', Unicode(80), primary_key=True))

Or like this:

product_table = Table(
    'product', metadata,
    Column('brand_id', Integer, ForeignKey('brand.id')),
    Column('sku', Unicode(80)),
    PrimaryKeyConstraint('brand_id', 'sku', name='prikey'))  # with PrimaryKeyConstraint

style_table = Table(
    'style', metadata,
    Column('brand_id', Integer, primary_key=True),
    Column('sku', Unicode(80), primary_key=True),
    Column('code', Unicode(80), primary_key=True),
    ForeignKeyConstraint(  # a composite foreign key referencing columns of another table
        ['brand_id', 'sku'],
        ['product.brand_id', 'product.sku']))

product_table = Table(
    'product', metadata,
    Column('id', Integer, primary_key=True),
    Column('brand_id', Integer, ForeignKey('brand.id')),  # brand_id references the id column of brand
    Column('sku', Unicode(80)),
    UniqueConstraint('brand_id', 'sku'))  # each (brand_id, sku) pair may occur only once in the table
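What a composite UniqueConstraint means in practice can be checked with a few lines of sqlite3 from the standard library (Python 3). The sketch below hand-writes DDL that roughly corresponds to the last `product` definition above; it is not the exact DDL SQLAlchemy would emit, and the sample values are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE product (
        id INTEGER PRIMARY KEY,
        brand_id INTEGER,
        sku VARCHAR(80),
        UNIQUE (brand_id, sku)
    )
""")
conn.execute("INSERT INTO product (brand_id, sku) VALUES (1, 'A-100')")
# The same sku under a different brand is a different pair, so it is accepted.
conn.execute("INSERT INTO product (brand_id, sku) VALUES (2, 'A-100')")

try:
    # Repeating an existing (brand_id, sku) pair violates the constraint.
    conn.execute("INSERT INTO product (brand_id, sku) VALUES (1, 'A-100')")
    duplicate_rejected = False
except sqlite3.IntegrityError as exc:
    duplicate_rejected = True
    print("rejected:", exc)

row_count = conn.execute("SELECT COUNT(*) FROM product").fetchone()[0]
print(row_count)
conn.close()
```

The constraint applies to the combination of columns, not to each column on its own, which is the difference from putting unique=True on a single Column.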

payment_table = Table(
    'payment', metadata,
    Column('amount', Numeric(10,2), CheckConstraint('amount > 0')))  # checks that amount is greater than 0

user_table = Table(
    'tf_user', MetaData(),
    Column('id', Integer, primary_key=True),
    Column('user_name', Unicode(16), unique=True, nullable=False),
    Column('password', Unicode(40), nullable=False),
    Column('first_name', Unicode(255), default=''),
    Column('last_name', Unicode(255), default=''),
    Column('created_apptime', DateTime, default=datetime.now),  # default supplies a value when none is given explicitly
    Column('created_dbtime', DateTime, PassiveDefault('sysdate')),  # PassiveDefault is a default applied by the database itself
    Column('modified', DateTime, onupdate=datetime.now))  # onupdate alone is not part of the database schema; it exists only on the SQLAlchemy side
# It is an "active" update: the value is computed on every execution, so the time differs each time

user_table = Table(
    'tf_user', MetaData(),
    Column('id', Integer, primary_key=True),
    Column('user_name', Unicode(16), unique=True, nullable=False, index=True),  # once the database grows, indexes may be needed to speed up certain operations
    Column('password', Unicode(40), nullable=False),
    Column('first_name', Unicode(255), default=''),
    Column('last_name', Unicode(255), default='', index=True))

An index can also be declared separately:

i = Index('idx_name', user_table.c.first_name, user_table.c.last_name, unique=True)
i.create(bind=e)

# Some databases generate new primary key values from a sequence.
# SQLAlchemy does not create one automatically, so specify a Sequence.
brand_table = Table(
    'brand', metadata,
    Column('id', Integer, Sequence('brand_id_seq'), primary_key=True),
    Column('name', Unicode(255), unique=True, nullable=False))

7 Working with metadata

meta1 = MetaData('postgres://postgres:password@localhost/test', reflect=True)
meta2 = MetaData('sqlite://')
for table in meta1.table_iterator():
    table.tometadata(meta2)  # copy each table definition from meta1 into meta2
meta2.create_all()

2 To remove the tables again, use drop_all() or drop(e)

Preface: I have long been a loyal VMware user. After recently rebuilding my Gentoo kernel I reinstalled VMware and upgraded it to 8.02. The other modules built fine, but vmnet alone failed to compile, with the following in the log:

2012-07-12T10:33:01.401+08:00| vthread-3| I120: Building module with command: /usr/bin/make -j -C /tmp/vmware-root/modules/vmnet-only auto-build SUPPORT_SMP=1 HEADER_DIR=/lib/modules/3.2.21-gentoo/build/include CC=/usr/bin/gcc GREP=/usr/bin/make IS_GCC_3=no VMCCVER=4.5.3
2012-07-12T10:33:02.088+08:00| vthread-3| I120: Failed to compile module vmnet!

Solution: this is a bug in VMware itself, and an official patch is available:

http://communities.vmware.com/servlet/JiveServlet/download/2025997-86343/vmware802fixlinux320.tar.gz

The archive unpacks to two files. Edit patch-modules_3.2.0.sh to match your VMware Player and VMware versions, then run:

dongwm@localhost ~/下载 $ sudo ./patch-modules_3.2.0.sh

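Preface: I have always been a loyal Firefox user. This is an introductory post; the system is openSUSE 11.3, and the same approach should carry over to other systems.

1 First check which Firefox is currently installed:

linux-vkmz:~ # rpm -qa|grep -i firefox
MozillaFirefox-3.6.6-1.2.i586  # the installed Firefox is 3.6.6, while the latest release is already Firefox 13.0.1 and the distribution repositories offer nothing newer
MozillaFirefox-branding-openSUSE-3.5-16.2.i586
MozillaFirefox-translations-common-3.6.6-1.2.i586

2 Download the latest Firefox: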

linux-vkmz:~ # wget http://download.firefox.com.cn/releases/firefox/13.0/zh-CN/Firefox-latest.tar.bz2

linux-vkmz:~ # tar jxf Firefox-latest.tar.bz2  # unpack the archive

3 Find out where the system Firefox is installed

linux-vkmz:~ # rpm -ql MozillaFirefox-3.6.6-1.2.i586

/usr/bin/firefox  # only part of the listing is shown; the library files live under /usr/lib and the executable under /usr/bin
/usr/lib/firefox
/usr/lib/firefox/application.ini
/usr/lib/firefox/blocklist.xml
/usr/lib/firefox/chrome
/usr/lib/firefox/chrome/browser.jar
/usr/lib/firefox/chrome/browser.m
…

linux-vkmz:~ # cp -rp firefox/* /usr/lib/firefox/   # copy the library files into the system Firefox directory

linux-vkmz:~ # cp -rp firefox/firefox* /usr/bin/  # copy the two executables into /usr/bin

Done.

Preface: sys is the module for interpreter and system configuration, and like os it is used all the time.

import sys

print 'Version info:'
print
print 'sys.version =', repr(sys.version)  # details recorded when Python was built; this is the Python version string
print 'sys.version_info =', sys.version_info  # the version number as a tuple
print 'sys.hexversion =', hex(sys.hexversion)  # the version encoded as a single integer
print 'sys.subversion =', sys.subversion  # branch, revision and similar information
print 'sys.api_version =', sys.api_version  # version of the C API
print 'This interpreter was built for:', sys.platform  # the platform (linux, windows, ...), similar to the output of uname -s
print 'Interpreter executable:', sys.executable  # location of the interpreter, e.g. /usr/bin/python
print 'Installation prefix :', sys.prefix  # parent directory containing bin and lib; here /usr
print sys.flags  # python -h lists many options; sys.flags records which of them were given for this run
print 'Default encoding :', sys.getdefaultencoding()  # default encoding used for Unicode conversions
print 'Filesystem encoding :', sys.getfilesystemencoding()  # default filesystem encoding
#reload(sys)  # to change the default encoding, reload sys first (otherwise setdefaultencoding is not available), then set it
#sys.setdefaultencoding('utf-8')
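The listing above is Python 2. As a rough Python 3 counterpart, the sketch below gathers the same attributes into a dictionary (the name `info` is made up for this example). It assumes Python 3, where `sys.subversion` and `sys.setdefaultencoding()` no longer exist, so those two are left out.

```python
import sys

info = {
    "version": sys.version,                      # full version string
    "version_info": tuple(sys.version_info),     # version as a tuple
    "hexversion": hex(sys.hexversion),           # version as one integer
    "api_version": sys.api_version,              # C API version
    "platform": sys.platform,                    # e.g. 'linux', 'win32', 'darwin'
    "executable": sys.executable,                # path of the interpreter
    "prefix": sys.prefix,                        # installation prefix
    "default_encoding": sys.getdefaultencoding(),
    "filesystem_encoding": sys.getfilesystemencoding(),
}

for key, value in info.items():
    print("%-20s %r" % (key, value))

# Both of these are Python 2 only.
print(hasattr(sys, "subversion"), hasattr(sys, "setdefaultencoding"))
```

On Python 3 the default encoding is always UTF-8, so the reload(sys) trick from the commented-out lines is neither possible nor needed there.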

>>> sys.ps1
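'>>> '  # ps1 is the primary prompt of the interactive interpreter, much like PS1 in a Linux shell
>>> sys.ps2
'... '  # ps2 is the continuation prompt, shown when a statement such as one ending in ":" needs more lines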
>>> sys.ps1='-->'
-->sys.ps2='~~~'
-->for i in xrange(2): 
~~~    print i
~~~
0
1

import sys

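class LineCounter(object):  # the prompt can also be an object; assume this file is test.py

    def __init__(self):
        self.count = 0  # number of commands so far, starting at 0

    def __str__(self):
        self.count += 1  # incremented each time the prompt is displayed
        return '(%3d)> ' % self.count

Enter interactive mode: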

>>> from test import LineCounter
>>> import sys
>>> sys.ps1 = LineCounter()
(  1)> 
(  2)>  # each press of Enter adds 1
(  3)>
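The counter works because the interpreter converts `sys.ps1` to a string every time it shows the prompt. The Python 3 sketch below imitates that by calling `str()` by hand, so the effect can be seen even outside an interactive session; the variable names are invented for the example.

```python
import sys


class LineCounter:
    """Prompt object whose string form changes each time it is rendered."""

    def __init__(self):
        self.count = 0

    def __str__(self):
        self.count += 1
        return "(%3d)> " % self.count


counter = LineCounter()
sys.ps1 = counter  # only visible in an interactive session

# What the interpreter does before each prompt, done here by hand.
first = str(counter)
second = str(counter)
print(repr(first), repr(second))
```

Anything with a `__str__` method can serve as a prompt this way, which makes it possible to show the time, the current directory, or any other state.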

import sys

class ExpressionCounter(object):

    def __init__(self):
        self.count = 0
        self.previous_value = self

    def __call__(self, value):
        print
        print ' Previous:', self.previous_value
        print ' New :', value
        print
        if value != self.previous_value:
            self.count += 1
            sys.ps1 = '(%3d)> ' % self.count
        self.previous_value = value
        sys.__displayhook__(value)  # the default hook is kept here

print 'installing'
sys.displayhook = ExpressionCounter()  # a hook: as I understand it, it is called each time the interactive interpreter displays an expression result

Assume this file is test.py:

dongwm@linux-vkmz:~> python
Python 2.6.5 (r265:79063, Jul  5 2010, 11:47:21) 
[GCC 4.5.0 20100604 [gcc-4_5-branch revision 160292]] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import test
installing
>>> 1+2

 Previous: <test.ExpressionCounter object at 0x80ab2ac>
 New : 3

3
(  1)> 'abc'

 Previous: 3
 New : abc

'abc'
(  2)> 'abc'

 Previous: abc
 New : abc

'abc'
(  2)> 'abc'*3

 Previous: abc
 New : abcabcabc

'abcabcabc'
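The same hook can be exercised without an interactive session by calling it by hand with the values from the transcript above. This is a Python 3 sketch: the diagnostic prints are dropped, and the hook is called directly rather than installed as `sys.displayhook`, since the interpreter only invokes that in interactive mode.

```python
import sys


class ExpressionCounter:
    """Counts how often the displayed value changes and updates the prompt."""

    def __init__(self):
        self.count = 0
        self.previous_value = self

    def __call__(self, value):
        if value != self.previous_value:
            self.count += 1
            sys.ps1 = "(%3d)> " % self.count
        self.previous_value = value
        # Delegate the actual printing to the original hook.
        sys.__displayhook__(value)


hook = ExpressionCounter()
# The values produced by 1+2, 'abc', 'abc' and 'abc'*3 in the session.
for value in (3, "abc", "abc", "abcabcabc"):
    hook(value)

print(hook.count)
```

The repeated 'abc' does not advance the counter, which matches the prompt staying at ( 2) in the transcript.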

 

import sys
import textwrap

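names = sorted(sys.modules.keys())  # sys.modules is a dictionary mapping the names of imported modules to the module objects
name_text = ', '.join(names)

print textwrap.fill(name_text)

for name in sys.builtin_module_names:  # built-in modules are compiled into the interpreter instead of being loaded from files
    print name

for d in sys.path:  # sys.path is the list of directories searched for modules; it includes the current directory, with site-packages listed last
    print d
# Note: if the program is run with PYTHONPATH=XXX:xxx set, the directories named there are printed first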
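A few quick checks make the relationship between these three attributes easier to see. The sketch below is Python 3 and uses only the sys module; the variable names are invented for the example.

```python
import sys

# Built-in modules are compiled into the interpreter, but they still have to
# be imported before they appear in sys.modules.
builtin_names = set(sys.builtin_module_names)
loaded_builtins = sorted(builtin_names & set(sys.modules))
print("sys is built in:", "sys" in builtin_names)
print("built-in modules already loaded:", len(loaded_builtins))

# sys.modules maps a module name to the module object itself.
print(sys.modules["sys"] is sys)

# The first few entries of the module search path, in search order.
for entry in sys.path[:5]:
    print(repr(entry))
```

Because `sys.modules` holds the module objects, a second `import` of the same name is just a dictionary lookup.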

import sys
import os

base_dir = os.path.dirname(__file__) or '.'  # dirname(__file__) is the directory containing this script
print 'Base directory:', base_dir

package_dir_a = os.path.join(base_dir, 'package_dir_a')
sys.path.insert(0, package_dir_a)  # put package_dir_a at the front of the search path

import example
print 'Imported example from:', example.__file__
print '\t', example.DATA  # here the data is, say, A

package_dir_b = os.path.join(base_dir, 'package_dir_b')
sys.path.insert(0, package_dir_b)  # put package_dir_b in front of it

reload(example)  # reloading searches the path again, so the copy under package_dir_b is found first
print 'Reloaded example from:', example.__file__
print '\t', example.DATA  # now the data is B
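The script above expects package_dir_a and package_dir_b to exist already. The sketch below is a self-contained Python 3 variant that creates both directories in a temporary location and uses `importlib.reload` in place of the Python 2 built-in `reload`. The module name `reload_demo_example` is made up to avoid clashing with any real module.

```python
import importlib
import os
import sys
import tempfile

# Build two directories that each contain a module with the same name.
base_dir = tempfile.mkdtemp()
for label in ("a", "b"):
    package_dir = os.path.join(base_dir, "package_dir_" + label)
    os.mkdir(package_dir)
    with open(os.path.join(package_dir, "reload_demo_example.py"), "w") as f:
        f.write("DATA = %r\n" % label.upper())

# With only package_dir_a on the path, the import finds the 'A' copy.
sys.path.insert(0, os.path.join(base_dir, "package_dir_a"))
importlib.invalidate_caches()
import reload_demo_example as example
first = example.DATA

# Once package_dir_b is placed in front, a reload searches the path again.
sys.path.insert(0, os.path.join(base_dir, "package_dir_b"))
importlib.invalidate_caches()
example = importlib.reload(example)
second = example.DATA

print(first, second)
print(example.__file__)
```

The module object stays the same across the reload; only its contents and `__file__` change.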

import sys

class NoisyImportFinder(object):

    PATH_TRIGGER = 'NoisyImportFinder_PATH_TRIGGER'  # a marker value that is easy to recognize

    def __init__(self, path_entry):
        print 'Checking NoisyImportFinder support for %s' % path_entry
        if path_entry != self.PATH_TRIGGER:
            print 'NoisyImportFinder does not work for %s' % path_entry
            raise ImportError()
        return

    def find_module(self, fullname, path=None):
        print 'NoisyImportFinder looking for "%s"' % fullname
        return None

sys.path_hooks.append(NoisyImportFinder)  # register the custom import finder

sys.path.insert(0, NoisyImportFinder.PATH_TRIGGER)  # put the marker at the front of the module search path

try:
    import target_module
except Exception, e:
    print 'Import failed:', e

Running it:

dongwm@linux-vkmz:~> python test.py
Checking NoisyImportFinder support for NoisyImportFinder_PATH_TRIGGER  # the import first tries our custom finder class
NoisyImportFinder looking for "target_module"
Checking NoisyImportFinder support for /home/dongwm  # then the next path entry (the current directory) is tried
NoisyImportFinder does not work for /home/dongwm  # our finder does not handle it
Import failed: No module named target_module  # the standard import machinery takes over
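For readers on Python 3, where `find_module` is no longer used, a rough equivalent of the finder above looks like the sketch below. Path entry finders there implement `find_spec` instead. The `calls` list and the module name `noisy_demo_missing_module` are invented for the illustration, and the messages are recorded rather than printed at the time of the call.

```python
import sys

calls = []  # records what the import machinery asked of the finder


class NoisyImportFinder:
    PATH_TRIGGER = "NoisyImportFinder_PATH_TRIGGER"

    def __init__(self, path_entry):
        # A path hook signals "not mine" by raising ImportError.
        if path_entry != self.PATH_TRIGGER:
            raise ImportError("NoisyImportFinder does not work for %s" % path_entry)
        calls.append(("hook accepted", path_entry))

    def find_spec(self, fullname, target=None):
        calls.append(("looking for", fullname))
        return None  # decline, so the remaining sys.path entries are tried


sys.path_hooks.append(NoisyImportFinder)
sys.path.insert(0, NoisyImportFinder.PATH_TRIGGER)

try:
    import noisy_demo_missing_module  # hypothetical name, expected not to exist
except ImportError as exc:
    calls.append(("import failed", str(exc)))
finally:
    # Undo the changes so later imports are unaffected.
    sys.path.remove(NoisyImportFinder.PATH_TRIGGER)
    sys.path_hooks.remove(NoisyImportFinder)
    sys.path_importer_cache.pop(NoisyImportFinder.PATH_TRIGGER, None)

for call in calls:
    print(call)
```

One difference from the Python 2 run above: in Python 3 ordinary directories are claimed by the standard file finder hook first, so the custom hook is normally not consulted for entries such as the current directory.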

import sys  # this script generates the shelve file
import shelve
import os

filename = './import_example.shelve'
if os.path.exists(filename):
    os.unlink(filename)
db = shelve.open(filename)
try:
    db['data:README'] = """
==============
package README
==============

This is the README for ``package``.
"""
    db['package.__init__'] = """
print 'package imported'
message = 'This message is in package.__init__'
"""
    db['package.module1'] = """
print 'package.module1 imported'
message = 'This message is in package.module1'
"""
    db['package.subpackage.__init__'] = """
print 'package.subpackage imported'
message = 'This message is in package.subpackage.__init__'
"""
    db['package.subpackage.module2'] = """
print 'package.subpackage.module2 imported'
message = 'This message is in package.subpackage.module2'
"""
    db['package.with_error'] = """
print 'package.with_error being imported'
raise ValueError('raising exception to break import')
"""
    print 'Created %s with:' % filename
    for key in sorted(db.keys()):
        print '\t', key
finally:
    db.close()

Running it:

dongwm@linux-vkmz:~> python test.py
Created ./import_example.shelve with:
    data:README
    package.__init__
    package.module1
    package.subpackage.__init__
    package.subpackage.module2
    package.with_error

import contextlib
import imp
import os
import shelve
import sys

@contextlib.contextmanager
def shelve_context(filename, flag='r'):  # a context manager so that shelves can be used with "with"
    db = shelve.open(filename, flag)
    try:
        yield db
    finally:
        db.close()

def _mk_init_name(fullname):  # return the name of the __init__ module for the given package name
    if fullname.endswith('.__init__'):
        return fullname
    return fullname + '.__init__'

def _get_key_name(fullname, db):  # look for fullname or fullname.__init__ in the shelf and return the key that was found
    if fullname in db:
        return fullname
    init_name = _mk_init_name(fullname)
    if init_name in db:
        return init_name
    return None

class ShelveFinder(object):  # finds modules inside a shelve archive

    def __init__(self, path_entry):
        if not os.path.isfile(path_entry):
            raise ImportError
        try:
            with shelve_context(path_entry):
                pass
        except Exception, e:
            raise ImportError(str(e))
        else:
            print 'new shelf added to import path:', path_entry
            self.path_entry = path_entry
        return

    def __str__(self):
        return '<%s for "%s">' % (self.__class__.__name__, self.path_entry)

    def find_module(self, fullname, path=None):
        path = path or self.path_entry
        print 'looking for "%s" in %s ...' % (fullname, path),
        with shelve_context(path) as db:
            key_name = _get_key_name(fullname, db)
            if key_name:
                print 'found it as %s' % key_name
                return ShelveLoader(path)
        print 'not found'
        return None

class ShelveLoader(object):  # loads module source from the shelve data

    def __init__(self, path_entry):
        self.path_entry = path_entry
        return

    def _get_filename(self, fullname):
        # Make up a fake filename that starts with the path entry
        # so pkgutil.get_data() works correctly.
        return os.path.join(self.path_entry, fullname)

    def get_source(self, fullname):
        print 'loading source for "%s" from shelf' % fullname
        try:
            with shelve_context(self.path_entry) as db:
                key_name = _get_key_name(fullname, db)
                if key_name:
                    return db[key_name]
                raise ImportError('could not find source for %s' % fullname)
        except Exception, e:
            print 'could not load source:', e
            raise ImportError(str(e))

    def get_code(self, fullname):
        source = self.get_source(fullname)
        print 'compiling code for "%s"' % fullname
        return compile(source, self._get_filename(fullname), 'exec', dont_inherit=True)

    def get_data(self, path):
        print 'looking for data in %s for "%s"' % (self.path_entry, path)
        if not path.startswith(self.path_entry):
            raise IOError
        path = path[len(self.path_entry)+1:]
        key_name = 'data:' + path
        try:
            with shelve_context(self.path_entry) as db:
                return db[key_name]
        except Exception, e:
            # Convert all errors to IOError
            raise IOError

    def is_package(self, fullname):
        init_name = _mk_init_name(fullname)
        with shelve_context(self.path_entry) as db:
            return init_name in db

    def load_module(self, fullname):
        source = self.get_source(fullname)

        if fullname in sys.modules:
            print 'reusing existing module from previous import of "%s"' % fullname
            mod = sys.modules[fullname]
        else:
            print 'creating a new module object for "%s"' % fullname
            mod = sys.modules.setdefault(fullname, imp.new_module(fullname))

        mod.__file__ = self._get_filename(fullname)
        mod.__name__ = fullname
        mod.__path__ = self.path_entry
        mod.__loader__ = self
        mod.__package__ = '.'.join(fullname.split('.')[:-1])

        if self.is_package(fullname):
            print 'adding path for package'
            # Set __path__ for packages
            # so we can find the sub-modules.
            mod.__path__ = [ self.path_entry ]
        else:
            print 'imported as regular module'

        print 'execing source...'
        exec source in mod.__dict__
        print 'done'
        return mod
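The finder/loader pair above is Python 2 code built on `find_module` and `load_module`. As a much smaller Python 3 sketch of the same idea, the example below serves module source from a plain dictionary instead of a shelve file and registers a meta path finder rather than a `sys.path` hook; both substitutions are simplifications made for this illustration. The names `SOURCES`, `DictImporter` and the package `shelfpkg` are invented.

```python
import importlib.abc
import importlib.util
import sys

# Stand-in for the shelf: keys use the same "<package>.__init__" convention.
SOURCES = {
    "shelfpkg.__init__": "message = 'This message is in shelfpkg.__init__'\n",
    "shelfpkg.module1": "message = 'This message is in shelfpkg.module1'\n",
}


def _key_for(fullname):
    """Return the stored key for a module or package name, or None."""
    if fullname in SOURCES:
        return fullname
    init_name = fullname + ".__init__"
    return init_name if init_name in SOURCES else None


class DictImporter(importlib.abc.MetaPathFinder, importlib.abc.Loader):
    """Finder and loader in one object, reading source text from SOURCES."""

    def find_spec(self, fullname, path=None, target=None):
        key = _key_for(fullname)
        if key is None:
            return None  # not ours; let the other finders try
        return importlib.util.spec_from_loader(
            fullname, self, is_package=key.endswith(".__init__"))

    def create_module(self, spec):
        return None  # use the default module object

    def exec_module(self, module):
        source = SOURCES[_key_for(module.__name__)]
        code = compile(source, "<dict:%s>" % module.__name__, "exec")
        exec(code, module.__dict__)


importer = DictImporter()
sys.meta_path.append(importer)
try:
    import shelfpkg.module1
    print(shelfpkg.message)
    print(shelfpkg.module1.message)
    print(shelfpkg.module1.__package__)
finally:
    sys.meta_path.remove(importer)
```

In Python 3 the import system creates the module object and sets attributes such as `__package__` from the spec, so the loader is left with only the step of executing the source.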

import sys
import sys_shelve_importer

def show_module_details(module):
    print '  message    :', module.message
    print '  __name__   :', module.__name__
    print '  __package__:', module.__package__
    print '  __file__   :', module.__file__
    print '  __path__   :', module.__path__
    print '  __loader__ :', module.__loader__

filename = 'import_example.shelve'
sys.path_hooks.append(sys_shelve_importer.ShelveFinder)
sys.path.insert(0, filename)  # put the shelve file on the import path

print 'Import of "package":'
import package  # import the package

print
print 'Examine package details:'
show_module_details(package)

print
print 'Global settings:'
print 'sys.modules entry:', sys.modules['package']

Output:

dongwm@linux-vkmz:~> python test.py
Import of "package":
new shelf added to import path: import_example.shelve
looking for "package" in import_example.shelve ... found it as package.__init__
loading source for "package" from shelf
creating a new module object for "package"
adding path for package
execing source...
package imported
done

Examine package details:
  message    : This message is in package.__init__
  __name__   : package
  __package__:
  __file__   : import_example.shelve/package
  __path__   : ['import_example.shelve']
  __loader__ : <sys_shelve_importer.ShelveLoader object at 0x80aed0c>

Global settings:
sys.modules entry: <module 'package' from 'import_example.shelve/package'>

import sys
import sys_shelve_importer

def show_module_details(module):
    print '  message    :', module.message
    print '  __name__   :', module.__name__
    print '  __package__:', module.__package__
    print '  __file__   :', module.__file__
    print '  __path__   :', module.__path__
    print '  __loader__ :', module.__loader__

filename = 'import_example.shelve'
sys.path_hooks.append(sys_shelve_importer.ShelveFinder)
sys.path.insert(0, filename)

print
print 'Import of "package.module1":'
import package.module1  # import a submodule

print
print 'Examine package.module1 details:'
show_module_details(package.module1)

print
print 'Import of "package.subpackage.module2":'
import package.subpackage.module2

print
print 'Examine package.subpackage.module2 details:'
show_module_details(package.subpackage.module2)

Output:

dongwm@linux-vkmz:~> python test.py

Import of "package.module1":
new shelf added to import path: import_example.shelve
looking for "package" in import_example.shelve ... found it as package.__init__
loading source for "package" from shelf
creating a new module object for "package"
adding path for package
execing source...
package imported
done
looking for "package.module1" in import_example.shelve ... found it as package.module1
loading source for "package.module1" from shelf
creating a new module object for "package.module1"
imported as regular module
execing source...
package.module1 imported
done

Examine package.module1 details:
  message    : This message is in package.module1
  __name__   : package.module1
  __package__: package
  __file__   : import_example.shelve/package.module1
  __path__   : import_example.shelve
  __loader__ : <sys_shelve_importer.ShelveLoader object at 0x8119d6c>

Import of "package.subpackage.module2":
looking for "package.subpackage" in import_example.shelve ... found it as package.subpackage.__init__
loading source for "package.subpackage" from shelf
creating a new module object for "package.subpackage"
adding path for package
execing source...
package.subpackage imported
done
looking for "package.subpackage.module2" in import_example.shelve ... found it as package.subpackage.module2
loading source for "package.subpackage.module2" from shelf
creating a new module object for "package.subpackage.module2"
imported as regular module
execing source...
package.subpackage.module2 imported
done

Examine package.subpackage.module2 details:
  message    : This message is in package.subpackage.module2
  __name__   : package.subpackage.module2
  __package__: package.subpackage
  __file__   : import_example.shelve/package.subpackage.module2
  __path__   : import_example.shelve
  __loader__ : <sys_shelve_importer.ShelveLoader object at 0x8119e0c>

import sys
import sys_shelve_importer

filename = 'import_example.shelve'
sys.path_hooks.append(sys_shelve_importer.ShelveFinder)
sys.path.insert(0, filename)

print 'First import of "package":'
import package

print
print 'Reloading "package":'
reload(package)  # the existing module object is reused; no new module object is created

Output:

dongwm@linux-vkmz:~> python test.py
First import of "package":
new shelf added to import path: import_example.shelve
looking for "package" in import_example.shelve ... found it as package.__init__
loading source for "package" from shelf
creating a new module object for "package"
adding path for package
execing source...
package imported
done

Reloading "package":
looking for "package" in import_example.shelve ... found it as package.__init__
loading source for "package" from shelf
reusing existing module from previous import of "package"
adding path for package
execing source...
package imported
done
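The "reusing existing module" line above can be checked with the standard importer as well. The following is a minimal Python 3 sketch (where the builtin reload() has moved to importlib.reload()); the module name reload_demo_mod and the temporary directory are made up for the demonstration and are not part of the shelve importer.

```python
import importlib
import os
import sys
import tempfile

# Hypothetical module written to a temporary directory, only for this demo.
module_dir = tempfile.mkdtemp()
with open(os.path.join(module_dir, 'reload_demo_mod.py'), 'w') as handle:
    handle.write('value = 1\n')

sys.path.insert(0, module_dir)
importlib.invalidate_caches()
import reload_demo_mod

first_id = id(reload_demo_mod)
reloaded = importlib.reload(reload_demo_mod)

# reload() re-executes the source inside the module object that already exists,
# so both names refer to the same object.
print(reloaded is reload_demo_mod, id(reloaded) == first_id)
```

Because the module object is reused, other code that already holds a reference to the module sees the re-executed contents.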

import sys
import sys_shelve_importer

filename = 'import_example.shelve'
sys.path_hooks.append(sys_shelve_importer.ShelveFinder)
sys.path.insert(0, filename)

try:
    import package.module3
except ImportError, e:
    print 'Failed to import:', e

Output:

dongwm@linux-vkmz:~> python test.py
new shelf added to import path: import_example.shelve
looking for "package" in import_example.shelve ... found it as package.__init__
loading source for "package" from shelf
creating a new module object for "package"
adding path for package
execing source...
package imported
done
looking for "package.module3" in import_example.shelve ... not found
Failed to import: No module named module3

import sys
import sys_shelve_importer
import os
import pkgutil

filename = '/tmp/pymotw_import_example.shelve'
sys.path_hooks.append(sys_shelve_importer.ShelveFinder)
sys.path.insert(0, filename)

import package

readme_path = os.path.join(package.__path__[0], 'README')

#readme = package.__loader__.get_data(readme_path)
readme = pkgutil.get_data('package', 'README')  # returns the contents of the file
print readme

foo_path = os.path.join(package.__path__[0], 'foo')
#foo = package.__loader__.get_data(foo_path)
foo = pkgutil.get_data('package', 'foo')  # 'foo' does not exist, so the loader raises IOError
print foo

Output:

dongwm@linux-vkmz:~> python test.py
new shelf added to import path: import_example.shelve
looking for "package" in import_example.shelve ... found it as package.__init__
loading source for "package" from shelf
creating a new module object for "package"
adding path for package
execing source...
package imported
done
looking for data in import_example.shelve for "import_example.shelve/README"

==============
package README
==============

This is the README for "package".

looking for data in import_example.shelve for "import_example.shelve/foo"
Traceback (most recent call last):
  File "test.py", line 20, in <module>
    foo = pkgutil.get_data('package', 'foo')
  File "/usr/lib/python2.6/pkgutil.py", line 583, in get_data
    return loader.get_data(resource_name)
  File "/home/dongwm/sys_shelve_importer.py", line 107, in get_data
    raise IOError
IOError
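pkgutil.get_data() behaves the same way with the ordinary file-system importer. Here is a small Python 3 sketch under that assumption; the package name data_demo_pkg and its README are created in a temporary directory purely for illustration and have nothing to do with the shelve file used above.

```python
import importlib
import os
import pkgutil
import sys
import tempfile

# Build a throwaway package (hypothetical name) containing a README data file.
root = tempfile.mkdtemp()
package_dir = os.path.join(root, 'data_demo_pkg')
os.mkdir(package_dir)
with open(os.path.join(package_dir, '__init__.py'), 'w') as handle:
    handle.write('')
with open(os.path.join(package_dir, 'README'), 'w') as handle:
    handle.write('This is the README for data_demo_pkg.\n')

sys.path.insert(0, root)
importlib.invalidate_caches()

# get_data() asks the package's loader for the resource and returns bytes.
readme = pkgutil.get_data('data_demo_pkg', 'README')
print(readme.decode('utf-8'))

# A missing resource surfaces as an OSError subclass raised by the loader.
try:
    pkgutil.get_data('data_demo_pkg', 'foo')
    missing_error = None
except OSError as err:
    missing_error = type(err).__name__
print('missing resource raised:', missing_error)
```

In Python 3 the result is a bytes object, so it has to be decoded before being treated as text.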

import sys

def my_excepthook(type, value, traceback):
    # The three arguments are the exception type, the exception value, and the traceback.
    print 'Unhandled error:', type, value

sys.excepthook = my_excepthook  # install a global handler for uncaught exceptions

print 'Before exception'

raise RuntimeError('This is the error message')

print 'After exception'

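Output:

dongwm@linux-vkmz:~> python test.py
Before exception
Unhandled error: <type 'exceptions.RuntimeError'> This is the error message

The hook handles the exception. Because the exception was never caught, the program still ends there and the final print does not run.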
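For readers on Python 3, the same idea can be sketched as follows. The function name log_unhandled and the one-line message format are illustrative choices, not something taken from the listing above.

```python
import sys

def log_unhandled(exc_type, exc_value, exc_traceback):
    """Print a one-line summary instead of the default traceback."""
    line = 'Unhandled error: %s: %s' % (exc_type.__name__, exc_value)
    print(line)
    return line

# From here on, any uncaught exception in the main thread goes through the hook.
sys.excepthook = log_unhandled

# The hook is an ordinary function, so it can also be called directly.
summary = log_unhandled(RuntimeError, RuntimeError('This is the error message'), None)
```

The interpreter keeps the original hook in sys.__excepthook__, so the default behaviour can be restored by assigning it back.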

import sys
import threading
import time

def do_something_with_exception():
    # sys.exc_info() returns the exception currently being handled as a tuple
    # of (type, value, traceback); only the first two items are used here.
    exc_type, exc_value = sys.exc_info()[:2]
    print 'Handling %s exception with message "%s" in %s' % \
        (exc_type.__name__, exc_value, threading.current_thread().name)

def cause_exception(delay):
    time.sleep(delay)
    raise RuntimeError('This is the error message')

def thread_target(delay):
    try:
        cause_exception(delay)
    except:
        do_something_with_exception()

threads = [
    threading.Thread(target=thread_target, args=(0.3,)),
    threading.Thread(target=thread_target, args=(0.1,)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

Output:

dongwm@linux-vkmz:~> python test.py
Handling RuntimeError exception with message "This is the error message" in Thread-2
Handling RuntimeError exception with message "This is the error message" in Thread-1

Thread-2 reports first because it sleeps for a shorter time.
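sys.exc_info() is per-thread, which is why each thread above sees only its own exception. A minimal Python 3 sketch of the same pattern follows; the thread names and the shared handled list are assumptions made for the example.

```python
import sys
import threading
import time

handled = []                      # (thread name, exception type, message)
handled_lock = threading.Lock()

def worker(delay):
    try:
        time.sleep(delay)
        raise RuntimeError('failed after %.2fs' % delay)
    except RuntimeError:
        # exc_info() describes the exception being handled in *this* thread.
        exc_type, exc_value = sys.exc_info()[:2]
        with handled_lock:
            handled.append((threading.current_thread().name,
                            exc_type.__name__, str(exc_value)))

threads = [
    threading.Thread(target=worker, args=(delay,), name='worker-%d' % index)
    for index, delay in enumerate((0.2, 0.05))
]
for t in threads:
    t.start()
for t in threads:
    t.join()

for entry in handled:
    print(entry)
```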

import sys

def trace_calls(frame, event, arg):
    # A trace function takes three arguments: the stack frame of the code
    # being run, the type of event, and an event-specific argument.
    if event != 'call':
        return
    co = frame.f_code
    func_name = co.co_name
    if func_name == 'write':
        return  # ignore write() calls made by print
    func_line_no = frame.f_lineno
    func_filename = co.co_filename
    caller = frame.f_back
    caller_line_no = caller.f_lineno
    caller_filename = caller.f_code.co_filename
    print 'Call to %s on line %s of %s from line %s of %s' % \
        (func_name, func_line_no, func_filename,
         caller_line_no, caller_filename)
    return

def b():
    print 'in b()'

def a():
    print 'in a()'
    b()

sys.settrace(trace_calls)  # install the trace function to follow program execution
a()

Output:

dongwm@linux-vkmz:~> python test.py
Call to a on line 24 of test.py from line 29 of test.py
in a()
Call to b on line 21 of test.py from line 26 of test.py
in b()

The first call enters a(), defined on line 24; a() then calls b(), defined on line 21.

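Note: the event types passed to the trace function:

Event          When it occurs                       Argument
'call'         before a function is executed        None
'line'         before a line is executed            None
'return'       before a function returns            the value being returned
'exception'    after an exception occurs            tuple of (type, value, traceback)
'c_call'       before a C function is called        the C function object
'c_return'     after a C function has returned      None
'c_exception'  after a C function raises an error   None
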
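To see the 'call', 'line', and 'return' events from the table in order, the following Python 3 sketch records them in a list instead of printing. The helper functions add() and outer() and the events list are invented for the demonstration.

```python
import sys

events = []  # (event, function name) pairs in the order they were reported

def tracer(frame, event, arg):
    # Only follow the two demo functions; returning the tracer from a 'call'
    # event makes it the local trace function for that frame.
    if frame.f_code.co_name in ('add', 'outer'):
        events.append((event, frame.f_code.co_name))
        return tracer
    return None

def add(x, y):
    return x + y

def outer():
    return add(1, 2)

previous = sys.gettrace()
sys.settrace(tracer)
try:
    result = outer()
finally:
    sys.settrace(previous)  # always put back whatever tracer was active before

for event in events:
    print(event)
```

Restoring the previous trace function in a finally block keeps the sketch from interfering with a debugger or coverage tool that may already be tracing.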
import sys

def trace_lines(frame, event, arg):
    if event != 'line':
        return
    co = frame.f_code
    func_name = co.co_name
    line_no = frame.f_lineno
    filename = co.co_filename
    print '  %s line %s' % (func_name, line_no)

def trace_calls(frame, event, arg):
    if event != 'call':
        return
    co = frame.f_code
    func_name = co.co_name
    if func_name == 'write':
        return
    line_no = frame.f_lineno
    filename = co.co_filename
    print 'Call to %s on line %s of %s' % (func_name, line_no, filename)
    if func_name in TRACE_INTO:
        # Trace into this function line by line by returning a local trace function.
        return trace_lines
    return

def c(input):
    print 'input =', input
    print 'Leaving c()'

def b(arg):
    val = arg * 5
    c(val)
    print 'Leaving b()'

def a():
    b(2)
    print 'Leaving a()'

TRACE_INTO = ['b']  # global list of the functions to trace line by line

sys.settrace(trace_calls)
a()

Output:

dongwm@linux-vkmz:~> python test.py
Call to a on line 37 of test.py
Call to b on line 32 of test.py
  b line 33
  b line 34
Call to c on line 28 of test.py
input = 10
Leaving c()
  b line 35
Leaving b()
Leaving a()

import sys

def trace_calls_and_returns(frame, event, arg):
    co = frame.f_code
    func_name = co.co_name
    if func_name == 'write':
        return
    line_no = frame.f_lineno
    filename = co.co_filename
    if event == 'call':
        print 'Call to %s on line %s of %s' % (func_name, line_no, filename)
        # Return a reference to this function so that the 'return' event
        # of the new scope is reported to it as well.
        return trace_calls_and_returns
    elif event == 'return':
        print '%s => %s' % (func_name, arg)
    return

def b():
    print 'in b()'
    return 'response_from_b '

def a():
    print 'in a()'
    val = b()
    return val * 2

sys.settrace(trace_calls_and_returns)
a()

Output:

dongwm@linux-vkmz:~> python test.py
Call to a on line 22 of test.py
in a()
Call to b on line 18 of test.py
in b()
b => response_from_b
a => response_from_b response_from_b

import sys

def trace_exceptions(frame, event, arg):
    # Monitor exceptions by looking for the 'exception' event in a local trace
    # function. When an exception occurs, arg is a tuple of the exception type,
    # the exception value, and the traceback.
    if event != 'exception':
        return
    co = frame.f_code
    func_name = co.co_name
    line_no = frame.f_lineno
    filename = co.co_filename
    exc_type, exc_value, exc_traceback = arg
    print 'Tracing exception: %s "%s" on line %s of %s' % \
        (exc_type.__name__, exc_value, line_no, func_name)

def trace_calls(frame, event, arg):
    if event != 'call':
        return
    co = frame.f_code
    func_name = co.co_name
    if func_name in TRACE_INTO:
        return trace_exceptions

def c():
    raise RuntimeError('generating exception in c()')

def b():
    c()
    print 'Leaving b()'

def a():
    b()
    print 'Leaving a()'

TRACE_INTO = ['a', 'b', 'c']

sys.settrace(trace_calls)
try:
    a()
except Exception, e:
    print 'Exception handler:', e

Output:

dongwm@linux-vkmz:~> python test.py
Tracing exception: RuntimeError "generating exception in c()" on line 23 of c
Tracing exception: RuntimeError "generating exception in c()" on line 26 of b
Tracing exception: RuntimeError "generating exception in c()" on line 30 of a
Exception handler: generating exception in c()

import sys
import threading
from Queue import Queue
import time

def show_thread(q, extraByteCodes):
    for i in range(5):
        for j in range(extraByteCodes):
            pass
        q.put(threading.current_thread().name)
    return

def run_threads(prefix, interval, extraByteCodes):
    print '%(prefix)s interval = %(interval)s with %(extraByteCodes)s extra operations' % locals()
    # Python threads multitask cooperatively. At a fixed interval the interpreter
    # pauses bytecode execution and checks whether any signal handlers need to run.
    # At the same check the current thread releases the global interpreter lock and
    # then reacquires it, which gives the other threads a chance to run.
    # setcheckinterval() changes that interval, at a possible cost in performance.
    sys.setcheckinterval(interval)
    q = Queue()
    threads = [
        threading.Thread(target=show_thread,
                         name='%s T%s' % (prefix, i),
                         args=(q, extraByteCodes))
        for i in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    while not q.empty():
        print q.get()
    print
    return

run_threads('Default', interval=10, extraByteCodes=1000)
run_threads('Custom', interval=10, extraByteCodes=0)

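Output:

dongwm@linux-vkmz:~> python test.py
Default interval = 10 with 1000 extra operations
Default T0
Default T2
Default T1
Default T0
Default T2
Default T1
Default T0
Default T2
Default T1
Default T0
Default T0
Default T2
Default T1
Default T2
Default T1

Custom interval = 10 with 0 extra operations
Custom T0
Custom T0
Custom T0
Custom T0
Custom T0
Custom T1
Custom T1
Custom T2
Custom T1
Custom T2
Custom T1
Custom T1
Custom T2
Custom T2
Custom T2

In the first run the check interval is smaller than the number of bytecodes each thread executes, so the interpreter may hand control to another thread and let it run for a while. In the second run the check interval is larger than the number of bytecodes a thread executes, so the thread does not give up control and tends to finish its work before the interval comes up.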
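sys.setcheckinterval() counts bytecodes and exists only in Python 2 and early Python 3 releases. In current Python 3 the corresponding knob is time-based: sys.setswitchinterval() takes a duration in seconds. A minimal sketch, with 0.001 chosen arbitrarily for the example:

```python
import sys

original = sys.getswitchinterval()   # thread switch interval, in seconds
print('original interval:', original)

sys.setswitchinterval(0.001)         # ask for a thread switch roughly every millisecond
changed = sys.getswitchinterval()
print('changed interval :', changed)

sys.setswitchinterval(original)      # restore the previous setting
restored = sys.getswitchinterval()
```

As with the bytecode-based interval, a shorter value makes threads alternate more often at the cost of more switching overhead.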

import sys
import threading
from Queue import Queue
import time

def show_thread(q, extraByteCodes):
    for i in range(5):
        for j in range(extraByteCodes):
            pass
        #q.put(threading.current_thread().name)
        # Print directly to sys.stdout instead of appending to the queue.
        # The output becomes much less predictable.
        print threading.current_thread().name
    return

def run_threads(prefix, interval, extraByteCodes):
    print '%(prefix)s interval = %(interval)s with %(extraByteCodes)s extra operations' % locals()
    sys.setcheckinterval(interval)
    q = Queue()
    threads = [
        threading.Thread(target=show_thread,
                         name='%s T%s' % (prefix, i),
                         args=(q, extraByteCodes))
        for i in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    while not q.empty():
        print q.get()
    print
    return

run_threads('Default', interval=100, extraByteCodes=1000)
run_threads('Custom', interval=10, extraByteCodes=0)

import sys
import threading
import time

io_lock = threading.Lock()
blocker = threading.Lock()

def block(i):
    t = threading.current_thread()
    with io_lock:
        print '%s with ident %s going to sleep' % (t.name, t.ident)
    if i:
        blocker.acquire()  # acquired but never released
        time.sleep(0.2)
    with io_lock:
        print t.name, 'finishing'
    return

threads = [threading.Thread(target=block, args=(i,)) for i in range(3)]
for t in threads:
    t.setDaemon(True)
    t.start()

threads_by_ident = dict((t.ident, t) for t in threads)  # map thread identifiers to thread objects

time.sleep(0.01)
with io_lock:
    # Finding a deadlocked thread is hard; sys._current_frames() shows
    # exactly where each thread is stopped.
    for ident, frame in sys._current_frames().items():
        t = threads_by_ident.get(ident)
        if not t:
            continue  # skip the main thread
        print t.name, 'stopped in', frame.f_code.co_name,
        print 'at line', frame.f_lineno, 'of', frame.f_code.co_filename

Output:

dongwm@linux-vkmz:~> python test.py
Thread-1 with ident -1221342352 going to sleep
Thread-1 finishing
Thread-2 with ident -1221342352 going to sleep
Thread-3 with ident -1229735056 going to sleep
Thread-3 stopped in block at line 13 of test.py
Thread-2 stopped in block at line 14 of test.py

Thread-1 does not sleep and finishes right away. Thread-2 acquires blocker and is sleeping on line 14. Thread-3 is stuck on line 13 trying to acquire blocker, which Thread-2 still holds.
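The same inspection works in Python 3. The sketch below parks one thread on a threading.Event and then walks its stack from the frame returned by sys._current_frames(); the function name wait_for_release and the thread name are made up for the example.

```python
import sys
import threading

release = threading.Event()
started = threading.Event()

def wait_for_release():
    started.set()
    release.wait()          # the thread is parked here until the event is set

waiter = threading.Thread(target=wait_for_release, name='waiter', daemon=True)
waiter.start()
started.wait()

# Map of thread ident -> topmost frame; walk f_back to get the whole call stack.
frame = sys._current_frames()[waiter.ident]
stack = []
while frame is not None:
    stack.append('%s (line %s)' % (frame.f_code.co_name, frame.f_lineno))
    frame = frame.f_back

release.set()
waiter.join()

for entry in stack:
    print(entry)
```

The leading underscore is a reminder that sys._current_frames() is meant for debugging rather than for regular program logic.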

 

import sys
print 'Arguments:', sys.argv  # the command-line arguments

Output:

dongwm@linux-vkmz:~> python test.py -h
Arguments: ['test.py', '-h']

import sys

print >>sys.stderr, 'STATUS: Reading from stdin'  # stderr is meant for warnings and error messages

data = sys.stdin.read()  # read everything from the input stream

print >>sys.stderr, 'STATUS: Writing data to stdout'

sys.stdout.write(data)  # write the data out
sys.stdout.flush()  # flush stdout so the data appears immediately

print >>sys.stderr, 'STATUS: Done'
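In Python 3 the redirect is written with the file argument of print(). The sketch below wraps the same three steps in a function so it can be tried with in-memory streams; copy_stream and the io.StringIO objects are stand-ins chosen for the example, where a real script would pass sys.stdin and sys.stdout.

```python
import io
import sys

def copy_stream(source, dest, log=sys.stderr):
    """Copy everything from source to dest, reporting progress on log."""
    print('STATUS: Reading from input', file=log)
    data = source.read()
    print('STATUS: Writing data to output', file=log)
    dest.write(data)
    dest.flush()
    print('STATUS: Done', file=log)
    return len(data)

# In-memory streams stand in for sys.stdin, sys.stdout and sys.stderr.
source = io.StringIO('hello\n')
dest = io.StringIO()
log = io.StringIO()
copied = copy_stream(source, dest, log)
print(copied, repr(dest.getvalue()))
```

Keeping status messages on a separate stream means the data written to the output can be piped into another program without the messages mixed in.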

import sys

exit_code = int(sys.argv[1])
sys.exit(exit_code)  # set the exit status: 0 means success, any non-zero value means failure
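sys.exit() works by raising SystemExit, so cleanup code still runs and the exit can be intercepted. A short Python 3 sketch; the helper name run_and_capture is hypothetical.

```python
import sys

def run_and_capture(code):
    """Call sys.exit() and report the status it would have exited with."""
    try:
        sys.exit(code)
    except SystemExit as exc:
        # The requested status is stored on the exception object.
        return exc.code

captured = run_and_capture(3)
print('exit status requested:', captured)
```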

import sys

# Python manages memory automatically with reference counting and garbage
# collection. An object is marked for collection when its reference count
# drops to zero. getrefcount() reports the reference count of an existing object.
one = []
print 'At start         :', sys.getrefcount(one)
two = one
print 'Second reference :', sys.getrefcount(one)
del two
print 'After del        :', sys.getrefcount(one)

Output:

dongwm@linux-vkmz:~> python test.py
At start         : 2
Second reference : 3
After del        : 2
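The count starts higher than one might expect because the argument passed to getrefcount() is itself a temporary reference. The absolute numbers can differ between interpreter versions, so this Python 3 sketch compares the counts relative to each other instead.

```python
import sys

one = []
at_start = sys.getrefcount(one)

two = one                       # a second name bound to the same list
with_alias = sys.getrefcount(one)

del two                         # drop the extra reference again
after_del = sys.getrefcount(one)

print('alias added   :', with_alias - at_start)
print('alias removed :', after_del - at_start)
```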

import sys

class OldStyle: pass

class NewStyle(object): pass

for obj in [[], (), {}, 'c', 'string', 1, 2.3,
            OldStyle, OldStyle(), NewStyle, NewStyle(),
            ]:
    print '%10s : %s' % (type(obj).__name__, sys.getsizeof(obj))  # size of each kind of object

Output:

dongwm@linux-vkmz:~> python test.py
      list : 32
     tuple : 24
      dict : 136
       str : 25
       str : 30
       int : 12
     float : 16
  classobj : 44
  instance : 32
      type : 448
  NewStyle : 28

import sys

class WithoutAttributes(object): pass

class WithAttributes(object):
    def __init__(self):
        self.a = 'a'
        self.b = 'b'
        return

without_attrs = WithoutAttributes()
print 'WithoutAttributes:', sys.getsizeof(without_attrs)

with_attrs = WithAttributes()
print 'WithAttributes:', sys.getsizeof(with_attrs)

Output:

dongwm@linux-vkmz:~> python test.py
WithoutAttributes: 28
WithAttributes: 28

Surprisingly, the instance with attributes reports the same size as the one without, because getsizeof() does not count the attribute values. The next example computes the size by adding up the sizes of the object's attributes:

import sys

class WithAttributes(object):
    def __init__(self):
        self.a = 'a'
        self.b = 'b'
        return
    def __sizeof__(self):
        return object.__sizeof__(self) + \
            sum(sys.getsizeof(v) for v in self.__dict__.values())

my_inst = WithAttributes()
print sys.getsizeof(my_inst)

Output:

dongwm@linux-vkmz:~> python test.py
78

The result is no longer 28, which is more reasonable.
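The exact byte counts depend on the interpreter version and platform, so a Python 3 version of this comparison is easier to read in relative terms. In this sketch the class names Plain and Deep are invented; Deep adds the same __sizeof__ override shown above.

```python
import sys

class Plain(object):
    def __init__(self):
        self.a = 'a'
        self.b = 'b'

class Deep(Plain):
    def __sizeof__(self):
        # Start from the default size and add the size of each attribute value.
        own = object.__sizeof__(self)
        return own + sum(sys.getsizeof(v) for v in self.__dict__.values())

plain_size = sys.getsizeof(Plain())
deep_size = sys.getsizeof(Deep())
print('without override:', plain_size)
print('with override   :', deep_size)
```

Even with the override the figure is only one level deep: the instance dictionary itself and anything the attribute values refer to are still not counted.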

import sys

# The recursion limit stops a function that keeps calling itself from crashing the interpreter.
print 'Initial limit:', sys.getrecursionlimit()  # read the current limit
sys.setrecursionlimit(10)  # set a new limit

print 'Modified limit:', sys.getrecursionlimit()

def generate_recursion_error(i):
    print 'generate_recursion_error(%s)' % i
    generate_recursion_error(i+1)  # keep calling itself, incrementing the counter each time

try:
    generate_recursion_error(1)
except RuntimeError, err:
    print 'Caught exception:', err

Output:

dongwm@linux-vkmz:~> python test.py
Initial limit: 1000
Modified limit: 10
generate_recursion_error(1)
generate_recursion_error(2)
generate_recursion_error(3)
generate_recursion_error(4)
generate_recursion_error(5)
generate_recursion_error(6)
generate_recursion_error(7)
generate_recursion_error(8)
Caught exception: maximum recursion depth exceeded while getting the str of an object

The default limit is 1000. After it is lowered to 10, the exception is raised as soon as the limit is reached.
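In Python 3 the exception raised is RecursionError, a subclass of RuntimeError, so the except clause above would still catch it. The sketch below lowers the limit temporarily and reports how deep the calls got; the limit of 300 and the name probe_depth are arbitrary choices for the example, and the depth reached depends on how many frames are already on the stack.

```python
import sys

def probe_depth(level=1):
    """Recurse until the interpreter refuses, then report the deepest level."""
    try:
        return probe_depth(level + 1)
    except RecursionError:
        return level

original_limit = sys.getrecursionlimit()
sys.setrecursionlimit(300)
try:
    depth_reached = probe_depth()
finally:
    sys.setrecursionlimit(original_limit)   # always restore the old limit

print('limit 300, deepest call level reached:', depth_reached)
```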

import sys

print 'maxint    :', sys.maxint  # the largest value a plain int can hold
print 'maxsize   :', sys.maxsize  # the largest size a container such as a list or string can have
print 'maxunicode:', sys.maxunicode  # the largest Unicode code point supported
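Python 3 dropped sys.maxint because its int type has no upper bound; sys.maxsize and sys.maxunicode remain. A brief sketch:

```python
import sys

print('maxsize   :', sys.maxsize)      # largest container size / index
print('maxunicode:', sys.maxunicode)   # largest Unicode code point

# Integers simply keep growing past maxsize.
bigger = sys.maxsize + 1
print('maxsize + 1:', bigger)
print('has maxint:', hasattr(sys, 'maxint'))
```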

 

Socket server:

import socket
import sys

# socket.AF_INET selects IPv4. SOCK_STREAM provides reliable, connection-oriented
# delivery, that is TCP. The UDP counterpart is SOCK_DGRAM.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 10000)
print >>sys.stderr, 'starting up on %s port %s' % server_address
sock.bind(server_address)  # bind to port 10000 on localhost
# listen() turns the socket into a passive one that accepts connection requests
# from other processes, which makes this process a server. The argument 1 is the
# backlog: at most one connection may wait in the queue to be accepted.
sock.listen(1)

while True:
    print >>sys.stderr, 'waiting for a connection'
    connection, client_address = sock.accept()  # wait for a client to connect
    try:
        print >>sys.stderr, 'connection from', client_address

        while True:  # keep reading until the client stops sending
            data = connection.recv(16)  # receive up to 16 bytes from the other end of the TCP connection
            print >>sys.stderr, 'received "%s"' % data
            if data:
                print >>sys.stderr, 'sending data back to the client'
                connection.sendall(data)
            else:
                print >>sys.stderr, 'no more data from', client_address
                break

    finally:
        connection.close()  # close the connection

import socket
import sys

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # create the client socket

server_address = ('localhost', 10000)
print >>sys.stderr, 'connecting to %s port %s' % server_address
sock.connect(server_address)  # connect to the port the server is listening on
try:
    message = 'This is the message. It will be repeated.'
    print >>sys.stderr, 'sending "%s"' % message
    # Choose between send() and sendall(): send() transmits as many bytes as it
    # can in one call, while sendall() transmits the whole message and raises
    # an exception if it cannot.
    sock.sendall(message)
    amount_received = 0
    amount_expected = len(message)

    while amount_received < amount_expected:
        data = sock.recv(16)
        amount_received += len(data)
        print >>sys.stderr, 'received "%s"' % data

finally:
    print >>sys.stderr, 'closing socket'
    sock.close()
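For a quick experiment without two terminals, the server and client above can be combined in one Python 3 process. This is only a sketch: the server runs in a helper thread, handles a single connection, and binds to port 0 so the operating system picks a free port (the listings above use port 10000). Note that Python 3 sockets send and receive bytes, not str.

```python
import socket
import threading

def echo_once(server_sock):
    """Accept one connection and echo back everything it sends."""
    connection, _client_address = server_sock.accept()
    with connection:
        while True:
            data = connection.recv(16)
            if not data:          # empty bytes: the client closed its side
                break
            connection.sendall(data)

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))     # port 0: let the OS choose a free port
server.listen(1)
port = server.getsockname()[1]

server_thread = threading.Thread(target=echo_once, args=(server,))
server_thread.start()

message = b'This is the message. It will be repeated.'
received = b''
with socket.create_connection(('127.0.0.1', port), timeout=5) as client:
    client.sendall(message)
    # recv() may return fewer bytes than asked for, so loop until all arrived.
    while len(received) < len(message):
        chunk = client.recv(16)
        if not chunk:
            break
        received += chunk

server_thread.join()
server.close()
print(received)
```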

import socket
import sys

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # create a UDP socket
server_address = ('localhost', 10000)
print >>sys.stderr, 'starting up on %s port %s' % server_address
sock.bind(server_address)
while True:
    print >>sys.stderr, '\nwaiting to receive message'
    data, address = sock.recvfrom(4096)  # UDP receives with recvfrom()

    print >>sys.stderr, 'received %s bytes from %s' % (len(data), address)
    print >>sys.stderr, data

    if data:
        sent = sock.sendto(data, address)  # UDP sends with sendto()
        print >>sys.stderr, 'sent %s bytes back to %s' % (sent, address)

import socket
import sys

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # the UDP client socket

server_address = ('localhost', 10000)
message = 'This is the message. It will be repeated.'

try:

    print >>sys.stderr, 'sending "%s"' % message
    sent = sock.sendto(message, server_address)

    print >>sys.stderr, 'waiting to receive'
    data, server = sock.recvfrom(4096)
    print >>sys.stderr, 'received "%s"' % data

finally:
    print >>sys.stderr, 'closing socket'
    sock.close()
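Because UDP has no connection setup, a round trip can be sketched in a single Python 3 process without threads: one socket plays the server, the other the client. The OS-chosen port and the two-second timeouts are assumptions added for the example.

```python
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(('127.0.0.1', 0))     # port 0: let the OS choose a free port
server.settimeout(2.0)
server_name = server.getsockname()

client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.settimeout(2.0)

try:
    message = b'This is the message.'
    client.sendto(message, server_name)              # client -> server

    data, client_address = server.recvfrom(4096)     # server learns the sender's address
    server.sendto(data, client_address)              # server -> client (echo)

    reply, reply_address = client.recvfrom(4096)
finally:
    client.close()
    server.close()

print(reply, reply_address)
```

Each recvfrom() call returns the sender's address along with the data, which is how the server knows where to send the echo.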

import socket
import sys
import os

server_address = './uds_socket'  # path of the socket file

try:
    os.unlink(server_address)  # make sure the file does not already exist
except OSError:
    if os.path.exists(server_address):
        raise
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)  # create a Unix domain socket
print >>sys.stderr, 'starting up on %s' % server_address
sock.bind(server_address)
sock.listen(1)
while True:
    print >>sys.stderr, 'waiting for a connection'
    connection, client_address = sock.accept()
    try:
        print >>sys.stderr, 'connection from', client_address
        while True:
            data = connection.recv(16)
            print >>sys.stderr, 'received "%s"' % data
            if data:
                print >>sys.stderr, 'sending data back to the client'
                connection.sendall(data)
            else:
                print >>sys.stderr, 'no more data from', client_address
                break

    finally:
        connection.close()

import socket
import sys

sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)  # the Unix domain socket client

server_address = './uds_socket'
print >>sys.stderr, 'connecting to %s' % server_address
try:
    sock.connect(server_address)
except socket.error, msg:
    print >>sys.stderr, msg
    sys.exit(1)

try:

    message = 'This is the message. It will be repeated.'
    print >>sys.stderr, 'sending "%s"' % message
    sock.sendall(message)
    amount_received = 0
    amount_expected = len(message)
    while amount_received < amount_expected:
        data = sock.recv(16)
        amount_received += len(data)
        print >>sys.stderr, 'received "%s"' % data
finally:
    print >>sys.stderr, 'closing socket'
    sock.close()

Note: pay attention to the permissions on the socket file:

dongwm@localhost ~ $ls -l !$
ls -l uds_socket
srwxr-xr-x 1 dongwm dongwm 0 Jul  9 12:46 uds_socket

If you do not have permission, the client reports:

connecting to ./uds_socket
[Errno 13] Permission denied

import socket
import os

parent, child = socket.socketpair()  # a pair of connected sockets for parent/child process communication

pid = os.fork()

if pid:
    print 'in parent, sending message'
    child.close()
    parent.sendall('ping')
    response = parent.recv(1024)
    print 'response from child:', response
    parent.close()

else:
    print 'in child, waiting for message'
    parent.close()
    message = child.recv(1024)
    print 'message from parent:', message
    child.sendall('pong')
    child.close()

Output:

dongwm@localhost ~ $python test.py
in parent, sending message
in child, waiting for message
message from parent: ping
response from child: pong
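socketpair() is not tied to fork(): the two ends can just as well be used by two threads. The following Python 3 sketch swaps the child process for a helper thread so it also runs on platforms without os.fork(); the function name child_side is made up.

```python
import socket
import threading

parent, child = socket.socketpair()   # two already-connected sockets

def child_side(sock):
    """Wait for a ping on one end of the pair and answer with a pong."""
    with sock:
        message = sock.recv(1024)
        if message == b'ping':
            sock.sendall(b'pong')

worker = threading.Thread(target=child_side, args=(child,))
worker.start()

with parent:
    parent.sendall(b'ping')
    response = parent.recv(1024)

worker.join()
print('response from child side:', response)
```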

import socket
import struct
import sys

message = 'very important data'
# Multicast gives a point-to-multipoint connection between one sender and every
# receiver. When a sender transmits the same data to several receivers, only one
# copy of each packet is sent. This makes delivery more efficient and reduces
# the chance of congestion on the backbone network.
multicast_group = ('224.3.29.71', 10000)
# Create the sender's socket. Multicast only works over UDP, and the receiver
# program can run on several hosts at once.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(0.2)  # set a timeout so the sender does not wait forever for replies
ttl = struct.pack('b', 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)  # set the multicast TTL
try:

    print >>sys.stderr, 'sending "%s"' % message
    sent = sock.sendto(message, multicast_group)
    while True:
        print >>sys.stderr, 'waiting to receive'
        try:
            data, server = sock.recvfrom(16)
        except socket.timeout:
            print >>sys.stderr, 'timed out, no more responses'
            break
        else:
            print >>sys.stderr, 'received "%s" from %s' % (data, server)
finally:
    print >>sys.stderr, 'closing socket'
    sock.close()

import socket
import struct
import sys

multicast_group = '224.3.29.71'
server_address = ('', 10000)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # this program receives the multicast data
sock.bind(server_address)  # bind to port 10000 on all interfaces
group = socket.inet_aton(multicast_group)
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)  # ask the OS to add this socket to the multicast group on all interfaces
while True:
    print >>sys.stderr, '\nwaiting to receive message'
    data, address = sock.recvfrom(1024)

    print >>sys.stderr, 'received %s bytes from %s' % (len(data), address)
    print >>sys.stderr, data

    print >>sys.stderr, 'sending acknowledgement to', address
    sock.sendto('ack', address)

Output:

ip1 (192.168.8.49):

[htdocs@debian ~]$ python test.py

waiting to receive message
received 19 bytes from ('192.168.8.48', 59274)
very important data
sending acknowledgement to ('192.168.8.48', 59274)

waiting to receive message

ip2 (192.168.8.48):

dongwm@localhost ~ $python test1.py

waiting to receive message
received 19 bytes from ('192.168.8.48', 59274)
very important data
sending acknowledgement to ('192.168.8.48', 59274)

waiting to receive message

Sender (192.168.8.48):

dongwm@localhost ~ $python test.py
sending "very important data"
waiting to receive
received "ack" from ('192.168.8.48', 10000)
waiting to receive
received "ack" from ('192.168.8.49', 10000)
waiting to receive
timed out, no more responses
closing socket
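The membership request passed to IP_ADD_MEMBERSHIP is the C structure ip_mreq: the 4-byte group address followed by the 4-byte address of the local interface. The receiver above builds it with the native format '4sL', whose size depends on how wide a C long is on the platform. The Python 3 sketch below shows an alternative way to build the same 8 bytes from two packed addresses; it only constructs and inspects the value and does not join a group.

```python
import socket
import struct

multicast_group = '224.3.29.71'

group = socket.inet_aton(multicast_group)      # 4 bytes, network byte order
any_interface = socket.inet_aton('0.0.0.0')    # INADDR_ANY: let the OS pick the interface

# ip_mreq layout: group address followed by the local interface address.
mreq = struct.pack('4s4s', group, any_interface)

# IPv4 multicast addresses are 224.0.0.0 through 239.255.255.255.
is_multicast = 224 <= group[0] <= 239

print(len(mreq), is_multicast)
print('native "4sL" size on this platform:', struct.calcsize('4sL'))
```

A receiver would then pass mreq to sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq), exactly as in the listing above.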

import binascii
import socket
import struct
import sys

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 10000)
sock.bind(server_address)
sock.listen(1)

unpacker = struct.Struct('I 2s f')

while True:
    print >>sys.stderr, '\nwaiting for a connection'
    connection, client_address = sock.accept()
    try:
        data = connection.recv(unpacker.size)
        print >>sys.stderr, 'received "%s"' % binascii.hexlify(data)  # the binary data received, shown as hex

        unpacked_data = unpacker.unpack(data)
        print >>sys.stderr, 'unpacked:', unpacked_data  # the unpacked values

    finally:
        connection.close()

import binascii
import socket
import struct
import sys

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 10000)
sock.connect(server_address)

values = (1, 'ab', 2.7)
packer = struct.Struct('I 2s f')
packed_data = packer.pack(*values)  # pack the values into binary form

try:
    print >>sys.stderr, 'sending "%s"' % binascii.hexlify(packed_data), values
    sock.sendall(packed_data)

finally:
    print >>sys.stderr, 'closing socket'
    sock.close()

Output:

Receiving side:

dongwm@localhost ~ $python !$
python server.py

waiting for a connection
received "0100000061620000cdcc2c40"
unpacked: (1, 'ab', 2.700000047683716)

waiting for a connection

Sending side:

dongwm@localhost ~ $python test.py
sending "0100000061620000cdcc2c40" (1, 'ab', 2.7)
closing socket
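The hex dump above can be reproduced without a network connection. The format 'I 2s f' uses the machine's native byte order and alignment, so two different machines may disagree about the layout. The Python 3 sketch below spells the layout out explicitly instead, with '<' for little-endian and '2x' for the two padding bytes, which is an assumption about the machine that produced the dump.

```python
import binascii
import struct

# Explicit little-endian layout: unsigned int, 2-byte string, 2 pad bytes, float.
packer = struct.Struct('<I 2s 2x f')

values = (1, b'ab', 2.7)
packed = packer.pack(*values)
hex_bytes = binascii.hexlify(packed).decode('ascii')
unpacked = packer.unpack(packed)

print('size    :', packer.size)
print('packed  :', hex_bytes)
print('unpacked:', unpacked)
```

The float comes back as roughly 2.7000000477 rather than 2.7 because the 'f' code stores it in 32 bits, which is the same rounding visible in the server output.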

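Upgrade steps:

1 emerge --sync  # sync the portage tree
2 emerge portage  # update portage
3 emerge python  # update python (a personal habit)
4 python-updater  # update the python configuration
5 emerge -aDjNquv @world  # upgrade the whole system, including dependencies
  # -D is --deep; -j with no argument puts no limit on the number of emerge jobs running at once (the default is 1)
  # -N is --newuse, which rebuilds packages whose USE flags have changed; -q is --quiet (less screen output); -u is --update; -v is --verbose
6 emerge -av --depclean  # remove packages that are no longer needed
7 revdep-rebuild  # check the system's dependencies and rebuild the packages whose dependencies are broken
  # If it fails partway and rerunning it still reports the old number of packages, the list of
  # packages to rebuild has been cached. Delete the cache: rm /var/cache/revdep-rebuild/*.rr
8 dispatch-conf or etc-update  # update the system configuration files
9 (optional) emerge -ej world  # recompile the whole system; not needed when the USE changes are small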

1 [blocks B ]
Cause: versions of the package older than sys-libs/talloc-2.0.5 are blocked by sys-libs/talloc-2.0.7, so the old version has to be uninstalled and the new one installed.
Fix: dongwm@localhost ~ $sudo emerge -C talloc && sudo emerge -1 talloc

2 !!! The following updates are masked by LICENSE changes: - www-plugins/adobe-flash-11.2.202.236
Cause: adobe-flash blocks the installation until its LICENSE is accepted by hand.
Fix: echo "www-plugins/adobe-flash AdobeFlash-10.3" >> /etc/portage/package.license  # 10.3 is the license version named in the error message

3 !!! One of the following masked packages is required to complete your request: - x11-plugins/compiz-plugins-extra-0.8.8::gentoo (masked by: corruption)
Cause: mainly a permissions problem. After portage was updated this ebuild no longer existed, but the dependency graph still needed it, so I downloaded the matching version from http://gpo.zugaina.org/x11-plugins/compiz-plugins-extra (other sources work too).
Fix: ebuild compiz-plugins-extra-0.8.8.ebuild digest

4 emerge: there are no ebuilds built with USE flags to satisfy ">=x11-libs/libdrm-2.4.24[video_cards_nouveau]". !!! One of the following packages is required to complete your request: - x11-libs/libdrm-2.4.33::gentoo (Change USE: +video_cards_nouveau)
Cause: a USE flag has to be set. It can be given directly on the command line when installing.
Fix: install with USE=video_cards_nouveau emerge x11-drivers/xf86-video-nouveau, or give the package its own USE flag in /etc/portage/package.use by adding this line:
x11-drivers/xf86-video-nouveau video_cards_nouveau

5 The following REQUIRED_USE flag constraints are unsatisfied:
osdmenu? ( X ) xscreensaver? ( X ) xv? ( X )
Cause: the enabled USE flags include osdmenu, which depends on another flag (X) being set.
Fix: remove the flag, or disable it with '-osdmenu'.

6 /usr/lib/gcc/i686-pc-linux-gnu/4.5.3/../../../../i686-pc-linux-gnu/bin/ld: warning: libxcb-util.so.0, needed by /usr/lib/gcc/i686-pc-linux-gnu/4.5.3/../../../../lib/libstartup-notification-1.so, not found (try using -rpath or -rpath-link) (seen during installation)
Cause: the dynamic linking of libxcb-util is broken.
Fix: re-emerge x11-libs/xcb-util and x11-libs/libxc, then run revdep-rebuild to rebuild the dependencies.

7 Traceback (most recent call last): File "configure.py", line 32, in <module> import sipconfig ImportError: No module named sipconfig * ERROR: dev-python/PyQt4-4.9.1 failed (configure phase): * (no error message)
Cause: installing PyQt4 requires sip.
Fix: reinstall dev-python/sip.