class Deliverer
{
public:
// 用來保護下面那個訊息貯列的互斥子
pthread_mutex_t msg_mutex;
// 將產生的、或處理過的訊息放在這裡,讓其他人稍候可以取走。
list<Message> msg_list;
// 用來等待與通知訊息的信號量
sem_t msg_sem;
};
執行緒同步 13:信號量
前篇介紹了使用信號來通知睡眠甦醒的操作方式, 但以實際應用來說,直接這樣操作是過於低階原始而缺乏足夠的實用價值。 那麼本篇就來介紹一個能夠適合前篇那樣的應用場景的實用同步工具:信號量。
本篇所介紹的信號量與前篇介紹的信號在中文名稱上比較沒有識別度而容易混淆, 因此這裡再次提醒,本篇介紹的「信號量」(Semaphore)與前篇所介紹的「信號」(Signal)並不相同, 而為免歧異,在本文中可能會使用「中斷信號」來稱呼「信號」(Signal)。
信號量是一種執行緒同步工具,其核心就是一個無號整數,
而我們使用者可以對它做加一、減一、或取值的操作。
呼叫 post() 函式可以將信號量的數值加一,呼叫 wait() 可以將信號量的數值減一,
而 getvalue() 可以讀取信號量當前的數值。
當然不同程式庫或系統 API 的信號量可能還提供更多的功能和擴展,
例如本篇下面就將會介紹的 timedwait(),然而上面的三個函式就是信號量最基本的三個操作功能。
但是前面說了,信號量提供的數值是個無號整數,
那麼如果數值已經被減到零之後還要再呼叫減一的話,會發生什麼事呢?
答案是這個減值的呼叫會被卡住,直到看誰呼叫了 post() 加一之後,
先前呼叫減一的這個操作才能完成並返回。
這就是信號量的運作邏輯與使用方式了,很簡單吧?
使用信號量改良測試範例
如果要把信號量套用在前篇使用的那個測試範例的話, 那麼信號量最適合用來作為訊息貯列內訊息數量的計數了。 當推送訊息給貯列的時候就給信號量的值加一,而取出貯列訊息的時候就給信號量的值減一; 當貯列沒有訊息的時候,減一的操作就會卡在那裡等, 直到有誰推入訊息給貯列的時候,我們的減一操作就能立刻被喚醒並完成。
但是還有一個小問題,
在實際的應用中如果等不到有人推送訊息的話,這減一的呼叫是不是就得等到天荒地老了?
以前我也解釋過,這樣的現象其實是非常容易遇到的,
比如網路可能因故根本就收不到資料,或比如使用者想要終止等待並退出程式等等。
對於這個需求,我們可以改呼叫信號量 wait() 函式的另一個功能擴充函式 timedwait(),
它的作用基本和 wait() 是一樣的,只不過可以讓我們指定最多等待多久時間。
如果時間到了的時候仍然不能完成值減一的操作的話,那麼 timedwait() 函式依然會返回,
並且我們可以從其返回值知道它是成功返回還是失敗返回。
其實這整個使用操作也可以讓我們用另一個邏輯來理解。
當信號量的值為零的時候,其實 timedwait() 就等同是一個指定最長休眠時間的休眠函式,
而當有人呼叫了 post() 的話就好似給它發送了一個信號來提前中止休眠。
我們同樣把上一篇的測試範例拿來修改,使用信號量取代原先的睡眠加中斷信號的使用組合。
首先修改 Deliverer 類別,這次我們不再需要知道到底是哪個執行緒在等待了,
只需要在這裡放一個信號量物件,讓它自己去處理這些事情就好。
此外也注意,現在圍繞在這個主要的貯列(msg_list)旁邊有兩個同步工具物件,
一個是互斥子、一個是信號量,
其中互斥子的作用是保護貯列的操作以避免資料競爭的情況,
而信號量的作用則是給等待貯列訊息的其它執行緒一個高效的休眠、與即時喚醒通知的作用。
以下就是修改後的 Deliverer 部份:
再來是修改等待接收訊息的部份。
在嘗試索取訊息前先將信號量的值減一;
若信號量的值為零無發再減,則表示當前訊息貯列為空,
那就等待最多一秒鐘看能不能等來新的訊息被存入貯列?
另外注意 timedwait() 的休眠時間採用的是絕對時間,而不是以往我們可能比較熟悉的相對時間。
像 sleep() 這類函式使用的是相對時間,意思就如同叫它:「請給我休息 X 秒鐘」;
而 timedwait() 使用的則是絕對時間,意思就如同叫它:「請給我休息直到 X 月 X 日的 X 分 X 秒」這樣!
於是在呼叫的時候,我們要先取得當下的時間,在後再給那個時間增加我們想要休眠的秒數之後傳入。
使用絕對時間有好處也有壞處,但在現階段的情況下我們也不好做評價,
只是人家函式介面就這樣定義,我們也只能照做。
以下即為修改後的程式碼:
while(!data->go_term)
{
// 將信號量的值減一,
// 若無值可減的話則嘗試等待最多一秒鐘。
struct timespec ts = { time(nullptr) + 1 };
bool avail = 0 == sem_timedwait(&data->src->msg_sem, &ts);
// 嘗試向設定的訊息來源索取新訊息
Message msg;
if(avail)
avail = PopMessage(data->src, msg);
if(avail)
{
// 取得新的訊息以後,將訊息做點該做的處理,
// 然後再將訊息推入自己的貯列,
// 讓下一個接手的人可以取走。
msg.count++;
PushMessage(data, msg);
}
}
|
Tip
|
前面說:「請休息直到 X 月 X 日的 X 分 X 秒」其實是一種口語化的描述,
實際上絕大多數作業系統所使用的絕對時間是一個被稱為 timestamp 的整數,
意義為從某個起始時間日期開始所經過的秒數。
以 UNIX 和 UNIX 相容系統來說,這個起始時間是 1970-01-01 00:00:00。
例如 2026-04-12 14:30:00 距離這個起始時間經過了 1775975400 秒鐘,
因此 2026-04-12 14:30:00 的 timestamp 就是 1775975400。
所以在上面的程式碼就看到我先呼叫 time() 取得當下的 timestamp,
然後再加上我想要睡眠的秒數,就成了我希望 sem_timedwait() 醒過來的 timestamp。
|
最後則是修改推送訊息的部份。 在將一筆訊息存入貯列的時候,順便給信號量的值加一。 整個操作和前篇在這裡發送信號的操作幾乎是一比一完全代換的, 只不過這次不需要在意是誰在等待?有沒有人在等待?或甚至是不是很多人在等待?要叫醒哪一個?等等的問題了。 以下就是修改後的程式碼:
static
void PushMessage(Deliverer *data, const Message &msg)
{
// 將訊息存入訊息貯列
pthread_mutex_lock(&data->msg_mutex);
data->msg_list.push_back(msg);
// 給信號量值加一,也通知任何可能正在等待新訊息的執行緒可以醒過來了!
sem_post(&data->msg_sem);
pthread_mutex_unlock(&data->msg_mutex);
}
以上只節錄最關鍵的修改部份,而完整的程式碼可以 從這裡下載。 程式在我電腦上執行出來的結果如下:
...... 前面省略
Factory: Generate "message-4"
Reporter: Got message, txt="message-4", counter=1001, delay=4
Factory: Generate "message-5"
Reporter: Got message, txt="message-5", counter=1001, delay=4
Factory: Generate "message-6"
Reporter: Got message, txt="message-6", counter=1001, delay=8
Factory: Generate "message-7"
Reporter: Got message, txt="message-7", counter=1001, delay=8
Factory: Generate "message-8"
Reporter: Got message, txt="message-8", counter=1001, delay=8
Factory: Generate "message-9"
Reporter: Got message, txt="message-9", counter=1001, delay=4
...... 後面省略
從測試結果來看,表現一如預期的優良,甚至於比前篇使用中斷信號的結果還要再好一些, 我想這應該是因為信號量的內部工作都在系統核心層內進行,而少了核心態切換次數的緣故吧!
問題檢討
本篇介紹的信號量相比前篇直接操作中斷信號而言已經讓使用變得簡單許多,甚至於效果還略為更佳。 但是信號量的使用一般還有一些比較容易出問題的棘手地方, 也就是為什麼信號量的使用一般比較少出現在開發應用中的原因。
但是上面的測試範例程式碼因為對於相關關鍵操作的分佈比較零散,不容易直觀的看出信號量在使用上暴露的問題。
因此在繼續深入探討之前,我要先對上面的範例做點修改,
主要的目的是將對信號量、貯列、和互斥子等這些關鍵物件的操作給集中起來,試圖使問題更加一目了然。
為此,我要將原本用來取出貯列訊息的 PopMessage() 和消費者迴圈中減值與等待的操作合併起來,
成為 WaitPopMessage() 函式:
static
bool WaitPopMessage(
Deliverer *data, Message &msg, unsigned timeout_ms)
{
/*
* 嘗試從訊息貯列中取出一筆訊息
*/
// 將貯列的訊息數量計數減一,
// 若目前計數為零,則嘗試等待一會兒。
struct timespec ts;
ts.tv_sec = time(nullptr) + timeout_ms / 1000;
ts.tv_nsec = timeout_ms % 1000 * 1000 * 1000;
if( 0 != sem_timedwait(&data->msg_sem, &ts) )
{
// 如果等待超時而仍不能成功操作減值,
// 則直接返回取無訊息的結果。
return false;
}
// 若減值成功則下一步從貯列取出一個訊息。
// 不過因為從信號量減值操作到上鎖互斥子之間還有意外的可能,
// 因此這裡取出訊息的操作也不一定就肯定成功,
// 這點也必須要考量在內。
pthread_mutex_lock(&data->msg_mutex);
bool have_msg = !data->msg_list.empty();
if(have_msg)
{
msg = data->msg_list.front();
data->msg_list.pop_front();
}
pthread_mutex_unlock(&data->msg_mutex);
return have_msg;
}
因為將許多相關的操作都搬移至 WaitPopMessage() 了,
於是原來等待索取和處理訊息的迴圈則變得相當精簡:
while(!data->go_term)
{
Message msg;
bool avail = WaitPopMessage(data->src, msg, 1000);
if(avail)
{
/* ...... 此處省略 ...... */
}
}
本篇的測試範例條件其實相當單純,就是單純取得並簡單處理訊息而已,再沒有其它工作互相混雜, 且所有的工廠與消費者都是一對一的供給索取關係,基本不會有其它干擾因素, 因此才沒有產生下面要描述的那些問題。 然而對於實際開發應用的需求來說,條件可能會複雜得多, 比如可能會有來自於多個執行緒的索取操作一次同時發生,甚至於有時候連推送資料的執行緒可能也不只一條。 因此雖然理論上如果嚴格按照邏輯來操作使用信號量協同工作的話也不至於就必定會發生問題, 只不過在實際複雜的現場環境下相對不容易維繫嚴謹性,較容易出漏洞罷了!
回顧上面的 WaitPopMessage() 函式,雖然對信號量的操作、和對貯列的操作彼此都是原子的,
然而在信號量減值操作完成之後,一直到互斥子上鎖之間存在一個空檔,給其它情況的亂入帶來可乘之機。
想想有沒有可能,在信號量減值完成後到互斥子上鎖的中間被其它執行緒給截胡,搶先完成上鎖並取走貯列訊息?
這就會導致雖然前面信號量減值成功,但後面取資訊失敗。
失敗就算了,就返回目前取不到東西的結果不就好了?非也!
雖然沒能成功從貯列取出訊息,但是對信號量的減值已經完成了啊。
這就會造成用來記錄貯列訊息數量的信號量與貯列實際的訊息數量發生不一致,
導致後續有機會發生明明貯列還有訊息但信號量值已為零,造成索取資料的執行緒持續等待的結果!
(如果它不是採用如本篇使用的 timedwait() 而是使用 wait(),那就恭喜卡死嘍!)
或者更誇張的情況可能是,如果有人把 timedwait() 直接就當成休眠函式,
也不檢查操作是否正確完成,一律在休眠結束後嘗試從貯列索取信息,則數字不匹配的情況就產生了;
只不過這時候是信號量的數值會大於貯列訊息的實際數量,這種情形倒是不會讓程式卡死,
就是會增加一些後續的虛操作而已!
總結而言,信號量在實際應用中容易產生的問題有兩個: 其一是信號量的操作和互斥子上鎖操作之間並不是原子的; 其二是為了保證信號量的值和實際的資源數量之間互相一致,需要更加小心的維繫程式碼的邏輯嚴謹。 從這兩點來看,就不難發現另一個執行緒同步工具「條件變量」根本就是踩在信號量的這兩個痛點上面進行改善, 而這可能也就是為什麼在大部份的實際需求裡面被使用的都是條件變量而少見信號量的原因; 不過那是下一篇的內容了……
總結
本篇介紹了信號量這個執行緒同步工具。 相比於前篇直接使用中斷信號,信號量的使用更加簡便親民且效能還更好,已經具有實用價值。 只不過信號量在複雜的實際應用中仍容易產生一些較為困擾的狀況,需要被更加謹慎的操作、檢查、和設計。 下篇就讓我們延續這個話題,看看那赫赫有名的條件變量是使用什麼樣的思路來解決這些問題的吧!
下一篇:「執行緒同步 14:條件變量」