python3.6 - threading 多线程编程基础(1)

󰃭 2017-07-24

1. 什么是进程,线程

进程(process)

  • 进程是正在运行的程序的实例
  • 操作系统利用进程把工作划分为一些功能单元
  • 操作系统加载程序,以进程的方式在操作系统中运行它,并分配了系统资源给进程(内存等)

线程(thread)

  • 进程中所包含的一个或多个执行单元称为线程(thread),线程是 CPU 调度和执行的基本单位
  • 操作系统创建一个进程后,该进程会有一个主线程,主线程退出进程就会退出,并释放进程占用的资源
  • 一个线程可以创建另一个线程,同一个进程中的多个线程之间可以并发执行

理解进程线程关系,公司与员工 <==> 进程与线程

  • 公司本身是进程,它包含很多人,每个人是一个线程,还有资源打印机,会议室,资金等
  • 真正执行工作的是线程(公司员工),公司的业务状态是靠人来执行的
  • 这些人可以并行工作,处理事情
  • 当大家访问公共资源的时候会有冲突,例如使用相同的会议室,打卡,这就需要队列排队,预约上锁
  • 但是与现实模型不同的是,这些人是有几个 cpu 控制的,例如 8 个 cpu 对应 80 个人,cpu 需要切换控制
  • 线程遇到阻塞的时候,cpu 就需要切换执行另一个线程,避免等待

2. python 多线程

  • python 带有 threading 库,支持开发多线程程序

用对象实现多线程示例

import time
import threading

def main():
    # 线程函数
    def thr_func(data):
        '''
        循环打印,打印后暂停1秒
        '''
        for i in range(2):
            print(data, i)
            time.sleep(1)


    # 创建 3 个线程
    # 创建第一个线程
    one_thr = threading.Thread(target=thr_func, args=['one thr'])
    # 开始执行线程
    one_thr.start()
    
    # 创建执行第二个线程
    two_thr = threading.Thread(target=thr_func, args=['two thr'])
    two_thr.start()
    
    # 创建执行第三个线程
    three_thr = threading.Thread(target=thr_func, args=['three thr'])
    three_thr.start()

	# 调用 join 等待线程结束
    one_thr.join()
    two_thr.join()
    three_thr.join()

    # 创建 10 个线程
    thrs = [threading.Thread(target=thr_func, args=['hello world thr: %d' % i]) for i in range(10)]
    # 开始执行线程
    [thr.start() for thr in thrs]
    # 等待线程结束
    [thr.join() for thr in thrs]


if __name__ == '__main__':
    main()

运行程序,获取输出结果

# 运行
python test_multi.py

# 输出
one thr 0
two thr 0
three thr 0
hello world thr: 0 0
hello world thr: 1 0
hello world thr: 2 0
hello world thr: 3 0
hello world thr: 4 0
hello world thr: 5 0
hello world thr: 6 0
hello world thr: 7 0
hello world thr: 8 0
hello world thr: 9 0

In [54]: two thr 1
three thr 1
one thr 1
hello world thr: 0 1
hello world thr: 3 1
hello world thr: 4 1
hello world thr: 2 1
hello world thr: 5 1
hello world thr: 1 1
hello world thr: 6 1
hello world thr: 8 1
hello world thr: 9 1
hello world thr: 7 1

class MyThread(threading.Thread): def init(self, myname, count): self._myname = myname self._count = count super().init()

def run(self):
"""
实现线程函数,start 后被调用
"""
    for i in range(self._count):
        print(self._myname, ',', i)
        time.sleep(0.5)

if name == ‘main’: thr = MyThread(‘hello’, 10) thr1 = MyThread(‘hello’, 10) thr.start() thr1.start() # join 等待线程退出 thr.join() thr1.join()


<br/>

# 3. 线程并发问题
多线程并发访问资源的时候,会有冲突,导致问题

## 用锁(lock)处理并发资源访问
`下面是有问题的示例代码`
```python
import time
import threading
import random


def concur_code(lock):
    counter = 0
    def thr_func():
        '''
        增加统计数值
        '''
        nonlocal counter
        for i in range(10):
            if lock:
                # 使用加锁的代码,同一时刻只能一个线程获取锁,进入运行
                with lock:
                    tmp_counter = counter
                    time.sleep(0.01)
                    tmp_counter += 1
                    time.sleep(0.01)
                    counter = tmp_counter
            else:
                # 不加锁的时候会这里会出现冲突,线程间赋值会重复
                tmp_counter = counter
                time.sleep(0.01)
                tmp_counter += 1
                counter = tmp_counter


    # 创建 10 个线程
    thrs = [threading.Thread(target=thr_func) for i in range(10)]
    [thr.start() for thr in thrs]
    [thr.join() for thr in thrs]
    print('all count: %d' % counter)


if __name__ == '__main__':
    # lock = threading.Lock()
    lock = False
    concur_code(lock)

运行结果错误,本来应该10*10=100,但结果只有10

# 运行结果错误,本来应该10*10=100,但结果只有10
python test_multi.py

all count: 10

用锁处理资源竞争

修改代码,使用线程锁

# 修改添加锁
if __name__ == '__main__':
    lock = threading.Lock()
    concur_code(lock)

运行修改后代码

# 运行正常
python test_multi.py

all count: 100

4. 用队列在线程间传递数据,通过队列在线程间分发处理事务

  • Queue 是一个 FIFO 先进先出队列,可以放置任意类型的 python 对象
  • 队列对象自动获取线程锁,释放锁,是线程安全的
  • 任意时刻只有一个线程可以修改队列
  • 可以用来在做线程间传递数据

下面是多个生产者,发送信息到 queue,多个消费者从 queue 获取信息使用的示例

import time
import threading
import queue
import random
import itertools


def use_queue():
    # 数据队列
    dataQueue = queue.Queue()
    lock = threading.Lock()

    # 消费者数量
    num_consumers = 2
    # 生产者数量
    num_producers = 4
    # 生产者产生的消息数量  
    num_msges = 4
    counter = 0

    # 生产者函数
    def producer(id_prod):
        for i in range(num_msges):
            time.sleep(id_prod)
            dataQueue.put('producer id={}, count={}'.format(id_prod, i))

    # 消费者函数
    def consumer(id_consumer):
        while True:
            time.sleep(0.1)
            try:
                data = dataQueue.get(block=False)
            except queue.Empty:
                if counter == num_producers * num_msges:
                    return
                else:
                    pass
            else:
                with lock:
                    nonlocal counter
                    counter += 1
                    print('consumer {}, total={}, got => {}'
                            .format(id_consumer, counter, data))

    # 创建消费线程
    thr_consumers = [threading.Thread(target=consumer, args=(i,))
            for i in range(num_consumers)]

    # 创建生产线程
    thr_producers = [threading.Thread(target=producer, args=(i,))
            for i in range(num_producers)]

    # 开始执行线程
    [thr.start() for thr in itertools.chain(thr_consumers, thr_producers)]
    # 等待线程执行结束
    [thr.join() for thr in itertools.chain(thr_consumers, thr_producers)]


if __name__ == '__main__':
    use_queue()