From 864f275da9c7e383f204b2055ee27598e03358e6 Mon Sep 17 00:00:00 2001 From: Nickolai Zeldovich Date: Fri, 31 Jan 2003 21:30:56 +0000 Subject: [PATCH] STABLE12-rx-avoid-deadlock-in-attaching-call-20030122 Avoid a subtle cycle in the locking order hierarchy involving the rxi_ServerThreadSelectingCall flag, and instead rely on call->lock to make sure we don't attach the same call twice. Also some miscellaneous cleanup and code unification. (cherry picked from commit a6d9298d3d299cc2c776b22a2800b5c2044ea497) --- src/rx/rx.c | 123 +++++++++++++++++--------------------------- src/rx/rx_globals.h | 1 - 2 files changed, 47 insertions(+), 77 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index 6b5f7b387..d49afa9af 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -251,7 +251,6 @@ assert(pthread_once(&rx_once_init, rxi_InitPthread)==0); */ #ifdef RX_ENABLE_LOCKS -static int rxi_ServerThreadSelectingCall; static afs_kmutex_t rx_rpc_stats; void rxi_StartUnlocked(); #endif @@ -425,7 +424,6 @@ int rx_Init(u_int port) #ifndef KERNEL MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0); #endif /* !KERNEL */ - CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0); #if defined(KERNEL) && defined(AFS_HPUX110_ENV) if ( !uniprocessor ) rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock"); @@ -1421,7 +1419,7 @@ struct rx_service *cur_service; osi_socket *socketp; { struct rx_serverQueueEntry *sq; - register struct rx_call *call = (struct rx_call *) 0, *choice2; + register struct rx_call *call = (struct rx_call *) 0; struct rx_service *service = NULL; SPLVAR; @@ -1443,8 +1441,8 @@ osi_socket *socketp; } while (1) { if (queue_IsNotEmpty(&rx_incomingCallQueue)) { - register struct rx_call *tcall, *ncall; - choice2 = (struct rx_call *) 0; + register struct rx_call *tcall, *ncall, *choice2 = NULL; + /* Scan for eligible incoming calls. A call is not eligible * if the maximum number of calls for its service type are * already executing */ @@ -1453,66 +1451,61 @@ osi_socket *socketp; * have all their input data available immediately. This helps * keep threads from blocking, waiting for data from the client. */ for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) { - service = tcall->conn->service; - if (!QuotaOK(service)) { - continue; - } - if (!tno || !tcall->queue_item_header.next ) { - /* If we're thread 0, then we'll just use - * this call. If we haven't been able to find an optimal - * choice, and we're at the end of the list, then use a - * 2d choice if one has been identified. Otherwise... */ - call = (choice2 ? choice2 : tcall); - service = call->conn->service; - } else if (!queue_IsEmpty(&tcall->rq)) { - struct rx_packet *rp; - rp = queue_First(&tcall->rq, rx_packet); - if (rp->header.seq == 1) { - if (!meltdown_1pkt || - (rp->header.flags & RX_LAST_PACKET)) { - call = tcall; - } else if (rxi_2dchoice && !choice2 && - !(tcall->flags & RX_CALL_CLEARED) && - (tcall->rprev > rxi_HardAckRate)) { - choice2 = tcall; - } else rxi_md2cnt++; + service = tcall->conn->service; + if (!QuotaOK(service)) { + continue; + } + if (!tno || !tcall->queue_item_header.next) { + /* If we're thread 0, then we'll just use + * this call. If we haven't been able to find an optimal + * choice, and we're at the end of the list, then use a + * 2d choice if one has been identified. Otherwise... */ + call = (choice2 ? choice2 : tcall); + service = call->conn->service; + } else if (!queue_IsEmpty(&tcall->rq)) { + struct rx_packet *rp; + rp = queue_First(&tcall->rq, rx_packet); + if (rp->header.seq == 1) { + if (!meltdown_1pkt || + (rp->header.flags & RX_LAST_PACKET)) { + call = tcall; + } else if (rxi_2dchoice && !choice2 && + !(tcall->flags & RX_CALL_CLEARED) && + (tcall->rprev > rxi_HardAckRate)) { + choice2 = tcall; + } else rxi_md2cnt++; + } + } + if (call) { + break; + } else { + ReturnToServerPool(service); } - } - if (call) { - break; - } else { - ReturnToServerPool(service); - } } - } + } if (call) { queue_Remove(call); - rxi_ServerThreadSelectingCall = 1; MUTEX_EXIT(&rx_serverPool_lock); MUTEX_ENTER(&call->lock); - MUTEX_ENTER(&rx_serverPool_lock); - if (queue_IsEmpty(&call->rq) || - queue_First(&call->rq, rx_packet)->header.seq != 1) - rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0); - - CLEAR_CALL_QUEUE_LOCK(call); - if (call->error) { + if (call->state != RX_STATE_PRECALL || call->error) { MUTEX_EXIT(&call->lock); + MUTEX_ENTER(&rx_serverPool_lock); ReturnToServerPool(service); - rxi_ServerThreadSelectingCall = 0; - CV_SIGNAL(&rx_serverPool_cv); - call = (struct rx_call*)0; + call = NULL; continue; } - call->flags &= (~RX_CALL_WAIT_PROC); + + if (queue_IsEmpty(&call->rq) || + queue_First(&call->rq, rx_packet)->header.seq != 1) + rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0); + + CLEAR_CALL_QUEUE_LOCK(call); + call->flags &= ~RX_CALL_WAIT_PROC; MUTEX_ENTER(&rx_stats_mutex); rx_nWaiting--; MUTEX_EXIT(&rx_stats_mutex); - rxi_ServerThreadSelectingCall = 0; - CV_SIGNAL(&rx_serverPool_cv); - MUTEX_EXIT(&rx_serverPool_lock); break; } else { @@ -3926,23 +3919,12 @@ rxi_AttachServerProc(call, socket, tnop, newcallp) { register struct rx_serverQueueEntry *sq; register struct rx_service *service = call->conn->service; -#ifdef RX_ENABLE_LOCKS register int haveQuota = 0; -#endif /* RX_ENABLE_LOCKS */ + /* May already be attached */ if (call->state == RX_STATE_ACTIVE) return; MUTEX_ENTER(&rx_serverPool_lock); -#ifdef RX_ENABLE_LOCKS - while(rxi_ServerThreadSelectingCall) { - MUTEX_EXIT(&call->lock); - CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock); - MUTEX_EXIT(&rx_serverPool_lock); - MUTEX_ENTER(&call->lock); - MUTEX_ENTER(&rx_serverPool_lock); - /* Call may have been attached */ - if (call->state == RX_STATE_ACTIVE) return; - } haveQuota = QuotaOK(service); if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) { @@ -3950,8 +3932,11 @@ rxi_AttachServerProc(call, socket, tnop, newcallp) * put the call on the incoming call queue (unless it's * already on the queue). */ +#ifdef RX_ENABLE_LOCKS if (haveQuota) ReturnToServerPool(service); +#endif /* RX_ENABLE_LOCKS */ + if (!(call->flags & RX_CALL_WAIT_PROC)) { call->flags |= RX_CALL_WAIT_PROC; MUTEX_ENTER(&rx_stats_mutex); @@ -3962,20 +3947,6 @@ rxi_AttachServerProc(call, socket, tnop, newcallp) queue_Append(&rx_incomingCallQueue, call); } } -#else /* RX_ENABLE_LOCKS */ - if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) { - /* If there are no processes available to service this call, - * put the call on the incoming call queue (unless it's - * already on the queue). - */ - if (!(call->flags & RX_CALL_WAIT_PROC)) { - call->flags |= RX_CALL_WAIT_PROC; - rx_nWaiting++; - rxi_calltrace(RX_CALL_ARRIVAL, call); - queue_Append(&rx_incomingCallQueue, call); - } - } -#endif /* RX_ENABLE_LOCKS */ else { sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry); diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h index bcdf2dccb..160e2c25e 100644 --- a/src/rx/rx_globals.h +++ b/src/rx/rx_globals.h @@ -32,7 +32,6 @@ EXT struct rx_service *rx_services[RX_MAX_SERVICES+1]; #ifdef RX_ENABLE_LOCKS /* Protects nRequestsRunning as well as pool allocation variables. */ EXT afs_kmutex_t rx_serverPool_lock; -EXT afs_kcondvar_t rx_serverPool_cv; #endif /* RX_ENABLE_LOCKS */ /* Incoming calls wait on this queue when there are no available server processes */ -- 2.39.5