Reference: 《Python 源码剖析》
Python_multithreading
GIL与线程调度
Python线程的创建
Python线程的调度
Python子线程的销毁
Python线程的用户级互斥与同步
高级线程库–threading
1. Python多线程机制
a. GIL与线程调度
b. 初见Python Thread
c. Python线程的创建

/* [threadmodule.c] */
/*
 * thread.start_new_thread(func, args[, kwargs]).
 *
 * Packs the callable and its arguments into a heap-allocated bootstate.
 * Makes sure the GIL exists (PyEval_InitThreads), then asks the platform
 * layer to run t_bootstrap(boot) on a new native thread.
 *
 * Returns the new thread's ident as a Python int, or NULL with an exception
 * set on failure.
 *
 * NOTE(review): func/args/keyw are borrowed references out of fargs, yet the
 * new thread uses them after this call returns. Real code must Py_INCREF them
 * here and release them in t_bootstrap; this excerpt omits both halves.
 */
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    long ident;

    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                           &func, &args, &keyw)) {
        return NULL;
    }

    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL) {
        return PyErr_NoMemory();
    }
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;

    /* Create (and take) the GIL on first use; returns at once afterwards. */
    PyEval_InitThreads();

    ident = PyThread_start_new_thread(t_bootstrap, (void *)boot);
    if (ident == -1) {
        /* No thread will ever consume boot, so it must be freed here. */
        PyErr_SetString(ThreadError, "can't start new thread");
        PyMem_DEL(boot);
        return NULL;
    }
    return PyInt_FromLong(ident);
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 [pythread.h] typedef void * PyThread_type_lock;[ceval.c] static PyThread_type_lock interpreter_lock = 0 ; static long main_thread = 0 ;void PyEval_InitThreads (void ) { if (interpreter_lock) return ; interpreter_lock = PyThread_allocate_lock(); PyThread_acquire_lock(interpreter_lock,1 ); main_thread = PyThread_get_thread_ident(); } [thread_nt.h] PyThread_type_lock PyThread_allocate_lock (void ) { PNRMUTEX aLock; if (!initialized) PyThread_init_thread(); aLock = AllocNonRecursiveMutex(); return (PyThread_type_lock) aLock; [thead.c] void PyThread_init_thread (void ) { if (initialized) return ; initialized = 1 ; PyThread__init_thread(); } [thread_nt.h] static void PyThread__init_thread (void ) {}}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 [thread_nt.h] typedef struct NRMUTEX { LONG owned; DWORD thread_id; HANDLE hevent; } NRMUTEX, *PNRMUTEX [thread_nt.h] PNRMUTEX AllocNonRecursiveMutex (void ) { PNRMUTEX mutex = (PNRMUTEX)malloc (sizeof (NRMUTEX)); if (mutex && !InitializeNonRecursiveMutex(mutex)){ free (mutex); Mutex = NULL ; } return mutex; } BOOL InitializeNonRecursiveMutex (PNRMUTEX mutex) { ... mutex->owned = -1 ; mutex->thread_id = 0 ; mutex->hevent = CreadEvent(NULL ,FALSE,FALSE,NULL ); return mutex->hevent != NULL ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 [thread_nt.h] int PyThread_acquire_lock (PyThread_type_lock aLock, int waitflag) { int success; success = aLock && EnterNonRecursiveMutex((PNRMUTEX) aLock, (waitflag == 1 ? INFINITE : 0 )) == WAIT_OBJECT_0; return success; } DWORD EnterNonRecursiveMutex (PNRMUTEX mutex, BOOL wait) { DWORD ret; if (!wait){ if (InterlockedCompareExchange((PVOID *)&mutex->owned,(PVOID)0 ,(PVOID) -1 ) != (PVOID) - 1 ) return WAIT_TIMEOUT; ret = WAIT_OBJECT_0; }else { ret = InterlockedIncrement(&mutex->owned) ? WaitForSingleObject(mutex->hevent,INFINITE) : WAIT_OBJECT_0; } mutex->thread_id = GetCurrentThreadId(); return ret; }
1 2 3 4 5 6 7 8 9 10 11 [thread_nt.h] void PyThread_release_lock (PyThread_type_lock aLock) { LeaveNonRecursiveMutex((PNRMUTEX)aLock); } BOOL LeaveNonRecursiveMutex (PNRMUTEX mutex) { mutex->thread_id = 0 ; return InterlockedDecrement(&mutex->owned) < 0 || SetEvent(mutex->hevent); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 [threadmodule.c] static PyObject* thread_PyThread_start_new_thread (PyObject* self, PyObject* fargs) { PyObject* func, *args, *keyw = NULL ; struct bootstate * boot ; long ident; PyArg_UnpackTuple(fargs,"start_new_thread" ,2 ,3 ,&func,&args,&keyw); boot = PyMem_NEW(struct bootstate, 1 ); boot->interp = PyThreadState_GET()->interp; boo->func = func; boot->args = args; boot->keyw - keyw; PyEval_InitThreads(); ident = PyThread_start_new_thread(t_bootstrap,(void *)boot); return PyInt_FromLong(ident); } [thread.c] static size_t _pythread_stacksize = 0 ; [thread_nt.h] long PyThread_start_new_thread(void (*func)(void *), void* arg){ unsigned long rv; callobj obj; obj.id = -1 ; obj.func = func; obj.arg = arg; obj.done = CreateSemaphore(NULL ,0 ,1 ,NULL ); rv = _beginthread(bootstrap,_pythread_stacksize,&obj); if (rv == (unsigned long ) - 1 ){ obj.id = -1 ; }else { WaitForSingleObject(obj.done,INFINITE); } CloseHandle((HANDLE)obj.done); return obj.id; } [thread_nt.h] typedef struct { void (*func)(void *); void *arg; long id; HANDLE done; } callobj;
1 2 3 4 5 6 7 8 9 10 11 12 13 [thread_nt.h] static int bootstrap (void *call) { callobj* obj = (callobj *)call; void (*func)(void *) = obj->func; void *arg = obj->arg; obj->id = PyThread_get_thread_ident(); ReleaseSemaphore(obj->done,1 ,NULL ); func(arg); return 0 ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 [threadmodule.c] static void t_bootstrap (void * boot_raw) { struct bootstate * boot = (struct bootstate *)boot_raw ; PyThreadState* tstate; PyObject* res; tstate = PyThreadState_New(boot->interp); PyEval_AcquireThread(tstate); res = PyEval_CallObjectWithKeywords(boot->func,boot->args,boot->keyw); PyMem_DEL(boot_raw); PyThreadState_Clear(tstate); PyThreadState_DeleteCurrent(); PyThread_exit_thread(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 [ceval.c] void PyEval_AcquireThread (PyThreadState* tstate) { if (tstate == NULL ) Py_FatalError("PyEval_AcquireThread: NULL new thread state" ); assert(interpreter_lock); PyThread_acquire_lock(interpreter_lock,1 ); if (PyThreadState_Swap(tstate) != NULL ) Py_FatalError("PyEval_AcquireThread: non-NULL old thread state" ); } [pystate.c] PyThreadState* PyThreadState_Swap (PyThreadState* newts) { PyThreadState* oldts = _PyThreadState_Current; _PyThreadState_Current = newts; return oldts; }
1 2 3 4 5 6 7 8 9 10 [pystate.h] typedef struct _ts { struct _ts *next ; PyInterpreterState* interp; struct _frame * frame ; int recursion_depth; ... int gilstate_counter; long thread_id; } PyThreadState;
/* [thread.c] */
/* One thread-local-storage entry: the value stored under `key` for the
 * thread whose ident is `id`. Entries for all threads and keys share one
 * singly linked list. */
struct key {
    struct key *next;   /* next entry in the list */
    long id;            /* thread ident (PyThread_get_thread_ident) */
    int key;            /* key handed out by PyThread_create_key */
    void *value;        /* stored value */
};

/* Head of the TLS entry list; accessed under keymutex. */
static struct key *keyhead = NULL;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 [pystate.c] static PyInterpreterState* autoInterpreterState = NULL ;static int autoTLSkey = 0 ;void _PyGILState_Init(PyInterpreterState* i,PyThreadState* t){ autoTLSkey = PyThread_create_key(); autoInterpreterState = i; assert(PyThread_get_key_value(autoTLSkey) == NULL ); _PyGILState_NoteThreadState(t); ... } static void _PyGILState_NoteThreadState(PyThreadState* tstate){ if (!autoTLSkey) return ; PyThread_set_key_value(autoTLSkey,(void *)tstate); tstate->gilstate_counter = 1 ; } [thread.c] static PyThreadState_create_key (void ) { if (keymutex == NULL ) keymutex = PyThread_allocate_lock(); return ++nkeys; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 [thread.c] static struct key* find_key (int key,void * value) { struct key * p ; long id = PyThread_get_thread_ident(); PyThread_acquire_lock(keymutex,1 ); for (p = keyhead; p != NULL ; p = p->next){ if (p->id ==id && p->key == key) goto Done; } p = (struct key *)malloc (sizeof (struct key)); if (p != NULL ){ p->id = id; p->key = key; p->next = keyhead; keyhead = p; } Done: PyThread_release_lock(keymutex); return p; }
/* [thread.c] */
/* Value stored under `key` for the calling thread. Returns NULL if there is
 * none, or if the lookup could not allocate an entry. */
void *
PyThread_get_key_value(int key)
{
    struct key *p = find_key(key, NULL);

    return p == NULL ? NULL : p->value;
}

/* Store `value` under `key` for the calling thread. An existing mapping is
 * left as it is, because find_key does not overwrite. Returns 0 on success,
 * or -1 if no entry could be allocated. */
int
PyThread_set_key_value(int key, void *value)
{
    struct key *p = find_key(key, value);

    return p == NULL ? -1 : 0;
}

/* Remove and free the entries for `key` belonging to every thread. */
void
PyThread_delete_key(int key)
{
    struct key *p, **q;

    PyThread_acquire_lock(keymutex, 1);
    q = &keyhead;
    while ((p = *q) != NULL) {
        if (p->key == key) {
            *q = p->next;       /* unlink, then free */
            free(p);
        }
        else {
            q = &p->next;
        }
    }
    /* Release exactly once, after the whole list has been walked. */
    PyThread_release_lock(keymutex);
}
1 2 3 4 5 6 7 8 9 10 [pystate.c] PyThreadState* PyThreadState_New (PyInterpreterState* interp) { PyThreadState* tstate = (PyThreadState *)malloc (sizezof(PyThreadState)); ... #ifdef WITH_THREAD _PyGILState_NoteThreadState(tstate); #endif ... return tstate; }
当前活动的 Python 线程不一定是获得了 GIL 的线程：例如子线程在 t_bootstrap 中调用 PyThreadState_New 创建线程状态时，尚未通过 PyEval_AcquireThread 获得 GIL。
当所有的线程都完成了初始化动作之后，操作系统的线程调度和 Python 的线程调度才会统一。
子线程进入 Python 解释器（PyEval_EvalFrameEx）之后，才真正完成线程的初始化。
d.Python线程的调度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 [ceval.c] PyObject* PyEval_EvalFrameEx (PyFrameObject* f) { ... why = WHY_NOT; for (;;){ ... if (--_Py_Ticker < 0 ){ _Py_Ticker = _Py_CheckInterval; tstate->tick_counter++; if (interpreter_lock){ PyThreadState_Swap(NULL ); PyThread_release_lock(interpreter_lock); PyThread_acquire_lock(interpreter_lock,1 ); PyThreadState_Swap(tstate) != NULL ; } } ... } } [ceval.c] int _Py_CheckInterval = 100 ;volatile int _Py_Ticker = 100 ;
e. Python子线程的销毁

/* [threadmodule.c] */
/*
 * Body of every thread started by thread.start_new_thread. It runs on the
 * new native thread; boot_raw is the bootstate built by the creator.
 *   1. Create a PyThreadState for this thread and take the GIL.
 *   2. Call func(*args, **keyw).
 *   3. Tear down: free boot, clear and delete the thread state (deleting
 *      also releases the GIL), then end the native thread.
 */
static void
t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *)boot_raw;
    PyThreadState *tstate;
    PyObject *res;

    tstate = PyThreadState_New(boot->interp);
    PyEval_AcquireThread(tstate);

    res = PyEval_CallObjectWithKeywords(boot->func, boot->args, boot->keyw);
    if (res == NULL) {
        /* An exception escaped the thread function and there is no caller
         * to hand it to. SystemExit ends the thread quietly; anything else
         * is printed. */
        if (PyErr_ExceptionMatches(PyExc_SystemExit)) {
            PyErr_Clear();
        }
        else {
            PyErr_PrintEx(0);
        }
    }
    else {
        Py_DECREF(res);     /* the return value is discarded */
    }

    PyMem_DEL(boot_raw);
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}
Python 首先通过 PyThreadState_Clear 清理当前线程所对应的线程状态对象，随后通过 PyThreadState_DeleteCurrent 删除该线程状态对象并释放 GIL。
1 2 3 4 5 6 7 8 9 10 [pystate.c] void PyThreadState_DeleteCurrent () { PyThreadState* tstate = _PyThreadState_Current; _PyThreadState_Current = NULL ; tstate_delete_common(tstate); if (autoTLSkey && PyThread_get_key_value(autoTLSkey) == tstate) PyThread_delete_key_value(autoTLSkey); PyEval_ReleaseLock(); }
Python 最后通过 PyThread_exit_thread 销毁原生线程，这部分工作在各个平台上的实现各不相同。
f.Python线程的用户级互斥与同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 [threadmodule.c] static PyObject* thread_PyThread_allocate_lock (PyObject* self) { return (PyObject *)newlockobject(); } static lockobject* newlockobject (void ) { lockobject* self; self = PyObject_New(lockobject,&Locktype); self->lock_lock = PyThread_allocate_lock(); return self; } [pythread.h] typedef void * PyThread_type_lock;[threadmodule.c] typedef struct { PyObject_HEAD PyThread_type_lock lock_lock; } lockobject; [threadmodule.c] static PyMethodDef lock_method[] = { {"acquire_lock" ,(PyCFunction)lock_PyThread_acquire_lock,...} {"acquire" ,(PyCFunction)lock_PyThread_acquire_lock,...} {"release_lock" ,(PyCFunction)lock_PyThread_release_lock,...} {"release" ,(PyCFunction)lock_PyThread_release_lock,...} {"locked_lock" ,(PyCFunction)lock_locked_lock,...} {"locked" ,(PyCFunction)lock_locked_lock,..} {NULL ,NULL } }
/* [threadmodule.c] */
/*
 * lock.acquire([waitflag=1]) / lock.acquire_lock(...)
 * With waitflag == 1 it blocks, and the GIL is released while it waits.
 * Any other value makes one non-blocking attempt. Returns True if the lock
 * was acquired, False otherwise.
 */
static PyObject *
lock_PyThread_acquire_lock(lockobject *self, PyObject *args)
{
    int i = 1;

    if (!PyArg_ParseTuple(args, "|i:acquire", &i)) {
        return NULL;
    }

    /* Drop the GIL while possibly blocking, so other Python threads
     * (including whichever one will release this lock) can run. */
    Py_BEGIN_ALLOW_THREADS
    i = PyThread_acquire_lock(self->lock_lock, i);
    Py_END_ALLOW_THREADS

    return PyBool_FromLong((long)i);
}

/*
 * lock.release() / lock.release_lock()
 * Raises ThreadError if the lock is not currently held. There is no owner
 * check, so any thread may release it.
 */
static PyObject *
lock_PyThread_release_lock(lockobject *self)
{
    /* Probe: if a non-blocking acquire succeeds, the lock was free. Undo the
     * probe and report the error. */
    if (PyThread_acquire_lock(self->lock_lock, 0)) {
        PyThread_release_lock(self->lock_lock);
        PyErr_SetString(ThreadError, "release unlocked lock");
        return NULL;
    }
    PyThread_release_lock(self->lock_lock);
    Py_INCREF(Py_None);
    return Py_None;
}
g.高级线程库–threading
# [threading.py]
import thread

# Private aliases for the primitives of the low-level `thread` module.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
# [threading.py]
# Guards both registries below.
_active_limbo_lock = _allocate_lock()
# _active[thread_id] = thread: threads whose native thread has started and
# registered itself (done from inside the new thread).
_active = {}
# _limbo[thread] = thread: threads whose start() has been called but whose
# native thread has not registered itself yet.
_limbo = {}
# [threading.py]
def enumerate():
    """Return a list of every registered Thread object.

    Combines threads in _active (native thread running) and _limbo (started
    but not yet registered by their native thread). The snapshot is taken
    under _active_limbo_lock, which is released even if building it fails.
    """
    _active_limbo_lock.acquire()
    try:
        # list() keeps this correct whether dict.values() returns a list
        # (Python 2) or a view (Python 3).
        active = list(_active.values()) + list(_limbo.values())
    finally:
        _active_limbo_lock.release()
    return active
# [threading.py]
# threading.Lock is simply the low-level thread.allocate_lock factory.
_allocate_lock = thread.allocate_lock
Lock = _allocate_lock
RLock：可重入的 Lock，同一个线程可以多次获得（acquire）同一把锁。
Condition：可以接受一个 Lock 对象作为参数，如果未提供，则在内部自行创建一个 RLock 对象；提供了 wait 和 notify 语义。
Semaphore：内部维护着一个 Condition 对象，用于管理共享资源池。
Event
threading 中的 Thread
# [threading.py]
class Thread(_Verbose):
    """A thread of control (excerpt of threading.Thread).

    Lifecycle as shown here:
      start()        registers the object in _limbo and launches a native
                     thread that runs __bootstrap;
      __bootstrap()  moves it from _limbo to _active, calls run(), and on
                     exit marks it stopped, waking any join() callers;
      join()         waits on the __block condition until the thread stops.
    """

    __initialized = False

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, verbose=None):
        # ... (base-class initialisation and other fields elided in the excerpt)
        # target/args/kwargs are what run() reads back.
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__started = False
        self.__stopped = False
        # Condition used by __stop() to wake threads blocked in join().
        self.__block = Condition(Lock())
        self.__initialized = True

    def start(self):
        """Launch the native thread; run() executes on it via __bootstrap."""
        _active_limbo_lock.acquire()
        _limbo[self] = self
        _active_limbo_lock.release()
        # The low-level API requires the positional arguments as a tuple.
        _start_new_thread(self.__bootstrap, ())
        # Presumably gives the new thread a chance to get going.
        _sleep(0.000001)

    def run(self):
        """Call the target callable, if any, with the stored arguments."""
        if self.__target:
            self.__target(*self.__args, **self.__kwargs)

    def __bootstrap(self):
        # Runs on the new native thread.
        try:
            self.__started = True
            _active_limbo_lock.acquire()
            _active[_get_ident()] = self
            del _limbo[self]
            _active_limbo_lock.release()
            self.run()
        finally:
            # Always mark the thread stopped (waking join() callers), even if
            # run() raised.
            self.__stop()
            try:
                # Not shown in the excerpt; presumably removes self from
                # _active.
                self.__delete()
            except:
                pass    # deliberate best-effort cleanup

    def __stop(self):
        # Mark the thread finished and wake every thread blocked in join().
        self.__block.acquire()
        self.__stopped = True
        self.__block.notifyAll()
        self.__block.release()

    def join(self, timeout=None):
        """Wait until the thread stops, or until `timeout` seconds elapse.

        With timeout=None, wait indefinitely. The condition's lock is
        released even if waiting raises.
        """
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()