#include <assert.h>
#include <stdio.h>
#include <string>
#include <list>
#include <pthread.h>
#include <semaphore.h>

#ifdef _WIN32
#   include <windows.h>
#else
#   include <unistd.h>
#endif

using namespace std;

static
void SleepMs(unsigned ms)
{
#ifdef _WIN32
    Sleep(ms);
#else
    usleep( 1000 * ms );
#endif
}

static
unsigned GetMsTime(void)
{
#ifdef _WIN32
    return GetTickCount();
#else
    struct timespec ts;
    int err = clock_gettime(CLOCK_MONOTONIC_COARSE, &ts);
    assert( err == 0 );

    return ts.tv_sec * 1000 + ts.tv_nsec / (1000*1000);
#endif
}

struct Message
{
    unsigned ctime; // 訊息產生的時間
    unsigned count; // 記錄這個訊息被多少人處理過的計數
    string str;     // 訊息內容
};

class Deliverer
{
public:
    // 用來保護下面那個訊息貯列的互斥子
    pthread_mutex_t msg_mutex;

    // 將產生的、或處理過的訊息放在這裡，讓其他人稍候可以取走。
    list<Message> msg_list;

    // 用來等待與通知訊息的信號量
    sem_t msg_sem;
};

class Factory : public Deliverer
{
public:
    unsigned period;    // 產生消息的時間間隔
    unsigned total_num; // 要產生的消息數量

    pthread_t thrd;
};

class Receiver : public Deliverer
{
public:
    Deliverer *src;     // 要向這個來源索取新訊息

    pthread_t thrd;
    bool go_term;
};

static
void PushMessage(Deliverer *data, const Message &msg)
{
    // 將訊息存入訊息貯列

    pthread_mutex_lock(&data->msg_mutex);

    data->msg_list.push_back(msg);

    // 給信號量值加一，也通知任何可能正在等待新訊息的執行緒可以醒過來了！
    sem_post(&data->msg_sem);

    pthread_mutex_unlock(&data->msg_mutex);
}

static
bool PopMessage(Deliverer *data, Message &msg)
{
    // 嘗試從訊息貯列中取出一筆訊息

    pthread_mutex_lock(&data->msg_mutex);

    bool have_msg = !data->msg_list.empty();
    if(have_msg)
    {
        msg = data->msg_list.front();
        data->msg_list.pop_front();
    }

    pthread_mutex_unlock(&data->msg_mutex);

    return have_msg;
}

static
void* FactoryThread(Factory *data)
{
    for(unsigned i = 0; i < data->total_num; ++i)
    {
        // 每隔一段時間就產生一個新訊息，
        // 並將訊息推入自己的貯列，讓其他人可以取走。

        Message msg;
        msg.ctime = GetMsTime();
        msg.count = 1;
        msg.str = "message-" + to_string(i);
        printf("Factory: Generate \"%s\"\n", msg.str.c_str());

        PushMessage(data, msg);

        // 階段工作結束，稍微休息一小段時間再繼續下一輪。
        if(data->period)
            SleepMs(data->period);
    }

    return nullptr;
}

static
void* TransferThread(Receiver *data)
{
    while(!data->go_term)
    {
        // 將信號量的值減一，
        // 若無值可減的話則嘗試等待最多一秒鐘。
        struct timespec ts = { time(nullptr) + 1 };
        bool avail = 0 == sem_timedwait(&data->src->msg_sem, &ts);

        // 嘗試向設定的訊息來源索取新訊息
        Message msg;
        if(avail)
            avail = PopMessage(data->src, msg);

        if(avail)
        {
            // 取得新的訊息以後，將訊息做點該做的處理，
            // 然後再將訊息推入自己的貯列，
            // 讓下一個接手的人可以取走。
            msg.count++;
            PushMessage(data, msg);
        }
    }

    return nullptr;
}

static
void* ReporterThread(Receiver *data)
{
    while(!data->go_term)
    {
        // 將信號量的值減一，
        // 若無值可減的話則嘗試等待最多一秒鐘。
        struct timespec ts = { time(nullptr) + 1 };
        bool avail = 0 == sem_timedwait(&data->src->msg_sem, &ts);

        // 嘗試向設定的訊息來源索取新訊息
        Message msg;
        if(avail)
            avail = PopMessage(data->src, msg);

        if(avail)
        {
            msg.count++;

            // 這是最後一個接手處理訊息的執行緒，後面沒有其它接收者了。
            // 於是收到訊息便要統計相關資訊並列印出來。
            unsigned delay = GetMsTime() - msg.ctime;
            printf("Reporter: Got message, txt=\"%s\", counter=%u, delay=%u\n",
                msg.str.c_str(), msg.count, delay);
        }
    }

    return nullptr;
}

int main(int argc, char *argv[])
{
    // 建立並啟動定時產生訊息的執行緒

    Factory factory;
    factory.period = 500;
    factory.total_num = 20;
    pthread_mutex_init(&factory.msg_mutex, nullptr);
    sem_init(&factory.msg_sem, 0, 0);

    pthread_create(&factory.thrd, nullptr, (void*(*)(void*)) FactoryThread, &factory);

    // 建立並啟動索取和轉手訊息的執行緒們

    static const int rcvr_num = 1000;
    Receiver rcvr_list[rcvr_num];

    for(int i = 0; i < rcvr_num; ++i)
    {
        Receiver *rcvr = &rcvr_list[i];

        // 第一個 receiver 的訊息來源是 factory，
        // 其它 receiver 的來源則是前一個 receiver。
        Deliverer *msg_src =
            i == 0 ?
            (Deliverer*) &factory :
            (Deliverer*) &rcvr_list[i-1];

        // 最後一個 receiver 要負責顯示訊息最後的結果，
        // 而其它 receiver 負責轉交訊息給下一個 receiver。
        void*(*thrd_func)(void*) =
            i == rcvr_num - 1 ?
            (void*(*)(void*)) ReporterThread :
            (void*(*)(void*)) TransferThread;

        pthread_mutex_init(&rcvr->msg_mutex, nullptr);
        sem_init(&rcvr->msg_sem, 0, 0);
        rcvr->src = msg_src;
        rcvr->go_term = false;

        pthread_create(&rcvr->thrd, nullptr, thrd_func, rcvr);
    }

    // 等待工廠執行緒完成工作自然結束

    pthread_join(factory.thrd, nullptr);

    // 停止其它所有執行緒

    for(int i = rcvr_num - 1; i >= 0; --i)
        rcvr_list[i].go_term = true;

    for(int i = rcvr_num - 1; i >= 0; --i)
        pthread_join(rcvr_list[i].thrd, nullptr);

    // 銷毀已建立的資源

    sem_destroy(&factory.msg_sem);
    pthread_mutex_destroy(&factory.msg_mutex);

    for(int i = 0; i < rcvr_num; ++i)
    {
        sem_destroy(&rcvr_list[i].msg_sem);
        pthread_mutex_destroy(&rcvr_list[i].msg_mutex);
    }

    return 0;
}
