From 54bf41004b901ca090d63e239768588fa90bc806 Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Mon, 17 May 2010 16:01:03 -0400 Subject: [PATCH] rx: work harder to notice and handle MorePackets request in particular, we did badly at handling kernel requests for more packets, but other cases did not properly keep packet stats either. attempt to globally better handle demand for more packets. Change-Id: I88837fed880f582444221ec53d280ca4070b607d Reviewed-on: http://gerrit.openafs.org/1978 Tested-by: Derrick Brashear Tested-by: Jeffrey Altman Reviewed-by: Jeffrey Altman Reviewed-by: Hartmut Reuter Reviewed-by: Derrick Brashear --- src/rx/rx.c | 19 ++++++++++++++++--- src/rx/rx_globals.h | 8 ++++---- src/rx/rx_kcommon.c | 6 +++++- src/rx/rx_lwp.c | 3 +++ src/rx/rx_packet.c | 14 +++++++------- src/rx/rx_packet.h | 2 +- src/rx/rx_pthread.c | 3 +++ 7 files changed, 39 insertions(+), 16 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index 0f1aca2bf..3d9cb7e93 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -509,12 +509,17 @@ rx_InitHost(u_int host, u_int port) rx_nFreePackets = 0; queue_Init(&rx_freePacketQueue); rxi_NeedMorePackets = FALSE; + rx_nPackets = 0; /* rx_nPackets is managed by rxi_MorePackets* */ + + /* enforce a minimum number of allocated packets */ + if (rx_extraPackets < rxi_nSendFrags * rx_maxSendWindow) + rx_extraPackets = rxi_nSendFrags * rx_maxSendWindow; + + /* allocate the initial free packet pool */ #ifdef RX_ENABLE_TSFPQ - rx_nPackets = 0; /* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */ rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0); #else /* RX_ENABLE_TSFPQ */ - rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */ - rxi_MorePackets(rx_nPackets); + rxi_MorePackets(rx_extraPackets + RX_MAX_QUOTA + 2); /* fudge */ #endif /* RX_ENABLE_TSFPQ */ rx_CheckPackets(); @@ -665,8 +670,10 @@ QuotaOK(struct rx_service *aservice) /* otherwise, can use only if there are enough to allow everyone * to go to their min quota after this guy starts. */ + MUTEX_ENTER(&rx_quota_mutex); if (rxi_availProcs > rxi_minDeficit) rc = 1; + MUTEX_EXIT(&rx_quota_mutex); return rc; } #endif /* RX_ENABLE_LOCKS */ @@ -1845,9 +1852,11 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp) if (cur_service != NULL) { cur_service->nRequestsRunning--; + MUTEX_ENTER(&rx_quota_mutex); if (cur_service->nRequestsRunning < cur_service->minProcs) rxi_minDeficit++; rxi_availProcs++; + MUTEX_EXIT(&rx_quota_mutex); } if (queue_IsNotEmpty(&rx_incomingCallQueue)) { struct rx_call *tcall, *ncall; @@ -1910,9 +1919,11 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp) service->nRequestsRunning++; /* just started call in minProcs pool, need fewer to maintain * guarantee */ + MUTEX_ENTER(&rx_quota_mutex); if (service->nRequestsRunning <= service->minProcs) rxi_minDeficit--; rxi_availProcs--; + MUTEX_EXIT(&rx_quota_mutex); rx_nWaiting--; /* MUTEX_EXIT(&call->lock); */ } else { @@ -4415,9 +4426,11 @@ rxi_AttachServerProc(struct rx_call *call, CV_SIGNAL(&sq->cv); #else service->nRequestsRunning++; + MUTEX_ENTER(&rx_quota_mutex); if (service->nRequestsRunning <= service->minProcs) rxi_minDeficit--; rxi_availProcs--; + MUTEX_EXIT(&rx_quota_mutex); osi_rxWakeup(sq); #endif } diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h index a265ba1c7..a8d3c2449 100644 --- a/src/rx/rx_globals.h +++ b/src/rx/rx_globals.h @@ -72,8 +72,8 @@ EXT int rx_intentionallyDroppedOnReadPer100 GLOBALSINIT(0); /* Dropped on Read /* extra packets to add to the quota */ EXT int rx_extraQuota GLOBALSINIT(0); -/* extra packets to alloc (2 windows by deflt) */ -EXT int rx_extraPackets GLOBALSINIT(32); +/* extra packets to alloc (2 * maxWindowSize by default) */ +EXT int rx_extraPackets GLOBALSINIT(256); EXT int rx_stackSize GLOBALSINIT(RX_DEFAULT_STACK_SIZE); @@ -133,7 +133,7 @@ EXT int rx_initSendWindow GLOBALSINIT(16); EXT int rx_maxSendWindow GLOBALSINIT(128); EXT int rx_nackThreshold GLOBALSINIT(3); /* Number NACKS to trigger congestion recovery */ EXT int rx_nDgramThreshold GLOBALSINIT(4); /* Number of packets before increasing - * packets per datagram */ + * packets per datagram */ #define RX_MAX_FRAGS 4 EXT int rxi_nSendFrags GLOBALSINIT(RX_MAX_FRAGS); /* max fragments in a datagram */ EXT int rxi_nRecvFrags GLOBALSINIT(RX_MAX_FRAGS); @@ -158,7 +158,7 @@ EXT int rxi_HardAckRate GLOBALSINIT(RX_FAST_ACK_RATE + 1); #define ACKHACK(p,r) { if (((p)->header.seq & (rxi_SoftAckRate))==0) (p)->header.flags |= RX_REQUEST_ACK; } -EXT int rx_nPackets GLOBALSINIT(100); /* obsolete; use rx_extraPackets now */ +EXT int rx_nPackets GLOBALSINIT(0); /* preallocate packets with rx_extraPackets */ /* * pthreads thread-specific rx info support diff --git a/src/rx/rx_kcommon.c b/src/rx/rx_kcommon.c index 31fcaced2..d88ee40cf 100644 --- a/src/rx/rx_kcommon.c +++ b/src/rx/rx_kcommon.c @@ -261,12 +261,13 @@ rx_ServerProc(void *unused) { int threadID; -/* jaltman - rxi_dataQuota is protected by a mutex everywhere else */ rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */ + MUTEX_ENTER(&rx_quota_mutex); rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */ /* threadID is used for making decisions in GetCall. Get it by bumping * number of threads handling incoming calls */ threadID = rxi_availProcs++; + MUTEX_EXIT(&rx_quota_mutex); #ifdef RX_ENABLE_LOCKS AFS_GUNLOCK(); @@ -1258,6 +1259,9 @@ rxk_Listener(void) AFS_GUNLOCK(); #endif /* RX_ENABLE_LOCKS && !AFS_SUN5_ENV */ while (afs_termState != AFSOP_STOP_RXK_LISTENER) { + /* See if a check for additional packets was issued */ + rx_CheckPackets(); + if (rxp) { rxi_RestoreDataBufs(rxp); } else { diff --git a/src/rx/rx_lwp.c b/src/rx/rx_lwp.c index 715eb344c..060025b29 100644 --- a/src/rx/rx_lwp.c +++ b/src/rx/rx_lwp.c @@ -187,6 +187,9 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp) (*swapNameProgram) (pid, "listener", &name[0]); for (;;) { + /* See if a check for additional packets was issued */ + rx_CheckPackets(); + /* Grab a new packet only if necessary (otherwise re-use the old one) */ if (p) { rxi_RestoreDataBufs(p); diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c index 417de1746..60697b847 100644 --- a/src/rx/rx_packet.c +++ b/src/rx/rx_packet.c @@ -614,6 +614,7 @@ rxi_MorePackets(int apackets) rx_mallocedP = p; } + rx_nPackets += apackets; rx_nFreePackets += apackets; rxi_NeedMorePackets = FALSE; rxi_PacketsUnWait(); @@ -722,13 +723,12 @@ rxi_MorePacketsNoLock(int apackets) } rx_nFreePackets += apackets; -#ifdef RX_ENABLE_TSFPQ - /* TSFPQ patch also needs to keep track of total packets */ MUTEX_ENTER(&rx_packets_mutex); rx_nPackets += apackets; +#ifdef RX_ENABLE_TSFPQ RX_TS_FPQ_COMPUTE_LIMITS; - MUTEX_EXIT(&rx_packets_mutex); #endif /* RX_ENABLE_TSFPQ */ + MUTEX_EXIT(&rx_packets_mutex); rxi_NeedMorePackets = FALSE; rxi_PacketsUnWait(); } @@ -789,7 +789,7 @@ void rx_CheckPackets(void) { if (rxi_NeedMorePackets) { - rxi_MorePackets(rx_initSendWindow); + rxi_MorePackets(rx_maxSendWindow); } } @@ -1153,7 +1153,7 @@ rxi_AllocPacketNoLock(int class) osi_Panic("rxi_AllocPacket error"); #else /* KERNEL */ if (queue_IsEmpty(&rx_freePacketQueue)) - rxi_MorePacketsNoLock(4 * rx_initSendWindow); + rxi_MorePacketsNoLock(rx_maxSendWindow); #endif /* KERNEL */ @@ -1212,7 +1212,7 @@ rxi_AllocPacketNoLock(int class) osi_Panic("rxi_AllocPacket error"); #else /* KERNEL */ if (queue_IsEmpty(&rx_freePacketQueue)) - rxi_MorePacketsNoLock(4 * rx_initSendWindow); + rxi_MorePacketsNoLock(rx_maxSendWindow); #endif /* KERNEL */ rx_nFreePackets--; @@ -1247,7 +1247,7 @@ rxi_AllocPacketTSFPQ(int class, int pull_global) MUTEX_ENTER(&rx_freePktQ_lock); if (queue_IsEmpty(&rx_freePacketQueue)) - rxi_MorePacketsNoLock(4 * rx_initSendWindow); + rxi_MorePacketsNoLock(rx_maxSendWindow); RX_TS_FPQ_GTOL(rx_ts_info); diff --git a/src/rx/rx_packet.h b/src/rx/rx_packet.h index 7028825b7..5f151d82f 100644 --- a/src/rx/rx_packet.h +++ b/src/rx/rx_packet.h @@ -135,7 +135,7 @@ * grows past 13, rxdebug packets * will need to be modified */ -/* Packet classes, for rx_AllocPacket */ +/* Packet classes, for rx_AllocPacket and rx_packetQuota */ #define RX_PACKET_CLASS_RECEIVE 0 #define RX_PACKET_CLASS_SEND 1 #define RX_PACKET_CLASS_SPECIAL 2 diff --git a/src/rx/rx_pthread.c b/src/rx/rx_pthread.c index cab61ec34..4efa7f46b 100644 --- a/src/rx/rx_pthread.c +++ b/src/rx/rx_pthread.c @@ -217,6 +217,9 @@ rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp) MUTEX_EXIT(&listener_mutex); for (;;) { + /* See if a check for additional packets was issued */ + rx_CheckPackets(); + /* * Grab a new packet only if necessary (otherwise re-use the old one) */ -- 2.39.5