]> git.michaelhowe.org Git - packages/o/openafs.git/commitdiff
DEVEL15-rx-retain-windowing-per-peer-20080508
authorDerrick Brashear <shadow@dementia.org>
Thu, 8 May 2008 22:25:58 +0000 (22:25 +0000)
committerDerrick Brashear <shadow@dementia.org>
Thu, 8 May 2008 22:25:58 +0000 (22:25 +0000)
LICENSE IPL10

we learned about the peer in a previous connection... retain the information
and keep using it. widen the available window.

makes rx perform better over high latency wans. needs to be present in both
sides for maximal effect.

(cherry picked from commit 3feee9278bc8d0a22630508f3aca10835bf52866)

src/rx/rx.c
src/rx/rx.h
src/rx/rx_globals.h
src/rx/rx_rdwr.c
src/rx/test/generator.c

index 5b12b70f208a4cc5ee0367b1f4deba1ecd9f69a3..505ba0012376b807544141cca02b62395933fbf8 100644 (file)
@@ -741,7 +741,7 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
                 register struct rx_securityClass *securityObject,
                 int serviceSecurityIndex)
 {
-    int hashindex;
+    int hashindex, i;
     afs_int32 cid;
     register struct rx_connection *conn;
 
@@ -777,6 +777,10 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
     conn->delayedAbortEvent = NULL;
     conn->abortCount = 0;
     conn->error = 0;
+    for (i = 0; i < RX_MAXCALLS; i++) {
+       conn->twind[i] = rx_initSendWindow;
+       conn->rwind[i] = rx_initReceiveWindow;
+    }
 
     RXS_NewConnection(securityObject, conn);
     hashindex =
@@ -2153,6 +2157,8 @@ rxi_NewCall(register struct rx_connection *conn, register int channel)
     }
     call->channel = channel;
     call->callNumber = &conn->callNumber[channel];
+    call->rwind = conn->rwind[channel];
+    call->twind = conn->twind[channel];
     /* Note that the next expected call number is retained (in
      * conn->callNumber[i]), even if we reallocate the call structure
      */
@@ -2313,7 +2319,7 @@ rxi_FindConnection(osi_socket socket, register afs_int32 host,
                   register u_short port, u_short serviceId, afs_uint32 cid,
                   afs_uint32 epoch, int type, u_int securityIndex)
 {
-    int hashindex, flag;
+    int hashindex, flag, i;
     register struct rx_connection *conn;
     hashindex = CONN_HASH(host, port, cid, epoch, type);
     MUTEX_ENTER(&rx_connHashTable_lock);
@@ -2383,6 +2389,10 @@ rxi_FindConnection(osi_socket socket, register afs_int32 host,
        conn->specific = NULL;
        rx_SetConnDeadTime(conn, service->connDeadTime);
        rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
+       for (i = 0; i < RX_MAXCALLS; i++) {
+           conn->twind[i] = rx_initSendWindow;
+           conn->rwind[i] = rx_initReceiveWindow;
+       }
        /* Notify security object of the new connection */
        RXS_NewConnection(conn->securityObject, conn);
        /* XXXX Connection timeout? */
@@ -3759,6 +3769,7 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
            if (tSize < call->twind) {  /* smaller than our send */
                call->twind = tSize;    /* window, we must send less... */
                call->ssthresh = MIN(call->twind, call->ssthresh);
+               call->conn->twind[call->channel] = call->twind;
            }
 
            /* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
@@ -3782,9 +3793,11 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
             */
            if (tSize < call->twind) {
                call->twind = tSize;
+               call->conn->twind[call->channel] = call->twind;
                call->ssthresh = MIN(call->twind, call->ssthresh);
            } else if (tSize > call->twind) {
                call->twind = tSize;
+               call->conn->twind[call->channel] = call->twind;
            }
 
            /*
@@ -4499,8 +4512,8 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
     }
     queue_Init(&call->rq);
     call->error = 0;
-    call->rwind = rx_initReceiveWindow;
-    call->twind = rx_initSendWindow;
+    call->twind = call->conn->twind[call->channel];
+    call->rwind = call->conn->rwind[call->channel];
     call->nSoftAcked = 0;
     call->nextCwind = 0;
     call->nAcks = 0;
@@ -4611,7 +4624,7 @@ rxi_SendAck(register struct rx_call *call,
      * Open the receive window once a thread starts reading packets
      */
     if (call->rnext > 1) {
-       call->rwind = rx_maxReceiveWindow;
+       call->conn->rwind[call->channel] = call->rwind = rx_maxReceiveWindow;
     }
 
     call->nHardAcks = 0;
index 51a2f01df87986fb6559c698f0d83c3c08b6e685..1d6c35d3c305a01f86ecd8d5c29fd33604b68ed5 100644 (file)
@@ -233,6 +233,8 @@ struct rx_connection {
     struct rx_call *call[RX_MAXCALLS];
 #endif
     afs_uint32 callNumber[RX_MAXCALLS];        /* Current call numbers */
+    afs_uint32 rwind[RX_MAXCALLS];
+    u_short twind[RX_MAXCALLS];
     afs_uint32 serial;         /* Next outgoing packet serial number */
     afs_uint32 lastSerial;     /* # of last packet received, for computing skew */
     afs_int32 maxSerial;       /* largest serial number seen on incoming packets */
index 563201258d071aab6f34a6f643291ae73d302837..749a380d740ae8a37586533cf75a438bd4b4fc05 100644 (file)
@@ -119,9 +119,9 @@ EXT int rx_BusyError GLOBALSINIT(-1);
 
 EXT int rx_minWindow GLOBALSINIT(1);
 EXT int rx_initReceiveWindow GLOBALSINIT(16);  /* how much to accept */
-EXT int rx_maxReceiveWindow GLOBALSINIT(32);   /* how much to accept */
-EXT int rx_initSendWindow GLOBALSINIT(8);
-EXT int rx_maxSendWindow GLOBALSINIT(32);
+EXT int rx_maxReceiveWindow GLOBALSINIT(64);   /* how much to accept */
+EXT int rx_initSendWindow GLOBALSINIT(16);
+EXT int rx_maxSendWindow GLOBALSINIT(64);
 EXT int rx_nackThreshold GLOBALSINIT(3);       /* Number NACKS to trigger congestion recovery */
 EXT int rx_nDgramThreshold GLOBALSINIT(4);     /* Number of packets before increasing
                                         * packets per datagram */
index 0c019c2c7a079bfb2b6834c98f420806d60b0a63..c2df01c79b76a4746443daca9eb9192dc6d9061c 100644 (file)
@@ -700,7 +700,7 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
            }
            /* Wait for transmit window to open up */
            while (!call->error
-                  && call->tnext + 1 > call->tfirst + call->twind) {
+                  && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
                clock_NewTime();
                call->startWait = clock_Sec();
 
@@ -1138,7 +1138,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
     }
 
     /* Wait for the length of the transmit queue to fall below call->twind */
-    while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
+    while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
        clock_NewTime();
        call->startWait = clock_Sec();
 #ifdef RX_ENABLE_LOCKS
index 214e2d726668e3cabd679e831ea01665063a5fdc..a7d45368624ce79cb91e0ed92da9d1969fe06d2a 100644 (file)
@@ -60,7 +60,6 @@ RCSID
 #include <stdlib.h>
 #include <limits.h>
 #include <float.h>
-#include <malloc.h>
 #include <assert.h>
 #include "generator.h"