File-Server (II)
A. Second Edition
This is comp444 lab4 and it is concentrated on thread synchronization, FIFO, mapped I/O, advisory file lock etc.
B.The problem
COMP 444 Winter term 2005 Programming Assignment 4 Due Date: Midnight, April 11, 2005. Objective: You will extend our simple file server from assignment three. Basically, we are going to add functionality to search and modify a collection of ¡°database¡± files. While the completed file server is indeed simple, it does demonstrate a few of the basic features of the real thing. To begin, we must construct the database that our file server will manage. In order to guarantee a common database format, I have written a couple of simple utilities that you can use to generate and view the database files. generate.c: You can compile this source file and use it to create a basic database file. The idea is to generate a file that consists of a collection of records, each made up of one or more fields (again, this is meant to model a simple database table). In order to make it easy to find specific records, we will store the table in a binary format. In other words, each record consists of a set of integer values. So, for example, if a record is made up of 8 fields, then every record will be 32 bytes in size (assuming 4 byte integers). If you have to find record 83, then it is quite simple to identify the location of the start of the record. Note that the first 8 bytes of the dataset will record the row and column count for the file (ie, first 4 bytes = # of rows, next 4 bytes = # of columns). The remaining bytes will be the records themselves. This way, you can read the first 8 bytes and know exactly how to process the remaining bytes. The generate program takes three arguments: rows, columns, and file name. The row count is limited to 10000 and the column count to 10. When values are created for each column, they are randomly chosen in the range 0-99. So, for example, ¡°generate.exe 1000 6 sales.db¡± would create a table called sales.db with 1000 rows and 6 columns (rows and columns start at 0). You will create a database by generating a group of files. Do NOT include these files with your submission (tables will be generated from scratch when the grading is done). You can name the files anything you want but, for convenience, each file should probably end with a common suffix like ¡°.db¡±. You will probably want to create 5 or 10 such files when testing. viewer.c: Because the datasets are in a binary format, it is not possible to simply display the records directly to the screen. It is important for testing (and grading) to actually see the records. So I have provided a little viewer program that can read the binary data and display a humanreadable ascii version to the screen. The viewer program takes three arguments: start_record, end_record, and file name. The start_record and end_record simply define the range of records you want to look at. So, for example, ¡°viewer.exe 10 15 sales.db¡± would display 6 records in sales.db, starting with record 10. Okay, so now we have a way to generate our database and a way to verify the contents. It¡¯s now time to integrate them with our file server. You are going to add two functions to the file server: select and update. The select option will allow a client application to retrieve a range of records from a given dataset (much like the viewer program).
The update option will allow your client to increment or decrement a given field by one. This is a little more complicated because incrementing and decrementing implies that you have to be very careful about concurrent access. This issue is directly related to one of the questions on the last theory assignment. Specifically, you will have to employ advisory locking to ensure that the concurrent updates do not corrupt the values in the record. So, for example, if the current value is 25 and you do 5 concurrent increments and 3 concurrent decrements, you must be able to check that the final result is 27 (you can use the viewer program for this). Note: after the update operation, a value may be greater than 99 or less than 0. This is OK. Your clients will now have to specify these new requests. In the previous assignment, you created a ¡°request record¡± with three fields: process ID, record operation, and file name. We will continue to use this record. We will add three new operations: FILE_SELECT = 4, FILE_INCREMENT = 5, and FILE_DECREMENT = 6. The other two fields in the request record will remain the same. Clearly, if either of these two options is selected, we require additional information. For the FILE_SELECT, we need to specify two additional fields: start_record and end_record. For the FILE_INCREMENT and FILE_DECREMENT, we will also specify two additional fields: record number and column number (we will update a specific field value in a specific record). Your server processing code will therefore check the file operation value in the original request. If it is between 4 and 6 (one of the new operations) then two additional integer values will follow the main request record. The addition of the two supporting values means that you will have to modify your queue structure slightly in order to accommodate the new values. You can do this any way you like. In terms of the actual processing there will be two main things to deal with. First, you will require locking to ensure that data is not corrupted. When write locks are in place, other processes should be denied access to the relevant regions in the file (although they are free to access nonlocked regions of the same file). You will have to use blocking lock calls to implement this feature with your threads. In other words, the update call should not fail if a lock is held. The thread should simply block until it has obtained the lock. The second thing that you must do when processing db requests is to use memory mapping instead of read/write calls for all I/O. Again, it will be easy to define the map ranges since the db files have such a regular structure. In any case, you will use maps to read the data for the FILE_SELECT requests, and you will use maps for the update operations. (In a real server, of course, we would have a caching framework that avoided a lot of redundant I/O). Concurrent maps are not inherently synchronized but with the locking that you are using, it should not be possible to map two identical regions at the same time. On the client side, we will also make things simple. Instead of creating a really complicated parsing routine for the client (for all of the possible command line parameters), we will simply create three new clients: client_select.exe, client_increment.exe, and client_decrement.exe. The select client will receive two arguments ¨C start and end of range. The increment and decrement clients will also receive two arguments ¨C record and column. Otherwise, they are exactly the same as the client from assignment 3 and will operate in the same way. For the select client, the selected records will be returned via the pipe. These records will be written to the local result.PID file. For the increment and decrement operations, the server will return two integers: the initial value and the value after the increment or decrement operation. Again, you will write both of these to result.PID. If you have handled you locking properly, you will be able to verify this by looking at the modified values (when concurrent updates have been performed). So that should be about it. Really, the assignment is not hard at all. It really just consists of updating your server to provide two new functions, select and update. You will use locking in order to properly manage concurrent access and you will use memory mapping for the I/O. If you run into any major problems, post them to the newsgroup. Good luck.
¡¡
Should lab4 be compatible with lab3? | 2005-04-03 | ||||
Professor, Since there is added features in lab4, should I my lab4 still workable with requests in lab3? What I mean is that should the marker also test those test casees in lab3 on my new lab4? Or can I assume I am doing a completely new lab4? Thank you.
|
Do we really need three kinds of client? | 2005-04-03 | ||||
Since it only slightly reduces the complexity
of client-end by defining client_select, client_decrement,
client_increment. (Actually the benefit seems to me is just save one byte
since we can just pass an additional "kinds" byte to differentiate the
three different operations.) But the server side is exactly same. What I
mean is that when server tries to write back, the worst case is to write
back the whole record instead of a couple of integer when operation type
is "select". And my design is to make the server side always write same structure, then there is little benefit to write three almost identical clients.
|
Unexplained problem of lock | 2005-04-04 | ||||||||
Professor, I observed an unconfirmed phenomenon. That is when I locked a range of file after I mapped the file into memory with option of "PROT_READ|PROT_WRITE" and it seems I cannot update the contents of file. i.e. whatever I update or memcpy to the mapped memory, there is nothing changed in the file. If I removed the writelock, updating is OK. Am I making some mistakes cause I was really starved to death after almost half day's struggle? Thank you.
|
¡¡C.The idea of program
1. file listing: a) source code file: myhead.h //all necessary included header file and some constants errHandle.c //an error handling module, very simple fprotocol.h //define all communication-related structures, enum, constants fclient.c //the source code for an uniformal client and all other three client "select,decrment,increment" just "exec" it fserver.c //the source code file for server client_select.c //the select client which is a just a wrapper of fclient.c client_increment.c //the increment client and is only a wrapper of real client fclient.c client_decrement.c //the decrement client and is only a wrapper of read client fclient.c generate.c //the professor supplied generater viewer.c //the professor supplied viewer b) executable file: fserver.exe //the server executable fclient.exe //the uniformed client executable client_select.exe //the select client executable client_increment.exe //the increment client executable client_decrement.exe //the decrement client executable generate.exe //the professor supplied generater viewer.exe //the professor supplied viewer c) makefile: makefile //it includes compiling of professor supplied "generate and viewer" d) script file: setup.script //a simple script file to setup running environment, only need run once at current directory, //create directory, create db files run.script //a test running script, should be run in directory 'rundir' 2. program logic: a) server-side: i) I made a big change in job queue from lab3 because I realized that I should include all life time of a thread into a critical section. Synchronization doesn't necessarily need to be sequencial. That is to say I only include the minimum necessary part into critical section such as getting a job, inserting a job, removing a job which absolutely needs to be synchronized. And for all other part of code like reading, updating, locking, unlocking file maybe blocked or take a long time, so they SHOULD NOT be included into critical section. Otherwise all job can only be executed in a manner of "first come first server" and we cannot even observe either "queue_full" phenomenon or "blocked-when-set-lock" phenomenon because when one thread will only finish his job before he gives up "mutex". ii) So, my job "queue" is not exactly a "queue". Instead it is simply an array of job and "listening thread will insert job when there is vacancy and all working threads will try to pick up a job when it is not taken by others. The sequence of array doesn't necesarily gurantee the execution sequence because some job maybe blocked and still those jobs inserted after blocked jobs can still proceed. iii) I dropped the idea of "two-condition-variables" which originally differentiate "listening thread" from all other "working threads" because it become two complicated after the queue is not really FIFO. Now all threads are bound with one conditional variable and the "broadcast" will wake up all threads even though job-queue is either full or empty. iv) Each thread will "map" a file to its own space. Therefore it is absolutely necessary for a call "msync" after each update to the file contents. Otherwise the consequent thread operation might be still on old, un-modified version of file. Of course a better solution is to book keep all files opened by "mmap" call and maintain a single copy of mapped memory pointer for each distinct file. However, I am a bit lazy to do this. v) The synchronization of each processing job is even simpler than lab3. First I designed an "enum" flag for job status(empty, issued, taken) and put them into a parallel array besides "job" structure array. By checking "job-status" array, we know the status of each position of "job structure array". Inserting, taking, eliminating a job structure is a critical operation and is protected by a "mutex". This would happened at the beginning and the ending of a procedure of thread: taking a job, emptying a job. In between, the thread maybe blocked due to file locks and it won't affect other threads' getting jobs. client-side: i) The only thing worth mentioning is the output structure. I designed a universal output structure for all types of file operation. That is, a structure with an integer array of ten. Because the maximum column number of a record is 10 and all other requests only need up to two integer to be returned. This is fine for all other request type except "select" which may require more than one record to be returned. Therefore I asked the server to handle "select" by sending records one by one. And "select" client surely knows how many records it expects, then loop for the number of records before writing results to result file. 3. program running: i) You should run the "setup.script" once and it will generate a running directory "rundir" and generate all necessary database files. ii) Under "rundir", run "run.script" which is a test script and shows basic. iii) Similarly like lab3, you can setup number of threads in server by passing it in commandline argument. If not, the default is 10 threads. iv) The running parameter format is like this: client_select.exe file_name start_record end_record client_increment.exe file_name row_number col_number client_decrement.exe file_name row_number col_number fclient.exe file_name request_type [param1] [param2] //for lab3 compatible. Please note that in order to test lab3, you need to run "fclient.exe" and it requires two parameters, "file_name" and "request_type".
¡¡
D.The major functions
The major lesson I learned is from one of my classmates. I claimed in class that within one process, every "open" call will just
be like a "dup" call if you are trying to open the same file within different threads. This implies there is synchronization
problem with that file for both "read/write". And he repeatedly said that it wouldn't make sense. And finally I wrote a simple
test to try to convince him. It turned out that I am wrong. Every "open" call will generate an independent "file table structure".
I felt quite embarrassed.
Another small issue is that you need to specify "PROT_READ" if you want to read from the "mapped I/O" by call of "mmap". Even it
seems to you very natural that it already is mapped to your "logical memory address". Still you need right to "read". So does write.
And the one you might neglect is that if each thread try to map same file to its own copy, you need to "msync" for each update.
Otherwise, you would be doomed.
E.Further improvement
¡¡
F.File listing
1. myhead.h
2. errHandle.c
3. fprotocol.h
4. fclient.c
5. fserver.c
6. client_select.c
7. client_decrement.c
8. client_increment.c
9. generate.c (supplied by professor)
10. viewer.c (supplied by professor)
11. makefile
12. setup.script
13. run.script
¡¡
file name: myhead.h
#ifndef MYHEAD_H #define MYHEAD_H #include <sys/resource.h> #include <sys/wait.h> #include <sys/time.h> #include <sys/types.h> #include <sys/stat.h> #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <dirent.h> #include <unistd.h> #include <string.h> #include <fcntl.h> #include <signal.h> #include <sys/mman.h> extern int errno; //define three mode and it is purely like // DFA=Deterministic Finite Automaton #define FindNext 0 #define Comparing 1 typedef int bool; #define TRUE 1 #define FALSE 0 //for a particular file NAME, NOT path //const int MaxNameLength=20; #define MaxPathLength 128 #define MaxNameLength 20 #define MaxIntLength 20 #define BuffSize 128 #define EmptyGridNode 'E' //the empty tic-tac-toe grid #define PlayerTypeLength 2 #define PlayerPidLength (MaxIntLength+1) #define BoardLength 9 //the tic-tac-toe length #define PlayerX_Pos PlayerTypeLength #define PlayerO_Pos (PlayerTypeLength*2+PlayerPidLength) #define BoardPos ((PlayerTypeLength+PlayerPidLength)*2+1) #define WinnerPos (BoardPos+BoardLength+1) #define WinnerTypeLength 1 //simply the X or O #define SleepSeconds 1 #define FIFOMode S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP void errHandle(char* msg); #endif
file name: errHandle.c
#include "myhead.h" extern int errno; void errHandle(char* msg) { if (msg!=NULL) { perror(msg); } else { strerror(errno); } exit(errno); }
file name: fprotocol.h
#ifndef FPROTOCOL_H #define FPROTOCOL_H #include <sys/types.h> typedef enum { FILE_TYPE = 1, FILE_INODE = 2, FILE_SIZE = 3, FILE_SELECT = 4, FILE_INCREMENT = 5, FILE_DECREMENT = 6, FILE_NOT_FOUND = -1, UNKNOWN_ERROR = -2, QUEUE_FULL = -3 }RequestType; typedef enum { JobEmpty =0, JobIssued =1, JobTaken =2 }JobStatus; typedef enum { Regular = 1, Directory = 2, SoftLink = 3, Character = 4, Block = 5, FIFO = 6, Socket = 7 }FileType; #define MaxColumnNumber 10 typedef struct { pid_t pid; RequestType fileOp; int resultArray[MaxColumnNumber]; int resultNumber; }ResponseRec; #define MaxRequestFileNameLength 64 typedef struct { pid_t pid; RequestType fileOp; int param1; int param2; char fileName[MaxRequestFileNameLength]; }Job_t; #define RequestRecordLength sizeof(Job_t) char* serverFifoName="fserver.fifo"; #define DefaultThreadNumber 10 #endif
file name: fclient.c
#include "myhead.h" #include "fprotocol.h" extern char* serverFifoName; extern char* requestTypeStr; char* getRequestName(RequestType theType); void writeResult(ResponseRec* responseRec); int main(int argc, char* argv[]) { int fdwrite, fdread, count=0; ResponseRec result; char pidBuf[MaxIntLength+7]; Job_t requestRec; char readBuf[MaxIntLength]; if (argc!=5&&argc!=3) { errHandle("usage:fclient.exe filename requestType [param1] [param2]\n"); } requestRec.pid=getpid(); requestRec.fileOp=atoi(argv[2]); strcpy(requestRec.fileName, argv[1]); if (argc==5) { requestRec.param1=atoi(argv[3]); requestRec.param2=atoi(argv[4]); if (requestRec.fileOp==FILE_SELECT) { if (requestRec.param1>requestRec.param2) { errHandle("in select option start record number is smaller than end record\n"); } } } //else ignored sprintf(pidBuf, "%d.fifo", getpid()); if (mkfifo(pidBuf, FIFOMode)<0) { errHandle("create fifo at client fails"); } if ((fdwrite=open(serverFifoName, O_RDWR))<0) { errHandle("open fifo error"); } if (write(fdwrite, &requestRec, sizeof(Job_t))!=sizeof(Job_t)) { errHandle("write error for fifo"); } if ((fdread=open(pidBuf, O_RDWR))<0) { errHandle("open fifo for read error"); } while (1) { if (read(fdread, &result, sizeof(ResponseRec))!=sizeof(ResponseRec)) { errHandle("error of read fifo"); } else { //printf("client %d read something\n", getpid()); if (result.fileOp!=QUEUE_FULL)//if it is, then need to be patient { if (requestRec.fileOp==FILE_SELECT) { count++; writeResult(&result); if (count>requestRec.param2-requestRec.param1) { break; } else { continue; } } writeResult(&result); break; } else { printf("received queue_full msg\n"); } } } //read successfuland close fifo if (close(fdread)<0) { errHandle("close fifo error"); } return 0; } void writeResult(ResponseRec* result) { char pidBuf[MaxIntLength+7]; char buf[128]; int fdwrite, i; sprintf(pidBuf, "%d.result", getpid()); if ((fdwrite=open(pidBuf, O_WRONLY|O_CREAT|O_APPEND, FIFOMode))<0) { errHandle("create result file error"); } //in case error happens //printf("fclient is writing\n"); sprintf(buf, "\nrequest type=%s\n", getRequestName(result->fileOp)); if (result->fileOp>0) { for (i=0; i<result->resultNumber; i++) { sprintf(pidBuf, "%d ", result->resultArray[i]); strcat(buf, pidBuf); } } if (write(fdwrite, buf, strlen(buf))!=strlen(buf)) { errHandle("write result error"); } if (close(fdwrite)<0) { errHandle("close error"); } } char* getRequestName(RequestType theType) { switch(theType) { case FILE_TYPE: return "FILE_TYPE"; case FILE_INODE: return "FILE_INODE"; case FILE_SIZE: return "FILE_SIZE"; case FILE_SELECT: return "FILE_SELECT"; case FILE_INCREMENT: return "FILE_INCREMENT"; case FILE_DECREMENT: return "FILE_DECREMENT"; case FILE_NOT_FOUND: return "FILE_NOT_FOUND"; case UNKNOWN_ERROR: return "UNKNOWN_ERROR"; case QUEUE_FULL: return "QUEUE_FULL"; } }
¡¡
file name: fserver.c
#include "myhead.h" #include "fprotocol.h" #include <pthread.h> Job_t* jobs; JobStatus* jobStatus;//array of empty status of job extern char* serverFifoName; int fdServer; //int jobCount=0; //int workerCount=0; //int currentJob=0; int threadNumber; Job_t requestRec; //ResponseRec responseRec; //int fifoMode=S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP; pthread_t* threads; //char* serverFifoName="fserver.fifo"; //DynaArrayPtr arrayPtr; void enterCritical(); void exitCritical(); pthread_cond_t jobCond=PTHREAD_COND_INITIALIZER; pthread_cond_t controlCond=PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; void createQueue(); void initialize(int argc, char* argv[]); void uninitialize(); void createThreads(); void createFifo(char* fifoName); void writeFifo(ResponseRec* responseRec); //the child do job void doJob(int currentJob); //the control thread is main thread void doControl(int currentJob); void broadcast(pthread_cond_t* cond); int getJob(JobStatus jobStatus); //the thread function void* jobFunc(void* arg); void controlFunc(); int openDB(char* fileName); //should return MAPPED POINTER int main(int argc, char* argv[]) { initialize(argc, argv); controlFunc(); return 0; } int openDB(char* fileName) { int fd; //in order to map file, even readonly need to be read&write if ((fd=open(fileName, O_RDWR))<0) { errHandle("open db error"); } return fd; } //control always broadcast job_cond void controlFunc() { int n, currentJob; ResponseRec responseRec; while (1) { //reading here are BLOCKING if ((n=read(fdServer, &requestRec, sizeof(Job_t)))!=sizeof(Job_t)) { if (n==0) { //sleep(SleepSeconds); } else { //printf("\ncontrol thread read %d\n", n); errHandle("read pipe in control thread error"); } } //printf("control read pid=%d\n", requestRec.pid); //lock the mutex enterCritical(); //if the queue is full while ((currentJob=getJob(JobEmpty))==-1) { //printf("job queue is full now\n"); //sprintf(queueFullBuf, "%d", QUEUE_FULL); //buf always has processID at beginning and terminated with NULL //sprintf(pidBuf,"%d.fifo",requestRec.pid); //strcat(pidBuf, ".fifo"); responseRec.pid=requestRec.pid; responseRec.fileOp=QUEUE_FULL; //printf("prepare to write to fifo of pid=%d\n", responseRec.pid); writeFifo(&responseRec); //printf("write job queue full msg\n"); broadcast(&jobCond); if (pthread_cond_wait(&controlCond, &mutex)) { errHandle("condition wait error"); } } //currentJob=getJob(JobEmpty); doControl(currentJob); broadcast(&jobCond); //printf("insert job %d\n", currentJob); exitCritical(); } } void broadcast(pthread_cond_t* cond) { if (pthread_cond_broadcast(cond)) { errHandle("condition broadcast error\n"); } /* //always favour control thread if (pthread_cond_broadcast(&controlCond)) { errHandle("condition broadcast error\n"); } //printf("jobcontrol broadcasted\n"); //to make job count become 80% of capacity //if (jobCount>8*threadNumber/10) { if (pthread_cond_broadcast(&jobCond)) { errHandle("condition broadcast error\n"); } } */ } void doJob(int currentJob) { //char pidBuf[MaxIntLength];//20 //char resultBuf[MaxIntLength]; ResponseRec responseRec; caddr_t ptr; struct flock lock; struct stat statBuf; //workerCount++; int fd, row, col, i, j,len, offset; //by default it is the same responseRec.pid=jobs[currentJob].pid; responseRec.fileOp=jobs[currentJob].fileOp; //printf("inside job\n"); if (stat(jobs[currentJob].fileName, &statBuf)<0) { if (errno==ENOENT) { //however heere is different responseRec.fileOp=FILE_NOT_FOUND; } else { errHandle("stat error"); } } //printf("what is this?\n"); switch(jobs[currentJob].fileOp) { case FILE_TYPE: if (S_ISREG(statBuf.st_mode)) { responseRec.resultArray[0]=Regular; } if (S_ISDIR(statBuf.st_mode)) { responseRec.resultArray[0]=Directory; } if (S_ISLNK(statBuf.st_mode)) { responseRec.resultArray[0]=SoftLink; } if (S_ISCHR(statBuf.st_mode)) { responseRec.resultArray[0]=Character; } if (S_ISBLK(statBuf.st_mode)) { responseRec.resultArray[0]=Block; } if (S_ISFIFO(statBuf.st_mode)) { responseRec.resultArray[0]=FIFO; } if (S_ISSOCK(statBuf.st_mode)) { responseRec.resultArray[0]=Socket; } responseRec.resultNumber=1; break; case FILE_INODE: responseRec.resultArray[0]=statBuf.st_ino; responseRec.resultNumber=1; break; case FILE_SIZE: responseRec.resultArray[0]=statBuf.st_size; responseRec.resultNumber=1; break; case FILE_SELECT: //printf("here?\n"); fd=openDB(jobs[currentJob].fileName); if (read(fd, &row, sizeof(int))!=sizeof(int)||read(fd, &col, sizeof(int))!=sizeof(int)) { errHandle("read error of row and col"); } offset=8+jobs[currentJob].param1*col*sizeof(int); len=col*(jobs[currentJob].param2-jobs[currentJob].param1)*sizeof(int); //printf("param1=%d, param2=%d, offset=%d, len=%d\n", jobs[currentJob].param1, jobs[currentJob].param2, offset, len); //printf("before map\n"); //the place to lock is lock.l_start=offset; lock.l_type=F_RDLCK; lock.l_whence=SEEK_SET; lock.l_len=len; //printf("here is right?"); if (fcntl(fd, F_SETLKW, &lock)<0) { errHandle("fcntl set lock error"); } //sleep(1); //here the offset+len is all I want to map if ((ptr=mmap(0, offset+len, PROT_READ, MAP_SHARED, fd, 0))==(caddr_t)(-1)) { errHandle("mmap error"); } //printf("map succeed in select, ptr=%u\n", ptr); responseRec.resultNumber=col; for (i=0; i<=jobs[currentJob].param2-jobs[currentJob].param1; i++) { memcpy(responseRec.resultArray, ptr+offset+i*col*4, col*4); /* for (j=0; j<col; j++) { printf("array[%d]=%d\n", j, responseRec.resultArray[j]); } */ writeFifo(&responseRec);//every time it writes and closes fd } if (munmap(ptr, offset+len)<0) { errHandle("unmap error"); } lock.l_type=F_UNLCK; if (fcntl(fd, F_SETLK, &lock)<0) { errHandle("unlock error"); } return;//no need to writefifo case FILE_INCREMENT: case FILE_DECREMENT: fd=openDB(jobs[currentJob].fileName); if (read(fd, &row, sizeof(int))!=sizeof(int)||read(fd, &col, sizeof(int))!=sizeof(int)) { errHandle("read error of row and col"); } if (row<jobs[currentJob].param1||col<jobs[currentJob].param2) { errHandle("invalid record or column number"); } offset=8+jobs[currentJob].param1*col*4+jobs[currentJob].param2*4; //printf("param1=%d, param2=%d, offset=%d\n", jobs[currentJob].param1, jobs[currentJob].param2, offset); lock.l_start=offset; lock.l_type=F_WRLCK; lock.l_whence=SEEK_SET; lock.l_len=sizeof(int); //printf("here is right?"); if (fcntl(fd, F_SETLKW, &lock)<0) { errHandle("fcntl set lock error"); } //sleep(1); if ((ptr=mmap(0, offset+sizeof(int), PROT_WRITE|PROT_READ, MAP_SHARED, fd, 0))==(caddr_t)(-1)) { errHandle("mmap error"); } responseRec.resultArray[0]=*((int*)(ptr+offset)); if (jobs[currentJob].fileOp==FILE_INCREMENT) { responseRec.resultArray[1]=responseRec.resultArray[0]+1; } else { responseRec.resultArray[1]=responseRec.resultArray[0]-1; } //printf("memcpy %d\n", *(responseRec.resultArray+1)); memcpy(ptr+offset,responseRec.resultArray+1, sizeof(int)); //(*((int*)(ptr+offset)))=responseRec.result; //responseRec.resultArray[1]=*((int*)(ptr+offset)); responseRec.resultNumber=2; if (msync(ptr, offset, MS_SYNC)<0) { errHandle("msync error"); } //printf("originally it is: %d now the record becomes %u\n", responseRec.resultArray[0], *((int*)(ptr+offset))); if (munmap(ptr, sizeof(int))<0) { errHandle("unmap error"); } lock.l_type=F_UNLCK; if (fcntl(fd, F_SETLK, &lock)<0) { errHandle("unlock error"); } //printf("unlock successful\n"); //printf("what I write is:result[0]=%d, [1]=%d\n", responseRec.resultArray[0], responseRec.resultArray[1]); //close(fd); break; default: responseRec.fileOp=UNKNOWN_ERROR; break; } //printf("\nhere child read job index=%d", currentJob); //printf("\npid=%d, fileop=%d, filename=%s\n", jobs[currentJob].pid, jobs[currentJob].fileOp, jobs[currentJob].fileName); //sprintf(pidBuf, "%d.fifo", jobs[currentJob].pid); //sprintf(resultBuf, "%d", result); //it is client's responsibility to create fifo writeFifo(&responseRec); } void writeFifo(ResponseRec* responseRec) { int fd; //char intBuf[MaxIntLength]; char fifoName[MaxIntLength+5]; sprintf(fifoName, "%d.fifo", responseRec->pid); if ((fd=open(fifoName, O_WRONLY))<0) { //printf("\nthe fifonoame is %s\n", fifoName); //write(STDOUT_FILENO, fifoName, strlen(fifoName)); errHandle("open fifo for write error"); } //sprintf(intBuf, "%d", responseRec.result); if (write(fd, responseRec, sizeof(ResponseRec))!=sizeof(ResponseRec)) { errHandle("write fifo error"); } //close(fd); } //assume processID is only 5 digits terminated with NULL //assume fileoperation is only 1digit terminated with NULL void doControl(int currentJob) { jobs[currentJob]=requestRec;//should be struct copy?? //strcpy(jobs[index].fileName, buf+ProcessIDLength+FileOperationLength+1); //jobCount++; //printf("change jobstatus [%d]=%d\n", currentJob, JobIssued); jobStatus[currentJob]=JobIssued; //printf("get a job with pid=%d, fileop=%d, filename=%s\n", pid, requestType, jobs[index].fileName); } void* jobFunc(void* arg) { int currentJob; while (1) { enterCritical(); //waiting for condition variable while ((currentJob=getJob(JobIssued))==-1) { broadcast(&controlCond); if (pthread_cond_wait(&jobCond, &mutex)) { errHandle("condition wait error\n"); } } //getJob must retrieve job of same status issued by parameter in job status array //printf("begin search job\n"); //currentJob=getJob(JobIssued); jobStatus[currentJob]=JobTaken; //jobCount--;//not here!!! //workerCount++; //here is end of critical section exitCritical(); //do job must be thread safe //printf("begin one job\n"); doJob(currentJob); //printf("finished one job\n"); //workerCount--; enterCritical(); //jobCount--; jobStatus[currentJob]=JobEmpty; //make it empty //workerCount--; exitCritical(); broadcast(&controlCond); } } void initialize(int argc, char* argv[]) { if (argc==1) { threadNumber=DefaultThreadNumber; } else { if (argc==2) { threadNumber=atoi(argv[1]); if (threadNumber<=0) { errHandle("usage: fserver.exe [threadnumber]\n"); } } else { errHandle("usage: fserver.exe [threadnumber]\n"); } } createFifo(serverFifoName); createQueue(); //arrayPtr=createArray();//create the dynamic array for integer; if ((fdServer=open(serverFifoName, O_RDWR))<0) { errHandle("open server fifo error"); } createThreads(); } void uninitialize() { free(jobs); free(jobStatus); } void createQueue() { if ((jobs=(Job_t*)malloc(threadNumber*sizeof(Job_t)))==NULL) { errHandle("create job queue error\n"); } //all job status is '0'=empty if ((jobStatus=(JobStatus*)calloc(threadNumber, sizeof(JobStatus)))==NULL) { errHandle("create job status array error"); } //jobCount=workerCount=0;//initialize to 0; } void createFifo(char* fifoName) { if (mkfifo(fifoName, FIFOMode)<0) { if (errno!=EEXIST) { errHandle("create fifo error\n"); } } } void createThreads() { int i; if ((threads=(pthread_t*)malloc(threadNumber*sizeof(pthread_t)))==NULL) { errHandle("cannot malloc pthreads array\n"); } for (i=0; i<threadNumber; i++) { //each thread is given an index "i" if (pthread_create(&threads[i], NULL, jobFunc,(void*)i)) { errHandle("create threads error\n"); } } } void enterCritical() { if (pthread_mutex_lock(&mutex)) { errHandle("mutex lock error"); } } void exitCritical() { if (pthread_mutex_unlock(&mutex)) { errHandle("mutex unlock error"); } } //return the first index of status same as param "status" int getJob(JobStatus status) { int i; for (i=0; i<threadNumber; i++) { //printf("jobStatus[%d]=%d\n", i, jobStatus[i]); if (jobStatus[i]==status) { return i; } } //errHandle("cannot find designated job"); return -1; }
file name: client_select.c
#include "myhead.h" #include "fprotocol.h" int main(int argc, char* argv[]) { char buf[2]; if (argc!=4) { printf("usage: client_select.exe filename startrec endrec\n"); exit(0); } buf[0]='0'+FILE_SELECT; buf[1]='\0'; execl("./fclient.exe", "fclient.exe", argv[1], buf, argv[2], argv[3], NULL); return 0; }
¡¡
file name: client_increment.c
#include "myhead.h" #include "fprotocol.h" int main(int argc, char* argv[]) { char buf[2]; if (argc!=4) { printf("usage: client_select.exe filename startrec endrec\n"); exit(0); } buf[0]='0'+FILE_INCREMENT; buf[1]='\0'; execl("./fclient.exe", "fclient.exe", argv[1], buf, argv[2], argv[3], NULL); return 0; }
¡¡
file name: client_decrement.c
#include "myhead.h" #include "fprotocol.h" int main(int argc, char* argv[]) { char buf[2]; if (argc!=4) { printf("usage: client_select.exe filename startrec endrec\n"); exit(0); } buf[0]='0'+FILE_DECREMENT; buf[1]='\0'; execl("./fclient.exe", "fclient.exe", argv[1], buf, argv[2], argv[3], NULL); return 0; }
¡¡
file name: generate.c (supplied by professor)
#include <stdio.h> #include <stdlib.h> int main(int argc, char *argv[]){ FILE *fp; int rows, columns, i, j; int *record; size_t objects_written; /* process the command line arguments */ if(argc != 4){ fprintf(stderr, "Usage: %s rows columns filename\n", argv[0]); exit(1); } rows = atoi(argv[1]); columns = atoi(argv[2]); if( (fp = fopen(argv[3], "w")) == NULL){ perror("File open error"); exit(1); } /* do some error checking */ if( (rows < 1) || (rows > 10000)){ fprintf(stderr, "Invalid row count: %d\n", rows); exit(1); } if( (columns < 1) || (columns > 10)){ fprintf(stderr, "Invalid column count: %d\n", columns); exit(1); } /* write the row count and column count to the file */ if( (objects_written = fwrite(&rows, sizeof(int), 1, fp)) != 1){ fprintf(stderr, "Error writing row count\n"); exit(1); } if( (objects_written = fwrite(&columns, sizeof(int), 1, fp)) != 1){ fprintf(stderr, "Error writing column count \n"); exit(1); } /* create a buffer for each record */ if( (record = (int *)malloc(rows * sizeof(int))) == NULL){ fprintf(stderr, "Malloc error\n"); exit(1); } srand(time(0)); /* seed the random number generator */ /* create the records and write them to file */ for(i = 0; i < rows; i++){ for(j = 0; j < columns; j++) record[j] = rand() % 100; if( (objects_written = fwrite(record, sizeof(int), columns, fp)) != columns){ fprintf(stderr, "Error writing row %d \n", i); exit(1); } } fclose(fp); printf("\nWrote %d records (%d columns) to %s\n\n", rows, columns, argv[3]); }
¡¡
¡¡
file name: viewer.c
#include <stdio.h> #include <stdlib.h> int main(int argc, char *argv[]){ FILE *fp; int start_row, end_row, rows, columns, i, j; int *record; size_t objects_read; char header[7]; /* process the command line argumens */ if(argc != 4){ fprintf(stderr, "Usage: %s startrow endrow filename\n", argv[0]); exit(1); } start_row = atoi(argv[1]); end_row = atoi(argv[2]); if( (fp = fopen(argv[3], "r")) == NULL){ perror("File open error"); exit(1); } /* read the row count and column count from the file */ if( (objects_read = fread(&rows, sizeof(int), 1, fp)) != 1){ fprintf(stderr, "Error reading row count\n"); exit(1); } if( (objects_read = fread(&columns, sizeof(int), 1, fp)) != 1){ fprintf(stderr, "Error reading column count \n"); exit(1); } /* a little error checking on row entries */ if( (start_row < 0) || (start_row > rows)){ fprintf(stderr, "Invalid start_row value: %d\n", start_row); exit(1); } if( (end_row < 0) || (end_row > rows)){ fprintf(stderr, "Invalid end_row value: %d\n", end_row); exit(1); } if(start_row > end_row){ fprintf(stderr, "Start row cannot be greater than end row\n"); exit(1); } /* find the specified records */ if( (fseek(fp, start_row * (columns * sizeof(int)), SEEK_CUR)) < 0){ perror("Seek error"); exit(1); } /* create a buffer for each record */ if( (record = (int *)malloc(rows * sizeof(int))) == NULL){ fprintf(stderr, "Malloc error\n"); exit(1); } /* print the header line */ printf("\n\nDataset has %d rows and %d columns\n", rows, columns); printf("You have selected row %d to row %d\n", start_row, end_row); printf("\nRow\t\t"); for(j = 0; j < columns; j++){ sprintf(header, "col %d", j); printf("%s\t", header); } printf("\n\n"); /* display the specified records to the screen */ for(i = 0; i < (end_row - start_row + 1); i++){ if( (objects_read = fread(record, sizeof(int), columns, fp)) != columns){ fprintf(stderr, "Error writing row %d \n", start_row + i); exit(1); } printf("%d\t\t", start_row + i); for(j = 0; j < columns; j++) printf("%d\t", record[j]); printf("\n"); } printf("\n\n"); fclose(fp); }
file name: setup.script
echo "this is a simple set up script and only need to be run once" echo "create a 'rundir' directory for testing" mkdir rundir echo "now let's generate some database file for testing purpose after we run make" make cp ./run.script ./rundir/run.script cd rundir chmod +x run.script echo "generate sales.db with 1000 record and 6 column" ./generate.exe 1000 6 sales.db echo "generate salary.db with 500 record and 8 column" ./generate.exe 500 8 salary.db echo "generate personnel.db with 350 record and 5 column" ./generate.exe 350 5 personnel.db echo "generate credit.db with 100 record and 10 column" ./generate.exe 100 10 credit.db echo "now you can run the test script 'run.script' for a testing"
file name: run.script
echo "now let's run server with default 10 threads" ./fserver.exe 10 & echo "before running, let's see the result of rec 1 col 2 of personnel.db\n" ./viewer.exe 1 1 personnel.db sleep 2 echo "now decrment 6 times for rec 1 col 2\n" ./client_decrement.exe personnel.db 1 2 & ./client_decrement.exe personnel.db 1 2 & ./client_decrement.exe personnel.db 1 2 & ./client_decrement.exe personnel.db 1 2 & ./client_decrement.exe personnel.db 1 2 & ./client_decrement.exe personnel.db 1 2 & echo "OS need time to synchronize mapped memory contents with disk file\n" echo "we must sleep 2 seconds before we can see the result\n" sleep 2 ./viewer.exe 1 1 personnel.db echo "now increment 10 time for rec=1 col=2\n" ./client_increment.exe personnel.db 1 2 & ./client_increment.exe personnel.db 1 2 & ./client_increment.exe personnel.db 1 2 & ./client_increment.exe personnel.db 1 2 & ./client_increment.exe personnel.db 1 2 & ./client_increment.exe personnel.db 1 2 & ./client_increment.exe personnel.db 1 2 & ./client_increment.exe personnel.db 1 2 & ./client_increment.exe personnel.db 1 2 & ./client_increment.exe personnel.db 1 2 & echo "We must sleep for 2 seconds before we can see result again/n" sleep 2 ./viewer.exe 1 1 personnel.db echo "it now selects record 1 up to record 20 of 'personnel.db', the result is usually stored the 'maximum_pid.result' file\n" ./client_select.exe personnel.db 1 20 &
file name: makefile
all: fserver.exe fclient.exe client_select.exe client_increment.exe client_decrement.exe generate.exe viewer.exe @echo "make complete for fserver.exe, fclient.exe, " @echo "client_select.exe, cliclient_increment.exe ent_decrement.exe," @echo "generate.exe, viewer.exe" cp *.exe *.script ./rundir/ rm -f ./rundir/*.fifo rm -f ./rundir/*.result generate.exe : generate.c @echo "compiling generate.exe..." gcc generate.c -o generate.exe viewer.exe : viewer.c @echo "compiling viewer.exe..." gcc viewer.c -o viewer.exe client_increment.exe : client_increment.c @echo "compiling client_increment.exe..." gcc -g client_increment.c -o client_increment.exe client_decrement.exe : client_decrement.c @echo "compiling client_decrement.exe..." gcc -g client_decrement.c -o client_decrement.exe client_select.exe : client_select.c @echo "compiling client_select.exe..." gcc -g client_select.c -o client_select.exe errHandle.o : errHandle.c myhead.h @echo "compiling errHanle module..." gcc -g -c errHandle.c -o errHandle.o fclient.exe : fclient.c errHandle.o myhead.h fprotocol.h @echo "compiling fclient.exe..." gcc -g fclient.c errHandle.o -o fclient.exe fserver.exe : fserver.c myhead.h errHandle.o fprotocol.h @echo "compiling fserver.exe..." gcc -g -lpthread fserver.c errHandle.o -o fserver.exe clear : @echo "clear up...\n" rm *.o *.exe
How to run?
a) chmod +x ./setup.script
b) ./setup.script
c) cd rundir
d) ./run.script
A possible running result(please note that the left-most number '1' is NOT data but a row number):
[root@sec05 rundir]# ./run.script
now let's run server with default 10 threads
before running, let's see the result of rec 1 col 2 of personnel.db\n
Dataset has 350 rows and 5 columns
You have selected row 1 to row 1
Row col 0 col 1 col 2 col 3 col 4
1 10 2 21 82 52
now decrment 6 times for rec 1 col 2\n
OS need time to synchronize mapped memory contents with disk file\n
we must sleep 2 seconds before we can see the result\n
Dataset has 350 rows and 5 columns
You have selected row 1 to row 1
Row col 0 col 1 col 2 col 3 col 4
1 10 2 15 82 52
now increment 10 time for rec=1 col=2\n
We must sleep for 2 seconds before we can see result again/n
Dataset has 350 rows and 5 columns
You have selected row 1 to row 1
Row col 0 col 1 col 2 col 3 col 4
1 10 2 25 82 52
it now selects record 1 up to record 20 of 'personnel.db', the result is usually
stored the 'maximum_pid.result' file\n
¡¡
The contents of "selected" result file:
request type=FILE_SELECT
10 2 25 82 52
request type=FILE_SELECT
52 62 50 7 6
request type=FILE_SELECT
32 62 49 73 14
request type=FILE_SELECT
60 5 67 58 57
request type=FILE_SELECT
53 87 82 88 21
request type=FILE_SELECT
8 23 48 41 9
request type=FILE_SELECT
97 52 12 10 86
request type=FILE_SELECT
16 15 0 18 22
request type=FILE_SELECT
6 50 36 8 75
request type=FILE_SELECT
50 68 33 69 26
request type=FILE_SELECT
42 74 65 77 63
request type=FILE_SELECT
87 37 86 87 79
request type=FILE_SELECT
48 36 83 12 99
request type=FILE_SELECT
69 28 14 21 98
request type=FILE_SELECT
88 79 0 76 87
request type=FILE_SELECT
27 79 7 12 0
request type=FILE_SELECT
33 55 75 99 32
¡¡