]> git.michaelhowe.org Git - packages/o/openafs.git/commitdiff
Windows: Stop the thundering herd
authorJeffrey Altman <jaltman@your-file-system.com>
Fri, 20 Jan 2012 19:43:06 +0000 (14:43 -0500)
committerJeffrey Altman <jaltman@secure-endpoints.com>
Fri, 27 Jan 2012 00:11:41 +0000 (16:11 -0800)
The afs redirector used notification events to wake up worker
threads when a task was added to a work queue.  Notification
events when signalled wake up all threads instead of just one.

Instead, use synchronization events to wake up a single thread at
a time and restructure the code to permit workers to wake up
additional workers if there is additional work to be performed
or during library shutdown.

Thanks to Peter Scott for his assistance.

Change-Id: I0fb9d8578035f606f03170622fc9c50a1dbfee3a
Reviewed-on: http://gerrit.openafs.org/6595
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Jeffrey Altman <jaltman@secure-endpoints.com>
Tested-by: Jeffrey Altman <jaltman@secure-endpoints.com>
src/WINNT/afsrdr/kernel/fs/AFSCommSupport.cpp
src/WINNT/afsrdr/kernel/fs/AFSGeneric.cpp
src/WINNT/afsrdr/kernel/lib/AFSWorker.cpp

index 9585be63f089b2044269c69768df5b1bdfd2664f..0baab2223781ca37d820e2e8997791c47af099ef 100644 (file)
@@ -938,12 +938,9 @@ AFSCleanupIrpPool()
 
         //
         // Set the event to release any waiting workers
+        // (everyone waits on IrpPoolHasReleaseEntries)
         //
 
-        KeSetEvent( &pCommSrvc->IrpPoolHasEntries,
-                    0,
-                    FALSE);
-
         KeSetEvent( &pCommSrvc->IrpPoolHasReleaseEntries,
                     0,
                     FALSE);
@@ -1240,6 +1237,14 @@ AFSProcessIrpRequest( IN PIRP Irp)
 
                 AFSReleaseResource( &pCommSrvc->IrpPoolLock);
 
+                //
+                // Wake up the next worker since this is a SynchronizationEvent
+                //
+
+                KeSetEvent( &pCommSrvc->IrpPoolHasReleaseEntries,
+                            0,
+                            FALSE);
+
                 try_return( ntStatus = STATUS_DEVICE_NOT_READY);
             }
 
@@ -1291,16 +1296,16 @@ AFSProcessIrpRequest( IN PIRP Irp)
                     pEntry = pEntry->fLink;
                 }
 
-                if( pCommSrvc->RequestPoolHead == NULL)
+                if( pEntry != NULL)
                 {
 
-                    KeClearEvent( &pCommSrvc->IrpPoolHasEntries);
-                }
-
-                if( pEntry == NULL)
-                {
+                    //
+                    // There might be another release entry pending
+                    //
 
-                    KeClearEvent( &pCommSrvc->IrpPoolHasReleaseEntries);
+                    KeSetEvent( &pCommSrvc->IrpPoolHasReleaseEntries,
+                                0,
+                                FALSE);
                 }
 
                 //
@@ -1326,11 +1331,13 @@ AFSProcessIrpRequest( IN PIRP Irp)
 
                         pCommSrvc->RequestPoolTail = NULL;
                     }
-                }
-                else
-                {
+                    else
+                    {
 
-                    KeClearEvent( &pCommSrvc->IrpPoolHasEntries);
+                        KeSetEvent( &pCommSrvc->IrpPoolHasEntries,
+                                    0,
+                                    FALSE);
+                    }
                 }
 
                 //
index 3a50e5ca18823ee5f28a436a98de3bbd8da3ac30..9150c66708df15716f10ca6ed5d892ba87e8fbed 100644 (file)
@@ -721,11 +721,11 @@ AFSInitializeControlDevice()
         //
 
         KeInitializeEvent( &pDeviceExt->Specific.Control.CommServiceCB.IrpPoolHasEntries,
-                           NotificationEvent,
+                           SynchronizationEvent,
                            FALSE);
 
         KeInitializeEvent( &pDeviceExt->Specific.Control.CommServiceCB.IrpPoolHasReleaseEntries,
-                           NotificationEvent,
+                           SynchronizationEvent,
                            FALSE);
 
         KeInitializeEvent( &pDeviceExt->Specific.Control.ExtentReleaseEvent,
index 7507a57012f72c867b199e05bea65af3aadfeebd..56a8e37ec38d73316865c30464f964195f7cf627 100644 (file)
@@ -70,7 +70,7 @@ AFSInitializeWorkerPool()
         pDevExt->Specific.Library.WorkerCount = 0;
 
         KeInitializeEvent( &pDevExt->Specific.Library.WorkerQueueHasItems,
-                           NotificationEvent,
+                           SynchronizationEvent,
                            FALSE);
 
         //
@@ -152,7 +152,7 @@ AFSInitializeWorkerPool()
         pDevExt->Specific.Library.IOWorkerCount = 0;
 
         KeInitializeEvent( &pDevExt->Specific.Library.IOWorkerQueueHasItems,
-                           NotificationEvent,
+                           SynchronizationEvent,
                            FALSE);
 
         //
@@ -265,11 +265,34 @@ AFSRemoveWorkerPool()
     pDevExt = (AFSDeviceExt *)AFSLibraryDeviceObject->DeviceExtension;
 
     //
-    // Loop through the workers shutting them down
+    // Loop through the workers shutting them down in two stages.
+    // First, clear AFS_WORKER_PROCESS_REQUESTS so that workers
+    // stop processing requests.  Second, call AFSShutdownWorkerThread()
+    // to wake the workers and wait for them to exit.
     //
 
     pCurrentWorker = pDevExt->Specific.Library.PoolHead;
 
+    while( index < pDevExt->Specific.Library.WorkerCount)
+    {
+
+        ClearFlag( pCurrentWorker->State, AFS_WORKER_PROCESS_REQUESTS);
+
+        pCurrentWorker = pCurrentWorker->fLink;
+
+        if ( pCurrentWorker == NULL)
+        {
+
+            break;
+        }
+
+        index++;
+    }
+
+    pCurrentWorker = pDevExt->Specific.Library.PoolHead;
+
+    index = 0;
+
     while( index < pDevExt->Specific.Library.WorkerCount)
     {
 
@@ -295,13 +318,36 @@ AFSRemoveWorkerPool()
     ExDeleteResourceLite( &pDevExt->Specific.Library.QueueLock);
 
     //
-    // Loop through the IO workers shutting them down
+    // Loop through the IO workers shutting them down in two stages.
+    // First, clear AFS_WORKER_PROCESS_REQUESTS so that workers
+    // stop processing requests.  Second, call AFSShutdownWorkerThread()
+    // to wake the workers and wait for them to exit.
     //
 
     pCurrentWorker = pDevExt->Specific.Library.IOPoolHead;
 
     index = 0;
 
+    while( index < pDevExt->Specific.Library.IOWorkerCount)
+    {
+
+        ClearFlag( pCurrentWorker->State, AFS_WORKER_PROCESS_REQUESTS);
+
+        pCurrentWorker = pCurrentWorker->fLink;
+
+        if ( pCurrentWorker == NULL)
+        {
+
+            break;
+        }
+
+        index++;
+    }
+
+    pCurrentWorker = pDevExt->Specific.Library.IOPoolHead;
+
+    index = 0;
+
     while( index < pDevExt->Specific.Library.IOWorkerCount)
     {
 
@@ -515,7 +561,7 @@ AFSShutdownVolumeWorker( IN AFSVolumeCB *VolumeCB)
 //
 // Description:
 //
-//      This function shusdown a worker thread in the pool
+//      This function shutsdown a worker thread in the pool
 //
 // Return:
 //
@@ -533,12 +579,6 @@ AFSShutdownWorkerThread( IN AFSWorkQueueContext *PoolContext)
         BooleanFlagOn( PoolContext->State, AFS_WORKER_INITIALIZED))
     {
 
-        //
-        // Clear the 'keep processing' flag
-        //
-
-        ClearFlag( PoolContext->State, AFS_WORKER_PROCESS_REQUESTS);
-
         //
         // Wake up the thread if it is a sleep
         //
@@ -584,12 +624,6 @@ AFSShutdownIOWorkerThread( IN AFSWorkQueueContext *PoolContext)
         BooleanFlagOn( PoolContext->State, AFS_WORKER_INITIALIZED))
     {
 
-        //
-        // Clear the 'keep processing' flag
-        //
-
-        ClearFlag( PoolContext->State, AFS_WORKER_PROCESS_REQUESTS);
-
         //
         // Wake up the thread if it is a sleep
         //
@@ -632,7 +666,6 @@ AFSWorkerThread( IN PVOID Context)
     AFSWorkQueueContext *pPoolContext = (AFSWorkQueueContext *)Context;
     AFSWorkItem *pWorkItem;
     BOOLEAN freeWorkItem = TRUE;
-    BOOLEAN exitThread = FALSE;
     AFSDeviceExt *pLibraryDevExt = NULL;
     LONG lCount;
 
@@ -646,22 +679,21 @@ AFSWorkerThread( IN PVOID Context)
                 0,
                 FALSE);
 
-
     //
     // Indicate we are initialized
     //
 
     SetFlag( pPoolContext->State, AFS_WORKER_INITIALIZED);
 
+    ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.WorkerQueueHasItems,
+                                      Executive,
+                                      KernelMode,
+                                      FALSE,
+                                      NULL);
+
     while( BooleanFlagOn( pPoolContext->State, AFS_WORKER_PROCESS_REQUESTS))
     {
 
-        ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.WorkerQueueHasItems,
-                                          Executive,
-                                          KernelMode,
-                                          FALSE,
-                                          NULL);
-
         if( !NT_SUCCESS( ntStatus))
         {
 
@@ -772,8 +804,23 @@ AFSWorkerThread( IN PVOID Context)
                 }
             }
         }
+
+        ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.WorkerQueueHasItems,
+                                          Executive,
+                                          KernelMode,
+                                          FALSE,
+                                          NULL);
+
     } // worker thread loop
 
+    ClearFlag( pPoolContext->State, AFS_WORKER_INITIALIZED);
+
+    // Wake up another worker so they too can exit
+
+    KeSetEvent( &pLibraryDevExt->Specific.Library.WorkerQueueHasItems,
+                0,
+                FALSE);
+
     PsTerminateSystemThread( 0);
 
     return;
@@ -787,7 +834,6 @@ AFSIOWorkerThread( IN PVOID Context)
     AFSWorkQueueContext *pPoolContext = (AFSWorkQueueContext *)Context;
     AFSWorkItem *pWorkItem;
     BOOLEAN freeWorkItem = TRUE;
-    BOOLEAN exitThread = FALSE;
     AFSDeviceExt *pLibraryDevExt = NULL, *pRdrDevExt = NULL;
 
     pLibraryDevExt = (AFSDeviceExt *)AFSLibraryDeviceObject->DeviceExtension;
@@ -807,15 +853,15 @@ AFSIOWorkerThread( IN PVOID Context)
 
     SetFlag( pPoolContext->State, AFS_WORKER_INITIALIZED);
 
+    ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.IOWorkerQueueHasItems,
+                                      Executive,
+                                      KernelMode,
+                                      FALSE,
+                                      NULL);
+
     while( BooleanFlagOn( pPoolContext->State, AFS_WORKER_PROCESS_REQUESTS))
     {
 
-        ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.IOWorkerQueueHasItems,
-                                          Executive,
-                                          KernelMode,
-                                          FALSE,
-                                          NULL);
-
         if( !NT_SUCCESS( ntStatus))
         {
 
@@ -886,8 +932,23 @@ AFSIOWorkerThread( IN PVOID Context)
                 }
             }
         }
+
+        ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.IOWorkerQueueHasItems,
+                                          Executive,
+                                          KernelMode,
+                                          FALSE,
+                                          NULL);
+
     } // worker thread loop
 
+    ClearFlag( pPoolContext->State, AFS_WORKER_INITIALIZED);
+
+    // Wake up another IOWorker so they too can exit
+
+    KeSetEvent( &pLibraryDevExt->Specific.Library.IOWorkerQueueHasItems,
+                0,
+                FALSE);
+
     PsTerminateSystemThread( 0);
 
     return;
@@ -901,7 +962,6 @@ AFSPrimaryVolumeWorkerThread( IN PVOID Context)
     AFSWorkQueueContext *pPoolContext = (AFSWorkQueueContext *)&AFSGlobalRoot->VolumeWorkerContext;
     AFSDeviceExt *pControlDeviceExt = NULL;
     AFSDeviceExt *pRDRDeviceExt = NULL;
-    BOOLEAN exitThread = FALSE;
     LARGE_INTEGER DueTime;
     LONG TimeOut;
     KTIMER Timer;
@@ -1861,14 +1921,19 @@ AFSRemoveWorkItem()
         {
 
             pDevExt->Specific.Library.QueueTail = NULL;
+        }
+        else
+        {
 
-            KeResetEvent(&(pDevExt->Specific.Library.WorkerQueueHasItems));
+            //
+            // Wake up another worker
+            //
+
+            KeSetEvent( &(pDevExt->Specific.Library.WorkerQueueHasItems),
+                        0,
+                        FALSE);
         }
     }
-    else
-    {
-        KeResetEvent(&(pDevExt->Specific.Library.WorkerQueueHasItems));
-    }
 
     AFSReleaseResource( &pDevExt->Specific.Library.QueueLock);
 
@@ -1915,14 +1980,19 @@ AFSRemoveIOWorkItem()
         {
 
             pDevExt->Specific.Library.IOQueueTail = NULL;
+        }
+        else
+        {
 
-            KeResetEvent(&(pDevExt->Specific.Library.IOWorkerQueueHasItems));
+            //
+            // Wake up another worker
+            //
+
+            KeSetEvent( &(pDevExt->Specific.Library.IOWorkerQueueHasItems),
+                        0,
+                        FALSE);
         }
     }
-    else
-    {
-        KeResetEvent(&(pDevExt->Specific.Library.IOWorkerQueueHasItems));
-    }
 
     AFSReleaseResource( &pDevExt->Specific.Library.IOQueueLock);