Tutorial code: subtle.c (producer/consumer using condition variables)

  1#include <stdlib.h>                     // exit_failure, exit_success
  2#include <stdio.h>                      // stdin, stdout, printf
  3#include <pthread.h>                    // threads
  4#include <string.h>                     // string
  5#include <unistd.h>                     // sleep
  6#include <stdbool.h>                    // bool
  7#include <fcntl.h>                      // open
  8
  9
 10struct event {
 11    pthread_mutex_t critical;
 12    pthread_mutex_t signalM;
 13    pthread_cond_t signalC;
 14    int eventCount;
 15};
 16
 17struct allVars {
 18    struct event inEvents;
 19    struct event outEvents;
 20    int bufferSize;
 21    char buffer[10][128];
 22};
 23
 24
 25
 26
 27/**
 28 * Advance the EventCount
 29 */
 30void advanceIn(struct event *inEvents) {
 31    // increment the inEvents counter
 32    pthread_mutex_lock(&inEvents->critical);
 33    inEvents->eventCount++;
 34    pthread_mutex_unlock(&inEvents->critical);
 35
 36    // signal await to continue
 37    pthread_mutex_lock(&inEvents->signalM);
 38    pthread_cond_signal(&inEvents->signalC);
 39    pthread_mutex_unlock(&inEvents->signalM);
 40}
 41void advanceOut(struct event *outEvents) {
 42    // increment the outEvents counter
 43    pthread_mutex_lock(&outEvents->critical);
 44    outEvents->eventCount++;
 45    pthread_mutex_unlock(&outEvents->critical);
 46
 47    // signal await to continue
 48    pthread_mutex_lock(&outEvents->signalM);
 49    pthread_cond_signal(&outEvents->signalC);
 50    pthread_mutex_unlock(&outEvents->signalM);
 51}
 52
 53
 54
 55/**
 56 * Wait for ticket and buffer availability
 57 */
 58void awaitIn(struct event *inEvents, int ticket) {
 59
 60    int eventCount;
 61
 62    // get the counter
 63    pthread_mutex_lock(&inEvents->critical);
 64    eventCount = inEvents->eventCount;
 65    pthread_mutex_unlock(&inEvents->critical);
 66
 67    // lock signaling mutex
 68    pthread_mutex_lock(&inEvents->signalM);
 69
 70    // loop until the ticket machine shows your number
 71    while (ticket > eventCount) {
 72        // wait until a ticket is called
 73        pthread_cond_wait(&inEvents->signalC, &inEvents->signalM);
 74
 75        // get the counter
 76        pthread_mutex_lock(&inEvents->critical);
 77        eventCount = inEvents->eventCount;
 78        pthread_mutex_unlock(&inEvents->critical);
 79    }
 80
 81    // unlock signaling mutex
 82    pthread_mutex_unlock(&inEvents->signalM);
 83}
 84void awaitOut(struct event *outEvents, int ticket) {
 85
 86    int eventCount;
 87
 88    // get the counter
 89    pthread_mutex_lock(&outEvents->critical);
 90    eventCount = outEvents->eventCount;
 91    pthread_mutex_unlock(&outEvents->critical);
 92
 93    // lock signaling mutex
 94    pthread_mutex_lock(&outEvents->signalM);
 95
 96    // loop until the ticket machine shows your number
 97    while (ticket > eventCount) {
 98        // wait until a ticket is called
 99        pthread_cond_wait(&outEvents->signalC, &outEvents->signalM);
100
101        // get the counter
102        pthread_mutex_lock(&outEvents->critical);
103        eventCount = outEvents->eventCount;
104        pthread_mutex_unlock(&outEvents->critical);
105    }
106
107    // unlock signaling mutex
108    pthread_mutex_unlock(&outEvents->signalM);
109}
110
111
112
113/**
114 * Add to buffer
115 */
116void putBuffer(struct allVars *allVars, char data[]) {
117    // get the current write position
118    pthread_mutex_lock(&allVars->inEvents.critical);
119    int in = allVars->inEvents.eventCount;
120    pthread_mutex_unlock(&allVars->inEvents.critical);
121
122    // wait until theres a space free in the buffer
123    awaitOut(&allVars->outEvents, in - allVars->bufferSize + 1);   // set to 2 to keep 1 index distance
124
125    // add data to buffer
126    strcpy(allVars->buffer[in % allVars->bufferSize], data);
127
128    // increment the eventCounter and signal
129    advanceIn(&allVars->inEvents);
130}
131
132
133
134/**
135 * Get from buffer
136 */
137char *getBuffer(struct allVars *allVars) {
138    // get the current read position
139    pthread_mutex_lock(&allVars->outEvents.critical);
140    int out = allVars->outEvents.eventCount;
141    pthread_mutex_unlock(&allVars->outEvents.critical);
142
143    // wait until theres something in the buffer
144    awaitIn(&allVars->inEvents, out + 1);
145
146    // allocate memory for returned string
147    char *str = malloc(128);
148
149    // get the buffer data
150    strcpy(str, allVars->buffer[out % allVars->bufferSize]);
151
152    // increment the eventCounter and signal
153    advanceOut(&allVars->outEvents);
154
155    return str;
156}
157
158
159
/** Producer thread: feeds an endless run of decimal numbers (0, 1, 2, ...) into the buffer. */
void *childThread(void *allVars) {
    struct allVars *shared = allVars;
    char text[12];      // room for any int rendered by "%d" plus the terminator

    for (int next = 0; ; next++) {
        sprintf(text, "%d", next);
        putBuffer(shared, text);
    }

    pthread_exit(EXIT_SUCCESS);
}
172
/**
 * Consumer thread: repeatedly takes a string from the buffer, prints it as
 * "buf: <text>" on stdout and releases it.  The loop never terminates.
 */
void *childThread2(void *allVars) {
    while (true) {
        char *out = getBuffer(allVars);
        printf("buf: %s\n", out);
        free(out);      // the string returned by getBuffer is owned by the caller
    }

    pthread_exit(EXIT_SUCCESS);
}
186
187
188
189int main(int argc, char *argv[]) {
190    if (argc != 2) {
191      fprintf(stderr, "Usage: %s <bufferrSize>\n", argv[0]);
192      exit(1);
193    }
194    int bufferSize = atoi(argv[1]);
195
196    // init structs
197    struct event inEvents = {
198        PTHREAD_MUTEX_INITIALIZER,
199        PTHREAD_MUTEX_INITIALIZER,
200        PTHREAD_COND_INITIALIZER,
201        0
202    };
203    struct event outEvents = {
204        PTHREAD_MUTEX_INITIALIZER,
205        PTHREAD_MUTEX_INITIALIZER,
206        PTHREAD_COND_INITIALIZER,
207        0
208    };
209    struct allVars allVars = {
210        inEvents,       // events
211        outEvents,
212        -1,             // bufferSize
213        {"", {""}}      // buffer[][]
214    };
215    allVars.bufferSize = bufferSize;
216    // NOTE: allVars will have copied PTHREAD_MUTEX_INTIIALIZER and COND
217#if 1
218    // Added to also use XXX_init functions; Not strictly necessary.
219    pthread_mutex_init(&allVars.inEvents.critical, NULL);
220    pthread_mutex_init(&allVars.inEvents.signalM, NULL);
221    pthread_mutex_init(&allVars.outEvents.critical, NULL);
222    pthread_mutex_init(&allVars.outEvents.signalM, NULL);
223    pthread_cond_init(&allVars.inEvents.signalC, NULL);
224    pthread_cond_init(&allVars.outEvents.signalC, NULL);
225#endif
226
227    // create child thread (producer)
228    pthread_t thread;
229    if (pthread_create(&thread, NULL, childThread, &allVars)) {
230        fprintf(stderr, "failed to create child thread");
231        exit(EXIT_FAILURE);
232    }
233    pthread_t thread2;
234    if (pthread_create(&thread2, NULL, childThread2, &allVars)) {
235        fprintf(stderr, "failed to create child2 thread");
236        exit(EXIT_FAILURE);
237    }
238
239    pthread_join(thread, NULL);
240    pthread_join(thread2, NULL);
241
242
243#if 0
244    // (consumer)
245    while (true) {
246        char *out = getBuffer(&allVars);
247        printf("buf: %s\n", out);
248        free(out);
249    }
250#endif
251
252
253    return (EXIT_SUCCESS);
254}