Linux thread operations

         File-Server (II)

A. Second Edition
This is COMP 444 lab 4. It concentrates on thread synchronization, FIFOs, memory-mapped I/O, advisory file locks, etc.
B.The problem
COMP 444
Winter term 2005
Programming Assignment 4
Due Date: Midnight, April 11, 2005.
Objective: You will extend our simple file server from assignment three. Basically, we are going
to add functionality to search and modify a collection of "database" files. While the completed file
server is indeed simple, it does demonstrate a few of the basic features of the real thing.
To begin, we must construct the database that our file server will manage. In order to guarantee a
common database format, I have written a couple of simple utilities that you can use to generate
and view the database files.
generate.c: You can compile this source file and use it to create a basic database file. The idea
is to generate a file that consists of a collection of records, each made up of one or more fields
(again, this is meant to model a simple database table). In order to make it easy to find specific
records, we will store the table in a binary format. In other words, each record consists of a set of
integer values. So, for example, if a record is made up of 8 fields, then every record will be 32
bytes in size (assuming 4 byte integers). If you have to find record 83, then it is quite simple to
identify the location of the start of the record. Note that the first 8 bytes of the dataset will record
the row and column count for the file (ie, first 4 bytes = # of rows, next 4 bytes = # of columns).
The remaining bytes will be the records themselves. This way, you can read the first 8 bytes and
know exactly how to process the remaining bytes.
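The offset arithmetic implied by this layout can be sketched as follows. This is a minimal illustration that assumes 4-byte integers and the 8-byte header described above; the names `record_offset` and `field_offset` are hypothetical and are not part of the supplied utilities.

```c
#include <assert.h>
#include <stddef.h>

#define HEADER_BYTES 8   /* row count + column count, 4 bytes each */
#define FIELD_BYTES  4   /* assumes 4-byte integers, as in the text */

/* Byte offset of the first field of a zero-based record. */
size_t record_offset(int columns, int record)
{
    assert(columns > 0 && record >= 0);
    return HEADER_BYTES + (size_t)record * (size_t)columns * FIELD_BYTES;
}

/* Byte offset of one field inside a zero-based record. */
size_t field_offset(int columns, int record, int column)
{
    assert(column >= 0 && column < columns);
    return record_offset(columns, record) + (size_t)column * FIELD_BYTES;
}
```

With 8 columns, record 83 starts at byte 8 + 83 * 32 = 2664.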
The generate program takes three arguments: rows, columns, and file name. The row count is
limited to 10000 and the column count to 10. When values are created for each column, they are
randomly chosen in the range 0-99. So, for example, "generate.exe 1000 6 sales.db" would
create a table called sales.db with 1000 rows and 6 columns (rows and columns start at 0).
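For readers without the supplied generate.c at hand, the file format can be reproduced roughly like this. It is only a sketch of the format described above, not the professor's program; the function name `write_table` is hypothetical, and the limits in the assertion simply mirror the ones quoted in the text.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Write a table file: two header ints (rows, columns) followed by
 * rows * columns random ints in the range 0-99. Returns 0 or -1. */
int write_table(const char *path, int rows, int columns)
{
    FILE *fp;
    int r, c, value, header[2];

    assert(rows > 0 && rows <= 10000 && columns > 0 && columns <= 10);
    if ((fp = fopen(path, "wb")) == NULL)
        return -1;
    header[0] = rows;
    header[1] = columns;
    if (fwrite(header, sizeof(int), 2, fp) != 2) {
        fclose(fp);
        return -1;
    }
    for (r = 0; r < rows; r++) {
        for (c = 0; c < columns; c++) {
            value = rand() % 100;            /* field values are 0-99 */
            if (fwrite(&value, sizeof(int), 1, fp) != 1) {
                fclose(fp);
                return -1;
            }
        }
    }
    return fclose(fp) == 0 ? 0 : -1;
}
```

Calling `write_table("sales.db", 1000, 6)` would produce a file of 8 + 1000 * 6 * 4 bytes on a platform with 4-byte integers.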
You will create a database by generating a group of files. Do NOT include these files with your
submission (tables will be generated from scratch when the grading is done). You can name the
files anything you want but, for convenience, each file should probably end with a common suffix
like ".db". You will probably want to create 5 or 10 such files when testing.
viewer.c: Because the datasets are in a binary format, it is not possible to simply display the
records directly to the screen. It is important for testing (and grading) to actually see the records.
So I have provided a little viewer program that can read the binary data and display a human-readable
ASCII version to the screen.
The viewer program takes three arguments: start_record, end_record, and file name. The
start_record and end_record simply define the range of records you want to look at. So, for
example, "viewer.exe 10 15 sales.db" would display 6 records in sales.db, starting with record 10.
Okay, so now we have a way to generate our database and a way to verify the contents. It's now
time to integrate them with our file server.
You are going to add two functions to the file server: select and update. The select option will
allow a client application to retrieve a range of records from a given dataset (much like the viewer
program).
The update option will allow your client to increment or decrement a given field by one. This is a
little more complicated because incrementing and decrementing implies that you have to be very
careful about concurrent access. This issue is directly related to one of the questions on the last
theory assignment. Specifically, you will have to employ advisory locking to ensure that the
concurrent updates do not corrupt the values in the record. So, for example, if the current value is
25 and you do 5 concurrent increments and 3 concurrent decrements, you must be able to check
that the final result is 27 (you can use the viewer program for this). Note: after the update
operation, a value may be greater than 99 or less than 0. This is OK.
Your clients will now have to specify these new requests. In the previous assignment, you created
a "request record" with three fields: process ID, record operation, and file name. We will continue
to use this record. We will add three new operations: FILE_SELECT = 4, FILE_INCREMENT = 5,
and FILE_DECREMENT = 6. The other two fields in the request record will remain the same.
Clearly, if either of these two options is selected, we require additional information. For the
FILE_SELECT, we need to specify two additional fields: start_record and end_record. For the
FILE_INCREMENT and FILE_DECREMENT, we will also specify two additional fields: record
number and column number (we will update a specific field value in a specific record). Your
server processing code will therefore check the file operation value in the original request. If it is
between 4 and 6 (one of the new operations) then two additional integer values will follow the
main request record. The addition of the two supporting values means that you will have to
modify your queue structure slightly in order to accommodate the new values. You can do this
any way you like.
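One way to read such a request is sketched below. It follows the wording of the handout (a base record, optionally followed by two integers). The `BaseRequest` layout and the function name `read_request` are assumptions made for illustration, not the actual lab 3 definitions.

```c
#include <assert.h>
#include <unistd.h>

enum { FILE_SELECT = 4, FILE_INCREMENT = 5, FILE_DECREMENT = 6 };

typedef struct {
    int pid;
    int file_op;
    char file_name[64];
} BaseRequest;   /* hypothetical layout of the assignment-3 record */

/* Read one request from fd. Returns the number of extra parameters
 * that were read (0 or 2), or -1 on a short read. */
int read_request(int fd, BaseRequest *req, int params[2])
{
    assert(req != NULL && params != NULL);
    if (read(fd, req, sizeof *req) != (ssize_t)sizeof *req)
        return -1;
    /* Only the three new operations carry two trailing integers. */
    if (req->file_op < FILE_SELECT || req->file_op > FILE_DECREMENT)
        return 0;
    if (read(fd, params, 2 * sizeof(int)) != (ssize_t)(2 * sizeof(int)))
        return -1;
    return 2;
}
```

Packing the two parameters into the request record itself is an equally valid design and keeps every request the same size.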
In terms of the actual processing there will be two main things to deal with. First, you will require
locking to ensure that data is not corrupted. When write locks are in place, other processes
should be denied access to the relevant regions in the file (although they are free to access non-locked
regions of the same file). You will have to use blocking lock calls to implement this feature
with your threads. In other words, the update call should not fail if a lock is held. The thread
should simply block until it has obtained the lock.
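The blocking behaviour can be tried out with the sketch below, which repeats the "25, five increments, three decrements" scenario with one child process per update. The function names are hypothetical. It uses lseek/read/write instead of memory mapping purely to keep the example short, so it illustrates the locking, not the I/O method required by the assignment. Classic POSIX record locks are owned by the process rather than by the thread, which is why the sketch forks.

```c
#include <assert.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>

/* Add delta to the int stored at offset while holding a write lock. */
static int locked_add(int fd, off_t offset, int delta)
{
    struct flock lock = {0};
    int value = 0, ok;

    lock.l_type = F_WRLCK;
    lock.l_whence = SEEK_SET;
    lock.l_start = offset;
    lock.l_len = sizeof(int);
    if (fcntl(fd, F_SETLKW, &lock) < 0)   /* blocks while another process holds the lock */
        return -1;
    ok = lseek(fd, offset, SEEK_SET) == offset
      && read(fd, &value, sizeof value) == (ssize_t)sizeof value;
    if (ok) {
        value += delta;
        ok = lseek(fd, offset, SEEK_SET) == offset
          && write(fd, &value, sizeof value) == (ssize_t)sizeof value;
    }
    lock.l_type = F_UNLCK;
    if (fcntl(fd, F_SETLK, &lock) < 0)
        return -1;
    return ok ? 0 : -1;
}

/* Fork one child per delta; each child updates the same field once.
 * Returns the final field value, or -1 on error. */
int run_concurrent_updates(const char *path, off_t offset, const int *deltas, int n)
{
    int i, fd, status, value;

    assert(n > 0);
    for (i = 0; i < n; i++) {
        pid_t pid = fork();
        if (pid < 0)
            return -1;
        if (pid == 0) {                    /* child: one locked update, then exit */
            fd = open(path, O_RDWR);
            _exit(fd < 0 || locked_add(fd, offset, deltas[i]) < 0);
        }
    }
    for (i = 0; i < n; i++)
        if (wait(&status) < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0)
            return -1;
    if ((fd = open(path, O_RDONLY)) < 0)
        return -1;
    if (lseek(fd, offset, SEEK_SET) != offset
        || read(fd, &value, sizeof value) != (ssize_t)sizeof value)
        value = -1;
    close(fd);
    return value;
}
```

Because each child holds the write lock for its whole read-modify-write step, no update is lost and the field ends at 25 + 5 - 3 = 27.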
The second thing that you must do when processing db requests is to use memory mapping
instead of read/write calls for all I/O. Again, it will be easy to define the map ranges since the db
files have such a regular structure. In any case, you will use maps to read the data for the
FILE_SELECT requests, and you will use maps for the update operations. (In a real server, of
course, we would have a caching framework that avoided a lot of redundant I/O). Concurrent
maps are not inherently synchronized but with the locking that you are using, it should not be
possible to map two identical regions at the same time.
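Reading a field through a mapping instead of a read call might look like the sketch below. It assumes 4-byte integers and the header layout from the handout; the function name `read_field_mapped` is hypothetical, and locking is left out to keep the mapping logic visible.

```c
#include <assert.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

/* Read field (record, column) of a table file through a read-only mapping.
 * Returns 0 and stores the value in *out, or -1 on any error. */
int read_field_mapped(const char *path, int record, int column, int *out)
{
    struct stat st;
    int fd, *table, rows, cols, rc = -1;

    assert(path != NULL && out != NULL);
    if ((fd = open(path, O_RDONLY)) < 0)
        return -1;
    if (fstat(fd, &st) == 0 && st.st_size >= 8) {
        table = mmap(NULL, (size_t)st.st_size, PROT_READ, MAP_SHARED, fd, 0);
        if (table != MAP_FAILED) {
            rows = table[0];               /* header: row count */
            cols = table[1];               /* header: column count */
            if (record >= 0 && record < rows && column >= 0 && column < cols
                && (off_t)8 + (off_t)rows * cols * 4 <= st.st_size) {
                /* two header ints precede the records */
                *out = table[2 + record * cols + column];
                rc = 0;
            }
            munmap(table, (size_t)st.st_size);
        }
    }
    close(fd);
    return rc;
}
```

Mapping the whole file from offset 0 sidesteps the rule that the offset passed to mmap must be a multiple of the page size.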
On the client side, we will also make things simple. Instead of creating a really complicated
parsing routine for the client (for all of the possible command line parameters), we will simply
create three new clients: client_select.exe, client_increment.exe, and client_decrement.exe. The
select client will receive two arguments: start and end of range. The increment and decrement
clients will also receive two arguments: record and column. Otherwise, they are exactly the
same as the client from assignment 3 and will operate in the same way. For the select client, the
selected records will be returned via the pipe. These records will be written to the local result.PID
file. For the increment and decrement operations, the server will return two integers: the initial
value and the value after the increment or decrement operation. Again, you will write both of
these to result.PID. If you have handled your locking properly, you will be able to verify this by
looking at the modified values (when concurrent updates have been performed).
So that should be about it. Really, the assignment is not hard at all. It really just consists of
updating your server to provide two new functions, select and update. You will use locking in
order to properly manage concurrent access and you will use memory mapping for the I/O. If you
run into any major problems, post them to the newsgroup. Good luck.
Should lab4 be compatible with lab3? 2005-04-03
Professor,

Since lab4 adds new features, should my lab4 still work with the requests from lab3? What I mean is: will the marker also run the lab3 test cases against my new lab4, or can I assume that I am writing a completely new lab4?

Thank you.
compatibility 2005-04-04
My assumption was that you would extend lab3 to add the new functionality required in lab 4. As such, both types of functionality should be accessible in the final assignment. This means that your server has to check to see which types of requests are being passed to it...and whether they require the additional two fields.

Dr Eavis
Do we really need three kinds of client? 2005-04-03
Defining client_select, client_decrement and client_increment only slightly reduces the complexity on the client end. (Actually, the only benefit I can see is saving one byte, since we could just pass an additional "kind" byte to differentiate the three operations.) The server side is exactly the same either way. What I mean is that when the server writes back, the worst case is writing back a whole record instead of a couple of integers when the operation type is "select".

And since my design makes the server always write the same structure, there is little benefit in writing three almost identical clients.
three clients 2005-04-04
On the server side, the additional clients don't change anything. The server just writes back to the relevant pipe.

My use of separate clients wasn't so much for the benefit of students - it's for the benefit of the marker. Invariably, students will design command line interfaces that accept parameters in different orders. This is miserable to straighten out. Moreover, when running a bunch of clients concurrently, it can be tedious to try to keep track of all those parameters if only one client application is used. With separate client application names, it will be a little easier to set up and evaluate the test scripts. In practice, of course, you would never write separate applications for different requests.

Dr Eavis
Unexplained problem of lock 2005-04-04
Professor,

I observed a phenomenon that I have not been able to confirm. When I lock a range of the file after mapping the file into memory with the option "PROT_READ|PROT_WRITE", it seems that I cannot update the contents of the file, i.e. whatever I write or memcpy to the mapped memory, nothing changes in the file. If I remove the write lock, updating is OK. Am I making a mistake? I am worn out after almost half a day of struggling with this.

Thank you.
Everything works fine unless you "msync" 2005-04-06
OK, it turns out I really need to call "msync"; otherwise I may never be able to modify the file correctly.

It works like this: when you modify the mapped memory, the OS does not update the file immediately. Instead, the write-back becomes a scheduled job. And the context switches between threads are so fast that the threads that map the file afterwards just read the old file contents.

Of course this is under the assumption 2005-04-06
that each thread gets its own "mapped" copy of the same file.
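The map, modify, msync, unmap sequence discussed in this thread can be sketched as follows. The helper name `update_mapped_int` is hypothetical. The sketch assumes that the descriptor was opened with O_RDWR, that the file is at least `byte_offset + sizeof(int)` bytes long, and that the caller already holds whatever lock protects the field.

```c
#include <assert.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

/* Add delta to the int at byte_offset through a shared mapping, flush it
 * with msync, and return the previous value in *old. Returns 0 or -1. */
int update_mapped_int(int fd, off_t byte_offset, int delta, int *old)
{
    size_t map_len = (size_t)byte_offset + sizeof(int);
    char *base;
    int *field, rc;

    assert(old != NULL && byte_offset >= 0 && byte_offset % (off_t)sizeof(int) == 0);
    /* Map from offset 0 so that the mapping offset is page aligned. */
    base = mmap(NULL, map_len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED)
        return -1;
    field = (int *)(base + byte_offset);
    *old = *field;
    *field = *old + delta;
    /* MS_SYNC returns only after the change has been written to the file. */
    rc = msync(base, map_len, MS_SYNC);
    /* Unmap exactly the length that was mapped. */
    if (munmap(base, map_len) < 0)
        rc = -1;
    return rc;
}
```

Note that the same length is passed to mmap, msync and munmap, so the modified field is always inside the synchronized range.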

C.The idea of program
1. file listing:
a) source code file:
	myhead.h	//all necessary included header file and some constants
	errHandle.c	//an error handling module, very simple
	fprotocol.h	//define all communication-related structures, enum, constants
	fclient.c	//the source code for a unified client; the three other clients "select, decrement, increment" just "exec" it
	fserver.c	//the source code file for server
	client_select.c	//the select client, which is just a wrapper of fclient.c
	client_increment.c //the increment client, which is only a wrapper of the real client fclient.c
	client_decrement.c //the decrement client, which is only a wrapper of the real client fclient.c
	generate.c	//the professor-supplied generator
	viewer.c	//the professor-supplied viewer
b) executable file:
	fserver.exe	//the server executable
	fclient.exe	//the unified client executable
	client_select.exe //the select client executable
	client_increment.exe //the increment client executable
	client_decrement.exe //the decrement client executable
	generate.exe	//the professor-supplied generator
	viewer.exe	//the professor supplied viewer
c) makefile:
	makefile	//it includes compiling of professor supplied "generate and viewer"

d) script file:
	setup.script	//a simple script file to setup running environment, only need run once at current directory,
			//create directory, create db files
	run.script	//a test running script, should be run in directory 'rundir'

2. program logic:
a) server-side:
	i) I made a big change to the job queue from lab3 because I realized that I should NOT include the whole lifetime of a thread in a
	critical section. Synchronization doesn't necessarily need to be sequential. That is to say, I include only the minimum necessary
	part in the critical section, such as getting a job, inserting a job and removing a job, which absolutely need to be synchronized.
	All other parts of the code, like reading, updating, locking and unlocking a file, may block or take a long time, so they SHOULD NOT
	be included in the critical section. Otherwise all jobs can only be executed in a "first come, first served" manner and we cannot
	even observe either the "queue_full" phenomenon or the "blocked-when-set-lock" phenomenon, because a thread would always finish its
	job before it gives up the "mutex".
	ii) So, my job "queue" is not exactly a "queue". Instead it is simply an array of jobs: the "listening thread" inserts a job
	when there is a vacancy, and each working thread tries to pick up a job that has not been taken by others. The order of the array
	doesn't necessarily guarantee the execution order, because some jobs may be blocked while jobs inserted after the blocked
	jobs can still proceed.
	iii) I dropped the idea of "two condition variables", which originally differentiated the "listening thread" from all the "working
	threads", because it became too complicated once the queue was no longer really FIFO. Now all threads are bound to one condition
	variable and the "broadcast" will wake up all threads, whether the job queue is full or empty.
	iv) Each thread "maps" the file into its own address range. Therefore a call to "msync" is absolutely necessary after each update to
	the file contents. Otherwise a subsequent thread operation might still work on the old, unmodified version of the file. Of course a
	better solution is to keep track of all files opened for "mmap" and maintain a single mapped memory pointer for each
	distinct file. However, I was a bit too lazy to do this.
	v) The synchronization of each processing job is even simpler than in lab3. First I designed an "enum" flag for job status (empty,
	issued, taken) and put the flags into a parallel array beside the "job" structure array. By checking the "job-status" array, we know the
	status of each position of the "job structure array". Inserting, taking and eliminating a job structure are critical operations and are
	protected by a "mutex". This happens at the beginning and the end of a thread's procedure: taking a job and emptying a
	job. In between, the thread may be blocked by file locks, and this won't affect other threads getting jobs.
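The status-array idea from points i), ii) and v) can be sketched as below. It is a simplified illustration, not the server code: the condition variable is left out, and `SLOT_COUNT`, `claim_slot` and `release_slot` are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

typedef enum { JobEmpty = 0, JobIssued = 1, JobTaken = 2 } JobStatus;

#define SLOT_COUNT 4   /* hypothetical queue capacity */

static JobStatus slots[SLOT_COUNT];
static pthread_mutex_t slot_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Atomically find the first slot in state `from` and move it to state `to`.
 * Returns the slot index, or -1 when no slot is in state `from`. */
int claim_slot(JobStatus from, JobStatus to)
{
    int i, found = -1;

    pthread_mutex_lock(&slot_mutex);
    for (i = 0; i < SLOT_COUNT; i++) {
        if (slots[i] == from) {
            slots[i] = to;
            found = i;
            break;
        }
    }
    pthread_mutex_unlock(&slot_mutex);
    return found;
}

/* Mark a finished job's slot as reusable; called after the slow file work,
 * which itself runs outside the mutex. */
void release_slot(int index)
{
    assert(index >= 0 && index < SLOT_COUNT);
    pthread_mutex_lock(&slot_mutex);
    slots[index] = JobEmpty;
    pthread_mutex_unlock(&slot_mutex);
}
```

The listening thread would call `claim_slot(JobEmpty, JobIssued)` and a worker `claim_slot(JobIssued, JobTaken)`; only these short state changes hold the mutex, so a worker blocked on a file lock does not stop the others.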
b) client-side:
	i) The only thing worth mentioning is the output structure. I designed a universal output structure for all types of file
	operation: a structure with an array of ten integers, because the maximum number of columns in a record is 10 and all
	other requests need at most two integers to be returned. This is fine for every request type except "select", which may
	require more than one record to be returned. Therefore I made the server handle "select" by sending the records one by one.
	The "select" client knows how many records to expect, so it loops over that number of records while writing the results to the
	result file.
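The client-side loop for "select" can be sketched as follows. `Reply` is a hypothetical stand-in for the real response structure, and `receive_range` is an illustrative name. The point is only that an inclusive range [start, end] yields end - start + 1 fixed-size replies.

```c
#include <assert.h>
#include <unistd.h>

#define MAX_COLUMNS 10

typedef struct {
    int values[MAX_COLUMNS];
    int count;              /* number of valid entries in values */
} Reply;                    /* hypothetical stand-in for ResponseRec */

/* Read the replies for the inclusive range [start, end] from fd into out.
 * Returns the number of records received, or -1 on a short read. */
int receive_range(int fd, int start, int end, Reply *out)
{
    int expected = end - start + 1, i;

    assert(out != NULL && start >= 0 && end >= start);
    for (i = 0; i < expected; i++) {
        if (read(fd, &out[i], sizeof(Reply)) != (ssize_t)sizeof(Reply))
            return -1;
    }
    return expected;
}
```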

3. program running:
	i) You should run "setup.script" once. It creates a running directory "rundir" and generates all the necessary database
	files.
	ii) Under "rundir", run "run.script", which is a test script and shows basic usage.
	iii) As in lab3, you can set the number of threads in the server by passing it as a command-line argument. If you do not, the default
	is 10 threads.
	iv) The running parameter format is like this:
		client_select.exe file_name start_record end_record
		client_increment.exe file_name row_number col_number
		client_decrement.exe file_name row_number col_number
		fclient.exe file_name request_type [param1] [param2]  //for lab3 compatibility.
	Please note that in order to test lab3, you need to run "fclient.exe" and it requires two parameters, "file_name" and 
	"request_type". 


D.The major functions
The major lesson I learned came from one of my classmates. I claimed in class that, within one process, every "open" call behaves
just like a "dup" call if you open the same file in different threads. This would imply a synchronization
problem with that file for both "read" and "write". He repeatedly said that this wouldn't make sense, so I finally wrote a simple
test to try to convince him. It turned out that I was wrong: every "open" call creates an independent "file table structure".
I felt quite embarrassed.
Another small issue is that you need to specify "PROT_READ" if you want to read from memory mapped by a call to "mmap". Even though it
seems natural that the file is already mapped into your "logical memory address" space, you still need the right to "read". The same goes for write.
And the one you might neglect is that if each thread maps the same file into its own copy, you need to "msync" after each update.
Otherwise, you would be doomed.
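The first lesson above (open versus dup) is easy to check. The sketch below is not the original test, which is not shown here; it is a minimal version with a hypothetical helper, `offsets_are_shared`, that moves the file offset of one descriptor and looks at the other.

```c
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

/* Returns 1 if moving fd_a's file offset also moves fd_b's offset,
 * 0 if the two offsets are independent, and -1 on error. */
int offsets_are_shared(int fd_a, int fd_b)
{
    off_t before, after;

    assert(fd_a >= 0 && fd_b >= 0);
    if (lseek(fd_a, 0, SEEK_SET) < 0 || lseek(fd_b, 0, SEEK_SET) < 0)
        return -1;
    before = lseek(fd_b, 0, SEEK_CUR);     /* current offset seen through fd_b */
    if (lseek(fd_a, 4, SEEK_SET) < 0)      /* move only through fd_a */
        return -1;
    after = lseek(fd_b, 0, SEEK_CUR);
    if (before < 0 || after < 0)
        return -1;
    return after != before;
}
```

Two descriptors obtained from separate "open" calls report 0 (independent offsets), while a descriptor and its "dup" report 1.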
E.Further improvement
F.File listing
1. myhead.h
2. errHandle.c
3. fprotocol.h
4. fclient.c
5. fserver.c
6. client_select.c
7. client_decrement.c
8. client_increment.c
9. generate.c (supplied by professor)
10. viewer.c (supplied by professor)
11. makefile
12. setup.script
13. run.script
file name: myhead.h
#ifndef MYHEAD_H
#define MYHEAD_H
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/mman.h>

//errno comes from <errno.h>; it is per-thread under pthreads, so it is not redeclared here

//define three mode and it is purely like
// DFA=Deterministic Finite Automaton
#define FindNext        0
#define Comparing       1

typedef int bool;

#define TRUE  1
#define FALSE 0

//for a particular file NAME, NOT path
//const int MaxNameLength=20;
#define MaxPathLength 128

#define MaxNameLength  20 

#define MaxIntLength   20

#define BuffSize   128

#define EmptyGridNode   'E' //the empty tic-tac-toe grid

#define PlayerTypeLength 2

#define PlayerPidLength (MaxIntLength+1)

#define BoardLength 9 //the tic-tac-toe length

#define PlayerX_Pos PlayerTypeLength

#define PlayerO_Pos (PlayerTypeLength*2+PlayerPidLength)

#define BoardPos ((PlayerTypeLength+PlayerPidLength)*2+1)

#define WinnerPos (BoardPos+BoardLength+1)

#define WinnerTypeLength 1 //simply the X or O

#define SleepSeconds 1

//parenthesized so that the macro stays one value inside larger expressions
#define FIFOMode (S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP)

void errHandle(char* msg);
	
#endif


file name: errHandle.c
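#include "myhead.h"

//print the system error text for errno (prefixed by msg when given), then exit
void errHandle(char* msg)
{
	if (msg!=NULL)
	{
		perror(msg);
	}
	else
	{
		//strerror only returns the text, so it has to be printed explicitly
		fprintf(stderr, "%s\n", strerror(errno));
	}
	//errno may be 0 here, so always report failure to the caller
	exit(EXIT_FAILURE);
}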
	
file name: fprotocol.h
#ifndef FPROTOCOL_H
#define FPROTOCOL_H
#include <sys/types.h>

typedef enum
{
	FILE_TYPE      =   1,
	FILE_INODE     =   2,
	FILE_SIZE      =   3,
	FILE_SELECT    =   4,
	FILE_INCREMENT =   5,
	FILE_DECREMENT =   6,
	FILE_NOT_FOUND =  -1,
	UNKNOWN_ERROR  =  -2,
	QUEUE_FULL     =  -3
}RequestType;		

typedef enum
{
	JobEmpty	=0,
	JobIssued	=1,
	JobTaken	=2
}JobStatus;

typedef enum
{
	Regular    =    1,
	Directory  =    2,
	SoftLink   =    3,
	Character  =    4,
	Block      =    5, 
	FIFO       =    6, 
	Socket     =    7
}FileType; 

#define MaxColumnNumber 10

typedef struct
{
	pid_t pid;
	RequestType fileOp;
	int resultArray[MaxColumnNumber];
	int resultNumber;
}ResponseRec;


#define MaxRequestFileNameLength 64


typedef struct 
{
        pid_t pid;
        RequestType fileOp;
	int param1;
	int param2;
        char fileName[MaxRequestFileNameLength];
}Job_t;

#define RequestRecordLength sizeof(Job_t)

char* serverFifoName="fserver.fifo";

#define DefaultThreadNumber 10

#endif

		
file name: fclient.c
#include "myhead.h"
#include "fprotocol.h"

extern char* serverFifoName;

char* getRequestName(RequestType theType);

void writeResult(ResponseRec* responseRec);

int main(int argc, char* argv[])
{
	int fdwrite, fdread, count=0;
	ResponseRec result;
	char pidBuf[MaxIntLength+7];
	Job_t requestRec;
	char readBuf[MaxIntLength];
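	if (argc!=5&&argc!=3)
	{
		//a usage error is not a system error, so perror is not appropriate here
		fprintf(stderr, "usage:fclient.exe filename requestType [param1] [param2]\n");
		exit(EXIT_FAILURE);
	}
	requestRec.pid=getpid();
	requestRec.fileOp=atoi(argv[2]);
	//guard the fixed-size fileName buffer against over-long names
	if (strlen(argv[1])>=MaxRequestFileNameLength)
	{
		fprintf(stderr, "file name is too long\n");
		exit(EXIT_FAILURE);
	}
	strcpy(requestRec.fileName, argv[1]);
	//lab3 requests carry no parameters; keep the fields defined anyway
	requestRec.param1=0;
	requestRec.param2=0;
	if (argc==5)
	{
		requestRec.param1=atoi(argv[3]);
		requestRec.param2=atoi(argv[4]);
		if (requestRec.fileOp==FILE_SELECT)
		{
			if (requestRec.param1>requestRec.param2)
			{
				fprintf(stderr, "in select option start record number is larger than end record number\n");
				exit(EXIT_FAILURE);
			}
		}
	}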
	//else ignored
	sprintf(pidBuf, "%d.fifo", getpid());
	if (mkfifo(pidBuf, FIFOMode)<0)
	{
		errHandle("create fifo at client fails");
	}
	if ((fdwrite=open(serverFifoName, O_RDWR))<0)
	{
		errHandle("open fifo error");
	}
	if (write(fdwrite, &requestRec, sizeof(Job_t))!=sizeof(Job_t))
	{
		errHandle("write error for fifo");
	}
	if ((fdread=open(pidBuf, O_RDWR))<0)
	{
		errHandle("open fifo for read error");
	}
	while (1)
	{
		if (read(fdread, &result, sizeof(ResponseRec))!=sizeof(ResponseRec))
		{
			errHandle("error of read fifo");
		}
		else
		{
			//printf("client %d read something\n", getpid());
			if (result.fileOp!=QUEUE_FULL)//if it is, then need to be patient
			{
				//only data records are counted; an error reply ends a select at once
				if (requestRec.fileOp==FILE_SELECT&&result.fileOp>0)
				{
					count++;
					writeResult(&result);
					if (count>requestRec.param2-requestRec.param1)
					{
						break;		
					}
					else
					{
						continue;
					}
				}
				writeResult(&result);
				break;
			}
			else
			{
				printf("received queue_full msg\n");
			}
		}
	}
	//read was successful, so close the fifo
	if (close(fdread)<0)
	{
		errHandle("close fifo error");
	}
	return 0;
}
			
void writeResult(ResponseRec* result)
{
	char pidBuf[MaxIntLength+7];
	char buf[128];
	int fdwrite, i;
	sprintf(pidBuf, "%d.result", getpid());
	if ((fdwrite=open(pidBuf, O_WRONLY|O_CREAT|O_APPEND, FIFOMode))<0)
	{
		errHandle("create result file error");
	}
	//in case error happens
	//printf("fclient is writing\n");
	sprintf(buf, "\nrequest type=%s\n", getRequestName(result->fileOp));
	if (result->fileOp>0)
	{
		for (i=0; i<result->resultNumber; i++)
		{
			sprintf(pidBuf, "%d ", result->resultArray[i]);
			strcat(buf, pidBuf);
		}
	}
		
	if (write(fdwrite, buf, strlen(buf))!=strlen(buf))
	{
		errHandle("write result error");
	}
	if (close(fdwrite)<0)
	{
		errHandle("close error");
	}
}


char* getRequestName(RequestType theType)
{
	switch(theType)
	{
	case FILE_TYPE:
		return "FILE_TYPE";
	case FILE_INODE:
		return "FILE_INODE";
	case FILE_SIZE:
		return "FILE_SIZE";
	case FILE_SELECT:
		return "FILE_SELECT";
	case FILE_INCREMENT:
		return "FILE_INCREMENT";
	case FILE_DECREMENT:
		return "FILE_DECREMENT";
	case FILE_NOT_FOUND:
		return "FILE_NOT_FOUND";
	case UNKNOWN_ERROR:
		return "UNKNOWN_ERROR";
	case QUEUE_FULL:
		return "QUEUE_FULL";
	}
	//a value outside the enum still needs a printable name
	return "UNKNOWN_REQUEST";
}
file name: fserver.c
#include "myhead.h"
#include "fprotocol.h"
#include <pthread.h>

Job_t* jobs;
JobStatus* jobStatus;//array of empty status of job
extern char* serverFifoName;

int fdServer;
//int jobCount=0;
//int workerCount=0;
//int currentJob=0;
int threadNumber;
Job_t requestRec;
//ResponseRec responseRec;
//int fifoMode=S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP;
pthread_t* threads;
//char* serverFifoName="fserver.fifo";
//DynaArrayPtr arrayPtr;

void enterCritical();
void exitCritical();

pthread_cond_t jobCond=PTHREAD_COND_INITIALIZER;
pthread_cond_t controlCond=PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;

void createQueue();
void initialize(int argc, char* argv[]);
void uninitialize();

void createThreads();
void createFifo(char* fifoName);
void writeFifo(ResponseRec* responseRec);

//the child do job
void doJob(int currentJob);
//the control thread is main thread
void doControl(int currentJob);
void broadcast(pthread_cond_t* cond);

int getJob(JobStatus jobStatus);
//the thread function
void* jobFunc(void* arg);
void controlFunc();
int openDB(char* fileName);

//should return MAPPED POINTER

int main(int argc, char* argv[])
{
	initialize(argc, argv);
	controlFunc();
	return 0;
}

int openDB(char* fileName)
{
	int fd;
	//in order to map file, even readonly need to be read&write
	if ((fd=open(fileName, O_RDWR))<0)
	{
		errHandle("open db error");
	}
	return fd;
}


//control always broadcast job_cond 
void controlFunc()
{
	int n, currentJob;
	ResponseRec responseRec;
	while (1)
	{
		//reading here are BLOCKING
		if ((n=read(fdServer, &requestRec, sizeof(Job_t)))!=sizeof(Job_t))
		{
			if (n==0)
			{
				//nothing was read, so there is no new request to queue
				//sleep(SleepSeconds);
				continue;
			}
			else
			{
				//printf("\ncontrol thread read %d\n", n);
				errHandle("read pipe in control thread error");
			}
		}

		//printf("control read pid=%d\n", requestRec.pid);
		//lock the mutex
		enterCritical();
		//if the queue is full
		while ((currentJob=getJob(JobEmpty))==-1)
		{
			//printf("job queue is full now\n");
			//sprintf(queueFullBuf, "%d", QUEUE_FULL);
			//buf always has processID at beginning and terminated with NULL
			//sprintf(pidBuf,"%d.fifo",requestRec.pid);
			//strcat(pidBuf, ".fifo");
			responseRec.pid=requestRec.pid;
			responseRec.fileOp=QUEUE_FULL;
			//printf("prepare to write to fifo of pid=%d\n", responseRec.pid);
			writeFifo(&responseRec);
			//printf("write job queue full msg\n");
			broadcast(&jobCond);
			if (pthread_cond_wait(&controlCond, &mutex))
			{
				errHandle("condition wait error");
			}
		}
		//currentJob=getJob(JobEmpty);
		doControl(currentJob);
		broadcast(&jobCond);
		//printf("insert job %d\n", currentJob);
		exitCritical();
	}
}

void broadcast(pthread_cond_t* cond)
{
	if (pthread_cond_broadcast(cond))
	{
		errHandle("condition broadcast error\n");
	}
	
/*
	//always favour control thread
	if (pthread_cond_broadcast(&controlCond))
	{
		errHandle("condition broadcast error\n");
	}
	//printf("jobcontrol broadcasted\n");
	//to make job count become 80% of capacity
	//if (jobCount>8*threadNumber/10)
	{		
		if (pthread_cond_broadcast(&jobCond))
		{
			errHandle("condition broadcast error\n");
		}
	}
*/
}

					
void doJob(int currentJob)
{
	//char pidBuf[MaxIntLength];//20
	//char resultBuf[MaxIntLength];
	ResponseRec responseRec;
	caddr_t ptr;
	struct flock lock;
	struct stat statBuf;
	//workerCount++;	
	int fd, row, col, i, j,len, offset;
	//by default it is the same
	responseRec.pid=jobs[currentJob].pid;
	responseRec.fileOp=jobs[currentJob].fileOp;
	//printf("inside job\n");
	if (stat(jobs[currentJob].fileName, &statBuf)<0)
	{
		if (errno==ENOENT)
		{
			//report the missing file and stop here: statBuf is not valid
			responseRec.fileOp=FILE_NOT_FOUND;
			responseRec.resultNumber=0;
			writeFifo(&responseRec);
			return;
		}
		else
		{
			errHandle("stat error");
		}
	}
	//printf("what is this?\n");
	switch(jobs[currentJob].fileOp)
	{
	case FILE_TYPE:
		if (S_ISREG(statBuf.st_mode))
		{
			responseRec.resultArray[0]=Regular;
		}
		if (S_ISDIR(statBuf.st_mode))
		{
			responseRec.resultArray[0]=Directory;
		}
		if (S_ISLNK(statBuf.st_mode))
		{
			responseRec.resultArray[0]=SoftLink;
		}
		if (S_ISCHR(statBuf.st_mode))
		{
			responseRec.resultArray[0]=Character;
		}
		if (S_ISBLK(statBuf.st_mode))
		{
			responseRec.resultArray[0]=Block;
		}
		if (S_ISFIFO(statBuf.st_mode))
		{
			responseRec.resultArray[0]=FIFO;
		}
		if (S_ISSOCK(statBuf.st_mode))
		{
			responseRec.resultArray[0]=Socket;
		}
		responseRec.resultNumber=1;
		break;
	case FILE_INODE:
		responseRec.resultArray[0]=statBuf.st_ino;
		responseRec.resultNumber=1;
		break;
	case FILE_SIZE:
		responseRec.resultArray[0]=statBuf.st_size;
		responseRec.resultNumber=1;
		break;
	case FILE_SELECT:
		//printf("here?\n");
		fd=openDB(jobs[currentJob].fileName);
		if (read(fd, &row, sizeof(int))!=sizeof(int)||read(fd, &col, sizeof(int))!=sizeof(int))
		{
			errHandle("read error of row and col");
		}
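		//the requested range is inclusive and must lie inside the table
		if (jobs[currentJob].param1<0||jobs[currentJob].param2>=row||jobs[currentJob].param1>jobs[currentJob].param2)
		{
			errHandle("invalid record range");
		}
		offset=8+jobs[currentJob].param1*col*sizeof(int);
		//an inclusive range holds param2-param1+1 records
		len=col*(jobs[currentJob].param2-jobs[currentJob].param1+1)*sizeof(int);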
		//printf("param1=%d, param2=%d, offset=%d, len=%d\n", jobs[currentJob].param1, jobs[currentJob].param2, offset, len);
		//printf("before map\n");
		//the place to lock is 
		lock.l_start=offset;
		lock.l_type=F_RDLCK;
		lock.l_whence=SEEK_SET;
		lock.l_len=len;
		//printf("here is right?");
		if (fcntl(fd, F_SETLKW, &lock)<0)
		{
			errHandle("fcntl set lock error");
		}
		//sleep(1);
		//here the offset+len is all I want to map
		if ((ptr=mmap(0, offset+len, PROT_READ, MAP_SHARED, fd, 0))==(caddr_t)(-1))
		{
			errHandle("mmap error");
		}
		//printf("map succeed in select, ptr=%u\n", ptr);
		responseRec.resultNumber=col;
		for (i=0; i<=jobs[currentJob].param2-jobs[currentJob].param1; i++)
		{
			memcpy(responseRec.resultArray, ptr+offset+i*col*4, col*4);
			/*
			for (j=0; j<col; j++)
			{
				printf("array[%d]=%d\n", j, responseRec.resultArray[j]);
			}
			*/
			writeFifo(&responseRec);//every time it writes and closes fd
		}
		if (munmap(ptr, offset+len)<0)
		{
			errHandle("unmap error");
		}
		lock.l_type=F_UNLCK;

		if (fcntl(fd, F_SETLK, &lock)<0)
		{
			errHandle("unlock error");
		}

		return;//no need to writefifo
	case FILE_INCREMENT:
	case FILE_DECREMENT:
		fd=openDB(jobs[currentJob].fileName);
		if (read(fd, &row, sizeof(int))!=sizeof(int)||read(fd, &col, sizeof(int))!=sizeof(int))
		{
			errHandle("read error of row and col");
		}
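		//rows and columns are zero-based, so the counts themselves are out of range
		if (jobs[currentJob].param1<0||jobs[currentJob].param1>=row||jobs[currentJob].param2<0||jobs[currentJob].param2>=col)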
		{
			errHandle("invalid record or column number");
		}

		offset=8+jobs[currentJob].param1*col*4+jobs[currentJob].param2*4;
		//printf("param1=%d, param2=%d, offset=%d\n", jobs[currentJob].param1, jobs[currentJob].param2, offset);
		lock.l_start=offset;
		lock.l_type=F_WRLCK;
		lock.l_whence=SEEK_SET;
		lock.l_len=sizeof(int);
		//printf("here is right?");
		if (fcntl(fd, F_SETLKW, &lock)<0)
		{
			errHandle("fcntl set lock error");
		}
		//sleep(1);		
		if ((ptr=mmap(0, offset+sizeof(int), PROT_WRITE|PROT_READ, MAP_SHARED, fd, 0))==(caddr_t)(-1))
		{
			errHandle("mmap error");
		}
		responseRec.resultArray[0]=*((int*)(ptr+offset));
		if (jobs[currentJob].fileOp==FILE_INCREMENT)
		{
			responseRec.resultArray[1]=responseRec.resultArray[0]+1;
		}
		else
		{
			responseRec.resultArray[1]=responseRec.resultArray[0]-1;
		}
		//printf("memcpy %d\n", *(responseRec.resultArray+1));
		memcpy(ptr+offset,responseRec.resultArray+1, sizeof(int));

		//(*((int*)(ptr+offset)))=responseRec.result;
		//responseRec.resultArray[1]=*((int*)(ptr+offset));
		responseRec.resultNumber=2;
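		//flush the whole mapped range so that the updated field itself is covered
		if (msync(ptr, offset+sizeof(int), MS_SYNC)<0)
		{
			errHandle("msync error");
		}
		//printf("originally it is: %d now the record becomes %u\n", responseRec.resultArray[0], *((int*)(ptr+offset)));
		//unmap the same length that was mapped above
		if (munmap(ptr, offset+sizeof(int))<0)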
		{
			errHandle("unmap error");
		}
		lock.l_type=F_UNLCK;
		if (fcntl(fd, F_SETLK, &lock)<0)
		{
			errHandle("unlock error");
		}
		//printf("unlock successful\n");
		//printf("what I write is:result[0]=%d, [1]=%d\n", responseRec.resultArray[0], responseRec.resultArray[1]);
		//close(fd);
		break;
	default:
		responseRec.fileOp=UNKNOWN_ERROR;
		break;
	}
	//printf("\nhere child read job index=%d", currentJob);
	//printf("\npid=%d, fileop=%d, filename=%s\n", jobs[currentJob].pid, jobs[currentJob].fileOp, jobs[currentJob].fileName);
	//sprintf(pidBuf, "%d.fifo", jobs[currentJob].pid);
	//sprintf(resultBuf, "%d", result);
	//it is client's responsibility to create fifo
	writeFifo(&responseRec);
}

void writeFifo(ResponseRec* responseRec)
{
	int fd;
	//char intBuf[MaxIntLength];
	char fifoName[MaxIntLength+5];
	sprintf(fifoName, "%d.fifo", responseRec->pid);
	if ((fd=open(fifoName, O_WRONLY))<0)
	{
		//printf("\nthe fifonoame is  %s\n", fifoName);
		//write(STDOUT_FILENO, fifoName, strlen(fifoName));
		errHandle("open fifo for write error");
	}
	//sprintf(intBuf, "%d", responseRec.result);
	if (write(fd, responseRec, sizeof(ResponseRec))!=sizeof(ResponseRec))
	{
		errHandle("write fifo error");
	}
	//close(fd);
}

//assume processID is only 5 digits terminated with NULL
//assume fileoperation is only 1digit terminated with NULL
void doControl(int currentJob)
{
	jobs[currentJob]=requestRec;//should be struct copy??
	//strcpy(jobs[index].fileName, buf+ProcessIDLength+FileOperationLength+1);
	//jobCount++;
	//printf("change jobstatus [%d]=%d\n", currentJob, JobIssued);
	jobStatus[currentJob]=JobIssued;
	//printf("get a job with pid=%d, fileop=%d, filename=%s\n", pid, requestType, jobs[index].fileName);
}

void* jobFunc(void* arg)
{
	int currentJob;
	while (1)
	{
		enterCritical();
		//waiting for condition variable
		while ((currentJob=getJob(JobIssued))==-1)
		{
			broadcast(&controlCond);
			if (pthread_cond_wait(&jobCond, &mutex))
			{
				errHandle("condition wait error\n");
			}
		}
		//getJob must retrieve job of same status issued by parameter in job status array
		//printf("begin search job\n");
		//currentJob=getJob(JobIssued);
		jobStatus[currentJob]=JobTaken;
		//jobCount--;//not here!!!
		//workerCount++;
		//here is end of critical section
		exitCritical();
		//do job must be thread safe
		//printf("begin one job\n");
		doJob(currentJob);
		//printf("finished one job\n");

		//workerCount--;
		enterCritical();
		//jobCount--;
		jobStatus[currentJob]=JobEmpty; //make it empty
		//workerCount--;
		exitCritical();
		broadcast(&controlCond);
	}
}

void initialize(int argc, char* argv[])
{
	if (argc==1)
	{
		threadNumber=DefaultThreadNumber;
	}
	else
	{
		if (argc==2)
		{
			threadNumber=atoi(argv[1]);
			if (threadNumber<=0)
			{
				errHandle("usage: fserver.exe [threadnumber]\n");
			}
	
		}
		else
		{
			errHandle("usage: fserver.exe [threadnumber]\n");
		}
	}
	createFifo(serverFifoName);
	createQueue();
	//arrayPtr=createArray();//create the dynamic array for integer;
	if ((fdServer=open(serverFifoName, O_RDWR))<0)
	{
		errHandle("open server fifo error");
	}
	createThreads();
}

void uninitialize()
{
	free(jobs);
	free(jobStatus);
}

void createQueue()
{
	if ((jobs=(Job_t*)malloc(threadNumber*sizeof(Job_t)))==NULL)
	{
		errHandle("create job queue error\n");
	}
	//all job status is '0'=empty
	if ((jobStatus=(JobStatus*)calloc(threadNumber, sizeof(JobStatus)))==NULL)
	{
		errHandle("create job status array error");
	}
	//jobCount=workerCount=0;//initialize to 0;
}


void createFifo(char* fifoName)
{
	if (mkfifo(fifoName, FIFOMode)<0)
	{
		if (errno!=EEXIST)	
		{
			errHandle("create fifo error\n");
		}
	}
}


void createThreads()
{
	int i;
	if ((threads=(pthread_t*)malloc(threadNumber*sizeof(pthread_t)))==NULL)
	{
		errHandle("cannot malloc pthreads array\n");
	}
	for (i=0; i<threadNumber; i++)
	{
		//each thread is given an index "i"
		if (pthread_create(&threads[i], NULL, jobFunc,(void*)(long)i))
		{
			errHandle("create threads error\n");
		}
	}
}

		

void enterCritical()
{
	if (pthread_mutex_lock(&mutex))
	{
		errHandle("mutex lock error");
	}
}

void exitCritical()
{
	if (pthread_mutex_unlock(&mutex))
	{
		errHandle("mutex unlock error");
	}
}

//return the first index of status same as param "status"
int getJob(JobStatus status)
{
	int i;
	for (i=0; i<threadNumber; i++)
	{
		//printf("jobStatus[%d]=%d\n", i, jobStatus[i]);
		if (jobStatus[i]==status)
		{
			return i;
		}
	}
	//errHandle("cannot find designated job");
	return -1;
}
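
The increment/decrement branch of doJob in fserver.c write-locks a single integer with fcntl, maps the file, updates the value in place, and then unlocks. A minimal standalone sketch of that sequence follows. The names cellOffset and adjustCell are hypothetical (they do not appear in fserver.c), and the sketch assumes the file is already open with O_RDWR and uses the layout described earlier (an 8-byte header followed by rows of integers).

```c
#include <fcntl.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>

/* Byte offset of (row, col): two header ints, then rows of `columns` ints. */
static off_t cellOffset(int columns, int row, int col)
{
	return 2 * (off_t)sizeof(int) + ((off_t)row * columns + col) * (off_t)sizeof(int);
}

/* Add delta to the int at `offset` in the file open on fd.
   Stores the old and new values; returns 0 on success, -1 on error. */
static int adjustCell(int fd, off_t offset, int delta, int *oldVal, int *newVal)
{
	struct flock lock = {0};
	size_t len = (size_t)offset + sizeof(int); /* same length for mmap, msync, munmap */
	char *ptr;
	int *cell;
	int rc = 0;

	lock.l_type = F_WRLCK;
	lock.l_whence = SEEK_SET;
	lock.l_start = offset;
	lock.l_len = sizeof(int);
	if (fcntl(fd, F_SETLKW, &lock) < 0) /* block until the cell is free */
		return -1;

	ptr = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	if (ptr == MAP_FAILED) {
		rc = -1;
	} else {
		cell = (int *)(ptr + offset);
		*oldVal = *cell;
		*newVal = *oldVal + delta;
		*cell = *newVal;
		if (msync(ptr, len, MS_SYNC) < 0)
			rc = -1;
		if (munmap(ptr, len) < 0)
			rc = -1;
	}

	lock.l_type = F_UNLCK; /* always release the record lock */
	if (fcntl(fd, F_SETLK, &lock) < 0)
		rc = -1;
	return rc;
}
```

For example, adjustCell(fd, cellOffset(5, 1, 2), -1, &oldVal, &newVal) would decrement row 1, column 2 of a 5-column file. Note that fcntl record locks are owned by the process, so they keep separate processes apart but do not by themselves serialize threads inside one server process.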

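The worker threads in jobFunc share a slot-status array guarded by one mutex: they wait on jobCond for a slot marked JobIssued and broadcast controlCond when a slot becomes free again. The sketch below reproduces that handshake in a self-contained form. Pool, findSlot, worker, runPool and MaxSlots are hypothetical names, the sketch assumes the controlling thread waits on controlCond for an empty slot, the shutdown flag is an addition so the demo can terminate (the original workers loop forever), and the job body is left empty.

```c
#include <pthread.h>
#include <stddef.h>

#define MaxSlots 16

enum { SlotEmpty, SlotIssued, SlotTaken };

typedef struct {
	pthread_mutex_t mutex;
	pthread_cond_t jobCond;     /* a slot became SlotIssued */
	pthread_cond_t controlCond; /* a slot became SlotEmpty */
	int status[MaxSlots];       /* one entry per slot */
	int slots;
	int shutdown;               /* sketch-only: lets workers exit */
	int processed;              /* jobs completed so far */
} Pool;

/* First slot with the wanted status, or -1. Caller holds the mutex. */
static int findSlot(const Pool *p, int wanted)
{
	int i;
	for (i = 0; i < p->slots; i++)
		if (p->status[i] == wanted)
			return i;
	return -1;
}

static void *worker(void *arg)
{
	Pool *p = arg;
	int slot;

	pthread_mutex_lock(&p->mutex);
	for (;;) {
		/* re-check the predicate after every wakeup */
		while ((slot = findSlot(p, SlotIssued)) == -1 && !p->shutdown)
			pthread_cond_wait(&p->jobCond, &p->mutex);
		if (slot == -1)
			break; /* shutting down and nothing left to take */
		p->status[slot] = SlotTaken;
		pthread_mutex_unlock(&p->mutex);

		/* the real server would run doJob(slot) here, outside the lock */

		pthread_mutex_lock(&p->mutex);
		p->status[slot] = SlotEmpty;
		p->processed++;
		pthread_cond_broadcast(&p->controlCond);
	}
	pthread_mutex_unlock(&p->mutex);
	return NULL;
}

/* Issue `jobs` jobs to `workers` threads; returns the number completed,
   or -1 if the worker count is out of range. */
static int runPool(int workers, int jobs)
{
	Pool p = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
	           PTHREAD_COND_INITIALIZER, {0}, 0, 0, 0 };
	pthread_t tids[MaxSlots];
	int i, slot, started, done;

	if (workers < 1 || workers > MaxSlots)
		return -1;
	p.slots = workers;
	for (started = 0; started < workers; started++)
		if (pthread_create(&tids[started], NULL, worker, &p) != 0)
			break;

	pthread_mutex_lock(&p.mutex);
	if (started == 0)
		jobs = 0; /* no thread could be created, so issue nothing */
	for (i = 0; i < jobs; i++) {
		while ((slot = findSlot(&p, SlotEmpty)) == -1)
			pthread_cond_wait(&p.controlCond, &p.mutex);
		p.status[slot] = SlotIssued;
		pthread_cond_signal(&p.jobCond);
	}
	while (p.processed < jobs) /* wait for jobs still in flight */
		pthread_cond_wait(&p.controlCond, &p.mutex);
	p.shutdown = 1;
	pthread_cond_broadcast(&p.jobCond);
	pthread_mutex_unlock(&p.mutex);

	for (i = 0; i < started; i++)
		pthread_join(tids[i], NULL);
	done = p.processed;
	pthread_cond_destroy(&p.controlCond);
	pthread_cond_destroy(&p.jobCond);
	pthread_mutex_destroy(&p.mutex);
	return done;
}
```

Both sides test their condition in a while loop while holding the mutex, so a wakeup that arrives early or spuriously only causes a re-check and no job is lost or taken twice.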
file name: client_select.c
#include "myhead.h"
#include "fprotocol.h"


int main(int argc, char* argv[])
{
	char buf[2];
	if (argc!=4)
	{
		printf("usage: client_select.exe filename startrec endrec\n");
		exit(0);
	}
	buf[0]='0'+FILE_SELECT;
	buf[1]='\0';
	execl("./fclient.exe", "fclient.exe", argv[1],  buf, argv[2], argv[3], (char*)NULL);
	//execl returns only on failure
	perror("execl ./fclient.exe");
	return 1;
}
file name: client_increment.c
#include "myhead.h"
#include "fprotocol.h"


int main(int argc, char* argv[])
{
	char buf[2];
	if (argc!=4)
	{
		printf("usage: client_increment.exe filename rec col\n");
		exit(0);
	}
	buf[0]='0'+FILE_INCREMENT;
	buf[1]='\0';
	execl("./fclient.exe", "fclient.exe", argv[1],  buf, argv[2], argv[3], (char*)NULL);
	//execl returns only on failure
	perror("execl ./fclient.exe");
	return 1;
}
file name: client_decrement.c
#include "myhead.h"
#include "fprotocol.h"


int main(int argc, char* argv[])
{
	char buf[2];
	if (argc!=4)
	{
		printf("usage: client_decrement.exe filename rec col\n");
		exit(0);
	}
	buf[0]='0'+FILE_DECREMENT;
	buf[1]='\0';
	execl("./fclient.exe", "fclient.exe", argv[1],  buf, argv[2], argv[3], (char*)NULL);
	//execl returns only on failure
	perror("execl ./fclient.exe");
	return 1;
}
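
Each wrapper above simply execs fclient.exe, and a comment in fserver.c notes that the client is responsible for creating the FIFO on which the server replies, which writeFifo opens as "<pid>.fifo". A minimal sketch of that client-side step follows, under the assumption that the client names the FIFO after its own pid. makeReplyFifo is a hypothetical helper, and the 0600 mode is an assumption (the original FIFOMode constant is not shown here).

```c
#include <errno.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>

/* Build "<pid>.fifo" in name and create the FIFO if it does not exist yet.
   Returns 0 on success, -1 if the name does not fit or mkfifo fails. */
static int makeReplyFifo(char *name, size_t size, pid_t pid)
{
	int n = snprintf(name, size, "%ld.fifo", (long)pid);

	if (n < 0 || (size_t)n >= size)
		return -1; /* truncated: buffer too small */
	if (mkfifo(name, 0600) < 0 && errno != EEXIST)
		return -1;
	return 0;
}
```

A client would call makeReplyFifo(name, sizeof(name), getpid()) before sending its request, then open name for reading. Using snprintf with a length check avoids having to guess how many digits a pid can have.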
file name: generate.c (supplied by professor)
#include <stdio.h>
#include <stdlib.h>
#include <time.h>



int main(int argc, char *argv[]){

	FILE *fp;
	int rows, columns, i, j;
	int *record;
	size_t objects_written;


	/* process the command line arguments */
	if(argc != 4){
		fprintf(stderr, "Usage: %s rows columns filename\n", argv[0]);
		exit(1);
	}

	rows = atoi(argv[1]);
	columns = atoi(argv[2]);

	if( (fp = fopen(argv[3], "w")) == NULL){
		perror("File open error");
		exit(1);
	}

	
	/* do some error checking */
	if( (rows < 1) || (rows > 10000)){
		fprintf(stderr, "Invalid row count: %d\n", rows);
		exit(1);
	} 

	if( (columns < 1) || (columns > 10)){
		fprintf(stderr, "Invalid column count: %d\n", columns);
		exit(1);
	} 


	/* write the row count and column count to the file */
	if( (objects_written = fwrite(&rows, sizeof(int), 1, fp)) != 1){
		fprintf(stderr, "Error writing row count\n");
		exit(1);
	}

	if( (objects_written = fwrite(&columns, sizeof(int), 1, fp)) != 1){
		fprintf(stderr, "Error writing column count \n");
		exit(1);
	}



	/* create a buffer for each record */
	if( (record = (int *)malloc(columns * sizeof(int))) == NULL){
		fprintf(stderr, "Malloc error\n");
		exit(1);
	}

	srand(time(0)); /* seed the random number generator */

	/* create the records and write them to file */
	for(i = 0; i < rows; i++){
		for(j = 0; j < columns; j++)
			record[j] = rand() % 100;
			
		if( (objects_written = fwrite(record, sizeof(int), columns, fp)) != columns){
			fprintf(stderr, "Error writing row %d \n", i);
			exit(1);
		}
	}

	free(record);
	fclose(fp);
	
	printf("\nWrote %d records (%d columns) to %s\n\n", rows, columns, argv[3]);

	return 0;
}
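
generate.c lays the file out as two integers (row count, then column count) followed by rows × columns integers. A small sketch of reading that header back and computing the file size it implies is given below. readHeader and dbFileSize are hypothetical helpers, and the limits checked are the ones generate.c enforces.

```c
#include <stdio.h>

/* Total size in bytes of a database file with the given shape. */
static long dbFileSize(int rows, int columns)
{
	return 2L * (long)sizeof(int) + (long)rows * columns * (long)sizeof(int);
}

/* Read the two-int header from the current position of fp.
   Returns 0 on success, -1 on a short read or an out-of-range shape. */
static int readHeader(FILE *fp, int *rows, int *columns)
{
	if (fread(rows, sizeof(int), 1, fp) != 1 ||
	    fread(columns, sizeof(int), 1, fp) != 1)
		return -1;
	if (*rows < 1 || *rows > 10000 || *columns < 1 || *columns > 10)
		return -1;
	return 0;
}
```

With 4-byte integers, personnel.db (350 rows, 5 columns) should therefore be 8 + 350*5*4 = 7008 bytes, which is a quick sanity check before mapping or seeking into the file.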
file name: viewer.c
#include <stdio.h>
#include <stdlib.h>



int main(int argc, char *argv[]){

	FILE *fp;
	int start_row, end_row, rows, columns, i, j;
	int *record;
	size_t objects_read;
	char	header[7];


	/* process the command line arguments */
	if(argc != 4){
		fprintf(stderr, "Usage: %s startrow endrow filename\n", argv[0]);
		exit(1);
	}

	start_row = atoi(argv[1]);
	end_row = atoi(argv[2]);

	if( (fp = fopen(argv[3], "r")) == NULL){
		perror("File open error");
		exit(1);
	}

	
	
	
	/* read the row count and column count from the file */
	if( (objects_read = fread(&rows, sizeof(int), 1, fp)) != 1){
		fprintf(stderr, "Error reading row count\n");
		exit(1);
	}

	if( (objects_read = fread(&columns, sizeof(int), 1, fp)) != 1){
		fprintf(stderr, "Error reading column count \n");
		exit(1);
	}
	
	
	
	
	/* a little error checking on row entries */
	if( (start_row < 0) || (start_row >= rows)){
		fprintf(stderr, "Invalid start_row value: %d\n", start_row);
		exit(1);
	} 

	if( (end_row < 0) || (end_row >= rows)){
		fprintf(stderr, "Invalid end_row value: %d\n", end_row);
		exit(1);
	} 
	
	if(start_row > end_row){
		fprintf(stderr, "Start row cannot be greater than end row\n");
		exit(1);
	} 

	



	/* find the specified records */
	if( (fseek(fp, start_row * (columns * sizeof(int)), SEEK_CUR)) < 0){
		perror("Seek error");
		exit(1);
	}


	/* create a buffer for each record */
	if( (record = (int *)malloc(columns * sizeof(int))) == NULL){
		fprintf(stderr, "Malloc error\n");
		exit(1);
	}


	/* print the header line */
	printf("\n\nDataset has %d rows and %d columns\n", rows, columns);
	printf("You have selected row %d to row %d\n", start_row, end_row); 
	printf("\nRow\t\t");
	for(j = 0; j < columns; j++){
		sprintf(header, "col %d", j);
		printf("%s\t", header);
	}
	printf("\n\n");
	

	/* display the specified records to the screen */
	for(i = 0; i < (end_row - start_row + 1); i++){
			
		if( (objects_read = fread(record, sizeof(int), columns, fp)) != columns){
			fprintf(stderr, "Error reading row %d \n", start_row + i);
			exit(1);
		}
		
		printf("%d\t\t", start_row + i);
		for(j = 0; j < columns; j++)
			printf("%d\t", record[j]);
		printf("\n");
	}

	printf("\n\n");
	
	free(record);
	fclose(fp);

	return 0;
}
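
viewer.c seeks start_row records past the header, so rows are numbered from 0 and the last valid row is rows - 1. A sketch of a range check and of the number of records the print loop reads is given below. validRowRange and rowsToRead are hypothetical names, not part of viewer.c.

```c
/* Nonzero when the inclusive range [startRow, endRow] lies inside a file
   of `rows` records, with rows numbered from 0. */
static int validRowRange(int startRow, int endRow, int rows)
{
	return startRow >= 0 && startRow <= endRow && endRow < rows;
}

/* Number of records an inclusive range covers. */
static int rowsToRead(int startRow, int endRow)
{
	return endRow - startRow + 1;
}
```

For the 350-row personnel.db, validRowRange(0, 349, 350) holds while validRowRange(0, 350, 350) does not, since row 350 would lie past the end of the file.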

file name: setup.script
echo "this is a simple setup script and only needs to be run once"
echo "create a 'rundir' directory for testing"
mkdir rundir
echo "now let's run make and then generate some database files for testing"
make
cp ./run.script ./rundir/run.script 
cd rundir
chmod +x run.script

echo "generate sales.db with 1000 records and 6 columns"
./generate.exe 1000 6 sales.db

echo "generate salary.db with 500 records and 8 columns"
./generate.exe 500 8 salary.db

echo "generate personnel.db with 350 records and 5 columns"
./generate.exe 350 5 personnel.db

echo "generate credit.db with 100 records and 10 columns"
./generate.exe 100 10 credit.db

echo "now you can run the test script 'run.script'"


file name: run.script
echo "now let's run server with default 10 threads"
./fserver.exe 10 &
echo "before running, let's see the result of rec 1 col 2 of personnel.db\n"
./viewer.exe 1 1 personnel.db
sleep 2

echo "now decrment 6 times for rec 1 col 2\n"
./client_decrement.exe personnel.db 1 2 &
./client_decrement.exe personnel.db 1 2 &
./client_decrement.exe personnel.db 1 2 &
./client_decrement.exe personnel.db 1 2 &
./client_decrement.exe personnel.db 1 2 &
./client_decrement.exe personnel.db 1 2 &

echo "OS need time to synchronize mapped memory contents with disk file\n"
echo "we must sleep 2 seconds before we can see the result\n"
sleep 2
./viewer.exe 1 1 personnel.db 

echo "now increment 10 time for rec=1 col=2\n"
./client_increment.exe personnel.db 1 2 &
./client_increment.exe personnel.db 1 2 &
./client_increment.exe personnel.db 1 2 &
./client_increment.exe personnel.db 1 2 &
./client_increment.exe personnel.db 1 2 &
./client_increment.exe personnel.db 1 2 &
./client_increment.exe personnel.db 1 2 &
./client_increment.exe personnel.db 1 2 &
./client_increment.exe personnel.db 1 2 &
./client_increment.exe personnel.db 1 2 &

echo "We must sleep for 2 seconds before we can see result again/n"
sleep 2
./viewer.exe 1 1 personnel.db
echo "it now selects record 1 up to record 20 of 'personnel.db', the result is usually stored the 'maximum_pid.result' file\n"
./client_select.exe personnel.db  1 20 &


file name: makefile
all:    fserver.exe fclient.exe client_select.exe client_increment.exe client_decrement.exe generate.exe viewer.exe
	@echo "make complete for fserver.exe, fclient.exe, "
	@echo "client_select.exe, client_increment.exe, client_decrement.exe,"
	@echo "generate.exe, viewer.exe" 
	cp *.exe *.script ./rundir/
	rm -f ./rundir/*.fifo
	rm -f ./rundir/*.result
	
generate.exe : generate.c
	@echo "compiling generate.exe..."
	gcc generate.c -o generate.exe

viewer.exe : viewer.c
	@echo "compiling viewer.exe..."
	gcc viewer.c -o viewer.exe

client_increment.exe : client_increment.c
	@echo "compiling client_increment.exe..."
	gcc -g client_increment.c -o client_increment.exe
client_decrement.exe : client_decrement.c
	@echo "compiling client_decrement.exe..."
	gcc -g client_decrement.c -o client_decrement.exe

client_select.exe : client_select.c
	@echo "compiling client_select.exe..."
	gcc -g client_select.c -o client_select.exe

errHandle.o : errHandle.c myhead.h
	@echo "compiling errHandle module..."
	gcc -g -c errHandle.c -o errHandle.o

fclient.exe : fclient.c errHandle.o myhead.h fprotocol.h
	@echo "compiling fclient.exe..." 
	gcc -g fclient.c errHandle.o -o fclient.exe

fserver.exe : fserver.c myhead.h errHandle.o fprotocol.h
	@echo "compiling fserver.exe..."
	gcc -g fserver.c errHandle.o -o fserver.exe -lpthread
clear :
	@echo "clear up...\n"
	rm *.o *.exe






How to run?

a) chmod +x ./setup.script

b) ./setup.script

c) cd  rundir

d) ./run.script

A possible run result (please note that the leftmost number '1' is the row number, NOT data):

[root@sec05 rundir]# ./run.script
now let's run server with default 10 threads
before running, let's see the result of rec 1 col 2 of personnel.db\n


Dataset has 350 rows and 5 columns
You have selected row 1 to row 1

Row col 0 col 1 col 2 col 3 col 4

1 10 2 21 82 52


now decrment 6 times for rec 1 col 2\n
OS need time to synchronize mapped memory contents with disk file\n
we must sleep 2 seconds before we can see the result\n


Dataset has 350 rows and 5 columns
You have selected row 1 to row 1

Row col 0 col 1 col 2 col 3 col 4

1 10 2 15 82 52


now increment 10 time for rec=1 col=2\n
We must sleep for 2 seconds before we can see result again/n


Dataset has 350 rows and 5 columns
You have selected row 1 to row 1

Row col 0 col 1 col 2 col 3 col 4

1 10 2 25 82 52

it now selects record 1 up to record 20 of 'personnel.db', the result is usually stored the 'maximum_pid.result' file\n

The contents of the "selected" result file:

request type=FILE_SELECT
10 2 25 82 52
request type=FILE_SELECT
52 62 50 7 6
request type=FILE_SELECT
32 62 49 73 14
request type=FILE_SELECT
60 5 67 58 57
request type=FILE_SELECT
53 87 82 88 21
request type=FILE_SELECT
8 23 48 41 9
request type=FILE_SELECT
97 52 12 10 86
request type=FILE_SELECT
16 15 0 18 22
request type=FILE_SELECT
6 50 36 8 75
request type=FILE_SELECT
50 68 33 69 26
request type=FILE_SELECT
42 74 65 77 63
request type=FILE_SELECT
87 37 86 87 79
request type=FILE_SELECT
48 36 83 12 99
request type=FILE_SELECT
69 28 14 21 98
request type=FILE_SELECT
88 79 0 76 87
request type=FILE_SELECT
27 79 7 12 0
request type=FILE_SELECT
33 55 75 99 32
