0x01 python GIL
关于python的GIL(全局解释器锁),大部分解释都已经给出,因此来复习一下。其GIL锁的作用为用于保护对Python对象的访问,从而防止多个线程一次执行Python字节码。也就是说无论有多少个CPU,多个线程的任务同时只能有一个任务在执行,这就造成了python的多线程不是真正的多线程。那么真正多线程为pthread模块等,其在单个CPU中,其运行方式与python多线程一致,而在两个及以上CPU的时候,每一个CPU可以同时执行一个线程。在Cpython中,主要有两个操作:
- PyThread_acquire_lock
- PyThread_release_lock
其GIL的代码在https://github.com/python/cpython/blob/00ada2c1d57c5b8b468bad32ff24fa14113ae5c7/Python/thread_pthread.h,获取GIL锁的代码如下
int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
return PyThread_acquire_lock_timed(lock, waitflag ? -1 : 0, /*intr_flag=*/0);
}
PyLockStatus
PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds,
int intr_flag)
{
PyLockStatus success = PY_LOCK_FAILURE;
pthread_lock *thelock = (pthread_lock *)lock;
int status, error = 0;
dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) called\n",
lock, microseconds, intr_flag));
if (microseconds == 0) {
status = pthread_mutex_trylock( &thelock->mut ); /* 获取lock变量权限 */
if (status != EBUSY)
CHECK_STATUS_PTHREAD("pthread_mutex_trylock[1]");
}
else {
status = pthread_mutex_lock( &thelock->mut ); /* 获取lock变量权限 */
CHECK_STATUS_PTHREAD("pthread_mutex_lock[1]");
}
if (status == 0) {
if (thelock->locked == 0) {
success = PY_LOCK_ACQUIRED;
}
else if (microseconds != 0) {
struct timespec abs;
if (microseconds > 0) {
_PyThread_cond_after(microseconds, &abs);
}
/* continue trying until we get the lock */
/* mut must be locked by me -- part of the condition
* protocol */
while (success == PY_LOCK_FAILURE) {
if (microseconds > 0) {
/* 已经上锁,等待释放锁*/
status = pthread_cond_timedwait(
&thelock->lock_released,
&thelock->mut, &abs);
if (status == 1) {
break;
}
if (status == ETIMEDOUT)
break;
CHECK_STATUS_PTHREAD("pthread_cond_timedwait");
}
else {
status = pthread_cond_wait(
&thelock->lock_released,
&thelock->mut);
CHECK_STATUS_PTHREAD("pthread_cond_wait");
}
if (intr_flag && status == 0 && thelock->locked) {
/* We were woken up, but didn't get the lock. We probably received
* a signal. Return PY_LOCK_INTR to allow the caller to handle
* it and retry. */
success = PY_LOCK_INTR;
break;
}
else if (status == 0 && !thelock->locked) {
success = PY_LOCK_ACQUIRED;
}
}
}
if (success == PY_LOCK_ACQUIRED) thelock->locked = 1; /* 当前线程上锁 */
status = pthread_mutex_unlock( &thelock->mut );
/* 释放锁以让其他线程进入临界区等待锁 */
CHECK_STATUS_PTHREAD("pthread_mutex_unlock[1]");
}
if (error) success = PY_LOCK_FAILURE;
dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) -> %d\n",
lock, microseconds, intr_flag, success));
return success;
}
释放锁代码如下
void
PyThread_release_lock(PyThread_type_lock lock)
{
pthread_lock *thelock = (pthread_lock *)lock;
int status, error = 0;
(void) error; /* silence unused-but-set-variable warning */
dprintf(("PyThread_release_lock(%p) called\n", lock));
status = pthread_mutex_lock( &thelock->mut ); /* 通过上锁mutex来进行GIL的操作 */
CHECK_STATUS_PTHREAD("pthread_mutex_lock[3]");
thelock->locked = 0; /* 释放GIL锁 */
/* wake up someone (anyone, if any) waiting on the lock */
status = pthread_cond_signal( &thelock->lock_released ); /* 释放信号量 通知线程GIL以解锁 */
CHECK_STATUS_PTHREAD("pthread_cond_signal");
status = pthread_mutex_unlock( &thelock->mut ); /* 解锁mutex */
CHECK_STATUS_PTHREAD("pthread_mutex_unlock[3]");
}
GIL的切换可以由sys.setswitchinterval
控制,同时也可以靠着"时间片轮"来进行切换
0x02 多线程切换方式
进程上下文切换有四种方式
- 先到先服务(FCFS)
- 最短作业优先(SJF)
- 优先级调度
- 时间片轮法(RR)
在多进程的上下文切换中,除了主要的硬件IO的等待使得任务进入阻塞态、延时函数之外,能使进程进行上下文切换的就只有时间轮了。那么时间轮的意思就是给丁程序一个时间片,程序在这个时间片中运行,如果超出了这个时间片,那么CPU便会停止该程序,进而上下文切换到另一个进程中来执行。其线程的操作也是一样的,举个例子:有两个进程A和B,进程A中有两个线程a1和a2,B进程有两个线程b1和b2,同时也有两个CPU,那么同时运行两个进程,那么进程A和进程B是同时运行的。一般来说A和B两个进程在每个CPU都是独立的,互相都不干涉。但是如果线程a1和a2都因为IO等待进入到阻塞态中,那么改CPU不会等待进程A中的两个线程,而是直接上下文切换到进程B中的另一个线程。同理多线程也是一样的态度。
并不是所有的线程切换都需要内核来调度,用户线程不需要内核的调度,它归属于用户调度
0x03 python 线程、进程和协程
对于python中的进程一般使用的是multiprocessing
这个模块,该模块可以让python实现真正意义上的并行操作,并行和并发的区别就是并发为多个任务同时产生,但不一定是同时运行,而并行就是多个任务同时在运行。
进程
multiprocessing
一般使用可以用Pool的对象,它提供了一种快捷的方法,可以将输入数据分配给不同进程处理(数据并行)
from multiprocessing import Pool
import multiprocessing
def test(x):
return x**3
if __name__ == '__main__':
pool = Pool(multiprocessing.cpu_count())
print(pool.map(test,[1,2,3]))
另外一个类就是Process
类,通过创建一个 Process
对象然后调用它的 start()
方法来生成进程,例子大致可以用这些例子
from multiprocessing import Pool,Process
import multiprocessing
def test(x):
print(x**3)
if __name__ == '__main__':
for i in range(3,10):
pros = Process(target=test, args = (i,))
pros.start()
pros.join()
线程
其主要模块是threading
,此文也不再介绍
协程
协程也是线程的另一种,也就是用户态的另一种线程,但是切换的成本比线程还要小,代表的模块有asyncio
,gevent
等。大致用法为
import gevent
from gevent import monkey;monkey.patch_all()
def test(x):
print(x**3)
return x**3
if __name__ =='__main__':
jobs = []
for i in range(10):
jobs.append(gevent.spawn(test,i))
gevent.joinall(jobs)
同时monkey.patch_all()这个代码为猴子补丁,作用为当gevent开多线程时monkey会阻塞住线程的继续执行,需要对monkey.patch_all进行处理。猴子补丁的意思是在运行是的动态替换。可以看成在模块运行时动态替换模块功能。可以用如下方法表示
from SomeOtherProduct.SomeModule import SomeClass
def speak(self):
return "ook ook eee eee eee!"SomeClass.speak = speak