From e356090ba21395e110b8a403a5efaf725ca3ffc8 Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Sun, 17 Oct 2010 00:35:36 -0400 Subject: [PATCH] Windows: Use rx_Readv / rx_Writev When USE_RX_IOVEC is defined, cm_BufWrite() will utilize rx_Writev() instead of rx_Write() and cm_GetBuffer() will use rx_Readv() instead of rx_Read() to improve throughput. LICENSE MIT Change-Id: Ib70dfd4fd7a79c9ce36ef4fd8f4bb46a946621fd Reviewed-on: http://gerrit.openafs.org/2999 Tested-by: BuildBot Tested-by: Jeffrey Altman Reviewed-by: Jeffrey Altman --- src/WINNT/afsd/cm_dcache.c | 155 ++++++++++++++++++++++++++++++++++++- 1 file changed, 151 insertions(+), 4 deletions(-) diff --git a/src/WINNT/afsd/cm_dcache.c b/src/WINNT/afsd/cm_dcache.c index 72081fb4e..75fb05c82 100644 --- a/src/WINNT/afsd/cm_dcache.c +++ b/src/WINNT/afsd/cm_dcache.c @@ -13,9 +13,6 @@ #include #include #include -#ifdef COMMENT -#include -#endif #include #include #include @@ -30,6 +27,8 @@ extern void afsi_log(char *pattern, ...); extern osi_mutex_t cm_Freelance_Lock; #endif +#define USE_RX_IOVEC 1 + /* we can access connp->serverp without holding a lock because that never changes since the connection is made. */ #define SERVERHAS64BIT(connp) (!((connp)->serverp->flags & CM_SERVERFLAG_NO64BIT)) @@ -195,10 +194,73 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags, } if (code == 0) { + afs_uint32 buf_offset; + /* write the data from the the list of buffers */ qdp = NULL; while(nbytes > 0) { - afs_uint32 buf_offset; +#ifdef USE_RX_IOVEC + struct iovec tiov[RX_MAXIOVECS]; + afs_int32 tnio, vlen, vbytes, iov, voffset; + afs_uint32 vleft; + + vbytes = rx_WritevAlloc(rxcallp, tiov, &tnio, RX_MAXIOVECS, nbytes); + if (vbytes <= 0) { + code = RX_PROTOCOL_ERROR; + break; + } + + for ( iov = voffset = vlen = 0; vlen < vbytes; vlen += wbytes) { + if (qdp == NULL) { + qdp = biod.bufListEndp; + buf_offset = offsetp->LowPart % cm_data.buf_blockSize; + } else if (buf_offset == cm_data.buf_blockSize) { + qdp = (osi_queueData_t *) osi_QPrev(&qdp->q); + buf_offset = 0; + } + + osi_assertx(qdp != NULL, "null osi_queueData_t"); + bufp = osi_GetQData(qdp); + bufferp = bufp->datap + buf_offset; + wbytes = vbytes - vlen; + if (wbytes > cm_data.buf_blockSize - buf_offset) + wbytes = cm_data.buf_blockSize - buf_offset; + + vleft = tiov[iov].iov_len - voffset; + while (wbytes > vleft) { + memcpy(tiov[iov].iov_base + voffset, bufferp, vleft); + vlen += vleft; + wbytes -= vleft; + bufferp += vleft; + + iov++; + voffset = 0; + vleft = tiov[iov].iov_len; + } + + memcpy(tiov[iov].iov_base + voffset, bufferp, wbytes); + if (tiov[iov].iov_len == voffset + wbytes) { + iov++; + voffset = 0; + vleft = tiov[iov].iov_len; + } else { + voffset += wbytes; + vleft -= wbytes; + } + buf_offset += wbytes; + } + + temp = rx_Writev(rxcallp, tiov, tnio, vbytes); + if (temp != vbytes) { + osi_Log3(afsd_logp, "rx_Writev failed bp 0x%p, %d != %d", bufp, temp, vbytes); + code = (rxcallp->error < 0) ? rxcallp->error : RX_PROTOCOL_ERROR; + break; + } + + osi_Log3(afsd_logp, "rx_Writev succeeded bp 0x%p offset 0x%x, wrote %u", + bufp, buf_offset, vbytes); + nbytes -= vbytes; +#else /* USE_RX_IOVEC */ if (qdp == NULL) { qdp = biod.bufListEndp; buf_offset = offsetp->LowPart % cm_data.buf_blockSize; @@ -206,6 +268,7 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags, qdp = (osi_queueData_t *) osi_QPrev(&qdp->q); buf_offset = 0; } + osi_assertx(qdp != NULL, "null osi_queueData_t"); bufp = osi_GetQData(qdp); bufferp = bufp->datap + buf_offset; @@ -223,6 +286,7 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags, osi_Log2(afsd_logp, "rx_Write succeeded bp 0x%p, %d",bufp,temp); } nbytes -= wbytes; +#endif /* USE_RX_IOVEC */ } /* while more bytes to write */ } /* if RPC started successfully */ @@ -1511,6 +1575,7 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp AFSCallBack callback; AFSVolSync volSync; char *bufferp; + afs_uint32 buffer_offset; cm_buf_t *tbufp; /* buf we're filling */ osi_queueData_t *qdp; /* q element we're scanning */ AFSFid tfid; @@ -1786,9 +1851,11 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp if (qdp) { tbufp = osi_GetQData(qdp); bufferp = tbufp->datap; + buffer_offset = 0; } else bufferp = NULL; + /* fill length_found of data from the pipe into the pages. * When we stop, qdp will point at the last page we're * dealing with, and bufferp will tell us where we @@ -1797,6 +1864,80 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp * clear later pages out, if we fetch past EOF). */ while (length_found > 0) { +#ifdef USE_RX_IOVEC + struct iovec tiov[RX_MAXIOVECS]; + afs_int32 tnio, iov, iov_offset; + + temp = rx_Readv(rxcallp, tiov, &tnio, RX_MAXIOVECS, length_found); + osi_Log1(afsd_logp, "cm_GetBuffer rx_Readv returns %d", temp); + if (temp != length_found && temp < cm_data.buf_blockSize) { + /* + * If the file server returned (filesize - offset), + * then the first rx_Read will return zero octets of data. + * If it does, do not treat it as an error. Correct the + * length_found and continue as if the file server said + * it was sending us zero octets of data. + */ + if (fs_fetchdata_offset_bug && first_read) + length_found = 0; + else + code = (rxcallp->error < 0) ? rxcallp->error : RX_PROTOCOL_ERROR; + break; + } + + iov = 0; + iov_offset = 0; + rbytes = temp; + + while (rbytes > 0) { + afs_int32 len; + + osi_assertx(bufferp != NULL, "null cm_buf_t"); + + len = MIN(tiov[iov].iov_len - iov_offset, cm_data.buf_blockSize - buffer_offset); + memcpy(bufferp + buffer_offset, tiov[iov].iov_base + iov_offset, len); + iov_offset += len; + buffer_offset += len; + rbytes -= len; + + if (iov_offset == tiov[iov].iov_len) { + iov++; + iov_offset = 0; + } + + if (buffer_offset == cm_data.buf_blockSize) { + /* allow read-while-fetching. + * if this is the last buffer, clear the + * PREFETCHING flag, so the reader waiting for + * this buffer will start a prefetch. + */ + tbufp->cmFlags |= CM_BUF_CMFULLYFETCHED; + lock_ObtainWrite(&scp->rw); + if (scp->flags & CM_SCACHEFLAG_WAITING) { + osi_Log1(afsd_logp, "CM GetBuffer Waking scp 0x%p", scp); + osi_Wakeup((LONG_PTR) &scp->flags); + } + if (cpffp && !*cpffp && !osi_QPrev(&qdp->q)) { + osi_hyper_t tlength = ConvertLongToLargeInteger(biod.length); + *cpffp = 1; + cm_ClearPrefetchFlag(0, scp, &biod.offset, &tlength); + } + lock_ReleaseWrite(&scp->rw); + + /* Advance the buffer */ + qdp = (osi_queueData_t *) osi_QPrev(&qdp->q); + if (qdp) { + tbufp = osi_GetQData(qdp); + bufferp = tbufp->datap; + buffer_offset = 0; + } + else + bufferp = NULL; + } + } + + length_found -= temp; +#else /* USE_RX_IOVEC */ /* assert that there are still more buffers; * our check above for length_found being less than * biod.length should ensure this. @@ -1854,6 +1995,7 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp bufferp = NULL; } else bufferp += temp; +#endif /* USE_RX_IOVEC */ } /* zero out remainder of last pages, in case we are @@ -1862,12 +2004,17 @@ long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp * a page. Zero the remainder of that page, and then * all of the rest of the pages. */ +#ifdef USE_RX_IOVEC + rbytes = cm_data.buf_blockSize - buffer_offset; + bufferp = tbufp->datap + buffer_offset; +#else /* USE_RX_IOVEC */ /* bytes fetched */ osi_assertx((bufferp - tbufp->datap) < LONG_MAX, "data >= LONG_MAX"); rbytes = (long) (bufferp - tbufp->datap); /* bytes left to zero */ rbytes = cm_data.buf_blockSize - rbytes; +#endif /* USE_RX_IOVEC */ while(qdp) { if (rbytes != 0) memset(bufferp, 0, rbytes); -- 2.39.5