您的当前位置:首页正文

SQLAlchemy学习笔记

来源:华佗小知识
家乡.jpg

SQLAlchemy学习笔记(版权所有,翻版必究)

一、实例化数据库链接session

  • 参数
    • DATABASE_URI(数据库地址):
    • convert_unicode(True):设置str、boolean类型数据的默认行为
    • echo:engine的log行为,默认False,可修改为True或者Debug,标准输出到控制台
    • pool_size:数据库连接池大小,默认None,为5个连接数
    • pool_recycle:数据库连接的自动回收时间
    • autocommit:自动提交
    • autoflush:刷新session
    • bind:绑定engine
  • 方式
  • 方式1:不绑定app
engine = create_engine(DATABASE_URI, convert_unicode=True, pool_size=50, pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=True, autoflush=False, bind=engine))
db.session = session()
  • 方式2:绑定flask app
# store.py
from flask.ext.sqlalchemy import SQLAlchemy
db = SQLAlchemy()
----------------------------------------------------------------
# app.py
def create_app(config_name=None):
        app = Flask(__name__)
        db.app = app
        db.init_app(app)
        return app
app = create_app()
----------------------------------------------------------------
# handler.py
from store import db
result =  db.session.query(ModelA).all()

二、Model定义

import db
所有model类继承db.Model
    • int
      • db.Column(db.Integer, primary_key=True)
    • string
      • db.Column(db.String(50), nullable=False, unique=True)
    • time
      • db.Column(db.TIMESTAMP, default=datetime.now(), nullable=False) # 不能插入timestamp=0的数据
      • db.Column(db.DateTime(), nullable=False, server_default = func.now())
    • boolean
      • db.Column(db.Boolean, nullable=True, default=True)
    • float
      • db.Column(db.Float, nullable=False)
    • 主键 primary = True/False
    • 是否为空 nullable = True/False
    • 是否唯一 unique = True/False
    • 默认值 default = 0
    • 外键 foreignkey
      • 级联删除 ondelete='CASCADE'
      # parentModel
      # 定义父级关联的子集model
      from sqlalchemy.orm import relationship  
      children = db.relationship("hostAssets", cascade="all, delete-orphan", passive_deletes=True)  
      
      # children Model
      # 定义外键字段和外键删除规则
      from sqlalchemy import ForeignKey  
      business_id = db.Column(db.Integer, db.ForeignKey('business.id', ondelete='CASCADE'), nullable=True)```  
     
    

三、Result序列化

  • model增加to_dict方法
# ModelA.py
model中绑定__table__的信息
__tablename__ = 'model_a'
def to_dict(self):
  return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}

result = session.query().first()
result.to_dict()

四、

  • get 查询
  • __dict__
_sa_instance:为内置的属性
  • query() 查询
    • all()
      ret = session.query(User).all() 得到的结果是model实例对象的列表,如果查询不到,返回空list,可以用ret.column获取指定字段的value

    • first()
      ret = session.query(User).first() 得到的结果是model实例对象,可以用ret.column获取指定字段的value,等同于one_or_none(),如果查询不到,返回None

    • scalar()
      ret = session.query(User.name).scalar() 得到的是name字段的value

    • get(id)
      ret = session.query(User).get(5) 得到的结果是model实例对象,可以用ret.column获取指定字段的value,get的内容必须是主键

  • update
    参数:
    synchronize_session=False 不同步更新当前session
    synchronize_session= fetch 更新之前进行查询,获取最新的更新对象
db.session.query(hostAssets).filter(hostAssets.business_id == ids).update({"business_id": None},
                                                                            synchronize_session=False)  

  • merge(obiect)
net_mission = get_object_or_404(NetMission, data['id'])  #  get_object_or_404 可换为普通的query语句
net_mission.mission_name = data['mission_name']  
db.session.merge(net_mission)  

  • delete
    参数:
    synchronize_session=False 不同步更新当前session
    synchronize_session= fetch 更新之前进行查询,获取最新的更新对象
db.session.query(hostAssetsTrend).filter(hostAssetsTrend.tid == ids).delete(synchronize_session=False)  
PingResultHours.query.filter(PingResultHours.create_time < time_1).delete(synchronize_session='fetch')

  • delete(object)
net_mission = get_object_or_404(NetMission, data['id'])  #  get_object_or_404 可换为普通的query语句
db.session.delete(net_mission)  
  
  • add 添加
  • 方式一
_ = PingResult(source_ip=ret['source_ip'], order_ip=ret['order_ip'],
                              delay=round(float(ret['delay']), 2), loss=round(float(ret['loss']), 2),
                              create_time=datetime.now())
db.session.add(_)

  • 方式二
db.session.execute(PingResult.__table__.insert(),[{"business_name":"A"},{"business_name":"B"}])

  • commit
  • filter
    from sqlalchemy import and\_, or\_, desc
  • and_
hosts = model.query.filter(and_(model.source_ip == data['source_ip'],
                                   model.order_ip == data['order_ip'],
                                   model.create_time >= start_time)).order_by('create_time')
  • or_
key = '%%%s%%'%key
cond = model.query.filter(or_(hardwareFault.host_sn.like(key), hardwareFault.host_name.like(key))).all()
  • between
User.query.filter(User.id.between(1, 10)).all()
  • like
key = '%%%s%%'%key
cond = model.query.filter(or_(hardwareFault.host_sn.like(key), hardwareFault.host_name.like(key))).all()
  • order_by, desc, asc
latest_data = hosts.order_by(desc('create_time')).limit(1).first()  
latest_data = hosts.order_by(hosts.create_time.desc()).limit(1).first()
  • in_
hardwareFault.query.filter(hardwareFault.host_sn.in_([1,2,3,4,5,6])).all()
  • rollback
    • db.session.rollback()
  • close
    • db.session.close()
  • fllush
    • db.session.flush()

五、高级查询

  • label
    • 类似于sql中的as,可以对查询的字段或者运算的结果重命名
  • group_by
  • 可以group多个字段,即为要同时符合多个条件,结果可以用(result.字段名)获取
    result = db.session.query(PingResultHours.source_ip, PingResultHours.order_ip, func.avg(
       PingResultHours.delay).label('delay'), func.avg(PingResultHours.loss).label('loss')).filter(
       PingResultHours.create_time > time_1, PingResultHours.create_time <= current_time).group_by(
       PingResultHours.source_ip, PingResultHours.order_ip).all() 
  • union,union_all
    相当于sql中的union,对相同字段的结果做拼接
  • join
records = db.session.query(faultRepairRecord).  
join(hostAssets, faultRepairRecord.host_sn == .order_by(faultRepairRecord.create_time.desc()).
order_by('id').offset(start).limit(result['page_size']).all()

  • limit
    限制返回的结果数量
  • skip
    要跳过的项数,SKIP 不能脱离 ORDER BY 子句单独使用
  • distinct
    对字段去重
records = db.session.query(
distinct(performanceTrend.create_time),performanceTrend.create_time,performanceTrend.cpu_ratio,
performanceTrend.cpu_max,performanceTrend.memory_ratio,performanceTrend.hdd_ratio).  
filter(and_(performanceTrend.create_time >= new_starttime, performanceTrend.create_time <= new_endtime)).all()

六、Sql执行方式

  • session.query
  • session.execute
    条件变量可以通过dict或者%s做替换
records = db.session.execute(
"select avg(type) as datatype,DATE_FORMAT(create_time,'%Y-%m-%d')as createTime,avg(cpu_ratio) as cpu_ratio,
max(cpu_max) as cpu_max,avg(memory_ratio) as memory_ratio, avg(hdd_ratio) as hdd_ratio 
from performance_trend where type=1 and 
(create_time > :stime  and create_time < :etime) group by createTime",{"stime":new_starttime,"etime":new_endtime}).fetchall()

七、內建func函数

  • time类
    • 对查询的time字段做格式化输出
      func.date_format(PingResult.create_time,'%Y-%m-%d').label('create_time'))
  • math类
  • func.sum()
  • func.avg()
  • func.count()
  • func.max()
  • func.min()