From: Simon Wilkinson Date: Fri, 26 Oct 2012 13:50:51 +0000 (+0100) Subject: rx: Refactor rxi_ReceivePacket call selection X-Git-Tag: upstream/1.8.0_pre1^2~1863 X-Git-Url: https://git.michaelhowe.org/gitweb/?a=commitdiff_plain;h=3c6570413c94afe4107634e9ca3f923f3cd30c21;p=packages%2Fo%2Fopenafs.git rx: Refactor rxi_ReceivePacket call selection Refactor the call selection logic in rxi_ReceivePacket so that it is a little bit easier to follow, and better optimised to the common case. Split the current logic into a function for packets being received by a server, and a function for packets being received by a client. Change-Id: Ie27de7952cc13fa3b92619cfe68e671e6d5e170c Reviewed-on: http://gerrit.openafs.org/8297 Reviewed-by: Jeffrey Altman Tested-by: BuildBot Reviewed-by: Derrick Brashear --- diff --git a/src/rx/rx.c b/src/rx/rx.c index 7a255be87..a75401833 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -3214,12 +3214,13 @@ rxi_CheckBusy(struct rx_call *call) * or connected to a particular channel */ static_inline int -rxi_AbortIfServerBusy(osi_socket socket, afs_uint32 host, - u_short port, struct rx_packet *np) +rxi_AbortIfServerBusy(osi_socket socket, struct rx_connection *conn, + struct rx_packet *np) { if ((rx_BusyThreshold > 0) && (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) { - rxi_SendRawAbort(socket, host, port, rx_BusyError, np, 0); + rxi_SendRawAbort(socket, conn->peer->host, conn->peer->port, + rx_BusyError, np, 0); if (rx_stats_active) rx_atomic_inc(&rx_stats.nBusies); return 1; @@ -3228,6 +3229,133 @@ rxi_AbortIfServerBusy(osi_socket socket, afs_uint32 host, return 0; } +static_inline struct rx_call * +rxi_ReceiveClientCall(struct rx_packet *np, struct rx_connection *conn) +{ + int channel; + struct rx_call *call; + + channel = np->header.cid & RX_CHANNELMASK; + MUTEX_ENTER(&conn->conn_call_lock); + call = conn->call[channel]; + if (!call || conn->callNumber[channel] != np->header.callNumber) { + MUTEX_EXIT(&conn->conn_call_lock); + if (rx_stats_active) + rx_atomic_inc(&rx_stats.spuriousPacketsRead); + return NULL; + } + + MUTEX_ENTER(&call->lock); + MUTEX_EXIT(&conn->conn_call_lock); + + if ((call->state == RX_STATE_DALLY) + && np->header.type == RX_PACKET_TYPE_ACK) { + if (rx_stats_active) + rx_atomic_inc(&rx_stats.ignorePacketDally); + MUTEX_EXIT(&call->lock); + return NULL; + } + + return call; +} + +static_inline struct rx_call * +rxi_ReceiveServerCall(osi_socket socket, struct rx_packet *np, + struct rx_connection *conn) +{ + int channel; + struct rx_call *call; + + channel = np->header.cid & RX_CHANNELMASK; + MUTEX_ENTER(&conn->conn_call_lock); + call = conn->call[channel]; + + if (!call) { + if (rxi_AbortIfServerBusy(socket, conn, np)) { + MUTEX_EXIT(&conn->conn_call_lock); + return NULL; + } + + call = rxi_NewCall(conn, channel); /* returns locked call */ + *call->callNumber = np->header.callNumber; + MUTEX_EXIT(&conn->conn_call_lock); + + call->state = RX_STATE_PRECALL; + clock_GetTime(&call->queueTime); + call->app.bytesSent = 0; + call->app.bytesRcvd = 0; + rxi_KeepAliveOn(call); + + return call; + } + + if (np->header.callNumber == conn->callNumber[channel]) { + MUTEX_ENTER(&call->lock); + MUTEX_EXIT(&conn->conn_call_lock); + return call; + } + + if (np->header.callNumber < conn->callNumber[channel]) { + MUTEX_EXIT(&conn->conn_call_lock); + if (rx_stats_active) + rx_atomic_inc(&rx_stats.spuriousPacketsRead); + return NULL; + } + + MUTEX_ENTER(&call->lock); + MUTEX_EXIT(&conn->conn_call_lock); + + /* Wait until the transmit queue is idle before deciding + * whether to reset the current call. Chances are that the + * call will be in ether DALLY or HOLD state once the TQ_BUSY + * flag is cleared. + */ +#ifdef RX_ENABLE_LOCKS + if (call->state == RX_STATE_ACTIVE) { + rxi_WaitforTQBusy(call); + /* If we entered error state while waiting, + * must call rxi_CallError to permit rxi_ResetCall + * to processed when the tqWaiter count hits zero. + */ + if (call->error) { + rxi_CallError(call, call->error); + MUTEX_EXIT(&call->lock); + return NULL; + } + } +#endif /* RX_ENABLE_LOCKS */ + /* If the new call cannot be taken right now send a busy and set + * the error condition in this call, so that it terminates as + * quickly as possible */ + if (call->state == RX_STATE_ACTIVE) { + rxi_CallError(call, RX_CALL_DEAD); + rxi_SendSpecial(call, conn, NULL, RX_PACKET_TYPE_BUSY, + NULL, 0, 1); + MUTEX_EXIT(&call->lock); + return NULL; + } + + if (rxi_AbortIfServerBusy(socket, conn, np)) { + MUTEX_EXIT(&call->lock); + return NULL; + } + + rxi_ResetCall(call, 0); + /* The conn_call_lock is not held but no one else should be + * using this call channel while we are processing this incoming + * packet. This assignment should be safe. + */ + *call->callNumber = np->header.callNumber; + call->state = RX_STATE_PRECALL; + clock_GetTime(&call->queueTime); + call->app.bytesSent = 0; + call->app.bytesRcvd = 0; + rxi_KeepAliveOn(call); + + return call; +} + + /* There are two packet tracing routines available for testing and monitoring * Rx. One is called just after every packet is received and the other is * called just before every packet is sent. Received packets, have had their @@ -3252,8 +3380,6 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket, { struct rx_call *call; struct rx_connection *conn; - int channel; - afs_uint32 currentCallNumber; int type; #ifdef RXDEBUG char *packetType; @@ -3409,128 +3535,19 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket, } } - channel = np->header.cid & RX_CHANNELMASK; - MUTEX_ENTER(&conn->conn_call_lock); - call = conn->call[channel]; - - if (call) { - MUTEX_ENTER(&call->lock); - currentCallNumber = conn->callNumber[channel]; - MUTEX_EXIT(&conn->conn_call_lock); - } else if (type == RX_SERVER_CONNECTION) { /* No call allocated */ - - if (rxi_AbortIfServerBusy(socket, host, port, np)) { + if (type == RX_SERVER_CONNECTION) { + call = rxi_ReceiveServerCall(socket, np, conn); + if (call == NULL) { putConnection(conn); return np; - } - - call = rxi_NewCall(conn, channel); /* returns locked call */ - *call->callNumber = currentCallNumber = np->header.callNumber; - MUTEX_EXIT(&conn->conn_call_lock); - call->state = RX_STATE_PRECALL; - clock_GetTime(&call->queueTime); - call->app.bytesSent = 0; - call->app.bytesRcvd = 0; - rxi_KeepAliveOn(call); - - } else { /* RX_CLIENT_CONNECTION and No call allocated */ - /* This packet can't be for this call. If the new call address is - * 0 then no call is running on this channel. If there is a call - * then, since this is a client connection we're getting data for - * it must be for the previous call. - */ - MUTEX_EXIT(&conn->conn_call_lock); - if (rx_stats_active) - rx_atomic_inc(&rx_stats.spuriousPacketsRead); - putConnection(conn); - return np; - } - - /* There is a non-NULL locked call at this point */ - if (type == RX_SERVER_CONNECTION) { /* We're the server */ - if (np->header.callNumber < currentCallNumber) { - MUTEX_EXIT(&call->lock); - if (rx_stats_active) - rx_atomic_inc(&rx_stats.spuriousPacketsRead); - putConnection(conn); - return np; - } else if (np->header.callNumber != currentCallNumber) { - /* Wait until the transmit queue is idle before deciding - * whether to reset the current call. Chances are that the - * call will be in ether DALLY or HOLD state once the TQ_BUSY - * flag is cleared. - */ -#ifdef RX_ENABLE_LOCKS - if (call->state == RX_STATE_ACTIVE) { - rxi_WaitforTQBusy(call); - /* - * If we entered error state while waiting, - * must call rxi_CallError to permit rxi_ResetCall - * to processed when the tqWaiter count hits zero. - */ - if (call->error) { - rxi_CallError(call, call->error); - MUTEX_EXIT(&call->lock); - putConnection(conn); - return np; - } - } -#endif /* RX_ENABLE_LOCKS */ - /* If the new call cannot be taken right now send a busy and set - * the error condition in this call, so that it terminates as - * quickly as possible */ - if (call->state == RX_STATE_ACTIVE) { - struct rx_packet *tp; - - rxi_CallError(call, RX_CALL_DEAD); - tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, - NULL, 0, 1); - MUTEX_EXIT(&call->lock); - putConnection(conn); - return tp; - } - - if (rxi_AbortIfServerBusy(socket, host, port, np)) { - MUTEX_EXIT(&call->lock); - putConnection(conn); - return np; - } - - rxi_ResetCall(call, 0); - /* - * The conn_call_lock is not held but no one else should be - * using this call channel while we are processing this incoming - * packet. This assignment should be safe. - */ - *call->callNumber = np->header.callNumber; - call->state = RX_STATE_PRECALL; - clock_GetTime(&call->queueTime); - call->app.bytesSent = 0; - call->app.bytesRcvd = 0; - rxi_KeepAliveOn(call); - } else { - /* Continuing call; do nothing here. */ - } - } else { /* we're the client */ - /* Ignore all incoming acknowledgements for calls in DALLY state */ - if ((call->state == RX_STATE_DALLY) - && (np->header.type == RX_PACKET_TYPE_ACK)) { - if (rx_stats_active) - rx_atomic_inc(&rx_stats.ignorePacketDally); - MUTEX_EXIT(&call->lock); + } + } else { + call = rxi_ReceiveClientCall(np, conn); + if (call == NULL) { putConnection(conn); return np; } - /* Ignore anything that's not relevant to the current call. If there - * isn't a current call, then no packet is relevant. */ - if (np->header.callNumber != currentCallNumber) { - if (rx_stats_active) - rx_atomic_inc(&rx_stats.spuriousPacketsRead); - MUTEX_EXIT(&call->lock); - putConnection(conn); - return np; - } /* If the service security object index stamped in the packet does not * match the connection's security index, ignore the packet */ if (np->header.securityIndex != conn->securityIndex) {