From: Matt Benjamin Date: Tue, 20 May 2008 21:24:16 +0000 (+0000) Subject: rx-connection-clones-20080520 X-Git-Tag: openafs-devel-1_5_61~1082 X-Git-Url: https://git.michaelhowe.org/gitweb/?a=commitdiff_plain;h=f00df529b79d9df64b77dd7b054eb362a92c279e;p=packages%2Fo%2Fopenafs.git rx-connection-clones-20080520 LICENSE IPL10 FIXES 89557 add connection clones to allow more than maxcalls per "connection" --- diff --git a/src/rx/rx.c b/src/rx/rx.c index 3341cb15d..34c3b23a4 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -738,54 +738,77 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice, int serviceSecurityIndex) { int hashindex, i; - afs_int32 cid; - register struct rx_connection *conn; + afs_int32 cid, cix, nclones; + register struct rx_connection *conn, *tconn, *ptconn; SPLVAR; clock_NewTime(); dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", ntohl(shost), ntohs(sport), sservice, securityObject, serviceSecurityIndex)); + conn = tconn = 0; + nclones = rx_max_clones_per_connection; + /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in * the case of kmem_alloc? 
*/ - conn = rxi_AllocConnection(); -#ifdef RX_ENABLE_LOCKS - MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0); - MUTEX_INIT(&conn->conn_data_lock, "conn call lock", MUTEX_DEFAULT, 0); - CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0); -#endif + NETPRI; MUTEX_ENTER(&rx_connHashTable_lock); - cid = (rx_nextCid += RX_MAXCALLS); - conn->type = RX_CLIENT_CONNECTION; - conn->cid = cid; - conn->epoch = rx_epoch; - conn->peer = rxi_FindPeer(shost, sport, 0, 1); - conn->serviceId = sservice; - conn->securityObject = securityObject; - conn->securityData = (void *) 0; - conn->securityIndex = serviceSecurityIndex; - rx_SetConnDeadTime(conn, rx_connDeadTime); - conn->ackRate = RX_FAST_ACK_RATE; - conn->nSpecific = 0; - conn->specific = NULL; - conn->challengeEvent = NULL; - conn->delayedAbortEvent = NULL; - conn->abortCount = 0; - conn->error = 0; + + /* send in the clones */ + for(cix = 0; cix <= nclones; ++cix) { + + ptconn = tconn; + tconn = rxi_AllocConnection(); + tconn->type = RX_CLIENT_CONNECTION; + tconn->epoch = rx_epoch; + tconn->peer = rxi_FindPeer(shost, sport, 0, 1); + tconn->serviceId = sservice; + tconn->securityObject = securityObject; + tconn->securityData = (void *) 0; + tconn->securityIndex = serviceSecurityIndex; + tconn->ackRate = RX_FAST_ACK_RATE; + tconn->nSpecific = 0; + tconn->specific = NULL; + tconn->challengeEvent = NULL; + tconn->delayedAbortEvent = NULL; + tconn->abortCount = 0; + tconn->error = 0; for (i = 0; i < RX_MAXCALLS; i++) { - conn->twind[i] = rx_initSendWindow; - conn->rwind[i] = rx_initReceiveWindow; + tconn->twind[i] = rx_initSendWindow; + tconn->rwind[i] = rx_initReceiveWindow; } - - RXS_NewConnection(securityObject, conn); - hashindex = - CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION); - - conn->refCount++; /* no lock required since only this thread knows... 
*/ - conn->next = rx_connHashTable[hashindex]; - rx_connHashTable[hashindex] = conn; - rx_MutexIncrement(rx_stats.nClientConns, rx_stats_mutex); + tconn->parent = 0; + tconn->next_clone = 0; + tconn->nclones = nclones; + rx_SetConnDeadTime(tconn, rx_connDeadTime); + + if(cix == 0) { + conn = tconn; + } else { + tconn->flags |= RX_CLONED_CONNECTION; + tconn->parent = conn; + ptconn->next_clone = tconn; + } + + /* generic connection setup */ +#ifdef RX_ENABLE_LOCKS + MUTEX_INIT(&tconn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0); + MUTEX_INIT(&tconn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0); + CV_INIT(&tconn->conn_call_cv, "conn call cv", CV_DEFAULT, 0); +#endif + cid = (rx_nextCid += RX_MAXCALLS); + tconn->cid = cid; + RXS_NewConnection(securityObject, tconn); + hashindex = + CONN_HASH(shost, sport, tconn->cid, tconn->epoch, + RX_CLIENT_CONNECTION); + tconn->refCount++; /* no lock required since only this thread knows */ + tconn->next = rx_connHashTable[hashindex]; + rx_connHashTable[hashindex] = tconn; + rx_MutexIncrement(rx_stats.nClientConns, rx_stats_mutex); + } + MUTEX_EXIT(&rx_connHashTable_lock); USERPRI; return conn; @@ -794,10 +817,14 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice, void rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds) { - /* The idea is to set the dead time to a value that allows several - * keepalives to be dropped without timing out the connection. */ - conn->secondsUntilDead = MAX(seconds, 6); - conn->secondsUntilPing = conn->secondsUntilDead / 6; + /* The idea is to set the dead time to a value that allows several + * keepalives to be dropped without timing out the connection. 
*/ + struct rx_connection *tconn; + tconn = conn; + do { + tconn->secondsUntilDead = MAX(seconds, 6); + tconn->secondsUntilPing = tconn->secondsUntilDead / 6; + } while(tconn->next_clone && (tconn = tconn->next_clone)); } int rxi_lowPeerRefCount = 0; @@ -864,18 +891,42 @@ rxi_CleanupConnection(struct rx_connection *conn) void rxi_DestroyConnection(register struct rx_connection *conn) { - MUTEX_ENTER(&rx_connHashTable_lock); - rxi_DestroyConnectionNoLock(conn); - /* conn should be at the head of the cleanup list */ - if (conn == rx_connCleanup_list) { + register struct rx_connection *tconn, *dtconn; + + MUTEX_ENTER(&rx_connHashTable_lock); + + if(!(conn->flags & RX_CLONED_CONNECTION)) { + tconn = conn->next_clone; + conn->next_clone = 0; /* once */ + do { + if(tconn) { + dtconn = tconn; + tconn = tconn->next_clone; + rxi_DestroyConnectionNoLock(dtconn); + /* destroyed? */ + if (dtconn == rx_connCleanup_list) { + rx_connCleanup_list = rx_connCleanup_list->next; + MUTEX_EXIT(&rx_connHashTable_lock); + /* rxi_CleanupConnection will free tconn */ + rxi_CleanupConnection(dtconn); + MUTEX_ENTER(&rx_connHashTable_lock); + (conn->nclones)--; + } + } + } while(tconn); + } + + rxi_DestroyConnectionNoLock(conn); + /* conn should be at the head of the cleanup list */ + if (conn == rx_connCleanup_list) { rx_connCleanup_list = rx_connCleanup_list->next; MUTEX_EXIT(&rx_connHashTable_lock); rxi_CleanupConnection(conn); - } + } #ifdef RX_ENABLE_LOCKS - else { + else { MUTEX_EXIT(&rx_connHashTable_lock); - } + } #endif /* RX_ENABLE_LOCKS */ } @@ -1061,6 +1112,7 @@ rx_NewCall(register struct rx_connection *conn) { register int i; register struct rx_call *call; + register struct rx_connection *tconn; struct clock queueTime; SPLVAR; @@ -1103,39 +1155,51 @@ rx_NewCall(register struct rx_connection *conn) } MUTEX_EXIT(&conn->conn_data_lock); + /* search for next free call on this connection or + * its clones, if any */ for (;;) { - for (i = 0; i < RX_MAXCALLS; i++) { - call = 
conn->call[i]; - if (call) { - MUTEX_ENTER(&call->lock); - if (call->state == RX_STATE_DALLY) { - rxi_ResetCall(call, 0); - (*call->callNumber)++; - break; + tconn = conn; + do { + for (i = 0; i < RX_MAXCALLS; i++) { + call = tconn->call[i]; + if (call) { + MUTEX_ENTER(&call->lock); + if (call->state == RX_STATE_DALLY) { + rxi_ResetCall(call, 0); + (*call->callNumber)++; + goto f_call; + } + MUTEX_EXIT(&call->lock); + } else { + call = rxi_NewCall(tconn, i); + goto f_call; + } + } /* for i < RX_MAXCALLS */ + } while (tconn->next_clone && (tconn = tconn->next_clone)); + + f_call: + + if (i < RX_MAXCALLS) { + break; } - MUTEX_EXIT(&call->lock); - } else { - call = rxi_NewCall(conn, i); - break; - } - } - if (i < RX_MAXCALLS) { - break; - } - MUTEX_ENTER(&conn->conn_data_lock); - conn->flags |= RX_CONN_MAKECALL_WAITING; - conn->makeCallWaiters++; - MUTEX_EXIT(&conn->conn_data_lock); + + /* to be here, all available calls for this connection (and all + * its clones) must be in use */ + + MUTEX_ENTER(&conn->conn_data_lock); + conn->flags |= RX_CONN_MAKECALL_WAITING; + conn->makeCallWaiters++; + MUTEX_EXIT(&conn->conn_data_lock); #ifdef RX_ENABLE_LOCKS - CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock); + CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock); #else - osi_rxSleep(conn); + osi_rxSleep(conn); #endif - MUTEX_ENTER(&conn->conn_data_lock); - conn->makeCallWaiters--; - MUTEX_EXIT(&conn->conn_data_lock); - } + MUTEX_ENTER(&conn->conn_data_lock); + conn->makeCallWaiters--; + MUTEX_EXIT(&conn->conn_data_lock); + } /* for ;; */ /* * Wake up anyone else who might be giving us a chance to * run (see code above that avoids resource starvation). 
diff --git a/src/rx/rx.h b/src/rx/rx.h index ebe0047e1..d3ae90a3b 100644 --- a/src/rx/rx.h +++ b/src/rx/rx.h @@ -212,6 +212,20 @@ returned with an error code of RX_CALL_DEAD ( transient error ) */ #define rx_EnableHotThread() (rx_enable_hot_thread = 1) #define rx_DisableHotThread() (rx_enable_hot_thread = 0) +/* Macros to set max connection clones (each allows RX_MAXCALLS + * outstanding calls) */ + +#define rx_SetMaxCalls(v) \ +do {\ + rx_SetCloneMax(v/4); \ +} while(0); + +#define rx_SetCloneMax(v) \ +do {\ + if(v < RX_HARD_MAX_CLONES) \ + rx_max_clones_per_connection = v; \ +} while(0); + #define rx_PutConnection(conn) rx_DestroyConnection(conn) /* A connection is an authenticated communication path, allowing @@ -222,7 +236,9 @@ struct rx_connection_rx_lock { struct rx_peer_rx_lock *peer; #else struct rx_connection { - struct rx_connection *next; /* on hash chain _or_ free list */ + struct rx_connection *next; /* on hash chain _or_ free list */ + struct rx_connection *parent; /* primary connection, if this is a clone */ + struct rx_connection *next_clone; /* next in list of clones */ struct rx_peer *peer; #endif #ifdef RX_ENABLE_LOCKS @@ -230,6 +246,7 @@ struct rx_connection { afs_kcondvar_t conn_call_cv; afs_kmutex_t conn_data_lock; /* locks packet data */ #endif + afs_uint32 nclones; /* count of clone connections (if not a clone) */ afs_uint32 epoch; /* Process start time of client side of connection */ afs_uint32 cid; /* Connection id (call channel is bottom bits) */ afs_int32 error; /* If this connection is in error, this is it */ @@ -429,6 +446,7 @@ struct rx_peer { #define RX_CONN_RESET 16 /* connection is reset, remove */ #define RX_CONN_BUSY 32 /* connection is busy; don't delete */ #define RX_CONN_ATTACHWAIT 64 /* attach waiting for peer->lastReach */ +#define RX_CLONED_CONNECTION 128 /* connection is a clone */ /* Type of connection, client or server */ #define RX_CLIENT_CONNECTION 0 diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h index 
749a380d7..fc4272fc6 100644 --- a/src/rx/rx_globals.h +++ b/src/rx/rx_globals.h @@ -591,4 +591,13 @@ EXT2 int rx_enable_stats GLOBALSINIT(0); */ EXT int rx_enable_hot_thread GLOBALSINIT(0); +/* + * Set rx_max_clones_per_connection to a value > 0 to enable the connection clone + * workaround for the RX_MAXCALLS limit. + */ + +#define RX_HARD_MAX_CLONES 10 + +EXT int rx_max_clones_per_connection GLOBALSINIT(2); + #endif /* AFS_RX_GLOBALS_H */