a 5êdgÜÍã@sVdZddlZddlmZmZmZmZddlmZm Z ddl Z ddl Z ddl Z ddl Z ddlZddlZddlZddlZddlZddlZddlZddlZddlmZddlmZdZGdd „d eƒZGd d „d e jƒZGd d „d ejƒZGdd„deƒZGdd„deƒZ Gdd„deƒZ!Gdd„deƒZ"Gdd„de jƒZ#Gdd„deƒZ$Gdd„deƒZ%Gdd„dej&ƒZ&Gdd„dej'ƒZ(e )e j*dud ¡Gd!d"„d"ej'ƒƒZ+Gd#d$„d$ej,ƒZ,Gd%d&„d&ej'ƒZ-Gd'd(„d(ej.ƒZ.Gd)d*„d*ej/ƒZ/Gd+d,„d,ej0ƒZ0Gd-d.„d.ej1ƒZ1Gd/d0„d0ejƒZ2Gd1d2„d2ejƒZ3Gd3d4„d4ejƒZ4e5d5krRe 6¡dS)6z! Tests for the threading module. éN)ÚverboseÚ import_moduleÚ cpython_onlyÚunlink)Úassert_python_okÚassert_python_failure)Ú lock_tests)Úsupport)Znetbsd5zhp-ux11c@s,eZdZdd„Zdd„Zdd„Zdd„Zd S) ÚCountercCs d|_dS)Nr©Úvalue©Úself©rú//usr/local/lib/python3.9/test/test_threading.pyÚ__init__#szCounter.__init__cCs|jd7_dS©Nér r rrrÚinc%sz Counter.inccCs|jd8_dSrr r rrrÚdec'sz Counter.deccCs|jS©Nr r rrrÚget)sz Counter.getN)Ú__name__Ú __module__Ú __qualname__rrrrrrrrr "sr c@seZdZdd„Zdd„ZdS)Ú TestThreadcCs,tjj||d||_||_||_||_dS)N©Úname)Ú threadingÚThreadrÚtestcaseÚsemaÚmutexÚnrunning)rrr r!r"r#rrrr-s zTestThread.__init__c Cs&t ¡d}tr&td|j|dfƒ|jä|jB|j ¡trTt|j ¡dƒ|j   |j ¡d¡Wdƒn1s|0Yt   |¡tr¢td|jdƒ|jJ|j  ¡|j  |j ¡d¡trätd |j|j ¡fƒWdƒn1sø0YWdƒn1s0YdS) NgˆÃ@ztask %s will run for %.1f usecg€„.Aztasks are runningéZtaskÚdonerz$%s is finished. %d tasks are running)ÚrandomrÚprintrr!r"r#rrr ZassertLessEqualÚtimeÚsleeprZassertGreaterEqual)rÚdelayrrrÚrun4s*  ÿ 2  ÿzTestThread.runN)rrrrr+rrrrr,src@seZdZdd„Zdd„ZdS)Ú BaseTestCasecCstj ¡|_dSr)Útestr Zthreading_setupÚ_threadsr rrrÚsetUpNszBaseTestCase.setUpcCstjj|jŽtj ¡dSr)r-r Zthreading_cleanupr.Ú reap_childrenr rrrÚtearDownQszBaseTestCase.tearDownN)rrrr/r1rrrrr,Msr,c@s”eZdZdd„Zdd„Zdd„Zdd„Zd d „Zd d „Zd d„Z dd„Z dd„Z dd„Z dd„Z dd„Zdd„Zdd„Zdd„Ze eedƒd ¡d!d"„ƒZe eedƒd#¡d$d%„ƒZe eedƒd ¡d&d'„ƒZd(d)„Ze eedƒd*¡e eed+ƒd,¡d-d.„ƒƒZe ejevd/¡e eedƒd*¡e eed+ƒd,¡d0d1„ƒƒƒZd2d3„Z d4d5„Z!d6d7„Z"d8d9„Z#d:d;„Z$e%dd?„ƒZ'd@dA„Z(dBdC„Z)dDS)EÚ ThreadTestsc Cs0d}tjdd}t ¡}tƒ}g}t|ƒD]F}td|||||ƒ}| |¡| |j¡|  t |ƒd¡|  ¡q*t tdƒr¸t dd„|Dƒƒt ¡hB}| d|¡| t|ƒ|d ¡trÄtd ƒ|D]D}| ¡| | ¡¡| |jd ¡| |j¡|  t |ƒd ¡qÈtrtd ƒ| | ¡d ¡dS)Né r$r z z^$Ú get_native_idcss|] }|jVqdSr)Ú native_id)Ú.0ÚtrrrÚ nóz/ThreadTests.test_various_ops..rz!waiting for all tasks to completerz#^$zall tasks done)rÚBoundedSemaphoreÚRLockr ÚrangerÚappendÚ assertIsNoneÚidentÚ assertRegexÚreprÚstartÚhasattrÚsetr4Ú assertNotInÚ assertEqualÚlenrr'ÚjoinÚ assertFalseÚis_aliveÚassertNotEqualÚassertIsNotNoner) rZNUMTASKSr!r"Z numrunningÚthreadsÚir7Z native_idsrrrÚtest_various_opsZs4        zThreadTests.test_various_opscs†| t ¡j¡‡‡fdd„}t ¡‰g‰t ¡4t |d¡}ˆ  ¡|  ˆd|¡Wdƒn1sl0Ytj ˆd=dS)Ncsˆ t ¡j¡ˆ ¡dSr)r=rÚ currentThreadr?rDr©r%r?rrÚfsz9ThreadTests.test_ident_of_no_threading_threads..frr) rLrrPr?ÚEventr Úwait_threads_exitÚ_threadÚstart_new_threadÚwaitrFÚ_active)rrRÚtidrrQrÚ"test_ident_of_no_threading_threads~s  .z.ThreadTests.test_ident_of_no_threading_threadscCsPtr tdƒzt d¡Wntjy8t d¡‚Yn0| ¡t d¡dS)Nz!with 256 KiB thread stack size...iú4platform does not support changing thread stack sizer© rr'rÚ stack_sizerUÚerrorÚunittestZSkipTestrOr rrrÚtest_various_ops_small_stackŽsÿ z(ThreadTests.test_various_ops_small_stackcCsPtr tdƒzt d¡Wntjy8t d¡‚Yn0| ¡t d¡dS)Nzwith 1 MiB thread stack size...ir[rr\r rrrÚtest_various_ops_large_stackšsÿ z(ThreadTests.test_various_ops_large_stackcCs®dd„}t ¡}| ¡t ¡&t ||f¡}| ¡Wdƒn1sL0Y| |tj¡|  tj|tj ¡|  tj|  ¡¡|  ttj|ƒd¡tj|=dS)NcSst ¡| ¡dSr)rÚcurrent_threadÚrelease)r"rrrrR§sz*ThreadTests.test_foreign_thread..fÚ _DummyThread)rÚLockÚacquirer rTrUrVÚassertInrXÚassertIsInstancerdÚ assertTruerJr@rA)rrRr"rYrrrÚtest_foreign_thread¥s &zThreadTests.test_foreign_threadc sÂtdƒ}|jj}|j|jf|_Gdd„dtƒ‰| ˆ¡}t ¡}|  |t ¡|  |d¡z|||ƒ}qdWnˆyzYn 0|  d¡z|  |d¡Wnty¨Yn0t ¡‰t ¡‰G‡‡‡fdd„dtjƒ}|ƒ}d |_| ¡trôtd ƒtrtd ƒ|d |ƒ}|  |d¡tr&td ƒˆ ¡}| |¡trFtdƒ| |j¡tr`tdƒ||j|ƒ}|  |d¡tr†tdƒˆjtjd| |j¡tr®tdƒ|jr¾| ¡dS)NÚctypesc@s eZdZdS)z.AsyncExcN)rrrrrrrÚAsyncExcÂsrlrzAsyncExc not raisedrcseZdZ‡‡‡fdd„ZdS)z:ThreadTests.test_PyThreadState_SetAsyncExc..WorkercsNt ¡|_d|_zˆ ¡t d¡qWn ˆyHd|_ˆ ¡Yn0dS)NFgš™™™™™¹?T)rÚ get_identÚidÚfinishedrDr(r)r ©rlZworker_saw_exceptionZworker_startedrrr+æs  z>ThreadTests.test_PyThreadState_SetAsyncExc..Worker.runN©rrrr+rrprrÚWorkeråsrrTz started worker threadz trying nonsensical thread idéÿÿÿÿz, waiting for worker thread to get startedz" verifying worker hasn't exitedz2 attempting to raise asynch exception in workerz5 waiting for worker to say it caught the exception©Útimeoutz all OK -- joining worker)rZ pythonapiZPyThreadState_SetAsyncExcZc_ulongZ py_objectÚargtypesÚ ExceptionrrmrhÚintZ assertGreaterZfailrFÚUnboundLocalErrorrSrÚdaemonrBrr'rWrirIrornr Ú SHORT_TIMEOUTrH) rrkZ set_async_excÚ exceptionrYÚresultrrr7ÚretrrprÚtest_PyThreadState_SetAsyncExc¼sb               z*ThreadTests.test_PyThreadState_SetAsyncExccCs^dd„}tj}|t_z.fail_new_threadcSsdSrrrrrrÚr9z0ThreadTests.test_limbo_cleanup..©Útargetz:Failed to cleanup _limbo map on failure of Thread.start().)rÚ_start_new_threadrÚ assertRaisesr€rBrIÚ_limbo)rrƒr‡r7rrrÚtest_limbo_cleanupsþzThreadTests.test_limbo_cleanupcCs(tdƒtddƒ\}}}| |d¡dS)Nrkú-caNif 1: import ctypes, sys, time, _thread # This lock is used as a simple event variable. ready = _thread.allocate_lock() ready.acquire() # Module globals are cleared before __del__ is run # So we save the functions in class dict class C: ensure = ctypes.pythonapi.PyGILState_Ensure release = ctypes.pythonapi.PyGILState_Release def __del__(self): state = self.ensure() self.release(state) def waitingThread(): x = C() ready.release() time.sleep(100) _thread.start_new_thread(waitingThread, ()) ready.acquire() # Be sure the other thread is waiting. sys.exit(42) é*)rrrF©rÚrcÚoutÚerrrrrÚtest_finalize_running_thread#sz(ThreadTests.test_finalize_running_threadcCstddƒdS)Nr‹aPif 1: import sys, threading # A deadlock-killer, to prevent the # testsuite to hang forever def killer(): import os, time time.sleep(2) print('program blocked; aborting') os._exit(2) t = threading.Thread(target=killer) t.daemon = True t.start() # This is the trace function def func(frame, event, arg): threading.current_thread() return func sys.settrace(func) )rr rrrÚtest_finalize_with_traceDsz$ThreadTests.test_finalize_with_tracecCs0tddƒ\}}}| | ¡d¡| |d¡dS)Nr‹a§if 1: import threading from time import sleep def child(): sleep(1) # As a non-daemon thread we SHOULD wake up and nothing # should be torn down yet print("Woke up, sleep function is:", sleep) threading.Thread(target=child).start() raise SystemExit s5Woke up, sleep function is: r9)rrFÚstriprrrrÚtest_join_nondaemon_on_shutdown]s  ÿz+ThreadTests.test_join_nondaemon_on_shutdownc Csˆtj}t ¡}zhtddƒD]N}t |d¡tjdd„d}| ¡| ¡|ƒ}|  ||d||f¡qWt |¡n t |¡0dS)Nrédg-Cëâ6*?cSsdSrrrrrrr„yr9z7ThreadTests.test_enumerate_after_join..r…z� triggered after %d trials: %s) rÚ enumerateÚsysÚgetswitchintervalr<ÚsetswitchintervalrrBrHrE)rÚenumÚ old_intervalrNr7ÚlrrrÚtest_enumerate_after_joinqs ÿz%ThreadTests.test_enumerate_after_joincCsŒGdd„dtƒ}|dd}t |¡}|j ¡~|j|ƒdt |ƒ¡d|dd}t |¡}|j ¡~|j|ƒdt |ƒ¡ddS)Nc@seZdZdd„Zdd„ZdS)zDThreadTests.test_no_refcycle_through_target..RunSelfFunctioncSs.||_tj|j|fd|id|_|j ¡dS)NÚ yet_another)r†r‚Úkwargs)Ú should_raiserrÚ_runÚthreadrB)rr rrrr„s þzMThreadTests.test_no_refcycle_through_target..RunSelfFunction.__init__cSs|jr t‚dSr)r Ú SystemExit)rZ other_refržrrrr¡szIThreadTests.test_no_refcycle_through_target..RunSelfFunction._runN)rrrrr¡rrrrÚRunSelfFunctionƒs r¤F)r z%d references still around)ÚmsgT)ÚobjectÚweakrefÚrefr¢rHr>r—Ú getrefcount)rr¤Z cyclic_objectZweak_cyclic_objectZraising_cyclic_objectZweak_raising_cyclic_objectrrrÚtest_no_refcycle_through_target‚s&    ÿÿ    ÿÿz+ThreadTests.test_no_refcycle_through_targetcCsHt ¡}| ¡| d¡| ¡| d¡t ¡}| ¡t ¡dS)NTr) rrÚisDaemonÚ setDaemonÚgetNameÚsetNamerSÚisSetÚ activeCount)rr7ÚerrrÚtest_old_threading_api¡s  z"ThreadTests.test_old_threading_apicCs2t ¡}| dt|ƒ¡d|_| dt|ƒ¡dS©NrzT)rrrErArzrg©rr7rrrÚtest_repr_daemon­szThreadTests.test_repr_daemoncCsHt ¡}| |j¡tjdd}| |j¡tjdd}| |j¡dS)NF©rzT)rrrIrzrir´rrrÚtest_daemon_param³s     zThreadTests.test_daemon_paramÚforkúneeds os.fork()cCs:t d¡}td|ƒ\}}}| |d¡| | ¡d¡dS)Na“ import atexit import os import sys from test.support import wait_process # Import the threading module to register its "at fork" callback import threading def exit_handler(): pid = os.fork() if not pid: print("child process ok", file=sys.stderr, flush=True) # child process sys.exit() else: wait_process(pid, exitcode=0) # exit_handler() will be called after threading._shutdown() atexit.register(exit_handler) r‹r9schild process ok)ÚtextwrapÚdedentrrFÚrstrip©rÚcodeÚ_rrrrrÚtest_fork_at_exit»s  zThreadTests.test_fork_at_exitztest needs fork()cCs0d}td|ƒ\}}}| |d¡| |d¡dS)NaŠif 1: import _thread, threading, os, time def background_thread(evt): # Creates and registers the _DummyThread instance threading.current_thread() evt.set() time.sleep(10) evt = threading.Event() _thread.start_new_thread(background_thread, (evt,)) evt.wait() assert threading.active_count() == 2, threading.active_count() if os.fork() == 0: assert threading.active_count() == 1, threading.active_count() os._exit(0) else: os.wait() r‹r9©rrFr½rrrÚtest_dummy_thread_after_forkØs z(ThreadTests.test_dummy_thread_after_forkcCsŠt ¡}| tj|¡tj d¡tdƒD]Z}tjdd„d}|  ¡t   ¡}|dkrnt   |  ¡rfdnd¡q*| ¡tj|dd q*dS) Ngíµ ÷ư>écSsdSrrrrrrr„þr9z6ThreadTests.test_is_alive_after_fork..r…ré r3©Úexitcode)r—r˜Ú addCleanupr™r-r r<rrrBÚosr¸Ú_exitrJrHÚ wait_process)rr›rNr7ÚpidrrrÚtest_is_alive_after_forkós  z$ThreadTests.test_is_alive_after_forkcsht ¡}ˆ |jd¡ˆ |jt ¡j¡ˆ |jt ¡¡‡fdd„}tj|d}| ¡|  ¡dS)NÚ MainThreadcsˆ t ¡jt ¡j¡dSr)rKrÚ main_threadr?rbrr rrrRs ÿz'ThreadTests.test_main_thread..fr…) rrÎrFrr?rbrmrrBrH)rÚmainrRÚthrr rÚtest_main_threads  zThreadTests.test_main_threadztest needs os.fork()Úwaitpidztest needs os.waitpid()cCs@d}td|ƒ\}}}| ¡ dd¡}| |d¡| |d¡dS)Na£if 1: import os, threading from test import support pid = os.fork() if pid == 0: main = threading.main_thread() print(main.name) print(main.ident == threading.current_thread().ident) print(main.ident == threading.get_ident()) else: support.wait_process(pid, exitcode=0) r‹ú Úr9zMainThread True True ©rÚdecodeÚreplacerF©rr¾r¿rrÚdatarrrÚtest_main_thread_after_forks   z'ThreadTests.test_main_thread_after_forkúdue to known OS bugcCs@d}td|ƒ\}}}| ¡ dd¡}| |d¡| |d¡dS)NaÔif 1: import os, threading, sys from test import support def f(): pid = os.fork() if pid == 0: main = threading.main_thread() print(main.name) print(main.ident == threading.current_thread().ident) print(main.ident == threading.get_ident()) # stdout is fully buffered because not a tty, # we have to flush before exit. sys.stdout.flush() else: support.wait_process(pid, exitcode=0) th = threading.Thread(target=f) th.start() th.join() r‹rÓrÔr9zThread-1 True True rÕrØrrrÚ/test_main_thread_after_fork_from_nonmain_thread*s  z;ThreadTests.test_main_thread_after_fork_from_nonmain_threadcCsBd}td|ƒ\}}}| ¡}| |d¡| | ¡dgd¡dS)Na€if 1: import gc, threading main_thread = threading.current_thread() assert main_thread is threading.main_thread() # sanity check class RefCycle: def __init__(self): self.cycle = self def __del__(self): print("GC:", threading.current_thread() is main_thread, threading.main_thread() is main_thread, threading.enumerate() == [main_thread]) RefCycle() gc.collect() # sanity check x = RefCycle() r‹r9zGC: True True Trueé)rrÖrFÚ splitlinesrØrrrÚ test_main_thread_during_shutdownHs  ÿz,ThreadTests.test_main_thread_during_shutdowncCs$d}td|ƒ\}}}| |d¡dS)Na¤if 1: import os import threading import time import random def random_sleep(): seconds = random.random() * 0.010 time.sleep(seconds) class Sleeper: def __del__(self): random_sleep() tls = threading.local() def f(): # Sleep a bit so that the thread is still running when # Py_Finalize() is called. random_sleep() tls.x = Sleeper() random_sleep() threading.Thread(target=f).start() random_sleep() r‹r9rÁ)rr¾rŽrrrrrÚtest_finalization_shutdownesz&ThreadTests.test_finalization_shutdowncsÚt ¡‰t ¡‰ˆ ¡ˆ ¡‡‡fdd„}tj|d}| |jd¡| ¡ˆ ¡| |  ¡¡|j}|  |jddd¡ˆ  ¡| |jt j dd¡| |  ¡¡|  ¡|  |  ¡¡| |j¡| ¡dS)Ncsˆ ¡ˆ ¡t d¡dS)Nç{®Gáz„?)rcrfr(r)r©ZfinishÚstartedrrrRŽsz'ThreadTests.test_tstate_lock..fr…rrtF)rUÚ allocate_lockrfrrÚassertIsÚ _tstate_lockrBrirJrIrcr r{r>rH)rrRr7Ú tstate_lockrrârÚtest_tstate_lockˆs&  zThreadTests.test_tstate_lockcsªt ¡‰t ¡‰ˆ ¡ˆ ¡‡‡fdd„}tj|d}| ¡ˆ ¡| dt|ƒ¡ˆ ¡d}t dƒD]}|t|ƒvr‚qŽt   d¡qn| |t|ƒ¡|  ¡dS)Ncsˆ ¡ˆ ¡dSr)rcrfrrârrrR±sz(ThreadTests.test_repr_stopped..fr…rãÚstoppediôrá) rUrärfrrrBrgrArcr<r(r)rH)rrRr7Z LOOKING_FORrNrrârÚtest_repr_stopped«s"    zThreadTests.test_repr_stoppedcs tddƒD]}t |¡‰‡fdd„t|ƒDƒ}|D] }| ¡q2|D] }| ¡qD‡fdd„t|ƒDƒ}|D] }| ¡ql|D] }| ¡q~| tˆj¡q dS)Nrr3csg|]}tjˆjd‘qS©r…)rrrf©r6r¿©ÚbsrrÚ Êsÿz;ThreadTests.test_BoundedSemaphore_limit..csg|]}tjˆjd‘qSrë)rrrcrìrírrrïÐsÿ)r<rr:rBrHrˆÚ ValueErrorrc)rÚlimitrMr7rrírÚtest_BoundedSemaphore_limitÆs"  ÿ   ÿ  z'ThreadTests.test_BoundedSemaphore_limitc sЇfdd„‰dd„‰‡‡fdd„‰dˆ_t ¡}t ˆ¡z>t ˆ¡ddl}| ˆ¡tdƒD] }ˆƒq`Wt |¡n t |¡0dS) NcsˆSrr)ÚframeÚeventÚarg)Ú noop_tracerrröàsz9ThreadTests.test_frame_tstate_tracing..noop_tracecss dVqdS)NÚ generatorrrrrrr÷äsz8ThreadTests.test_frame_tstate_tracing..generatorcsˆjdurˆƒˆ_tˆjƒSr)ÚgenÚnextr)Úcallbackr÷rrrúès z7ThreadTests.test_frame_tstate_tracing..callbackrr$)rør—ÚgettraceÚsettracerÚ _testcapiZcall_in_temporary_c_threadr<)rZ old_tracerýr-r)rúr÷rörÚtest_frame_tstate_tracingØs      z%ThreadTests.test_frame_tstate_tracingc Cs dD]–}|j|dvt ¡}tj|j|d}| ¡|j}|sP| |tj¡n|  |tj¡|  ¡|  ¡|  |tj¡Wdƒq1s0YqdS)N)FTr¶)r†rz) ZsubTestrrSrrWrBrærgÚ_shutdown_locksrErDrH)rrzrôr¢rçrrrÚtest_shutdown_locksszThreadTests.test_shutdown_lockscCs$tddƒ\}}}| | ¡d¡dS)Nr‹a(if 1: import threading class Atexit: def __del__(self): print("thread_dict.atexit = %r" % thread_dict.atexit) thread_dict = threading.local() thread_dict.atexit = "value" atexit = Atexit() sthread_dict.atexit = 'value')rrFr¼rrrrÚtest_locals_at_exits zThreadTests.test_locals_at_exitcCsDdd„}t ¡ tj|d ¡Wdƒn1s60YdS)NcSsdSrrrrrrÚnoop,r9z0ThreadTests.test_leak_without_join..noopr…)r rTrrrB)rrrrrÚtest_leak_without_join)s z"ThreadTests.test_leak_without_joinN)*rrrrOrZr`rarjrrŠr‘r’r”rrªr²rµr·r_Ú skipUnlessrCrÈrÀrÂrÌrÑrÚÚskipIfr—ÚplatformÚplatforms_to_skiprÜrßràrèrêròrrþrrrrrrrr2VsP$  X!      ## ' r2c@sÔeZdZdd„Zdd„Ze eedƒd¡e  e j e vd¡dd „ƒƒZ e eedƒd¡e  e j e vd¡d d „ƒƒZe  e j e vd¡d d „ƒZe eedƒd¡e  e j e vd¡dd„ƒƒZe eedƒd¡dd„ƒZdS)ÚThreadJoinOnShutdowncCs8d|}td|ƒ\}}}| ¡ dd¡}| |d¡dS)Na…if 1: import sys, os, time, threading # a thread, which waits for the main program to terminate def joiningfunc(mainthread): mainthread.join() print('end of thread') # stdout is fully buffered because not a tty, we have to flush # before exit. sys.stdout.flush() r‹rÓrÔzend of main end of thread rÕ)rÚscriptrŽrrrÙrrrÚ _run_and_join4s  ö z"ThreadJoinOnShutdown._run_and_joincCsd}| |¡dS)Nzõif 1: import os t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) t.start() time.sleep(0.1) print('end of main') ©r ©rr rrrÚtest_1_join_on_shutdownEsz,ThreadJoinOnShutdown.test_1_join_on_shutdownr¸r¹rÛcCsd}| |¡dS)Na½if 1: from test import support childpid = os.fork() if childpid != 0: # parent process support.wait_process(childpid, exitcode=0) sys.exit(0) # child process t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) t.start() print('end of main') r r rrrÚtest_2_join_in_forked_processQsz2ThreadJoinOnShutdown.test_2_join_in_forked_processcCsd}| |¡dS)Na¸if 1: from test import support main_thread = threading.current_thread() def worker(): childpid = os.fork() if childpid != 0: # parent process support.wait_process(childpid, exitcode=0) sys.exit(0) # child process t = threading.Thread(target=joiningfunc, args=(main_thread,)) print('end of main') t.start() t.join() # Should not block: main_thread is already stopped w = threading.Thread(target=worker) w.start() r r rrrÚ!test_3_join_in_forked_from_threadfsz6ThreadJoinOnShutdown.test_3_join_in_forked_from_threadcCs"d}td|ƒ\}}}| |¡dS)Nacif True: import os import random import sys import time import threading thread_has_run = set() def random_io(): '''Loop for a while sleeping random tiny amounts and doing some I/O.''' while True: with open(os.__file__, 'rb') as in_f: stuff = in_f.read(200) with open(os.devnull, 'wb') as null_f: null_f.write(stuff) time.sleep(random.random() / 1995) thread_has_run.add(threading.current_thread()) def main(): count = 0 for _ in range(40): new_thread = threading.Thread(target=random_io) new_thread.daemon = True new_thread.start() count += 1 while len(thread_has_run) < count: time.sleep(0.001) # Trigger process shutdown sys.exit(0) main() r‹©rrI©rr rŽrrrrrÚtest_4_daemon_threadsƒs!z*ThreadJoinOnShutdown.test_4_daemon_threadscCsNdd„}g}tdƒD]"}tj|d}| |¡| ¡q|D] }| ¡q.do_fork_and_waitér…)r<rrr=rBrH)rrrMrNr7rrrÚtest_reinit_tls_after_fork¬s     z/ThreadJoinOnShutdown.test_reinit_tls_after_forkcCsg}tdƒD]&}tjdd„d}| |¡| ¡q t ¡}|dkrltt  ¡ƒdkr`t  d¡qzt  d¡nt j |dd |D] }|  ¡q~dS) NrcSs t d¡S)Ng333333Ó?)r(r)rrrrr„Ër9zKThreadJoinOnShutdown.test_clear_threads_states_after_fork..r…rré3é4rÅ)r<rrr=rBrÈr¸rGr—Ú_current_framesrÉr rÊrH)rrMrNr7rËrrrÚ$test_clear_threads_states_after_forkÄs     z9ThreadJoinOnShutdown.test_clear_threads_states_after_forkN)rrrr r r_rrCrÈrr—rrrrrrrrrrrr2s    ( rc@s0eZdZdd„Zdd„Zdd„Zedd„ƒZd S) ÚSubinterpThreadingTestscCsFt ¡\}}| tj|¡| tj|¡ttdƒr>t |d¡||fS)NÚ set_blockingF)rÈÚpiperÇÚcloserCr)rÚrÚwrrrrÞs    zSubinterpThreadingTests.pipecCsL| ¡\}}t d|f¡}tj |¡}| |d¡| t |d¡d¡dS)Naþ import os import random import threading import time def random_sleep(): seconds = random.random() * 0.010 time.sleep(seconds) def f(): # Sleep a bit so that the thread is still running when # Py_EndInterpreter is called. random_sleep() os.write(%d, b"x") threading.Thread(target=f).start() random_sleep() rróx© rrºr»r-r Zrun_in_subinterprFrÈÚread©rrr r¾r~rrrÚtest_threads_joinæs î  z)SubinterpThreadingTests.test_threads_joincCsL| ¡\}}t d|f¡}tj |¡}| |d¡| t |d¡d¡dS)Na§ import os import random import threading import time def random_sleep(): seconds = random.random() * 0.010 time.sleep(seconds) class Sleeper: def __del__(self): random_sleep() tls = threading.local() def f(): # Sleep a bit so that the thread is still running when # Py_EndInterpreter is called. random_sleep() tls.x = Sleeper() os.write(%d, b"x") threading.Thread(target=f).start() random_sleep() rrr!r"r$rrrÚtest_threads_join_2s ç  z+SubinterpThreadingTests.test_threads_join_2cCshdtjj›d}d|f}tj ¡ td|ƒ\}}}Wdƒn1sJ0Y| d| ¡¡dS)Nzõif 1: import os import threading import time def f(): # Make sure the daemon thread is still running when # Py_EndInterpreter is called. time.sleep(zJ) threading.Thread(target=f, daemon=True).start() z[if 1: import _testcapi _testcapi.run_in_subinterp(%r) r‹z:Fatal Python error: Py_EndInterpreter: not the last thread)r-r r{ZSuppressCrashReportrrgrÖ)rZsubinterp_coder rŽrrrrrÚtest_daemon_threads_fatal_error'sø ü .ÿz7SubinterpThreadingTests.test_daemon_threads_fatal_errorN)rrrrr%r&rr'rrrrrÝs %rc@sdeZdZdd„Zdd„Zdd„Zdd„Zd d „Zd d „Zd d„Z dd„Z dd„Z dd„Z dd„Z dS)ÚThreadingExceptionTestscCs*t ¡}| ¡| t|j¡| ¡dSr)rrrBrˆÚ RuntimeErrorrH©rr¢rrrÚtest_start_thread_againBsz/ThreadingExceptionTests.test_start_thread_againcCst ¡}| t|j¡dSr)rrbrˆr)rH)rrbrrrÚtest_joining_current_threadHsz3ThreadingExceptionTests.test_joining_current_threadcCst ¡}| t|j¡dSr)rrrˆr)rHr*rrrÚtest_joining_inactive_threadLsz4ThreadingExceptionTests.test_joining_inactive_threadcCs.t ¡}| ¡| tt|dd¡| ¡dSr³)rrrBrˆr)ÚsetattrrHr*rrrÚtest_daemonize_active_threadPsz4ThreadingExceptionTests.test_daemonize_active_threadcCst ¡}| t|j¡dSr)rrerˆr)rc)rÚlockrrrÚtest_releasing_unacquired_lockVsz6ThreadingExceptionTests.test_releasing_unacquired_lockcCshd}d}tjtjd|gtjtjd}| ¡\}}| ¡ dd¡}| |j dd| ¡¡| ||¡dS) Naif True: import threading def recurse(): return recurse() def outer(): try: recurse() except RecursionError: pass w = threading.Thread(target=outer) w.start() w.join() print('end of main thread') zend of main thread r‹)ÚstdoutÚstderrrÓrÔrzUnexpected error: ) Ú subprocessÚPopenr—Ú executableÚPIPEÚ communicaterÖr×rFÚ returncode)rr Zexpected_outputÚpr2r3rÙrrrÚtest_recursion_limitZsÿ z,ThreadingExceptionTests.test_recursion_limitcCs\d}td|ƒ\}}}| |d¡| ¡}| d|¡| d|¡| d|¡| d|¡dS)NaÈif True: import threading import time running = False def run(): global running running = True while running: time.sleep(0.01) 1/0 t = threading.Thread(target=run) t.start() while not running: time.sleep(0.01) running = False t.join() r‹r9úException in threadú"Traceback (most recent call last):ÚZeroDivisionErrorúUnhandled exception©rrFrÖrgrErrrrÚtest_print_exceptionys    z,ThreadingExceptionTests.test_print_exceptioncCs\d}td|ƒ\}}}| |d¡| ¡}| d|¡| d|¡| d|¡| d|¡dS)Naýif True: import sys import threading import time running = False def run(): global running running = True while running: time.sleep(0.01) 1/0 t = threading.Thread(target=run) t.start() while not running: time.sleep(0.01) sys.stderr = None running = False t.join() r‹r9r<r=r>r?r@rrrrÚ%test_print_exception_stderr_is_none_1”s    z=ThreadingExceptionTests.test_print_exception_stderr_is_none_1cCs4d}td|ƒ\}}}| |d¡| d| ¡¡dS)Naýif True: import sys import threading import time running = False def run(): global running running = True while running: time.sleep(0.01) 1/0 sys.stderr = None t = threading.Thread(target=run) t.start() while not running: time.sleep(0.01) running = False t.join() r‹r9r?)rrFrErÖrrrrÚ%test_print_exception_stderr_is_none_2±s z=ThreadingExceptionTests.test_print_exception_stderr_is_none_2csXdd„‰G‡fdd„dtjƒ}|ƒ}| ¡| ¡| |j¡| |jt¡d|_dS)NcSs‚dSrrrrrrÚ bare_raiseËszOThreadingExceptionTests.test_bare_raise_in_brand_new_thread..bare_raisecseZdZdZ‡fdd„ZdS)zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread..Issue27558Nc s:z ˆƒWn*ty4}z||_WYd}~n d}~00dSr)rwÚexc)rrE©rDrrr+Ñs zSThreadingExceptionTests.test_bare_raise_in_brand_new_thread..Issue27558.run)rrrrEr+rrFrrÚ Issue27558ÎsrG)rrrBrHrLrErhr))rrGr¢rrFrÚ#test_bare_raise_in_brand_new_threadÊs  z;ThreadingExceptionTests.test_bare_raise_in_brand_new_threadcsLdd„‰| ttjj¡‡fdd„tdƒDƒ}|D]}| ¡| ¡q2dS)NcSsHttjjddd"}| d¡t ¡Wdƒn1s:0YdS)Nr zutf-8)Úencodingú )Úopenr-r ÚTESTFNÚwriteÚ tracebackÚ format_stack)ÚfprrrÚ modify_fileás zQThreadingExceptionTests.test_multithread_modify_file_noerror..modify_filecsg|]}tjˆd‘qSrë)rr)r6rN©rQrrrïçsÿzPThreadingExceptionTests.test_multithread_modify_file_noerror..r•)rÇrr-r rLr<rBrH)rrMr7rrRrÚ$test_multithread_modify_file_noerrorßs þz.ThreadExitcSst d¡dSr)r—Úexitr rrrr+sz8ExceptHookTests.test_system_exit..ThreadExit.runNrqrrrrÚ ThreadExitsrcr3rÔ)rrr r[rBrHrFr\)rrcr3r¢rrrÚtest_system_exits  &z ExceptHookTests.test_system_exitcs¨d‰‡fdd„}zŒt td|¡&tƒ}| ¡| ¡Wdƒn1sL0Y| ˆjt¡| t ˆj ƒd¡| ˆj ˆj j ¡|  ˆj|¡Wd‰nd‰0dS)Ncs|‰dSrr)Z hook_argsrrrÚhook'sz4ExceptHookTests.test_custom_excepthook..hookr`rU)r Ú swap_attrrrTrBrHrFÚexc_typerðÚstrÚ exc_valueÚ exc_tracebackÚ __traceback__rår¢)rrer¢rrrÚtest_custom_excepthook$s &z&ExceptHookTests.test_custom_excepthookc sÔdd„}d‰‡fdd„}t td|¡~t td|¡Pt d¡&}tƒ}| ¡| ¡Wdƒn1sn0YWdƒn1sŒ0YWdƒn1sª0Y| |  ¡d¡| ˆd¡dS) NcSs tdƒ‚dS)Núthreading_hook failedrVrrrrÚthreading_hook:szCExceptHookTests.test_custom_excepthook_fail..threading_hookcs t|ƒ‰dSr)rh)rgrirj©Zerr_strrrÚsys_hook?sz=ExceptHookTests.test_custom_excepthook_fail..sys_hookr`r3z#Exception in threading.excepthook: rm) r rfrr—r[rTrBrHrFr\)rrnrpr3r¢rrorÚtest_custom_excepthook_fail9s   ÿþb ÿz+ExceptHookTests.test_custom_excepthook_failN) rrrr]r rrardrlrqrrrrrWõs    rWc@s$eZdZdd„Zdd„Zdd„ZdS)Ú TimerTestscCst |¡g|_t ¡|_dSr)r,r/Ú callback_argsrrSÚcallback_eventr rrrr/Qs zTimerTests.setUpcCs t d|j¡}| ¡|j ¡|j d¡d|jd<|j  ¡t d|j¡}| ¡|j ¡|  t |j ƒd¡|  |j difdifg¡|  ¡|  ¡dS)NráZblahZbarZfoorÝr)rÚTimerÚ _callback_spyrBrtrWr‚r=rŸÚclearrFrGrsrH)rZtimer1Ztimer2rrrÚ test_init_immutable_default_argsVs     z+TimerTests.test_init_immutable_default_argscOs*|j |dd…| ¡f¡|j ¡dSr)rsr=ÚcopyrtrD)rr‚rŸrrrrvgszTimerTests._callback_spyN)rrrr/rxrvrrrrrrOsrrc@seZdZeejƒZdS)Ú LockTestsN)rrrÚ staticmethodrreÚlocktyperrrrrzksrzc@seZdZeejƒZdS)Ú PyRLockTestsN)rrrr{rÚ_PyRLockr|rrrrr}nsr}zRLock not implemented in Cc@seZdZeejƒZdS)Ú CRLockTestsN)rrrr{rÚ_CRLockr|rrrrrqsrc@seZdZeejƒZdS)Ú EventTestsN)rrrr{rrSZ eventtyperrrrrusrc@seZdZeejƒZdS)ÚConditionAsRLockTestsN)rrrr{rÚ Conditionr|rrrrr‚xsr‚c@seZdZeejƒZdS)ÚConditionTestsN)rrrr{rrƒZcondtyperrrrr„|sr„c@seZdZeejƒZdS)ÚSemaphoreTestsN)rrrr{rÚ SemaphoreÚsemtyperrrrr…sr…c@seZdZeejƒZdS)ÚBoundedSemaphoreTestsN)rrrr{rr:r‡rrrrrˆ‚srˆc@seZdZeejƒZdS)Ú BarrierTestsN)rrrr{rÚBarrierZ barriertyperrrrr‰…sr‰c@seZdZdd„ZdS)Ú MiscTestCasecCs&dh}ddh}tj|td||ddS)Nr€rPr°)rrU)ÚextraÚ blacklist)r Z check__all__r)rrŒrrrrÚ test__all__Šs  ÿzMiscTestCase.test__all__N)rrrrŽrrrrr‹‰sr‹c@s$eZdZdd„Zdd„Zdd„ZdS)ÚInterruptMainTestscCsZdd„}tj|d}| t¡ | ¡| ¡Wdƒn1sD0Y| ¡dS)NcSs t ¡dSr)rUÚinterrupt_mainrrrrÚcall_interrupt•szHInterruptMainTests.test_interrupt_main_subthread..call_interruptr…)rrrˆÚKeyboardInterruptrBrH)rr‘r7rrrÚtest_interrupt_main_subthread’s   &z0InterruptMainTests.test_interrupt_main_subthreadcCs6| t¡t ¡Wdƒn1s(0YdSr)rˆr’rUrr rrrÚtest_interrupt_main_mainthreads z1InterruptMainTests.test_interrupt_main_mainthreadc Csdt tj¡}zBt tjtj¡t ¡t tjtj¡t ¡Wt tj|¡nt tj|¡0dSr)ÚsignalÚ getsignalÚSIGINTÚSIG_IGNrUrÚSIG_DFL)rÚhandlerrrrÚtest_interrupt_main_noerror£s  z.InterruptMainTests.test_interrupt_main_noerrorN)rrrr“r”r›rrrrr‘s rc@s$eZdZdd„Zdd„Zdd„ZdS)Ú AtexitTestscCs.tddƒ\}}}| |¡| | ¡d¡dS)Nr‹zif True: import threading def run_last(): print('parrot') threading._register_atexit(run_last) sparrot)rrIrFr“rrrrÚtest_atexit_output³s zAtexitTests.test_atexit_outputcCstddƒ\}}}| |¡dS)Nr‹aNif True: import threading from unittest.mock import Mock mock = Mock() threading._register_atexit(mock) mock.assert_not_called() # force early shutdown to ensure it was called once threading._shutdown() mock.assert_called_once() rrrrrÚtest_atexit_called_onceÀs z#AtexitTests.test_atexit_called_oncecCs.tddƒ\}}}| |¡| d| ¡¡dS)Nr‹zÜif True: import threading def func(): pass def run_last(): threading._register_atexit(func) threading._register_atexit(run_last) z2RuntimeError: can't register atexit after shutdown)rrirgrÖrrrrÚtest_atexit_after_shutdownÏs  ÿz&AtexitTests.test_atexit_after_shutdownN)rrrrržrŸrrrrrœ±s rœÚ__main__)7Ú__doc__Z test.supportr-rrrrZtest.support.script_helperrrr&r—rUrr(r_r§rÈr4r•rºrNrr rr¦r rrZTestCaser,r2rrr(rTrWrrrzZ RLockTestsr}rr€rrr‚r„r…rˆr‰r‹rrœrrÏrrrrÚsd   ! a,b2Z 2