Python - Multithreading

When a program needs concurrency, we turn to Python's concurrency libraries such as asyncio, threading, and multiprocessing. This article focuses on the multithreading module in the Python standard library, threading.

Basic usage of threading

The advantages of using multiple threads:

  • the program can run faster
  • it works well for IO-bound workloads

The standard library provides two modules, _thread and threading; threading is a higher-level wrapper around _thread. Although Python has the GIL and switching threads costs some overhead, multithreading is still very useful in IO-bound scenarios.

Let's start with the basic usage of threading

import threading

def hello(*args, **kwargs):  # define a hello function
    print('hello, world', args, kwargs)

Instantiate a thread object: the target parameter is the function to run, args passes positional arguments as a sequence, and kwargs passes keyword arguments as a dict. The start method launches the thread.

>>> t = threading.Thread(target=hello, args=[1, 2, 3], kwargs={'a': 'b'})

>>> t.start()
hello, world (1, 2, 3) {'a': 'b'}

Common parameters and methods of threading and Thread

The name parameter

Import the logging module to display thread information more clearly

import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')  # configure logging

def hello():
    logging.info('test')

>>> t = threading.Thread(target=hello, name='hello_thread')  # set the thread name

>>> t.start()
2017-03-12 22:30:58,556 INFO [hello_thread] test

daemon

When the program exits, any daemon threads that are still running are terminated abruptly, while non-daemon threads are not: the interpreter waits for all non-daemon threads to finish before it exits.
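A minimal sketch of the difference (the function name and sleep duration are just for illustration):

import threading
import time

def background():
    time.sleep(10)

# daemon=True: the interpreter does not wait for this thread at exit
d = threading.Thread(target=background, daemon=True)
d.start()

# a non-daemon thread (the default) keeps the process alive until it finishes
n = threading.Thread(target=background)
n.start()

# when the main thread reaches the end of the script, the interpreter waits
# for n (about 10 seconds) but kills d immediately if it is still running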

The join method

Calling a child thread's join method blocks the calling thread until the child exits or the timeout expires; without a timeout it waits indefinitely for the child to exit.

import threading
from time import sleep

def worker():
    sleep(3)


threads = [threading.Thread(target=worker) for _ in range(3)]


def main():
    for t in threads:
        t.start()
    for t in threads:
        t.join()

Running main blocks until all the child threads have exited

>>> main()
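join also takes an optional timeout; a small sketch (the timeout value is arbitrary) that stops waiting after one second and checks whether the thread is still running:

import threading
from time import sleep

def slow_worker():
    sleep(3)

t = threading.Thread(target=slow_worker)
t.start()
t.join(timeout=1)    # block for at most one second
print(t.is_alive())  # True: the worker is still sleeping
t.join()             # now wait until it really finishes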

The enumerate function

List all currently alive threads

>>> threading.enumerate()
[<_MainThread(MainThread, started 140736493310912)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 123145445384192)>]

local

Threads share memory, state, and resources, but attributes set on a threading.local object are only visible to the thread that set them

>>> ctx = threading.local()

>>> ctx
<_thread._local at 0x10d6419e8>

>>> ctx.data = 'aaa'

def worker():
    print(ctx.data)

>>> worker()
aaa

>>> threading.Thread(target=worker).start()

Exception in thread Thread-2477:
Traceback (most recent call last):
  File "/usr/local/opt/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/opt/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-96-ffa8a91f04dc>", line 2, in worker
    print(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data'
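The usual pattern is therefore for each thread to set its own value on the local object; a minimal sketch (the attribute name data is kept from the example above):

import threading

ctx = threading.local()

def worker(value):
    ctx.data = value  # visible only to the thread that set it
    print(threading.current_thread().name, ctx.data)

for i in range(3):
    threading.Thread(target=worker, args=(i,)).start()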

Subclassing Thread

Passing a target function isn't the most elegant approach; a cleaner way is to subclass Thread and override the run method

class MyThread(threading.Thread):
    def run(self):
        print('hello, world')


>>> MyThread()
<MyThread(Thread-1, initial)>

>>> MyThread().start()
hello, world

Passing arguments

Pass arguments by overriding the __init__() method

class MyThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__()
        self.args = args
        self.kwargs = kwargs

    def run(self):
        print('hello, world', self.args, self.kwargs)


>>> t = MyThread(1, 2, 3, state='ok')
>>> t.start()
hello, world (1, 2, 3) {'state': 'ok'}

Thread synchronization

When multiple threads operate on the same resource at the same time (for example, writing to the same file), we need to ensure that only one thread touches the resource at any moment. This calls for synchronization primitives such as locks, queues, conditions, and events.

Lock

threading.Lock solves this problem

A Lock object has two basic operations: acquire and release

acquire puts the Lock in the locked state; if the lock is already held, acquire blocks (or returns False once a given timeout expires). release unlocks it

import threading

lock = threading.Lock()

>>> lock.acquire()
True

>>> lock.acquire(timeout=3)
False

>>> lock.release()

>>> lock.acquire(timeout=3)
True
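Lock is also a context manager, so acquire/release can be written as a with block, which releases the lock even if an exception is raised inside it; a minimal sketch (the counter is just for illustration):

import threading

lock = threading.Lock()
counter = 0

def increment():
    global counter
    with lock:  # acquire on entry, release on exit
        counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 10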

Here is a page-fetching example in which a lock synchronizes the threads while they write to a shared file

import requests
import threading
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
lock = threading.Lock()
file = open('data', 'a')
urls = ['http://baidu.com', 'http://cn.bing.com/']


class FetchUrls(threading.Thread):
    def __init__(self, url: str, file, lock: threading.Lock, name=None):
        super().__init__()
        self.url = url
        self.file = file
        self.lock = lock
        if name is not None:
            self.name = name

    def run(self):
        res = requests.get(self.url)
        self.lock.acquire()
        logging.info('Lock Acquire')
        self.file.write(res.text)
        logging.info('File Written')
        self.lock.release()

Test

>>> ts = [FetchUrls(url, file, lock, name=url) for url in urls]

>>> [t.start() for t in ts]
2017-03-13 14:00:05,142 INFO [http://baidu.com] Lock Acquire
2017-03-13 14:00:05,142 INFO [http://baidu.com] File Written
2017-03-13 14:00:05,271 INFO [http://cn.bing.com/] Lock Acquire
2017-03-13 14:00:05,272 INFO [http://cn.bing.com/] File Written

RLock

RLock is a reentrant lock: the same thread can call acquire multiple times without blocking, but it must call release the same number of times

import threading

>>> rlock = threading.RLock()

>>> rlock.acquire()
True

>>> rlock.acquire()
True

>>> rlock.acquire()
True

>>> rlock.release()

>>> rlock.release()

>>> rlock.release()

>>> rlock.release()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
RuntimeError: cannot release un-acquired lock
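Reentrancy matters when a function that already holds the lock calls another function that acquires it again; with a plain Lock this would deadlock. A minimal sketch (the function names are just for illustration):

import threading

rlock = threading.RLock()

def inner():
    with rlock:  # re-acquiring the same RLock in the same thread does not block
        print('inner also holds the lock')

def outer():
    with rlock:
        inner()

outer()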

Condition

For a producer/consumer pattern across threads you can use Condition: after the producer produces data, it calls notify/notify_all to wake the consumers

import threading
import random
import logging
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')


class Producer(threading.Thread):
    def __init__(self, datas: list, cond: threading.Condition, name=None):
        super().__init__()
        self.cond = cond
        self.datas = datas
        if name is not None:
            self.name = name

    def run(self):
        while True:
            data = random.randint(1, 100)
            logging.info(data)
            self.cond.acquire()
            self.datas.append(data)  # modify the shared list while holding the condition's lock
            self.cond.notify()
            self.cond.release()
            time.sleep(1)
            """
            self.cond.acquire()
            self.cond.notify()
            self.cond.release()
            is equivalent to
            with self.cond:
                self.cond.notify()

            Both notify and wait must be called between acquire and release;
            the with statement is the usual way to do this.
            """


class Consumer(threading.Thread):
    def __init__(self, datas: list, cond: threading.Condition, name=None):
        super().__init__()
        self.cond = cond
        self.datas = datas
        if name is not None:
            self.name = name

    def run(self):
        while True:
            self.cond.acquire()
            while not self.datas:
                self.cond.wait()  # the consumer waits here until the producer calls notify
            data = self.datas.pop()
            logging.info(data)
            self.cond.release()


def runner():
    datas = []
    cond = threading.Condition()
    t1 = Producer(datas, cond, name='producer')
    t2 = Consumer(datas, cond, name='consumer')
    t1.start()
    t2.start()
    t1.join()
    t2.join()

Test

>>> runner()
2017-03-13 14:56:12,442 INFO [producer] 89
2017-03-13 14:56:12,442 INFO [consumer] 89
2017-03-13 14:56:13,445 INFO [producer] 85
2017-03-13 14:56:13,445 INFO [consumer] 85
2017-03-13 14:56:14,450 INFO [producer] 57
2017-03-13 14:56:14,450 INFO [consumer] 57
2017-03-13 14:56:15,454 INFO [producer] 65
2017-03-13 14:56:15,454 INFO [consumer] 65
2017-03-13 14:56:16,458 INFO [producer] 15
2017-03-13 14:56:16,459 INFO [consumer] 15

Event

Event is a simpler mechanism: one thread sets a flag and the other threads wait for it

import threading
import logging
import time
import random

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')


class EventProducer(threading.Thread):
    def __init__(self, datas: list, event: threading.Event, name=None):
        super().__init__()
        self.datas = datas
        self.event = event
        if name is not None:
            self.name = name

    def run(self):
        while True:
            data = random.randint(1, 100)
            logging.info(data)
            self.datas.append(data)
            self.event.set()
            time.sleep(1)


class EventConsumer(threading.Thread):
    def __init__(self, datas: list, event: threading.Event, name=None):
        super().__init__()
        self.datas = datas
        self.event = event
        if name is not None:
            self.name = name

    def run(self):
        while True:
            self.event.wait()   # wait blocks the consumer until the producer calls set
            self.event.clear()  # reset the flag so the next wait blocks instead of spinning
            try:
                data = self.datas.pop()
                logging.info(data)
            except IndexError:
                continue


def runner():
    event = threading.Event()
    datas = []
    t1 = EventProducer(datas, event, name='EventProducer')
    t2 = EventConsumer(datas, event, name='EventConsumer')
    t1.start()
    t2.start()
    t1.join()
    t2.join()

Test

>>> runner()
2017-03-13 16:18:54,251 INFO [EventProducer] 82
2017-03-13 16:18:54,251 INFO [EventConsumer] 82
2017-03-13 16:18:55,261 INFO [EventProducer] 37
2017-03-13 16:18:55,261 INFO [EventConsumer] 37
2017-03-13 16:18:56,270 INFO [EventProducer] 92
2017-03-13 16:18:56,271 INFO [EventConsumer] 92

Queue

The previous producer/consumer examples passed data through a shared global list, which isn't ideal. Data should be passed between threads with a Queue: its put/get operations can block the calling thread, and the queue does its own locking, so no extra synchronization is needed.

import queue
import threading
import logging
import random
from time import sleep

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')


class QueueProducer(threading.Thread):
    def __init__(self, queue: queue.Queue, name=None):  # annotate with the class, not an instance
        super().__init__()
        self.queue = queue
        if name is not None:
            self.name = name

    def run(self):
        while True:
            data = random.randint(1, 100)
            logging.info(data)
            self.queue.put(data)
            sleep(1)


class QueueConsumer(threading.Thread):
    def __init__(self, queue: queue.Queue, name=None):
        super().__init__()
        self.queue = queue
        if name is not None:
            self.name = name

    def run(self):
        while True:
            data = self.queue.get()  # get blocks until an item is available
            logging.info(data)


def runner():
    q = queue.Queue()
    t1 = QueueProducer(q, name='QueueProducer')
    t2 = QueueConsumer(q, name='QueueConsumer')
    t1.start()
    t2.start()
    t1.join()
    t2.join()

Test

>>> runner()
2017-03-13 16:34:49,401 INFO [QueueProducer] 82
2017-03-13 16:34:49,401 INFO [QueueConsumer] 82
2017-03-13 16:34:50,405 INFO [QueueProducer] 2
2017-03-13 16:34:50,405 INFO [QueueConsumer] 2
2017-03-13 16:34:51,406 INFO [QueueProducer] 74
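If the main thread needs to wait until every queued item has been processed, Queue also provides join and task_done; a minimal sketch (the item values are just for illustration):

import queue
import threading

q = queue.Queue()

def consumer():
    while True:
        item = q.get()   # blocks until an item is available
        print('consumed', item)
        q.task_done()    # mark this item as processed

threading.Thread(target=consumer, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()  # blocks until task_done has been called for every item put into the queue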

GIL

Any discussion of Python multithreading has to mention the GIL (Global Interpreter Lock). Because of the GIL, Python threads cannot achieve true parallelism, even though in CPython (the C implementation of Python) threads are native operating-system threads.

In CPython, an interpreter has one main thread plus the threads created by the user program. Because of the GIL, each thread must acquire the GIL before it can execute Python bytecode, so multiple threads cannot run at the same time and multithreading cannot make effective use of multiple CPU cores.

For true parallelism, use the multiprocessing module instead.
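A minimal sketch of parallelizing a CPU-bound function with multiprocessing (the work function and pool size are just for illustration):

from multiprocessing import Pool

def cpu_bound(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with Pool(4) as pool:  # four worker processes, each with its own interpreter and GIL
        results = pool.map(cpu_bound, [10 ** 6] * 4)
    print(results)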

References

Python Threads synchronization

GIL Wikipedia