1#include <stdlib.h> // exit_failure, exit_success
2#include <stdio.h> // stdin, stdout, printf
3#include <pthread.h> // threads
4#include <string.h> // string
5#include <unistd.h> // sleep
6#include <stdbool.h> // bool
7#include <fcntl.h> // open
8
9
10struct event {
11 pthread_mutex_t critical;
12 pthread_mutex_t signalM;
13 pthread_cond_t signalC;
14 int eventCount;
15};
16
17struct allVars {
18 struct event inEvents;
19 struct event outEvents;
20 int bufferSize;
21 char buffer[10][128];
22};
23
24
25
26
27/**
28 * Advance the EventCount
29 */
30void advanceIn(struct event *inEvents) {
31 // increment the inEvents counter
32 pthread_mutex_lock(&inEvents->critical);
33 inEvents->eventCount++;
34 pthread_mutex_unlock(&inEvents->critical);
35
36 // signal await to continue
37 pthread_mutex_lock(&inEvents->signalM);
38 pthread_cond_signal(&inEvents->signalC);
39 pthread_mutex_unlock(&inEvents->signalM);
40}
41void advanceOut(struct event *outEvents) {
42 // increment the outEvents counter
43 pthread_mutex_lock(&outEvents->critical);
44 outEvents->eventCount++;
45 pthread_mutex_unlock(&outEvents->critical);
46
47 // signal await to continue
48 pthread_mutex_lock(&outEvents->signalM);
49 pthread_cond_signal(&outEvents->signalC);
50 pthread_mutex_unlock(&outEvents->signalM);
51}
52
53
54
55/**
56 * Wait for ticket and buffer availability
57 */
58void awaitIn(struct event *inEvents, int ticket) {
59
60 int eventCount;
61
62 // get the counter
63 pthread_mutex_lock(&inEvents->critical);
64 eventCount = inEvents->eventCount;
65 pthread_mutex_unlock(&inEvents->critical);
66
67 // lock signaling mutex
68 pthread_mutex_lock(&inEvents->signalM);
69
70 // loop until the ticket machine shows your number
71 while (ticket > eventCount) {
72 // wait until a ticket is called
73 pthread_cond_wait(&inEvents->signalC, &inEvents->signalM);
74
75 // get the counter
76 pthread_mutex_lock(&inEvents->critical);
77 eventCount = inEvents->eventCount;
78 pthread_mutex_unlock(&inEvents->critical);
79 }
80
81 // unlock signaling mutex
82 pthread_mutex_unlock(&inEvents->signalM);
83}
84void awaitOut(struct event *outEvents, int ticket) {
85
86 int eventCount;
87
88 // get the counter
89 pthread_mutex_lock(&outEvents->critical);
90 eventCount = outEvents->eventCount;
91 pthread_mutex_unlock(&outEvents->critical);
92
93 // lock signaling mutex
94 pthread_mutex_lock(&outEvents->signalM);
95
96 // loop until the ticket machine shows your number
97 while (ticket > eventCount) {
98 // wait until a ticket is called
99 pthread_cond_wait(&outEvents->signalC, &outEvents->signalM);
100
101 // get the counter
102 pthread_mutex_lock(&outEvents->critical);
103 eventCount = outEvents->eventCount;
104 pthread_mutex_unlock(&outEvents->critical);
105 }
106
107 // unlock signaling mutex
108 pthread_mutex_unlock(&outEvents->signalM);
109}
110
111
112
113/**
114 * Add to buffer
115 */
116void putBuffer(struct allVars *allVars, char data[]) {
117 // get the current write position
118 pthread_mutex_lock(&allVars->inEvents.critical);
119 int in = allVars->inEvents.eventCount;
120 pthread_mutex_unlock(&allVars->inEvents.critical);
121
122 // wait until theres a space free in the buffer
123 awaitOut(&allVars->outEvents, in - allVars->bufferSize + 1); // set to 2 to keep 1 index distance
124
125 // add data to buffer
126 strcpy(allVars->buffer[in % allVars->bufferSize], data);
127
128 // increment the eventCounter and signal
129 advanceIn(&allVars->inEvents);
130}
131
132
133
134/**
135 * Get from buffer
136 */
137char *getBuffer(struct allVars *allVars) {
138 // get the current read position
139 pthread_mutex_lock(&allVars->outEvents.critical);
140 int out = allVars->outEvents.eventCount;
141 pthread_mutex_unlock(&allVars->outEvents.critical);
142
143 // wait until theres something in the buffer
144 awaitIn(&allVars->inEvents, out + 1);
145
146 // allocate memory for returned string
147 char *str = malloc(128);
148
149 // get the buffer data
150 strcpy(str, allVars->buffer[out % allVars->bufferSize]);
151
152 // increment the eventCounter and signal
153 advanceOut(&allVars->outEvents);
154
155 return str;
156}
157
158
159
160/** child thread (producer) */
161void *childThread(void *allVars) {
162 char str[12];
163 int count = 0;
164
165 while (true) {
166 sprintf(str, "%d", count++);
167 putBuffer(allVars, str);
168 }
169
170 pthread_exit(EXIT_SUCCESS);
171}
172
173/** child2 thread (consumer) */
174void *childThread2(void *allVars) {
175 char str[10];
176 int count = 0;
177
178 while (true) {
179 char *out = getBuffer(allVars);
180 printf("buf: %s\n", out);
181 free(out);
182 }
183
184 pthread_exit(EXIT_SUCCESS);
185}
186
187
188
189int main(int argc, char *argv[]) {
190 if (argc != 2) {
191 fprintf(stderr, "Usage: %s <bufferrSize>\n", argv[0]);
192 exit(1);
193 }
194 int bufferSize = atoi(argv[1]);
195
196 // init structs
197 struct event inEvents = {
198 PTHREAD_MUTEX_INITIALIZER,
199 PTHREAD_MUTEX_INITIALIZER,
200 PTHREAD_COND_INITIALIZER,
201 0
202 };
203 struct event outEvents = {
204 PTHREAD_MUTEX_INITIALIZER,
205 PTHREAD_MUTEX_INITIALIZER,
206 PTHREAD_COND_INITIALIZER,
207 0
208 };
209 struct allVars allVars = {
210 inEvents, // events
211 outEvents,
212 -1, // bufferSize
213 {"", {""}} // buffer[][]
214 };
215 allVars.bufferSize = bufferSize;
216 // NOTE: allVars will have copied PTHREAD_MUTEX_INTIIALIZER and COND
217#if 1
218 // Added to also use XXX_init functions; Not strictly necessary.
219 pthread_mutex_init(&allVars.inEvents.critical, NULL);
220 pthread_mutex_init(&allVars.inEvents.signalM, NULL);
221 pthread_mutex_init(&allVars.outEvents.critical, NULL);
222 pthread_mutex_init(&allVars.outEvents.signalM, NULL);
223 pthread_cond_init(&allVars.inEvents.signalC, NULL);
224 pthread_cond_init(&allVars.outEvents.signalC, NULL);
225#endif
226
227 // create child thread (producer)
228 pthread_t thread;
229 if (pthread_create(&thread, NULL, childThread, &allVars)) {
230 fprintf(stderr, "failed to create child thread");
231 exit(EXIT_FAILURE);
232 }
233 pthread_t thread2;
234 if (pthread_create(&thread2, NULL, childThread2, &allVars)) {
235 fprintf(stderr, "failed to create child2 thread");
236 exit(EXIT_FAILURE);
237 }
238
239 pthread_join(thread, NULL);
240 pthread_join(thread2, NULL);
241
242
243#if 0
244 // (consumer)
245 while (true) {
246 char *out = getBuffer(&allVars);
247 printf("buf: %s\n", out);
248 free(out);
249 }
250#endif
251
252
253 return (EXIT_SUCCESS);
254}