當我在 Python 中結合 multiprocessing.Pipe 、multiprocessing.Process 與 signal 模組進行傳統的多工行程設計時,我注意到被擱置的管道資料讀取方法(read),在沒有取得資料的情形下就返回。使得父、子行程之間的互動過程不如預期。錯誤訊息顯示,讀取方法被系統中斷了(Interrupted system call)。read, signal, interrupted system call 這三個關鍵資訊,讓我直覺聯想到我碰到 EINTR 這個狀況了。當我在錯誤處理的流程中增加 errno 的判斷動作之後,我確認這就是 EINTR 狀況。
一位使用過 C 語言撰寫多工行程程式的程序員,對 EINTR 這件事絕不陌生。只是我未料想到會在 Python 之中再度碰到這位老朋友。對於這位老朋友,我們的招待原則請看《多工作業下的資料讀寫處理事項 - read()/write() 被 signal 中斷的處理 》。那篇處理事項中,提到 C 語言要準備兩招應付 EINTR 狀況。不過使用 Python 時則只需要第一招:如果錯誤是因為被 signal 中斷的話,就再讀一次,如果是其他原因導致的錯誤,則視為致命錯誤,應該中止程式繼續 。
典型的擱置式 I/O 處理模式
首先,我們先來看看典型的擱置式 I/O 處理模式。請看 blocking_io_without_signal.py。
import os , sys , multiprocessing , signal , time
def sub_process ( pipe ):
while True :
rc = pipe . recv ()
print ( "sub process recv: %s" % rc )
if rc == "END" :
return
if __name__ == "__main__" :
rpipe , wpipe = multiprocessing . Pipe ( False )
sub_process = multiprocessing . Process ( target = sub_process , args = ( rpipe ,))
sub_process . start ()
time . sleep ( 5 )
wpipe . send ( "fool" )
time . sleep ( 5 )
wpipe . send ( "END" )
sub_process . join ()
這個範例程式,規劃了一對管道端子,分別交給父行程與子行程持有。父行程寫入資料,子行程等待讀取資料。當子行程讀到一行 "END" 之後,則結束子行程。
$ python blocking_io_without_signal.py
sub process recv: fool
sub process recv: END
程式執行5秒後輸出第一行 fool,再過5秒後輸出第二行 END。兩行程結束。
接著,我要往這個典型的處理程式中,加入訊號。最常見的狀況,就是應用 SIGALRM 實現計時器的場合。在這個場合中,等待管道輸入資料的擱置中讀取行為,將會因此被訊號 SIGALRM 提前打斷。加入 SIGALRM 計時器的範例程式: unsafe_blocking_io_with_signal.py。
import os , sys , multiprocessing , signal , time
def sub_process ( pipe ):
def alarm_timer ( signum , func ):
print ( "alarm(%d)" % signum )
signal . signal ( signal . SIGALRM , alarm_timer )
signal . setitimer ( signal . ITIMER_REAL , 1 , 1 )
#signal.siginterrupt(signal.SIGALRM, False)
# this does not work perfectly.
while True :
rc = pipe . recv ()
print ( "sub process recv: %s" % rc )
if rc == "END" :
return
if __name__ == "__main__" :
rpipe , wpipe = multiprocessing . Pipe ( False )
# this will be interrupted by any signal.
# If you are an expert Unix/POSIX programmer, you will invoke sigaction()
# with SA_RESTART to avoid this behaviour. That is modern way.
# But Python does not provide this way, therefore you need to use
# traditional way.
# See also:
# http://bugs.python.org/msg102725
sub_process = multiprocessing . Process ( target = sub_process , args = ( rpipe ,))
sub_process . start ()
time . sleep ( 5 )
wpipe . send ( "fool" )
time . sleep ( 5 )
wpipe . send ( "END" )
sub_process . join ()
$ python unsafe_blocking_io_with_signal.py
alarm(14)
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
self.run()
File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "unsafe_blocking_io_with_signal.py", line 14, in sub_process
rc = pipe.recv()
IOError: [Errno 4] Interrupted system call
執行後,你將發現子行程未如預期般地於5秒後讀取資料。事實上,它在執行一秒後,就被系統打斷了。原因就是系統觸發了訊號 SIGALRM 。
在我的試驗過程中,我曾嘗試使用 signal.siginterrupt() 抑制 SIGALRM 打斷擱置中的 I/O 行為之事。可惜效果不如預期,它仍然被打斷了。
實作較安全的 Pipe 替代品
Python 的 multiprocessing.Pipe 是一個工廠方法,它會生產一對管道讀寫端子。這對管道讀寫端子是 multiprocessing.Connection 的實體。我嘗試過幾種重構寫法後,採用的寫法是實作一個包覆 multiprocessing.Connection 的新類別 SignalSafeConnection 與一個替代的工廠方法 - SignalSafePipe 。
SignalSafeConnection 將我需要的 I/O 處理方法,包裹在如果錯誤是因為被 signal 中斷的話,就再讀一次,如果是其他原因導致的錯誤,則視為致命錯誤,應該中止程式繼續 的處理原則中。SignalSafePipe 則會生產一對 SignalSafeConnection 類的管道讀寫端子;我用它替代 multiprocessing.Pipe 方法。
完整的實作範例,為 safe_blocking_io_with_signal.py。
import os , sys , multiprocessing , signal , time
class SignalSafeConnection ( object ):
def __init__ ( self , handle ):
self . handle = handle
def send ( self , obj ):
# this is a traditional skill of unix programming,
# but be careful to use it.
while True :
try :
rc = self . handle . send ( obj )
except ( OSError , IOError ) as e :
if e . errno == 4 :
print ( "SIGIO, Interrupted system call, restart" )
continue
else :
# unexcepted error, re-raise
raise
else :
break
return rc
def recv ( self ):
while True :
try :
rc = self . handle . recv ()
except ( OSError , IOError ) as e :
if e . errno == 4 :
print ( "SIGIO, Interrupted system call, restart" )
continue
else :
# unexcepted error, re-raise
raise
else :
break
return rc
def fileno ( self ):
return self . handle . fileno ()
def close ( self ):
self . handle . close ()
self . handle = None
def poll ( self , timeout = 0 ):
end_time = time . time () + timeout
while True :
try :
rc = self . handle . poll ( timeout )
except ( OSError , IOError ) as e :
if e . errno == 4 :
current_time = time . time ()
if current_time > end_time :
rc = False
break
print ( "SIGIO, Interrupted system call, restart" )
timeout = end_time - current_time
continue
else :
# unexcepted error, re-raise
raise
else :
break
return rc
def send_bytes ( self , buffer ):
while True :
try :
rc = self . handle . send_bytes ( buffer )
except ( OSError , IOError ) as e :
if e . errno == 4 :
print ( "SIGIO, Interrupted system call, restart" )
continue
else :
# unexcepted error, re-raise
raise
else :
break
return rc
def recv_bytes ( self , maxlength ):
while True :
try :
rc = self . handle . send_bytes ( buffer )
except ( OSError , IOError ) as e :
if e . errno == 4 :
print ( "SIGIO, Interrupted system call, restart" )
continue
else :
# unexcepted error, re-raise
raise
else :
break
return rc
def SignalSafePipe ( duplex ):
pipe1 , pipe2 = multiprocessing . Pipe ( duplex )
sspipe1 = SignalSafeConnection ( pipe1 )
sspipe2 = SignalSafeConnection ( pipe2 )
return sspipe1 , sspipe2
def sub_process ( pipe ):
def alarm_timer ( signum , func ):
print ( "alarm(%d)" % signum )
signal . signal ( signal . SIGALRM , alarm_timer )
signal . setitimer ( signal . ITIMER_REAL , 1 , 1 )
#signal.siginterrupt(signal.SIGALRM, False)
# this does not work perfectly.
while True :
rc = pipe . recv ()
print ( "sub process recv: %s" % rc )
if rc == "END" :
return
if __name__ == "__main__" :
#rpipe, wpipe = multiprocessing.Pipe(False)
rpipe , wpipe = SignalSafePipe ( False )
sub_process = multiprocessing . Process ( target = sub_process , args = ( rpipe ,))
sub_process . start ()
time . sleep ( 5 )
wpipe . send ( "fool" )
time . sleep ( 5 )
wpipe . send ( "END" )
sub_process . join ()
$ python safe_blocking_io_with_signal.py
alarm(14)
SIGIO, Interrupted system call, restart
... (省略)
sub process recv: fool
alarm(14)
SIGIO, Interrupted system call, restart
... (省略)
sub process recv: END
程式執行5秒後輸出第一行 fool,再過5秒後輸出第二行 END。在等待的同時,計時器也會每秒觸發一次,這讓我們可以在等待的過程中,抽空做別的事情。這就是 process, pipe, signal 三者合作下進行多工作業程式設計的傳統技藝。
基於線程的並行模式
前幾節提到的基於訊號(signal)的並行模式,是傳統的設計模式。隨著現代作業系統逐漸實現線程(thread, 或稱執行緒)功能後,現代的並行設計模式主要是以線程來實現。
例如 unsafe_blocking_io_with_signal.py 的內容,在沒有意外的情況下 ,完全可以使用 threading.Timer 取代 signal.setitimer 。如下列所示:
import os , sys , multiprocessing , signal , time , threading
def sub_process ( pipe ):
def alarm_timer ( interval ):
while True :
print ( "sub process alarm" )
time . sleep ( interval )
#signal.signal(signal.SIGALRM, alarm_timer)
#signal.setitimer(signal.ITIMER_REAL, 1, 1)
t = threading . Timer ( 1 , alarm_timer , args = ( 1 ,))
t . start ()
while True :
rc = pipe . recv ()
print ( "sub process recv: %s" % rc )
if rc == "END" :
return
if __name__ == "__main__" :
rpipe , wpipe = multiprocessing . Pipe ( False )
sub_process = multiprocessing . Process ( target = sub_process , args = ( rpipe ,))
sub_process . start ()
time . sleep ( 5 )
wpipe . send ( "fool" )
time . sleep ( 5 )
wpipe . send ( "END" )
sub_process . join ()
如果你的程式中不需要利用訊號處理並行作業,那麼 Python 可以保證線程在運行時,不會干擾到 Pipe 的擱置行為。你便不需要考慮 EINTR 的狀況。
後記
GIL?
上一節中,我強調在沒有意外的情況下 可以用線程取代訊號實現並行作業,這是因為我很不幸地碰到了意外情況。當我使用 Python 設計一個稍微複雜的服務程式時,我在主行程上配置的 threading.Timer 沒有運作。儘管 active_thread() 與 Timer.is_alive() 都顯示這個計時器還活著,但它就是沒有執行。直到子行程結束時,主行程的計時器才突然醒過來。我懷疑這是 Global Interpreter Lock 在搞鬼。最後我祭出了傳家法寶,用訊號實現我的需求。也因此與 EINTR 再會,並產生了這篇文章。
重構寫法
我原本嘗試用 Ruby 式 open class 寫法,直接修改 _multiprocessing.Connection 這個類別。然而 Python 向我抱怨這是一個原生的內容,不允許修改它的行為。即使我參考了《Opening python classes 》的寫法也不行。Python 告訴我參考文章中使用的某些方法並不存在。我想或許是我用的 Python 版本比較舊吧。我用的是 Python 2.6。
預防行程無盡等待
當我們把 I/O 行為包入一個不被訊號所終止的迴圈時,我們要小心處理這個迴圈。因為我們缺乏資訊得知打斷擱置狀態的訊號是哪一個。這表示連使用者主動發出的訊號,也可能被忽視。例如你可能希望用 SIGALRM 增加一個逾時終止的功能,讓 SIGALRM 主動打斷一個等候太久的資料讀取行為。這時,我們的處理方式就會忽視這個訊號,導致行程陷入無盡地等待迴圈中。
這個狀況有解決方案,就是用 poll() 。我們通常配合 poll() 使用 read() ,可以指定最長擱置時間,以免行程陷入無盡地等待迴圈。
樂多舊網址: http://blog.roodo.com/rocksaying/archives/14644791.html
樂多舊回應