1- #include <stdalign.h>
21#include <stdio.h>
32#include <stdatomic.h>
43#include <threads.h>
54#include <stdlib.h>
65#include <stdbool.h>
76#include <assert.h>
7+ #include <math.h>
88
9+ #define PRECISION 100 /* upper bound in BPP sum */
910#define CACHE_LINE_SIZE 64
10- #define N_JOBS 16
11- #define N_THREADS 8
11+ #define N_THREADS 64
12+
/* Completion handle returned to a job submitter.  The submitter blocks on
 * `flag` (tpool_future_wait) until a worker stores the job's return value
 * into `result` and clears the flag. */
struct tpool_future {
    void *result;     /* job's return value; set by the worker */
    void *arg;        /* argument forwarded to the job function */
    atomic_flag flag; /* set while pending; cleared when result is ready */
};
1218
/* One unit of work, linked into the doubly-linked job ring. */
typedef struct job {
    void *(*func)(void *);       /* function the worker runs */
    struct tpool_future *future; /* handle the submitter waits on */
    struct job *next, *prev;
} job_t;
1724
@@ -21,64 +28,93 @@ typedef struct idle_job {
2128 _Atomic (job_t * ) prev ;
2229 unsigned long long version ;
2330 };
24- _Atomic struct B16 {
25- job_t * _prev ;
31+ _Atomic struct versioned_prev {
32+ job_t * ptr ;
2633 unsigned long long _version ;
27- } DCAS ;
34+ } v_prev ;
2835 };
2936 char padding [CACHE_LINE_SIZE - sizeof (_Atomic (job_t * )) -
30- sizeof (unsigned long long )];
37+ sizeof (unsigned long long )]; /* avoid false sharing */
3138 job_t job ;
3239} idle_job_t ;
3340
3441enum state { idle , running , cancelled };
3542
36- typedef struct thread_pool {
43+ typedef struct tpool {
3744 atomic_flag initialezed ;
3845 int size ;
3946 thrd_t * pool ;
4047 atomic_int state ;
4148 thrd_start_t func ;
42- // job queue is a SPMC ring buffer
43- idle_job_t * head ;
44- } thread_pool_t ;
49+ idle_job_t * head ; /* job queue is a SPMC ring buffer */
50+ } tpool_t ;
51+
52+ static struct tpool_future * tpool_future_create (void * arg )
53+ {
54+ struct tpool_future * future = malloc (sizeof (struct tpool_future ));
55+ if (future ) {
56+ future -> result = NULL ;
57+ future -> arg = arg ;
58+ atomic_flag_clear (& future -> flag );
59+ atomic_flag_test_and_set (& future -> flag );
60+ }
61+ return future ;
62+ }
63+
64+ void tpool_future_wait (struct tpool_future * future )
65+ {
66+ while (atomic_flag_test_and_set (& future -> flag ))
67+ ;
68+ }
69+
70+ void tpool_future_destroy (struct tpool_future * future )
71+ {
72+ free (future -> result );
73+ free (future );
74+ }
4575
4676static int worker (void * args )
4777{
4878 if (!args )
4979 return EXIT_FAILURE ;
50- thread_pool_t * thrd_pool = (thread_pool_t * )args ;
80+ tpool_t * thrd_pool = (tpool_t * )args ;
5181
5282 while (1 ) {
83+ /* worker is laid off */
5384 if (atomic_load (& thrd_pool -> state ) == cancelled )
5485 return EXIT_SUCCESS ;
5586 if (atomic_load (& thrd_pool -> state ) == running ) {
56- // claim the job
57- struct B16 job = atomic_load (& thrd_pool -> head -> DCAS );
58- struct B16 next ;
87+ /* worker takes the job */
88+ struct versioned_prev job = atomic_load (& thrd_pool -> head -> v_prev );
89+ /* worker checks if there is only an idle job in the job queue */
90+ if (job .ptr == & thrd_pool -> head -> job ) {
91+ /* worker says it is idle */
92+ atomic_store (& thrd_pool -> state , idle );
93+ thrd_yield ();
94+ continue ;
95+ }
96+
97+ struct versioned_prev next ;
98+ /* compare 16 byte at once */
5999 do {
60- next ._prev = job ._prev -> prev ;
100+ next .ptr = job .ptr -> prev ;
61101 next ._version = job ._version ;
62- } while (!atomic_compare_exchange_weak (& thrd_pool -> head -> DCAS , & job ,
63- next ));
102+ } while (!atomic_compare_exchange_weak (& thrd_pool -> head -> v_prev ,
103+ & job , next ));
64104
65- if (job ._prev -> args == NULL ) {
66- atomic_store (& thrd_pool -> state , idle );
67- } else {
68- printf ("Hello from job %d\n" , * (int * )job ._prev -> args );
69- free (job ._prev -> args );
70- free (job ._prev ); // could cause dangling pointer in other threads
71- }
105+ job .ptr -> future -> result =
106+ (void * )job .ptr -> func (job .ptr -> future -> arg );
107+ atomic_flag_clear (& job .ptr -> future -> flag );
108+ free (job .ptr );
72109 } else {
73- /* To auto run when jobs added, set status to running if job queue is not empty.
74- * As long as the producer is protected */
110+ /* worker is idle */
75111 thrd_yield ();
76- continue ;
77112 }
78113 };
114+ return EXIT_SUCCESS ;
79115}
80116
81- static bool thread_pool_init ( thread_pool_t * thrd_pool , size_t size )
117+ static bool tpool_init ( tpool_t * thrd_pool , size_t size )
82118{
83119 if (atomic_flag_test_and_set (& thrd_pool -> initialezed )) {
84120 printf ("This thread pool has already been initialized.\n" );
@@ -92,14 +128,13 @@ static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size)
92128 return false;
93129 }
94130
95- // May use memory pool for jobs
96131 idle_job_t * idle_job = malloc (sizeof (idle_job_t ));
97132 if (!idle_job ) {
98133 printf ("Failed to allocate idle job.\n" );
99134 return false;
100135 }
101- // idle_job will always be the first job
102- idle_job -> job . args = NULL ;
136+
137+ /* idle_job will always be the first job */
103138 idle_job -> job .next = & idle_job -> job ;
104139 idle_job -> job .prev = & idle_job -> job ;
105140 idle_job -> prev = & idle_job -> job ;
@@ -109,21 +144,21 @@ static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size)
109144 thrd_pool -> state = idle ;
110145 thrd_pool -> size = size ;
111146
112- for (size_t i = 0 ; i < size ; i ++ ) {
147+ /* employer hires many workers */
148+ for (size_t i = 0 ; i < size ; i ++ )
113149 thrd_create (thrd_pool -> pool + i , worker , thrd_pool );
114- //TODO: error handling
115- }
116150
117151 return true;
118152}
119153
120- static void thread_pool_destroy ( thread_pool_t * thrd_pool )
154+ static void tpool_destroy ( tpool_t * thrd_pool )
121155{
122156 if (atomic_exchange (& thrd_pool -> state , cancelled ))
123157 printf ("Thread pool cancelled with jobs still running.\n" );
124- for (int i = 0 ; i < thrd_pool -> size ; i ++ ) {
158+
159+ for (int i = 0 ; i < thrd_pool -> size ; i ++ )
125160 thrd_join (thrd_pool -> pool [i ], NULL );
126- }
161+
127162 while (thrd_pool -> head -> prev != & thrd_pool -> head -> job ) {
128163 job_t * job = thrd_pool -> head -> prev -> prev ;
129164 free (thrd_pool -> head -> prev );
@@ -135,58 +170,89 @@ static void thread_pool_destroy(thread_pool_t *thrd_pool)
135170 atomic_flag_clear (& thrd_pool -> initialezed );
136171}
137172
138- __attribute__((nonnull (2 ))) static bool add_job (thread_pool_t * thrd_pool ,
139- void * args )
173+ /* Use Bailey–Borwein–Plouffe formula to approximate PI */
174+ static void * bbp (void * arg )
175+ {
176+ int k = * (int * )arg ;
177+ double sum = (4.0 / (8 * k + 1 )) - (2.0 / (8 * k + 4 )) -
178+ (1.0 / (8 * k + 5 )) - (1.0 / (8 * k + 6 ));
179+ double * product = malloc (sizeof (double ));
180+ if (!product )
181+ return NULL ;
182+
183+ * product = 1 / pow (16 , k ) * sum ;
184+ return (void * )product ;
185+ }
186+
187+ struct tpool_future * add_job (tpool_t * thrd_pool , void * (* func )(void * ),
188+ void * arg )
140189{
141- // May use memory pool for jobs
142190 job_t * job = malloc (sizeof (job_t ));
143191 if (!job )
144- return false;
192+ return NULL ;
193+
194+ struct tpool_future * future = tpool_future_create (arg );
195+ if (!future ) {
196+ free (job );
197+ return NULL ;
198+ }
145199
146- // unprotected producer
147- job -> args = args ;
200+ job -> func = func ;
201+ job -> future = future ;
148202 job -> next = thrd_pool -> head -> job .next ;
149203 job -> prev = & thrd_pool -> head -> job ;
150204 thrd_pool -> head -> job .next -> prev = job ;
151205 thrd_pool -> head -> job .next = job ;
152206 if (thrd_pool -> head -> prev == & thrd_pool -> head -> job ) {
153207 thrd_pool -> head -> prev = job ;
154208 thrd_pool -> head -> version += 1 ;
155- // trap worker at idle job
209+ /* the previous job of the idle job is itself */
156210 thrd_pool -> head -> job .prev = & thrd_pool -> head -> job ;
157211 }
158-
159- return true;
212+ return future ;
160213}
161214
162- static inline void wait_until (thread_pool_t * thrd_pool , int state )
215+ static inline void wait_until (tpool_t * thrd_pool , int state )
163216{
164- while (atomic_load (& thrd_pool -> state ) != state ) {
217+ while (atomic_load (& thrd_pool -> state ) != state )
165218 thrd_yield ();
166- }
167219}
168220
169221int main ()
170222{
171- thread_pool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT };
172- if (!thread_pool_init (& thrd_pool , N_THREADS )) {
223+ int bbp_args [PRECISION ];
224+ struct tpool_future * futures [PRECISION ];
225+ double bbp_sum = 0 ;
226+
227+ tpool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT };
228+ if (!tpool_init (& thrd_pool , N_THREADS )) {
173229 printf ("failed to init.\n" );
174230 return 0 ;
175231 }
176- for (int i = 0 ; i < N_JOBS ; i ++ ) {
177- int * id = malloc (sizeof (int ));
178- * id = i ;
179- add_job (& thrd_pool , id );
180- }
181- // Due to simplified job queue (not protecting producer), starting the pool manually
232+ /* employer ask workers to work */
182233 atomic_store (& thrd_pool .state , running );
234+
235+ /* employer wait ... until workers are idle */
183236 wait_until (& thrd_pool , idle );
184- for (int i = 0 ; i < N_JOBS ; i ++ ) {
185- int * id = malloc (sizeof (int ));
186- * id = i ;
187- add_job (& thrd_pool , id );
237+
238+ /* employer add more job to the job queue */
239+ for (int i = 0 ; i < PRECISION ; i ++ ) {
240+ bbp_args [i ] = i ;
241+ futures [i ] = add_job (& thrd_pool , bbp , & bbp_args [i ]);
188242 }
243+
244+ /* employer ask workers to work */
189245 atomic_store (& thrd_pool .state , running );
190- thread_pool_destroy (& thrd_pool );
246+
247+ /* employer wait for the result of job */
248+ for (int i = 0 ; i < PRECISION ; i ++ ) {
249+ tpool_future_wait (futures [i ]);
250+ bbp_sum += * (double * )(futures [i ]-> result );
251+ tpool_future_destroy (futures [i ]);
252+ }
253+
254+ /* employer destroys the job queue and lays workers off */
255+ tpool_destroy (& thrd_pool );
256+ printf ("PI calculated with %d terms: %.15f\n" , PRECISION , bbp_sum );
191257 return 0 ;
192258}
0 commit comments