Implementing Thread API for Shared Communication Buffers 

Multithreading and condition variables in C assignment help

/*
 * Four-stage character pipeline built on POSIX threads.
 *
 *   stdin -> inputThread -> lineSeparatorThread -> plusSignThread
 *         -> outputThread -> stdout
 *
 * Neighbouring stages share a bounded circular buffer protected by a mutex
 * and two condition variables (one signalled when space is freed, one when
 * data arrives).  A NUL character travelling through the buffers marks the
 * end of input and makes every stage terminate after forwarding it.
 */
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define BUFFER_SIZE 100     /* capacity of each shared buffer */
#define LINE_LENGTH 80      /* characters printed per output line */
#define END_MARKER  '\0'    /* in-band end-of-input marker */

/* input thread -> line separator thread */
char inputBuffer[BUFFER_SIZE];
pthread_mutex_t inputMutex;         /* protects the input buffer state */
pthread_cond_t inputProduce;        /* signalled when space becomes available */
pthread_cond_t inputConsume;        /* signalled when a character is available */
int buffStart = 0;                  /* next position to read */
int buffEnd = 0;                    /* next position to write */
int buffLen = 0;                    /* characters currently stored */

/* line separator thread -> plus sign thread */
char replacementBuffer[BUFFER_SIZE];
pthread_mutex_t replacementMutex;
pthread_cond_t replacementProduce;
pthread_cond_t replacementConsume;
int replStart = 0;
int replEnd = 0;
int replLen = 0;

/* plus sign thread -> output thread */
char outputBuffer[BUFFER_SIZE];
pthread_mutex_t outputMutex;
pthread_cond_t outputProduce;
pthread_cond_t outputConsume;
int outputStart = 0;
int outputEnd = 0;
int outputLen = 0;

/* Append one character to the input buffer, blocking while it is full. */
static void putInput(char ch)
{
    pthread_mutex_lock(&inputMutex);
    while (buffLen >= BUFFER_SIZE)
        pthread_cond_wait(&inputProduce, &inputMutex);
    inputBuffer[buffEnd] = ch;
    buffEnd = (buffEnd + 1) % BUFFER_SIZE;
    buffLen++;
    pthread_cond_signal(&inputConsume);
    pthread_mutex_unlock(&inputMutex);
}

/* Remove one character from the input buffer, blocking while it is empty. */
static char getInput(void)
{
    char ch;

    pthread_mutex_lock(&inputMutex);
    while (buffLen == 0)
        pthread_cond_wait(&inputConsume, &inputMutex);
    ch = inputBuffer[buffStart];
    buffStart = (buffStart + 1) % BUFFER_SIZE;
    buffLen--;
    pthread_cond_signal(&inputProduce);
    pthread_mutex_unlock(&inputMutex);
    return ch;
}

/* Append one character to the replacement buffer, blocking while it is full. */
static void putReplacement(char ch)
{
    pthread_mutex_lock(&replacementMutex);
    while (replLen >= BUFFER_SIZE)
        pthread_cond_wait(&replacementProduce, &replacementMutex);
    replacementBuffer[replEnd] = ch;
    replEnd = (replEnd + 1) % BUFFER_SIZE;
    replLen++;
    pthread_cond_signal(&replacementConsume);
    pthread_mutex_unlock(&replacementMutex);
}

/* Remove one character from the replacement buffer, blocking while it is empty. */
static char getReplacement(void)
{
    char ch;

    pthread_mutex_lock(&replacementMutex);
    while (replLen == 0)
        pthread_cond_wait(&replacementConsume, &replacementMutex);
    ch = replacementBuffer[replStart];
    replStart = (replStart + 1) % BUFFER_SIZE;
    replLen--;
    pthread_cond_signal(&replacementProduce);
    pthread_mutex_unlock(&replacementMutex);
    return ch;
}

/* Append one character to the output buffer, blocking while it is full. */
static void putOutput(char ch)
{
    pthread_mutex_lock(&outputMutex);
    while (outputLen >= BUFFER_SIZE)
        pthread_cond_wait(&outputProduce, &outputMutex);
    outputBuffer[outputEnd] = ch;
    outputEnd = (outputEnd + 1) % BUFFER_SIZE;
    outputLen++;
    pthread_cond_signal(&outputConsume);
    pthread_mutex_unlock(&outputMutex);
}

/* Remove one character from the output buffer, blocking while it is empty. */
static char getOutput(void)
{
    char ch;

    pthread_mutex_lock(&outputMutex);
    while (outputLen == 0)
        pthread_cond_wait(&outputConsume, &outputMutex);
    ch = outputBuffer[outputStart];
    outputStart = (outputStart + 1) % BUFFER_SIZE;
    outputLen--;
    pthread_cond_signal(&outputProduce);
    pthread_mutex_unlock(&outputMutex);
    return ch;
}

/* Forward `count` characters, in order, to the input buffer. */
static void sendInputChars(const char *chars, int count)
{
    int i;

    for (i = 0; i < count; i++)
        putInput(chars[i]);
}

/*
 * Write exactly `len` bytes to `fd`, retrying after short writes and EINTR.
 * Returns 0 on success, -1 on any other write error (errno is left set).
 */
static int writeAll(int fd, const char *data, size_t len)
{
    while (len > 0) {
        ssize_t done = write(fd, data, len);

        if (done < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        data += done;
        len -= (size_t)done;
    }
    return 0;
}

/*
 * Stage 1: read stdin one byte at a time and forward it to the input buffer.
 *
 * A line consisting of exactly "STOP\n" ends the input: it is not forwarded
 * and the end marker is sent in its place.  Because the decision cannot be
 * made until the newline is seen, characters of a line that starts like
 * "STOP" are held back and only released once the line stops matching.
 * End-of-file, a read error, or a NUL byte in the input also end the stream.
 */
void *inputThread(void *arg)
{
    static const char stop[] = "STOP\n";
    const int stopLen = (int)(sizeof stop - 1);
    char pending[sizeof stop - 1];  /* characters held while matching "STOP\n" */
    int n = 0;                      /* number of held characters */
    int start = 1;                  /* non-zero when the next char begins a line */
    int terminate = 0;

    (void)arg;

    while (!terminate) {
        char c;
        ssize_t got = read(STDIN_FILENO, &c, 1);

        if (got < 0 && errno == EINTR)
            continue;               /* interrupted before reading: retry */
        if (got <= 0) {
            /* EOF or error: release what was held, then end the stream */
            sendInputChars(pending, n);
            putInput(END_MARKER);
            break;
        }

        pending[n++] = c;
        if (c == '\n') {
            start = 1;
        } else if (start) {
            start = 0;
            if (c == 'S')
                continue;           /* possible "STOP": hold and keep reading */
        }

        if (n > 1 && c == stop[n - 1]) {
            if (n != stopLen)
                continue;           /* still a prefix of "STOP\n" */
            /* whole line was "STOP\n": replace it with the end marker */
            pending[0] = END_MARKER;
            n = 1;
            terminate = 1;
        }

        /*
         * Downstream stages stop at the first NUL they see, so stop reading
         * as well instead of blocking forever on a buffer nobody drains.
         */
        if (c == END_MARKER)
            terminate = 1;

        sendInputChars(pending, n);
        n = 0;
    }
    return NULL;
}

/*
 * Stage 2: move characters from the input buffer to the replacement buffer,
 * turning every newline into a space.  Terminates after forwarding the end
 * marker.
 */
void *lineSeparatorThread(void *arg)
{
    int terminate = 0;

    (void)arg;

    while (!terminate) {
        char c = getInput();

        if (c == '\n')
            c = ' ';
        else if (c == END_MARKER)
            terminate = 1;

        putReplacement(c);
    }
    return NULL;
}

/*
 * Stage 3: move characters from the replacement buffer to the output buffer,
 * collapsing each pair of consecutive '+' characters into a single '^'.
 * A '+' is held until the following character shows whether it is part of a
 * pair.  Terminates after forwarding the end marker.
 */
void *plusSignThread(void *arg)
{
    char held[2];
    int n = 0;
    int terminate = 0;

    (void)arg;

    while (!terminate) {
        char c = getReplacement();
        int i;

        held[n++] = c;
        if (c == '+') {
            if (n == 1)
                continue;           /* first '+': wait for the next character */
            held[0] = '^';          /* second '+': the pair becomes '^' */
            n = 1;
        } else if (c == END_MARKER) {
            terminate = 1;
        }

        for (i = 0; i < n; i++)
            putOutput(held[i]);
        n = 0;
    }
    return NULL;
}

/*
 * Stage 4: collect characters from the output buffer and write them to
 * stdout as lines of exactly LINE_LENGTH characters followed by a newline.
 * Characters of an incomplete final line are discarded when the end marker
 * arrives.
 */
void *outputThread(void *arg)
{
    char line[LINE_LENGTH + 1];     /* one full line plus its newline */
    int count = 0;

    (void)arg;

    for (;;) {
        char c = getOutput();

        if (c == END_MARKER)
            break;

        line[count++] = c;
        if (count >= LINE_LENGTH) {
            line[count] = '\n';
            /* keep draining on failure so upstream stages never block */
            if (writeAll(STDOUT_FILENO, line, sizeof line) != 0)
                perror("write");
            count = 0;
        }
    }
    return NULL;
}

/*
 * Initialise the synchronisation objects, start the four pipeline threads,
 * wait for all of them to finish and release the synchronisation objects.
 * Returns 0 on success, 1 if a thread could not be created.
 */
int main(void)
{
    static const struct {
        void *(*entry)(void *);
        const char *name;
    } stages[4] = {
        { inputThread, "input" },
        { lineSeparatorThread, "line separator" },
        { plusSignThread, "plus sign" },
        { outputThread, "output" },
    };
    pthread_t threads[4];
    int i;

    pthread_mutex_init(&inputMutex, NULL);
    pthread_cond_init(&inputConsume, NULL);
    pthread_cond_init(&inputProduce, NULL);

    pthread_mutex_init(&replacementMutex, NULL);
    pthread_cond_init(&replacementConsume, NULL);
    pthread_cond_init(&replacementProduce, NULL);

    pthread_mutex_init(&outputMutex, NULL);
    pthread_cond_init(&outputConsume, NULL);
    pthread_cond_init(&outputProduce, NULL);

    for (i = 0; i < 4; i++) {
        int status = pthread_create(&threads[i], NULL, stages[i].entry, NULL);

        if (status) {
            /* diagnostics go to stderr so they never mix with the output */
            fprintf(stderr, "Error creating %s thread\n", stages[i].name);
            return 1;
        }
    }

    for (i = 0; i < 4; i++)
        pthread_join(threads[i], NULL);

    pthread_mutex_destroy(&inputMutex);
    pthread_cond_destroy(&inputProduce);
    pthread_cond_destroy(&inputConsume);

    pthread_mutex_destroy(&replacementMutex);
    pthread_cond_destroy(&replacementProduce);
    pthread_cond_destroy(&replacementConsume);

    pthread_mutex_destroy(&outputMutex);
    pthread_cond_destroy(&outputProduce);
    pthread_cond_destroy(&outputConsume);

    return 0;
}