python3.6 使用 sqlalchemy 读取 mysql 中的数据,进行多进程并发处理
2017-08-06
1. 介绍 SQLALChemy
SQLALChemy 是一个 python 的 ORM(Object Relational Mapper) 框架,开发人员可以快速开发操作数据库的程序, 它提供完整的数据库访问层,提供高性能的数据库访问能力。 它支持 SQLite、MySQL、Postgres、Oracle 等常用的数据库访问
2. 安装 SQLAlChemy
pip install sqlalchemy
2.1 创建测试数据库
# 建立数据库
CREATE DATABASE `test` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;
2.2 用 SQLALChemy 创建数据库表
2.2.1 程序关键点
- 创建操作数据库的 engine,使用 pymysql 库访问 mysql 数据库
- 创建操作数据库的 session,绑定到 engine 上
- 从 Base 继承定义 User,Article 类,对应 mapping 到数据库的 member,article 表
- 使用 session.create_all 创建数据库表结构
- session.add_all 新增数据到数据库
- session.commit 提交所有变更到数据库,此时可以再数据库中查询插入的数据
- 查询数据使用 session.query 方法,也可以在后面连接使用 filter 进行条件过滤
#!/usr/bin/env python
# coding: utf-8
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 创建数据库 engine 使用 utf8 编码
eng = create_engine('mysql+pymysql://root:1@localhost:3306/test?charset=utf8')
Base = declarative_base()
# 创建 session 类,绑定engine
Session = sessionmaker(bind=eng)
session = Session()
class User(Base):
'''
用户类,对应数据库的 member 表
'''
__tablename__ = 'member'
# 定义表字段
mid = Column(Integer, primary_key=True)
nickname = Column(String(50))
email = Column(String(128))
def __repl__(self):
return '<User(name={}, email={}, nickname{}>'.format(mid,
email,
nickname)
class Article(Base):
'''
文章类,对应数据库中的 article 表
'''
__tablename__ = 'article'
# 定义表字段
arid = Column(Integer, primary_key=True)
tags = Column(String(128))
description = Column(String(256))
title = Column(String(256))
def create_table():
'''
创建数据库表结构,导入初始数据
'''
# 创建表
Base.metadata.create_all(eng)
# 插入数据
session.add_all([
User(mid=1, nickname='测试数据 test hello', email='test@gmail.com'),
User(mid=2, nickname='测试数据 china hello', email='test@gmail.com'),
User(mid=3, nickname='测试数据 上海 hello', email='test@gmail.com'),
User(mid=4, nickname='测试数据 北京 hello', email='test@gmail.com'),
User(mid=5, nickname='测试数据 上海 hello', email='test@gmail.com'),
User(mid=6, nickname='测试数据 山东 hello', email='test@gmail.com'),
User(mid=7, nickname='测试数据 武夷山 hello', email='test@gmail.com'),
User(mid=8, nickname='测试数据 黄山 hello', email='test@gmail.com'),
Article(arid=1, tags='测试数据 test hello', title='销售额度', description='测试 test ok'),
Article(arid=2, tags='测试数据 china hello', title='成功转型', description='测试 test ok'),
Article(arid=3, tags='测试数据 上海 hello', title='蓝蓝的天上白云飘', description='测试 test ok'),
Article(arid=4, tags='测试数据 背景 hello', title='在水一方', description='测试 test ok'),
Article(arid=5, tags='测试数据 上海 hello', title='晴天,阴天,雨天,大风天', description='测试 test ok'),
Article(arid=6, tags='测试数据 山东 hello', title='每年365天,每天24小时', description='测试 test ok'),
Article(arid=7, tags='测试数据 武夷山 hello', title='高效工作的秘密', description='测试 test ok'),
Article(arid=8, tags='测试数据 黄山 hello', title='战狼2', description='测试 test ok'),
]
)
# 提交到数据库
session.commit()
def modify_data():
'''
测试修改数据
'''
# 查询用户表
users = session.query(User).all()
print(users)
# 查询文章表
# articles = session.query(Article).all()
articles = session.query(Article).filter(Article.arid==2)
print(articles)
# 修改文章表
articles[0].description = '程度,修改描述可以成功'
print(session.dirty)
# 提交到数据库
session.commit()
if __name__ == '__main__':
create_engine()
# modify_data()
3. 多进程搜索程序
3.1 程序关键点
- 创建 DbMgr 类,封装数据库访问操作
- 用 sqlalchemy 从数据库获取 member,article 表中的数据
使用 automap 自动反射出 member,article 表对应的类
- 创建 Searcher 类,提供进程调用函数,用来查询符合条件的结果,并且提供进程执行完的回调展示方法
- 创建 10 个进程的进程池
- 循环获取用户输入,创建 searcher 对象,多进程并发执行过滤
- 多进程调用使用 python multiprocessing
#!/usr/bin/env python
# coding: utf-8
import os
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.automap import automap_base
from multiprocessing import Pool
class DbMgr():
'''
连接数据库,从数据库获取 用户表,文章表数据
'''
def __init__(self):
eng = create_engine('mysql+pymysql://root:1@localhost/test?charset=utf8')
# 使用 automap 自动反射出 member,article 表对应的类
meta = MetaData()
meta.reflect(eng, only=['member', 'article'])
Base = automap_base(metadata=meta)
Base.prepare()
self._Member = Base.classes.member
self._Article = Base.classes.article
# 获取操作数据库的 session
Session = sessionmaker(eng, autocommit=True)
self._ses = Session()
def get_data(self):
'''
查询用户表,文章表
'''
self._users = self._ses.query('"user"', self._Member.mid,
self._Member.email,
self._Member.nickname).all()
self._articles = self._ses.query(self._Article).all()
self._articles = [('ar', i.arid, i.title, i.tags, i.description)
for i in self._articles]
return list(self._users) + list(self._articles)
class Searcher():
'''
进城处理函数,查找符合条件的结果,找到后返回结果
'''
def __init__(self, keyword):
self._keyword = keyword
def run(self, data):
'''
查找字符串
'''
try:
if self._keyword in str(data):
return 'ret: ' + str(os.getpid()) + '->' + str(data)
else:
return None
except Exception as e:
return e
def callback(self, data):
'''
全部执行完后回调函数,展示结果
'''
try:
for i in data:
if i:
print('match: {}'.format(i))
except Exception as e:
print(e)
def main():
# 从数据库读取数据
mgr = DbMgr()
# 创建过滤进程池
pool = Pool(4)
# 创建搜索器
while True:
keyword = input('\n输入搜索词: ')
if keyword == 'q':
break
searcher = Searcher(keyword)
# 从数据库获取数据
data = mgr.get_data()
res = pool.map_async(searcher.run,
data,
10,
callback=searcher.callback)
# 等待所有进程执行完成
res.wait()
print('all done', res.successful())
if __name__ == '__main__':
main()
3.2 程序运行
(py36env) servadmin@debian:~/test # python mysql2es.py
输入搜索词: test
match: ret: 10013->('user', 1, 'test@gmail.com', '测试数据 test hello')
match: ret: 10013->('user', 2, 'test@gmail.com', '测试数据 china hello')
match: ret: 10013->('user', 3, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 4, 'test@gmail.com', '测试数据 背景 hello')
match: ret: 10013->('user', 5, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 6, 'test@gmail.com', '测试数据 山东 hello')
match: ret: 10013->('user', 7, 'test@gmail.com', '测试数据 武夷山 hello')
match: ret: 10013->('user', 8, 'test@gmail.com', '测试数据 黄山 hello')
match: ret: 10013->('ar', 1, '销售额度', '测试数据 test hello', '程度,修改描述可以成功')
match: ret: 10013->('ar', 3, '蓝蓝的天上白云飘', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 4, '在水一方', '测试数据 背景 hello', '测 test ok')
match: ret: 10013->('ar', 5, '晴天,阴天,雨天,大风天', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 6, '每年365天,每天24小时', '测试数据 山东 hello', '测试 test ok')
match: ret: 10013->('ar', 7, '高效工作的秘密', '测试数据 武夷山 hello', '测试 test ok')
match: ret: 10013->('ar', 8, '战狼2', '测试数据 黄山 hello', '测试 test ok')
all done True
输入搜索词: 转型
match: ret: 10013->('ar', 2, '成功转型', '测试数据 china hello', '程度,修改描述可以成功')
all done True
输入搜索词: test
match: ret: 10013->('user', 1, 'test@gmail.com', '测试数据 test hello')
match: ret: 10013->('user', 2, 'test@gmail.com', '测试数据 china hello')
match: ret: 10013->('user', 3, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 4, 'test@gmail.com', '测试数据 背景 hello')
match: ret: 10013->('user', 5, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 6, 'test@gmail.com', '测试数据 山东 hello')
match: ret: 10013->('user', 7, 'test@gmail.com', '测试数据 武夷山 hello')
match: ret: 10013->('user', 8, 'test@gmail.com', '测试数据 黄山 hello')
match: ret: 10013->('ar', 1, '销售额度', '测试数据 test hello', '程度,修改描述可以成功')
match: ret: 10013->('ar', 3, '蓝蓝的天上白云飘', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 4, '在水一方', '测试数据 背景 hello', '测试 test ok')
match: ret: 10013->('ar', 5, '晴天,阴天,雨天,大风天', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 6, '每年365天,每天24小时', '测试数据 山东 hello', '测试 test ok')
match: ret: 10013->('ar', 7, '高效工作的秘密', '测试数据 武夷山 hello', '测试 test ok')
match: ret: 10013->('ar', 8, '战狼2', '测试数据 黄山 hello', '测试 test ok')
all done True
输入搜索词: