a 5êdg`¾ã@súddlZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl m Z ddl mZmZz ddlZWney–dZYn0Gdd„de jƒZe  ejdkd¡Gdd „d e jƒƒZe  ejdkd ¡Gd d „d e jƒƒZGd d„de jƒZe  ejdkd¡Gdd„de jƒƒZe  eedƒd¡Gdd„de jƒƒZe  ejdkd¡Gdd„de jƒƒZe  ejdkd¡Gdd„de jƒƒZGdd„de jƒZGdd„de jƒZ Gdd„de jƒZ!Gdd „d e jƒZ"d!d"„Z#e$d#kröe  %¡dS)$éN)Úsupport)Úassert_python_okÚ spawn_pythonc@seZdZdd„ZdS)Ú GenericTestscCs–ttƒD]ˆ}tt|ƒ}|dvr.| |tj¡q|dvrF| |tj¡q| d¡rj| d¡sj| |tj¡q| d¡r| |tj¡| t j d¡qdS)N>ÚSIG_DFLÚSIG_IGN>Ú SIG_SETMASKÚ SIG_UNBLOCKÚ SIG_BLOCKZSIGZSIG_ZCTRL_Úwin32) ÚdirÚsignalÚgetattrÚassertIsInstanceÚHandlersÚSigmasksÚ startswithÚSignalsÚ assertEqualÚsysÚplatform)ÚselfÚnameÚsig©rú,/usr/local/lib/python3.9/test/test_signal.pyÚ test_enumss   zGenericTests.test_enumsN)Ú__name__Ú __module__Ú __qualname__rrrrrrsrr zNot valid on Windowsc@sZeZdZdd„Zdd„Zdd„Zdd„Zd d „Zd d „Zd d„Z e   e j d¡dd„ƒZdS)Ú PosixTestscGsdS©Nr©rÚargsrrrÚtrivial_signal_handler&sz!PosixTests.trivial_signal_handlercCs8| ttjd¡| ttjd|j¡| ttjd¡dS)Ni’)Ú assertRaisesÚ ValueErrorr Ú getsignalr$Ú strsignal©rrrrÚ,test_out_of_range_signal_number_raises_error)s  ÿz7PosixTests.test_out_of_range_signal_number_raises_errorcCs| ttjtjd¡dSr!)r%Ú TypeErrorr ÚSIGUSR1r)rrrÚ0test_setting_signal_handler_to_none_raises_error1s ÿz;PosixTests.test_setting_signal_handler_to_none_raises_errorcCsZt tj|j¡}| |tj¡| t tj¡|j¡t tj|¡| t tj¡|¡dSr!)r ÚSIGHUPr$rrrr')rZhuprrrÚtest_getsignal5sÿzPosixTests.test_getsignalcCs@| dt tj¡¡| dt tj¡¡| dt tj¡¡dS)NZ InterruptZ TerminatedZHangup)ÚassertInr r(ÚSIGINTÚSIGTERMr.r)rrrÚtest_strsignal=szPosixTests.test_strsignalcCs&tj t¡}tj |d¡}t|ƒdS)Nzsignalinterproctester.py)ÚosÚpathÚdirnameÚ__file__Újoinr)rr6ZscriptrrrÚtest_interprocess_signalCs z#PosixTests.test_interprocess_signalcCsdt ¡}| |t¡| tjj|¡| tjj|¡| d|¡| tj |¡|  t |ƒtj ¡dS©Nr) r Ú valid_signalsrÚsetr0rr1ÚSIGALRMÚ assertNotInÚNSIGÚ assertLessÚlen©rÚsrrrÚtest_valid_signalsHs  zPosixTests.test_valid_signalsúsys.executable required.cCs<tjtjddgtjd}| d|j¡| |jt j ¡dS)z+KeyboardInterrupt triggers exit via SIGINT.ú-czaimport os, signal, time os.kill(os.getpid(), signal.SIGINT) for _ in range(999): time.sleep(0.01)©ÚstderróKeyboardInterruptN) Ú subprocessÚrunrÚ executableÚPIPEr0rHrÚ returncoder r1)rÚprocessrrrÚ!test_keyboard_interrupt_exit_codeQsÿûz,PosixTests.test_keyboard_interrupt_exit_codeN)rrrr$r*r-r/r3r9rDÚunittestÚ skipUnlessrrLrPrrrrr $s r zWindows specificc@s2eZdZdd„Zdd„Ze ejd¡dd„ƒZ dS) ÚWindowsSignalTestscCsdt ¡}| |t¡| t|ƒd¡| tjj|¡|  d|¡|  tj |¡|  t|ƒtj ¡dS)Nér) r r;rr<ZassertGreaterEqualrAr0rr1r>r?r@rBrrrrDes  z%WindowsSignalTests.test_valid_signalscCsÚdd„}tƒ}tjtjtjtjtjtjtjfD]0}t  |¡dur.t |t ||¡¡|  |¡q.|  |¡|  t ¡t d|¡Wdƒn1s–0Y|  t ¡t d|¡Wdƒn1sÌ0YdS)NcSsdSr!r)ÚxÚyrrrÚpóz3WindowsSignalTests.test_issue9324..éÿÿÿÿé)r<r ÚSIGABRTZSIGBREAKÚSIGFPEÚSIGILLr1ÚSIGSEGVr2r'ÚaddÚ assertTruer%r&)rÚhandlerÚcheckedrrrrÚtest_issue9324ns  þ   * z!WindowsSignalTests.test_issue9324rEcCs<tjtjddgtjd}| d|j¡d}| |j|¡dS)z?KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.rFzraise KeyboardInterruptrGrIl:N) rJrKrrLrMr0rHrrN)rrOZSTATUS_CONTROL_C_EXITrrrrPƒs þz4WindowsSignalTests.test_keyboard_interrupt_exit_codeN) rrrrDrcrQrRrrLrPrrrrrSbs  rSc@sNeZdZdd„Zdd„Zdd„Zdd„Zd d „Ze  e j d kd ¡d d„ƒZ dS)Ú WakeupFDTestscCst| t¡tjtjdWdƒn1s.0Y| t¡t tjd¡Wdƒn1sf0YdS)N)ÚsignumF)r%r+r Ú set_wakeup_fdr1r)rrrÚtest_invalid_call”s , zWakeupFDTests.test_invalid_callcCs t ¡}| ttftj|¡dSr!)rZ make_bad_fdr%r&ÚOSErrorr rf)rÚfdrrrÚtest_invalid_fds ÿzWakeupFDTests.test_invalid_fdcCs0t ¡}| ¡}| ¡| ttftj|¡dSr!)ÚsocketÚfilenoÚcloser%r&rhr rf)rÚsockrirrrÚtest_invalid_socket¢s  ÿz!WakeupFDTests.test_invalid_socketcCs¶t ¡\}}| tj|¡| tj|¡t ¡\}}| tj|¡| tj|¡ttdƒrrt |d¡t |d¡t |¡| t |¡|¡| t d¡|¡| t d¡d¡dS)NÚ set_blockingFrY) r4ÚpipeÚ addCleanuprmÚhasattrrpr rfr)rZr1Zw1Zr2Zw2rrrÚtest_set_wakeup_fd_result©s      z'WakeupFDTests.test_set_wakeup_fd_resultcCst ¡}| |j¡| d¡| ¡}t ¡}| |j¡| d¡| ¡}t |¡| t |¡|¡| t d¡|¡| t d¡d¡dS)NFrY)rkrrrmÚ setblockingrlr rfr)rZsock1Úfd1Zsock2Úfd2rrrÚ test_set_wakeup_fd_socket_resultºs     z.WakeupFDTests.test_set_wakeup_fd_socket_resultr ztests specific to POSIXcCs¢t ¡\}}| tj|¡| tj|¡t |d¡| t¡}t |¡Wdƒn1s^0Y|  t |j ƒd|¡t |d¡t |¡t d¡dS)NTz&the fd %s must be in non-blocking modeFrY) r4rqrrrmrpr%r&r rfrÚstrÚ exception)rZrfdZwfdÚcmrrrÚtest_set_wakeup_fd_blockingÌs   ( ÿ  z)WakeupFDTests.test_set_wakeup_fd_blockingN) rrrrgrjrortrxrQÚskipIfrrr|rrrrrd’s rdc@steZdZe edud¡ddœdd„ƒZe edud¡dd„ƒZd d „Zd d „Z d d„Z e  e e dƒd¡dd„ƒZdS)ÚWakeupSignalTestsNúneed _testcapiT©ÚorderedcGs&d ttt|ƒƒ||¡}td|ƒdS)Naif 1: import _testcapi import os import signal import struct signals = {!r} def handler(signum, frame): pass def check_signum(signals): data = os.read(read, len(signals)+1) raised = struct.unpack('%uB' % len(data), data) if not {!r}: raised = set(raised) signals = set(signals) if raised != signals: raise Exception("%r != %r" % (raised, signals)) {} signal.signal(signal.SIGALRM, handler) read, write = os.pipe() os.set_blocking(write, False) signal.set_wakeup_fd(write) test() check_signum(signals) os.close(read) os.close(write) rF)ÚformatÚtupleÚmapÚintr)rZ test_bodyrZsignalsÚcoderrrÚ check_wakeupás à"zWakeupSignalTests.check_wakeupc Cs|d}t ¡\}}zFzt |d¡Wnty4Yn 0| d¡Wt |¡t |¡nt |¡t |¡0td|ƒdS)Na&if 1: import _testcapi import errno import os import signal import sys from test.support import captured_stderr def handler(signum, frame): 1/0 signal.signal(signal.SIGALRM, handler) r, w = os.pipe() os.set_blocking(r, False) # Set wakeup_fd a read-only file descriptor to trigger the error signal.set_wakeup_fd(r) try: with captured_stderr() as err: signal.raise_signal(signal.SIGALRM) except ZeroDivisionError: # An ignored exception should have been printed out on stderr err = err.getvalue() if ('Exception ignored when trying to write to the signal wakeup fd' not in err): raise AssertionError(err) if ('OSError: [Errno %d]' % errno.EBADF) not in err: raise AssertionError(err) else: raise AssertionError("ZeroDivisionError not raised") os.close(r) os.close(w) óxz9OS doesn't report write() error on the read end of a piperF)r4rqÚwriterhÚskipTestrmr)rr†ÚrÚwrrrÚtest_wakeup_write_errors"     ÿ  z)WakeupSignalTests.test_wakeup_write_errorcCs| dtj¡dS)Na·def test(): import select import time TIMEOUT_FULL = 10 TIMEOUT_HALF = 5 class InterruptSelect(Exception): pass def handler(signum, frame): raise InterruptSelect signal.signal(signal.SIGALRM, handler) signal.alarm(1) # We attempt to get a signal during the sleep, # before select is called try: select.select([], [], [], TIMEOUT_FULL) except InterruptSelect: pass else: raise Exception("select() was not interrupted") before_time = time.monotonic() select.select([read], [], [], TIMEOUT_FULL) after_time = time.monotonic() dt = after_time - before_time if dt >= TIMEOUT_HALF: raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) ©r‡r r=r)rrrÚtest_wakeup_fd_early<sáz&WakeupSignalTests.test_wakeup_fd_earlycCs| dtj¡dS)Na`def test(): import select import time TIMEOUT_FULL = 10 TIMEOUT_HALF = 5 class InterruptSelect(Exception): pass def handler(signum, frame): raise InterruptSelect signal.signal(signal.SIGALRM, handler) signal.alarm(1) before_time = time.monotonic() # We attempt to get a signal during the select call try: select.select([read], [], [], TIMEOUT_FULL) except InterruptSelect: pass else: raise Exception("select() was not interrupted") after_time = time.monotonic() dt = after_time - before_time if dt >= TIMEOUT_HALF: raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) rŽr)rrrÚtest_wakeup_fd_during^såz'WakeupSignalTests.test_wakeup_fd_duringcCs| dtjtj¡dS)Nz§def test(): signal.signal(signal.SIGUSR1, handler) signal.raise_signal(signal.SIGUSR1) signal.raise_signal(signal.SIGALRM) )r‡r r,r=r)rrrÚ test_signum|süzWakeupSignalTests.test_signumÚpthread_sigmaskúneed signal.pthread_sigmask()cCs|jdtjtjdddS)Naædef test(): signum1 = signal.SIGUSR1 signum2 = signal.SIGUSR2 signal.signal(signum1, handler) signal.signal(signum2, handler) signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) signal.raise_signal(signum1) signal.raise_signal(signum2) # Unblocking the 2 signals calls the C signal handler twice signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) Fr€)r‡r r,ÚSIGUSR2r)rrrÚ test_pendingƒs ôzWakeupSignalTests.test_pending)rrrrQr}Ú _testcapir‡rrrr‘rRrsr r•rrrrr~ßs& 3" ÿr~Ú socketpairzneed socket.socketpairc@sTeZdZe edud¡dd„ƒZe edud¡dd„ƒZe edud¡dd„ƒZdS) ÚWakeupSocketSignalTestsNrcCsd}td|ƒdS)Na´if 1: import signal import socket import struct import _testcapi signum = signal.SIGINT signals = (signum,) def handler(signum, frame): pass signal.signal(signum, handler) read, write = socket.socketpair() write.setblocking(False) signal.set_wakeup_fd(write.fileno()) signal.raise_signal(signum) data = read.recv(1) if not data: raise Exception("no signum written") raised = struct.unpack('B', data) if raised != signals: raise Exception("%r != %r" % (raised, signals)) read.close() write.close() rF©r©rr†rrrÚ test_socket˜sz#WakeupSocketSignalTests.test_socketcCs.tjdkrd}nd}dj|d}td|ƒdS)NÚntÚsendr‰a+if 1: import errno import signal import socket import sys import time import _testcapi from test.support import captured_stderr signum = signal.SIGINT def handler(signum, frame): pass signal.signal(signum, handler) read, write = socket.socketpair() read.setblocking(False) write.setblocking(False) signal.set_wakeup_fd(write.fileno()) # Close sockets: send() will fail read.close() write.close() with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if ('Exception ignored when trying to {action} to the signal wakeup fd' not in err): raise AssertionError(err) ©ÚactionrF©r4rr‚r©rrŸr†rrrÚtest_send_error¼s !ß"z'WakeupSocketSignalTests.test_send_errorcCs.tjdkrd}nd}dj|d}td|ƒdS)Nrœrr‰aË if 1: import errno import signal import socket import sys import time import _testcapi from test.support import captured_stderr signum = signal.SIGINT # This handler will be called, but we intentionally won't read from # the wakeup fd. def handler(signum, frame): pass signal.signal(signum, handler) read, write = socket.socketpair() # Fill the socketpair buffer if sys.platform == 'win32': # bpo-34130: On Windows, sometimes non-blocking send fails to fill # the full socketpair buffer, so use a timeout of 50 ms instead. write.settimeout(0.050) else: write.setblocking(False) # Start with large chunk size to reduce the # number of send needed to fill the buffer. written = 0 for chunk_size in (2 ** 16, 2 ** 8, 1): chunk = b"x" * chunk_size try: while True: write.send(chunk) written += chunk_size except (BlockingIOError, socket.timeout): pass print(f"%s bytes written into the socketpair" % written, flush=True) write.setblocking(False) try: write.send(b"x") except BlockingIOError: # The socketpair buffer seems full pass else: raise AssertionError("%s bytes failed to fill the socketpair " "buffer" % written) # By default, we get a warning when a signal arrives msg = ('Exception ignored when trying to {action} ' 'to the signal wakeup fd') signal.set_wakeup_fd(write.fileno()) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if msg not in err: raise AssertionError("first set_wakeup_fd() test failed, " "stderr: %r" % err) # And also if warn_on_full_buffer=True signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if msg not in err: raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) " "test failed, stderr: %r" % err) # But not if warn_on_full_buffer=False signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if err != "": raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) " "test failed, stderr: %r" % err) # And then check the default again, to make sure warn_on_full_buffer # settings don't leak across calls. signal.set_wakeup_fd(write.fileno()) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if msg not in err: raise AssertionError("second set_wakeup_fd() test failed, " "stderr: %r" % err) ržrFr r¡rrrÚtest_warn_on_full_bufferçs cdz0WakeupSocketSignalTests.test_warn_on_full_buffer) rrrrQr}r–r›r¢r£rrrrr˜•s  # *r˜c@s,eZdZdd„Zdd„Zdd„Zdd„Zd S) ÚSiginterruptTestc CsÂd|f}td|ƒš}z |j ¡}|jtjd\}}Wn*tjy`| ¡YWdƒdS0||}|  ¡}|dvrŠt d||fƒ‚|dkWdƒSWdƒn1s´0YdS) z¿Perform a read during which a signal will arrive. Return True if the read is interrupted by the signal and raises an exception. Return False if it returns normally. aòif 1: import errno import os import signal import sys interrupt = %r r, w = os.pipe() def handler(signum, frame): 1 / 0 signal.signal(signal.SIGALRM, handler) if interrupt is not None: signal.siginterrupt(signal.SIGALRM, interrupt) print("ready") sys.stdout.flush() # run the test twice try: for loop in range(2): # send a SIGALRM in a second (during the read) signal.alarm(1) try: # blocking call: read from a pipe without data os.read(r, 1) except ZeroDivisionError: pass else: sys.exit(2) sys.exit(3) finally: os.close(r) os.close(w) rF)ÚtimeoutNF)éézChild error (exit code %s): %rr§) rÚstdoutÚreadlineÚ communicaterÚ SHORT_TIMEOUTrJÚTimeoutExpiredÚkillÚwaitÚ Exception)rZ interruptr†rOZ first_liner¨rHÚexitcoderrrÚreadpipe_interruptedXs"#Ý$  ÿz%SiginterruptTest.readpipe_interruptedcCs| d¡}| |¡dSr!©r±r`©rZ interruptedrrrÚtest_without_siginterrupt”s z*SiginterruptTest.test_without_siginterruptcCs| d¡}| |¡dS©NTr²r³rrrÚtest_siginterrupt_on›s z%SiginterruptTest.test_siginterrupt_oncCs| d¡}| |¡dS)NF)r±Z assertFalser³rrrÚtest_siginterrupt_off¢s z&SiginterruptTest.test_siginterrupt_offN)rrrr±r´r¶r·rrrrr¤Us<r¤c@sneZdZdd„Zdd„Zdd„Zdd„Zd d „Zd d „Zd d„Z e   e j dvd¡dd„ƒZdd„Zdd„ZdS)Ú ItimerTestcCs(d|_d|_d|_t tj|j¡|_dS)NFr)Ú hndl_calledÚ hndl_countÚitimerr r=Úsig_alrmÚ old_alarmr)rrrÚsetUp¬szItimerTest.setUpcCs,t tj|j¡|jdur(t |jd¡dSr:)r r=r½r»Ú setitimerr)rrrÚtearDown²s zItimerTest.tearDowncGs d|_dSrµ)r¹r"rrrr¼¸szItimerTest.sig_alrmcGsFd|_|jdkrt d¡‚n|jdkr4t tjd¡|jd7_dS)NTr§z.setitimer didn't disable ITIMER_VIRTUAL timer.ré)r¹rºr Ú ItimerErrorr¿ÚITIMER_VIRTUALr"rrrÚ sig_vtalrm»s    zItimerTest.sig_vtalrmcGsd|_t tjd¡dS)NTr)r¹r r¿Ú ITIMER_PROFr"rrrÚsig_profÈszItimerTest.sig_profcCs| tjtjdd¡dS)NrYr)r%r rÂr¿r)rrrÚtest_itimer_excÌszItimerTest.test_itimer_exccCs0tj|_t |jd¡t ¡| |jd¡dS)Ngð?T)r Ú ITIMER_REALr»r¿Úpauserr¹r)rrrÚtest_itimer_realÕszItimerTest.test_itimer_real)Znetbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.cCstj|_t tj|j¡t |jdd¡t ¡}t ¡|dkr`tdddƒ}t  |j¡dkr0qjq0|  d¡|  t  |j¡d¡|  |j d ¡dS) Ng333333Ó?çš™™™™™É?çN@é90é2 é“–˜©çrÑú8timeout: likely cause: machine too slow or load too highT) r rÃr»Ú SIGVTALRMrÄr¿ÚtimeÚ monotonicÚpowÚ getitimerrŠrr¹©rZ start_timeÚ_rrrÚtest_itimer_virtualÜs  zItimerTest.test_itimer_virtualcCstj|_t tj|j¡t |jdd¡t ¡}t ¡|dkr`tdddƒ}t  |j¡dkr0qjq0|  d¡|  t  |j¡d¡|  |j d¡dS) NrËrÌrÍrÎrÏrÐrÒT) r rÅr»ÚSIGPROFrÆr¿rÔrÕrÖr×rŠrr¹rØrrrÚtest_itimer_profòs  zItimerTest.test_itimer_profcCs2tj|_t |jd¡t d¡| |jd¡dS)Nçíµ ÷ư>rÁT)r rÈr»r¿rÔÚsleeprr¹r)rrrÚtest_setitimer_tinys zItimerTest.test_setitimer_tinyN)rrrr¾rÀr¼rÄrÆrÇrÊrQr}rrrÚrÜrßrrrrr¸ªs   ÿ r¸c@sºeZdZdZe eedƒd¡dd„ƒZe eedƒd¡e eedƒd¡dd „ƒƒZ e eed ƒd ¡d d „ƒZ e eedƒd¡dd„ƒZ e eedƒd¡dd„ƒZ e eedƒd¡dd„ƒZ e eedƒd¡dd„ƒZe eedƒd¡dd„ƒZe eedƒd¡dd„ƒZe eedƒd¡d d!„ƒZe eedƒd¡e eedƒd¡d"d#„ƒƒZe eedƒd¡d$d%„ƒZe eedƒd¡d&d'„ƒZe eedƒd¡d(d)„ƒZe eed ƒd ¡d*d+„ƒZd,S)-ÚPendingSignalsTestsz[ Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() functions. Ú sigpendingzneed signal.sigpending()cCs| t ¡tƒ¡dSr!)rr rár<r)rrrÚtest_sigpending_emptysz)PendingSignalsTests.test_sigpending_emptyr’r“cCsd}td|ƒdS)Na if 1: import os import signal def handler(signum, frame): 1/0 signum = signal.SIGUSR1 signal.signal(signum, handler) signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) os.kill(os.getpid(), signum) pending = signal.sigpending() for sig in pending: assert isinstance(sig, signal.Signals), repr(pending) if pending != {signum}: raise Exception('%s != {%s}' % (pending, signum)) try: signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") rFr™ršrrrÚtest_sigpendingsz#PendingSignalsTests.test_sigpendingÚ pthread_killzneed signal.pthread_kill()cCsd}td|ƒdS)Naâif 1: import signal import threading import sys signum = signal.SIGUSR1 def handler(signum, frame): 1/0 signal.signal(signum, handler) tid = threading.get_ident() try: signal.pthread_kill(tid, signum) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") rFr™ršrrrÚtest_pthread_kill9sz%PendingSignalsTests.test_pthread_killcCsd| ¡|f}td|ƒdS)zo test: body of the "def test(signum):" function. blocked: number of the blocked signal awif 1: import signal import sys from signal import Signals def handler(signum, frame): 1/0 %s blocked = %s signum = signal.SIGALRM # child: block and wait the signal try: signal.signal(signum, handler) signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) # Do the tests test(signum) # The handler must not be called on unblock try: signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) except ZeroDivisionError: print("the signal handler has been called", file=sys.stderr) sys.exit(1) except BaseException as err: print("error: {}".format(err), file=sys.stderr) sys.stderr.flush() sys.exit(1) rFN)Ústripr)rZblockedÚtestr†rrrÚ wait_helperRs à%zPendingSignalsTests.wait_helperÚsigwaitzneed signal.sigwait()cCs| tjd¡dS)Na  def test(signum): signal.alarm(1) received = signal.sigwait([signum]) assert isinstance(received, signal.Signals), received if received != signum: raise Exception('received %s, not %s' % (received, signum)) ©rèr r=r)rrrÚ test_sigwait€sz PendingSignalsTests.test_sigwaitÚ sigwaitinfozneed signal.sigwaitinfo()cCs| tjd¡dS)Nz× def test(signum): signal.alarm(1) info = signal.sigwaitinfo([signum]) if info.si_signo != signum: raise Exception("info.si_signo != %s" % signum) rêr)rrrÚtest_sigwaitinfoŒsz$PendingSignalsTests.test_sigwaitinfoÚ sigtimedwaitzneed signal.sigtimedwait()cCs| tjd¡dS)Nzá def test(signum): signal.alarm(1) info = signal.sigtimedwait([signum], 10.1000) if info.si_signo != signum: raise Exception('info.si_signo != %s' % signum) rêr)rrrÚtest_sigtimedwait—sz%PendingSignalsTests.test_sigtimedwaitcCs| tjd¡dS)Nzþ def test(signum): import os os.kill(os.getpid(), signum) info = signal.sigtimedwait([signum], 0) if info.si_signo != signum: raise Exception('info.si_signo != %s' % signum) rêr)rrrÚtest_sigtimedwait_poll¢sz*PendingSignalsTests.test_sigtimedwait_pollcCs| tjd¡dS)Nz¿ def test(signum): received = signal.sigtimedwait([signum], 1.0) if received is not None: raise Exception("received=%r" % (received,)) rêr)rrrÚtest_sigtimedwait_timeout¯sz-PendingSignalsTests.test_sigtimedwait_timeoutcCstj}| ttj|gd¡dS)Ngð¿)r r=r%r&rî)rrerrrÚ"test_sigtimedwait_negative_timeout¹sz6PendingSignalsTests.test_sigtimedwait_negative_timeoutcCstddƒdS)NrFa­if True: import os, threading, sys, time, signal # the default handler terminates the process signum = signal.SIGUSR1 def kill_later(): # wait until the main thread is waiting in sigwait() time.sleep(1) os.kill(os.getpid(), signum) # the signal must be blocked by all the threads signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) killer = threading.Thread(target=kill_later) killer.start() received = signal.sigwait([signum]) if received != signum: print("sigwait() received %s, not %s" % (received, signum), file=sys.stderr) sys.exit(1) killer.join() # unblock the signal, which should have been cleared by sigwait() signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) r™r)rrrÚtest_sigwait_thread¿s z'PendingSignalsTests.test_sigwait_threadcCsü| ttj¡| ttjd¡| ttjddd¡| ttjdg¡| t¡"t tjtjg¡Wdƒn1sv0Y| t¡ t tjdg¡Wdƒn1s°0Y| t¡$t tjdd>g¡Wdƒn1sî0YdS)NrÁr¦r§i¤riè)r%r+r r’rhr&r r?r)rrrÚtest_pthread_sigmask_argumentsás 0 . z2PendingSignalsTests.test_pthread_sigmask_argumentscCsJt tjt ¡¡}| tjtj|¡t tjt ¡¡}| |t ¡¡dSr!)r r’r r;rrrr ZassertLessEqualrBrrrÚ"test_pthread_sigmask_valid_signalsïsz6PendingSignalsTests.test_pthread_sigmask_valid_signalscCsd}td|ƒdS)Na- if 1: import signal import os; import threading def handler(signum, frame): 1/0 def kill(signum): os.kill(os.getpid(), signum) def check_mask(mask): for sig in mask: assert isinstance(sig, signal.Signals), repr(sig) def read_sigmask(): sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) check_mask(sigmask) return sigmask signum = signal.SIGUSR1 # Install our signal handler old_handler = signal.signal(signum, handler) # Unblock SIGUSR1 (and copy the old mask) to test our signal handler old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) check_mask(old_mask) try: kill(signum) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") # Block and then raise SIGUSR1. The signal is blocked: the signal # handler is not called, and the signal is now pending mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) check_mask(mask) kill(signum) # Check the new mask blocked = read_sigmask() check_mask(blocked) if signum not in blocked: raise Exception("%s not in %s" % (signum, blocked)) if old_mask ^ blocked != {signum}: raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) # Unblock SIGUSR1 try: # unblock the pending signal calls immediately the signal handler signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") try: kill(signum) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") # Check the new mask unblocked = read_sigmask() if signum in unblocked: raise Exception("%s in %s" % (signum, unblocked)) if blocked ^ unblocked != {signum}: raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) if old_mask != unblocked: raise Exception("%s != %s" % (old_mask, unblocked)) rFr™ršrrrÚtest_pthread_sigmaskøsHz(PendingSignalsTests.test_pthread_sigmaskcCs^d}td|ƒ<}| ¡\}}| ¡}|dkr‡fdd„t tˆƒdƒDƒ}t   |¡}t j r’td |fƒ|S) Nécs,tˆƒˆkr(ˆ t ¡¡t tjd¡dS)NrÝ)rAÚappendrÔÚ perf_counterr r¿rÈ©reÚframe©ÚNÚtimesrrrans z5StressTest.measure_itimer_resolution..handlerrgü©ñÒMbP?cs g|]}ˆ|dˆ|‘qS)rÁr)Ú.0Úi)rrrÚ |rXz8StressTest.measure_itimer_resolution..rÁz,detected median itimer() resolution: %.6f s.)NN)rrr r¿rÈrûr=rArÔrÞÚrangeÚ statisticsZmedianrÚverboseÚprint)rraZ durationsZmedrrrÚmeasure_itimer_resolutionjs   z$StressTest.measure_itimer_resolutioncCs4| ¡}|dkrdS|dkr dS| d|f¡dS)Ng-Cëâ6?i'g{®Gáz„?édz^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))r rŠ)rZresorrrÚdecide_itimer_count‚sþzStressTest.decide_itimer_countr¿ztest needs setitimer()csú| ¡}g‰dd„}d ‡fdd„ }| tj|¡| tj|¡| tj|¡d}t ¡tj }||krät   t   ¡tj¡|d7}t ˆƒ|kr¤t ¡|kr¤t d¡q€t   t   ¡tj¡|d7}t ˆƒ|kr^t ¡|kr^t d¡q¾q^| t ˆƒ|d ¡dS) z; This test uses dependent signal handlers. cSst tjdt ¡d¡dS)NrÝçñh㈵øä>)r r¿rÈÚrandomrÿrrrÚ first_handler™sz@StressTest.test_stress_delivery_dependent..first_handlerNcsˆ |¡dSr!©rýrÿ©ZsigsrrÚsecond_handler¡szAStressTest.test_stress_delivery_dependent..second_handlerrrÁrúSome signals were lost)NN)r rûr rÛr,r=rÔrÕrr«r4r­ÚgetpidrArÞr)rrrrÚ expected_sigsÚdeadlinerrrÚtest_stress_delivery_dependents& z)StressTest.test_stress_delivery_dependentcs¾| ¡}g‰‡fdd„}| tj|¡| tj|¡d}t ¡tj}||kr¨t  tj dt   ¡d¡t   t  ¡tj¡|d7}tˆƒ|krFt ¡|krFt d¡q‚qF| tˆƒ|d¡dS) z> This test uses simultaneous signal handlers. csˆ |¡dSr!rrÿrrrraÇsz=StressTest.test_stress_delivery_simultaneous..handlerrrÝrr¦rN)r rûr r,r=rÔrÕrr«r¿rÈrr4r­rrArÞr)rrrarrrrrÚ!test_stress_delivery_simultaneous¾s z,StressTest.test_stress_delivery_simultaneousr,ztest needs SIGUSR1cs&tj‰d‰d‰d‰‡fdd„‰‡‡‡fdd„}‡‡‡fdd„}t ˆˆ¡}| tjˆ|¡tj|d }z¬d}t ¡d}| ¡|ƒd ‰| ¡|j durÊ|  |j j t ¡|  d ˆ›d t|j j ƒ¡d }Wdƒn1sÞ0Y|sø| ˆd¡| ˆˆ¡Wd ‰| ¡nd ‰| ¡0dS) NrFcs ˆd7‰dS©NrÁrrÿ)Únum_received_signalsrrÚcustom_handlerèszAStressTest.test_stress_modifying_handlers..custom_handlercsˆst ˆ¡ˆd7‰qdSr)r Ú raise_signalr)Údo_stopÚnum_sent_signalsrerrÚset_interruptsìs zAStressTest.test_stress_modifying_handlers..set_interruptscs8ˆdkr4tdƒD] }ˆtjfD]}t ˆ|¡qqqdS)Nr i N)rr r)rra)rrrerrÚcycle_handlersòs zAStressTest.test_stress_modifying_handlers..cycle_handlers)ÚtargetTzSignal z ignored due to race condition)r r,rrÚ threadingÚThreadrZcatch_unraisable_exceptionÚstartr8Z unraisablerÚ exc_valuerhr0ryZ assertGreaterr@)rr r!rúÚtZignoredr{r)rrrrrerÚtest_stress_modifying_handlersßs>       þ"  ÿz)StressTest.test_stress_modifying_handlersN)rrrrørûr r rQrRrsr rrr(rrrrrù_s  ÿ , ÿ  ÿrùc@s6eZdZdd„Ze ejdkd¡dd„ƒZdd„Z d S) ÚRaiseSignalTestcCs:| t¡t tj¡Wdƒn1s,0YdSr!)r%ÚKeyboardInterruptr rr1r)rrrÚ test_sigints zRaiseSignalTest.test_sigintr zWindows specific testc CsVzd}t |¡| d¡Wn4tyP}z|jtjkr:n‚WYd}~n d}~00dS)NrÁz#OSError (Invalid argument) expected)r rZfailrhÚerrnoÚEINVAL)rr.ÚerrrÚtest_invalid_argument s  z%RaiseSignalTest.test_invalid_argumentcsJd‰‡fdd„}t tj|¡}| tjtj|¡t tj¡| ˆ¡dS)NFcsd‰dSrµr)ÚaÚb©Zis_okrrra.sz-RaiseSignalTest.test_handler..handler)r r1rrrr`)rraZ old_signalrr2rÚ test_handler,s   zRaiseSignalTest.test_handlerN) rrrr+rQr}rrr/r3rrrrr)s r)c@s&eZdZe eedƒd¡dd„ƒZdS)ÚPidfdSignalTestÚpidfd_send_signalzpidfd support not built incCs | t¡}t dtj¡Wdƒn1s.0Y|jjtjkrR| d¡n|jjtj krj| d¡|  |jjtj ¡t   dt  ¡›t j¡}| t j|¡| td¡$t |tjtƒd¡Wdƒn1sØ0Y| t¡t |tj¡Wdƒn1s0YdS)Nrzkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)r%rhr r5r1rzr,ZENOSYSrŠÚEPERMrÚEBADFr4ÚopenrÚ O_DIRECTORYrrrmZassertRaisesRegexr+Úobjectr*)rr{Zmy_pidfdrrrÚtest_pidfd_send_signal:s ,  2 z&PidfdSignalTest.test_pidfd_send_signalN)rrrrQrRrsr r;rrrrr48s þr4cCs t ¡dSr!)rÚ reap_childrenrrrrÚtearDownModuleMsr=Ú__main__)&r,r4rr rkrrJrr#rÔrQrçrZtest.support.script_helperrrr–Ú ImportErrorZTestCaserr}rr rRrSrdr~rsr˜r¤r¸ràrùr)r4r=rÚmainrrrrÚsT    =/M6@TeQ<