最近更新: 2011-01-03

如何以 semaphore 進行資源的共用互斥鎖定

在 Unix 系統中,資源的共用機制,通常以檔案鎖最為常見,也最為容易使用,只要使用 flock() 即可對檔案進行 LOCK_SH (共享鎖定)、 LOCK_EX (互斥鎖定)。但是其他的資源就沒這麼方便,例如共享記憶體完全沒有提供鎖定功能,此時就必須借助其他的行程間通訊機制進行協調,例如「號誌」(semaphore)。

本文即在說明,如何以號誌實作一個如 flock() 具有 LOCK_SH, LOCK_EX, LOCK_UN, LOCK_NB 四種參數的「資源共用互斥鎖定函數」。

這是一篇我寫的舊文章。因為前陣子要對其他人說明共用與排他的資源鎖定觀念,所以又翻了出來,重新排版。

關於 semaphore 的中文譯名,我當初(1999年)是依《新洞悉UNIX系列叢書 - 系統程式設計篇》的用法。它將 semaphore 譯為「信號」,而 signal 譯為「訊號」。不過這兩個名詞實在很容易混淆,故我現在改用「號誌」稱呼 semaphore 。

實作說明

我將運用 semget(), semctl(), semop() 實作一個 lock_shm() 的資源共用互斥鎖定函數。它的行為模仿 flock(),定義了下列四個上鎖行為。

  • LOCK_SH - 加上共享鎖定(讀取鎖定)
  • LOCK_EX - 加上排他鎖定(寫入鎖定)
  • LOCK_NB - 不擱置( non-blocking ,配合前兩者使用 )
  • LOCK_UN - 解除鎖定

此函數提供了一個近似 flock() 行為的鎖定函數。lock_shm() 並不是直接對共享記憶體上鎖。事實上 Unix 並沒有鎖定共享記憶體的動作,所以我是用號誌的互斥動作對共享記憶體上鎖。換言之,lock_shm() 不只是個「共享記憶體鎖」,更是一個「共用互斥資源鎖」。「共用互斥」之英文為: Mutual exclusion - 可共用、也可排他。可以很多人一起使用;也可排除他人只給自已用。

BSD 家族使用 mmap() 將檔案內容映射到記憶體,藉此達成行程間共用記憶體的目的。由於此機制是透過檔案描述子 (fd) 實現,所以可以對檔案描述子上鎖。從這些 IPC 機制間的差異,可以略約窺探到 SVR 家族與 BSD 家族之設計理念的差異。
上鎖流程示意圖
上鎖流程示意圖

上圖假設有 1 、 2 、 3 三個行程打算上鎖的四個階段。 其中被紅色方框圍住的動作,表示被擱置了。 擱置動作是由作業系統處理的。

  1. 行程 1 想加上共享鎖(SHLOCK)。
    1. 行程 1 先鎖住 EXLOCK ,再加上 SHLOCK 。
    2. 行程 2 想鎖住 EXLOCK ,但被擱置。
    3. 行程 3 想鎖住 EXLOCK ,但被擱置。
  2. 行程 1 完成上鎖動作。
    1. 行程 1 加上 SHLOCK 後,解開 EXLOCK ,完成上鎖動作。
    2. 行程 2 鎖住 EXLOCK ,但被擱置,等 行程 1 之前加上的 SHLOCK 解開。
    3. 行程 3 依然被擱置在等待鎖住 EXLOCK 的地方。
  3. 行程 1 解鎖,行程 2 結束等待,完成上鎖動作。
    1. 行程 1 解開 SHLOCK 。
    2. 行程 2 已無 SHLOCK , 行程 2 結束擱置狀態,完成上鎖動作。
    3. 行程 3 依然被擱置在等待鎖住 EXLOCK 的地方。
  4. 行程 2 解鎖,行程 3 結束等待,完成上鎖。
    1. 行程 2 解開 EXLOCK 。
    2. 行程 3 鎖住 EXLOCK ,再加上 SHLOCK ,最後解開 EXLOCK ,完成上鎖動作。
實作方式

因為同時要應付 LOCK_EX 及 LOCK_SH 兩種情形,故需要兩個號誌為一組。 一個號誌控制排他鎖(EXLOCK - exclusion lock),另一個號誌控制共用鎖(SHLOCK - shared lock)。

因為排他鎖的特性是,一次只能用一個,故此號誌的初值為 1。 而共用鎖則是可以同時好幾個使用者共用,故此號誌的初值設為 0 。

還有一點要注意的是,排他鎖與共用鎖不會同時存在,當共用鎖被鎖上時,想鎖上排他鎖的動作就會被擱置,等到共用鎖全部都解除之後,才能鎖上排他鎖。同樣地,後來的共用鎖上鎖動作,又必須等待前一個排隊中的排他鎖上鎖動作解除後才可以鎖上。 亦即後來的排他鎖必須等前面的共用鎖全部解除;後來的共用鎖必須等前面的排他鎖解除。

如何鎖上共用鎖
  1. 鎖上排他鎖: EXLOCK.sem_op -1 。
    等待先前的排他鎖解除,並防止後來的上鎖動作插隊。當 EXLOCK 等於 0 時,試圖將其減一的動作,就會被系統擱置,直到 EXLOCK 的值大於等於 -1 的絕對值。
  2. 鎖上共用鎖: SHLOCK.sem_op +1 。
    因為共用鎖可以同時鎖上多個,所以上鎖動作是用加的。 SHLOCK 的值,會等於目前取得共用權利的程序的數目。
  3. 解除排他鎖: EXLOCK.sem_op +1 。
    讓後來的上鎖動作可以繼續動作。 但是後到的 LOCK_EX 將會等待 SHLOCK 的值變為 0。而在它等待的時間裡,後到的 LOCK_SH 將會等待它解除 EXLOCK
如何鎖上排他鎖
  1. 鎖上排他鎖: EXLOCK.sem_op -1 。
    如果 EXLOCK 等於 1 ,則減一的動作會使 EXLOCK 變為 0 (1 + -1 = 0)。如果 EXLOCK 等於 0 ,則試圖減一的動作將會被擱置。
  2. 等待共用鎖全部解除: SHLOCK.sem_op 等於 0。
    等待 SHLOCK 的值變為 0 ,亦即所有的共用鎖都已解除。 我第一步都先鎖上排他鎖,就是為了防止後來的上鎖動作插隊。
如何解除鎖定

這講起來比較麻煩,我就懶得細述了,看個人理解能力吧。

  1. if ( EXLOCK =1 and SHLOCK > 0 ) then SHLOCK -1 (解除 SHLOCK)
  2. if ( EXLOCK =0 and has process wait SHLOCK be zero) then SHLOCK -1 (解除 SHLOCK)
  3. if ( EXLOCK =0 but no process wait SHLOCK be zero) then EXLOCK +1 (解除 EXLOCK)

實作內容

程式是我從我的一個舊軟體 (Firebird BBS library) 中的 bcache.c 中擷取出來的。bcache.c 是一個共享記憶體使用模組,並利用號誌來實作其中的 lock_shm() 功能。舊文的內容已經不能編譯了,故我這次改版時順便改寫了程式碼,並在 Ubuntu 10.04 下編譯實測過。

/* Firebird BBS, The TIP Project. 資料快取模組: bcache.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/ipc.h>
//#include "bbsdefs.h"
//#include "config.h"
//#include "struct.h"
#include <sys/sem.h>

#if !defined(LOCK_SH)
    #define LOCK_SH		1	/* shared lock */
#endif
#if !defined(LOCK_EX)
    #define LOCK_EX		2	/* exclusive lock */
#endif
#if !defined(LOCK_NB)
    #define LOCK_NB		4	/* don't block when locking */
#endif
#if !defined(LOCK_UN)
    #define LOCK_UN		8	/* unlock */
#endif

/* write by rock
  求 IPC 鍵值。
  須傳入一檔案名稱,若該檔不存在, IPC_key() 會自動建立。

  RC: 成功(鍵值)、失敗(-1)
  See also: stat(), ftok()
*/
static int IPC_key(const char*ipcname) {
    struct stat st;
    int fd;
    if( stat(ipcname, &st) == 0 && S_ISREG( st.st_mode) )
        return st.st_ino;
    if((fd=creat(ipcname, 0644))<0)
        return -1;
    fstat(fd, &st);
    close(fd);
    return st.st_ino; // 以 i_node 的值做為 IPC key
    //不理會 SYSV 的 ftok() 函數。
    // ftok() 是 SVR 所提供的函數,在 POSIX 及 BSD 中沒有。
    //return ftok(ipcname,proj);
}

/*
  NAME: lock_shm() 鎖定共享記憶體。

    LOCK_SH 加上共享鎖定(讀取鎖定)
    LOCK_EX 加上互斥鎖定(寫入鎖定)
    LOCK_NB 不擱置( non-blocking ,配合前兩者使用 )
    LOCK_UN 解除鎖定

  此函數提供了一個近似 flock() 行為的 shm 鎖定函數。

  RC: 成功(0)、失敗(-1)
  See also: semget(), semctl(),  semop()
*/
int lock_shm(const char*shmname,int op) {
    enum { EXLOCK,  SHLOCK} lock;
    struct sembuf lockop={0, 0, SEM_UNDO} /* sem 操作指令*/;
    int semkey,  semid;
    ushort sem_val[] = {
        1 /*Init value of EXLOCK*/,
        0 /*Init value of SHLOCK*/ };

    if( (semkey=IPC_key(shmname)) < 0) //嘗試建立 sem
        return -1;

    if( (semid=semget(semkey,  2, IPC_CREAT|IPC_EXCL|0640)) >= 0) {
        if( semctl(semid, 0, SETALL, sem_val) < 0)	// 初始 sem 的值
            return -1;
    }
    else {
        if((semid=semget(semkey,1,0)) < 0) //sem 可能建立了,取得semid
            return -1;
    }

    /*{
    ushort sval[2];
    semctl(semid,0,GETALL,sval);
    printf("[%d] Value  of EXLOCK:%d, Value of SHLOCK:%d\n",
        semid, sval[EXLOCK], sval[SHLOCK]);
    }*/

    /*
    non-blocking 鎖定,只要多設一個旗標 IPC_NOWAIT 即達目的。
    連傳回值也一樣:當無法立即取得對 sem 的操作時, semop() 不會被擱置,
    會立即返回(傳回 -1 ),並設定 errno 為 EAGAIN 。
    */
    if( op & LOCK_NB )
        lockop.sem_flg |= IPC_NOWAIT;

    if( op & LOCK_EX ) {
        lockop.sem_num = EXLOCK;
        lockop.sem_op = -1;

        if( semop(semid,  &lockop, 1) < 0 )
            return -1;
        //printf("EXLOCK... wait SHLOCK be zero\n");
        lockop.sem_num  = SHLOCK;
        lockop.sem_op = 0;
        return semop(semid, &lockop, 1);
    }
    else if( op &  LOCK_SH ) {
        lockop.sem_num = EXLOCK;
        lockop.sem_op = -1;
        if( semop(semid, &lockop,  1) < 0 )
            return -1;
        //printf("EXLOCK... wait to get SHLOCK\n");
        lockop.sem_num  = SHLOCK;
        lockop.sem_op = 1;
        if( semop(semid, &lockop, 1) < 0 )
            return -1;
        //printf("SHLOCK  ok\n");
        lockop.sem_num = EXLOCK;
        lockop.sem_op = 1;
        //printf("Release EXLOCK\n");
        return semop(semid, &lockop, 1);
    }

    //以下動作皆為 LOCK_UN
    if( semctl(semid, 0, GETALL,  sem_val) < 0 )
        return -1;
    //printf("LOCK_UN, EXLOCK:%d, SHLOCK:%d\n",sem_val[0],  sem_val[1]);

    /*
    如果沒有程序在等待取得鎖定的話,那解除鎖定的動作,將會是"刪除 sem ",而不
    是"改變 sem 的值"。
    如果有程序等待取得 LOCK_EX 的話,那就是在等待 SHLOCK 的值變為 0 。
    如果有程序等待取得 LOCK_SH 的話,那就是在等待 EXLOCK 的值增加。
    */
    if( semctl(semid, SHLOCK, GETZCNT, 0) == 0 &&
        semctl(semid, EXLOCK, GETNCNT, 0)  == 0 )
    {
        //printf("may remove...");
        if( sem_val[EXLOCK] <= 1 && sem_val[SHLOCK]  == 0) {
            /*printf("remove sem %d,Z:%d,N:%d\n", semid,
              semctl(semid,SHLOCK,GETZCNT,0),
              semctl(semid,EXLOCK,GETNCNT,0));
            */
           return semctl(semid, 0, IPC_RMID, 0);
        }
    }

    if( sem_val[EXLOCK] >0 && sem_val[SHLOCK]  > 0 ) {
        lockop.sem_num = SHLOCK;
        lockop.sem_op = -1;
        //printf("EXLOCK >0 and SHLOCK  >0: UNLOCK SHLOCK\n");
        return semop(semid, &lockop, 1);
    }
    else if( sem_val[EXLOCK]  <= 0 ) {
        if( semctl(semid, SHLOCK, GETZCNT, NULL) >0 ) {
            lockop.sem_num = SHLOCK;
            lockop.sem_op = -1;
            //printf("UNLOCK SHLOCK\n");
        }
        else {
            lockop.sem_num = EXLOCK;
            lockop.sem_op = 1;
            //printf("UNLOCK EXLOCK\n");
        }
        return semop(semid, &lockop,  1);
    }
    return 0;
}

因為實作上的技術問題,一個設計不良的解鎖動作(LOCK_UN),可能會破壞掉目前正在等待取得鎖定的程序隊伍。 例如: 行程a 尚未鎖上任何排他鎖或共用鎖,卻要求解鎖。此時會誤將行程b 加上的鎖給解除了。如果此時有另一行程c 正等待行程b 解除鎖定,就會誤以為行為b 已經解除鎖定(實際上是被行程a 越過界給解除了),開始存取資源,造成行程b 和行程c 互搶資源。

這個問題在技術上並不難克服。只是我懶得做了。

以下是測試程式:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#if !defined(LOCK_SH)
    #define LOCK_SH		1	/* shared lock */
#endif
#if !defined(LOCK_EX)
    #define LOCK_EX		2	/* exclusive lock */
#endif
#if !defined(LOCK_NB)
    #define LOCK_NB		4	/* don't block when locking */
#endif
#if !defined(LOCK_UN)
    #define LOCK_UN		8	/* unlock */
#endif
int lock_shm(const char*shmname,int op);

void dump(char *prog) {
    char buf[80];
    sprintf(buf,"dump[%s].",prog);
    write(1, buf, strlen(buf));
    sleep(1);
}

void main(int argc, char* argv[]) {
    char  shmname[] = "TEST_BCACHE";
    int i;
    char *prog;
    prog= argv[1];
    printf("[%s]LOCK_EX:%d\n",  prog, lock_shm(shmname,LOCK_EX) );
    for(i = 0; i < 5; i++)
        dump(prog);
    printf("\n[%s]LOCK_UN:%d\n",  prog, lock_shm(shmname,LOCK_UN) );
    printf("[%s]LOCK_SH:%d\n", prog, lock_shm(shmname,LOCK_SH)  );
    for(i =0; i <5; i++)
        dump(prog);
    printf("\n[%s]LOCK_UN:%d\n", prog, lock_shm(shmname,LOCK_UN)  );
    printf("[%s]LOCK_EX:%d\n", prog, lock_shm(shmname,LOCK_EX) );
    for(i =0; i <5;  i++)
        dump(prog);
    printf("\n[%s]LOCK_UN:%d\n", prog, lock_shm(shmname,LOCK_UN)  );
}

執行範例:

$ gcc -o testlock testlock.c lock_shm.c
$ ./testlock a & ./testlock b &
[1] 3285
[2] 3286
$ [a]LOCK_EX:0
dump[a].dump[a].dump[a].dump[a].dump[a].
[a]LOCK_UN:0
[b]LOCK_EX:0
dump[b].dump[b].dump[b].dump[b].dump[b].
[b]LOCK_UN:0
[a]LOCK_SH:0
dump[a].[b]LOCK_SH:0
dump[b].dump[a].dump[b].dump[a].dump[b].dump[a].dump[b].dump[a].dump[b].
[a]LOCK_UN:0

[b]LOCK_UN:0
[a]LOCK_EX:0
dump[a].dump[a].dump[a].dump[a].dump[a].
[a]LOCK_UN:0
[b]LOCK_EX:0
dump[b].dump[b].dump[b].dump[b].dump[b].
[b]LOCK_UN:0

[1]-  Done                    ./testlock a
[2]+  Done                    ./testlock b

先跑幾隻丟到背景,等執行一部份後,再跑幾隻到背景,就可以觀察數隻程式利用號誌協調執行順序的情形了。以前面的執行範例為例,行程a 先取得了排他鎖,所以一開始只會看到 dump[a] 。接著換行程b 取得排他鎖,所以接著只會看到 dump[b] 。再接著,行程a 與 b 取得共用鎖,所以 dump[a], dump[b] 將交錯出現。

參考文件

  • 新洞悉UNIX系列叢書 - 系統程式設計篇, 劉祖亮著,和碩科技出版
    第 414 到 419 頁。
  • semget()
  • semctl()
  • semop()

這三個函式,我在 Solaris, Linux 和 FreeBSD 都使用過。都被納入 POSIX 規格之中,具有可攜性。

樂多舊網址: http://blog.roodo.com/rocksaying/archives/14848695.html