multithreaded programming - QLGQ/learning-python GitHub Wiki

Introduction

使用多线程编程和一个共享的数据结构如Queue,这种程序任务可以用几个功能单一的线程来组织。

  • UserRequestThread:负责读取客户的输入,可能是一个I/O信道。程序可能创建多个线程,每个客户一个,请求会被放入队列中。
  • RequestProcessor:一个负责从队列中获取并处理请求的线程,它为下面那种线程提供输出。
  • ReplyThread:负责把给用户的输出取出来,如果是网络应用程序就把结果发送出去,否则就保存在本地文件系统或数据库中。

Threads and Processes

真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把多任务轮流调动到每个核心上执行。

线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。

Processes

计算机程序只不过是磁盘中可执行的二进制(或其他类型)的数据。它们只有在被读取到内 存中,被操作系统调用的时候才开始它们的生命期。**进程(有时被称为重量级进程)是程序 的一次执行。**每个进程都有自己的地址空间、内存、数据栈及其他记录其运行轨迹的辅助数 据。操作系统管理在其上运行的所有进程,并为这些进程公平地分配时间、进程也可以通过 fork和spawn操作来完成其他的任务。不过各个进程有自己的内存空间、数据栈等,所以只能 使用进程间通讯(interprocess communication, IPC),而不能直接共享信息。

Multiprocessing

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。 这样做的理由是,一个父进程可以fork出很多子进程 ,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

import os

print('Process (%s) starts...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am a child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

运行结果如下:

Process (876) starts...
I (876) just created a child process (877).
I am a child process (877) and my parent is 876.

由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__ = '__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

执行结果如下:

Parent process 928.
Process will start.
Run child process test (929)...
Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

Pool:
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

运行结果如下:

Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。apply_async()维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去。

请注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。

函数解释:

  • apply(func[, args[, kwds]])是阻塞的。apply用于传递不定参数,同python中的apply函数一致(不过内置的apply函数从2.3以后就不建议使用了),主进程会阻塞于函数。主进程的执行流程同单进程一致。
  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,与apply用法一致,但它是非阻塞的且支持结果返回后进行回调。主进程循环运行过程中不等待apply_async的返回结果,在主进程结束后,即使子进程还未返回整个程序也会退出。虽然 apply_async是非阻塞的,但其返回结果的get方法却是阻塞的,如使用result.get()会阻塞主进程。如果我们对返回结果不感兴趣, 那么可以在主进程中使用pool.close与pool.join来防止主进程退出。注意join方法一定要在close或terminate之后调用。
  • close() 关闭pool,使其不在接受新的任务。
  • terminate() 结束工作进程,不在处理未完成的任务。
  • join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

Threads

线程(有时被称为轻量级进程)跟进程有些相似,不同的是,**所有的线程运行在同一个进程 中,共享相同的运行环境。**它们可以被想象成是在主进程或“主线程”中并行运行的“迷你进 程”。

**线程有开始,顺序执行和结束三部分。**它有一个自己的指令指针,记录自己运行到什么地 方。线程的运行可能被抢占(中断),或暂时的被挂起(也叫睡眠),让其他的线程运行, 这叫做让步。一个进程中的各个线程之间共享同一片数据空间,所以线程之间可以比进程之 间更方便地共享数据以及相互通讯。线程一般都是并发执行的,正是由于这种并行和数据共 享的机制使得多个任务的合作变为可能。实际上,在单CPU的系统中,真正的并发是不可能 的,每个线程会被安排成每次只运行一小会儿,然后就把CPU让出来,让其他的线程去运 行。在进程的整个运行过程中,每个线程都只做自己的事,在需要的时候跟其他的线程共享 运行的结果。

当然,这样的共享并不是完全没有危险的。如果多个线程共同访问同一片数据,则由于数据 访问的顺序不一样,有可能导致数据结果的不一致的问题。这叫做竞态条件(race condition)。幸运的是,大多数线程库都带有一系列的同步原语,来控制线程的执行和数据 的访问。

另一个要注意的地方是,由于有的函数会在完成之前阻塞住,在没有特别为多线程做修改的 情况下,这种“贪婪”的函数会让CPU的时间分配有所倾斜。导致各个线程分配到的运行时间可 能不尽相同,不尽公平。

Threads and Python

Global Interpreter Lock

**Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。**Python在设计之初就考虑 到要在主循环中,同时只有一个线程在执行,就像单CPU的系统中运行多个进程那样,内存 中可以存放多个程序,但任意时刻,只有一个程序在CPU中运行。同样地,虽然Python解释 器中可以“运行”多个线程,但在任意时刻,只有一个线程在解释器中运行。

对Python虚拟机的访问由全局解释器锁(global interpreter lock, GIL)来控制,正是这个锁能 保证同一时刻只有一个线程在运行。在多线程环境中,Python虚拟机按以下方式执行。

  1. 设置GIL;
  2. 切换到一个线程去运行;
  3. 运行:
    • 指定数量的字节码的指令,或者
    • 线程主动让出控制(可以调用time.sleep(0))
  4. 把线程设置为睡眠状态;
  5. 解锁GIL;
  6. 再次重复以上所有步骤。

在调用外部代码(如C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)。

Exiting Threads

当一个线程结束计算,它就退出了。线程可以调用thread.exit()之类的退出函数,也可以使用Python退出进程的标准方法,如sys.exit()或抛出一个SystemExit异常等。不过,你不可以直接“杀掉”(kill)一个线程。

The thread Module

除了产生线程外,thread模块也提供了基本的同步数据结构锁对象(lock object,也叫原语锁、简单锁、互斥锁、互斥量、二值信号量)。如之前所说,同步原语与线程的管理是密不可分的。

thread Module and Lock Objects

Function/Method Description
thread Module Function
start_new_thread(function, args, kwargs=None) 产生一个新的线程,在新线程中用指定的参数和可选的kwargs来调用这个函数
allocate_lock() 分配一个LockType类型的锁对象
exit() 让线程退出
LockType Lock Object Methods
acquire(wait = None) 尝试获取锁对象
locked() 如果获取了锁对象返回True,否则返回False
release() 释放锁

start_new_thread()函数是thread模块的一个关键函数,它的语法与内建的apply()函数完全一样,其参数为:函数,函数的参数以及可选的关键字参数。不同的是,函数不是在主线程里运行,而是产生一个新的线程来运行这个函数。

The threading Module

threading Module Objects

Object Description
Thread 表示一个线程的执行的对象
Lock 锁原语对象(跟thread模块里的锁对象相同)
RLock 可重入锁对象,使单线程可以再次获得已经获得了的锁(递归锁定)
Condition 条件变量对象能让一个线程停下来,等待其他线程满足了某个“条件”,如,状态的改变或值的改变
Event 通用的条件变量,多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活
Semaphore 为等待锁的线程提供一个类似“等候室”的结构
BoundedSemaphore 与Semaphore类似,只是它不允许超过初始值
Timer 与Thread相似,只是它要等待一段时间后才开始运行
Barrier 创建一个障碍,一些指定的线程在被允许继续之前必须到达该障碍

Daemon threads

Threading模块支持守护线程,它们是这样工作的:守护线程一般是一个等待客户请求服务器,如果没有客户提出请求,它就在那等着。如果你设定一个线程为守护线程,就表示你在说这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。

如果你的主线程要退出的时候,不用等待那些子线程完成,那就设定这些线程的daemon属性。即,在线程开始(调用thread.start())之前,调用setDaemon()函数设定线程的daemon标志(thread.setDaemon(True))就表示这个线程“不重要”。

如果你想要等待子线程完成再退出,那就什么都不用做,或者显示地调用thread.setDaemon(False)以保证其daemon标志为False。你可以调用thread.isDaemon()函数来判断其daemon标志的值。新的子线程会继承其父线程的daemon标志。整个Python会在所有的非守护线程退出后才会结束,即进程中没有非守护线程存在的时候才结束。

The Thread Class

threading模块中的Thread类是你主要的运行对象,它有很多thread模块里没有的函数,如下表所示。
Thread Object Attribute and Methods

Attribute Description
Thread对象数据属性
name 一个线程的名称
ident 一个线程的标识符
daemon 布尔型标志,预示着一个线程是否是守护型的
Thread对象方法
init(group=None, target=None, name=None, args=(), kwargs={}, verbose=None, daemon=None) 实例化一个Thread对象,使目标和任何参数或键值参数可回调的,一个名字或者一个组也是能够通过的,但是一个组是未实现的,一个冗长的标志也是可接受的,任何守护值设置thread.daemon属性或标志
start() 开始线程的执行
run() 定义线程的功能的函数(一般会被子类重写)
join(timeout=None) 程序挂起,直到线程结束;如果给了time-out,则最多阻塞timeout秒
getName() 返回线程的名称
setName(name) 设置线程的名称
isAlive/is_alive() 布尔标志,表示这个线程是否还在运行中
isDaemon() 返回线程的daemon 标志
setDaemon(daemonic) 把线程的daemon标志设为daemonic(一定要在调用start()函数前调用

用Thread类,你可以用多种方法来创建线程:

  • 创建一个Thread的实例,传给它一个函数;
  • 创建一个Thread的实例,传给它一个可调用的类对象;
  • 从Thread派生出一个子类,创建一个这个子类的实例。

Create Thread Instance, Passing in Function

在实例化每个Thread对象的时候,我们把函数(target)和参数(args)传进去,得到返回的Thread实例。实例化一个Thread(调用Thread())与调用thread.start_new_thread()之间最大的区别就是,新的线程不会立即开始。在你创建线程对象,但不想马上运行线程的时候,这是一个很有用的同步特性。

threading模块的Thread类有一个join()函数,允许主线程等待线程的结束。join()会等到线程结束,或者在给了timeout参数的时候,等到超时为止。使用join()看上去会比使用一个等待锁释放的无限循环清楚一些(这种锁也被称为“自旋锁”)。join()的另一个比较重要的方面是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。如果你的主线程除了等线程结束外,还有其他的事情要做(如处理或等待其他的客户请求),那就不用调用join(),只有在你要等待线程结束的时候才要调用join()。

Other Threading Module Functions

除了各种同步对象和线程对象外,threading模块还提供了一些函数。
Threading Module Function

Function Description
activeCount/active_count() 当前活动的线程对象的数量
currentThread()/current_thread 返回当前线程对象
enumerate() 返回当前活动线程的列表
settrace(func) 为所有线程设置一个跟踪函数
setprofile(func) 为所有线程设置一个跟踪函数
stack_size(size=0) 返回新创建的线程的堆栈大小,任意尺寸可以设置给后来创建的线程

Queue Module

Queue模块可以用来进行线程间通讯,让各个线程之间共享数据。

函数 描述
Queue模块函数
queue(size) 创建一个大小为size的Queue对象
Queue对象函数
qsize() 返回队列的大小(由于在返回的时候,队列可能会被其他线程修改,所以这个值是近似值)
empty() 如果队列为空,返回True;否则返回False
full() 如果队列已满,返回True;否则返回False
put(item, block=0) 把item放到队列中,如果给了block(不为0),函数会一直阻塞到队列中有空间为止
get(block=0) 从队列中取一个对象,如果给了block(不为0),函数会一直阻塞到队列中有对象为止
#!/usr/bin/env python
# -*-coding:utf-8-*-

from multiprocessing import Process, Queue
import os, time, random

# the codes to be executed by data-writing subprocess
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# the codes to be executed by data-reading subprocess 
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__ == '__main__':
    q = Queue()       # parent process creates Queue and then pass to each subprocess
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()         # start subprogress pw, write into queue
    pr.start()           # start subprocess pr, read out from queue
    pw.join()           # wait for subprocess pw to end
    pr.terminate()  # subprocess pr is a endless loop, and no way to wait for it to end, so have to force it to finish

运行结果为:

Process to write: 4231
Put A to queue...
Process to read: 4232
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

下表列出了一些多线程编程中可能用得到的模块:

模块 描述
thread 基本的,低级别的线程模块
threading 高级别的线程和同步对象
Queue 供多线程使用的同步先进先出(FIFO)队列
mutex 互斥对象
SocketServer 具有线程控制的TCP和UDP管理器