multiple producer and consumer

            Multiple Producers and Consumers

A. First Edition
This is a small simulation of my project, which has three major tasks: graphic display, view collection and MPI
communication. To build a multithreaded producer-consumer synchronization mechanism, I chose the conditional wait,
which is superior to busy waiting. After a night and a morning of coding, debugging and testing, I finished this small
simulation and was just about to post it for future reference when developing my project. Then suddenly I fell into the
old "new" trap: if these tasks are more or less sequential, is there any benefit in making them multithreaded?
Hence the subtitle suits the dilemma perfectly: the pros and cons of multithreading!
B. The problem

The graphic thread produces events for the view thread. The view thread accepts events and produces requests. The mpi
thread consumes requests and produces pixel data to feed the graphic thread. Events, requests and pixels each have their
own buffer, and every buffer is a circular array managed by buffer indices.

One big problem in the producer-consumer pattern is that a producer must not produce more than its consumer can consume.
Here, however, there are three producer-consumer pairs arranged in a cascade. Each thread has its own producer and
consumer buffer index. The graphic thread has an "eventIndex", which is its producing pointer, and it must make sure that
the following consumer, namely the view thread, has space for new events. So the graphic thread waits while
(graphicEventIndex+1)%MaxBufferNumber==mpiPixelIndex. Similarly, the view thread must check two conditions: that a new
event is pending, and that there is space for the mpi thread to consume. It therefore waits while
((viewRequestIndex+1)%MaxBufferNumber==mpiPixelIndex || viewEventIndex==graphicEventIndex).

Taking the view thread as an example, its eventIndex indicates the next pending event for the view thread to "copy" to
the request buffer, whereas its requestIndex is used by its supplier, the graphic thread, to see whether there is space
to "produce" a new event. This is a bit odd. There may well be a better way to express this "pending" task and the
available consumable space.

C. The idea of the program

The conditional wait has a clear advantage over ordinary busy waiting. But why use threads at all when the work is
sequential in nature?

The biggest problems in a multithreaded program are "deadlock" and "livelock", that is, lack of progress. The wait
conditions must be mutually exclusive in the sense that at any moment at least one thread finds that it is allowed to
run. Otherwise there will be deadlock. The logic must also make sure the program makes progress. Otherwise the
conditional wait would be no different from "busy waiting": some threads would run again and again without advancing
at all. These two issues took most of my debugging time. Of course, the most basic exclusive access to protected data
is provided by the ordinary mutex-condition-wait mechanism.

D. The major functions
You must understand the basic semantics of the conditional wait. As far as I understand, the key is that the "wait"
conditions are mutually exclusive, which prevents deadlock. As for "signal" or "broadcast", you can call it at any time,
either before or after you unlock the mutex. In the extreme case, even an unrelated thread can call "signal" or
"broadcast".
E. Further improvement
F. File listing
1. multiplethread.cpp
 
 
file name: multiplethread.cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;


void* graphicProc(void*param);
void* viewProc(void*param);
void* mpiProc(void*param);
int  frameNumber = 0;


typedef void*(*ThreadProc)(void*);

const int MaxBufferNumber=5;
const int MaxFrameNumber=1000;
const int MaxBufferSize=1024;
const int MaxGraphicJobNumber=100;
const int MaxViewJobNumber=2000;
const int MaxMpiJobNumber=5000;

int eventIndex[MaxBufferNumber];
int requestIndex[MaxBufferNumber];
int pixelIndex[MaxBufferNumber];

int graphicRuns=0, viewRuns=0, mpiRuns=0;

char eventBuffer[MaxBufferNumber][MaxBufferSize];
char requestBuffer[MaxBufferNumber][MaxBufferSize];
char pixelBuffer[MaxBufferNumber][MaxBufferSize];

pthread_t graphicThread, viewThread, mpiThread;

//for graphicThread, eventIndex is producer, pixelIndex is consumer
int graphicEventIndex=0, graphicPixelIndex=MaxBufferNumber-1;
//for viewThread,eventIndex is producer, request is consumer
int viewEventIndex=0, viewRequestIndex=0;
//for mpiThread, requestIndex is producer, pixelIndex is consumer
int mpiRequestIndex=0, mpiPixelIndex=0;


bool createEvent();

void graphicProcessEvent();
void viewProcessEvent();
void mpiProcessRequest();


struct timeval start, end;

int i;
float result;

int totalSec, totalUsec;


void printStatus()
{
	printf("graphic: event=%d,pixel=%d\n", graphicEventIndex, graphicPixelIndex);
	printf("view: event=%d, request=%d\n", viewEventIndex, viewRequestIndex);
	printf("mpi: request=%d, pixel=%d\n", mpiRequestIndex, mpiPixelIndex);
}

int main()
{
	//mutex and cond are already statically initialized above,
	//so they must not be initialized a second time here

	sprintf(pixelBuffer[graphicPixelIndex], "empty starting pixel buffer\n");
	gettimeofday(&start, NULL);
	pthread_create(&graphicThread, NULL, graphicProc, NULL);
	pthread_create(&mpiThread, NULL, mpiProc, NULL);
	pthread_create(&viewThread,NULL, viewProc, NULL);

	printf("main thread finishes creating...\n");

	pthread_join(graphicThread, NULL);
	pthread_join(viewThread, NULL);
	pthread_join(mpiThread, NULL);

	printf("graphic runs:%d, view runs:%d, mpi runs:%d\n", graphicRuns, viewRuns, mpiRuns);		
	return 0;
}


void* graphicProc(void* param)
{
	int i;
	int usec;
	printf("graphic thread starts...\n");
	while (frameNumber<MaxFrameNumber)
	{
		//printf("graphic threads starts new frame...\n");
			//eventIndex=%d, pixelIndex=%d\n", graphicEventIndex, graphicPixelIndex);
		//printStatus();
		pthread_mutex_lock(&mutex);

		//only one case we stop graphic thread, that is when no space for new events
		//another case to speed up view by checking if there is pending new event in view
		while ((graphicEventIndex+1)%MaxBufferNumber==mpiPixelIndex)
			//graphicEventIndex!=viewEventIndex)
		{
			pthread_cond_wait(&cond, &mutex);
		}

		//if we have event space to add new event
			//createNewEvent();			
		for (i=0; i<MaxGraphicJobNumber; i++)
		{
			sprintf(eventBuffer[graphicEventIndex], "new event#%d of frame number %d\n",
				graphicEventIndex, frameNumber);
		}
		graphicEventIndex=(graphicEventIndex+1)%MaxBufferNumber;
		
		//this is only meaningful for very first frame
		if ((graphicPixelIndex+1)%MaxBufferNumber!=mpiPixelIndex)
		{
			//there is some new incoming pixels to display
			graphicPixelIndex=(graphicPixelIndex+1)%MaxBufferNumber;
		}		
		//printStatus();
		printf("graphic thread is displaying pixel buffer:'%s' of frame number %d\n", 
			pixelBuffer[graphicPixelIndex], frameNumber);
		frameNumber++; 

		pthread_cond_broadcast(&cond);
		pthread_mutex_unlock(&mutex);
		graphicRuns++;
		gettimeofday(&end, NULL);
		usec=(end.tv_sec-start.tv_sec)*1000000+end.tv_usec-start.tv_usec;
		printf("fps:%f\n", 1000000.0/(float)usec);
		memcpy(&start, &end, sizeof(timeval));
	}
	return NULL;
}


//general idea: conditional wait conditions are 
//a) whether there is new pending tasks by checking if producer index is equal to consumer index
//b) whether there is more space for production by checking if (producerIndex+1)%MaxBufferNumber
//is equal to following producerIndex
		
void* viewProc(void* param)
{
	int i;
	char buffer[MaxBufferSize];
	printf("view thread starts...\n");
	while (frameNumber<MaxFrameNumber)
	{
		//printf("view thread starts a new frame...\n");
			// eventIndex=%d, requestIndex=%d...\n",viewEventIndex, viewRequestIndex);
		//printStatus();
		pthread_mutex_lock(&mutex);
		
		//it waits while either condition is true
		//a)there is no more space for new request buffer
		//b)there is no event
		while ((viewRequestIndex+1)%MaxBufferNumber==mpiPixelIndex||
			viewEventIndex==graphicEventIndex)//no event
		{
			pthread_cond_wait(&cond, &mutex);
		}
		//viewProcessRequest();
		
		if (viewEventIndex!=graphicEventIndex)
		{
			//there is some new event and starting process event
			sprintf(buffer, " and view thread process this event#%d at frame#%d\n", viewEventIndex,
				frameNumber);
			strcat(eventBuffer[viewEventIndex], buffer);
			viewEventIndex=(viewEventIndex+1)%MaxBufferNumber;//finish processing event
			
		}
		//if there is still space to put new request in request buffer and
		//if there exists new pending event waiting for copying to request buffer
		if ((viewRequestIndex+1)%MaxBufferNumber!=mpiPixelIndex&&
			viewEventIndex!=viewRequestIndex)
		{
			//there is new processed event and there is space for new request
			for (i=0; i<MaxViewJobNumber; i++)
			{
				strcpy(requestBuffer[viewRequestIndex], eventBuffer[viewRequestIndex]);
			}
			viewRequestIndex=(viewRequestIndex+1)%MaxBufferNumber;//finish processing event
		}

		//printf("view thread finishes one frame...\n");
		pthread_cond_broadcast(&cond);
		pthread_mutex_unlock(&mutex);
		viewRuns++;
	}
	return NULL;
}

void* mpiProc(void* param)
{
	int i;
	char buffer[MaxBufferSize];
	printf("mpi thread starts...\n");
	while (frameNumber<MaxFrameNumber)
	{
		//printf("mpi thread starts a new frame...\n");
			//requestIndex=%d, pixelIndex=%d\n",mpiRequestIndex, mpiPixelIndex);
		//printStatus();
		pthread_mutex_lock(&mutex);
		
		//it waits while either condition is true
		//a) no new request
		//b) no more space to write pixel results
		while (mpiRequestIndex==viewRequestIndex||(mpiPixelIndex+1)%MaxBufferNumber==graphicEventIndex)
		{
			pthread_cond_wait(&cond, &mutex);
		}
		//mpiProcessRequest();
		if (mpiRequestIndex!=viewRequestIndex)
		{
			//a new request in buffer waiting for processing
			sprintf(buffer, " and mpi thread process this request#%d at frame number#%d\n",
				mpiRequestIndex, frameNumber);
			strcat(requestBuffer[mpiRequestIndex], buffer);
			mpiRequestIndex=(mpiRequestIndex+1)%MaxBufferNumber;
		}
		//this needs two conditions, 
		//a)there is pending finished request to be copied to pixel buffer
		//b)there is space for mpi thread to copy which means graphic thread is not processing the
		//target pixel buffer
		if ((mpiPixelIndex+1)%MaxBufferNumber!=graphicEventIndex&&mpiRequestIndex!=mpiPixelIndex)
		{
			for (i=0; i<MaxMpiJobNumber; i++)
			{
				strcpy(pixelBuffer[mpiPixelIndex], requestBuffer[mpiPixelIndex]);
			}
			mpiPixelIndex=(mpiPixelIndex+1)%MaxBufferNumber;
		}
		//printf("mpi thread finishes one frame...\n");
		
		pthread_cond_broadcast(&cond);
		pthread_mutex_unlock(&mutex);
		mpiRuns++;
	}
	return NULL;
}

		

The result is like this:

graphic thread starts...
graphic thread is displaying pixel buffer:'empty starting pixel buffer
' of frame number 0
fps:2976.190476
graphic thread is displaying pixel buffer:'empty starting pixel buffer
' of frame number 1
fps:5847.953216
graphic thread is displaying pixel buffer:'empty starting pixel buffer
' of frame number 2
fps:7194.244604
graphic thread is displaying pixel buffer:'empty starting pixel buffer
' of frame number 3
fps:7299.270073
mpi thread starts...
view thread starts...
main thread finishes creating...
graphic thread is displaying pixel buffer:'new event#0 of frame number 0
and view thread process this event#0 at frame#4
and mpi thread process this request#0 at frame number#4
' of frame number 4
fps:380.372765
graphic thread is displaying pixel buffer:'new event#1 of frame number 1
and view thread process this event#1 at frame#4
and mpi thread process this request#1 at frame number#5
' of frame number 5
fps:808.407437
graphic thread is displaying pixel buffer:'new event#2 of frame number 2
and view thread process this event#2 at frame#4
and mpi thread process this request#2 at frame number#6
' of frame number 6
fps:814.332248
graphic thread is displaying pixel buffer:'new event#3 of frame number 3
and view thread process this event#3 at frame#4
and mpi thread process this request#3 at frame number#7
' of frame number 7
fps:814.332248
graphic thread is displaying pixel buffer:'new event#4 of frame number 4
and view thread process this event#4 at frame#8
and mpi thread process this request#4 at frame number#8
' of frame number 8
....
....
fps:629.722922
graphic thread is displaying pixel buffer:'new event#4 of frame number 189
and view thread process this event#4 at frame#193
and mpi thread process this request#4 at frame number#193
' of frame number 193
fps:637.755102
graphic thread is displaying pixel buffer:'new event#0 of frame number 190
and view thread process this event#0 at frame#194
and mpi thread process this request#0 at frame number#194
' of frame number 194
fps:638.569604
graphic thread is displaying pixel buffer:'new event#1 of frame number 191
and view thread process this event#1 at frame#195
and mpi thread process this request#1 at frame number#195
' of frame number 195
fps:638.569604
graphic thread is displaying pixel buffer:'new event#2 of frame number 192
and view thread process this event#2 at frame#196
and mpi thread process this request#2 at frame number#196
' of frame number 196
fps:638.162093
graphic thread is displaying pixel buffer:'new event#3 of frame number 193
and view thread process this event#3 at frame#197
and mpi thread process this request#3 at frame number#197
' of frame number 197
fps:638.162093
graphic thread is displaying pixel buffer:'new event#4 of frame number 194
and view thread process this event#4 at frame#198
and mpi thread process this request#4 at frame number#198
' of frame number 198
fps:629.722922
graphic thread is displaying pixel buffer:'new event#0 of frame number 195
and view thread process this event#0 at frame#199
and mpi thread process this request#0 at frame number#199
' of frame number 199
fps:637.755102
graphic runs:200, view runs:196, mpi runs:196



 

 

 






