+1 (315) 557-6473 

Process Data in Multiple Threads with A Single Output Thread in C Assignment Solutions.


Instructions

Objective
Write a C program that processes data in multiple threads and prints the result from a single output thread.

Requirements and Specifications

Process data in multiple threads with single output thread in C language

Screenshots of output

Process data in multiple threads with single output thread in C language 1

Source Code

#include <stdio.h>

#include <unistd.h>

#include <pthread.h>

#define BUFFER_SIZE 100 /* capacity, in chars, of each ring buffer below */

/*
 * Every buffer below is a circular queue shared by two threads.  For each:
 *   - the mutex guards the array and its three indices;
 *   - the *Produce condition is waited on by the writer while the queue is
 *     full and signalled by the reader after it removes a char;
 *   - the *Consume condition is waited on by the reader while the queue is
 *     empty and signalled by the writer after it stores a char;
 *   - Start is the next slot to read, End the next slot to write, and Len
 *     the number of chars currently queued.
 */

/* Queue written by inputThread and read by lineSeparatorThread. */
char inputBuffer[BUFFER_SIZE];
pthread_mutex_t inputMutex;
pthread_cond_t inputProduce;
pthread_cond_t inputConsume;
int buffStart = 0, buffEnd = 0, buffLen = 0;

/* Queue written by lineSeparatorThread and read by plusSignThread. */
char replacementBuffer[BUFFER_SIZE];
pthread_mutex_t replacementMutex;
pthread_cond_t replacementProduce;
pthread_cond_t replacementConsume;
int replStart = 0, replEnd = 0, replLen = 0;

/* Queue written by plusSignThread and read by outputThread. */
char outputBuffer[BUFFER_SIZE];
pthread_mutex_t outputMutex;
pthread_cond_t outputProduce;
pthread_cond_t outputConsume;
int outputStart = 0, outputEnd = 0, outputLen = 0;

void *inputThread(void *arg)

{

    int c;

    char *stop = "STOP\n";

    char buffer[5];

    int i, n = 0, start = 1;

    int terminate = 0;

    while(!terminate)

    {

        read(STDIN_FILENO, &c, 1);

        buffer[n++] = c; /* save in initial buffer */

        if (c == '\n') /* if detected end of line */

        {

            start = 1;

        }

        else if (start) /* we are at start of new line */

        {

            start = 0;

            if (c == 'S') /* could be stop */

                continue; /* accumulate next chars */

        }

        if (n > 1 && c == stop[n - 1]) /* we are still inside a stop */

        {

            if (n == 5) /* if we matched the whole stop */

            {

                n = 1; /* send a single char */

                buffer[0] = 0; /* send end of input */

                terminate = 1; /* terminate after sending end of input */

            }

            else

                continue; /* else, keep adding chars */

        }

        for (i = 0; i < n; i++) /* send all accumulated chars */

        {

            pthread_mutex_lock(&inputMutex);

            /* wait until there's space to save a character*/

            while (buffLen >= BUFFER_SIZE)

                pthread_cond_wait(&inputProduce, &inputMutex);

            inputBuffer[buffEnd++] = buffer[i]; /* save read char in buffer*/

            buffEnd %= BUFFER_SIZE;

            buffLen++;

            pthread_cond_signal(&inputConsume); /* wake consumer */

            /* release the lock */

            pthread_mutex_unlock(&inputMutex);

        }

        n = 0; /* reset buffer */

    }

    pthread_exit(NULL); /* terminate thread */

}

void *lineSeparatorThread(void *arg)

{

    int c;

    int terminate = 0;

    while(!terminate)

    {

        /* consumer part*/

        pthread_mutex_lock(&inputMutex); /* acquire lock */

        /* wait until there's a character*/

        while (buffLen == 0)

            pthread_cond_wait(&inputConsume, &inputMutex);

        c = inputBuffer[buffStart++]; /* read char from input buffer*/

        buffStart %= BUFFER_SIZE;

        buffLen--;

        pthread_cond_signal(&inputProduce); /* wake producer */

        /* release the lock */

        pthread_mutex_unlock(&inputMutex);

        if (c == '\n') /* make the replacement */

            c = ' ';

        else if (c == 0) /* end of input*/

            terminate = 1; /* terminate after sending end of input */

        /* producer part */

        pthread_mutex_lock(&replacementMutex);

        /* wait until there's space to save a character*/

        while (replLen >= BUFFER_SIZE)

            pthread_cond_wait(&replacementProduce, &replacementMutex);

        replacementBuffer[replEnd++] = c; /* save read char in buffer*/

        replEnd %= BUFFER_SIZE;

        replLen++;

        pthread_cond_signal(&replacementConsume); /* wake consumer */

        /* release the lock */

        pthread_mutex_unlock(&replacementMutex);

    }

    pthread_exit(NULL); /* terminate thread */

}

void *plusSignThread(void *arg)

{

    int c;

    char buffer[2];

    int i, n = 0;

    int terminate = 0;

    while(!terminate)

    {

        /* consumer part*/

        pthread_mutex_lock(&replacementMutex); /* acquire lock */

        /* wait until there's a character*/

        while (replLen == 0)

            pthread_cond_wait(&replacementConsume, &replacementMutex);

        c = replacementBuffer[replStart++]; /* read char from input buffer*/

        replStart %= BUFFER_SIZE;

        replLen--;

        pthread_cond_signal(&replacementProduce); /* wake producer */

        /* release the lock */

        pthread_mutex_unlock(&replacementMutex);

        buffer[n++] = c;

        if (c == '+') /* if detected '+' */

        {

            if (n == 1)

            {

                continue;

            }

            else

            {

                buffer[0] = '^'; /* replace for '^' */

                n = 1;

            }

        }

        else if (c == 0) /* if end of input */

            terminate = 1;

        for (i = 0; i < n; i++) /* send 1 or 2 chars */

        {

            /* producer part */

            pthread_mutex_lock(&outputMutex);

            /* wait until there's space to save a character*/

            while (outputLen >= BUFFER_SIZE)

                pthread_cond_wait(&outputProduce, &outputMutex);

            outputBuffer[outputEnd++] = buffer[i]; /* save read char in buffer*/

            outputEnd %= BUFFER_SIZE;

            outputLen++;

            pthread_cond_signal(&outputConsume); /* wake consumer */

            /* release the lock */

            pthread_mutex_unlock(&outputMutex);

        }

        n = 0;

    }

    pthread_exit(NULL); /* terminate thread */

}

/*
 * Output stage: takes chars from outputBuffer and writes them to standard
 * output as lines of exactly LINE_WIDTH chars followed by a newline.
 *
 * When the 0 end-of-input marker arrives the thread exits through
 * pthread_exit(NULL); chars collected for a line that has not reached
 * LINE_WIDTH at that point are not written.
 *
 * arg is unused.
 */
void *outputThread(void *arg)
{
    enum { LINE_WIDTH = 80 };
    char line[LINE_WIDTH + 1]; /* one output line plus its newline */
    int used = 0;              /* chars collected for the current line */
    char ch;

    (void)arg;

    for (;;)
    {
        /* Take the oldest char from the output queue. */
        pthread_mutex_lock(&outputMutex);
        while (outputLen == 0) /* nothing queued yet */
            pthread_cond_wait(&outputConsume, &outputMutex);
        ch = outputBuffer[outputStart++];
        outputStart %= BUFFER_SIZE;
        outputLen--;
        pthread_cond_signal(&outputProduce); /* a slot is free again */
        pthread_mutex_unlock(&outputMutex);

        if (ch == 0) /* end-of-input marker */
            break;

        line[used++] = ch;

        if (used >= LINE_WIDTH)
        {
            size_t total = (size_t)used + 1;
            size_t done = 0;

            line[used] = '\n';

            /* write() may accept fewer bytes than requested, so keep going
             * until the whole line is out; give up on the line if it
             * reports an error or makes no progress. */
            while (done < total)
            {
                ssize_t put = write(STDOUT_FILENO, line + done, total - done);
                if (put <= 0)
                    break;
                done += (size_t)put;
            }
            used = 0;
        }
    }

    pthread_exit(NULL);
}

int main()

{

    int i;

    int status;

    pthread_t threads[4]; /* array for holding the 4 threads */

    pthread_mutex_init(&inputMutex, NULL);

    pthread_cond_init (&inputConsume, NULL);

    pthread_cond_init (&inputProduce, NULL);

    pthread_mutex_init(&replacementMutex, NULL);

    pthread_cond_init (&replacementConsume, NULL);

    pthread_cond_init (&replacementProduce, NULL);

    pthread_mutex_init(&outputMutex, NULL);

    pthread_cond_init (&outputConsume, NULL);

    pthread_cond_init (&outputProduce, NULL);

    /* create input thread */

    status = pthread_create(&threads[0], NULL, inputThread, NULL);

    if (status) /* if error in creating thread */

    {

        printf("Error creating input thread\n");

        return 1; /* exit with error */

    }

    /* create line separator thread */

    status = pthread_create(&threads[1], NULL, lineSeparatorThread, NULL);

    if (status) /* if error in creating thread */

    {

        printf("Error creating line separator thread\n");

        return 1; /* exit with error */

    }

    /* create plus sign thread */

    status = pthread_create(&threads[2], NULL, plusSignThread, NULL);

    if (status) /* if error in creating thread */

    {

        printf("Error creating plus sign thread\n");

        return 1; /* exit with error */

    }

    /* create output thread */

    status = pthread_create(&threads[3], NULL, outputThread, NULL);

    if (status) /* if error in creating thread */

    {

        printf("Error creating output thread\n");

        return 1; /* exit with error */

    }

    /* wait for all threads to complete */

    for (i = 0; i < 4; i++)

        pthread_join(threads[i], NULL);

    /* destroy mutexes and conditions */

    pthread_mutex_destroy(&inputMutex);

    pthread_cond_destroy(&inputProduce);

    pthread_cond_destroy(&inputConsume);

    pthread_mutex_destroy(&replacementMutex);

    pthread_cond_destroy(&replacementProduce);

    pthread_cond_destroy(&replacementConsume);

    pthread_mutex_destroy(&outputMutex);

    pthread_cond_destroy(&outputProduce);

    pthread_cond_destroy(&outputConsume);

    return 0;

}