B
    2*™\e¥  ã               @   sÐ  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZmZ yd dlZW n ek
r   dZY nX G dd„ de	jƒZe	 ejdkd¡G dd	„ d	e	jƒƒZe	 ejdkd
¡G dd„ de	jƒƒZG dd„ de	jƒZe	 ejdkd¡G dd„ de	jƒƒZe	 eedƒd¡G dd„ de	jƒƒZe	 ejdkd¡G dd„ de	jƒƒZe	 ejdkd¡G dd„ de	jƒƒZG dd„ de	jƒZG dd„ de	jƒZdd„ Z e!dkrÌe	 "¡  dS ) é    N)Úsupport)Úassert_python_okÚspawn_pythonc               @   s   e Zd Zdd„ ZdS )ÚGenericTestsc             C   sš   x”t tƒD ]ˆ}tt|ƒ}|dkr0|  |tj¡ q
|dkrH|  |tj¡ q
| d¡rl| d¡sl|  |tj¡ q
| d¡r
|  |tj¡ |  t	j
d¡ q
W d S )N>   ÚSIG_DFLÚSIG_IGN>   ÚSIG_SETMASKÚ	SIG_BLOCKÚSIG_UNBLOCKZSIGZSIG_ZCTRL_Úwin32)ÚdirÚsignalÚgetattrÚassertIsInstanceÚHandlersZSigmasksÚ
startswithZSignalsÚassertEqualÚsysÚplatform)ÚselfÚnameÚsig© r   ú;C:\ALexclude\prg\programme\Python37\Lib\test\test_signal.pyÚ
test_enums   s    

zGenericTests.test_enumsN)Ú__name__Ú
__module__Ú__qualname__r   r   r   r   r   r      s   r   r   zNot valid on Windowsc               @   s4   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ ZdS )Ú
PosixTestsc             G   s   d S )Nr   )r   Úargsr   r   r   Útrivial_signal_handler%   s    z!PosixTests.trivial_signal_handlerc             C   s(   |   ttjd¡ |   ttjd| j¡ d S )Ni’  )ÚassertRaisesÚ
ValueErrorr   Ú	getsignalr    )r   r   r   r   Ú,test_out_of_range_signal_number_raises_error(   s    z7PosixTests.test_out_of_range_signal_number_raises_errorc             C   s   |   ttjtjd ¡ d S )N)r!   Ú	TypeErrorr   ÚSIGUSR1)r   r   r   r   Ú0test_setting_signal_handler_to_none_raises_error.   s    
z;PosixTests.test_setting_signal_handler_to_none_raises_errorc             C   sZ   t   t j| j¡}|  |t j¡ |  t  t j¡| j¡ t   t j|¡ |  t  t j¡|¡ d S )N)r   ZSIGHUPr    r   r   r   r#   )r   Zhupr   r   r   Útest_getsignal2   s    zPosixTests.test_getsignalc             C   s&   t j t¡}t j |d¡}t|ƒ d S )Nzsignalinterproctester.py)ÚosÚpathÚdirnameÚ__file__Újoinr   )r   r+   Zscriptr   r   r   Útest_interprocess_signal;   s    z#PosixTests.test_interprocess_signalN)r   r   r   r    r$   r'   r(   r.   r   r   r   r   r   #   s
   	r   zWindows specificc               @   s   e Zd Zdd„ ZdS )ÚWindowsSignalTestsc          	   C   s¶   dd„ }t ƒ }xTtjtjtjtjtjtjtjfD ]0}t 	|¡d k	r0t |t ||¡¡ | 
|¡ q0W |  |¡ |  t¡ t d|¡ W d Q R X |  t¡ t d|¡ W d Q R X d S )Nc             S   s   d S )Nr   )ÚxÚyr   r   r   Ú<lambda>E   ó    z3WindowsSignalTests.test_issue9324.<locals>.<lambda>éÿÿÿÿé   )Úsetr   ÚSIGABRTÚSIGBREAKÚSIGFPEÚSIGILLÚSIGINTÚSIGSEGVÚSIGTERMr#   ÚaddÚ
assertTruer!   r"   )r   ÚhandlerÚcheckedr   r   r   r   Útest_issue9324C   s    
z!WindowsSignalTests.test_issue9324N)r   r   r   rB   r   r   r   r   r/   A   s   r/   c               @   sN   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Ze 	e
jdkd¡dd„ ƒZdS )ÚWakeupFDTestsc          	   C   sL   |   t¡ tjtjd W d Q R X |   t¡ t tjd¡ W d Q R X d S )N)ÚsignumF)r!   r%   r   Úset_wakeup_fdr;   )r   r   r   r   Útest_invalid_call[   s    zWakeupFDTests.test_invalid_callc             C   s    t  ¡ }|  ttftj|¡ d S )N)r   Zmake_bad_fdr!   r"   ÚOSErrorr   rE   )r   Úfdr   r   r   Útest_invalid_fdd   s    
zWakeupFDTests.test_invalid_fdc             C   s0   t   ¡ }| ¡ }| ¡  |  ttftj|¡ d S )N)ÚsocketÚfilenoÚcloser!   r"   rG   r   rE   )r   ZsockrH   r   r   r   Útest_invalid_socketi   s
    
z!WakeupFDTests.test_invalid_socketc             C   s¶   t  ¡ \}}|  t j|¡ |  t j|¡ t  ¡ \}}|  t j|¡ |  t j|¡ tt dƒrrt  |d¡ t  |d¡ t |¡ |  t |¡|¡ |  t d¡|¡ |  t d¡d¡ d S )NÚset_blockingFr4   )	r)   ÚpipeÚ
addCleanuprL   ÚhasattrrN   r   rE   r   )r   Zr1Zw1Zr2Zw2r   r   r   Útest_set_wakeup_fd_resultp   s    

z'WakeupFDTests.test_set_wakeup_fd_resultc             C   s   t   ¡ }|  |j¡ | d¡ | ¡ }t   ¡ }|  |j¡ | d¡ | ¡ }t |¡ |  t |¡|¡ |  t d¡|¡ |  t d¡d¡ d S )NFr4   )rJ   rP   rL   ZsetblockingrK   r   rE   r   )r   Zsock1Zfd1Zsock2Zfd2r   r   r   Ú test_set_wakeup_fd_socket_result   s    


z.WakeupFDTests.test_set_wakeup_fd_socket_resultr   ztests specific to POSIXc          	   C   sŽ   t  ¡ \}}|  t j|¡ |  t j|¡ t  |d¡ |  t¡}t |¡ W d Q R X |  	t
|jƒd| ¡ t  |d¡ t |¡ t d¡ d S )NTz&the fd %s must be in non-blocking modeFr4   )r)   rO   rP   rL   rN   r!   r"   r   rE   r   ÚstrZ	exception)r   ZrfdZwfdÚcmr   r   r   Útest_set_wakeup_fd_blocking“   s    

z)WakeupFDTests.test_set_wakeup_fd_blockingN)r   r   r   rF   rI   rM   rR   rS   ÚunittestÚskipIfr   r   rV   r   r   r   r   rC   Y   s   	rC   c               @   st   e Zd Ze edkd¡ddœdd„ƒZe edkd¡dd„ ƒZd	d
„ Zdd„ Z	dd„ Z
e eedƒd¡dd„ ƒZdS )ÚWakeupSignalTestsNzneed _testcapiT)Úorderedc            G   s&   d  ttt|ƒƒ||¡}td|ƒ d S )Na  if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        z-c)ÚformatÚtupleÚmapÚintr   )r   Z	test_bodyrZ   ZsignalsÚcoder   r   r   Úcheck_wakeup¨   s    #zWakeupSignalTests.check_wakeupc          	   C   sj   d}t  ¡ \}}z4yt  |d¡ W n tk
r6   Y nX |  d¡ W d t  |¡ t  |¡ X td|ƒ d S )Na)  if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                _testcapi.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        ó   xz9OS doesn't report write() error on the read end of a pipez-c)r)   rO   ÚwriterG   ÚskipTestrL   r   )r   r_   ÚrÚwr   r   r   Útest_wakeup_write_errorÏ   s    &
z)WakeupSignalTests.test_wakeup_write_errorc             C   s   |   dtj¡ d S )Na·  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        )r`   r   ÚSIGALRM)r   r   r   r   Útest_wakeup_fd_early  s    z&WakeupSignalTests.test_wakeup_fd_earlyc             C   s   |   dtj¡ d S )Na`  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        )r`   r   rg   )r   r   r   r   Útest_wakeup_fd_during%  s    z'WakeupSignalTests.test_wakeup_fd_duringc             C   s   |   dtjtj¡ d S )NzÊdef test():
            import _testcapi
            signal.signal(signal.SIGUSR1, handler)
            _testcapi.raise_signal(signal.SIGUSR1)
            _testcapi.raise_signal(signal.SIGALRM)
        )r`   r   r&   rg   )r   r   r   r   Útest_signumC  s    zWakeupSignalTests.test_signumÚpthread_sigmaskzneed signal.pthread_sigmask()c             C   s   | j dtjtjdd d S )Naì  def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            _testcapi.raise_signal(signum1)
            _testcapi.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        F)rZ   )r`   r   r&   ZSIGUSR2)r   r   r   r   Útest_pendingK  s    zWakeupSignalTests.test_pending)r   r   r   rW   rX   Ú	_testcapir`   rf   rh   ri   rj   Ú
skipUnlessrQ   r   rl   r   r   r   r   rY   ¦   s   &4"rY   Z
socketpairzneed socket.socketpairc               @   sT   e Zd Ze edkd¡dd„ ƒZe edkd¡dd„ ƒZe edkd¡dd„ ƒZdS )	ÚWakeupSocketSignalTestsNzneed _testcapic             C   s   d}t d|ƒ d S )Na·  if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        _testcapi.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        z-c)r   )r   r_   r   r   r   Útest_socket`  s     z#WakeupSocketSignalTests.test_socketc             C   s.   t jdkrd}nd}dj|d}td|ƒ d S )NÚntÚsendrb   a.  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        )Úactionz-c)r)   r   r[   r   )r   rs   r_   r   r   r   Útest_send_error„  s
    
"z'WakeupSocketSignalTests.test_send_errorc             C   s.   t jdkrd}nd}dj|d}td|ƒ d S )Nrq   rr   rb   a×  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        # Start with large chunk size to reduce the
        # number of send needed to fill the buffer.
        written = 0
        for chunk_size in (2 ** 16, 2 ** 8, 1):
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, socket.timeout):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        )rs   z-c)r)   r   r[   r   )r   rs   r_   r   r   r   Útest_warn_on_full_buffer¯  s
    
dz0WakeupSocketSignalTests.test_warn_on_full_buffer)	r   r   r   rW   rX   rm   rp   rt   ru   r   r   r   r   ro   ]  s   $+ro   c               @   s,   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	S )
ÚSiginterruptTestc          	   C   s’   d|f }t d|ƒt}y|j ¡ }|jdd\}}W n tjk
rR   | ¡  dS X || }| ¡ }|dkr|td||f ƒ‚|dkS W d	Q R X d	S )
z¿Perform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        aò  if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        z-cg      @)ZtimeoutF)é   é   zChild error (exit code %s): %rrx   N)	r   ÚstdoutÚreadlineÚcommunicateÚ
subprocessZTimeoutExpiredÚkillÚwaitÚ	Exception)r   Z	interruptr_   ÚprocessZ
first_linery   ÚstderrÚexitcoder   r   r   Úreadpipe_interrupted   s    *

z%SiginterruptTest.readpipe_interruptedc             C   s   |   d ¡}|  |¡ d S )N)rƒ   r?   )r   Úinterruptedr   r   r   Útest_without_siginterrupt\  s    
z*SiginterruptTest.test_without_siginterruptc             C   s   |   d¡}|  |¡ d S )NT)rƒ   r?   )r   r„   r   r   r   Útest_siginterrupt_onc  s    
z%SiginterruptTest.test_siginterrupt_onc             C   s   |   d¡}|  |¡ d S )NF)rƒ   ZassertFalse)r   r„   r   r   r   Útest_siginterrupt_offj  s    
z&SiginterruptTest.test_siginterrupt_offN)r   r   r   rƒ   r…   r†   r‡   r   r   r   r   rv     s   <rv   c               @   sn   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Z	e
 ejdkd¡dd„ ƒZdd„ Zdd„ ZdS )Ú
ItimerTestc             C   s(   d| _ d| _d | _t tj| j¡| _d S )NFr   )Úhndl_calledÚ
hndl_countÚitimerr   rg   Úsig_alrmÚ	old_alarm)r   r   r   r   ÚsetUpt  s    zItimerTest.setUpc             C   s,   t   t j| j¡ | jd k	r(t  | jd¡ d S )Nr   )r   rg   r   r‹   Ú	setitimer)r   r   r   r   ÚtearDownz  s    
zItimerTest.tearDownc             G   s
   d| _ d S )NT)r‰   )r   r   r   r   r   rŒ   €  s    zItimerTest.sig_alrmc             G   sF   d| _ | jdkrt d¡‚n| jdkr4t tjd¡ |  jd7  _d S )NTrx   z.setitimer didn't disable ITIMER_VIRTUAL timer.r   é   )r‰   rŠ   r   ÚItimerErrorr   ÚITIMER_VIRTUAL)r   r   r   r   r   Ú
sig_vtalrmƒ  s    

zItimerTest.sig_vtalrmc             G   s   d| _ t tjd¡ d S )NTr   )r‰   r   r   ÚITIMER_PROF)r   r   r   r   r   Úsig_prof  s    zItimerTest.sig_profc             C   s   |   tjtjdd¡ d S )Nr4   r   )r!   r   r’   r   )r   r   r   r   Útest_itimer_exc”  s    zItimerTest.test_itimer_excc             C   s0   t j| _t  | jd¡ t  ¡  |  | jd¡ d S )Ng      ð?T)r   ÚITIMER_REALr‹   r   Zpauser   r‰   )r   r   r   r   Útest_itimer_real  s    zItimerTest.test_itimer_real)Znetbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.c             C   s”   t j| _t   t j| j¡ t  | jdd¡ t ¡ }x<t ¡ | dk rbtdddƒ}t  	| j¡dkr2P q2W |  
d¡ |  t  	| j¡d¡ |  | jd	¡ d S )
Ng333333Ó?gš™™™™™É?g      N@i90  i2	 i“–˜ )g        g        z8timeout: likely cause: machine too slow or load too highT)r   r“   r‹   Z	SIGVTALRMr”   r   ÚtimeÚ	monotonicÚpowÚ	getitimerrc   r   r‰   )r   Ú
start_timeÚ_r   r   r   Útest_itimer_virtual¤  s    
zItimerTest.test_itimer_virtualc             C   s”   t j| _t   t j| j¡ t  | jdd¡ t ¡ }x<t ¡ | dk rbtdddƒ}t  	| j¡dkr2P q2W |  
d¡ |  t  	| j¡d¡ |  | jd¡ d S )	Ngš™™™™™É?g      N@i90  i2	 i“–˜ )g        g        z8timeout: likely cause: machine too slow or load too highT)r   r•   r‹   ÚSIGPROFr–   r   rš   r›   rœ   r   rc   r   r‰   )r   rž   rŸ   r   r   r   Útest_itimer_profº  s    
zItimerTest.test_itimer_profc             C   s2   t j| _t  | jd¡ t d¡ |  | jd¡ d S )Ngíµ ÷Æ°>r‘   T)r   r˜   r‹   r   rš   Úsleepr   r‰   )r   r   r   r   Útest_setitimer_tinyÎ  s    
zItimerTest.test_setitimer_tinyN)r   r   r   rŽ   r   rŒ   r”   r–   r—   r™   rW   rX   r   r   r    r¢   r¤   r   r   r   r   rˆ   r  s   	rˆ   c               @   s   e Zd ZdZe eedƒd¡dd„ ƒZe eedƒd¡e eedƒd¡dd	„ ƒƒZ	e eed
ƒd¡dd„ ƒZ
e eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡d d!„ ƒZe eedƒd¡e eedƒd¡d"d#„ ƒƒZe eedƒd¡d$d%„ ƒZe eedƒd¡d&d'„ ƒZe eed
ƒd¡d(d)„ ƒZd*S )+ÚPendingSignalsTestsz[
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    Ú
sigpendingzneed signal.sigpending()c             C   s   |   t ¡ tƒ ¡ d S )N)r   r   r¦   r6   )r   r   r   r   Útest_sigpending_emptyÝ  s    z)PendingSignalsTests.test_sigpending_emptyrk   zneed signal.pthread_sigmask()c             C   s   d}t d|ƒ d S )Na  if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        z-c)r   )r   r_   r   r   r   Útest_sigpendingâ  s    z#PendingSignalsTests.test_sigpendingZpthread_killzneed signal.pthread_kill()c             C   s   d}t d|ƒ d S )Naâ  if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        z-c)r   )r   r_   r   r   r   Útest_pthread_kill  s    z%PendingSignalsTests.test_pthread_killc             C   s   d|  ¡ |f }td|ƒ dS )zo
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        aw  if 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        z-cN)Ústripr   )r   ZblockedÚtestr_   r   r   r   Úwait_helper  s    'zPendingSignalsTests.wait_helperZsigwaitzneed signal.sigwait()c             C   s   |   tjd¡ d S )Na   
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        )r¬   r   rg   )r   r   r   r   Útest_sigwaitH  s    z PendingSignalsTests.test_sigwaitZsigwaitinfozneed signal.sigwaitinfo()c             C   s   |   tjd¡ d S )Nz×
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        )r¬   r   rg   )r   r   r   r   Útest_sigwaitinfoT  s    z$PendingSignalsTests.test_sigwaitinfoÚsigtimedwaitzneed signal.sigtimedwait()c             C   s   |   tjd¡ d S )Nzá
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        )r¬   r   rg   )r   r   r   r   Útest_sigtimedwait_  s    z%PendingSignalsTests.test_sigtimedwaitc             C   s   |   tjd¡ d S )Nzþ
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        )r¬   r   rg   )r   r   r   r   Útest_sigtimedwait_pollj  s    z*PendingSignalsTests.test_sigtimedwait_pollc             C   s   |   tjd¡ d S )Nz¿
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        )r¬   r   rg   )r   r   r   r   Útest_sigtimedwait_timeoutw  s    z-PendingSignalsTests.test_sigtimedwait_timeoutc             C   s   t j}|  tt j|gd¡ d S )Ng      ð¿)r   rg   r!   r"   r¯   )r   rD   r   r   r   Ú"test_sigtimedwait_negative_timeout  s    z6PendingSignalsTests.test_sigtimedwait_negative_timeoutc             C   s   t ddƒ d S )Nz-ca­  if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        )r   )r   r   r   r   Útest_sigwait_thread‡  s    	z'PendingSignalsTests.test_sigwait_threadc             C   sH   |   ttj¡ |   ttjd¡ |   ttjddd¡ |   ttjdg ¡ d S )Nr‘   rw   rx   i¤  )r!   r%   r   rk   rG   )r   r   r   r   Útest_pthread_sigmask_arguments©  s    z2PendingSignalsTests.test_pthread_sigmask_argumentsc             C   s   d}t d|ƒ d S )Na-	  if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        z-c)r   )r   r_   r   r   r   Útest_pthread_sigmask±  s    Jz(PendingSignalsTests.test_pthread_sigmaskc          	   C   sJ   d}t d|ƒ2}| ¡ \}}| ¡ }|dkr<td||f ƒ‚W d Q R X d S )Na7  if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        z-crx   zChild error (exit code %s): %s)r   r{   r~   r   )r   r_   r€   ry   r   r‚   r   r   r   Útest_pthread_kill_main_threadþ  s    z1PendingSignalsTests.test_pthread_kill_main_threadN)r   r   r   Ú__doc__rW   rn   rQ   r   r§   r¨   r©   r¬   r­   r®   r°   r±   r²   r³   r´   rµ   r¶   r·   r   r   r   r   r¥   Ø  sB   -

	Lr¥   c               @   s\   e Zd ZdZdd„ Zdd„ Zdd„ Ze e	e
dƒd	¡d
d„ ƒZe e	e
dƒd	¡dd„ ƒZdS )Ú
StressTestz©
    Stress signal delivery, especially when a signal arrives in
    the middle of recomputing the signal state or executing
    previously tripped signal handlers.
    c             C   s    t   ||¡}|  t j ||¡ d S )N)r   rP   )r   rD   r@   Zold_handlerr   r   r   Úsetsig  s    zStressTest.setsigc                sš   d‰ g ‰d
‡ ‡fdd„	}|   tjtjd¡ |  tj|¡ |ƒ  xtˆƒˆ k rXt d¡ q@W ‡fdd„t	tˆƒd ƒD ƒ}t
 |¡}tjr–td	|f ƒ |S )Né   c                s,   t ˆƒˆ k r(ˆ t ¡ ¡ t tjd¡ d S )Ngíµ ÷Æ°>)ÚlenÚappendrš   Zperf_counterr   r   r˜   )rD   Úframe)ÚNÚtimesr   r   r@   '  s    z5StressTest.measure_itimer_resolution.<locals>.handlerr   gü©ñÒMbP?c                s    g | ]}ˆ |d   ˆ |  ‘qS )r‘   r   )Ú.0Úi)rÀ   r   r   ú
<listcomp>5  s    z8StressTest.measure_itimer_resolution.<locals>.<listcomp>r‘   z,detected median itimer() resolution: %.6f s.)NN)rP   r   r   r˜   rº   rg   r¼   rš   r£   ÚrangeÚ
statisticsZmedianr   ÚverboseÚprint)r   r@   Z	durationsZmedr   )r¿   rÀ   r   Úmeasure_itimer_resolution#  s    
z$StressTest.measure_itimer_resolutionc             C   s4   |   ¡ }|dkrdS |dkr dS |  d|f ¡ d S )Ng-Cëâ6?i'  g{®Gáz„?éd   z^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))rÈ   rc   )r   Zresor   r   r   Údecide_itimer_count;  s    zStressTest.decide_itimer_countr   ztest needs setitimer()c                s  |   ¡ }g ‰ dd„ }d‡ fdd„	}|  tj|¡ |  tj|¡ |  tj|¡ d}t ¡ d }x||k rìt 	t 
¡ tj¡ |d7 }x&tˆ ƒ|k r¦t ¡ |k r¦t d	¡ q‚W t 	t 
¡ tj¡ |d7 }x&tˆ ƒ|k rèt ¡ |k rèt d	¡ qÄW q^W |  tˆ ƒ|d
¡ dS )z;
        This test uses dependent signal handlers.
        c             S   s   t  t jdt ¡ d  ¡ d S )Ngíµ ÷Æ°>gñhãˆµøä>)r   r   r˜   Úrandom)rD   r¾   r   r   r   Úfirst_handlerR  s    z@StressTest.test_stress_delivery_dependent.<locals>.first_handlerNc                s   ˆ   | ¡ d S )N)r½   )rD   r¾   )Úsigsr   r   Úsecond_handlerZ  s    zAStressTest.test_stress_delivery_dependent.<locals>.second_handlerr   g      .@r‘   gñhãˆµøä>zSome signals were lost)NN)rÊ   rº   r   r¡   r&   rg   rš   r›   r)   r}   Úgetpidr¼   r£   r   )r   r¿   rÌ   rÎ   Úexpected_sigsÚdeadliner   )rÍ   r   Útest_stress_delivery_dependentI  s&    
z)StressTest.test_stress_delivery_dependentc                sÄ   |   ¡ }g ‰ ‡ fdd„}|  tj|¡ |  tj|¡ d}t ¡ d }xh||k r¬t tjdt	 	¡ d  ¡ t
 t
 ¡ tj¡ |d7 }x&tˆ ƒ|k r¨t ¡ |k r¨t d¡ q„W qFW |  tˆ ƒ|d¡ d	S )
z>
        This test uses simultaneous signal handlers.
        c                s   ˆ   | ¡ d S )N)r½   )rD   r¾   )rÍ   r   r   r@   €  s    z=StressTest.test_stress_delivery_simultaneous.<locals>.handlerr   g      .@gíµ ÷Æ°>gñhãˆµøä>rw   zSome signals were lostN)rÊ   rº   r   r&   rg   rš   r›   r   r˜   rË   r)   r}   rÏ   r¼   r£   r   )r   r¿   r@   rÐ   rÑ   r   )rÍ   r   Ú!test_stress_delivery_simultaneousw  s    
z,StressTest.test_stress_delivery_simultaneousN)r   r   r   r¸   rº   rÈ   rÊ   rW   rn   rQ   r   rÒ   rÓ   r   r   r   r   r¹     s   -r¹   c               C   s   t  ¡  d S )N)r   Zreap_childrenr   r   r   r   ÚtearDownModule™  s    rÔ   Ú__main__)#r)   rË   r   rJ   rÅ   r|   r   Z	threadingrš   rW   r«   r   Ztest.support.script_helperr   r   rm   ÚImportErrorZTestCaser   rX   r   r   rn   r/   rC   rY   rQ   ro   rv   rˆ   r¥   r¹   rÔ   r   Úmainr   r   r   r   Ú<module>   sN   


M
 7
 @
T
e  B 
