]> git.michaelhowe.org Git - packages/o/openafs.git/commitdiff
Rx: Consolidate wait for tq busy and make its use uniform
authorJeffrey Altman <jaltman@your-file-system.com>
Tue, 12 Oct 2010 14:53:43 +0000 (10:53 -0400)
committerDerrick Brashear <shadow@dementia.org>
Sat, 9 Jul 2011 05:04:14 +0000 (22:04 -0700)
rxi_WaitforTQBusy() is now used wherever a wait for the transmit
queue is required.  It returns either when the transmit queue is
no longer busy or when the call enters an error state.

Having made this change it is clear that call->currentPacket is
not always validated when the call->lock is reacquired which may be
true when rxi_WaitforTQBusy() is called.

(cherry picked from commit e45abc6cc20236b9e91c23cb6f8e90f51b6a4a99)
Reviewed-on: http://gerrit.openafs.org/2966
Reviewed-by: Jeffrey Altman <jaltman@openafs.org>
Tested-by: Jeffrey Altman <jaltman@openafs.org>
Change-Id: I3492d351581549872b8332a626dae344757c6a6e
Reviewed-on: http://gerrit.openafs.org/4929
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Derrick Brashear <shadow@dementia.org>
src/rx/rx.c
src/rx/rx_prototypes.h
src/rx/rx_rdwr.c

index 7dcb9f46b33d6f2d9ea3a9340a9c67f30cb3ad60..45f274374279e88a1ab48f64e1a6d737fb78f4b1 100644 (file)
@@ -1358,8 +1358,8 @@ rx_GetConnection(struct rx_connection *conn)
 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
 /* Wait for the transmit queue to no longer be busy.
  * requires the call->lock to be held */
-static void rxi_WaitforTQBusy(struct rx_call *call) {
-    while (call->flags & RX_CALL_TQ_BUSY) {
+void rxi_WaitforTQBusy(struct rx_call *call) {
+    while (!call->error && (call->flags & RX_CALL_TQ_BUSY)) {
        call->flags |= RX_CALL_TQ_WAIT;
        call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
@@ -3338,20 +3338,22 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
             * flag is cleared.
             */
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-           while ((call->state == RX_STATE_ACTIVE)
-                  && (call->flags & RX_CALL_TQ_BUSY)) {
-               call->flags |= RX_CALL_TQ_WAIT;
-               call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-               osirx_AssertMine(&call->lock, "rxi_Start lock3");
-               CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-               osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-               call->tqWaiters--;
-               if (call->tqWaiters == 0)
-                   call->flags &= ~RX_CALL_TQ_WAIT;
-           }
+            if (call->state == RX_STATE_ACTIVE) {
+                rxi_WaitforTQBusy(call);
+                /*
+                 * If we entered error state while waiting,
+                 * must call rxi_CallError to permit rxi_ResetCall
+                 * to processed when the tqWaiter count hits zero.
+                 */
+                if (call->error) {
+                    rxi_CallError(call, call->error);
+                    MUTEX_EXIT(&call->lock);
+                    MUTEX_ENTER(&rx_refcnt_mutex);
+                    conn->refCount--;
+                    MUTEX_EXIT(&rx_refcnt_mutex);
+                    return np;
+                }
+            }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
            /* If the new call cannot be taken right now send a busy and set
             * the error condition in this call, so that it terminates as
@@ -5974,7 +5976,7 @@ rxi_Start(struct rxevent *event,
           void *arg0, void *arg1, int istack)
 {
     struct rx_call *call = arg0;
-
+    struct rx_peer *peer = call->conn->peer;
     struct rx_packet *p;
     struct rx_packet *nxp;     /* Next pointer for queue_Scan */
     int nXmitPackets;
@@ -6002,12 +6004,42 @@ rxi_Start(struct rxevent *event,
             * actually got to run. */
            return;
        }
+#ifdef  AFS_GLOBAL_RXLOCK_KERNEL
+        if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
+           /* someone else is waiting to start recovery */
+           return;
+        }
+       call->flags |= RX_CALL_FAST_RECOVER_WAIT;
+       rxi_WaitforTQBusy(call);
+#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+       call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+        if (call->error) {
+            if (rx_stats_active)
+               rx_MutexIncrement(rx_tq_debug.rxi_start_in_error, rx_stats_mutex);
+            return;
+        }
+#endif
+        call->flags |= RX_CALL_FAST_RECOVER;
 
-       /* Mark all of the pending packets in the queue as being lost */
-       for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
-           if (!(p->flags & RX_PKTFLAG_ACKED))
-               p->flags &= ~RX_PKTFLAG_SENT;
-       }
+        if (peer->maxDgramPackets > 1) {
+            call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
+        } else {
+            call->MTU = MIN(peer->natMTU, peer->maxMTU);
+        }
+        call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
+        call->nDgramPackets = 1;
+        call->cwind = 1;
+        call->nextCwind = 1;
+        call->nAcks = 0;
+        call->nNacks = 0;
+        MUTEX_ENTER(&peer->peer_lock);
+        peer->MTU = call->MTU;
+        peer->cwind = call->cwind;
+        peer->nDgramPackets = 1;
+        peer->congestSeq++;
+        call->congestSeq = peer->congestSeq;
+        MUTEX_EXIT(&peer->peer_lock);
     }
 
     if (call->error) {
index 8925b4947219d8890dddc46c1bd85e930f0abb9e..86c458936e0a7d0853161bb4ec4701b21bdeee7a 100644 (file)
@@ -580,7 +580,9 @@ extern void rxi_PrepareSendPacket(struct rx_call *call,
 extern int rxi_AdjustIfMTU(int mtu);
 extern int rxi_AdjustMaxMTU(int mtu, int peerMaxMTU);
 extern int rxi_AdjustDgramPackets(int frags, int mtu);
-
+#ifdef  AFS_GLOBAL_RXLOCK_KERNEL
+extern void rxi_WaitforTQBusy(struct rx_call *call);
+#endif
 
 /* rxperf.c */
 
index 54743dbd1df7bfa792f7237f74577a13f3dac607..90f201595915ac24a20d6927453676a6c7278435 100644 (file)
@@ -730,6 +730,10 @@ rxi_WriteProc(struct rx_call *call, char *buf,
     do {
        if (call->nFree == 0) {
            MUTEX_ENTER(&call->lock);
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+            rxi_WaitforTQBusy(call);
+#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+            cp = call->currentPacket;
             if (call->error)
                 call->mode = RX_MODE_ERROR;
            if (!call->error && cp) {
@@ -742,23 +746,6 @@ rxi_WriteProc(struct rx_call *call, char *buf,
                 cp->flags &= ~RX_PKTFLAG_CP;
 #endif
                call->currentPacket = (struct rx_packet *)0;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
-               /* Wait until TQ_BUSY is reset before adding any
-                * packets to the transmit queue
-                */
-               while (call->flags & RX_CALL_TQ_BUSY) {
-                   call->flags |= RX_CALL_TQ_WAIT;
-                    call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-                   CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-                   osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-                    call->tqWaiters--;
-                    if (call->tqWaiters == 0)
-                        call->flags &= ~RX_CALL_TQ_WAIT;
-               }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
                clock_NewTime();        /* Bogus:  need new time package */
                /* The 0, below, specifies that it is not the last packet:
                 * there will be others. PrepareSendPacket may
@@ -1148,20 +1135,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
        call->error = RX_PROTOCOL_ERROR;
     }
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-    /* Wait until TQ_BUSY is reset before trying to move any
-     * packets to the transmit queue.  */
-    while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
-       call->flags |= RX_CALL_TQ_WAIT;
-        call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-       CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-       osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-        call->tqWaiters--;
-        if (call->tqWaiters == 0)
-            call->flags &= ~RX_CALL_TQ_WAIT;
-    }
+    rxi_WaitforTQBusy(call);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
     cp = call->currentPacket;
 
@@ -1177,6 +1151,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 #ifdef RXDEBUG_PACKET
             call->iovqc++;
 #endif /* RXDEBUG_PACKET */
+           call->currentPacket = (struct rx_packet *)0;
        }
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -1384,23 +1359,8 @@ rxi_FlushWrite(struct rx_call *call)
 
         MUTEX_ENTER(&call->lock);
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-       /* Wait until TQ_BUSY is reset before adding any
-        * packets to the transmit queue
-        */
-       while (call->flags & RX_CALL_TQ_BUSY) {
-           call->flags |= RX_CALL_TQ_WAIT;
-            call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-           CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-           osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-            call->tqWaiters--;
-            if (call->tqWaiters == 0)
-                call->flags &= ~RX_CALL_TQ_WAIT;
-       }
+        rxi_WaitforTQBusy(call);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-
         if (call->error)
             call->mode = RX_MODE_ERROR;