python3.6 使用 sqlalchemy 读取 mysql 中的数据,进行多进程并发处理

󰃭 2017-08-06

1. 介绍 SQLALChemy

SQLALChemy 是一个 python 的 ORM(Object Relational Mapper) 框架,开发人员可以快速开发操作数据库的程序, 它提供完整的数据库访问层,提供高性能的数据库访问能力。 它支持 SQLite、MySQL、Postgres、Oracle 等常用的数据库访问

2. 安装 SQLAlChemy

pip install sqlalchemy

2.1 创建测试数据库

# 建立数据库
CREATE DATABASE `test` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;

2.2 用 SQLALChemy 创建数据库表

2.2.1 程序关键点

  • 创建操作数据库的 engine,使用 pymysql 库访问 mysql 数据库
  • 创建操作数据库的 session,绑定到 engine 上
  • 从 Base 继承定义 User,Article 类,对应 mapping 到数据库的 member,article 表
  • 使用 session.create_all 创建数据库表结构
  • session.add_all 新增数据到数据库
  • session.commit 提交所有变更到数据库,此时可以再数据库中查询插入的数据
  • 查询数据使用 session.query 方法,也可以在后面连接使用 filter 进行条件过滤
#!/usr/bin/env python
# coding: utf-8


from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


# 创建数据库 engine 使用 utf8 编码
eng = create_engine('mysql+pymysql://root:1@localhost:3306/test?charset=utf8')
Base = declarative_base()

# 创建 session 类,绑定engine
Session = sessionmaker(bind=eng)
session = Session()


class User(Base):
    '''
    用户类,对应数据库的 member 表
    '''
    __tablename__ = 'member'

    # 定义表字段
    mid = Column(Integer, primary_key=True)
    nickname = Column(String(50))
    email = Column(String(128))

    def __repl__(self):
        return '<User(name={}, email={}, nickname{}>'.format(mid,
                                                             email,
                                                             nickname)


class Article(Base):
    '''
    文章类,对应数据库中的 article 表
    '''
    __tablename__ = 'article'

    # 定义表字段
    arid = Column(Integer, primary_key=True)
    tags = Column(String(128))
    description = Column(String(256))
    title = Column(String(256))


def create_table():
    '''
    创建数据库表结构,导入初始数据
    '''
    # 创建表
    Base.metadata.create_all(eng)

    # 插入数据
    session.add_all([
        User(mid=1, nickname='测试数据 test hello', email='test@gmail.com'),
        User(mid=2, nickname='测试数据 china hello', email='test@gmail.com'),
        User(mid=3, nickname='测试数据 上海 hello', email='test@gmail.com'),
        User(mid=4, nickname='测试数据 北京 hello', email='test@gmail.com'),
        User(mid=5, nickname='测试数据 上海 hello', email='test@gmail.com'),
        User(mid=6, nickname='测试数据 山东 hello', email='test@gmail.com'),
        User(mid=7, nickname='测试数据 武夷山 hello', email='test@gmail.com'),
        User(mid=8, nickname='测试数据 黄山 hello', email='test@gmail.com'),

        Article(arid=1, tags='测试数据 test hello', title='销售额度', description='测试 test ok'),
        Article(arid=2, tags='测试数据 china hello', title='成功转型', description='测试 test ok'),
        Article(arid=3, tags='测试数据 上海 hello', title='蓝蓝的天上白云飘', description='测试 test ok'),
        Article(arid=4, tags='测试数据 背景 hello', title='在水一方', description='测试 test ok'),
        Article(arid=5, tags='测试数据 上海 hello', title='晴天,阴天,雨天,大风天', description='测试 test ok'),
        Article(arid=6, tags='测试数据 山东 hello', title='每年365天,每天24小时', description='测试 test ok'),
        Article(arid=7, tags='测试数据 武夷山 hello', title='高效工作的秘密', description='测试 test ok'),
        Article(arid=8, tags='测试数据 黄山 hello', title='战狼2', description='测试 test ok'),
    ]
    )

    # 提交到数据库
    session.commit()


def modify_data():
    '''
    测试修改数据
    '''

    # 查询用户表
    users = session.query(User).all()
    print(users)

    # 查询文章表
    # articles = session.query(Article).all()
    articles = session.query(Article).filter(Article.arid==2)
    print(articles)

    # 修改文章表
    articles[0].description = '程度,修改描述可以成功'
    print(session.dirty)

    # 提交到数据库
    session.commit()


if __name__ == '__main__':
    create_engine()
    # modify_data()

3. 多进程搜索程序

3.1 程序关键点

  • 创建 DbMgr 类,封装数据库访问操作
  • 用 sqlalchemy 从数据库获取 member,article 表中的数据
  • 使用 automap 自动反射出 member,article 表对应的类
  • 创建 Searcher 类,提供进程调用函数,用来查询符合条件的结果,并且提供进程执行完的回调展示方法
  • 创建 10 个进程的进程池
  • 循环获取用户输入,创建 searcher 对象,多进程并发执行过滤
  • 多进程调用使用 python multiprocessing
#!/usr/bin/env python
# coding: utf-8


import os


from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.automap import automap_base
from multiprocessing import Pool


class DbMgr():
    '''
    连接数据库,从数据库获取 用户表,文章表数据
    '''

    def __init__(self):
        eng = create_engine('mysql+pymysql://root:1@localhost/test?charset=utf8')

        # 使用 automap 自动反射出 member,article 表对应的类
        meta = MetaData()
        meta.reflect(eng, only=['member', 'article'])
        Base = automap_base(metadata=meta)
        Base.prepare()

        self._Member = Base.classes.member
        self._Article = Base.classes.article

        # 获取操作数据库的 session
        Session = sessionmaker(eng, autocommit=True)
        self._ses = Session()

    def get_data(self):
        '''
        查询用户表,文章表
        '''
        self._users = self._ses.query('"user"', self._Member.mid,
                                      self._Member.email,
                                      self._Member.nickname).all()

        self._articles = self._ses.query(self._Article).all()
        self._articles = [('ar', i.arid, i.title, i.tags, i.description)
                          for i in self._articles]

        return list(self._users) + list(self._articles)


class Searcher():
    '''
    进城处理函数,查找符合条件的结果,找到后返回结果
    '''
    def __init__(self, keyword):
        self._keyword = keyword

    def run(self, data):
        '''
        查找字符串
        '''
        try:
            if self._keyword in str(data):
                return 'ret: ' + str(os.getpid()) + '->' + str(data)
            else:
                return None
        except Exception as e:
            return e

    def callback(self, data):
        '''
        全部执行完后回调函数,展示结果
        '''
        try:
            for i in data:
                if i:
                    print('match: {}'.format(i))
        except Exception as e:
            print(e)


def main():
    # 从数据库读取数据
    mgr = DbMgr()

    # 创建过滤进程池
    pool = Pool(4)

    # 创建搜索器
    while True:
        keyword = input('\n输入搜索词:  ')
        if keyword == 'q':
            break

        searcher = Searcher(keyword)

        # 从数据库获取数据
        data = mgr.get_data()
        res = pool.map_async(searcher.run,
                             data,
                             10,
                             callback=searcher.callback)

        # 等待所有进程执行完成
        res.wait()
        print('all done', res.successful())


if __name__ == '__main__':
    main()

3.2 程序运行

(py36env) servadmin@debian:~/test # python mysql2es.py

输入搜索词:  test
match: ret: 10013->('user', 1, 'test@gmail.com', '测试数据 test hello')
match: ret: 10013->('user', 2, 'test@gmail.com', '测试数据 china hello')
match: ret: 10013->('user', 3, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 4, 'test@gmail.com', '测试数据 背景 hello')
match: ret: 10013->('user', 5, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 6, 'test@gmail.com', '测试数据 山东 hello')
match: ret: 10013->('user', 7, 'test@gmail.com', '测试数据 武夷山 hello')
match: ret: 10013->('user', 8, 'test@gmail.com', '测试数据 黄山 hello')
match: ret: 10013->('ar', 1, '销售额度', '测试数据 test hello', '程度,修改描述可以成功')
match: ret: 10013->('ar', 3, '蓝蓝的天上白云飘', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 4, '在水一方', '测试数据 背景 hello', '测 test ok')
match: ret: 10013->('ar', 5, '晴天,阴天,雨天,大风天', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 6, '每年365天,每天24小时', '测试数据 山东 hello', '测试 test ok')
match: ret: 10013->('ar', 7, '高效工作的秘密', '测试数据 武夷山 hello', '测试 test ok')
match: ret: 10013->('ar', 8, '战狼2', '测试数据 黄山 hello', '测试 test ok')
all done True

输入搜索词:  转型
match: ret: 10013->('ar', 2, '成功转型', '测试数据 china hello', '程度,修改描述可以成功')
all done True

输入搜索词:  test
match: ret: 10013->('user', 1, 'test@gmail.com', '测试数据 test hello')
match: ret: 10013->('user', 2, 'test@gmail.com', '测试数据 china hello')
match: ret: 10013->('user', 3, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 4, 'test@gmail.com', '测试数据 背景 hello')
match: ret: 10013->('user', 5, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 6, 'test@gmail.com', '测试数据 山东 hello')
match: ret: 10013->('user', 7, 'test@gmail.com', '测试数据 武夷山 hello')
match: ret: 10013->('user', 8, 'test@gmail.com', '测试数据 黄山 hello')
match: ret: 10013->('ar', 1, '销售额度', '测试数据 test hello', '程度,修改描述可以成功')
match: ret: 10013->('ar', 3, '蓝蓝的天上白云飘', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 4, '在水一方', '测试数据 背景 hello', '测试 test ok')
match: ret: 10013->('ar', 5, '晴天,阴天,雨天,大风天', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 6, '每年365天,每天24小时', '测试数据 山东 hello', '测试 test ok')
match: ret: 10013->('ar', 7, '高效工作的秘密', '测试数据 武夷山 hello', '测试 test ok')
match: ret: 10013->('ar', 8, '战狼2', '测试数据 黄山 hello', '测试 test ok')
all done True

输入搜索词: