From 887a6eb59b5aa87bc8a5c3ec1fc78f80ac19a6f2 Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Sun, 11 Jan 2009 05:27:01 +0000 Subject: [PATCH] rx-finer-grained-locking-20090110 LICENSE MIT not everything should be under the rx_stats_mutex. doing so results in too much lock contention. add new mutexes: rx_quota_mutex, rx_waiting_mutex, rx_pthread_mutex, and rx_packets_mutex. Each new mutex protects an associated group of variables. --- src/rx/rx.c | 98 +++++++++++++++++++++++++++++++-------------- src/rx/rx_globals.h | 17 +++++--- src/rx/rx_kcommon.c | 1 + src/rx/rx_packet.c | 13 +++--- src/rx/rx_pthread.c | 18 +++++---- 5 files changed, 98 insertions(+), 49 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index 6802c3815..49184f4b4 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -165,6 +165,10 @@ static unsigned int rxi_rpc_process_stat_cnt; */ extern pthread_mutex_t rx_stats_mutex; +extern pthread_mutex_t rx_waiting_mutex; +extern pthread_mutex_t rx_quota_mutex; +extern pthread_mutex_t rx_pthread_mutex; +extern pthread_mutex_t rx_packets_mutex; extern pthread_mutex_t des_init_mutex; extern pthread_mutex_t des_random_mutex; extern pthread_mutex_t rx_clock_mutex; @@ -193,6 +197,14 @@ rxi_InitPthread(void) == 0); assert(pthread_mutex_init(&rx_stats_mutex, (const pthread_mutexattr_t *)0) == 0); + assert(pthread_mutex_init(&rx_waiting_mutex, (const pthread_mutexattr_t *)0) + == 0); + assert(pthread_mutex_init(&rx_quota_mutex, (const pthread_mutexattr_t *)0) + == 0); + assert(pthread_mutex_init(&rx_pthread_mutex, (const pthread_mutexattr_t *)0) + == 0); + assert(pthread_mutex_init(&rx_packets_mutex, (const pthread_mutexattr_t *)0) + == 0); assert(pthread_mutex_init (&rxi_connCacheMutex, (const pthread_mutexattr_t *)0) == 0); assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t *)0) @@ -256,19 +268,40 @@ pthread_once_t rx_once_init = PTHREAD_ONCE_INIT; assert(pthread_once(&rx_once_init, rxi_InitPthread)==0) /* * The rx_stats_mutex mutex protects the 
following global variables: - * rxi_dataQuota - * rxi_minDeficit - * rxi_availProcs - * rxi_totalMin * rxi_lowConnRefCount * rxi_lowPeerRefCount * rxi_nCalls * rxi_Alloccnt * rxi_Allocsize - * rx_nFreePackets * rx_tq_debug * rx_stats */ + +/* + * The rx_quota_mutex mutex protects the following global variables: + * rxi_dataQuota + * rxi_minDeficit + * rxi_availProcs + * rxi_totalMin + */ + +/* + * The rx_freePktQ_lock protects the following global variables: + * rx_nFreePackets + */ + +/* + * The rx_packets_mutex mutex protects the following global variables: + * rx_nPackets + * rx_TSFPQLocalMax + * rx_TSFPQGlobSize + * rx_TSFPQMaxProcs + */ + +/* + * The rx_pthread_mutex mutex protects the following global variables: + * rxi_pthread_hinum + */ #else #define INIT_PTHREAD_LOCKS #endif @@ -457,6 +490,10 @@ rx_InitHost(u_int host, u_int port) rxdb_init(); #endif /* RX_LOCKS_DB */ MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0); + MUTEX_INIT(&rx_waiting_mutex, "rx_waiting_mutex", MUTEX_DEFAULT, 0); + MUTEX_INIT(&rx_quota_mutex, "rx_quota_mutex", MUTEX_DEFAULT, 0); + MUTEX_INIT(&rx_pthread_mutex, "rx_pthread_mutex", MUTEX_DEFAULT, 0); + MUTEX_INIT(&rx_packets_mutex, "rx_packets_mutex", MUTEX_DEFAULT, 0); MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0); MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0); MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0); @@ -538,7 +575,7 @@ rx_InitHost(u_int host, u_int port) rx_SetEpoch(tv.tv_sec); /* Start time of this package, rxkad * will provide a randomer value. */ #endif - rx_MutexAdd(rxi_dataQuota, rx_extraQuota, rx_stats_mutex); /* + extra pkts caller asked to rsrv */ + rx_MutexAdd(rxi_dataQuota, rx_extraQuota, rx_quota_mutex); /* + extra pkts caller asked to rsrv */ /* *Slightly* random start time for the cid. 
This is just to help * out with the hashing function at the peer */ rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT); @@ -600,7 +637,8 @@ QuotaOK(register struct rx_service *aservice) /* otherwise, can use only if there are enough to allow everyone * to go to their min quota after this guy starts. */ - MUTEX_ENTER(&rx_stats_mutex); + + MUTEX_ENTER(&rx_quota_mutex); if ((aservice->nRequestsRunning < aservice->minProcs) || (rxi_availProcs > rxi_minDeficit)) { aservice->nRequestsRunning++; @@ -609,10 +647,10 @@ QuotaOK(register struct rx_service *aservice) if (aservice->nRequestsRunning <= aservice->minProcs) rxi_minDeficit--; rxi_availProcs--; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); return 1; } - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); return 0; } @@ -621,11 +659,11 @@ static void ReturnToServerPool(register struct rx_service *aservice) { aservice->nRequestsRunning--; - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_quota_mutex); if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++; rxi_availProcs++; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); } #else /* RX_ENABLE_LOCKS */ @@ -726,13 +764,13 @@ rx_StartServer(int donateMe) service = rx_services[i]; if (service == (struct rx_service *)0) break; - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_quota_mutex); rxi_totalMin += service->minProcs; /* below works even if a thread is running, since minDeficit would * still have been decremented and later re-incremented. 
*/ rxi_minDeficit += service->minProcs; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); } /* Turn on reaping of idle server connections */ @@ -1689,7 +1727,7 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp) if (call->flags & RX_CALL_WAIT_PROC) { call->flags &= ~RX_CALL_WAIT_PROC; - rx_MutexDecrement(rx_nWaiting, rx_stats_mutex); + rx_MutexDecrement(rx_nWaiting, rx_waiting_mutex); } if (call->state != RX_STATE_PRECALL || call->error) { @@ -3083,7 +3121,8 @@ static int TooLow(struct rx_packet *ap, struct rx_call *acall) { int rc = 0; - MUTEX_ENTER(&rx_stats_mutex); + + MUTEX_ENTER(&rx_quota_mutex); if (((ap->header.seq != 1) && (acall->flags & RX_CALL_CLEARED) && (acall->state == RX_STATE_PRECALL)) || ((rx_nFreePackets < rxi_dataQuota + 2) @@ -3091,7 +3130,7 @@ TooLow(struct rx_packet *ap, struct rx_call *acall) && (acall->flags & RX_CALL_READER_WAIT)))) { rc = 1; } - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); return rc; } #endif /* KERNEL */ @@ -4219,10 +4258,10 @@ rxi_AttachServerProc(register struct rx_call *call, if (!(call->flags & RX_CALL_WAIT_PROC)) { call->flags |= RX_CALL_WAIT_PROC; - MUTEX_ENTER(&rx_stats_mutex); - rx_nWaiting++; - rx_nWaited++; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_ENTER(&rx_waiting_mutex); + rx_nWaiting++; + rx_nWaited++; + MUTEX_EXIT(&rx_waiting_mutex); rxi_calltrace(RX_CALL_ARRIVAL, call); SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock); queue_Append(&rx_incomingCallQueue, call); @@ -4249,9 +4288,10 @@ rxi_AttachServerProc(register struct rx_call *call, call->flags &= ~RX_CALL_WAIT_PROC; if (queue_IsOnQueue(call)) { queue_Remove(call); - MUTEX_ENTER(&rx_stats_mutex); - rx_nWaiting--; - MUTEX_EXIT(&rx_stats_mutex); + + MUTEX_ENTER(&rx_waiting_mutex); + rx_nWaiting--; + MUTEX_EXIT(&rx_waiting_mutex); } } call->state = RX_STATE_ACTIVE; @@ -4744,9 +4784,10 @@ rxi_ResetCall(register struct rx_call *call, register int newcall) if (queue_IsOnQueue(call)) { queue_Remove(call); if (flags 
& RX_CALL_WAIT_PROC) { - MUTEX_ENTER(&rx_stats_mutex); - rx_nWaiting--; - MUTEX_EXIT(&rx_stats_mutex); + + MUTEX_ENTER(&rx_waiting_mutex); + rx_nWaiting--; + MUTEX_EXIT(&rx_waiting_mutex); } } MUTEX_EXIT(call->call_queue_lock); @@ -7007,11 +7048,10 @@ shutdown_rx(void) rxi_FreeAllPackets(); - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_quota_mutex); rxi_dataQuota = RX_MAX_QUOTA; rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0; - MUTEX_EXIT(&rx_stats_mutex); - + MUTEX_EXIT(&rx_quota_mutex); rxinit_status = 1; UNLOCK_RX_INIT; } diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h index 87ef9878a..a25f70d9c 100644 --- a/src/rx/rx_globals.h +++ b/src/rx/rx_globals.h @@ -267,6 +267,7 @@ void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to glob * by each call to AllocPacketBufs() will increase indefinitely without a cap on the transfer * glob size. A cap of 64 is selected because that will produce an allocation of greater than * three times that amount which is greater than half of ncalls * maxReceiveWindow. + * Must be called under rx_packets_mutex. 
*/ #define RX_TS_FPQ_COMPUTE_LIMITS \ do { \ @@ -308,9 +309,9 @@ void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to glob (rx_ts_info_p)->_FPQ.ltog_ops++; \ (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \ if ((rx_ts_info_p)->_FPQ.delta) { \ - MUTEX_ENTER(&rx_stats_mutex); \ + MUTEX_ENTER(&rx_packets_mutex); \ RX_TS_FPQ_COMPUTE_LIMITS; \ - MUTEX_EXIT(&rx_stats_mutex); \ + MUTEX_EXIT(&rx_packets_mutex); \ (rx_ts_info_p)->_FPQ.delta = 0; \ } \ } while(0) @@ -328,9 +329,9 @@ void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to glob (rx_ts_info_p)->_FPQ.ltog_ops++; \ (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \ if ((rx_ts_info_p)->_FPQ.delta) { \ - MUTEX_ENTER(&rx_stats_mutex); \ + MUTEX_ENTER(&rx_packets_mutex); \ RX_TS_FPQ_COMPUTE_LIMITS; \ - MUTEX_EXIT(&rx_stats_mutex); \ + MUTEX_EXIT(&rx_packets_mutex); \ (rx_ts_info_p)->_FPQ.delta = 0; \ } \ } while(0) @@ -601,14 +602,18 @@ EXT int rxi_callAbortDelay GLOBALSINIT(3000); EXT int rxi_fcfs_thread_num GLOBALSINIT(0); EXT pthread_key_t rx_thread_id_key; /* keep track of pthread numbers - protected by rx_stats_mutex, - except in rx_Init() before mutex exists! */ + * except in rx_Init() before mutex exists! 
*/ EXT int rxi_pthread_hinum GLOBALSINIT(0); #else #define rxi_fcfs_thread_num (0) #endif #if defined(RX_ENABLE_LOCKS) -EXT afs_kmutex_t rx_stats_mutex; /* used to activate stats gathering */ +EXT afs_kmutex_t rx_stats_mutex; /* used to protect stats gathering */ +EXT afs_kmutex_t rx_waiting_mutex; /* used to protect waiting counters */ +EXT afs_kmutex_t rx_quota_mutex; /* used to protect quota counters */ +EXT afs_kmutex_t rx_pthread_mutex; /* used to protect pthread counters */ +EXT afs_kmutex_t rx_packets_mutex; /* used to protect packet counters */ #endif EXT2 int rx_enable_stats GLOBALSINIT(0); diff --git a/src/rx/rx_kcommon.c b/src/rx/rx_kcommon.c index 72c8dbacb..ea5d28437 100644 --- a/src/rx/rx_kcommon.c +++ b/src/rx/rx_kcommon.c @@ -289,6 +289,7 @@ rx_ServerProc(void *unused) { int threadID; +/* jaltman - rxi_dataQuota is protected by a mutex everywhere else */ rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */ rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */ /* threadID is used for making decisions in GetCall. 
Get it by bumping diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c index d68ce7c4f..7f0be720c 100644 --- a/src/rx/rx_packet.c +++ b/src/rx/rx_packet.c @@ -549,10 +549,11 @@ rxi_MorePackets(int apackets) RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets); /* TSFPQ patch also needs to keep track of total packets */ - MUTEX_ENTER(&rx_stats_mutex); + + MUTEX_ENTER(&rx_packets_mutex); rx_nPackets += apackets; RX_TS_FPQ_COMPUTE_LIMITS; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_packets_mutex); for (e = p + apackets; p < e; p++) { RX_PACKET_IOV_INIT(p); @@ -641,10 +642,10 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local) RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets); /* TSFPQ patch also needs to keep track of total packets */ - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_packets_mutex); rx_nPackets += apackets; RX_TS_FPQ_COMPUTE_LIMITS; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_packets_mutex); for (e = p + apackets; p < e; p++) { RX_PACKET_IOV_INIT(p); @@ -724,10 +725,10 @@ rxi_MorePacketsNoLock(int apackets) rx_nFreePackets += apackets; #ifdef RX_ENABLE_TSFPQ /* TSFPQ patch also needs to keep track of total packets */ - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_packets_mutex); rx_nPackets += apackets; RX_TS_FPQ_COMPUTE_LIMITS; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_packets_mutex); #endif /* RX_ENABLE_TSFPQ */ rxi_NeedMorePackets = FALSE; rxi_PacketsUnWait(); diff --git a/src/rx/rx_pthread.c b/src/rx/rx_pthread.c index cc27ae492..028365e29 100644 --- a/src/rx/rx_pthread.c +++ b/src/rx/rx_pthread.c @@ -288,7 +288,7 @@ rx_ServerProc(void * dummy) struct rx_call *newcall = NULL; rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */ - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_quota_mutex); rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */ /* threadID is used for making decisions in GetCall. 
Get it by bumping * number of threads handling incoming calls */ @@ -303,11 +303,13 @@ rx_ServerProc(void * dummy) * So either introduce yet another counter or flag the FCFS * thread... chose the latter. */ + MUTEX_ENTER(&rx_pthread_mutex); threadID = ++rxi_pthread_hinum; + MUTEX_EXIT(&rx_pthread_mutex); if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID) rxi_fcfs_thread_num = threadID; ++rxi_availProcs; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); while (1) { sock = OSI_NULLSOCKET; @@ -358,9 +360,9 @@ rxi_StartListener(void) dpf(("Unable to create Rx event handling thread\n")); exit(1); } - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_pthread_mutex); ++rxi_pthread_hinum; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_pthread_mutex); AFS_SIGSET_RESTORE(); assert(pthread_mutex_lock(&listener_mutex) == 0); @@ -397,9 +399,9 @@ rxi_Listen(osi_socket sock) dpf(("Unable to create socket listener thread\n")); exit(1); } - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_pthread_mutex); ++rxi_pthread_hinum; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_pthread_mutex); AFS_SIGSET_RESTORE(); return 0; } @@ -452,10 +454,10 @@ struct rx_ts_info_t * rx_ts_info_init() { #ifdef RX_ENABLE_TSFPQ queue_Init(&rx_ts_info->_FPQ); - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_packets_mutex); rx_TSFPQMaxProcs++; RX_TS_FPQ_COMPUTE_LIMITS; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_packets_mutex); #endif /* RX_ENABLE_TSFPQ */ return rx_ts_info; } -- 2.39.5