This is the mail archive of the pthreads-win32@sources.redhat.com mailing list for the pthreads-win32 project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

RE: New pthread_once implementation


T1 comes in, proceeds to [0064]
T2 comes in, proceeds to [0119]
T1 is cancelled
T3 comes in, loops around, resets the state, proceeds to [0064]
T2 wakes up, proceeds to right before [0125]
T4 comes in and proceeds to right before [0119]
T3 proceeds to right before [0080]
T2, T3, and T4 are ready to race for CloseHandle, ReleaseSemaphore, and
WaitForSingleObject, respectively

Regarding the MCS version:
It can be quite expensive, but only when multiple threads call pthread_once
simultaneously. Also, the overhead is proportional to the number of threads
so that:
1 thread - no overhead
2 simultaneous threads - the same overhead as in semaphore based version
3 or more simultaneous threads - higher overhead - but is this an important
case?

[0001]  #define PTHREAD_ONCE_INIT       {0, 0, 0, 0}
[0002]  
[0003]  enum ptw32_once_state {
[0004]    PTW32_ONCE_INIT      = 0x0,
[0005]    PTW32_ONCE_DONE      = 0x1,
[0006]    PTW32_ONCE_STARTED   = 0x2,
[0007]    PTW32_ONCE_CANCELLED = 0x3
[0008]  };
[0009]  
[0010]  struct pthread_once_t_
[0011]  {
[0012]    int          state;
[0013]    int          reserved;
[0014]    int          numSemaphoreUsers;
[0015]    HANDLE       semaphore;
[0016]  };
[0017]  
[0018]  static void PTW32_CDECL
[0019]  ptw32_once_init_routine_cleanup(void * arg) {
[0020]    pthread_once_t * once_control = (pthread_once_t *) arg;
[0021]  
[0022]    /*
[0023]     * Continue to direct new threads into the wait path until the
waiter that we
[0024]     * release or a new thread can reset state to INIT.
[0025]     */
[0026]    (void) PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->state,
(LONG)PTW32_ONCE_CANCELLED);
[0027]  
[0028]    if (InterlockedExchangeAdd((LPLONG)&once_control->semaphore, 0L))
/* MBR fence */
[0029]      {
[0030]        ReleaseSemaphore(once_control->semaphore, 1, NULL);
[0031]      }
[0032]  }
[0033]  
[0034]  int
[0035]  pthread_once (pthread_once_t * once_control, void (*init_routine)
(void)) {
[0036]    int result;
[0037]    int state;
[0038]    HANDLE sema;
[0039]  
[0040]    if (once_control == NULL || init_routine == NULL)
[0041]      {
[0042]        result = EINVAL;
[0043]        goto FAIL0;
[0044]      }
[0045]    else
[0046]      {
[0047]        result = 0;
[0048]      }
[0049]  
[0050]    while ((state = (int)
[0051]
PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control->
state,
[0052]
(PTW32_INTERLOCKED_LONG)PTW32_ONCE_STARTED,
[0053]
(PTW32_INTERLOCKED_LONG)PTW32_ONCE_INIT))
[0054]  	 != PTW32_ONCE_DONE)
[0055]      {
[0056]        if (PTW32_ONCE_INIT == state)
[0057]          {
[0058]  
[0059]  #ifdef _MSC_VER
[0060]  #pragma inline_depth(0)
[0061]  #endif
[0062]  
[0063]            pthread_cleanup_push(ptw32_once_init_routine_cleanup,
(void *) once_control);
[0064]            (*init_routine)();
[0065]            pthread_cleanup_pop(0);
[0066]  
[0067]  #ifdef _MSC_VER
[0068]  #pragma inline_depth()
[0069]  #endif
[0070]  
[0071]            (void)
PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->state,
[0072]                                              (LONG)PTW32_ONCE_DONE);
[0073]  
[0074]            /*
[0075]             * we didn't create the semaphore.
[0076]             * it is only there if there is someone waiting.
[0077]             */
[0078]            if
(InterlockedExchangeAdd((LPLONG)&once_control->semaphore, 0L)) /* MBR fence
*/
[0079]              {
[0080]                ReleaseSemaphore(once_control->semaphore,
[0081]                                 once_control->numSemaphoreUsers,
NULL);
[0082]              }
[0083]          }
[0084]        else
[0085]          {
[0086]            if (1 ==
InterlockedIncrement((LPLONG)&once_control->numSemaphoreUsers))
[0087]              {
[0088]                sema = CreateSemaphore(NULL, 0, INT_MAX, NULL);
[0089]  
[0090]                if
(PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control-
>semaphore,
[0091]
(PTW32_INTERLOCKED_LONG)sema,
[0092]
(PTW32_INTERLOCKED_LONG)0))
[0093]                  {
[0094]                    CloseHandle(sema);
[0095]                  }
[0096]              }
[0097]  
[0098]  	  /*
[0099]  	   * If initter was cancelled then state is CANCELLED.
[0100]  	   * Until state is reset to INIT, all new threads will
enter the wait path.
[0101]  	   * The woken waiter, if it exists, will also re-enter the
wait path, but
[0102]  	   * either it or a new thread will reset state = INIT here,
continue around the Wait,
[0103]  	   * and become the new initter. Any thread that is
suspended in the wait path before
[0104]  	   * this point will hit this check. Any thread suspended
between this check and
[0105]  	   * the Wait will wait on a valid semaphore, and possibly
continue through it
[0106]  	   * if the cancellation handler has incremented (released)
it and there were
[0107]  	   * no waiters.
[0108]  	   */
[0109]  	  (void)
PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control->
state,
[0110]
(PTW32_INTERLOCKED_LONG)PTW32_ONCE_INIT,
[0111]
(PTW32_INTERLOCKED_LONG)PTW32_ONCE_CANCELLED);
[0112]  
[0113]  	  /*
[0114]  	   * Check 'state' again in case the initting thread has
finished
[0115]  	   * and left before seeing that there was a semaphore.
[0116]  	   */
[0117]            if (InterlockedExchangeAdd((LPLONG)&once_control->state,
0L) >= PTW32_ONCE_STARTED)
[0118]              {
[0119]                WaitForSingleObject(once_control->semaphore,
INFINITE);
[0120]              }
[0121]  
[0122]            if (0 ==
InterlockedDecrement((LPLONG)&once_control->numSemaphoreUsers))
[0123]              {
[0124]                /* we were last */
[0125]                if ((sema =
[0126]  		   (HANDLE)
PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->semaphore, (LONG)0)))
[0127]                  {
[0128]                    CloseHandle(sema);
[0129]                  }
[0130]              }
[0131]          }
[0132]      }
[0133]  
[0134]    /*
[0135]     * ------------
[0136]     * Failure Code
[0137]     * ------------
[0138]     */
[0139]  FAIL0:
[0140]    return (result);
[0141]  }                               /* pthread_once */
[0142]  

> -----Original Message-----
> From: Ross Johnson [mailto:ross.johnson@homemail.com.au]
> Sent: Monday, May 30, 2005 5:56 AM
> To: Vladimir Kliatchko
> Cc: Gottlob Frege; Pthreads-Win32 list
> Subject: RE: New pthread_once implementation
> 
> Hi Vlad,
> 
> The nice thing about your implementation using semaphores was that: even
> though you could release just one waiter on cancellation, all waiting
> threads could be released in one call to the kernel when exiting
> normally. In your MCS version, the dequeueing involves sequential calls
> to SetEvent, which could be much slower in comparison. That's my only
> concern with it. The threat of an async cancellation leaving waiters
> stranded was a concern at one point, but none of the previous
> implementations of this routine has been safe against it either.
> 
> Still pondering your previous version (and not yet convinced that it's
> fatally flawed), I've tried another variation.
> 
> In this variation, the cancellation handler doesn't reset state to INIT,
> but to a new state == CANCELLED so that any newly arriving threads plus
> the awoken waiter are prevented from becoming the new initter until
> state can be reset to INIT in the wait path [by one of those threads]
> when semaphore is guaranteed to be valid. I think this removes any races
> between semaphore closure and operations.
> 
> [NB. in the test before the WaitForSingleObject call, the == is now >=]
> 
> This variation passes repeated runs of once4.c (aggressive cancellation
> with varying priority threads hitting the once_control) on my uni-
> processor. I also went as far as adding Sleep(1); after every semicolon
> and left-curly brace to try to break it.
> 
> PS. I'm also perhaps too conscious of 'spamming' the list with endless
> versions of this stubborn little routine, but this is the purpose of the
> list, so I'm not personally going to worry about it. I'm sure anyone who
> finds it irritating will filter it or something.
> 
> 
> #define PTHREAD_ONCE_INIT       {0, 0, 0, 0}
> 
> enum ptw32_once_state {
>   PTW32_ONCE_INIT      = 0x0,
>   PTW32_ONCE_DONE      = 0x1,
>   PTW32_ONCE_STARTED   = 0x2,
>   PTW32_ONCE_CANCELLED = 0x3
> };
> 
> struct pthread_once_t_
> {
>   int          state;
>   int          reserved;
>   int          numSemaphoreUsers;
>   HANDLE       semaphore;
> };
> 
> static void PTW32_CDECL
> ptw32_once_init_routine_cleanup(void * arg)
> {
>   pthread_once_t * once_control = (pthread_once_t *) arg;
> 
>   /*
>    * Continue to direct new threads into the wait path until the waiter
> that we
>    * release or a new thread can reset state to INIT.
>    */
>   (void) PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->state,
> (LONG)PTW32_ONCE_CANCELLED);
> 
>   if (InterlockedExchangeAdd((LPLONG)&once_control->semaphore, 0L)) /* MBR
> fence */
>     {
>       ReleaseSemaphore(once_control->semaphore, 1, NULL);
>     }
> }
> 
> int
> pthread_once (pthread_once_t * once_control, void (*init_routine) (void))
> {
>   int result;
>   int state;
>   HANDLE sema;
> 
>   if (once_control == NULL || init_routine == NULL)
>     {
>       result = EINVAL;
>       goto FAIL0;
>     }
>   else
>     {
>       result = 0;
>     }
> 
>   while ((state = (int)
> 
> PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control
> ->state,
> 
> (PTW32_INTERLOCKED_LONG)PTW32_ONCE_STARTED,
> 
> (PTW32_INTERLOCKED_LONG)PTW32_ONCE_INIT))
> 	 != PTW32_ONCE_DONE)
>     {
>       if (PTW32_ONCE_INIT == state)
>         {
> 
> #ifdef _MSC_VER
> #pragma inline_depth(0)
> #endif
> 
>           pthread_cleanup_push(ptw32_once_init_routine_cleanup, (void *)
> once_control);
>           (*init_routine)();
>           pthread_cleanup_pop(0);
> 
> #ifdef _MSC_VER
> #pragma inline_depth()
> #endif
> 
>           (void) PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control->state,
>                                             (LONG)PTW32_ONCE_DONE);
> 
>           /*
>            * we didn't create the semaphore.
>            * it is only there if there is someone waiting.
>            */
>           if (InterlockedExchangeAdd((LPLONG)&once_control->semaphore,
> 0L)) /* MBR fence */
>             {
>               ReleaseSemaphore(once_control->semaphore,
>                                once_control->numSemaphoreUsers, NULL);
>             }
>         }
>       else
>         {
>           if (1 == InterlockedIncrement((LPLONG)&once_control-
> >numSemaphoreUsers))
>             {
>               sema = CreateSemaphore(NULL, 0, INT_MAX, NULL);
> 
>               if
> (PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_contro
> l->semaphore,
>
(PTW32_INTERLOCKED_LONG)sema,
>
(PTW32_INTERLOCKED_LONG)0))
>                 {
>                   CloseHandle(sema);
>                 }
>             }
> 
> 	  /*
> 	   * If initter was cancelled then state is CANCELLED.
> 	   * Until state is reset to INIT, all new threads will enter the
> wait path.
> 	   * The woken waiter, if it exists, will also re-enter the wait
> path, but
> 	   * either it or a new thread will reset state = INIT here,
> continue around the Wait,
> 	   * and become the new initter. Any thread that is suspended in the
> wait path before
> 	   * this point will hit this check. Any thread suspended between
> this check and
> 	   * the Wait will wait on a valid semaphore, and possibly continue
> through it
> 	   * if the cancellation handler has incremented (released) it and
> there were
> 	   * no waiters.
> 	   */
> 	  (void)
> PTW32_INTERLOCKED_COMPARE_EXCHANGE((PTW32_INTERLOCKED_LPLONG)&once_control
> ->state,
> 
> (PTW32_INTERLOCKED_LONG)PTW32_ONCE_INIT,
> 
> (PTW32_INTERLOCKED_LONG)PTW32_ONCE_CANCELLED);
> 
> 	  /*
> 	   * Check 'state' again in case the initting thread has finished
> 	   * and left before seeing that there was a semaphore.
> 	   */
>           if (InterlockedExchangeAdd((LPLONG)&once_control->state, 0L) >=
> PTW32_ONCE_STARTED)
>             {
>               WaitForSingleObject(once_control->semaphore, INFINITE);
>             }
> 
>           if (0 == InterlockedDecrement((LPLONG)&once_control-
> >numSemaphoreUsers))
>             {
>               /* we were last */
>               if ((sema =
> 		   (HANDLE)
PTW32_INTERLOCKED_EXCHANGE((LPLONG)&once_control-
> >semaphore, (LONG)0)))
>                 {
>                   CloseHandle(sema);
>                 }
>             }
>         }
>     }
> 
>   /*
>    * ------------
>    * Failure Code
>    * ------------
>    */
> FAIL0:
>   return (result);
> }                               /* pthread_once */
> 



Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]