a 5êdgq,ã@s¾dZddlZddlZddlmZddlmZe d¡ Zddl Z ddl Z ddl Z e  ¡dd„ƒZ Gdd „d ejƒZGd d „d ejƒZGd d „d eƒZGdd„deƒZdd„ZedkrºeƒdS)z&Unit tests for socket timeout feature.éN)Úsupport)Ú socket_helperÚnetworkcCsJt |¡,t ||tjtj¡ddWdƒS1s<0YdS)z—Resolve an (host, port) to an address. We must perform name resolution before timeout tests, otherwise it will be performed by connect(). réN)rÚtransient_internetÚsocketÚ getaddrinfoÚAF_INETÚ SOCK_STREAM)ÚhostÚport©r ú-/usr/local/lib/python3.9/test/test_timeout.pyÚresolve_addresss  ÿÿÿrc@sXeZdZdZdd„Zdd„Zdd„Zdd „Zd d „Zd d „Z dd„Z dd„Z dd„Z dS)ÚCreationTestCasez9Test case for socket.gettimeout() and socket.settimeout()cCst tjtj¡|_dS©N)rr r Úsock©Úselfr r rÚsetUpszCreationTestCase.setUpcCs|j ¡dSr©rÚcloserr r rÚtearDown"szCreationTestCase.tearDowncCs| |j ¡dd¡dS)Nztimeout not disabled by default)Ú assertEqualrÚ gettimeoutrr r rÚtestObjectCreation%sÿz#CreationTestCase.testObjectCreationcCs^|j d¡| |j ¡d¡|j d¡| |j ¡d¡|j d¡| |j ¡d¡dS)Ngáz®Ga@é)rÚ settimeoutrrrr r rÚtestFloatReturnValue*s    z%CreationTestCase.testFloatReturnValuecCsP|j d¡| t|j ¡ƒtdƒ¡|j d¡| t|j ¡ƒtdƒ¡dS)Néçð?g333333@)rrrÚtyperrr r rÚtestReturnType5s  zCreationTestCase.testReturnTypecCs |j d¡|j d¡|j d¡|j d¡| t|jjd¡| t|jjd¡| t|jjd¡| t|jjg¡| t|jji¡| t|jjd¡dS)NrçÚr y)rrÚ assertRaisesÚ TypeErrorrr r rÚ testTypeCheck=s    zCreationTestCase.testTypeCheckcCs:| t|jjd¡| t|jjd¡| t|jjd¡dS)Néÿÿÿÿgð¿)r%Ú ValueErrorrrrr r rÚtestRangeCheckJszCreationTestCase.testRangeCheckcCs”|j d¡|j d¡| |j ¡d¡|j d¡| |j ¡d¡|j d¡|j d¡| |j ¡d¡|j d¡| |j ¡d¡dS)Né TFr#)rrÚ setblockingrrrr r rÚtestTimeoutThenBlockingPs      z(CreationTestCase.testTimeoutThenBlockingcCsX|j d¡|j d¡| |j ¡d¡|j d¡|j d¡| |j ¡d¡dS)NFrT)rr,rrrrr r rÚtestBlockingThenTimeout^s     z(CreationTestCase.testBlockingThenTimeoutN) Ú__name__Ú __module__Ú __qualname__Ú__doc__rrrrr"r'r*r-r.r r r rrs  rc@s*eZdZdZejZdd„ZeZdd„Z dS)ÚTimeoutTestCaseg@cCs tƒ‚dSr)ÚNotImplementedErrorrr r rrtszTimeoutTestCase.setUpc Gs¬|j |¡t|j|ƒ}t|ƒD]Z}t ¡}z ||ŽWq tjyx}z&t ¡|}WYd}~q†WYd}~q d}~00q | d¡|  |||j ¡|  ||d¡dS)z² Test the specified socket method. The method is run at most `count` times and must raise a socket.timeout within `timeout` + self.fuzz seconds. Nzsocket.timeout was not raisedr ) rrÚgetattrÚrangeÚtimeÚ monotonicrÚtimeoutZfailZ assertLessÚfuzzZ assertGreater) rÚcountr9ÚmethodÚargsÚiÚt1ÚeZdeltar r rÚ_sock_operationys     & zTimeoutTestCase._sock_operationN) r/r0r1r:rZHOSTÚ localhostrrrAr r r rr3is r3c@s\eZdZdZdd„Zdd„Ze dd¡dd „ƒZd d „Z d d „Z dd„Z dd„Z dd„Z dS)ÚTCPTimeoutTestCasez3TCP test case for socket.socket() timeout functionscCs"t tjtj¡|_tddƒ|_dS)Nzwww.python.org.éP)rr r rrÚ addr_remoterr r rr“szTCPTimeoutTestCase.setUpcCs|j ¡dSrrrr r rr—szTCPTimeoutTestCase.tearDownTz*need to replace these hosts; see bpo-35518c Cs$tddƒ}tddƒ}d}t tjtj¡}tj}| |¡zdz| |¡WnFtjy\Yn4t yŽ}z|j t j krzd}WYd}~n d}~00W|  ¡~n |  ¡~0|rÖ|  d |d|d ||d|d ¡¡||_t |jd¡"| d d d |j¡Wdƒn1s0YdS) Nzblackhole.snakebite.netiZÝzwhitehole.snakebite.neti[ÝTFzÇWe didn't receive a connection reset (RST) packet from {}:{} within {} seconds, so we're unable to test connect timeout against the corresponding {}:{} (which is configured to silently drop packets).rrgü©ñÒMbP?Úconnect)rrr r rZLOOPBACK_TIMEOUTrrFr9ÚOSErrorÚerrnoZ ECONNREFUSEDrZskipTestÚformatrErrrA)rZ blackholeZ whiteholeÚskiprr9Úerrr r rÚtestConnectTimeoutšs<   ÿ÷ÿz%TCPTimeoutTestCase.testConnectTimeoutcCsRt |jd¡.|j |j¡| dddd¡Wdƒn1sD0YdS)Nrrçø?Úrecvé)rrrErrFrArr r rÚtestRecvTimeoutìsz"TCPTimeoutTestCase.testRecvTimeoutcCs,t |j|j¡|j ¡| ddd¡dS)NrrMÚaccept)rÚ bind_portrrBÚlistenrArr r rÚtestAcceptTimeoutòs z$TCPTimeoutTestCase.testAcceptTimeoutcCsnt tjtj¡J}t ||j¡| ¡|j |  ¡¡|  ddddd¡Wdƒn1s`0YdS)NédrMÚsendóXé@ © rr r rrRrBrSrrFÚ getsocknamerA©rZservr r rÚtestSendøs zTCPTimeoutTestCase.testSendc Cstt tjtj¡P}t ||j¡| ¡|j |  ¡¡|  ddddd|  ¡¡Wdƒn1sf0YdS)NrUrMÚsendtorWrXrYr[r r rÚ testSendtosÿzTCPTimeoutTestCase.testSendtocCsnt tjtj¡J}t ||j¡| ¡|j |  ¡¡|  ddddd¡Wdƒn1s`0YdS)NrUrMÚsendallrWrXrYr[r r rÚ testSendall s zTCPTimeoutTestCase.testSendallN)r/r0r1r2rrÚunittestZskipIfrLrPrTr\r^r`r r r rrCs  Q  rCc@s(eZdZdZdd„Zdd„Zdd„ZdS) ÚUDPTimeoutTestCasez3UDP test case for socket.socket() timeout functionscCst tjtj¡|_dSr)rr Ú SOCK_DGRAMrrr r rrszUDPTimeoutTestCase.setUpcCs|j ¡dSrrrr r rrszUDPTimeoutTestCase.tearDowncCs$t |j|j¡| dddd¡dS)NrrMÚrecvfromrO)rrRrrBrArr r rÚtestRecvfromTimeoutsz&UDPTimeoutTestCase.testRecvfromTimeoutN)r/r0r1r2rrrer r r rrbsrbcCst d¡t ttt¡dS)Nr)rZrequiresZ run_unittestrrCrbr r r rÚ test_main%s  ýrfÚ__main__)r2Ú functoolsraÚtestrZ test.supportrZis_resource_enabledZ skip_expectedr7rHrÚ lru_cacherZTestCaserr3rCrbrfr/r r r rÚs$    M'