SQLAlchemy - wnh5/myStudy GitHub Wiki

原文:http://www.cnblogs.com/iwangzc/p/4112078.html

1.版本检查

import sqlalchemy sqlalchemy.version

2.连接

from sqlalchemy import create_engine engine = create_engine('sqlite:///:memory:',echo=True) echo参数为True时,会显示每条执行的SQL语句,可以关闭。create_engine()返回一个Engine的实例,并且它表示通过数据库语法处理细节的核心接口,在这种情况下,数据库语法将会被解释称Python的类方法。

3.声明映像

当使用ORM【1】时,构造进程首先描述数据库的表,然后定义我们用来映射那些表的类。在现版本的SQLAlchemy中,这两个任务通常一起执行,通过使用Declarative方法,我们可以创建一些包含描述要被映射的实际数据库表的准则的映射类。

使用Declarative方法定义的映射类依据一个基类,这个基类是维系类和数据表关系的目录——我们所说的Declarative base class。在一个普通的模块入口中,应用通常只需要有一个base的实例。我们通过declarative_base()功能创建一个基类:

from sqlalchemy.ext.declarativeimportdeclarative_base Base = declarative_base() 有了这个base,我们可以依据这个base定义任意数量的映射类。一个简单的user例子:

from sqlalchemy import Column, Integer, String class User(Base): tablename= 'users' id= Column(Integer, primary_key=True) name = Column(String) 用Declarative构造的一个类至少需要一个__tablename__属性,一个主键行。

4.构造模式(项目中没用到)

5.创建映射类的实例

ed_user = User(name='ed',fullname='Ed Jones', password='edspassword') 6.创建会话

现在我们已经准备毫和数据库开始会话了。ORM通过Session与数据库建立连接的。当应用第一次载入时,我们定义一个Session类(声明create_engine()的同时),这个Session类为新的Session对象提供工厂服务。

from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=engine) 这个定制的Session类会创建绑定到数据库的Session对象。如果需要和数据库建立连接,只需要实例化一个Session:

session = Session() 虽然上面的Session已经和数据库引擎Engine关联,但是还没有打开任何连接。当它第一次被使用时,就会从Engine维护的一个连接池中检索是否存在连接,如果存在便会保持连接知道我们提交所有更改并且/或者关闭session对象。

7.添加新对象(简略)

ed_user = User(name='ed', fullname='Ed Jones', password='edspassword') session.add(ed_user) 至此,我们可以认为,新添加的这个对象实例仍在等待中;ed_user对象现在并不代表数据库中的一行数据。直到使用flush进程,Session才会让SQL保持连接。如果查询这条数据的话,所有等待信息会被第一时间刷新,查询结果也会立即发行。

session.commit() 通过commit()可以提交所有剩余的更改到数据库。

8.回滚

session.rollback() 9.查询

通过Session的query()方法创建一个查询对象。这个函数的参数数量是可变的,参数可以是任何类或者是类的描述的集合。下面是一个迭代输出User类的例子:

for instance in session.query(User).order_by(User.id): print instance.name,instance.fullname Query也支持ORM描述作为参数。任何时候,多个类的实体或者是基于列的实体表达都可以作为query()函数的参数,返回类型是元组:

for name, fullname in session.query(User.name,User.fullname):

print name, fullname Query返回的元组被命名为KeyedTuple类的实例元组。并且可以把它当成一个普通的Python数据类操作。元组的名字就相当于属性的属性名,类的类名一样。

for row in session.query(User, User.name).all():

print row.User,row.name <User(name='ed',fullname='Ed Jones', password='f8s7ccs')>ed label()不知道怎么解释,看下例子就明白了。相当于row.name

for row in session.query(User.name.label('name_label')).all():

print(row.name_label) aliased()我的理解是类的别名,如果有多个实体都要查询一个类,可以用aliased()

from sqlalchemy.orm import aliased user_alias = aliased(User, name='user_alias') for row in session.query(user_alias,user_alias.name).all():

print row.user_alias Query的 基本操作包括LIMIT和OFFSET,使用Python数组切片和ORDERBY结合可以让操作变得很方便。

for u in session.query(User).order_by(User.id)[1:3]:

#只查询第二条和第三条数据 9.1使用关键字变量过滤查询结果,filter 和 filter_by都适用。【2】使用很简单,下面列出几个常用的操作:

query.filter(User.name == 'ed') #equals query.filter(User.name != 'ed') #not equals query.filter(User.name.like('%ed%')) #LIKE uery.filter(User.name.in_(['ed','wendy', 'jack'])) #IN query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%'))#IN query.filter(~User.name.in_(['ed','wendy', 'jack']))#not IN query.filter(User.name == None)#is None query.filter(User.name != None)#not None from sqlalchemy import and_ query.filter(and_(User.name =='ed',User.fullname =='Ed Jones')) # and query.filter(User.name == 'ed',User.fullname =='Ed Jones') # and query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')# and from sqlalchemy import or_ query.filter(or_(User.name =='ed', User.name =='wendy')) #or query.filter(User.name.match('wendy')) #match 9.2.返回列表和数量(标量?)

all()返回一个列表:可以进行Python列表的操作。

query = session.query(User).filter(User.name.like('%ed')).order_by(User.id) query.all()

[<User(name='ed',fullname='EdJones', password='f8s7ccs')>,<User(name='fred', fullname='FredFlinstone', password='blah')>]

first()适用于限制一个情况,返回查询到的第一个结果作为标量?:好像只能作为属性,类

query.first()

<User(name='ed',fullname='Ed Jones', password='f8s7ccs')> one()完全获取所有行,并且如果查询到的不只有一个对象或是有复合行,就会抛出异常。

from sqlalchemy.orm.exc import MultipleResultsFound user = query.one() try:

  user = query.one() except   MultipleResultsFound, e:   print e Multiple rows were found for one() 如果一行也没有:

from sqlalchemy.orm.exc import NoResultFound try:

  user = query.filter(User.id == 99).one() except NoResultFound, e:   print e No row was found for one() one()方法对于想要解决“no items found”和“multiple items found”是不同的系统是极好的。(这句有语病啊)例如web服务返回,本来是在no results found情况下返回”404“的,结果在多个results found情况下也会跑出一个应用异常。

scalar()作为one()方法的依据,并且在one()成功基础上返回行的第一列。

query = session.query(User.id).filter(User.name == 'ed') query.scalar()

7 9.3.使用字符串SQL

字符串能使Query更加灵活,通过text()构造指定字符串的使用,这种方法可以用在很多方法中,像filter()和order_by()。

from sqlalchemy import text for user in session.query(User).filter(text("id<224")).order_by(text("id")).all() 绑定参数可以指定字符串,用params()方法指定数值。

session.query(User).filter(text("id<:value and name=:name")).\

params(value=224, name='fred').order_by(User.id).one()

如果要用一个完整的SQL语句,可以使用from_statement()。

ession.query(User).from_statement(text("SELECT* FROM users where name=:name")).
params(name='ed').all() 也可以用from_statement()获取完整的”raw”,用字符名确定希望被查询的特定列:

session.query("id","name", "thenumber12").\

from_statement(text("SELECT id, name, 12 as ""thenumber12 FROM users where name=:name")).
 params(name='ed').all()

[(1,u'ed', 12)] 感觉这个不太符合ORM的思想啊。。。

9.4 计数

count()用来统计查询结果的数量。

session.query(User).filter(User.name.like('%ed')).count()

func.count()方法比count()更高级一点【3】

from sqlalchemy import func

session.query(func.count(User.name),User.name).group_by(User.name).all()

[(1,u'ed'), (1,u'fred'), (1,u'mary'), (1,u'wendy')] 为了实现简单计数SELECT count(*) FROM table,可以这么写:

session.query(func.count('*')).select_from(User).scalar()

如果我们明确表达计数是根据User表的主键的话,可以省略select_from(User):

session.query(func.count(User.id)).scalar()

code: ` #!/usr/bin/python #coding=utf-8

''' Created on 2017年7月22日

@author: qianfen

postgres测试使用ORM:sqlalchemy

postgres=# \d test Table "public.test" Column | Type | Modifiers
--------+-------------------+--------------------------------------------------- id | integer | not null default nextval('test_id_seq'::regclass) num | integer | data | character varying | Indexes: "test_pkey" PRIMARY KEY, btree (id)

''' import sqlalchemy as sqla import sqlalchemy.orm as sqlorm from sqlalchemy.ext.declarative import declarative_base as sqla_declarative_base

Base = sqla_declarative_base ()

import sys import psycopg2

Base = sqla_declarative_base () class Test ( Base ): tablename = 'test' id = sqla.Column ( 'id' , sqla.Integer , primary_key = True ) num = sqla.Column ( 'num' , sqla.Integer ) data = sqla.Column ( 'data' , sqla.String ) def toString(self): return "%d,%d,%s" % (self.id,self.num,self.data)

class PostgresAlchemy(object): def init(self): self.url = "postgresql+psycopg2://qianfen:postgres@localhost:5432/postgres" self.engine = sqla.create_engine ( self.url , echo = True ) Base.metadata.bind = self.engine Base.metadata.create_all () self.Session = sqlorm.scoped_session ( sqlorm.sessionmaker ( bind = self.engine ))

def insert(self,num,data):
    test  =   Test ( num = num, data=data ) 
    session  =   self.Session () 
    try : 
        session . add ( test ) 
        session . flush () 
        session . commit () 
    finally : 
        session . close () 
    
def selectAll(self):
    session = self.Session()
    try:
        for row in session.query(Test).order_by(Test.id):
            print row.toString()
    finally:
        session.close()

def selectById(self,id):
    session = self.Session()
    try:
        for row in session.query(Test).filter(Test.id==id).order_by(Test.id):
            print row.toString()
    finally:
        session.close()

if name == 'main': pg = PostgresAlchemy()

pg.insert(100, "orm test ok?")

pg.selectAll()
pg.selectById(10)
pg.selectById(16)

`