]> git.michaelhowe.org Git - packages/o/openafs.git/commitdiff
STABLE12-rx-avoid-deadlock-in-attaching-call-20030122
authorNickolai Zeldovich <kolya@mit.edu>
Fri, 31 Jan 2003 21:30:56 +0000 (21:30 +0000)
committerGarry Zacheiss <zacheiss@mit.edu>
Fri, 31 Jan 2003 21:30:56 +0000 (21:30 +0000)
Avoid a subtle cycle in the locking order hierarchy involving
the rxi_ServerThreadSelectingCall flag, and instead rely on
call->lock to make sure we don't attach the same call twice.
Also some miscellaneous cleanup and code unification.

(cherry picked from commit a6d9298d3d299cc2c776b22a2800b5c2044ea497)

src/rx/rx.c
src/rx/rx_globals.h

index 6b5f7b387693966a3f7a0daf6defdb93b3db5abc..d49afa9af0deb182d952358668e2be8f9570b712 100644 (file)
@@ -251,7 +251,6 @@ assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
  */
 
 #ifdef RX_ENABLE_LOCKS
-static int rxi_ServerThreadSelectingCall;
 static afs_kmutex_t rx_rpc_stats;
 void rxi_StartUnlocked();
 #endif
@@ -425,7 +424,6 @@ int rx_Init(u_int port)
 #ifndef KERNEL
     MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
 #endif /* !KERNEL */
-    CV_INIT(&rx_serverPool_cv, "rx_serverPool_cv",CV_DEFAULT, 0);
 #if defined(KERNEL) && defined(AFS_HPUX110_ENV)
     if ( !uniprocessor )
       rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER-10, "rx_sleepLock");
@@ -1421,7 +1419,7 @@ struct rx_service *cur_service;
 osi_socket *socketp;
 {
     struct rx_serverQueueEntry *sq;
-    register struct rx_call *call = (struct rx_call *) 0, *choice2;
+    register struct rx_call *call = (struct rx_call *) 0;
     struct rx_service *service = NULL;
     SPLVAR;
 
@@ -1443,8 +1441,8 @@ osi_socket *socketp;
     }
     while (1) {
        if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
-           register struct rx_call *tcall, *ncall;
-           choice2 = (struct rx_call *) 0;
+           register struct rx_call *tcall, *ncall, *choice2 = NULL;
+
            /* Scan for eligible incoming calls.  A call is not eligible
             * if the maximum number of calls for its service type are
             * already executing */
@@ -1453,66 +1451,61 @@ osi_socket *socketp;
             * have all their input data available immediately.  This helps 
             * keep threads from blocking, waiting for data from the client. */
            for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
-             service = tcall->conn->service;
-             if (!QuotaOK(service)) {
-               continue;
-             }
-             if (!tno || !tcall->queue_item_header.next  ) {
-               /* If we're thread 0, then  we'll just use 
-                * this call. If we haven't been able to find an optimal 
-                * choice, and we're at the end of the list, then use a 
-                * 2d choice if one has been identified.  Otherwise... */
-               call = (choice2 ? choice2 : tcall);
-               service = call->conn->service;
-             } else if (!queue_IsEmpty(&tcall->rq)) {
-               struct rx_packet *rp;
-               rp = queue_First(&tcall->rq, rx_packet);
-               if (rp->header.seq == 1) {
-                 if (!meltdown_1pkt ||             
-                     (rp->header.flags & RX_LAST_PACKET)) {
-                   call = tcall;
-                 } else if (rxi_2dchoice && !choice2 &&
-                            !(tcall->flags & RX_CALL_CLEARED) &&
-                            (tcall->rprev > rxi_HardAckRate)) {
-                   choice2 = tcall;
-                 } else rxi_md2cnt++;
+               service = tcall->conn->service;
+               if (!QuotaOK(service)) {
+                   continue;
+               }
+               if (!tno || !tcall->queue_item_header.next) {
+                   /* If we're thread 0, then  we'll just use 
+                    * this call. If we haven't been able to find an optimal 
+                    * choice, and we're at the end of the list, then use a 
+                    * 2d choice if one has been identified.  Otherwise... */
+                   call = (choice2 ? choice2 : tcall);
+                   service = call->conn->service;
+               } else if (!queue_IsEmpty(&tcall->rq)) {
+                   struct rx_packet *rp;
+                   rp = queue_First(&tcall->rq, rx_packet);
+                   if (rp->header.seq == 1) {
+                       if (!meltdown_1pkt ||
+                           (rp->header.flags & RX_LAST_PACKET)) {
+                           call = tcall;
+                       } else if (rxi_2dchoice && !choice2 &&
+                                  !(tcall->flags & RX_CALL_CLEARED) &&
+                                  (tcall->rprev > rxi_HardAckRate)) {
+                           choice2 = tcall;
+                       } else rxi_md2cnt++;
+                   }
+               }
+               if (call) {
+                   break;
+               } else {
+                   ReturnToServerPool(service);
                }
-             }
-             if (call)  {
-               break;
-             } else {
-                 ReturnToServerPool(service);
-             }
            }
-         }
+       }
 
        if (call) {
            queue_Remove(call);
-           rxi_ServerThreadSelectingCall = 1;
            MUTEX_EXIT(&rx_serverPool_lock);
            MUTEX_ENTER(&call->lock);
-           MUTEX_ENTER(&rx_serverPool_lock);
 
-           if (queue_IsEmpty(&call->rq) ||
-               queue_First(&call->rq, rx_packet)->header.seq != 1)
-             rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
-
-           CLEAR_CALL_QUEUE_LOCK(call);
-           if (call->error) {
+           if (call->state != RX_STATE_PRECALL || call->error) {
                MUTEX_EXIT(&call->lock);
+               MUTEX_ENTER(&rx_serverPool_lock);
                ReturnToServerPool(service);
-               rxi_ServerThreadSelectingCall = 0;
-               CV_SIGNAL(&rx_serverPool_cv);
-               call = (struct rx_call*)0;
+               call = NULL;
                continue;
            }
-           call->flags &= (~RX_CALL_WAIT_PROC);
+
+           if (queue_IsEmpty(&call->rq) ||
+               queue_First(&call->rq, rx_packet)->header.seq != 1)
+               rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
+
+           CLEAR_CALL_QUEUE_LOCK(call);
+           call->flags &= ~RX_CALL_WAIT_PROC;
            MUTEX_ENTER(&rx_stats_mutex);
            rx_nWaiting--;
            MUTEX_EXIT(&rx_stats_mutex);
-           rxi_ServerThreadSelectingCall = 0;
-           CV_SIGNAL(&rx_serverPool_cv);
-           MUTEX_EXIT(&rx_serverPool_lock);
            break;
        }
        else {
@@ -3926,23 +3919,12 @@ rxi_AttachServerProc(call, socket, tnop, newcallp)
 {
     register struct rx_serverQueueEntry *sq;
     register struct rx_service *service = call->conn->service;
-#ifdef RX_ENABLE_LOCKS
     register int haveQuota = 0;
-#endif /* RX_ENABLE_LOCKS */
+
     /* May already be attached */
     if (call->state == RX_STATE_ACTIVE) return;
 
     MUTEX_ENTER(&rx_serverPool_lock);
-#ifdef RX_ENABLE_LOCKS
-    while(rxi_ServerThreadSelectingCall) {
-       MUTEX_EXIT(&call->lock);
-       CV_WAIT(&rx_serverPool_cv, &rx_serverPool_lock);
-       MUTEX_EXIT(&rx_serverPool_lock);
-       MUTEX_ENTER(&call->lock);
-       MUTEX_ENTER(&rx_serverPool_lock);
-       /* Call may have been attached */
-       if (call->state == RX_STATE_ACTIVE) return;
-    }
 
     haveQuota = QuotaOK(service);
     if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
@@ -3950,8 +3932,11 @@ rxi_AttachServerProc(call, socket, tnop, newcallp)
          * put the call on the incoming call queue (unless it's
          * already on the queue).
          */
+#ifdef RX_ENABLE_LOCKS
        if (haveQuota)
            ReturnToServerPool(service);
+#endif /* RX_ENABLE_LOCKS */
+
        if (!(call->flags & RX_CALL_WAIT_PROC)) {
            call->flags |= RX_CALL_WAIT_PROC;
            MUTEX_ENTER(&rx_stats_mutex);
@@ -3962,20 +3947,6 @@ rxi_AttachServerProc(call, socket, tnop, newcallp)
            queue_Append(&rx_incomingCallQueue, call);
        }
     }
-#else /* RX_ENABLE_LOCKS */
-    if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
-       /* If there are no processes available to service this call,
-         * put the call on the incoming call queue (unless it's
-         * already on the queue).
-         */
-       if (!(call->flags & RX_CALL_WAIT_PROC)) {
-           call->flags |= RX_CALL_WAIT_PROC;
-           rx_nWaiting++;
-           rxi_calltrace(RX_CALL_ARRIVAL, call);
-           queue_Append(&rx_incomingCallQueue, call);
-       }
-    }
-#endif /* RX_ENABLE_LOCKS */
     else {
        sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
 
index bcdf2dccb6a83af7b690ad0d02f25f816c9a2049..160e2c25e402cbc172b2567749595ca9fb492b10 100644 (file)
@@ -32,7 +32,6 @@ EXT struct rx_service *rx_services[RX_MAX_SERVICES+1];
 #ifdef RX_ENABLE_LOCKS
 /* Protects nRequestsRunning as well as pool allocation variables. */
 EXT afs_kmutex_t rx_serverPool_lock;
-EXT afs_kcondvar_t rx_serverPool_cv;
 #endif /* RX_ENABLE_LOCKS */
 
 /* Incoming calls wait on this queue when there are no available server processes */