最近更新: 2010-12-08

Python - Pipe 在 Signal 發生時的處理事項

當我在 Python 中結合 multiprocessing.Pipemultiprocessing.Processsignal 模組進行傳統的多工行程設計時,我注意到被擱置的管道資料讀取方法(read),在沒有取得資料的情形下就返回。使得父、子行程之間的互動過程不如預期。錯誤訊息顯示,讀取方法被系統中斷了(Interrupted system call)。read, signal, interrupted system call 這三個關鍵資訊,讓我直覺聯想到我碰到 EINTR 這個狀況了。當我在錯誤處理的流程中增加 errno 的判斷動作之後,我確認這就是 EINTR 狀況。

一位使用過 C 語言撰寫多工行程程式的程序員,對 EINTR 這件事絕不陌生。只是我未料想到會在 Python 之中再度碰到這位老朋友。對於這位老朋友,我們的招待原則請看《多工作業下的資料讀寫處理事項 - read()/write() 被 signal 中斷的處理 》。那篇處理事項中,提到 C 語言要準備兩招應付 EINTR 狀況。不過使用 Python 時則只需要第一招:如果錯誤是因為被 signal 中斷的話,就再讀一次,如果是其他原因導致的錯誤,則視為致命錯誤,應該中止程式繼續

典型的擱置式 I/O 處理模式

首先,我們先來看看典型的擱置式 I/O 處理模式。請看 blocking_io_without_signal.py。

import os,sys,multiprocessing,signal,time

def sub_process(pipe):
    while True:
        rc = pipe.recv()
        print("sub process recv: %s" % rc)
        if rc == "END":
            return


if __name__ == "__main__":
    rpipe, wpipe = multiprocessing.Pipe(False)

    sub_process = multiprocessing.Process(target=sub_process, args=(rpipe,))
    sub_process.start()
    time.sleep(5)
    wpipe.send("fool")
    time.sleep(5)
    wpipe.send("END")
    sub_process.join()

這個範例程式,規劃了一對管道端子,分別交給父行程與子行程持有。父行程寫入資料,子行程等待讀取資料。當子行程讀到一行 "END" 之後,則結束子行程。


$ python blocking_io_without_signal.py 
sub process recv: fool
sub process recv: END

程式執行5秒後輸出第一行 fool,再過5秒後輸出第二行 END。兩行程結束。

接著,我要往這個典型的處理程式中,加入訊號。最常見的狀況,就是應用 SIGALRM 實現計時器的場合。在這個場合中,等待管道輸入資料的擱置中讀取行為,將會因此被訊號 SIGALRM 提前打斷。加入 SIGALRM 計時器的範例程式: unsafe_blocking_io_with_signal.py。

import os,sys,multiprocessing,signal,time

def sub_process(pipe):
    def alarm_timer(signum, func):
        print("alarm(%d)" % signum)
        
    signal.signal(signal.SIGALRM, alarm_timer)
    signal.setitimer(signal.ITIMER_REAL, 1, 1)

    #signal.siginterrupt(signal.SIGALRM, False)
    # this does not work perfectly.

    while True:
        rc = pipe.recv()
        print("sub process recv: %s" % rc)
        if rc == "END":
            return


if __name__ == "__main__":
    rpipe, wpipe = multiprocessing.Pipe(False)
    # this will be interrupted by any signal.
    # If you are an expert Unix/POSIX programmer, you will invoke sigaction() 
    # with SA_RESTART to avoid this behaviour. That is modern way.
    # But Python does not provide this way, therefore you need to use 
    # traditional way.
    # See also:
    # http://bugs.python.org/msg102725

    sub_process = multiprocessing.Process(target=sub_process, args=(rpipe,))
    sub_process.start()
    time.sleep(5)
    wpipe.send("fool")
    time.sleep(5)
    wpipe.send("END")
    sub_process.join()

$ python unsafe_blocking_io_with_signal.py 
alarm(14)
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
    self.run()
  File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
    self._target(*self._args, **self._kwargs)
  File "unsafe_blocking_io_with_signal.py", line 14, in sub_process
    rc = pipe.recv()
IOError: [Errno 4] Interrupted system call

執行後,你將發現子行程未如預期般地於5秒後讀取資料。事實上,它在執行一秒後,就被系統打斷了。原因就是系統觸發了訊號 SIGALRM

在我的試驗過程中,我曾嘗試使用 signal.siginterrupt() 抑制 SIGALRM 打斷擱置中的 I/O 行為之事。可惜效果不如預期,它仍然被打斷了。

實作較安全的 Pipe 替代品

Python 的 multiprocessing.Pipe 是一個工廠方法,它會生產一對管道讀寫端子。這對管道讀寫端子是 multiprocessing.Connection 的實體。我嘗試過幾種重構寫法後,採用的寫法是實作一個包覆 multiprocessing.Connection 的新類別 SignalSafeConnection 與一個替代的工廠方法 - SignalSafePipeSignalSafeConnection 將我需要的 I/O 處理方法,包裹在如果錯誤是因為被 signal 中斷的話,就再讀一次,如果是其他原因導致的錯誤,則視為致命錯誤,應該中止程式繼續的處理原則中。SignalSafePipe 則會生產一對 SignalSafeConnection 類的管道讀寫端子;我用它替代 multiprocessing.Pipe 方法。

完整的實作範例,為 safe_blocking_io_with_signal.py。

import os,sys,multiprocessing,signal,time

class SignalSafeConnection(object):
    def __init__(self, handle):
        self.handle = handle

    def send(self, obj):
        # this is a traditional skill of unix programming,
        # but be careful to use it.
        while True:
            try:
                rc = self.handle.send(obj)
            except (OSError, IOError) as e:
                if e.errno == 4:
                    print("SIGIO, Interrupted system call, restart")
                    continue
                else:
                    # unexcepted error, re-raise
                    raise
            else:
                break
        return rc

    def recv(self):
        while True:
            try:
                rc = self.handle.recv()
            except (OSError, IOError) as e:
                if e.errno == 4:
                    print("SIGIO, Interrupted system call, restart")
                    continue
                else:
                    # unexcepted error, re-raise
                    raise
            else:
                break
        return rc

    def fileno(self):
        return self.handle.fileno()
    
    def close(self):
        self.handle.close()
        self.handle = None
    
    def poll(self, timeout=0):
        end_time = time.time() + timeout
        while True:
            try:
                rc = self.handle.poll(timeout)
            except (OSError, IOError) as e:
                if e.errno == 4:
                    current_time = time.time()
                    if current_time > end_time:
                        rc = False
                        break
                    print("SIGIO, Interrupted system call, restart")
                    timeout = end_time - current_time
                    continue
                else:
                    # unexcepted error, re-raise
                    raise
            else:
                break
        return rc
    
    def send_bytes(self, buffer):
        while True:
            try:
                rc = self.handle.send_bytes(buffer)
            except (OSError, IOError) as e:
                if e.errno == 4:
                    print("SIGIO, Interrupted system call, restart")
                    continue
                else:
                    # unexcepted error, re-raise
                    raise
            else:
                break
        return rc

    def recv_bytes(self, maxlength):
        while True:
            try:
                rc = self.handle.send_bytes(buffer)
            except (OSError, IOError) as e:
                if e.errno == 4:
                    print("SIGIO, Interrupted system call, restart")
                    continue
                else:
                    # unexcepted error, re-raise
                    raise
            else:
                break
        return rc


def SignalSafePipe(duplex):
    pipe1, pipe2 = multiprocessing.Pipe(duplex)
    sspipe1 = SignalSafeConnection(pipe1)
    sspipe2 = SignalSafeConnection(pipe2)
    return sspipe1, sspipe2


def sub_process(pipe):
    def alarm_timer(signum, func):
        print("alarm(%d)" % signum)
        
    signal.signal(signal.SIGALRM, alarm_timer)
    signal.setitimer(signal.ITIMER_REAL, 1, 1)

    #signal.siginterrupt(signal.SIGALRM, False)
    # this does not work perfectly.

    while True:
        rc = pipe.recv()
        print("sub process recv: %s" % rc)
        if rc == "END":
            return


if __name__ == "__main__":
    #rpipe, wpipe = multiprocessing.Pipe(False)

    rpipe, wpipe = SignalSafePipe(False)

    sub_process = multiprocessing.Process(target=sub_process, args=(rpipe,))
    sub_process.start()
    time.sleep(5)
    wpipe.send("fool")
    time.sleep(5)
    wpipe.send("END")
    sub_process.join()

$ python safe_blocking_io_with_signal.py 
alarm(14)
SIGIO, Interrupted system call, restart
    ... (省略)
sub process recv: fool
alarm(14)
SIGIO, Interrupted system call, restart
    ... (省略)
sub process recv: END

程式執行5秒後輸出第一行 fool,再過5秒後輸出第二行 END。在等待的同時,計時器也會每秒觸發一次,這讓我們可以在等待的過程中,抽空做別的事情。這就是 process, pipe, signal 三者合作下進行多工作業程式設計的傳統技藝。

基於線程的並行模式

前幾節提到的基於訊號(signal)的並行模式,是傳統的設計模式。隨著現代作業系統逐漸實現線程(thread, 或稱執行緒)功能後,現代的並行設計模式主要是以線程來實現。

例如 unsafe_blocking_io_with_signal.py 的內容,在沒有意外的情況下,完全可以使用 threading.Timer 取代 signal.setitimer。如下列所示:

import os,sys,multiprocessing,signal,time,threading

def sub_process(pipe):
    def alarm_timer(interval):
        while True:
            print("sub process alarm")
            time.sleep(interval)
        
    #signal.signal(signal.SIGALRM, alarm_timer)
    #signal.setitimer(signal.ITIMER_REAL, 1, 1)
    t = threading.Timer(1, alarm_timer, args=(1,))
    t.start()

    while True:
        rc = pipe.recv()
        print("sub process recv: %s" % rc)
        if rc == "END":
            return


if __name__ == "__main__":
    rpipe, wpipe = multiprocessing.Pipe(False)

    sub_process = multiprocessing.Process(target=sub_process, args=(rpipe,))
    sub_process.start()
    time.sleep(5)
    wpipe.send("fool")
    time.sleep(5)
    wpipe.send("END")
    sub_process.join()

如果你的程式中不需要利用訊號處理並行作業,那麼 Python 可以保證線程在運行時,不會干擾到 Pipe 的擱置行為。你便不需要考慮 EINTR 的狀況。

後記

GIL?

上一節中,我強調在沒有意外的情況下可以用線程取代訊號實現並行作業,這是因為我很不幸地碰到了意外情況。當我使用 Python 設計一個稍微複雜的服務程式時,我在主行程上配置的 threading.Timer 沒有運作。儘管 active_thread()Timer.is_alive() 都顯示這個計時器還活著,但它就是沒有執行。直到子行程結束時,主行程的計時器才突然醒過來。我懷疑這是 Global Interpreter Lock 在搞鬼。最後我祭出了傳家法寶,用訊號實現我的需求。也因此與 EINTR 再會,並產生了這篇文章。

重構寫法

我原本嘗試用 Ruby 式 open class 寫法,直接修改 _multiprocessing.Connection 這個類別。然而 Python 向我抱怨這是一個原生的內容,不允許修改它的行為。即使我參考了《Opening python classes》的寫法也不行。Python 告訴我參考文章中使用的某些方法並不存在。我想或許是我用的 Python 版本比較舊吧。我用的是 Python 2.6。

預防行程無盡等待

當我們把 I/O 行為包入一個不被訊號所終止的迴圈時,我們要小心處理這個迴圈。因為我們缺乏資訊得知打斷擱置狀態的訊號是哪一個。這表示連使用者主動發出的訊號,也可能被忽視。例如你可能希望用 SIGALRM 增加一個逾時終止的功能,讓 SIGALRM 主動打斷一個等候太久的資料讀取行為。這時,我們的處理方式就會忽視這個訊號,導致行程陷入無盡地等待迴圈中。

這個狀況有解決方案,就是用 poll()。我們通常配合 poll() 使用 read(),可以指定最長擱置時間,以免行程陷入無盡地等待迴圈。

樂多舊網址: http://blog.roodo.com/rocksaying/archives/14644791.html

樂多舊回應
tim.yellow@gmail.com(tim) (#comment-21803321)
Wed, 08 Jun 2011 20:51:54 +0800
非常有用,搭配 python在 Library Reference 裡Signal那一節,就可以更了解signal 處理!