From 201c954a36fe6ec19f96e4c8e24e6e080c3ba55a Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Thu, 19 Jan 2012 01:21:02 -0500 Subject: [PATCH] Windows: Redesign daemon thread queue management The daemon thread worker pool has some very poor properties. The threads spend a significant amount of time polling for ready to process tasks because so frequently a store/fetch data request is accompanied by many other requests for the same FID that would block. Lets try a new approach. Create one queue for each worker thread and assign the tasks to a thread by a hash of the FID. This ensures that all tasks for a single FID are serialized and prevents multiple threads from attempting to perform the same task only to decide that the thread would be forced to block. Change-Id: I1d00ba0df1aa646e05b2cb3cb0796629f2e6d233 Reviewed-on: http://gerrit.openafs.org/6575 Tested-by: BuildBot Reviewed-by: Jeffrey Altman Tested-by: Jeffrey Altman --- src/WINNT/afsd/cm_callback.c | 2 - src/WINNT/afsd/cm_daemon.c | 97 +++++++++++++++++++----------------- src/WINNT/afsd/cm_daemon.h | 5 +- 3 files changed, 53 insertions(+), 51 deletions(-) diff --git a/src/WINNT/afsd/cm_callback.c b/src/WINNT/afsd/cm_callback.c index 1fe0ade4e..707c18c48 100644 --- a/src/WINNT/afsd/cm_callback.c +++ b/src/WINNT/afsd/cm_callback.c @@ -523,7 +523,6 @@ extern osi_rwlock_t cm_aclLock; extern osi_rwlock_t buf_globalLock; extern osi_rwlock_t cm_cellLock; extern osi_rwlock_t cm_connLock; -extern osi_rwlock_t cm_daemonLock; extern osi_rwlock_t cm_dnlcLock; extern osi_rwlock_t cm_scacheLock; extern osi_rwlock_t cm_serverLock; @@ -558,7 +557,6 @@ static struct _ltable { {"cm_connLock", (char*)&cm_connLock, LOCKTYPE_RW}, {"cm_userLock", (char*)&cm_userLock, LOCKTYPE_RW}, {"cm_volumeLock", (char*)&cm_volumeLock, LOCKTYPE_RW}, - {"cm_daemonLock", (char*)&cm_daemonLock, LOCKTYPE_RW}, {"cm_dnlcLock", (char*)&cm_dnlcLock, LOCKTYPE_RW}, {"cm_utilsLock", (char*)&cm_utilsLock, LOCKTYPE_RW}, {"smb_globalLock", (char*)&smb_globalLock, LOCKTYPE_RW}, diff --git a/src/WINNT/afsd/cm_daemon.c b/src/WINNT/afsd/cm_daemon.c index 9afe72525..06e9a18de 100644 --- a/src/WINNT/afsd/cm_daemon.c +++ b/src/WINNT/afsd/cm_daemon.c @@ -42,18 +42,14 @@ long cm_daemonPerformanceTuningInterval = 0; long cm_daemonRankServerInterval = 600; long cm_daemonRDRShakeExtentsInterval = 0; -osi_rwlock_t cm_daemonLock; - -long cm_bkgQueueCount; /* # of queued requests */ - -int cm_bkgWaitingForCount; /* true if someone's waiting for cm_bkgQueueCount to drop */ - -cm_bkgRequest_t *cm_bkgListp; /* first elt in the list of requests */ -cm_bkgRequest_t *cm_bkgListEndp; /* last elt in the list of requests */ +osi_rwlock_t *cm_daemonLockp; +afs_uint64 *cm_bkgQueueCountp; /* # of queued requests */ +cm_bkgRequest_t **cm_bkgListpp; /* first elt in the list of requests */ +cm_bkgRequest_t **cm_bkgListEndpp; /* last elt in the list of requests */ extern int powerStateSuspended; int daemon_ShutdownFlag = 0; -static int cm_nDaemons = 0; +int cm_nDaemons = 0; static time_t lastIPAddrChange = 0; static EVENT_HANDLE cm_Daemon_ShutdownEvent = NULL; @@ -148,7 +144,7 @@ void * cm_BkgDaemon(void * vparm) rx_StartClientThread(); - lock_ObtainWrite(&cm_daemonLock); + lock_ObtainWrite(&cm_daemonLockp[daemonID]); while (daemon_ShutdownFlag == 0) { int willBlock = 0; @@ -156,14 +152,14 @@ void * cm_BkgDaemon(void * vparm) Sleep(1000); continue; } - if (!cm_bkgListEndp) { - osi_SleepW((LONG_PTR)&cm_bkgListp, &cm_daemonLock); - lock_ObtainWrite(&cm_daemonLock); + if (!cm_bkgListEndpp[daemonID]) { + osi_SleepW((LONG_PTR)&cm_bkgListpp[daemonID], &cm_daemonLockp[daemonID]); + lock_ObtainWrite(&cm_daemonLockp[daemonID]); continue; } /* we found a request */ - for (rp = cm_bkgListEndp; rp; rp = (cm_bkgRequest_t *) osi_QPrev(&rp->q)) + for (rp = cm_bkgListEndpp[daemonID]; rp; rp = (cm_bkgRequest_t *) osi_QPrev(&rp->q)) { if (rp->scp->flags & CM_SCACHEFLAG_DELETED) break; @@ -192,28 +188,31 @@ void * cm_BkgDaemon(void * vparm) * This polling cycle needs to be replaced with a proper * producer/consumer dynamic worker pool. */ - lock_ReleaseWrite(&cm_daemonLock); - Sleep(willBlock ? 25 : 1000); - lock_ObtainWrite(&cm_daemonLock); + osi_Log2(afsd_logp,"cm_BkgDaemon[%u] sleeping %dms all tasks would block", + daemonID, willBlock ? 100 : 1000); + + lock_ReleaseWrite(&cm_daemonLockp[daemonID]); + Sleep(willBlock ? 100 : 1000); + lock_ObtainWrite(&cm_daemonLockp[daemonID]); continue; } - osi_QRemoveHT((osi_queue_t **) &cm_bkgListp, (osi_queue_t **) &cm_bkgListEndp, &rp->q); - osi_assertx(cm_bkgQueueCount-- > 0, "cm_bkgQueueCount 0"); - lock_ReleaseWrite(&cm_daemonLock); + osi_QRemoveHT((osi_queue_t **) &cm_bkgListpp[daemonID], (osi_queue_t **) &cm_bkgListEndpp[daemonID], &rp->q); + osi_assertx(cm_bkgQueueCountp[daemonID]-- > 0, "cm_bkgQueueCount 0"); + lock_ReleaseWrite(&cm_daemonLockp[daemonID]); - osi_Log1(afsd_logp,"cm_BkgDaemon processing request 0x%p", rp); + osi_Log2(afsd_logp,"cm_BkgDaemon[%u] processing request 0x%p", daemonID, rp); if (rp->scp->flags & CM_SCACHEFLAG_DELETED) { - osi_Log1(afsd_logp,"cm_BkgDaemon DELETED scp 0x%x",rp->scp); + osi_Log2(afsd_logp,"cm_BkgDaemon[%u] DELETED scp 0x%x", daemonID, rp->scp); code = CM_ERROR_BADFD; } else { #ifdef DEBUG_REFCOUNT - osi_Log2(afsd_logp,"cm_BkgDaemon (before) scp 0x%x ref %d",rp->scp, rp->scp->refCount); + osi_Log3(afsd_logp,"cm_BkgDaemon[%u] (before) scp 0x%x ref %d", daemonID, rp->scp, rp->scp->refCount); #endif code = (*rp->procp)(rp->scp, rp->p1, rp->p2, rp->p3, rp->p4, rp->userp, &rp->req); #ifdef DEBUG_REFCOUNT - osi_Log2(afsd_logp,"cm_BkgDaemon (after) scp 0x%x ref %d",rp->scp, rp->scp->refCount); + osi_Log3(afsd_logp,"cm_BkgDaemon[%u] (after) scp 0x%x ref %d", daemonID, rp->scp, rp->scp->refCount); #endif } @@ -233,29 +232,29 @@ void * cm_BkgDaemon(void * vparm) case CM_ERROR_PARTIALWRITE: if (rp->procp == cm_BkgStore || rp->procp == RDR_BkgFetch) { - osi_Log2(afsd_logp, - "cm_BkgDaemon re-queueing failed request 0x%p code 0x%x", - rp, code); - lock_ObtainWrite(&cm_daemonLock); - cm_bkgQueueCount++; - osi_QAddT((osi_queue_t **) &cm_bkgListp, (osi_queue_t **)&cm_bkgListEndp, &rp->q); + osi_Log3(afsd_logp, + "cm_BkgDaemon[%u] re-queueing failed request 0x%p code 0x%x", + daemonID, rp, code); + lock_ObtainWrite(&cm_daemonLockp[daemonID]); + cm_bkgQueueCountp[daemonID]++; + osi_QAddT((osi_queue_t **) &cm_bkgListpp[daemonID], (osi_queue_t **)&cm_bkgListEndpp[daemonID], &rp->q); break; } /* otherwise fall through */ case 0: /* success */ default: /* other error */ if (code == 0) { - osi_Log1(afsd_logp,"cm_BkgDaemon SUCCESS: request 0x%p", rp); + osi_Log2(afsd_logp,"cm_BkgDaemon[%u] SUCCESS: request 0x%p", daemonID, rp); } else { - osi_Log2(afsd_logp,"cm_BkgDaemon FAILED: request dropped 0x%p code 0x%x", - rp, code); + osi_Log3(afsd_logp,"cm_BkgDaemon[%u] FAILED: request dropped 0x%p code 0x%x", + daemonID, rp, code); } cm_ReleaseUser(rp->userp); cm_ReleaseSCache(rp->scp); free(rp); - lock_ObtainWrite(&cm_daemonLock); + lock_ObtainWrite(&cm_daemonLockp[daemonID]); } } - lock_ReleaseWrite(&cm_daemonLock); + lock_ReleaseWrite(&cm_daemonLockp[daemonID]); thrd_SetEvent(cm_BkgDaemon_ShutdownEvent[daemonID]); pthread_exit(NULL); return NULL; @@ -265,6 +264,7 @@ void cm_QueueBKGRequest(cm_scache_t *scp, cm_bkgProc_t *procp, afs_uint32 p1, af cm_user_t *userp, cm_req_t *reqp) { cm_bkgRequest_t *rp; + afs_uint32 daemonID = scp->fid.hash % cm_nDaemons; rp = malloc(sizeof(*rp)); memset(rp, 0, sizeof(*rp)); @@ -280,14 +280,12 @@ void cm_QueueBKGRequest(cm_scache_t *scp, cm_bkgProc_t *procp, afs_uint32 p1, af rp->p4 = p4; rp->req = *reqp; - lock_ObtainWrite(&cm_daemonLock); - cm_bkgQueueCount++; - osi_QAdd((osi_queue_t **) &cm_bkgListp, &rp->q); - if (!cm_bkgListEndp) - cm_bkgListEndp = rp; - lock_ReleaseWrite(&cm_daemonLock); + lock_ObtainWrite(&cm_daemonLockp[daemonID]); + cm_bkgQueueCountp[daemonID]++; + osi_QAddH((osi_queue_t **) &cm_bkgListpp[daemonID], (osi_queue_t **)&cm_bkgListEndpp[daemonID], &rp->q); + lock_ReleaseWrite(&cm_daemonLockp[daemonID]); - osi_Wakeup((LONG_PTR) &cm_bkgListp); + osi_Wakeup((LONG_PTR) &cm_bkgListpp[daemonID]); } static int @@ -774,10 +772,10 @@ void cm_DaemonShutdown(void) DWORD code; daemon_ShutdownFlag = 1; - osi_Wakeup((LONG_PTR) &cm_bkgListp); /* wait for shutdown */ for ( i=0; i CM_MAX_DAEMONS) ? CM_MAX_DAEMONS : nDaemons; if (osi_Once(&once)) { - lock_InitializeRWLock(&cm_daemonLock, "cm_daemonLock", - LOCK_HIERARCHY_DAEMON_GLOBAL); - osi_EndOnce(&once); - /* creating IP Address Change monitor daemon */ pstatus = pthread_create(&phandle, &tattr, cm_IpAddrDaemon, 0); osi_assertx(pstatus == 0, "cm_IpAddrDaemon thread creation failure"); @@ -829,10 +823,19 @@ void cm_InitDaemon(int nDaemons) pstatus = pthread_create(&phandle, &tattr, cm_LockDaemon, 0); osi_assertx(pstatus == 0, "cm_LockDaemon thread creation failure"); + cm_bkgListpp = malloc(nDaemons * sizeof(void *)); + cm_bkgListEndpp = malloc(nDaemons * sizeof(void *)); + cm_bkgQueueCountp = malloc(nDaemons * sizeof(afs_uint64)); + cm_daemonLockp = malloc(nDaemons * sizeof(osi_rwlock_t)); + for(i=0; i < cm_nDaemons; i++) { + lock_InitializeRWLock(&cm_daemonLockp[i], "cm_daemonLock", + LOCK_HIERARCHY_DAEMON_GLOBAL); + cm_bkgListpp[i] = cm_bkgListEndpp[i] = NULL; pstatus = pthread_create(&phandle, &tattr, cm_BkgDaemon, (LPVOID)(LONG_PTR)i); osi_assertx(pstatus == 0, "cm_BkgDaemon thread creation failure"); } + osi_EndOnce(&once); } pthread_attr_destroy(&tattr); diff --git a/src/WINNT/afsd/cm_daemon.h b/src/WINNT/afsd/cm_daemon.h index 087260191..b70f38068 100644 --- a/src/WINNT/afsd/cm_daemon.h +++ b/src/WINNT/afsd/cm_daemon.h @@ -18,7 +18,8 @@ extern long cm_daemonCheckCBInterval; extern long cm_daemonCheckLockInterval; extern long cm_daemonTokenCheckInterval; -extern osi_rwlock_t cm_daemonLock; +extern osi_rwlock_t *cm_daemonLockp; +extern int cm_nDaemons; void cm_DaemonShutdown(void); @@ -28,7 +29,7 @@ typedef afs_int32 (cm_bkgProc_t)(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p4, struct cm_user *up, cm_req_t *reqp); typedef struct cm_bkgRequest { - osi_queue_t q; + osi_queue_t q; cm_bkgProc_t *procp; cm_scache_t *scp; afs_uint32 p1; -- 2.39.5