From: Tom Keiser
Date: Sun, 3 Apr 2005 21:21:44 +0000 (+0000)
Subject: STABLE14-rx-2tier-freepacketq-20050403
X-Git-Tag: openafs-devel-1_3_81~11
X-Git-Url: https://git.michaelhowe.org/gitweb/?a=commitdiff_plain;h=3718f9a5357442ed9af0176cd94788c5d776b964;p=packages%2Fo%2Fopenafs.git

STABLE14-rx-2tier-freepacketq-20050403

FIXES 17805

The attached patch turns the rx free packet queue into a two-tiered cache:
when using pthreaded rx, each thread keeps a local thread-specific free
packet queue, backed by the global queue. The patch is against 1.3.79.

Preliminary benchmarks show scalability much closer to the full capacity of
2 CPUs on SPARC with this patch. Fileserver performance under heavy load is
also improved: a 50% improvement in throughput for sequential writes, and a
10% improvement for random writes, against an SMP SPARC Solaris 10
fileserver.

(cherry picked from commit 5c6b956257f4f43122d393eaf119ad7616084576)
---

diff --git a/src/rx/rx.c b/src/rx/rx.c
index 31907fdfd..8cf23d508 100644
--- a/src/rx/rx.c
+++ b/src/rx/rx.c
@@ -217,6 +217,7 @@ rxi_InitPthread(void)
     assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0) == 0);
     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
+    assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
 }
 
 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
@@ -461,10 +462,15 @@ rx_InitHost(u_int host, u_int port)
     /* Malloc up a bunch of packets & buffers */
     rx_nFreePackets = 0;
-    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
     queue_Init(&rx_freePacketQueue);
     rxi_NeedMorePackets = FALSE;
+#ifdef RX_ENABLE_TSFPQ
+    rx_nPackets = 0;	/* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */
+    rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0);
+#else /* RX_ENABLE_TSFPQ */
+    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
     rxi_MorePackets(rx_nPackets);
+#endif /* RX_ENABLE_TSFPQ */
     rx_CheckPackets();
 
     NETPRI;
@@ -649,6 +655,10 @@ rxi_StartServerProcs(int nExistingProcs)
     }
     nProcs += maxdiff;		/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
     nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
+#ifdef RX_ENABLE_TSFPQ
+    rx_TSFPQMaxProcs += nProcs;
+    RX_TS_FPQ_COMPUTE_LIMITS;
+#endif /* RX_ENABLE_TSFPQ */
     for (i = 0; i < nProcs; i++) {
         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
     }
diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h
index 48774f00f..fffb414d0 100644
--- a/src/rx/rx_globals.h
+++ b/src/rx/rx_globals.h
@@ -9,6 +9,10 @@
 
 /* RX:  Globals for internal use, basically */
 
+#ifndef AFS_RX_GLOBALS_H
+#define AFS_RX_GLOBALS_H
+
+
 #ifdef KERNEL
 #include "rx/rx.h"
 #else /* KERNEL */
@@ -22,7 +26,7 @@
 #else
 #define EXT extern
 #endif
-#endif
+#endif /* !INIT */
 
 /* Basic socket for client requests; other sockets (for receiving server requests) are in the service structures */
 EXT osi_socket rx_socket;
@@ -141,11 +145,161 @@ EXT int rxi_HardAckRate INIT(RX_FAST_ACK_RATE + 1);
 
 EXT int rx_nPackets INIT(100);	/* obsolete; use rx_extraPackets now */
 
+/*
+ * pthreads thread-specific rx info support
+ * the rx_ts_info_t struct is meant to support all kinds of
+ * thread-specific rx data:
+ *
+ *  _FPQ member contains a thread-specific free packet queue
+ */
+#ifdef AFS_PTHREAD_ENV
+EXT pthread_key_t rx_ts_info_key;
+typedef struct rx_ts_info_t {
+    struct {
+        struct rx_queue queue;
+        int len;		/* local queue length */
+        int delta;		/* number of new packets alloc'd locally since last sync w/ global queue */
+    } _FPQ;
+} rx_ts_info_t;
+EXT struct rx_ts_info_t * rx_ts_info_init();	/* init function for thread-specific data struct */
+#define RX_TS_INFO_GET(ts_info_p) \
+    do { \
+        ts_info_p = (struct rx_ts_info_t*)pthread_getspecific(rx_ts_info_key); \
+        if (ts_info_p == NULL) { \
+            assert((ts_info_p = rx_ts_info_init()) != NULL); \
+        } \
+    } while(0)
+#endif /* AFS_PTHREAD_ENV */
+
 /* List of free packets */
+/* in pthreads rx, the free packet queue is now a two-tiered queueing system
+ * in which the first tier is thread-specific, and the second tier is
+ * a global free packet queue */
 EXT struct rx_queue rx_freePacketQueue;
+#define RX_FPQ_MARK_FREE(p) \
+    do { \
+        if ((p)->flags & RX_PKTFLAG_FREE) \
+            osi_Panic("rx packet already free\n"); \
+        (p)->flags |= RX_PKTFLAG_FREE; \
+    } while(0)
+#define RX_FPQ_MARK_USED(p) \
+    do { \
+        if (!((p)->flags & RX_PKTFLAG_FREE)) \
+            osi_Panic("rx packet not free\n"); \
+        (p)->flags = 0;		/* clear RX_PKTFLAG_FREE, initialize the rest */ \
+        (p)->header.flags = 0; \
+    } while(0)
+#define RX_PACKET_IOV_INIT(p) \
+    do { \
+        (p)->wirevec[0].iov_base = (char *)((p)->wirehead); \
+        (p)->wirevec[0].iov_len = RX_HEADER_SIZE; \
+        (p)->wirevec[1].iov_base = (char *)((p)->localdata); \
+        (p)->wirevec[1].iov_len = RX_FIRSTBUFFERSIZE; \
+    } while(0)
+#define RX_PACKET_IOV_FULLINIT(p) \
+    do { \
+        (p)->wirevec[0].iov_base = (char *)((p)->wirehead); \
+        (p)->wirevec[0].iov_len = RX_HEADER_SIZE; \
+        (p)->wirevec[1].iov_base = (char *)((p)->localdata); \
+        (p)->wirevec[1].iov_len = RX_FIRSTBUFFERSIZE; \
+        (p)->niovecs = 2; \
+        (p)->length = RX_FIRSTBUFFERSIZE; \
+    } while(0)
 #ifdef RX_ENABLE_LOCKS
 EXT afs_kmutex_t rx_freePktQ_lock;
-#endif
+#ifdef AFS_PTHREAD_ENV
+#define RX_ENABLE_TSFPQ
+EXT int rx_TSFPQGlobSize INIT(3);	/* number of packets to transfer between global and local queues in one op */
+EXT int rx_TSFPQLocalMax INIT(15);	/* max number of packets on local FPQ before returning a glob to the global pool */
+EXT int rx_TSFPQMaxProcs INIT(1);	/* max number of threads expected */
+EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local);	/* more flexible packet alloc function */
+#define RX_TS_FPQ_FLUSH_GLOBAL 1
+#define RX_TS_FPQ_PULL_GLOBAL 1
+/* compute the localmax and globsize values from rx_TSFPQMaxProcs and rx_nPackets.
+   arbitrarily set local max so that all threads consume 90% of packets, if all local queues are full.
+   arbitrarily set transfer glob size to 20% of max local packet queue length.
+   also set minimum values of 15 and 3. */
+#define RX_TS_FPQ_COMPUTE_LIMITS \
+    do { \
+        register int newmax, newglob; \
+        newmax = (rx_nPackets * 9) / (10 * rx_TSFPQMaxProcs); \
+        newmax = (newmax >= 15) ? newmax : 15; \
+        newglob = newmax / 5; \
+        newglob = (newglob >= 3) ? newglob : 3; \
+        rx_TSFPQLocalMax = newmax; \
+        rx_TSFPQGlobSize = newglob; \
+    } while(0)
+/* move packets from the local (thread-specific) to the global free packet queue.
+   rx_freePktQ_lock must be held. the default is to move the difference between
+   the current length and the allowed max, plus one extra glob. */
+#define RX_TS_FPQ_LTOG(rx_ts_info_p) \
+    do { \
+        register int i; \
+        register struct rx_packet * p; \
+        register int tsize = (rx_ts_info_p)->_FPQ.len - rx_TSFPQLocalMax + rx_TSFPQGlobSize; \
+        for (i=0; i < tsize; i++) { \
+            p = queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
+            queue_Remove(p); \
+            queue_Prepend(&rx_freePacketQueue,p); \
+        } \
+        (rx_ts_info_p)->_FPQ.len -= tsize; \
+        rx_nFreePackets += tsize; \
+        if ((rx_ts_info_p)->_FPQ.delta) { \
+            rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
+            (rx_ts_info_p)->_FPQ.delta = 0; \
+            RX_TS_FPQ_COMPUTE_LIMITS; \
+        } \
+    } while(0)
+/* same as above, except the caller has direct control over the number to transfer */
+#define RX_TS_FPQ_LTOG2(rx_ts_info_p,num_transfer) \
+    do { \
+        register int i; \
+        register struct rx_packet * p; \
+        for (i=0; i < (num_transfer); i++) { \
+            p = queue_Last(&((rx_ts_info_p)->_FPQ), rx_packet); \
+            queue_Remove(p); \
+            queue_Prepend(&rx_freePacketQueue,p); \
+        } \
+        (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
+        rx_nFreePackets += (num_transfer); \
+        if ((rx_ts_info_p)->_FPQ.delta) { \
+            rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
+            (rx_ts_info_p)->_FPQ.delta = 0; \
+            RX_TS_FPQ_COMPUTE_LIMITS; \
+        } \
+    } while(0)
+/* move packets from the global to the local (thread-specific) free packet queue.
+   rx_freePktQ_lock must be held. */
+#define RX_TS_FPQ_GTOL(rx_ts_info_p) \
+    do { \
+        register int i; \
+        register struct rx_packet * p; \
+        for (i=0; (i < rx_TSFPQGlobSize) && queue_IsNotEmpty(&rx_freePacketQueue); i++) { \
+            p = queue_First(&rx_freePacketQueue, rx_packet); \
+            queue_Remove(p); \
+            queue_Append(&((rx_ts_info_p)->_FPQ),p); \
+        } \
+        (rx_ts_info_p)->_FPQ.len += i; \
+        rx_nFreePackets -= i; \
+    } while(0)
+/* check a packet out of the thread-specific free packet queue */
+#define RX_TS_FPQ_CHECKOUT(rx_ts_info_p,p) \
+    do { \
+        (p) = queue_First(&((rx_ts_info_p)->_FPQ), rx_packet); \
+        queue_Remove(p); \
+        RX_FPQ_MARK_USED(p); \
+        (rx_ts_info_p)->_FPQ.len--; \
+    } while(0)
+/* check a packet into the thread-specific free packet queue */
+#define RX_TS_FPQ_CHECKIN(rx_ts_info_p,p) \
+    do { \
+        queue_Prepend(&((rx_ts_info_p)->_FPQ), (p)); \
+        RX_FPQ_MARK_FREE(p); \
+        (rx_ts_info_p)->_FPQ.len++; \
+    } while(0)
+#endif /* AFS_PTHREAD_ENV */
+#endif /* RX_ENABLE_LOCKS */
 
 /* Number of free packets */
 EXT int rx_nFreePackets INIT(0);
@@ -338,3 +492,5 @@ EXT int rx_enable_stats INIT(0);
  * the request path.
  */
 EXT int rx_enable_hot_thread INIT(0);
+
+#endif /* AFS_RX_GLOBALS_H */
diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c
index c4247ae98..e9eade689 100644
--- a/src/rx/rx_packet.c
+++ b/src/rx/rx_packet.c
@@ -244,6 +244,35 @@ rx_SlowWritePacket(struct rx_packet * packet, int offset, int resid, char *in)
     return (resid ? (r - resid) : r);
 }
 
+#ifdef RX_ENABLE_TSFPQ
+static struct rx_packet *
+allocCBuf(int class)
+{
+    struct rx_packet *c;
+    register struct rx_ts_info_t * rx_ts_info;
+    SPLVAR;
+
+    RX_TS_INFO_GET(rx_ts_info);
+
+    if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+        NETPRI;
+        MUTEX_ENTER(&rx_freePktQ_lock);
+
+        if (queue_IsEmpty(&rx_freePacketQueue)) {
+            rxi_MorePacketsNoLock(rx_initSendWindow);
+        }
+
+        RX_TS_FPQ_GTOL(rx_ts_info);
+
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
+    }
+
+    RX_TS_FPQ_CHECKOUT(rx_ts_info, c);
+
+    return c;
+}
+#else /* RX_ENABLE_TSFPQ */
 static struct rx_packet *
 allocCBuf(int class)
 {
@@ -251,6 +280,7 @@ allocCBuf(int class)
     SPLVAR;
 
     NETPRI;
+
     MUTEX_ENTER(&rx_freePktQ_lock);
 
 #ifdef KERNEL
@@ -306,10 +336,36 @@ allocCBuf(int class)
     USERPRI;
     return c;
 }
+#endif /* RX_ENABLE_TSFPQ */
 
 /*
  * Free a packet currently used as a continuation buffer
  */
+#ifdef RX_ENABLE_TSFPQ
+void
+rxi_freeCBuf(struct rx_packet *c)
+{
+    register struct rx_ts_info_t * rx_ts_info;
+    register int i;
+    SPLVAR;
+
+    RX_TS_INFO_GET(rx_ts_info);
+    RX_TS_FPQ_CHECKIN(rx_ts_info,c);
+
+    if (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax) {
+        NETPRI;
+        MUTEX_ENTER(&rx_freePktQ_lock);
+
+        RX_TS_FPQ_LTOG(rx_ts_info);
+
+        /* Wakeup anyone waiting for packets */
+        rxi_PacketsUnWait();
+
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
+    }
+}
+#else /* RX_ENABLE_TSFPQ */
 void
 rxi_freeCBuf(struct rx_packet *c)
 {
@@ -325,6 +381,7 @@ rxi_freeCBuf(struct rx_packet *c)
     MUTEX_EXIT(&rx_freePktQ_lock);
     USERPRI;
 }
+#endif /* RX_ENABLE_TSFPQ */
 
 /* this one is kind of awful.
  * In rxkad, the packet has been all shortened, and everything, ready for
@@ -380,6 +437,45 @@ rxi_AllocDataBuf(struct rx_packet *p, int nb, int class)
 }
 
 /* Add more packet buffers */
+#ifdef RX_ENABLE_TSFPQ
+void
+rxi_MorePackets(int apackets)
+{
+    struct rx_packet *p, *e;
+    register struct rx_ts_info_t * rx_ts_info;
+    int getme;
+    SPLVAR;
+
+    getme = apackets * sizeof(struct rx_packet);
+    p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+
+    PIN(p, getme);		/* XXXXX */
+    memset((char *)p, 0, getme);
+    RX_TS_INFO_GET(rx_ts_info);
+
+    for (e = p + apackets; p < e; p++) {
+        RX_PACKET_IOV_INIT(p);
+        p->niovecs = 2;
+
+        RX_TS_FPQ_CHECKIN(rx_ts_info,p);
+    }
+    rx_ts_info->_FPQ.delta += apackets;
+
+    if (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax) {
+        NETPRI;
+        AFS_RXGLOCK();
+        MUTEX_ENTER(&rx_freePktQ_lock);
+
+        RX_TS_FPQ_LTOG(rx_ts_info);
+        rxi_NeedMorePackets = FALSE;
+        rxi_PacketsUnWait();
+
+        AFS_RXGUNLOCK();
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
+    }
+}
+#else /* RX_ENABLE_TSFPQ */
 void
 rxi_MorePackets(int apackets)
 {
@@ -397,10 +493,7 @@ rxi_MorePackets(int apackets)
     MUTEX_ENTER(&rx_freePktQ_lock);
 
     for (e = p + apackets; p < e; p++) {
-        p->wirevec[0].iov_base = (char *)(p->wirehead);
-        p->wirevec[0].iov_len = RX_HEADER_SIZE;
-        p->wirevec[1].iov_base = (char *)(p->localdata);
-        p->wirevec[1].iov_len = RX_FIRSTBUFFERSIZE;
+        RX_PACKET_IOV_INIT(p);
         p->flags |= RX_PKTFLAG_FREE;
         p->niovecs = 2;
 
@@ -414,6 +507,48 @@ rxi_MorePackets(int apackets)
     MUTEX_EXIT(&rx_freePktQ_lock);
     USERPRI;
 }
+#endif /* RX_ENABLE_TSFPQ */
+
+#ifdef RX_ENABLE_TSFPQ
+void
+rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local)
+{
+    struct rx_packet *p, *e;
+    register struct rx_ts_info_t * rx_ts_info;
+    int getme;
+    SPLVAR;
+
+    getme = apackets * sizeof(struct rx_packet);
+    p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+
+    PIN(p, getme);		/* XXXXX */
+    memset((char *)p, 0, getme);
+    RX_TS_INFO_GET(rx_ts_info);
+
+    for (e = p + apackets; p < e; p++) {
+        RX_PACKET_IOV_INIT(p);
+        p->niovecs = 2;
+
+        RX_TS_FPQ_CHECKIN(rx_ts_info,p);
+    }
+    rx_ts_info->_FPQ.delta += apackets;
+
+    if (flush_global &&
+        (num_keep_local < apackets)) {
+        NETPRI;
+        AFS_RXGLOCK();
+        MUTEX_ENTER(&rx_freePktQ_lock);
+
+        RX_TS_FPQ_LTOG2(rx_ts_info, (apackets - num_keep_local));
+        rxi_NeedMorePackets = FALSE;
+        rxi_PacketsUnWait();
+
+        AFS_RXGUNLOCK();
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
+    }
+}
+#endif /* RX_ENABLE_TSFPQ */
 
 #ifndef KERNEL
 /* Add more packet buffers */
@@ -433,16 +568,18 @@ rxi_MorePacketsNoLock(int apackets)
     memset((char *)p, 0, getme);
 
     for (e = p + apackets; p < e; p++) {
-        p->wirevec[0].iov_base = (char *)(p->wirehead);
-        p->wirevec[0].iov_len = RX_HEADER_SIZE;
-        p->wirevec[1].iov_base = (char *)(p->localdata);
-        p->wirevec[1].iov_len = RX_FIRSTBUFFERSIZE;
+        RX_PACKET_IOV_INIT(p);
         p->flags |= RX_PKTFLAG_FREE;
         p->niovecs = 2;
 
         queue_Append(&rx_freePacketQueue, p);
     }
     rx_nFreePackets += apackets;
+#ifdef RX_ENABLE_TSFPQ
+    /* TSFPQ patch also needs to keep track of total packets */
+    rx_nPackets += apackets;
+    RX_TS_FPQ_COMPUTE_LIMITS;
+#endif /* RX_ENABLE_TSFPQ */
     rxi_NeedMorePackets = FALSE;
     rxi_PacketsUnWait();
 }
@@ -483,17 +620,55 @@ rx_CheckPackets(void)
  */
 
 /* Actually free the packet p. */
+#ifdef RX_ENABLE_TSFPQ
 void
 rxi_FreePacketNoLock(struct rx_packet *p)
 {
+    register struct rx_ts_info_t * rx_ts_info;
     dpf(("Free %lx\n", (unsigned long)p));
 
-    if (p->flags & RX_PKTFLAG_FREE)
-        osi_Panic("rxi_FreePacketNoLock: packet already free\n");
+    RX_TS_INFO_GET(rx_ts_info);
+    RX_TS_FPQ_CHECKIN(rx_ts_info,p);
+    if (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax) {
+        RX_TS_FPQ_LTOG(rx_ts_info);
+    }
+}
+#else /* RX_ENABLE_TSFPQ */
+void
+rxi_FreePacketNoLock(struct rx_packet *p)
+{
+    dpf(("Free %lx\n", (unsigned long)p));
+
+    RX_FPQ_MARK_FREE(p);
     rx_nFreePackets++;
-    p->flags |= RX_PKTFLAG_FREE;
     queue_Append(&rx_freePacketQueue, p);
 }
+#endif /* RX_ENABLE_TSFPQ */
+
+#ifdef RX_ENABLE_TSFPQ
+void
+rxi_FreePacketTSFPQ(struct rx_packet *p, int flush_global)
+{
+    register struct rx_ts_info_t * rx_ts_info;
+    dpf(("Free %lx\n", (unsigned long)p));
+
+    RX_TS_INFO_GET(rx_ts_info);
+    RX_TS_FPQ_CHECKIN(rx_ts_info,p);
+
+    if (flush_global && (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax)) {
+        NETPRI;
+        MUTEX_ENTER(&rx_freePktQ_lock);
+
+        RX_TS_FPQ_LTOG(rx_ts_info);
+
+        /* Wakeup anyone waiting for packets */
+        rxi_PacketsUnWait();
+
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
+    }
+}
+#endif /* RX_ENABLE_TSFPQ */
 
 int
 rxi_FreeDataBufsNoLock(struct rx_packet *p, int first)
@@ -517,6 +692,45 @@ rxi_FreeDataBufsNoLock(struct rx_packet *p, int first)
     return 0;
 }
 
+#ifdef RX_ENABLE_TSFPQ
+int
+rxi_FreeDataBufsTSFPQ(struct rx_packet *p, int first, int flush_global)
+{
+    struct iovec *iov, *end;
+    register struct rx_ts_info_t * rx_ts_info;
+
+    RX_TS_INFO_GET(rx_ts_info);
+
+    if (first != 1)		/* MTUXXX */
+        osi_Panic("FreeDataBufs 1: first must be 1");
+    iov = &p->wirevec[1];
+    end = iov + (p->niovecs - 1);
+    if (iov->iov_base != (caddr_t) p->localdata)	/* MTUXXX */
+        osi_Panic("FreeDataBufs 2: vec 1 must be localdata");
+    for (iov++; iov < end; iov++) {
+        if (!iov->iov_base)
+            osi_Panic("FreeDataBufs 3: vecs 2-niovecs must not be NULL");
+        RX_TS_FPQ_CHECKIN(rx_ts_info,RX_CBUF_TO_PACKET(iov->iov_base, p));
+    }
+    p->length = 0;
+    p->niovecs = 0;
+
+    if (flush_global && (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax)) {
+        NETPRI;
+        MUTEX_ENTER(&rx_freePktQ_lock);
+
+        RX_TS_FPQ_LTOG(rx_ts_info);
+
+        /* Wakeup anyone waiting for packets */
+        rxi_PacketsUnWait();
+
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
+    }
+    return 0;
+}
+#endif /* RX_ENABLE_TSFPQ */
+
 int rxi_nBadIovecs = 0;
 
 /* rxi_RestoreDataBufs
@@ -530,10 +744,7 @@ rxi_RestoreDataBufs(struct rx_packet *p)
     int i;
     struct iovec *iov = &p->wirevec[2];
 
-    p->wirevec[0].iov_base = (char *)(p->wirehead);
-    p->wirevec[0].iov_len = RX_HEADER_SIZE;
-    p->wirevec[1].iov_base = (char *)(p->localdata);
-    p->wirevec[1].iov_len = RX_FIRSTBUFFERSIZE;
+    RX_PACKET_IOV_INIT(p);
 
     for (i = 2, iov = &p->wirevec[2]; i < p->niovecs; i++, iov++) {
         if (!iov->iov_base) {
@@ -545,6 +756,53 @@ rxi_RestoreDataBufs(struct rx_packet *p)
     }
 }
 
+#ifdef RX_ENABLE_TSFPQ
+int
+rxi_TrimDataBufs(struct rx_packet *p, int first)
+{
+    int length;
+    struct iovec *iov, *end;
+    register struct rx_ts_info_t * rx_ts_info;
+    SPLVAR;
+
+    if (first != 1)
+        osi_Panic("TrimDataBufs 1: first must be 1");
+
+    /* Skip over continuation buffers containing message data */
+    iov = &p->wirevec[2];
+    end = iov + (p->niovecs - 2);
+    length = p->length - p->wirevec[1].iov_len;
+    for (; iov < end && length > 0; iov++) {
+        if (!iov->iov_base)
+            osi_Panic("TrimDataBufs 3: vecs 1-niovecs must not be NULL");
+        length -= iov->iov_len;
+    }
+
+    /* iov now points to the first empty data buffer. */
+    if (iov >= end)
+        return 0;
+
+    RX_TS_INFO_GET(rx_ts_info);
+    for (; iov < end; iov++) {
+        if (!iov->iov_base)
+            osi_Panic("TrimDataBufs 4: vecs 2-niovecs must not be NULL");
+        RX_TS_FPQ_CHECKIN(rx_ts_info,RX_CBUF_TO_PACKET(iov->iov_base, p));
+        p->niovecs--;
+    }
+    if (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax) {
+        NETPRI;
+        MUTEX_ENTER(&rx_freePktQ_lock);
+
+        RX_TS_FPQ_LTOG(rx_ts_info);
+        rxi_PacketsUnWait();
+
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
+    }
+
+    return 0;
+}
+#else /* RX_ENABLE_TSFPQ */
 int
 rxi_TrimDataBufs(struct rx_packet *p, int first)
 {
@@ -585,9 +843,18 @@ rxi_TrimDataBufs(struct rx_packet *p, int first)
 
     return 0;
 }
+#endif /* RX_ENABLE_TSFPQ */
 
 /* Free the packet p.  P is assumed not to be on any queue, i.e.
  * remove it yourself first if you call this routine. */
+#ifdef RX_ENABLE_TSFPQ
+void
+rxi_FreePacket(struct rx_packet *p)
+{
+    rxi_FreeDataBufsTSFPQ(p, 1, 0);
+    rxi_FreePacketTSFPQ(p, RX_TS_FPQ_FLUSH_GLOBAL);
+}
+#else /* RX_ENABLE_TSFPQ */
 void
 rxi_FreePacket(struct rx_packet *p)
 {
@@ -604,12 +871,78 @@ rxi_FreePacket(struct rx_packet *p)
     MUTEX_EXIT(&rx_freePktQ_lock);
     USERPRI;
 }
-
+#endif /* RX_ENABLE_TSFPQ */
 
 /* rxi_AllocPacket sets up p->length so it reflects the number of
  * bytes in the packet at this point, **not including** the header.
  * The header is absolutely necessary, besides, this is the way the
  * length field is usually used */
+#ifdef RX_ENABLE_TSFPQ
+struct rx_packet *
+rxi_AllocPacketNoLock(int class)
+{
+    register struct rx_packet *p;
+    register struct rx_ts_info_t * rx_ts_info;
+
+    RX_TS_INFO_GET(rx_ts_info);
+
+#ifdef KERNEL
+    if (rxi_OverQuota(class)) {
+        rxi_NeedMorePackets = TRUE;
+        MUTEX_ENTER(&rx_stats_mutex);
+        switch (class) {
+        case RX_PACKET_CLASS_RECEIVE:
+            rx_stats.receivePktAllocFailures++;
+            break;
+        case RX_PACKET_CLASS_SEND:
+            rx_stats.sendPktAllocFailures++;
+            break;
+        case RX_PACKET_CLASS_SPECIAL:
+            rx_stats.specialPktAllocFailures++;
+            break;
+        case RX_PACKET_CLASS_RECV_CBUF:
+            rx_stats.receiveCbufPktAllocFailures++;
+            break;
+        case RX_PACKET_CLASS_SEND_CBUF:
+            rx_stats.sendCbufPktAllocFailures++;
+            break;
+        }
+        MUTEX_EXIT(&rx_stats_mutex);
+        return (struct rx_packet *)0;
+    }
+#endif /* KERNEL */
+
+    MUTEX_ENTER(&rx_stats_mutex);
+    rx_stats.packetRequests++;
+    MUTEX_EXIT(&rx_stats_mutex);
+
+    if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+
+#ifdef KERNEL
+        if (queue_IsEmpty(&rx_freePacketQueue))
+            osi_Panic("rxi_AllocPacket error");
+#else /* KERNEL */
+        if (queue_IsEmpty(&rx_freePacketQueue))
+            rxi_MorePacketsNoLock(rx_initSendWindow);
+#endif /* KERNEL */
+
+
+        RX_TS_FPQ_GTOL(rx_ts_info);
+    }
+
+    RX_TS_FPQ_CHECKOUT(rx_ts_info,p);
+
+    dpf(("Alloc %lx, class %d\n", (unsigned long)p, class));
+
+
+    /* have to do this here because rx_FlushWrite fiddles with the iovs in
+     * order to truncate outbound packets.  In the near future, may need
+     * to allocate bufs from a static pool here, and/or in AllocSendPacket
+     */
+    RX_PACKET_IOV_FULLINIT(p);
+    return p;
+}
+#else /* RX_ENABLE_TSFPQ */
 struct rx_packet *
 rxi_AllocPacketNoLock(int class)
 {
@@ -655,28 +988,70 @@ rxi_AllocPacketNoLock(int class)
     rx_nFreePackets--;
     p = queue_First(&rx_freePacketQueue, rx_packet);
-    if (!(p->flags & RX_PKTFLAG_FREE))
-        osi_Panic("rxi_AllocPacket: packet not free\n");
+    queue_Remove(p);
+    RX_FPQ_MARK_USED(p);
 
     dpf(("Alloc %lx, class %d\n", (unsigned long)p, class));
 
-    queue_Remove(p);
-    p->flags = 0;		/* clear RX_PKTFLAG_FREE, initialize the rest */
-    p->header.flags = 0;
 
     /* have to do this here because rx_FlushWrite fiddles with the iovs in
      * order to truncate outbound packets.  In the near future, may need
      * to allocate bufs from a static pool here, and/or in AllocSendPacket
      */
-    p->wirevec[0].iov_base = (char *)(p->wirehead);
-    p->wirevec[0].iov_len = RX_HEADER_SIZE;
-    p->wirevec[1].iov_base = (char *)(p->localdata);
-    p->wirevec[1].iov_len = RX_FIRSTBUFFERSIZE;
-    p->niovecs = 2;
-    p->length = RX_FIRSTBUFFERSIZE;
+    RX_PACKET_IOV_FULLINIT(p);
     return p;
 }
+#endif /* RX_ENABLE_TSFPQ */
+
+#ifdef RX_ENABLE_TSFPQ
+struct rx_packet *
+rxi_AllocPacketTSFPQ(int class, int pull_global)
+{
+    register struct rx_packet *p;
+    register struct rx_ts_info_t * rx_ts_info;
+
+    RX_TS_INFO_GET(rx_ts_info);
+
+    MUTEX_ENTER(&rx_stats_mutex);
+    rx_stats.packetRequests++;
+    MUTEX_EXIT(&rx_stats_mutex);
+
+    if (pull_global && queue_IsEmpty(&rx_ts_info->_FPQ)) {
+        MUTEX_ENTER(&rx_freePktQ_lock);
+
+        if (queue_IsEmpty(&rx_freePacketQueue))
+            rxi_MorePacketsNoLock(rx_initSendWindow);
+
+        RX_TS_FPQ_GTOL(rx_ts_info);
+
+        MUTEX_EXIT(&rx_freePktQ_lock);
+    } else if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+        return NULL;
+    }
+
+    RX_TS_FPQ_CHECKOUT(rx_ts_info,p);
+
+    dpf(("Alloc %lx, class %d\n", (unsigned long)p, class));
+
+    /* have to do this here because rx_FlushWrite fiddles with the iovs in
+     * order to truncate outbound packets.  In the near future, may need
+     * to allocate bufs from a static pool here, and/or in AllocSendPacket
+     */
+    RX_PACKET_IOV_FULLINIT(p);
+    return p;
+}
+#endif /* RX_ENABLE_TSFPQ */
 
+#ifdef RX_ENABLE_TSFPQ
+struct rx_packet *
+rxi_AllocPacket(int class)
+{
+    register struct rx_packet *p;
+
+    p = rxi_AllocPacketTSFPQ(class, RX_TS_FPQ_PULL_GLOBAL);
+    return p;
+}
+#else /* RX_ENABLE_TSFPQ */
 struct rx_packet *
 rxi_AllocPacket(int class)
 {
@@ -687,6 +1062,7 @@ rxi_AllocPacket(int class)
     MUTEX_EXIT(&rx_freePktQ_lock);
     return p;
 }
+#endif /* RX_ENABLE_TSFPQ */
 
 /* This guy comes up with as many buffers as it {takes,can get} given
  * the MTU for this call.  It also sets the packet length before
@@ -706,6 +1082,28 @@ rxi_AllocSendPacket(register struct rx_call *call, int want)
         rx_GetSecurityHeaderSize(rx_ConnectionOf(call)) +
         rx_GetSecurityMaxTrailerSize(rx_ConnectionOf(call));
 
+#ifdef RX_ENABLE_TSFPQ
+    if ((p = rxi_AllocPacketTSFPQ(RX_PACKET_CLASS_SEND, 0))) {
+        want += delta;
+        want = MIN(want, mud);
+
+        if ((unsigned)want > p->length)
+            (void)rxi_AllocDataBuf(p, (want - p->length),
+                                   RX_PACKET_CLASS_SEND_CBUF);
+
+        if ((unsigned)p->length > mud)
+            p->length = mud;
+
+        if (delta >= p->length) {
+            rxi_FreePacket(p);
+            p = NULL;
+        } else {
+            p->length -= delta;
+        }
+        return p;
+    }
+#endif /* RX_ENABLE_TSFPQ */
+
     while (!(call->error)) {
         MUTEX_ENTER(&rx_freePktQ_lock);
         /* if an error occurred, or we get the packet we want, we're done */
diff --git a/src/rx/rx_pthread.c b/src/rx/rx_pthread.c
index 4791d1092..a31826194 100644
--- a/src/rx/rx_pthread.c
+++ b/src/rx/rx_pthread.c
@@ -420,3 +420,12 @@ rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
     }
     return 0;
 }
+
+struct rx_ts_info_t * rx_ts_info_init() {
+    register struct rx_ts_info_t * rx_ts_info;
+    rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
+    assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
+    memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
+    queue_Init(&rx_ts_info->_FPQ);
+    return rx_ts_info;
+}
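
For reference, here is a minimal standalone sketch of the two-tier scheme the
patch implements. It is illustrative only: pkt_alloc(), pkt_free(), LOCAL_MAX,
and GLOB_SIZE are hypothetical stand-ins for the rx machinery
(RX_TS_FPQ_CHECKOUT/RX_TS_FPQ_CHECKIN, rx_TSFPQLocalMax, rx_TSFPQGlobSize),
not the actual rx API. Each thread allocates and frees against a private free
list; the global lock is taken only to refill an empty local list or to drain
overflow, a glob at a time.

/*
 * Illustrative sketch only; not the rx API. The names below are
 * hypothetical stand-ins for the TSFPQ macros and globals in the patch.
 */
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

struct pkt { struct pkt *next; };
struct local_q { struct pkt *head; int len; };

#define LOCAL_MAX 15            /* cf. rx_TSFPQLocalMax */
#define GLOB_SIZE  3            /* cf. rx_TSFPQGlobSize */

static pthread_mutex_t glob_lock = PTHREAD_MUTEX_INITIALIZER;
static struct pkt *glob_head;   /* second tier: global free list */
static pthread_key_t local_key; /* first tier: per-thread free list */

static struct local_q *
local_get(void)
{
    struct local_q *q = (struct local_q *) pthread_getspecific(local_key);
    if (q == NULL) {
        q = (struct local_q *) calloc(1, sizeof(*q));
        assert(q != NULL);
        if (pthread_setspecific(local_key, q) != 0)
            abort();
    }
    return q;
}

/* checkout: touch the global lock only when the local tier is empty */
static struct pkt *
pkt_alloc(void)
{
    struct local_q *q = local_get();
    struct pkt *p;
    int i;

    if (q->head == NULL) {
        pthread_mutex_lock(&glob_lock);
        for (i = 0; i < GLOB_SIZE && glob_head != NULL; i++) {  /* "GTOL" */
            p = glob_head;
            glob_head = p->next;
            p->next = q->head;
            q->head = p;
            q->len++;
        }
        pthread_mutex_unlock(&glob_lock);
        if (q->head == NULL) {          /* global tier dry: allocate fresh */
            p = (struct pkt *) malloc(sizeof(*p));
            assert(p != NULL);
            return p;
        }
    }
    p = q->head;
    q->head = p->next;
    q->len--;
    return p;
}

/* checkin: drain overflow back to the global tier a glob at a time */
static void
pkt_free(struct pkt *p)
{
    struct local_q *q = local_get();

    p->next = q->head;
    q->head = p;
    q->len++;
    if (q->len > LOCAL_MAX) {
        pthread_mutex_lock(&glob_lock);
        while (q->len > LOCAL_MAX - GLOB_SIZE) {                /* "LTOG" */
            struct pkt *t = q->head;
            q->head = t->next;
            q->len--;
            t->next = glob_head;
            glob_head = t;
        }
        pthread_mutex_unlock(&glob_lock);
    }
}

int
main(void)
{
    struct pkt *p;
    assert(pthread_key_create(&local_key, NULL) == 0);
    p = pkt_alloc();
    pkt_free(p);
    return 0;
}

The hysteresis between LOCAL_MAX and LOCAL_MAX - GLOB_SIZE is what keeps a
thread hovering near the threshold from bouncing packets between the tiers on
every alloc/free pair, and each trip to the global lock is amortized over
GLOB_SIZE packets; this is the same design point the RX_TS_FPQ_LTOG and
RX_TS_FPQ_GTOL macros above encode.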