5 3 並行程式的潛在問題(一) - ianchen0119/AwesomeCS GitHub Wiki

建立 Thread 的成本遠低於 Process 的成本,執行 Context switch 時的效能也有顯著的差異。既然並行程式這麼好,為什麼資工系不在一開始就將相關技巧教給學生、甚至是到畢業都沒學到呢? 很顯然的,當執行緒一多時,如果開發者的頭腦不夠清楚便會產出各式各樣的問題以及臭蟲,本篇文章整理了一些並行程式的眉眉角角,歡迎大家一同學習並給予指教。

Race condition

Race condition 用來描述一個系統或者 Process 的輸出依賴於不受控制的事件出現順序、出現時機。 像是: 有多個 Process 嘗試存取同一個記憶體位置,如果沒有處理好,就有可能發生無法預期的執行結果,這種情況容易發生在錯誤的後端程式、資料庫、檔案系統設計或是其他採用多執行緒設計的程式。

範例

考慮以下程式碼:

#include <pthread.h>
void* myfunc(void* ptr) {
    int i = *((int *) ptr);
    printf("%d ", i);
    return NULL;
}

int main() {
    int i;
    pthread_t tid;
    for(i =0; i < 10; i++) {
        pthread_create(&tid, NULL, myfunc, &i);
    }
    pthread_exit(NULL);
}

上面的範例使用 POSIX Thread 建立 10 個執行緒,其程式目的是希望能夠接連印出 0 - 9 的數字。 實際上,其執行結果與預期行為大有不同!程式執行後,印出的結果會在不同數字間跳動,其原因在查看 pthread_create() 的定義後便能輕鬆找出:

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                          void *(*start_routine) (void *), void *arg);

我們可以看到,傳遞到子執行緒的參數必須是 void 指標,也就代表,我們並不是傳遞實體數字給子執行緒,是只交給他參數的記憶體位址而已。 也因為這樣,當作業系統在各執行緒間反覆切換時各個子執行緒就會同時操作同一個記憶體的資料,無法確保執行緒的執行順序,這個情況就是 Race condition

解決辦法

SystemProgramming 的教材中也提供了問題的解法,其解題思維就是避免多個執行緒同時存取相同的記憶體。 首先,先定義一個結構:

struct T {
  pthread_t id;
  int start;
  char result[100];
};

定義好後,進行記憶體分配:

struct T *info = calloc(10 , sizeof(struct T));

做到這一步,就已經為即將建立的 10 個子執行緒保留好位置了,接著我們可以將相關參數傳給 pthread_create() 做執行緒的建立工作:

pthread_create(&info[i].id, NULL, func, &info[i]);

Mutex lock && Condition variables

在介紹 Mutex lock 與 Condition variable 之前,我們要先了解他的應用場景。

Critical sections

Critical sections 代表某程式區段只能在同一時間被一個執行緒處理,如果有多個執行緒同時執行了這段程式碼,可能會有超出預期的錯誤行為出現。

取自 CS:APP Concurrent Programming, Page 40-56 。

範例

考慮以下程式碼:

int sum = 0;

void *countgold(void *param)
{
    int i;
    for (i = 0; i < 10000000; i++)
    {
        sum += 1;
    }
    return NULL;
}

該程式碼若是在單一執行緒上工作,並不會出現問題。 不過!如果使用 POSIX Thread 建立多個執行緒處理 countgold() 時,由於多個執行緒同時存取同一個記憶體的資料 (在這個範例中為 sum ),便會造成前面提到的 Race condition。

解決方法

要避免 Race condition,我們只需要預防 Critical sections 在多個執行緒同時執行,我們先標示出程式中的 Critical sections:

// ...
{
    for (i = 0; i < 10000000; i++)
    {
        // critical section!
        sum += 1;
        // end
    }
}
// ...    

問題的解決辦法就是: 在存取 sum 變數行為的前面加上一道鎖,任何執行緒訪問他之前都需要將鎖上鎖,等到操作完再由上鎖的執行緒做解鎖。 知道了鎖的需求以及應用面後,我們就可以來認識 Mutex lock 啦!

Mutex lock API

使用 Mutex lock 前須詳閱公開說明書,互斥鎖有以下特性:

  • 誰負責上鎖就由誰負責解鎖
  • 在銷毀鎖之前,必須確保沒有執行緒被這個鎖 block
  • Mutex lock 有專屬的 type: pthread_mutex_t
#include <pthread.h>
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; // 初始化-法1
pthread_mutex_init(&m, NULL); // 初始化-法2
pthread_mutex_lock(&m);   // 上鎖
pthread_mutex_unlock(&m); // 解鎖
pthread_mutex_destroy(&m) // 銷毀鎖

使用 PTHREAD_MUTEX_INITIALIZER 初始化時,必須確保鎖是全域變數!

掌握 Mutex lock 後,我們將上面的範例稍做修改:

int sum = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
void *countgold(void *param)
{
    int i;
    pthread_mutex_lock(&m);
    for (i = 0; i < 10000000; i++)
    {
        sum += 1;
    }
    pthread_mutex_unlock(&m);
    return NULL;
}

上面的範例會確保執行緒執行完 10000000 次的迴圈才做解鎖,當然,我們也可以換個做法:

void *countgold(void *param)
{
    int i;
    for (i = 0; i < 10000000; i++)
    {
        pthread_mutex_lock(&m);
        sum += 1;
        pthread_mutex_unlock(&m);
    }
    return NULL;
}

這樣的做法同樣能預防競爭危害,不過執行時間會更長。

注意事項!

使用 mutex lock 時,多個執行緒仍然會因為作業系統排程不斷的做 Context switch ,概念有點像是:

  1. Main-Thread: 等待所有 user-level 的 thread 都 join。
  2. Thread-1: 我現在將 sum 給上鎖了,正在執行操作....。
  3. Thread-2: 我沒辦法對 sum 操作,繼續等吧。
  4. Main-Thread: 等待所有 user-level 的 thread 都 join。
  5. Thread-1: 我現在將 sum 給上鎖了,正在執行操作....。
  6. Thread-2: 我沒辦法對 sum 操作,繼續等吧。
  7. Main-Thread: 等待所有 user-level 的 thread 都 join。
  8. Thread-1: 我對 sum 的操作結束了,解鎖並 Join 。
  9. Thread-2: 現在沒上鎖,我要上鎖了。
  10. Main-Thread: 等待所有 user-level 的 thread 都 join。
  11. Thread-2: 我現在將 sum 給上鎖了,正在執行操作....。
  12. Thread-2: 我對 sum 的操作結束了,解鎖並 Join。
  13. Main-Thread: 所有 user-level 的 thread 都 join 了,程式結束。

不只如此,程式設計者需要確保多個執行緒嘗試上的是同一個鎖!

#include <stdio.h>
#include <pthread.h>
int sum = 0;
pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
void *countgold1(void *param)
{
    int i;
    pthread_mutex_lock(&m1);
    for (i = 0; i < 10000000; i++)
    {
        sum += 1;
    }
    pthread_mutex_unlock(&m1);
    return NULL;
}
void *countgold2(void *param)
{
    int i;
    pthread_mutex_lock(&m2);
    for (i = 0; i < 10000000; i++)
    {
        sum += 1;
    }
    pthread_mutex_unlock(&m2);
    return NULL;
}
int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, countgold1, NULL);
    pthread_create(&tid2, NULL, countgold2, NULL);

    //Wait for both threads to finish:
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    printf("ARRRRG sum is %d\n", sum);
    return 0;
}

在上面範例中,執行緒 1 以及 2 都各自上了鎖,兩者同時開心的對 sum 變數執行操作,race condition 的狀況一樣發生在該程式身上。

Condition variables

Condition variables 通常會跟 Mutex 搭配使用。

int pthread_cond_init(pthread_cond_t *cond,const pthread_condattr_t  *attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); // all threads waiting on a condition need to be woken up

解釋:

  1. 檢查 mutex 是否上鎖?若否則上鎖並向下執行
  2. 使用 Condition variables 時,我們會設立一些判斷條件,視情況繼續執行或是呼叫 pthread_cond_wait() 解鎖並進入 waiting queue。
  3. 若經過條件判斷後,Task-1 繼續執行,便會執行到 Task-1 結束前才解鎖。
  4. 如果上鎖,則進入 waiting queue,否則上鎖並執行程式碼,並且當 Task-2 更改判斷條件後須使用 pthread_cond_signal() 更改 Condition variable 的狀態。

使用 Condition variables 改寫生產者/消費者問題

修改程式碼之前,需要知道 pthread_cond_wait() 被呼叫後會有以下行為:

  1. 解鎖,供其他執行緒上鎖
  2. 進入休眠,直到收到 pthread_cond_signal()
  3. 被喚醒後,要重新上鎖!

了解 API 的行為後,考慮以下程式碼:

int buffer[10];
int *in;
int count = 0;
void *producer(void *arg){
	for(;;){
		while(count == MAX_SIZE);
		pthread_mutex_lock(&lock);
		buffer[count] = 1;
		count++;
		printf("%d", count);
		pthread_mutex_unlock(&lock);
	}
}
void *consumer(void *arg){
	for(;;){
		while(count == 0);
		pthread_mutex_lock(&lock);
		count--;
		printf("%d", count);
		pthread_mutex_unlock(&lock);
	}
}

使用 Condition variables 改寫:

pthread_cond_t notFull = PTHREAD_COND_INITIALIZER;
pthread_cond_t notEmpty = PTHREAD_COND_INITIALIZER;
int buffer[10];
int *in;
int count = 0;
void *producer(void *arg){
	for(;;){
		while(count == MAX_SIZE)
				pthread_cond_wait(notFull, lock);
		pthread_mutex_lock(&lock);
		buffer[count] = 1;
		count++;
		pthread_cond_signal(notEmpty);
		printf("%d", count);
		pthread_mutex_unlock(&lock);
	}
}
void *consumer(void *arg){
	for(;;){
		while(count == 0);
				pthread_cond_wait(notEmpty, lock);
		pthread_mutex_lock(&lock);
		count--;
		pthread_cond_signal(notFull);
		printf("%d", count);
		pthread_mutex_unlock(&lock);
	}
}

Reference

⚠️ **GitHub.com Fallback** ⚠️