From: Tom Keiser
Date: Wed, 20 Apr 2005 22:23:47 +0000 (+0000)
Subject: STABLE14-rx_fpq_take_three-20050420
X-Git-Tag: openafs-devel-1_3_82~37
X-Git-Url: https://git.michaelhowe.org/gitweb/?a=commitdiff_plain;h=cec1603004a10ac688882e0b3402d02774e7bf27;p=packages%2Fo%2Fopenafs.git

STABLE14-rx_fpq_take_three-20050420

FIXES 17805

I've been stress-testing a patch all weekend that changes the way
thread-local packet quotas are computed. I was able to replicate the
unbounded packet alloc problem on unix server components by eliminating
my code from rxi_StartServerProcs that sets the maximum number of
expected threads. This patch computes the upper thread limit on the
fly, adds some thread-local free packet queue statistics, and makes a
few other minor tweaks.

I still don't have a working windows development environment, so I
can't say whether this will fix all the windows client problems, but
the unbounded packet allocation problem should go away with this
patch. Stress testing on windows succeeds as well.

(cherry picked from commit 1099227e08af3df856d8a998746d98405e3ef04b)
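In outline, the TSFPQ (thread-specific free packet queue) scheme caches
free rx_packets per thread and only touches the global queue, under
rx_freePktQ_lock, in batches. The sketch below models how the per-thread
limits are now derived from the number of threads actually observed at
runtime rather than from a preset rx_TSFPQMaxProcs; the 90%/20% split
follows the comment quoted from rx_globals.h in the diff below, and
every fpq_* name here is a hypothetical stand-in, not the OpenAFS API:

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t fpq_stats_lock = PTHREAD_MUTEX_INITIALIZER;
    static int fpq_nthreads  = 0;     /* threads seen so far, not a preset guess */
    static int fpq_npackets  = 1000;  /* total packets in the pool */
    static int fpq_local_max;         /* per-thread free queue cap */
    static int fpq_glob_size;         /* packets moved per global<->local glob */

    /* mirrors the policy RX_TS_FPQ_COMPUTE_LIMITS describes in its
     * comment: local queues together may hold ~90% of all packets,
     * and a transfer glob is ~20% of the local cap */
    static void fpq_recompute_limits(void)
    {
        int n = fpq_nthreads ? fpq_nthreads : 1;
        fpq_local_max = (fpq_npackets * 9 / 10) / n;
        fpq_glob_size = fpq_local_max / 5 ? fpq_local_max / 5 : 1;
    }

    /* analogous to what rx_ts_info_init() now does with rx_TSFPQMaxProcs */
    static void fpq_register_thread(void)
    {
        pthread_mutex_lock(&fpq_stats_lock);
        fpq_nthreads++;
        fpq_recompute_limits();
        pthread_mutex_unlock(&fpq_stats_lock);
    }

    int main(void)
    {
        for (int i = 0; i < 4; i++) {
            fpq_register_thread();
            printf("threads=%d local_max=%d glob_size=%d\n",
                   fpq_nthreads, fpq_local_max, fpq_glob_size);
        }
        return 0;
    }

Each newly registered thread shrinks every thread's cap, so the pool can
no longer be overcommitted by threads the old preset never anticipated.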
---

diff --git a/src/rx/rx.c b/src/rx/rx.c
index 5f129ac12..e2c474d6a 100644
--- a/src/rx/rx.c
+++ b/src/rx/rx.c
@@ -653,10 +653,6 @@ rxi_StartServerProcs(int nExistingProcs)
     }
     nProcs += maxdiff;	/* Extra processes needed to allow max number requested to run in any given service, under good conditions */
     nProcs -= nExistingProcs;	/* Subtract the number of procs that were previously created for use as server procs */
-#ifdef RX_ENABLE_TSFPQ
-    rx_TSFPQMaxProcs += nProcs;
-    RX_TS_FPQ_COMPUTE_LIMITS;
-#endif /* RX_ENABLE_TSFPQ */
     for (i = 0; i < nProcs; i++) {
         rxi_StartServerProc(rx_ServerProc, rx_stackSize);
     }
@@ -672,10 +668,6 @@ rx_StartClientThread(void)
     int pid;
     pid = (int) pthread_self();
 #endif /* AFS_PTHREAD_ENV */
-#ifdef RX_ENABLE_TSFPQ
-    rx_TSFPQMaxProcs++;
-    RX_TS_FPQ_COMPUTE_LIMITS;
-#endif /* RX_ENABLE_TSFPQ */
 }
 
 #endif /* AFS_NT40_ENV */
@@ -739,6 +731,12 @@ rx_StartServer(int donateMe)
 #endif /* AFS_NT40_ENV */
         rx_ServerProc();	/* Never returns */
     }
+#ifdef RX_ENABLE_TSFPQ
+    /* no use leaving packets around in this thread's local queue if
+     * it isn't getting donated to the server thread pool.
+     */
+    rxi_FlushLocalPacketsTSFPQ();
+#endif /* RX_ENABLE_TSFPQ */
     return;
 }
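The rx.c hunks above remove the old static accounting
(rxi_StartServerProcs and rx_StartClientThread no longer bump
rx_TSFPQMaxProcs by a guess) and add a flush for the one thread that can
leave Rx with a stocked local queue. A simplified, standalone model of
the new rx_StartServer() tail, with stubbed symbols so it compiles on
its own; this is a reading of the hunk, not the real function body:

    #include <stdio.h>
    #define RX_ENABLE_TSFPQ

    /* stubs so the sketch is self-contained; the real symbols live in rx.c */
    static void rx_ServerProc(void) { for (;;) { /* serve forever */ } }
    static void rxi_FlushLocalPacketsTSFPQ(void) { puts("flushing local packets"); }

    /* a thread that is not donated keeps running in the application, so
     * packets cached on its thread-local free queue would never be used
     * by Rx again; hand them back to the global pool before returning */
    static void start_server_model(int donate_me)
    {
        if (donate_me) {
            rx_ServerProc();    /* never returns */
        }
    #ifdef RX_ENABLE_TSFPQ
        rxi_FlushLocalPacketsTSFPQ();
    #endif
    }

    int main(void)
    {
        start_server_model(0);  /* non-donated caller: local queue gets flushed */
        return 0;
    }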
diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h
index 494738f6c..e3040bca4 100644
--- a/src/rx/rx_globals.h
+++ b/src/rx/rx_globals.h
@@ -159,6 +159,16 @@ typedef struct rx_ts_info_t {
         struct rx_queue queue;
         int len;                /* local queue length */
         int delta;              /* number of new packets alloc'd locally since last sync w/ global queue */
+
+        /* FPQ stats */
+        int checkin_ops;
+        int checkout_ops;
+        int gtol_ops;
+        int gtol_xfer;
+        int ltog_ops;
+        int ltog_xfer;
+        int alloc_ops;
+        int alloc_xfer;
     } _FPQ;
 } rx_ts_info_t;
 EXT struct rx_ts_info_t * rx_ts_info_init();   /* init function for thread-specific data struct */
@@ -210,14 +220,17 @@ EXT struct rx_queue rx_freePacketQueue;
 EXT afs_kmutex_t rx_freePktQ_lock;
 #endif /* RX_ENABLE_LOCKS */
 
-#if defined(AFS_PTHREAD_ENV) && !defined(AFS_NT40_ENV)
+#if defined(AFS_PTHREAD_ENV)
 #define RX_ENABLE_TSFPQ
 EXT int rx_TSFPQGlobSize INIT(3);  /* number of packets to transfer between global and local queues in one op */
 EXT int rx_TSFPQLocalMax INIT(15); /* max number of packets on local FPQ before returning a glob to the global pool */
-EXT int rx_TSFPQMaxProcs INIT(1);  /* max number of threads expected */
+EXT int rx_TSFPQMaxProcs INIT(0);  /* max number of threads expected */
 EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local); /* more flexible packet alloc function */
+EXT void rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit); /* adjust thread-local queue length, for places where we know how many packets we will need a priori */
+EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to global queue */
 #define RX_TS_FPQ_FLUSH_GLOBAL 1
 #define RX_TS_FPQ_PULL_GLOBAL 1
+#define RX_TS_FPQ_ALLOW_OVERCOMMIT 1
 /* compute the localmax and globsize values from rx_TSFPQMaxProcs and rx_nPackets.
    arbitarily set local max so that all threads consume 90% of packets, if all local queues are full.
    arbitarily set transfer glob size to 20% of max local packet queue length.
@@ -247,10 +260,16 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         } \
         (rx_ts_info_p)->_FPQ.len -= tsize; \
         rx_nFreePackets += tsize; \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \
         if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
             rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
-            (rx_ts_info_p)->_FPQ.delta = 0; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+            (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
 /* same as above, except user has direct control over number to transfer */
@@ -265,10 +284,16 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         } \
         (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
         rx_nFreePackets += (num_transfer); \
+        (rx_ts_info_p)->_FPQ.ltog_ops++; \
+        (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \
         if ((rx_ts_info_p)->_FPQ.delta) { \
+            (rx_ts_info_p)->_FPQ.alloc_ops++; \
+            (rx_ts_info_p)->_FPQ.alloc_xfer += (rx_ts_info_p)->_FPQ.delta; \
+            MUTEX_ENTER(&rx_stats_mutex); \
             rx_nPackets += (rx_ts_info_p)->_FPQ.delta; \
-            (rx_ts_info_p)->_FPQ.delta = 0; \
             RX_TS_FPQ_COMPUTE_LIMITS; \
+            MUTEX_EXIT(&rx_stats_mutex); \
+            (rx_ts_info_p)->_FPQ.delta = 0; \
         } \
     } while(0)
 /* move packets from global to local (thread-specific) free packet queue.
@@ -284,6 +309,23 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         } \
         (rx_ts_info_p)->_FPQ.len += i; \
         rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
+    } while(0)
+/* same as above, except user has direct control over number to transfer */
+#define RX_TS_FPQ_GTOL2(rx_ts_info_p,num_transfer) \
+    do { \
+        register int i; \
+        register struct rx_packet * p; \
+        for (i=0; i < (num_transfer); i++) { \
+            p = queue_First(&rx_freePacketQueue, rx_packet); \
+            queue_Remove(p); \
+            queue_Append(&((rx_ts_info_p)->_FPQ),p); \
+        } \
+        (rx_ts_info_p)->_FPQ.len += i; \
+        rx_nFreePackets -= i; \
+        (rx_ts_info_p)->_FPQ.gtol_ops++; \
+        (rx_ts_info_p)->_FPQ.gtol_xfer += i; \
     } while(0)
 /* checkout a packet from the thread-specific free packet queue */
 #define RX_TS_FPQ_CHECKOUT(rx_ts_info_p,p) \
@@ -292,6 +334,7 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         queue_Remove(p); \
         RX_FPQ_MARK_USED(p); \
         (rx_ts_info_p)->_FPQ.len--; \
+        (rx_ts_info_p)->_FPQ.checkout_ops++; \
     } while(0)
 /* check a packet into the thread-specific free packet queue */
 #define RX_TS_FPQ_CHECKIN(rx_ts_info_p,p) \
@@ -299,8 +342,9 @@ EXT void rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local
         queue_Prepend(&((rx_ts_info_p)->_FPQ), (p)); \
         RX_FPQ_MARK_FREE(p); \
         (rx_ts_info_p)->_FPQ.len++; \
+        (rx_ts_info_p)->_FPQ.checkin_ops++; \
     } while(0)
-#endif /* AFS_PTHREAD_ENV && !AFS_NT40_ENV */
+#endif /* AFS_PTHREAD_ENV */
 
 /* Number of free packets */
 EXT int rx_nFreePackets INIT(0);

diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c
index d7dc99318..482f54bc5 100644
--- a/src/rx/rx_packet.c
+++ b/src/rx/rx_packet.c
@@ -568,11 +568,14 @@ rxi_MorePacketsNoLock(int apackets)
         queue_Append(&rx_freePacketQueue, p);
     }
 
+    rx_nFreePackets += apackets;
 #ifdef RX_ENABLE_TSFPQ
     /* TSFPQ patch also needs to keep track of total packets */
+    MUTEX_ENTER(&rx_stats_mutex);
     rx_nPackets += apackets;
     RX_TS_FPQ_COMPUTE_LIMITS;
+    MUTEX_EXIT(&rx_stats_mutex);
 #endif /* RX_ENABLE_TSFPQ */
     rxi_NeedMorePackets = FALSE;
     rxi_PacketsUnWait();
@@ -589,6 +592,44 @@ rxi_FreeAllPackets(void)
     UNPIN(rx_mallocedP, (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet));
 }
 
+#ifdef RX_ENABLE_TSFPQ
+void
+rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit)
+{
+    register struct rx_ts_info_t * rx_ts_info;
+    register int xfer;
+    SPLVAR;
+
+    RX_TS_INFO_GET(rx_ts_info);
+
+    if (num_keep_local != rx_ts_info->_FPQ.len) {
+        NETPRI;
+        MUTEX_ENTER(&rx_freePktQ_lock);
+        if (num_keep_local < rx_ts_info->_FPQ.len) {
+            xfer = rx_ts_info->_FPQ.len - num_keep_local;
+            RX_TS_FPQ_LTOG2(rx_ts_info, xfer);
+            rxi_PacketsUnWait();
+        } else {
+            xfer = num_keep_local - rx_ts_info->_FPQ.len;
+            if ((num_keep_local > rx_TSFPQLocalMax) && !allow_overcommit)
+                xfer = rx_TSFPQLocalMax - rx_ts_info->_FPQ.len;
+            if (rx_nFreePackets < xfer) {
+                rxi_MorePacketsNoLock(xfer - rx_nFreePackets);
+            }
+            RX_TS_FPQ_GTOL2(rx_ts_info, xfer);
+        }
+        MUTEX_EXIT(&rx_freePktQ_lock);
+        USERPRI;
+    }
+}
+
+void
+rxi_FlushLocalPacketsTSFPQ(void)
+{
+    rxi_AdjustLocalPacketsTSFPQ(0, 0);
+}
+#endif /* RX_ENABLE_TSFPQ */
+
 /* Allocate more packets iff we need more continuation buffers */
 /* In kernel, can't page in memory with interrupts disabled, so we
  * don't use the event mechanism.
  */
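rxi_AdjustLocalPacketsTSFPQ() is the new workhorse: it grows or shrinks
the calling thread's local queue toward num_keep_local, clamping growth
at rx_TSFPQLocalMax unless allow_overcommit is set. Restated as a
minimal runnable model, with plain ints standing in for the queues and
locks and adjust_local as a hypothetical name:

    #include <stdio.h>

    static int local_len  = 0;    /* this thread's free queue length */
    static int global_len = 50;   /* global free queue length */
    static int local_max  = 15;   /* per-thread cap (rx_TSFPQLocalMax) */

    static void adjust_local(int num_keep_local, int allow_overcommit)
    {
        int xfer;

        if (num_keep_local < local_len) {
            /* shrink: return the excess to the global pool (LTOG2) */
            xfer = local_len - num_keep_local;
            local_len  -= xfer;
            global_len += xfer;
        } else if (num_keep_local > local_len) {
            /* grow: pull from the global pool (GTOL2), clamped at the
             * cap unless the caller explicitly allows overcommit */
            xfer = num_keep_local - local_len;
            if (num_keep_local > local_max && !allow_overcommit)
                xfer = local_max - local_len;
            if (xfer > 0) {
                if (global_len < xfer)
                    global_len = xfer;   /* stand-in for rxi_MorePacketsNoLock() */
                local_len  += xfer;
                global_len -= xfer;
            }
        }
    }

    int main(void)
    {
        adjust_local(20, 1);   /* overcommit past the cap of 15 */
        printf("local=%d global=%d\n", local_len, global_len);
        adjust_local(0, 0);    /* flush, like rxi_FlushLocalPacketsTSFPQ() */
        printf("local=%d global=%d\n", local_len, global_len);
        return 0;
    }

This also shows why rxi_FlushLocalPacketsTSFPQ() can be a one-liner:
flushing is just an adjustment toward a target length of zero.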
diff --git a/src/rx/rx_pthread.c b/src/rx/rx_pthread.c
index a31826194..c0e4329ef 100644
--- a/src/rx/rx_pthread.c
+++ b/src/rx/rx_pthread.c
@@ -426,6 +426,13 @@ struct rx_ts_info_t * rx_ts_info_init() {
     rx_ts_info = (rx_ts_info_t *) malloc(sizeof(rx_ts_info_t));
     assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
     memset(rx_ts_info, 0, sizeof(rx_ts_info_t));
+#ifdef RX_ENABLE_TSFPQ
     queue_Init(&rx_ts_info->_FPQ);
+
+    MUTEX_ENTER(&rx_stats_mutex);
+    rx_TSFPQMaxProcs++;
+    RX_TS_FPQ_COMPUTE_LIMITS;
+    MUTEX_EXIT(&rx_stats_mutex);
+#endif /* RX_ENABLE_TSFPQ */
     return rx_ts_info;
 }
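Finally, note that the stats added to rx_ts_info_t are plain per-thread
ints bumped inline by the CHECKIN/CHECKOUT/GTOL/LTOG macros, so they
cost almost nothing and can be read from a debugger without extra
locking. A hypothetical dump routine over the same fields, not part of
the patch:

    #include <stdio.h>

    /* same fields as the stats block added to rx_ts_info_t._FPQ */
    struct fpq_stats {
        int checkin_ops;            /* packets checked in to the local queue */
        int checkout_ops;           /* packets checked out of the local queue */
        int gtol_ops, gtol_xfer;    /* global -> local transfer ops / packets */
        int ltog_ops, ltog_xfer;    /* local -> global transfer ops / packets */
        int alloc_ops, alloc_xfer;  /* locally alloc'd packets synced to rx_nPackets */
    };

    static void fpq_print_stats(const struct fpq_stats *s)
    {
        printf("checkin=%d checkout=%d\n", s->checkin_ops, s->checkout_ops);
        printf("gtol: %d ops, %d packets; ltog: %d ops, %d packets\n",
               s->gtol_ops, s->gtol_xfer, s->ltog_ops, s->ltog_xfer);
        printf("alloc: %d ops, %d packets\n", s->alloc_ops, s->alloc_xfer);
    }

    int main(void)
    {
        struct fpq_stats s = { 12, 12, 3, 9, 2, 6, 1, 4 };  /* sample values */
        fpq_print_stats(&s);
        return 0;
    }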