Multiple Producers and Consumers
A. First Edition
This is a small simulation of my project, which has three major tasks: graphic display, view collection and MPI
communication. To build a multi-threaded producer-consumer synchronization mechanism, I chose the conditional wait,
which is superior to busy waiting. After a night and a morning of coding, debugging and testing, I finished this small
simulation and was just about to post it for future reference when developing my project. Then suddenly I fell into the
old "new" trap: if these tasks are essentially sequential, is there any benefit in making them multi-threaded?
Hence the subtitle suits the dilemma perfectly: the pros and cons of multi-threading!
B. The problem
The graphic thread produces events for the view thread. The view thread accepts events and produces requests.
The MPI thread consumes requests and produces pixel data to feed the graphic thread. Events, requests and pixels each
have their own buffers. The buffers are all circular arrays managed by buffer indices. One big problem in the
producer-consumer pattern is that the producer should not produce more than the consumer can consume. Here, however,
the producers and consumers form three groups arranged as a cascade. Each thread has its own producer and consumer
buffer index. The graphic thread has an "eventIndex", which is its producing pointer, and it must make sure the
following consumer, namely the view thread, has space for new events to consume. So the graphic thread waits while
(graphicEventIndex+1)%MaxBufferNumber==mpiPixelIndex. Similarly, the view thread must check two conditions. One is that
there are new events pending. The other is that there is space for the MPI thread to consume. It waits while
((viewRequestIndex+1)%MaxBufferNumber==mpiPixelIndex || viewEventIndex==graphicEventIndex).
Taking the view thread as an example, the eventIndex indicates a new pending event for the view thread to "copy" to the
request buffer. However, the requestIndex is used by its supplier, the graphic thread, to see if there is space to
"produce" a new event. This is a bit odd. There may be a better way to express this "pending" task and the available
consumable space.
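The two index tests used in the wait conditions above can be written as small predicates. The following is only a sketch: MaxBufferNumber comes from the listing, while nextIndex, isEmpty, isFull and slotsUntilFull are hypothetical helper names introduced here for illustration.

```cpp
// Only MaxBufferNumber is taken from the listing; every function name
// below is a hypothetical helper introduced for this sketch.
const int MaxBufferNumber = 5;

// Next slot in the circular array.
int nextIndex(int index) {
    return (index + 1) % MaxBufferNumber;
}

// Nothing pending: the consumer index has caught up with the producer index.
bool isEmpty(int producerIndex, int consumerIndex) {
    return producerIndex == consumerIndex;
}

// No free slot: advancing the producer would land on the consumer index.
bool isFull(int producerIndex, int consumerIndex) {
    return nextIndex(producerIndex) == consumerIndex;
}

// Count how many items a producer can add to an empty buffer
// before the "full" test stops it.
int slotsUntilFull() {
    int producerIndex = 0;
    int consumerIndex = 0;
    int produced = 0;
    while (!isFull(producerIndex, consumerIndex)) {
        producerIndex = nextIndex(producerIndex);
        produced++;
    }
    return produced;
}
```

With this convention one slot always stays unused, so a buffer of MaxBufferNumber slots holds at most MaxBufferNumber-1 pending items. That is the price of telling "full" apart from "empty" using the two indices alone.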
Conditional waiting has a clear advantage over plain busy waiting. But why use threads when the work is sequential in
nature?
The biggest problems in multi-threaded programs are "deadlock" and "livelock", that is, lack of progress. The wait
conditions must not all hold at once: at any moment at least one thread must find that its condition allows it to
proceed. Otherwise there is a deadlock. The logic must also make sure the program makes progress. Otherwise the
conditional wait is no different from "busy waiting": for example, some threads run again and again without making
any progress. These two issues took most of my debugging time. Of course, the most basic mutually exclusive access to
protected data is provided by the basic mutex-condition-wait mechanism.
D. The major functions
You must understand the basic semantics of the conditional wait. As far as I understand, the key is that the "wait"
conditions are mutually exclusive, which prevents deadlock. As for "signal" or "broadcast", you can call them at any
time, either before or after you unlock the mutex. In the extreme case, an unrelated thread can also call "signal" or
"broadcast".
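The wait and signal semantics described here can be illustrated with a minimal two-thread sketch. It is not the program from the file listing: Channel, produce, consume, runDemo and kSlots are hypothetical names, and a single producer feeds a single consumer through one small circular buffer. It uses only standard pthread calls.

```cpp
#include <pthread.h>

// Hypothetical names for illustration; none are taken from multiplethread.cpp.
const int kSlots = 5;

struct Channel {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int slots[kSlots];
    int head;       // producer index
    int tail;       // consumer index
    int itemCount;  // how many items to transfer in total
    long sum;       // running total of consumed values
};

static void* produce(void* arg) {
    Channel* ch = static_cast<Channel*>(arg);
    for (int value = 0; value < ch->itemCount; value++) {
        pthread_mutex_lock(&ch->mutex);
        // Re-test the predicate in a loop: waking up does not guarantee space.
        while ((ch->head + 1) % kSlots == ch->tail) {
            // Atomically releases the mutex, sleeps, and re-acquires it on wakeup.
            pthread_cond_wait(&ch->cond, &ch->mutex);
        }
        ch->slots[ch->head] = value;
        ch->head = (ch->head + 1) % kSlots;
        pthread_cond_broadcast(&ch->cond);  // signalling while holding the mutex
        pthread_mutex_unlock(&ch->mutex);
    }
    return NULL;
}

static void* consume(void* arg) {
    Channel* ch = static_cast<Channel*>(arg);
    for (int n = 0; n < ch->itemCount; n++) {
        pthread_mutex_lock(&ch->mutex);
        while (ch->head == ch->tail) {  // nothing pending
            pthread_cond_wait(&ch->cond, &ch->mutex);
        }
        ch->sum += ch->slots[ch->tail];
        ch->tail = (ch->tail + 1) % kSlots;
        pthread_mutex_unlock(&ch->mutex);
        pthread_cond_broadcast(&ch->cond);  // signalling after unlocking also works
    }
    return NULL;
}

// Transfers the values 0..itemCount-1 and returns their sum as seen by the consumer.
long runDemo(int itemCount) {
    Channel ch;
    pthread_mutex_init(&ch.mutex, NULL);
    pthread_cond_init(&ch.cond, NULL);
    ch.head = 0;
    ch.tail = 0;
    ch.itemCount = itemCount;
    ch.sum = 0;

    pthread_t producer, consumer;
    pthread_create(&producer, NULL, produce, &ch);
    pthread_create(&consumer, NULL, consume, &ch);
    pthread_join(producer, NULL);
    pthread_join(consumer, NULL);

    pthread_cond_destroy(&ch.cond);
    pthread_mutex_destroy(&ch.mutex);
    return ch.sum;
}
```

The producer broadcasts before unlocking and the consumer broadcasts after unlocking. Both orders are safe here because the indices change only while the mutex is held and every waiter re-tests its predicate in a while loop. Calling runDemo(100) from main should return 4950, the sum of 0 through 99.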
E. Further improvement
F. File listing
1. multiplethread.cpp
file name: multiplethread.cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>

// Both are statically initialized, so main() does not call
// pthread_mutex_init/pthread_cond_init on them again.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void* graphicProc(void* param);
void* viewProc(void* param);
void* mpiProc(void* param);

int frameNumber = 0;

typedef void* (*ThreadProc)(void*);

const int MaxBufferNumber = 5;
const int MaxFrameNumber = 1000;
const int MaxBufferSize = 1024;
const int MaxGraphicJobNumber = 100;
const int MaxViewJobNumber = 2000;
const int MaxMpiJobNumber = 5000;

int eventIndex[MaxBufferNumber];
int requestIndex[MaxBufferNumber];
int pixelIndex[MaxBufferNumber];

int graphicRuns = 0, viewRuns = 0, mpiRuns = 0;

char eventBuffer[MaxBufferNumber][MaxBufferSize];
char requestBuffer[MaxBufferNumber][MaxBufferSize];
char pixelBuffer[MaxBufferNumber][MaxBufferSize];

pthread_t graphicThread, viewThread, mpiThread;

//for graphicThread, eventIndex is producer, pixelIndex is consumer
int graphicEventIndex = 0, graphicPixelIndex = MaxBufferNumber - 1;
//for viewThread, eventIndex is producer, request is consumer
int viewEventIndex = 0, viewRequestIndex = 0;
//for mpiThread, requestIndex is producer, pixelIndex is consumer
int mpiRequestIndex = 0, mpiPixelIndex = 0;

bool createEvent();
void graphicProcessEvent();
void viewProcessEvent();
void mpiProcessRequest();

struct timeval start, end;
int i;
float result;
int totalSec, totalUsec;

void printStatus()
{
    printf("graphic: event=%d,pixel=%d\n", graphicEventIndex, graphicPixelIndex);
    printf("view: event=%d, request=%d\n", viewEventIndex, viewRequestIndex);
    printf("mpi: request=%d, pixel=%d\n", mpiRequestIndex, mpiPixelIndex);
}

int main()
{
    sprintf(pixelBuffer[graphicPixelIndex], "empty starting pixel buffer\n");
    gettimeofday(&start, NULL);
    pthread_create(&graphicThread, NULL, graphicProc, NULL);
    pthread_create(&mpiThread, NULL, mpiProc, NULL);
    pthread_create(&viewThread, NULL, viewProc, NULL);
    printf("main thread finishes creating...\n");
    pthread_join(graphicThread, NULL);
    pthread_join(viewThread, NULL);
    pthread_join(mpiThread, NULL);
    printf("graphic runs:%d, view runs:%d, mpi runs:%d\n", graphicRuns, viewRuns, mpiRuns);
    return 0;
}

void* graphicProc(void* param)
{
    int i;
    int usec;
    printf("graphic thread starts...\n");
    while (frameNumber < MaxFrameNumber)
    {
        //printf("graphic threads starts new frame...\n");
        //eventIndex=%d, pixelIndex=%d\n", graphicEventIndex, graphicPixelIndex);
        //printStatus();
        pthread_mutex_lock(&mutex);
        //only one case we stop graphic thread, that is when no space for new events
        //another case to speed up view by checking if there is pending new event in view
        while ((graphicEventIndex + 1) % MaxBufferNumber == mpiPixelIndex) //graphicEventIndex!=viewEventIndex)
        {
            pthread_cond_wait(&cond, &mutex);
        }
        //if we have event space to add new event
        //createNewEvent();
        for (i = 0; i < MaxGraphicJobNumber; i++)
        {
            sprintf(eventBuffer[graphicEventIndex], "new event#%d of frame number %d\n",
                    graphicEventIndex, frameNumber);
        }
        graphicEventIndex = (graphicEventIndex + 1) % MaxBufferNumber;
        //this is only meaningful for very first frame
        if ((graphicPixelIndex + 1) % MaxBufferNumber != mpiPixelIndex)
        {
            //there is some new incoming pixels to display
            graphicPixelIndex = (graphicPixelIndex + 1) % MaxBufferNumber;
        }
        //printStatus();
        printf("graphic thread is displaying pixel buffer:'%s' of frame number %d\n",
               pixelBuffer[graphicPixelIndex], frameNumber);
        frameNumber++;
        pthread_cond_broadcast(&cond);
        pthread_mutex_unlock(&mutex);
        graphicRuns++;
        gettimeofday(&end, NULL);
        usec = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
        printf("fps:%f\n", 1000000.0 / (float)usec);
        memcpy(&start, &end, sizeof(timeval));
    }
    return NULL; //a void* thread function must return a value
}

//general idea: conditional wait conditions are
//a) whether there is new pending tasks by checking if producer index is equal to consumer index
//b) whether there is more space for production by checking if (producerIndex+1)%MaxBufferNumber
//is equal to following producerIndex
void* viewProc(void* param)
{
    int i;
    char buffer[MaxBufferSize];
    printf("view thread starts...\n");
    while (frameNumber < MaxFrameNumber)
    {
        //printf("view thread starts a new frame...\n");
        // eventIndex=%d, requestIndex=%d...\n",viewEventIndex, viewRequestIndex);
        //printStatus();
        pthread_mutex_lock(&mutex);
        //it waits when either condition holds
        //a)there is no more space for new request buffer
        //b)there is no event
        while ((viewRequestIndex + 1) % MaxBufferNumber == mpiPixelIndex ||
               viewEventIndex == graphicEventIndex) //no event
        {
            pthread_cond_wait(&cond, &mutex);
        }
        //viewProcessRequest();
        if (viewEventIndex != graphicEventIndex)
        {
            //there is some new event and starting process event
            sprintf(buffer, " and view thread process this event#%d at frame#%d\n",
                    viewEventIndex, frameNumber);
            strcat(eventBuffer[viewEventIndex], buffer);
            viewEventIndex = (viewEventIndex + 1) % MaxBufferNumber; //finish processing event
        }
        //if there is still space to put new request in request buffer and
        //if there exists new pending event waiting for copying to request buffer
        if ((viewRequestIndex + 1) % MaxBufferNumber != mpiPixelIndex &&
            viewEventIndex != viewRequestIndex)
        {
            //there is new processed event and there is space for new request
            for (i = 0; i < MaxViewJobNumber; i++)
            {
                strcpy(requestBuffer[viewRequestIndex], eventBuffer[viewRequestIndex]);
            }
            viewRequestIndex = (viewRequestIndex + 1) % MaxBufferNumber; //finish processing event
        }
        //printf("view thread finishes one frame...\n");
        pthread_cond_broadcast(&cond);
        pthread_mutex_unlock(&mutex);
        viewRuns++;
    }
    return NULL;
}

void* mpiProc(void* param)
{
    int i;
    char buffer[MaxBufferSize];
    printf("mpi thread starts...\n");
    while (frameNumber < MaxFrameNumber)
    {
        //printf("mpi thread starts a new frame...\n");
        //requestIndex=%d, pixelIndex=%d\n",mpiRequestIndex, mpiPixelIndex);
        //printStatus();
        pthread_mutex_lock(&mutex);
        //it waits when either condition holds
        //a) no new request
        //b) no more space to write pixel results
        while (mpiRequestIndex == viewRequestIndex ||
               (mpiPixelIndex + 1) % MaxBufferNumber == graphicEventIndex)
        {
            pthread_cond_wait(&cond, &mutex);
        }
        //mpiProcessRequest();
        if (mpiRequestIndex != viewRequestIndex)
        {
            //a new request in buffer waiting for processing
            sprintf(buffer, " and mpi thread process this request#%d at frame number#%d\n",
                    mpiRequestIndex, frameNumber);
            strcat(requestBuffer[mpiRequestIndex], buffer);
            mpiRequestIndex = (mpiRequestIndex + 1) % MaxBufferNumber;
        }
        //this needs two conditions,
        //a)there is pending finished request to be copied to pixel buffer
        //b)there is space for mpi thread to copy which means graphic thread is not processing the
        //target pixel buffer
        if ((mpiPixelIndex + 1) % MaxBufferNumber != graphicEventIndex &&
            mpiRequestIndex != mpiPixelIndex)
        {
            for (i = 0; i < MaxMpiJobNumber; i++)
            {
                strcpy(pixelBuffer[mpiPixelIndex], requestBuffer[mpiPixelIndex]);
            }
            mpiPixelIndex = (mpiPixelIndex + 1) % MaxBufferNumber;
        }
        //printf("mpi thread finishes one frame...\n");
        pthread_cond_broadcast(&cond);
        pthread_mutex_unlock(&mutex);
        mpiRuns++;
    }
    return NULL;
}
The output looks like this:
graphic thread starts...
graphic thread is displaying pixel buffer:'empty starting pixel buffer
' of frame number 0
fps:2976.190476
graphic thread is displaying pixel buffer:'empty starting pixel buffer
' of frame number 1
fps:5847.953216
graphic thread is displaying pixel buffer:'empty starting pixel buffer
' of frame number 2
fps:7194.244604
graphic thread is displaying pixel buffer:'empty starting pixel buffer
' of frame number 3
fps:7299.270073
mpi thread starts...
view thread starts...
main thread finishes creating...
graphic thread is displaying pixel buffer:'new event#0 of frame number 0
and view thread process this event#0 at frame#4
and mpi thread process this request#0 at frame number#4
' of frame number 4
fps:380.372765
graphic thread is displaying pixel buffer:'new event#1 of frame number 1
and view thread process this event#1 at frame#4
and mpi thread process this request#1 at frame number#5
' of frame number 5
fps:808.407437
graphic thread is displaying pixel buffer:'new event#2 of frame number 2
and view thread process this event#2 at frame#4
and mpi thread process this request#2 at frame number#6
' of frame number 6
fps:814.332248
graphic thread is displaying pixel buffer:'new event#3 of frame number 3
and view thread process this event#3 at frame#4
and mpi thread process this request#3 at frame number#7
' of frame number 7
fps:814.332248
graphic thread is displaying pixel buffer:'new event#4 of frame number 4
and view thread process this event#4 at frame#8
and mpi thread process this request#4 at frame number#8
' of frame number 8
....
....
fps:629.722922
graphic thread is displaying pixel buffer:'new event#4 of frame number 189
and view thread process this event#4 at frame#193
and mpi thread process this request#4 at frame number#193
' of frame number 193
fps:637.755102
graphic thread is displaying pixel buffer:'new event#0 of frame number 190
and view thread process this event#0 at frame#194
and mpi thread process this request#0 at frame number#194
' of frame number 194
fps:638.569604
graphic thread is displaying pixel buffer:'new event#1 of frame number 191
and view thread process this event#1 at frame#195
and mpi thread process this request#1 at frame number#195
' of frame number 195
fps:638.569604
graphic thread is displaying pixel buffer:'new event#2 of frame number 192
and view thread process this event#2 at frame#196
and mpi thread process this request#2 at frame number#196
' of frame number 196
fps:638.162093
graphic thread is displaying pixel buffer:'new event#3 of frame number 193
and view thread process this event#3 at frame#197
and mpi thread process this request#3 at frame number#197
' of frame number 197
fps:638.162093
graphic thread is displaying pixel buffer:'new event#4 of frame number 194
and view thread process this event#4 at frame#198
and mpi thread process this request#4 at frame number#198
' of frame number 198
fps:629.722922
graphic thread is displaying pixel buffer:'new event#0 of frame number 195
and view thread process this event#0 at frame#199
and mpi thread process this request#0 at frame number#199
' of frame number 199
fps:637.755102
graphic runs:200, view runs:196, mpi runs:196