from sqlalchemy
import create_engine
from sqlalchemy
.sql
.expression
import text
'''
from sqlalchemy.sql.expression import (
select, insert, delete, update,
table, column, func,
asc, desc, between, distinct, cast,
case, literal_column, bindparam,
and_, or_, not_, all_, any_
)
'''
from sqlalchemy
.sql
import expression
as sse
from sqlalchemy
import MetaData
, Table
from sqlalchemy
import Integer
, Float
, Text
, String
, TIMESTAMP
, DateTime
from sqlalchemy
import Column
, ForeignKey
# Connection URI of the target database; an in-memory SQLite database here.
uri = 'sqlite:///:memory:'

# The engine is the source of every connection opened further down.
engine = create_engine(uri)

# Registry that collects the Table objects declared below.
# NOTE(review): ``MetaData(bind=...)`` is a SQLAlchemy 1.x feature (bound
# metadata) — confirm the targeted SQLAlchemy version before upgrading.
meta = MetaData(bind=engine)
# Backend name taken from the URI scheme with any "+driver" suffix removed,
# e.g. "mysql+pymysql://..." -> "mysql" and "sqlite:///..." -> "sqlite".
db_type = uri.partition(':')[0].partition('+')[0]

# Pick the dialect object matching the backend.
if db_type == 'sqlite':
    from sqlalchemy.dialects.sqlite import dialect
    dialect_obj = dialect()
elif db_type == 'mysql':
    from sqlalchemy.dialects.mysql.pymysql import dialect
    dialect_obj = dialect()
else:
    # Any other backend: reuse the dialect instance held by the engine.
    dialect_obj = engine.dialect
def SS(*args, **kwargs):
    """Print whatever ``structure_sql`` returns for the given arguments.

    Positional and keyword arguments are forwarded as received, except that
    the ``dialect_obj`` keyword is always set to the module-level
    ``dialect_obj``, replacing any value passed by the caller.

    NOTE(review): ``structure_sql`` is not defined in the visible part of
    this file; it is presumably provided elsewhere — confirm.
    """
    forwarded = dict(kwargs, dialect_obj=dialect_obj)
    print(structure_sql(*args, **forwarded))
# Create the tables
# Definition of the ``users`` table, registered on ``meta``.
# ``extend_existing=True`` lets this definition be applied on top of a
# ``users`` table already present in ``meta``.
tb_user = Table(
    'users',
    meta,
    Column(
        'id', Integer,
        primary_key=True, autoincrement=True, comment='主键ID',
    ),
    Column(
        'username', String(20),
        unique=True, nullable=False, comment='用户名',
    ),
    Column('password', String(32), nullable=False, comment='密码'),
    # Rows inserted without an age get 18.
    Column('age', Integer, default=18, comment='年龄'),
    comment='用户表',
    extend_existing=True,
)
# Definition of the ``trades`` table, registered on ``meta``; each trade
# references its owner through ``user_id`` -> ``users.id``.
tb_trade = Table(
    'trades',
    meta,
    Column(
        'id', Integer,
        primary_key=True, autoincrement=True, comment='主键ID',
    ),
    Column(
        'user_id', Integer, ForeignKey('users.id'),
        nullable=False, comment='用户ID',
    ),
    Column('amount', Float(2), default=0.0, comment='交易金额'),
    # The timestamp default is a server-side default (CURRENT_TIMESTAMP).
    Column(
        'trade_time', TIMESTAMP,
        server_default=text('CURRENT_TIMESTAMP'), comment='交易时间',
    ),
    comment='交易表',
    extend_existing=True,
)
# Emit CREATE TABLE for every table registered on ``meta``.
# ``sorted_tables`` lists them in foreign-key dependency order, so ``users``
# is created before ``trades``.
# The engine is passed explicitly so this call does not depend on ``meta``
# having been constructed with an implicit ``bind``.
meta.create_all(bind=engine, tables=meta.sorted_tables)
# Populate the tables with data
# INSERT construct for ``users``; the row values are supplied separately
# as parameters when the statement is executed.
ins = tb_user.insert()

# Seed rows for the ``users`` table, one dict per row.
data = [
    dict(username='swartz', password='abcdefg', age=26),
    dict(username='gates', password='123456', age=50),
    dict(username='linus', password='123454321', age=50),
    dict(username='bill', password='abcdefg', age=36),
    dict(username='python', password='666ge666', age=37),
]
# Everything below runs on one connection, which the ``with`` block closes
# on exit.
with engine.connect() as conn:
    # Multi-row insert: the list of parameter dicts inserts all seed users
    # in a single execute() call.
    rp_data = conn.execute(ins, data)

    # Single-row insert. Parameters are passed as one dict rather than as
    # ``**kwargs``: the keyword form is the legacy calling convention,
    # while a positional dict is accepted by every SQLAlchemy version.
    one = {
        'username': 'ruirui',
        'password': '123456',
    }
    rp_one = conn.execute(ins, one)
    print('添加条数:', rp_data.rowcount + rp_one.rowcount)

    # Select only the ``id`` column of every user, in ascending order.
    # NOTE(review): passing a list to ``with_only_columns`` is the
    # SQLAlchemy 1.x calling convention — confirm the targeted version.
    sel_uids = tb_user.select().with_only_columns([tb_user.columns.id])
    sel_uids = sel_uids.order_by(tb_user.columns.id.asc())
    rp = conn.execute(sel_uids)
    uids = rp.fetchall()
    # Each row holds a single value; unwrap it into a flat list of ids.
    uids = [row[0] for row in uids]
    rp.close()
    print(uids)

    # Insert one trade for the user with id 1.
    ins_tone = tb_trade.insert()
    tone = {
        'user_id': 1,
        'amount': 123.456,
    }
    rp_tone = conn.execute(ins_tone, tone)
    print('添加', '成功' if rp_tone.rowcount > 0 else '失败')

    # Read back every row of ``trades``.
    sel_t = tb_trade.select()
    rp_t = conn.execute(sel_t)
    t_data = rp_t.fetchall()
    rp_t.close()
    print(t_data)
# Operations
# Practice: adding and querying data
class User(Base):
    """Mapped class for rows of the ``users`` table.

    NOTE(review): ``Base`` is not defined in the visible part of this file;
    it is presumably a SQLAlchemy declarative base — confirm.
    """

    __tablename__ = 'users'

    # Primary key.
    id = Column(Integer, primary_key=True)
    # Attribute ``name`` is stored in the database column ``username``.
    name = Column('username', String(20), nullable=False)
    # Defaults to 18 when no value is supplied.
    age = Column(Integer, default=18)
# Create the tables known to ``Base.metadata`` on the engine.
Base.metadata.create_all(engine)

# NOTE(review): ``session`` is not defined in the visible part of this file;
# it is presumably an ORM session bound to ``engine`` — confirm.

# Add a single user, then flush the session without committing yet.
obj = User()
obj.name = 'ruirui'
session.add(obj)
session.flush()

# Build one User per name, add them all at once, then commit.
users = []
for username in 'zhangsan,lisi,wangwu,zhaoliu'.split(','):
    u_obj = User()
    u_obj.name = username
    users.append(u_obj)

session.add_all(users)
session.commit()
from sqlalchemy import func

# Base query over ``User``; reused by the lookups at the end of this section.
q = session.query(User)
print(q.count())

# Counting with an explicit COUNT over ``User.id``, shown two ways:
# 1) label the aggregate and read it as an attribute of the first row;
total = (
    session.query(func.count(User.id).label('total'))
    .first()
    .total
)
# 2) fetch the single value directly.
total = session.query(func.count(User.id)).scalar()

# First result of the base query; the print checks whether it is a ``User``.
u_f = q.first()
print('是否是User?:', isinstance(u_f, User))

# Every result of the base query, then a lookup by the identifier 3.
u_a = q.all()
u_id_3 = q.get(3)
# Dispose of the engine
# Final cleanup step of the script: dispose of the engine.
engine.dispose()
# Reposted from: https://www.cnblogs.com/cp9648/p/10415824.html
# When reposting, please cite the original address: https://mac.8miu.com/read-12023.html