]> git.michaelhowe.org Git - packages/o/openafs.git/commitdiff
rx-avoid-deadlock-in-attaching-call-20030122
authorNickolai Zeldovich <kolya@mit.edu>
Wed, 22 Jan 2003 06:49:48 +0000 (06:49 +0000)
committerNickolai Zeldovich <kolya@mit.edu>
Wed, 22 Jan 2003 06:49:48 +0000 (06:49 +0000)
Avoid a subtle cycle in the locking order hierarchy involving
the rxi_ServerThreadSelectingCall flag, and instead rely on
call->lock to make sure we don't attach the same call twice.
Also some miscellaneous cleanup and code unification.

src/rx/rx.c
src/rx/rx_globals.h

index ea76368af51cc4a57fcff744cca8cb093fc1f1a6..122fc8498a40dac1bff61bc08947bf68cf32fecb 100644 (file)
@@ -256,7 +256,6 @@ assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
  */
 
 #ifdef RX_ENABLE_LOCKS
-static int rxi_ServerThreadSelectingCall;
 static afs_kmutex_t rx_rpc_stats;
 void rxi_StartUnlocked();
 #endif
@@ -426,7 +425,6 @@ int rx_Init(u_int port)
 #ifndef KERNEL
     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
 #endif /* !KERNEL */
-    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
     if ( !uniprocessor )
       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
@@ -1395,7 +1393,7 @@ void rx_WakeupServerProcs(void)
 struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *socketp)
 {
     struct rx_serverQueueEntry *sq;
-    register struct rx_call *call = (struct rx_call *) 0, *choice2;
+    register struct rx_call *call = (struct rx_call *) 0;
     struct rx_service *service = NULL;
     SPLVAR;
 
@@ -1417,8 +1415,8 @@ struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *
     }
     while (1) {
        if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
-           register struct rx_call *tcall, *ncall;
-           choice2 = (struct rx_call *) 0;
+           register struct rx_call *tcall, *ncall, *choice2 = NULL;
+
            /* Scan for eligible incoming calls.  A call is not eligible
             * if the maximum number of calls for its service type are
             * already executing */
@@ -1427,66 +1425,61 @@ struct rx_call *rx_GetCall(int tno, struct rx_service *cur_service, osi_socket *
             * have all their input data available immediately.  This helps 
             * keep threads from blocking, waiting for data from the client. */
            for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
-             service = tcall->conn->service;
-             if (!QuotaOK(service)) {
-               continue;
-             }
-             if (!tno || !tcall->queue_item_header.next  ) {
-               /* If we're thread 0, then  we'll just use 
-                * this call. If we haven't been able to find an optimal 
-                * choice, and we're at the end of the list, then use a 
-                * 2d choice if one has been identified.  Otherwise... */
-               call = (choice2 ? choice2 : tcall);
-               service = call->conn->service;
-             } else if (!queue_IsEmpty(&tcall->rq)) {
-               struct rx_packet *rp;
-               rp = queue_First(&tcall->rq, rx_packet);
-               if (rp->header.seq == 1) {
-                 if (!meltdown_1pkt ||             
-                     (rp->header.flags & RX_LAST_PACKET)) {
-                   call = tcall;
-                 } else if (rxi_2dchoice && !choice2 &&
-                            !(tcall->flags & RX_CALL_CLEARED) &&
-                            (tcall->rprev > rxi_HardAckRate)) {
-                   choice2 = tcall;
-                 } else rxi_md2cnt++;
+               service = tcall->conn->service;
+               if (!QuotaOK(service)) {
+                   continue;
+               }
+               if (!tno || !tcall->queue_item_header.next) {
+                   /* If we're thread 0, then  we'll just use 
+                    * this call. If we haven't been able to find an optimal 
+                    * choice, and we're at the end of the list, then use a 
+                    * 2d choice if one has been identified.  Otherwise... */
+                   call = (choice2 ? choice2 : tcall);
+                   service = call->conn->service;
+               } else if (!queue_IsEmpty(&tcall->rq)) {
+                   struct rx_packet *rp;
+                   rp = queue_First(&tcall->rq, rx_packet);
+                   if (rp->header.seq == 1) {
+                       if (!meltdown_1pkt ||
+                           (rp->header.flags & RX_LAST_PACKET)) {
+                           call = tcall;
+                       } else if (rxi_2dchoice && !choice2 &&
+                                  !(tcall->flags & RX_CALL_CLEARED) &&
+                                  (tcall->rprev > rxi_HardAckRate)) {
+                           choice2 = tcall;
+                       } else rxi_md2cnt++;
+                   }
+               }
+               if (call) {
+                   break;
+               } else {
+                   ReturnToServerPool(service);
                }
-             }
-             if (call)  {
-               break;
-             } else {
-                 ReturnToServerPool(service);
-             }
            }
-         }
+       }
 
        if (call) {
            queue_Remove(call);
-           rxi_ServerThreadSelectingCall = 1;
            MUTEX_EXIT(&rx_serverPool_lock);
            MUTEX_ENTER(&call->lock);
-           MUTEX_ENTER(&rx_serverPool_lock);
 
-           if (queue_IsEmpty(&call->rq) ||
-               queue_First(&call->rq, rx_packet)->header.seq != 1)
-             rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
-
-           CLEAR_CALL_QUEUE_LOCK(call);
-           if (call->error) {
+           if (call->state != RX_STATE_PRECALL || call->error) {
                MUTEX_EXIT(&call->lock);
+               MUTEX_ENTER(&rx_serverPool_lock);
                ReturnToServerPool(service);
-               rxi_ServerThreadSelectingCall = 0;
-               CV_SIGNAL(&rx_serverPool_cv);
-               call = (struct rx_call*)0;
+               call = NULL;
                continue;
            }
-           call->flags &= (~RX_CALL_WAIT_PROC);
+
+           if (queue_IsEmpty(&call->rq) ||
+               queue_First(&call->rq, rx_packet)->header.seq != 1)
+               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
+
+           CLEAR_CALL_QUEUE_LOCK(call);
+           call->flags &= ~RX_CALL_WAIT_PROC;
            MUTEX_ENTER(&rx_stats_mutex);
            rx_nWaiting--;
            MUTEX_EXIT(&rx_stats_mutex);
-           rxi_ServerThreadSelectingCall = 0;
-           CV_SIGNAL(&rx_serverPool_cv);
-           MUTEX_EXIT(&rx_serverPool_lock);
            break;
        }
        else {
@@ -3784,8 +3777,8 @@ struct rx_packet *rxi_ReceiveResponsePacket(register struct rx_connection *conn,
            struct rx_call *call = conn->call[i];
            if (call) {
                MUTEX_ENTER(&call->lock);
-                if (call->state == RX_STATE_PRECALL)
-                    rxi_AttachServerProc(call, (osi_socket) -1, NULL, NULL);
+               if (call->state == RX_STATE_PRECALL)
+                   rxi_AttachServerProc(call, (osi_socket) -1, NULL, NULL);
                MUTEX_EXIT(&call->lock);
            }
        }
@@ -3845,23 +3838,12 @@ void rxi_AttachServerProc(register struct rx_call *call,
 {
     register struct rx_serverQueueEntry *sq;
     register struct rx_service *service = call->conn->service;
-#ifdef RX_ENABLE_LOCKS
     register int haveQuota = 0;
-#endif /* RX_ENABLE_LOCKS */
+
     /* May already be attached */
     if (call->state == RX_STATE_ACTIVE) return;
 
     MUTEX_ENTER(&rx_serverPool_lock);
-#ifdef RX_ENABLE_LOCKS
-    while(rxi_ServerThreadSelectingCall) {
-       MUTEX_EXIT(&call->lock);
-       CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
-       MUTEX_EXIT(&rx_serverPool_lock);
-       MUTEX_ENTER(&call->lock);
-       MUTEX_ENTER(&rx_serverPool_lock);
-       /* Call may have been attached */
-       if (call->state == RX_STATE_ACTIVE) return;
-    }
 
     haveQuota = QuotaOK(service);
     if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
@@ -3869,8 +3851,11 @@ void rxi_AttachServerProc(register struct rx_call *call,
          * put the call on the incoming call queue (unless it's
          * already on the queue).
          */
+#ifdef RX_ENABLE_LOCKS
        if (haveQuota)
            ReturnToServerPool(service);
+#endif /* RX_ENABLE_LOCKS */
+
        if (!(call->flags & RX_CALL_WAIT_PROC)) {
            call->flags |= RX_CALL_WAIT_PROC;
            MUTEX_ENTER(&rx_stats_mutex);
@@ -3881,20 +3866,6 @@ void rxi_AttachServerProc(register struct rx_call *call,
            queue_Append(&rx_incomingCallQueue, call);
        }
     }
-#else /* RX_ENABLE_LOCKS */
-    if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
-       /* If there are no processes available to service this call,
-         * put the call on the incoming call queue (unless it's
-         * already on the queue).
-         */
-       if (!(call->flags & RX_CALL_WAIT_PROC)) {
-           call->flags |= RX_CALL_WAIT_PROC;
-           rx_nWaiting++;
-           rxi_calltrace(RX_CALL_ARRIVAL, call);
-           queue_Append(&rx_incomingCallQueue, call);
-       }
-    }
-#endif /* RX_ENABLE_LOCKS */
     else {
        sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
 
index 536334660030fa9ee7d80bb6f8dd97de9196ee6c..2c2b425dedc5d4d1642c1bdef9e96333eb3d96be 100644 (file)
@@ -32,7 +32,6 @@ EXT struct rx_service *rx_services[RX_MAX_SERVICES+1];
 #ifdef RX_ENABLE_LOCKS
 /* Protects nRequestsRunning as well as pool allocation variables. */
 EXT afs_kmutex_t rx_serverPool_lock;
-EXT afs_kcondvar_t rx_serverPool_cv;
 #endif /* RX_ENABLE_LOCKS */
 
 /* Incoming calls wait on this queue when there are no available server processes */