This is the mail archive of the
pthreads-win32@sources.redhat.com
mailing list for the pthreas-win32 project.
RE: New pthread_once implementation
- From: Vladimir Kliatchko <vladimir at kliatchko dot com>
- To: 'Gottlob Frege' <gottlobfrege at gmail dot com>, pthreads-win32 at sources dot redhat dot com
- Date: Mon, 30 May 2005 09:36:06 -0400
- Subject: RE: New pthread_once implementation
T1 comes in, proceeds to [0064]
T2 comes in, proceeds to [0119]
T1 is cancelled
T3 comes in, loops around, resets the state, proceeds to [0064]
T2 wakes up, proceeds to right before [0125]
T4 comes in and proceeds to right before [0119]
T3 proceeds to right before [0080]
T2,T3,T4 are ready to race for CloseHandle, ReleaseSemaphore, and
WaitForSingleObject respectively
Regaring MCS version:
It can be quite expensing, but only when multiple threads call pthread_once
simultaneously. Also, the overhead is proportional to the number of threads
so that:
1 thread - no overhead
2 simultaneous threads - the same overhead as in semaphore based version
3 or more simultaneous threads - higher overhead - but is this an important
case?
[0001] #define PTHREAD_ONCE_INIT {0, 0, 0, 0}
[0002]
[0003] enum ptw32_once_state {
[0004] PTW32_ONCE_INIT = 0x0,
[0005] PTW32_ONCE_DONE = 0x1,
[0006] PTW32_ONCE_STARTED = 0x2,
[0007] PTW32_ONCE_CANCELLED = 0x3
[0008] };
[0009]
[0010] struct pthread_once_t_
[0011] {
[0012] int state;
[0013] int reserved;
[0014] int numSemaphoreUsers;
[0015] HANDLE semaphore;
[0016] };
[0017]
[0018] static void PTW32_CDECL
[0019] ptw32_once_init_routine_cleanup(void * arg) {
[0020] pthread_once_t * once_control = (pthread_once_t *) arg;
[0021]
[0022] /*
[0023] * Continue to direct new threads into the wait path until the
waiter that we
[0024] * release or a new thread can reset state to INIT.
[0025] */
[0026] (void) PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->state,
(LONG)PTW32_ONCE_CANCELLED);
[0027]
[0028] if (InterlockedExchangeAdd((LPLONG)&once_control->semaphore, 0L))
/* MBR fence */
[0029] {
[0030] ReleaseSemaphore(once_control->semaphore, 1, NULL);
[0031] }
[0032] }
[0033]
[0034] int
[0035] pthread_once (pthread_once_t * once_control, void (*init_routine)
(void)) {
[0036] int result;
[0037] int state;
[0038] HANDLE sema;
[0039]
[0040] if (once_control == NULL || init_routine == NULL)
[0041] {
[0042] result = EINVAL;
[0043] goto FAIL0;
[0044] }
[0045] else
[0046] {
[0047] result = 0;
[0048] }
[0049]
[0050] while ((state = (int)
[0051]
PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control->
state,
[0052]
(PTW32_INTERLOCKED_LONG)PTW32_ONCE_STARTED,
[0053]
(PTW32_INTERLOCKED_LONG)PTW32_ONCE_INIT))
[0054] != PTW32_ONCE_DONE)
[0055] {
[0056] if (PTW32_ONCE_INIT == state)
[0057] {
[0058]
[0059] #ifdef _MSC_VER
[0060] #pragma inline_depth(0)
[0061] #endif
[0062]
[0063] pthread_cleanup_push(ptw32_once_init_routine_cleanup,
(void *) once_control);
[0064] (*init_routine)();
[0065] pthread_cleanup_pop(0);
[0066]
[0067] #ifdef _MSC_VER
[0068] #pragma inline_depth()
[0069] #endif
[0070]
[0071] (void)
PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->state,
[0072] (LONG)PTW32_ONCE_DONE);
[0073]
[0074] /*
[0075] * we didn't create the semaphore.
[0076] * it is only there if there is someone waiting.
[0077] */
[0078] if
(InterlockedExchangeAdd((LPLONG)&once_control->semaphore, 0L)) /* MBR fence
*/
[0079] {
[0080] ReleaseSemaphore(once_control->semaphore,
[0081] once_control->numSemaphoreUsers,
NULL);
[0082] }
[0083] }
[0084] else
[0085] {
[0086] if (1 ==
InterlockedIncrement((LPLONG)&once_control->numSemaphoreUsers))
[0087] {
[0088] sema = CreateSemaphore(NULL, 0, INT_MAX, NULL);
[0089]
[0090] if
(PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control-
>semaphore,
[0091]
(PTW32_INTERLOCKED_LONG)sema,
[0092]
(PTW32_INTERLOCKED_LONG)0))
[0093] {
[0094] CloseHandle(sema);
[0095] }
[0096] }
[0097]
[0098] /*
[0099] * If initter was cancelled then state is CANCELLED.
[0100] * Until state is reset to INIT, all new threads will
enter the wait path.
[0101] * The woken waiter, if it exists, will also re-enter the
wait path, but
[0102] * either it or a new thread will reset state = INIT here,
continue around the Wait,
[0103] * and become the new initter. Any thread that is
suspended in the wait path before
[0104] * this point will hit this check. Any thread suspended
between this check and
[0105] * the Wait will wait on a valid semaphore, and possibly
continue through it
[0106] * if the cancellation handler has incremented (released)
it and there were
[0107] * no waiters.
[0108] */
[0109] (void)
PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control->
state,
[0110]
(PTW32_INTERLOCKED_LONG)PTW32_ONCE_INIT,
[0111]
(PTW32_INTERLOCKED_LONG)PTW32_ONCE_CANCELLED);
[0112]
[0113] /*
[0114] * Check 'state' again in case the initting thread has
finished
[0115] * and left before seeing that there was a semaphore.
[0116] */
[0117] if (InterlockedExchangeAdd((LPLONG)&once_control->state,
0L) >= PTW32_ONCE_STARTED)
[0118] {
[0119] WaitForSingleObject(once_control->semaphore,
INFINITE);
[0120] }
[0121]
[0122] if (0 ==
InterlockedDecrement((LPLONG)&once_control->numSemaphoreUsers))
[0123] {
[0124] /* we were last */
[0125] if ((sema =
[0126] (HANDLE)
PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->semaphore, (LONG)0)))
[0127] {
[0128] CloseHandle(sema);
[0129] }
[0130] }
[0131] }
[0132] }
[0133]
[0134] /*
[0135] * ------------
[0136] * Failure Code
[0137] * ------------
[0138] */
[0139] FAIL0:
[0140] return (result);
[0141] } /* pthread_once */
[0142]
> -----Original Message-----
> From: Ross Johnson [mailto:ross.johnson@homemail.com.au]
> Sent: Monday, May 30, 2005 5:56 AM
> To: Vladimir Kliatchko
> Cc: Gottlob Frege; Pthreads-Win32 list
> Subject: RE: New pthread_once implementation
>
> Hi Vlad,
>
> The nice thing about your implementation using semaphores was that: even
> though you could release just one waiter on cancellation, all waiting
> threads could be released in one call to the kernel when exiting
> normally. In your MCS version, the dequeueing involves sequential calls
> to SetEvent, which could be much slower in comparison. That's my only
> concern with it. The threat of an async cancellation leaving waiters
> stranded was a concern at one point, but none of the previous
> implementations of this routine has been safe against it either.
>
> Still pondering your previous version (and not yet convinced that it's
> fatally flawed), I've tried another variation.
>
> In this variation, the cancellation handler doesn't reset state to INIT,
> but to a new state == CANCELLED so that any newly arriving threads plus
> the awoken waiter are prevented from becoming the new initter until
> state can be reset to INIT in the wait path [by one of those threads]
> when semaphore is guaranteed to be valid. I think this removes any races
> between semaphore closure and operations.
>
> [NB. in the test before the WaitForSingleObject call, the == is now >=]
>
> This variation passes repeated runs of once4.c (aggressive cancellation
> with varying priority threads hitting the once_control) on my uni-
> processor. I also went as far as adding Sleep(1); after every semicolon
> and left-curly brace to try to break it.
>
> PS. I'm also perhaps too conscious of 'spamming' the list with endless
> versions of this stubborn little routine, but this is the purpose of the
> list, so I'm not personally going to worry about it. I'm sure anyone who
> finds it irritating will filter it or something.
>
>
> #define PTHREAD_ONCE_INIT {0, 0, 0, 0}
>
> enum ptw32_once_state {
> PTW32_ONCE_INIT = 0x0,
> PTW32_ONCE_DONE = 0x1,
> PTW32_ONCE_STARTED = 0x2,
> PTW32_ONCE_CANCELLED = 0x3
> };
>
> struct pthread_once_t_
> {
> int state;
> int reserved;
> int numSemaphoreUsers;
> HANDLE semaphore;
> };
>
> static void PTW32_CDECL
> ptw32_once_init_routine_cleanup(void * arg)
> {
> pthread_once_t * once_control = (pthread_once_t *) arg;
>
> /*
> * Continue to direct new threads into the wait path until the waiter
> that we
> * release or a new thread can reset state to INIT.
> */
> (void) PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->state,
> (LONG)PTW32_ONCE_CANCELLED);
>
> if (InterlockedExchangeAdd((LPLONG)&once_control->semaphore, 0L)) /* MBR
> fence */
> {
> ReleaseSemaphore(once_control->semaphore, 1, NULL);
> }
> }
>
> int
> pthread_once (pthread_once_t * once_control, void (*init_routine) (void))
> {
> int result;
> int state;
> HANDLE sema;
>
> if (once_control == NULL || init_routine == NULL)
> {
> result = EINVAL;
> goto FAIL0;
> }
> else
> {
> result = 0;
> }
>
> while ((state = (int)
>
> PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control
> ->state,
>
> (PTW32_INTERLOCKED_LONG)PTW32_ONCE_STARTED,
>
> (PTW32_INTERLOCKED_LONG)PTW32_ONCE_INIT))
> != PTW32_ONCE_DONE)
> {
> if (PTW32_ONCE_INIT == state)
> {
>
> #ifdef _MSC_VER
> #pragma inline_depth(0)
> #endif
>
> pthread_cleanup_push(ptw32_once_init_routine_cleanup, (void *)
> once_control);
> (*init_routine)();
> pthread_cleanup_pop(0);
>
> #ifdef _MSC_VER
> #pragma inline_depth()
> #endif
>
> (void) PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->state,
> (LONG)PTW32_ONCE_DONE);
>
> /*
> * we didn't create the semaphore.
> * it is only there if there is someone waiting.
> */
> if (InterlockedExchangeAdd((LPLONG)&once_control->semaphore,
> 0L)) /* MBR fence */
> {
> ReleaseSemaphore(once_control->semaphore,
> once_control->numSemaphoreUsers, NULL);
> }
> }
> else
> {
> if (1 == InterlockedIncrement((LPLONG)&once_control-
> >numSemaphoreUsers))
> {
> sema = CreateSemaphore(NULL, 0, INT_MAX, NULL);
>
> if
> (PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_contro
> l->semaphore,
>
(PTW32_INTERLOCKED_LONG)sema,
>
(PTW32_INTERLOCKED_LONG)0))
> {
> CloseHandle(sema);
> }
> }
>
> /*
> * If initter was cancelled then state is CANCELLED.
> * Until state is reset to INIT, all new threads will enter the
> wait path.
> * The woken waiter, if it exists, will also re-enter the wait
> path, but
> * either it or a new thread will reset state = INIT here,
> continue around the Wait,
> * and become the new initter. Any thread that is suspended in the
> wait path before
> * this point will hit this check. Any thread suspended between
> this check and
> * the Wait will wait on a valid semaphore, and possibly continue
> through it
> * if the cancellation handler has incremented (released) it and
> there were
> * no waiters.
> */
> (void)
> PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control
> ->state,
>
> (PTW32_INTERLOCKED_LONG)PTW32_ONCE_INIT,
>
> (PTW32_INTERLOCKED_LONG)PTW32_ONCE_CANCELLED);
>
> /*
> * Check 'state' again in case the initting thread has finished
> * and left before seeing that there was a semaphore.
> */
> if (InterlockedExchangeAdd((LPLONG)&once_control->state, 0L) >=
> PTW32_ONCE_STARTED)
> {
> WaitForSingleObject(once_control->semaphore, INFINITE);
> }
>
> if (0 == InterlockedDecrement((LPLONG)&once_control-
> >numSemaphoreUsers))
> {
> /* we were last */
> if ((sema =
> (HANDLE)
PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control-
> >semaphore, (LONG)0)))
> {
> CloseHandle(sema);
> }
> }
> }
> }
>
> /*
> * ------------
> * Failure Code
> * ------------
> */
> FAIL0:
> return (result);
> } /* pthread_once */
>