From 4c58b1db9f34c220d839dd4a1b38ed11e9ce2197 Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Sat, 14 Nov 2009 16:33:31 -0500 Subject: [PATCH] Windows: Improvements to background fetch processing Log offset and length in cm_BkgPrefetch() Convert mxheld to rwheld in cm_BkgPrefetch() now that cm_scache_t objects use rwlocks. Do not clear CM_SCACHEFLAG_PREFETCHING from within the error returns from cm_CheckFetchRange(). Let the caller decide if that is appropriate. Add CM_BUF_CMBKGFETCH cm_buf_t cmFlag to make it possible to quickly detect if a background fetch operation has already been queued for a particular cm_buf_t data range. LICENSE MIT Change-Id: I4ac9a2f84ddd64cba86612d7a2abe849bd0bec0b Reviewed-on: http://gerrit.openafs.org/827 Reviewed-by: Derrick Brashear Tested-by: Jeffrey Altman Reviewed-by: Jeffrey Altman --- src/WINNT/afsd/cm_buf.h | 4 +- src/WINNT/afsd/cm_dcache.c | 111 +++++++++++++++++++++++++++++-------- src/WINNT/afsd/cm_dcache.h | 6 +- 3 files changed, 93 insertions(+), 28 deletions(-) diff --git a/src/WINNT/afsd/cm_buf.h b/src/WINNT/afsd/cm_buf.h index 886ec36fa..6ca613c34 100644 --- a/src/WINNT/afsd/cm_buf.h +++ b/src/WINNT/afsd/cm_buf.h @@ -94,6 +94,8 @@ typedef struct cm_buf { #define CM_BUF_CMSTORING 2 /* storing this buffer */ #define CM_BUF_CMFULLYFETCHED 4 /* read-while-fetching optimization */ #define CM_BUF_CMWRITING 8 /* writing to this buffer */ +#define CM_BUF_CMBKGFETCH 16 /* background fetch queued by + * prefetch or redirector */ /* waiting is done based on scp->flags. Removing bits from cmFlags should be followed by waking the scp. */ @@ -101,7 +103,7 @@ typedef struct cm_buf { #define CM_BUF_WRITING 2 /* now writing buffer to the disk */ #define CM_BUF_INHASH 4 /* in the hash table */ #define CM_BUF_DIRTY 8 /* buffer is dirty */ -#define CM_BUF_INLRU 0x10 /* in lru queue */ +#define CM_BUF_INLRU 0x10 /* in lru queue (aka free list) */ #define CM_BUF_ERROR 0x20 /* something went wrong on delayed write */ #define CM_BUF_WAITING 0x40 /* someone's waiting for a flag to change */ #define CM_BUF_INDL 0x80 /* in the dirty list */ diff --git a/src/WINNT/afsd/cm_dcache.c b/src/WINNT/afsd/cm_dcache.c index 340eb67e4..1f4784361 100644 --- a/src/WINNT/afsd/cm_dcache.c +++ b/src/WINNT/afsd/cm_dcache.c @@ -559,8 +559,12 @@ int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked) return 0; } -/* used when deciding whether to do a prefetch or not */ -long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *length, +/* + * used when deciding whether to do a background fetch or not. + * call with scp->rw write-locked. + */ +afs_int32 +cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *length, cm_user_t *userp, cm_req_t *reqp, osi_hyper_t *realBasep) { osi_hyper_t tbase; @@ -577,16 +581,12 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t * tlength = *length; tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize); stop = 0; - lock_ObtainWrite(&scp->rw); while (LargeIntegerGreaterThanZero(tlength)) { /* get callback so we can do a meaningful dataVersion comparison */ code = cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS); - if (code) { - scp->flags &= ~CM_SCACHEFLAG_PREFETCHING; - lock_ReleaseWrite(&scp->rw); + if (code) return code; - } if (LargeIntegerGreaterThanOrEqualTo(tbase, scp->length)) { /* we're past the end of file */ @@ -596,7 +596,7 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t * bp = buf_Find(scp, &tbase); /* We cheat slightly by not locking the bp mutex. */ if (bp) { - if ((bp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING)) == 0 + if ((bp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING | CM_BUF_CMBKGFETCH)) == 0 && (bp->dataVersion < scp->bufDataVersionLow || bp->dataVersion > scp->dataVersion)) stop = 1; buf_Release(bp); @@ -620,7 +620,6 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t * */ if (stop == 0) { /* return non-zero code since realBasep won't be valid */ - scp->flags &= ~CM_SCACHEFLAG_PREFETCHING; code = -1; } else { @@ -628,7 +627,6 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t * *realBasep = tbase; code = 0; } - lock_ReleaseWrite(&scp->rw); return code; } @@ -709,8 +707,8 @@ cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, af osi_hyper_t end; osi_hyper_t fetched; osi_hyper_t tblocksize; - long code; - int mxheld = 0; + afs_int32 code; + int rxheld = 0; cm_buf_t *bp = NULL; cm_req_t req; @@ -729,15 +727,16 @@ cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, af end = LargeIntegerAdd(base, length); - osi_Log3(afsd_logp, "Starting BKG prefetch scp 0x%p, base 0x%x:%x", scp, p2, p1); + osi_Log5(afsd_logp, "Starting BKG prefetch scp 0x%p offset 0x%x:%x length 0x%x:%x", + scp, p2, p1, p4, p3); for ( code = 0, offset = base; code == 0 && LargeIntegerLessThan(offset, end); offset = LargeIntegerAdd(offset, tblocksize) ) { - if (mxheld) { + if (rxheld) { lock_ReleaseWrite(&scp->rw); - mxheld = 0; + rxheld = 0; } code = buf_Get(scp, &offset, &req, &bp); @@ -746,32 +745,56 @@ cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, af if (bp->cmFlags & CM_BUF_CMFETCHING) { /* skip this buffer as another thread is already fetching it */ + if (!rxheld) { + lock_ObtainWrite(&scp->rw); + rxheld = 1; + } + bp->cmFlags &= ~CM_BUF_CMBKGFETCH; buf_Release(bp); bp = NULL; continue; } - if (!mxheld) { + if (!rxheld) { lock_ObtainWrite(&scp->rw); - mxheld = 1; + rxheld = 1; } code = cm_GetBuffer(scp, bp, NULL, userp, &req); if (code == 0) fetched = LargeIntegerAdd(fetched, tblocksize); buf_Release(bp); + bp->cmFlags &= ~CM_BUF_CMBKGFETCH; } - if (!mxheld) { + if (!rxheld) { lock_ObtainWrite(&scp->rw); - mxheld = 1; + rxheld = 1; + } + + /* Clear flag from any remaining buffers */ + for ( ; + LargeIntegerLessThan(offset, end); + offset = LargeIntegerAdd(offset, tblocksize) ) + { + bp = buf_Find(scp, &offset); + if (bp) { + bp->cmFlags &= ~CM_BUF_CMBKGFETCH; + buf_Release(bp); + } } cm_ClearPrefetchFlag(LargeIntegerGreaterThanZero(fetched) ? 0 : code, scp, &base, &fetched); + + /* wakeup anyone who is waiting */ + if (scp->flags & CM_SCACHEFLAG_WAITING) { + osi_Log1(afsd_logp, "CM BkgPrefetch Waking scp 0x%p", scp); + osi_Wakeup((LONG_PTR) &scp->flags); + } lock_ReleaseWrite(&scp->rw); - osi_Log4(afsd_logp, "Ending BKG prefetch scp 0x%p, code %d bytes 0x%x:%x", - scp, code, fetched.HighPart, fetched.LowPart); + osi_Log4(afsd_logp, "Ending BKG prefetch scp 0x%p code 0x%x fetched 0x%x:%x", + scp, code, fetched.HighPart, fetched.LowPart); return code; } @@ -782,9 +805,16 @@ void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 coun cm_user_t *userp, cm_req_t *reqp) { long code; + int rwheld = 0; osi_hyper_t realBase; osi_hyper_t readBase; osi_hyper_t readLength; + osi_hyper_t readEnd; + osi_hyper_t offset; + osi_hyper_t tblocksize; /* a long long temp variable */ + cm_buf_t *bp; + + tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize); readBase = *offsetp; /* round up to chunk boundary */ @@ -794,6 +824,7 @@ void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 coun readLength = ConvertLongToLargeInteger(count); lock_ObtainWrite(&scp->rw); + rwheld = 1; if ((scp->flags & CM_SCACHEFLAG_PREFETCHING) || LargeIntegerLessThanOrEqualTo(readBase, scp->prefetch.base)) { lock_ReleaseWrite(&scp->rw); @@ -807,12 +838,44 @@ void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 coun if (LargeIntegerGreaterThan(scp->prefetch.end, readBase)) readBase = scp->prefetch.end; - lock_ReleaseWrite(&scp->rw); - code = cm_CheckFetchRange(scp, &readBase, &readLength, userp, reqp, &realBase); - if (code) + if (code) { + scp->flags &= ~CM_SCACHEFLAG_PREFETCHING; + lock_ReleaseWrite(&scp->rw); return; /* can't find something to prefetch */ + } + + readEnd = LargeIntegerAdd(realBase, readLength); + + /* + * Mark each buffer in the range as queued for a + * background fetch + */ + for ( offset = realBase; + LargeIntegerLessThan(offset, readEnd); + offset = LargeIntegerAdd(offset, tblocksize) ) + { + if (rwheld) { + lock_ReleaseWrite(&scp->rw); + rwheld = 0; + } + + bp = buf_Find(scp, &offset); + if (!bp) + continue; + + if (!rwheld) { + lock_ObtainWrite(&scp->rw); + rwheld = 1; + } + + bp->cmFlags |= CM_BUF_CMBKGFETCH; + buf_Release(bp); + } + + if (rwheld) + lock_ReleaseWrite(&scp->rw); osi_Log2(afsd_logp, "BKG Prefetch request scp 0x%p, base 0x%x", scp, realBase.LowPart); diff --git a/src/WINNT/afsd/cm_dcache.h b/src/WINNT/afsd/cm_dcache.h index 2d7e0586d..cb3e74799 100644 --- a/src/WINNT/afsd/cm_dcache.h +++ b/src/WINNT/afsd/cm_dcache.h @@ -31,9 +31,9 @@ extern int cm_HaveBuffer(struct cm_scache *, struct cm_buf *, int haveBufLocked) extern long cm_GetBuffer(struct cm_scache *, struct cm_buf *, int *, struct cm_user *, struct cm_req *); -extern long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, - osi_hyper_t *length, cm_user_t *up, - cm_req_t *reqp, osi_hyper_t *realBasep); +extern afs_int32 cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, + osi_hyper_t *length, cm_user_t *up, + cm_req_t *reqp, osi_hyper_t *realBasep); extern long cm_SetupFetchBIOD(cm_scache_t *scp, osi_hyper_t *offsetp, cm_bulkIO_t *biop, cm_user_t *up, cm_req_t *reqp); -- 2.39.5