From de4d12dd5366bfddb6fde5bb050ce8fbaeb95f1a Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Fri, 20 Jan 2012 14:43:06 -0500 Subject: [PATCH] Windows: Stop the thundering herd The afs redirector used notification events to wake up worker threads when a task was added to a work queue. Notification events when signalled wake up all threads instead of just one. Instead, use synchronization events to wake up a single thread at a time and restructure the code to permit workers to wake up additional workers if there is additional work to be performed or during library shutdown. Thanks to Peter Scott for his assistance. Change-Id: I0fb9d8578035f606f03170622fc9c50a1dbfee3a Reviewed-on: http://gerrit.openafs.org/6595 Tested-by: BuildBot Reviewed-by: Jeffrey Altman Tested-by: Jeffrey Altman --- src/WINNT/afsrdr/kernel/fs/AFSCommSupport.cpp | 37 +++-- src/WINNT/afsrdr/kernel/fs/AFSGeneric.cpp | 4 +- src/WINNT/afsrdr/kernel/lib/AFSWorker.cpp | 156 +++++++++++++----- 3 files changed, 137 insertions(+), 60 deletions(-) diff --git a/src/WINNT/afsrdr/kernel/fs/AFSCommSupport.cpp b/src/WINNT/afsrdr/kernel/fs/AFSCommSupport.cpp index 9585be63f..0baab2223 100644 --- a/src/WINNT/afsrdr/kernel/fs/AFSCommSupport.cpp +++ b/src/WINNT/afsrdr/kernel/fs/AFSCommSupport.cpp @@ -938,12 +938,9 @@ AFSCleanupIrpPool() // // Set the event to release any waiting workers + // (everyone waits on IrpPoolHasReleaseEntries) // - KeSetEvent( &pCommSrvc->IrpPoolHasEntries, - 0, - FALSE); - KeSetEvent( &pCommSrvc->IrpPoolHasReleaseEntries, 0, FALSE); @@ -1240,6 +1237,14 @@ AFSProcessIrpRequest( IN PIRP Irp) AFSReleaseResource( &pCommSrvc->IrpPoolLock); + // + // Wake up the next worker since this is a SynchronizationEvent + // + + KeSetEvent( &pCommSrvc->IrpPoolHasReleaseEntries, + 0, + FALSE); + try_return( ntStatus = STATUS_DEVICE_NOT_READY); } @@ -1291,16 +1296,16 @@ AFSProcessIrpRequest( IN PIRP Irp) pEntry = pEntry->fLink; } - if( pCommSrvc->RequestPoolHead == NULL) + if( pEntry != NULL) { - KeClearEvent( &pCommSrvc->IrpPoolHasEntries); - } - - if( pEntry == NULL) - { + // + // There might be another release entry pending + // - KeClearEvent( &pCommSrvc->IrpPoolHasReleaseEntries); + KeSetEvent( &pCommSrvc->IrpPoolHasReleaseEntries, + 0, + FALSE); } // @@ -1326,11 +1331,13 @@ AFSProcessIrpRequest( IN PIRP Irp) pCommSrvc->RequestPoolTail = NULL; } - } - else - { + else + { - KeClearEvent( &pCommSrvc->IrpPoolHasEntries); + KeSetEvent( &pCommSrvc->IrpPoolHasEntries, + 0, + FALSE); + } } // diff --git a/src/WINNT/afsrdr/kernel/fs/AFSGeneric.cpp b/src/WINNT/afsrdr/kernel/fs/AFSGeneric.cpp index 3a50e5ca1..9150c6670 100644 --- a/src/WINNT/afsrdr/kernel/fs/AFSGeneric.cpp +++ b/src/WINNT/afsrdr/kernel/fs/AFSGeneric.cpp @@ -721,11 +721,11 @@ AFSInitializeControlDevice() // KeInitializeEvent( &pDeviceExt->Specific.Control.CommServiceCB.IrpPoolHasEntries, - NotificationEvent, + SynchronizationEvent, FALSE); KeInitializeEvent( &pDeviceExt->Specific.Control.CommServiceCB.IrpPoolHasReleaseEntries, - NotificationEvent, + SynchronizationEvent, FALSE); KeInitializeEvent( &pDeviceExt->Specific.Control.ExtentReleaseEvent, diff --git a/src/WINNT/afsrdr/kernel/lib/AFSWorker.cpp b/src/WINNT/afsrdr/kernel/lib/AFSWorker.cpp index 7507a5701..56a8e37ec 100644 --- a/src/WINNT/afsrdr/kernel/lib/AFSWorker.cpp +++ b/src/WINNT/afsrdr/kernel/lib/AFSWorker.cpp @@ -70,7 +70,7 @@ AFSInitializeWorkerPool() pDevExt->Specific.Library.WorkerCount = 0; KeInitializeEvent( &pDevExt->Specific.Library.WorkerQueueHasItems, - NotificationEvent, + SynchronizationEvent, FALSE); // @@ -152,7 +152,7 @@ AFSInitializeWorkerPool() pDevExt->Specific.Library.IOWorkerCount = 0; KeInitializeEvent( &pDevExt->Specific.Library.IOWorkerQueueHasItems, - NotificationEvent, + SynchronizationEvent, FALSE); // @@ -265,11 +265,34 @@ AFSRemoveWorkerPool() pDevExt = (AFSDeviceExt *)AFSLibraryDeviceObject->DeviceExtension; // - // Loop through the workers shutting them down + // Loop through the workers shutting them down in two stages. + // First, clear AFS_WORKER_PROCESS_REQUESTS so that workers + // stop processing requests. Second, call AFSShutdownWorkerThread() + // to wake the workers and wait for them to exit. // pCurrentWorker = pDevExt->Specific.Library.PoolHead; + while( index < pDevExt->Specific.Library.WorkerCount) + { + + ClearFlag( pCurrentWorker->State, AFS_WORKER_PROCESS_REQUESTS); + + pCurrentWorker = pCurrentWorker->fLink; + + if ( pCurrentWorker == NULL) + { + + break; + } + + index++; + } + + pCurrentWorker = pDevExt->Specific.Library.PoolHead; + + index = 0; + while( index < pDevExt->Specific.Library.WorkerCount) { @@ -295,13 +318,36 @@ AFSRemoveWorkerPool() ExDeleteResourceLite( &pDevExt->Specific.Library.QueueLock); // - // Loop through the IO workers shutting them down + // Loop through the IO workers shutting them down in two stages. + // First, clear AFS_WORKER_PROCESS_REQUESTS so that workers + // stop processing requests. Second, call AFSShutdownWorkerThread() + // to wake the workers and wait for them to exit. // pCurrentWorker = pDevExt->Specific.Library.IOPoolHead; index = 0; + while( index < pDevExt->Specific.Library.IOWorkerCount) + { + + ClearFlag( pCurrentWorker->State, AFS_WORKER_PROCESS_REQUESTS); + + pCurrentWorker = pCurrentWorker->fLink; + + if ( pCurrentWorker == NULL) + { + + break; + } + + index++; + } + + pCurrentWorker = pDevExt->Specific.Library.IOPoolHead; + + index = 0; + while( index < pDevExt->Specific.Library.IOWorkerCount) { @@ -515,7 +561,7 @@ AFSShutdownVolumeWorker( IN AFSVolumeCB *VolumeCB) // // Description: // -// This function shusdown a worker thread in the pool +// This function shutsdown a worker thread in the pool // // Return: // @@ -533,12 +579,6 @@ AFSShutdownWorkerThread( IN AFSWorkQueueContext *PoolContext) BooleanFlagOn( PoolContext->State, AFS_WORKER_INITIALIZED)) { - // - // Clear the 'keep processing' flag - // - - ClearFlag( PoolContext->State, AFS_WORKER_PROCESS_REQUESTS); - // // Wake up the thread if it is a sleep // @@ -584,12 +624,6 @@ AFSShutdownIOWorkerThread( IN AFSWorkQueueContext *PoolContext) BooleanFlagOn( PoolContext->State, AFS_WORKER_INITIALIZED)) { - // - // Clear the 'keep processing' flag - // - - ClearFlag( PoolContext->State, AFS_WORKER_PROCESS_REQUESTS); - // // Wake up the thread if it is a sleep // @@ -632,7 +666,6 @@ AFSWorkerThread( IN PVOID Context) AFSWorkQueueContext *pPoolContext = (AFSWorkQueueContext *)Context; AFSWorkItem *pWorkItem; BOOLEAN freeWorkItem = TRUE; - BOOLEAN exitThread = FALSE; AFSDeviceExt *pLibraryDevExt = NULL; LONG lCount; @@ -646,22 +679,21 @@ AFSWorkerThread( IN PVOID Context) 0, FALSE); - // // Indicate we are initialized // SetFlag( pPoolContext->State, AFS_WORKER_INITIALIZED); + ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.WorkerQueueHasItems, + Executive, + KernelMode, + FALSE, + NULL); + while( BooleanFlagOn( pPoolContext->State, AFS_WORKER_PROCESS_REQUESTS)) { - ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.WorkerQueueHasItems, - Executive, - KernelMode, - FALSE, - NULL); - if( !NT_SUCCESS( ntStatus)) { @@ -772,8 +804,23 @@ AFSWorkerThread( IN PVOID Context) } } } + + ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.WorkerQueueHasItems, + Executive, + KernelMode, + FALSE, + NULL); + } // worker thread loop + ClearFlag( pPoolContext->State, AFS_WORKER_INITIALIZED); + + // Wake up another worker so they too can exit + + KeSetEvent( &pLibraryDevExt->Specific.Library.WorkerQueueHasItems, + 0, + FALSE); + PsTerminateSystemThread( 0); return; @@ -787,7 +834,6 @@ AFSIOWorkerThread( IN PVOID Context) AFSWorkQueueContext *pPoolContext = (AFSWorkQueueContext *)Context; AFSWorkItem *pWorkItem; BOOLEAN freeWorkItem = TRUE; - BOOLEAN exitThread = FALSE; AFSDeviceExt *pLibraryDevExt = NULL, *pRdrDevExt = NULL; pLibraryDevExt = (AFSDeviceExt *)AFSLibraryDeviceObject->DeviceExtension; @@ -807,15 +853,15 @@ AFSIOWorkerThread( IN PVOID Context) SetFlag( pPoolContext->State, AFS_WORKER_INITIALIZED); + ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.IOWorkerQueueHasItems, + Executive, + KernelMode, + FALSE, + NULL); + while( BooleanFlagOn( pPoolContext->State, AFS_WORKER_PROCESS_REQUESTS)) { - ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.IOWorkerQueueHasItems, - Executive, - KernelMode, - FALSE, - NULL); - if( !NT_SUCCESS( ntStatus)) { @@ -886,8 +932,23 @@ AFSIOWorkerThread( IN PVOID Context) } } } + + ntStatus = KeWaitForSingleObject( &pLibraryDevExt->Specific.Library.IOWorkerQueueHasItems, + Executive, + KernelMode, + FALSE, + NULL); + } // worker thread loop + ClearFlag( pPoolContext->State, AFS_WORKER_INITIALIZED); + + // Wake up another IOWorker so they too can exit + + KeSetEvent( &pLibraryDevExt->Specific.Library.IOWorkerQueueHasItems, + 0, + FALSE); + PsTerminateSystemThread( 0); return; @@ -901,7 +962,6 @@ AFSPrimaryVolumeWorkerThread( IN PVOID Context) AFSWorkQueueContext *pPoolContext = (AFSWorkQueueContext *)&AFSGlobalRoot->VolumeWorkerContext; AFSDeviceExt *pControlDeviceExt = NULL; AFSDeviceExt *pRDRDeviceExt = NULL; - BOOLEAN exitThread = FALSE; LARGE_INTEGER DueTime; LONG TimeOut; KTIMER Timer; @@ -1861,14 +1921,19 @@ AFSRemoveWorkItem() { pDevExt->Specific.Library.QueueTail = NULL; + } + else + { - KeResetEvent(&(pDevExt->Specific.Library.WorkerQueueHasItems)); + // + // Wake up another worker + // + + KeSetEvent( &(pDevExt->Specific.Library.WorkerQueueHasItems), + 0, + FALSE); } } - else - { - KeResetEvent(&(pDevExt->Specific.Library.WorkerQueueHasItems)); - } AFSReleaseResource( &pDevExt->Specific.Library.QueueLock); @@ -1915,14 +1980,19 @@ AFSRemoveIOWorkItem() { pDevExt->Specific.Library.IOQueueTail = NULL; + } + else + { - KeResetEvent(&(pDevExt->Specific.Library.IOWorkerQueueHasItems)); + // + // Wake up another worker + // + + KeSetEvent( &(pDevExt->Specific.Library.IOWorkerQueueHasItems), + 0, + FALSE); } } - else - { - KeResetEvent(&(pDevExt->Specific.Library.IOWorkerQueueHasItems)); - } AFSReleaseResource( &pDevExt->Specific.Library.IOQueueLock); -- 2.39.5