python3.6 - threading 多线程编程基础(1)
2017-07-24
1. 什么是进程,线程
进程(process)
- 进程是正在运行的程序的实例
- 操作系统利用进程把工作划分为一些功能单元
- 操作系统加载程序,以进程的方式在操作系统中运行它,并分配了系统资源给进程(内存等)
线程(thread)
- 进程中所包含的一个或多个执行单元称为线程(thread),线程是 CPU 调度和执行的基本单位
- 操作系统创建一个进程后,该进程会有一个主线程,主线程退出进程就会退出,并释放进程占用的资源
- 一个线程可以创建另一个线程,同一个进程中的多个线程之间可以并发执行
理解进程线程关系,公司与员工 <==> 进程与线程
- 公司本身是进程,它包含很多人,每个人是一个线程,还有资源打印机,会议室,资金等
- 真正执行工作的是线程(公司员工),公司的业务状态是靠人来执行的
- 这些人可以并行工作,处理事情
- 当大家访问公共资源的时候会有冲突,例如使用相同的会议室,打卡,这就需要队列排队,预约上锁
但是与现实模型不同的是
,这些人是有几个 cpu 控制的,例如 8 个 cpu 对应 80 个人,cpu 需要切换控制- 线程遇到阻塞的时候,cpu 就需要切换执行另一个线程,避免等待
2. python 多线程
- python 带有 threading 库,支持开发多线程程序
用对象实现多线程示例
import time
import threading
def main():
# 线程函数
def thr_func(data):
'''
循环打印,打印后暂停1秒
'''
for i in range(2):
print(data, i)
time.sleep(1)
# 创建 3 个线程
# 创建第一个线程
one_thr = threading.Thread(target=thr_func, args=['one thr'])
# 开始执行线程
one_thr.start()
# 创建执行第二个线程
two_thr = threading.Thread(target=thr_func, args=['two thr'])
two_thr.start()
# 创建执行第三个线程
three_thr = threading.Thread(target=thr_func, args=['three thr'])
three_thr.start()
# 调用 join 等待线程结束
one_thr.join()
two_thr.join()
three_thr.join()
# 创建 10 个线程
thrs = [threading.Thread(target=thr_func, args=['hello world thr: %d' % i]) for i in range(10)]
# 开始执行线程
[thr.start() for thr in thrs]
# 等待线程结束
[thr.join() for thr in thrs]
if __name__ == '__main__':
main()
运行程序,获取输出结果
# 运行
python test_multi.py
# 输出
one thr 0
two thr 0
three thr 0
hello world thr: 0 0
hello world thr: 1 0
hello world thr: 2 0
hello world thr: 3 0
hello world thr: 4 0
hello world thr: 5 0
hello world thr: 6 0
hello world thr: 7 0
hello world thr: 8 0
hello world thr: 9 0
In [54]: two thr 1
three thr 1
one thr 1
hello world thr: 0 1
hello world thr: 3 1
hello world thr: 4 1
hello world thr: 2 1
hello world thr: 5 1
hello world thr: 1 1
hello world thr: 6 1
hello world thr: 8 1
hello world thr: 9 1
hello world thr: 7 1
class MyThread(threading.Thread): def init(self, myname, count): self._myname = myname self._count = count super().init()
def run(self):
"""
实现线程函数,start 后被调用
"""
for i in range(self._count):
print(self._myname, ',', i)
time.sleep(0.5)
if name == ‘main’: thr = MyThread(‘hello’, 10) thr1 = MyThread(‘hello’, 10) thr.start() thr1.start() # join 等待线程退出 thr.join() thr1.join()
<br/>
# 3. 线程并发问题
多线程并发访问资源的时候,会有冲突,导致问题
## 用锁(lock)处理并发资源访问
`下面是有问题的示例代码`
```python
import time
import threading
import random
def concur_code(lock):
counter = 0
def thr_func():
'''
增加统计数值
'''
nonlocal counter
for i in range(10):
if lock:
# 使用加锁的代码,同一时刻只能一个线程获取锁,进入运行
with lock:
tmp_counter = counter
time.sleep(0.01)
tmp_counter += 1
time.sleep(0.01)
counter = tmp_counter
else:
# 不加锁的时候会这里会出现冲突,线程间赋值会重复
tmp_counter = counter
time.sleep(0.01)
tmp_counter += 1
counter = tmp_counter
# 创建 10 个线程
thrs = [threading.Thread(target=thr_func) for i in range(10)]
[thr.start() for thr in thrs]
[thr.join() for thr in thrs]
print('all count: %d' % counter)
if __name__ == '__main__':
# lock = threading.Lock()
lock = False
concur_code(lock)
运行结果错误,本来应该10*10=100,但结果只有10
# 运行结果错误,本来应该10*10=100,但结果只有10
python test_multi.py
all count: 10
用锁处理资源竞争
修改代码,使用线程锁
# 修改添加锁
if __name__ == '__main__':
lock = threading.Lock()
concur_code(lock)
运行修改后代码
# 运行正常
python test_multi.py
all count: 100
4. 用队列在线程间传递数据,通过队列在线程间分发处理事务
- Queue 是一个 FIFO 先进先出队列,可以放置任意类型的 python 对象
- 队列对象自动获取线程锁,释放锁,是线程安全的
- 任意时刻只有一个线程可以修改队列
- 可以用来在做线程间传递数据
下面是多个生产者,发送信息到 queue,多个消费者从 queue 获取信息使用的示例
import time
import threading
import queue
import random
import itertools
def use_queue():
# 数据队列
dataQueue = queue.Queue()
lock = threading.Lock()
# 消费者数量
num_consumers = 2
# 生产者数量
num_producers = 4
# 生产者产生的消息数量
num_msges = 4
counter = 0
# 生产者函数
def producer(id_prod):
for i in range(num_msges):
time.sleep(id_prod)
dataQueue.put('producer id={}, count={}'.format(id_prod, i))
# 消费者函数
def consumer(id_consumer):
while True:
time.sleep(0.1)
try:
data = dataQueue.get(block=False)
except queue.Empty:
if counter == num_producers * num_msges:
return
else:
pass
else:
with lock:
nonlocal counter
counter += 1
print('consumer {}, total={}, got => {}'
.format(id_consumer, counter, data))
# 创建消费线程
thr_consumers = [threading.Thread(target=consumer, args=(i,))
for i in range(num_consumers)]
# 创建生产线程
thr_producers = [threading.Thread(target=producer, args=(i,))
for i in range(num_producers)]
# 开始执行线程
[thr.start() for thr in itertools.chain(thr_consumers, thr_producers)]
# 等待线程执行结束
[thr.join() for thr in itertools.chain(thr_consumers, thr_producers)]
if __name__ == '__main__':
use_queue()