From 1099227e08af3df856d8a998746d98405e3ef04b Mon Sep 17 00:00:00 2001
From: Tom Keiser
Date: Wed, 20 Apr 2005 22:21:16 +0000
Subject: [PATCH] rx_fpq_take_three-20050420

FIXES 17805

I've been stress testing a patch all weekend that changes the way
thread-local packet quotas are computed. I was able to replicate the
unbounded packet alloc problem on Unix server components by eliminating
my code from rxi_StartServerProcs that sets the maximum number of
expected threads.

This patch computes the upper thread limit on the fly, adds some
thread-local free packet queue statistics, and makes a few other minor
tweaks.

I still don't have a working Windows development environment, so I
can't say whether this will fix all the Windows client problems. But
the unbounded packet allocation problem should go away with this patch.
Stress testing on Windows has since succeeded as well.
---
 src/rx/rx.c         | 14 +++++-------
 src/rx/rx_globals.h | 54 ++++++++++++++++++++++++++++++++++++++++-----
 src/rx/rx_packet.c  | 41 ++++++++++++++++++++++++++++++++++
 src/rx/rx_pthread.c |  7 ++++++
 4 files changed, 103 insertions(+), 13 deletions(-)

diff --git a/src/rx/rx.c b/src/rx/rx.c
index 5f129ac12..e2c474d6a 100644
--- a/src/rx/rx.c
+++ b/src/rx/rx.c
@@ -653,10 +653,6 @@ rxi_StartServerProcs(int nExistingProcs)
     }
     nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
     nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
-#ifdef RX_ENABLE_TSFPQ
-    rx_TSFPQMaxProcs += nProcs;
-    RX_TS_FPQ_COMPUTE_LIMITS;
-#endif /* RX_ENABLE_TSFPQ */
     for (i = 0; i < nProcs; i++) {
 	rxi_StartServerProc(rx_ServerProc, rx_stackSize);
     }
@@ -672,10 +668,6 @@ rx_StartClientThread(void)
     int pid;
     pid = (int) pthread_self();
 #endif /* AFS_PTHREAD_ENV */
-#ifdef RX_ENABLE_TSFPQ
-    rx_TSFPQMaxProcs++;
-    RX_TS_FPQ_COMPUTE_LIMITS;
-#endif /* RX_ENABLE_TSFPQ */
 }
 #endif /* AFS_NT40_ENV */
 
@@ -739,6 +731,12 @@ rx_StartServer(int donateMe)
 #endif /* AFS_NT40_ENV */
 	rx_ServerProc();	/* Never returns */
     }
+#ifdef RX_ENABLE_TSFPQ
+    /* no use leaving packets around in this thread's local queue if
+     * it isn't getting donated to the server thread pool.
+     */
+    rxi_FlushLocalPacketsTSFPQ();
+#endif /* RX_ENABLE_TSFPQ */
     return;
 }
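For context on the rx_StartServer() hunk above: when donateMe is zero,
the calling thread returns to the application instead of joining the
server pool, so any packets left in its thread-local free queue would
be stranded. A minimal sketch of such a caller follows; it is
hypothetical and not part of this diff, with rx_Init() and service
setup elided:

    /* hypothetical caller, for illustration only */
    int
    main(void)
    {
        /* ... rx_Init(port), rx_NewService(...), etc. ... */

        rx_StartServer(0);  /* donateMe == 0: rx_StartServer returns,
                             * after flushing this thread's local
                             * packet queue back to the global pool */

        /* from here on this thread does application work, so a
         * populated thread-local queue would never be used again */
        return 0;
    }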
diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h
index 494738f6c..e3040bca4 100644
--- a/src/rx/rx_globals.h
+++ b/src/rx/rx_globals.h
@@ -159,6 +159,16 @@ typedef struct rx_ts_info_t {
         struct rx_queue queue;
         int len;                /* local queue length */
         int delta;              /* number of new packets alloc'd locally since last sync w/ global queue */
+
+        /* FPQ stats */
+        int checkin_ops;
+        int checkout_ops;
+        int gtol_ops;
+        int gtol_xfer;
+        int ltog_ops;
+        int ltog_xfer;
+        int alloc_ops;
+        int alloc_xfer;
     } _FPQ;
 } rx_ts_info_t;
 EXT struct rx_ts_info_t * rx_ts_info_init();   /* init function for thread-specific data struct */
@@ -210,14 +220,17 @@ EXT struct rx_queue rx_freePacketQueue;
 EXT afs_kmutex_t rx_freePktQ_lock;
 #endif /* RX_ENABLE_LOCKS */
 
-#if defined(AFS_PTHREAD_ENV) && !defined(AFS_NT40_ENV)
+#if defined(AFS_PTHREAD_ENV)
 #define RX_ENABLE_TSFPQ
 EXT int rx_TSFPQGlobSize INIT(3); /* number of packets to transfer between global and local queues in one op */
 EXT int rx_TSFPQLocalMax INIT(15); /* max number of packets on local FPQ before returning a glob to the global pool */
-EXT int rx_TSFPQMaxProcs INIT(1); /* max number of threads expected */
+EXT int rx_TSFPQMaxProcs INIT(0); /* max number of threads expected */
 EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local); /* more flexible packet alloc function */
+EXT void rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit); /* adjust thread-local queue length, for places where we know how many packets we will need a priori */
+EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to global queue */
 #define RX_TS_FPQ_FLUSH_GLOBAL 1
 #define RX_TS_FPQ_PULL_GLOBAL 1
+#define RX_TS_FPQ_ALLOW_OVERCOMMIT 1
 /* compute the localmax and globsize values from rx_TSFPQMaxProcs and rx_nPackets.
    arbitrarily set local max so that all threads consume 90% of packets, if all local queues are full.
    arbitrarily set transfer glob size to 20% of max local packet queue length.
@@ -247,10 +260,16 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         } \
         (rx_ts_info_p)->_FPQ.len -= tsize; \
         rx_nFreePackets += tsize; \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \
         if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
             rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
-            (rx_ts_info_p)->_FPQ.delta = 0; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+            (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
 /* same as above, except user has direct control over number to transfer */
@@ -265,10 +284,16 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         } \
         (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
         rx_nFreePackets += (num_transfer); \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \
         if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
             rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
-            (rx_ts_info_p)->_FPQ.delta = 0; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+            (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
 /* move packets from global to local (thread-specific) free packet queue.
@@ -284,6 +309,23 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         } \
         (rx_ts_info_p)->_FPQ.len += i; \
         rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
+    } while(0)
+/* same as above, except user has direct control over number to transfer */
+#define RX_TS_FPQ_GTOL2(rx_ts_info_p,num_transfer) \
+    do { \
+        register int i; \
+        register struct rx_packet * p; \
+        for (i=0; i < (num_transfer); i++) { \
+            p = queue_First(&rx_freePacketQueue, rx_packet); \
+            queue_Remove(p); \
+            queue_Append(&((rx_ts_info_p)->_FPQ),p); \
+        } \
+        (rx_ts_info_p)->_FPQ.len += i; \
+        rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
     } while(0)
 /* checkout a packet from the thread-specific free packet queue */
 #define RX_TS_FPQ_CHECKOUT(rx_ts_info_p,p) \
@@ -292,6 +334,7 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         queue_Remove(p); \
         RX_FPQ_MARK_USED(p); \
         (rx_ts_info_p)->_FPQ.len--; \
+        (rx_ts_info_p)->_FPQ.checkout_ops++; \
     } while(0)
 /* check a packet into the thread-specific free packet queue */
 #define RX_TS_FPQ_CHECKIN(rx_ts_info_p,p) \
@@ -299,8 +342,9 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         queue_Prepend(&((rx_ts_info_p)->_FPQ), (p)); \
         RX_FPQ_MARK_FREE(p); \
         (rx_ts_info_p)->_FPQ.len++; \
+        (rx_ts_info_p)->_FPQ.checkin_ops++; \
     } while(0)
-#endif /* AFS_PTHREAD_ENV && !AFS_NT40_ENV */
+#endif /* AFS_PTHREAD_ENV */
 
 /* Number of free packets */
 EXT int rx_nFreePackets INIT(0);
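The body of RX_TS_FPQ_COMPUTE_LIMITS is not visible in these hunks. A
minimal sketch consistent with the comment above (all local queues
together hold 90% of rx_nPackets, glob size at 20% of the local max)
might look like the following; the rounding and the floor values here
are assumptions, not the actual macro:

    /* sketch only: encodes the 90%/20% policy described above;
     * the minimum values are guesses, not taken from the real macro */
    #define RX_TS_FPQ_COMPUTE_LIMITS_SKETCH \
        do { \
            register int newmax = 15;  /* assumed floor: old default */ \
            if (rx_TSFPQMaxProcs > 0) \
                newmax = (rx_nPackets * 9) / (10 * rx_TSFPQMaxProcs); \
            if (newmax < 15) \
                newmax = 15; \
            rx_TSFPQLocalMax = newmax; \
            /* transfer glob is 20% of the local queue limit */ \
            rx_TSFPQGlobSize = (newmax / 5 > 3) ? (newmax / 5) : 3; \
        } while(0)

Note that every caller in this patch now invokes the real macro under
rx_stats_mutex, since rx_nPackets and rx_TSFPQMaxProcs can both change
concurrently once threads register themselves on the fly.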
diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c
index d7dc99318..482f54bc5 100644
--- a/src/rx/rx_packet.c
+++ b/src/rx/rx_packet.c
@@ -568,11 +568,14 @@ rxi_MorePacketsNoLock(int apackets)
         queue_Append(&rx_freePacketQueue, p);
     }
 
+    rx_nFreePackets += apackets;
 #ifdef RX_ENABLE_TSFPQ
     /* TSFPQ patch also needs to keep track of total packets */
+    MUTEX_ENTER(&rx_stats_mutex);
     rx_nPackets += apackets;
     RX_TS_FPQ_COMPUTE_LIMITS;
+    MUTEX_EXIT(&rx_stats_mutex);
 #endif /* RX_ENABLE_TSFPQ */
 
     rxi_NeedMorePackets = FALSE;
     rxi_PacketsUnWait();
@@ -589,6 +592,44 @@ rxi_FreeAllPackets(void)
     UNPIN(rx_mallocedP, (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet));
 }
 
+#ifdef RX_ENABLE_TSFPQ
+void
+rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit)
+{
+    register struct rx_ts_info_t * rx_ts_info;
+    register int xfer;
+    SPLVAR;
+
+    RX_TS_INFO_GET(rx_ts_info);
+
+    if (num_keep_local != rx_ts_info->_FPQ.len) {
+        NETPRI;
+        MUTEX_ENTER(&rx_freePktQ_lock);
+        if (num_keep_local < rx_ts_info->_FPQ.len) {
+            xfer = rx_ts_info->_FPQ.len - num_keep_local;
+            RX_TS_FPQ_LTOG2(rx_ts_info, xfer);
+            rxi_PacketsUnWait();
+        } else {
+            xfer = num_keep_local - rx_ts_info->_FPQ.len;
+            if ((num_keep_local > rx_TSFPQLocalMax) && !allow_overcommit)
+                xfer = rx_TSFPQLocalMax - rx_ts_info->_FPQ.len;
+            if (rx_nFreePackets < xfer) {
+                rxi_MorePacketsNoLock(xfer - rx_nFreePackets);
+            }
+            RX_TS_FPQ_GTOL2(rx_ts_info, xfer);
+        }
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
+    }
+}
+
+void
+rxi_FlushLocalPacketsTSFPQ(void)
+{
+    rxi_AdjustLocalPacketsTSFPQ(0, 0);
+}
+#endif /* RX_ENABLE_TSFPQ */
+
 /* Allocate more packets iff we need more continuation buffers */
 /* In kernel, can't page in memory with interrupts disabled, so we
  * don't use the event mechanism. */
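The header comment on rxi_AdjustLocalPacketsTSFPQ() says it is meant
for call sites that know their packet needs a priori. A hypothetical
worker (not part of this patch) would use the new pair like this:

    /* hypothetical caller, for illustration only */
    static void
    burst_worker(int npackets)
    {
        /* pre-stock the thread-local queue, letting it exceed
         * rx_TSFPQLocalMax for the duration of the burst */
        rxi_AdjustLocalPacketsTSFPQ(npackets, RX_TS_FPQ_ALLOW_OVERCOMMIT);

        /* ... allocate, transmit, and free up to npackets packets;
         * the alloc/free fast path now stays on the thread-local
         * queue with no rx_freePktQ_lock traffic ... */

        /* hand all remaining local packets back to the global pool */
        rxi_FlushLocalPacketsTSFPQ();
    }

Passing 0 for allow_overcommit instead clamps the request to
rx_TSFPQLocalMax, which is why rxi_FlushLocalPacketsTSFPQ() can be
implemented as the trivial call rxi_AdjustLocalPacketsTSFPQ(0, 0).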
diff --git a/src/rx/rx_pthread.c b/src/rx/rx_pthread.c
index a31826194..c0e4329ef 100644
--- a/src/rx/rx_pthread.c
+++ b/src/rx/rx_pthread.c
@@ -426,6 +426,13 @@ struct rx_ts_info_t * rx_ts_info_init() {
     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
+#ifdef RX_ENABLE_TSFPQ
     queue_Init(&rx_ts_info->_FPQ);
+
+    MUTEX_ENTER(&rx_stats_mutex);
+    rx_TSFPQMaxProcs++;
+    RX_TS_FPQ_COMPUTE_LIMITS;
+    MUTEX_EXIT(&rx_stats_mutex);
+#endif /* RX_ENABLE_TSFPQ */
     return rx_ts_info;
 }
-- 
2.39.5
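Postscript on the rx_pthread.c hunk: registering each thread inside
rx_ts_info_init() is what makes the thread limit self-computing. Every
new rx thread bumps rx_TSFPQMaxProcs under rx_stats_mutex and
recomputes the quotas, so no caller has to predeclare a thread count,
which is exactly the failure mode described in the commit message. The
new _FPQ counters can then be inspected per thread; a hypothetical
debugging helper (not part of this patch) might look like:

    /* hypothetical helper: dump the calling thread's FPQ counters */
    #include <stdio.h>

    static void
    rx_ts_fpq_dump_stats(void)
    {
        struct rx_ts_info_t * rx_ts_info;

        RX_TS_INFO_GET(rx_ts_info);
        printf("len=%d delta=%d checkin=%d checkout=%d "
               "gtol=%d ops/%d pkts ltog=%d ops/%d pkts "
               "alloc=%d ops/%d pkts\n",
               rx_ts_info->_FPQ.len, rx_ts_info->_FPQ.delta,
               rx_ts_info->_FPQ.checkin_ops, rx_ts_info->_FPQ.checkout_ops,
               rx_ts_info->_FPQ.gtol_ops, rx_ts_info->_FPQ.gtol_xfer,
               rx_ts_info->_FPQ.ltog_ops, rx_ts_info->_FPQ.ltog_xfer,
               rx_ts_info->_FPQ.alloc_ops, rx_ts_info->_FPQ.alloc_xfer);
    }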