* to manipulate the queue.
*/
-#if defined(RX_ENABLE_LOCKS) && defined(KERNEL)
+#if defined(RX_ENABLE_LOCKS)
static afs_kmutex_t rx_rpc_stats;
void rxi_StartUnlocked(struct rxevent *event, void *call,
void *arg1, int istack);
return rx_InitHost(htonl(INADDR_ANY), port);
}
+/* RTT Timer
+ * ---------
+ *
+ * The rxi_rto functions implement a TCP (RFC 2988) style algorithm for
+ * maintaining the retransmission timeout (RTO) timer.
+ *
+ */
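+
+/* For reference, the RFC 2988 computation that such an algorithm maintains
+ * is, in the RFC's own variable names (shown here only for illustration,
+ * not as fields that rx necessarily keeps):
+ *
+ *   first RTT measurement R:   SRTT = R
+ *                              RTTVAR = R / 2
+ *   each later measurement R': RTTVAR = (3/4) * RTTVAR + (1/4) * |SRTT - R'|
+ *                              SRTT = (7/8) * SRTT + (1/8) * R'
+ *   then:                      RTO = SRTT + max(G, 4 * RTTVAR)
+ *
+ * where G is the clock granularity; RFC 2988 also recommends clamping RTO to
+ * a minimum of one second and starting from an initial RTO of three seconds.
+ * The helpers below do not perform this calculation themselves; they only
+ * arm and disarm an event of duration call->rto, which is assumed to be
+ * maintained elsewhere in this style.
+ */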
+
+/*!
+ * Start a new RTT timer for a given call and packet.
+ *
+ * There must be no resendEvent already pending for this call; otherwise
+ * events will be leaked. This function is intended for internal use within
+ * the RTO code only.
+ *
+ * @param[in] call
+ * the RX call to start the timer for
+ * @param[in] lastPacket
+ * a flag which is true if this is the last packet of the call
+ * @param[in] istack
+ * passed to the event handler that runs when the timer fires
+ *
+ * @pre call must be locked before calling this function
+ *
+ */
+static_inline void
+rxi_rto_startTimer(struct rx_call *call, int lastPacket, int istack)
+{
+ struct clock now, retryTime;
+
+ clock_GetTime(&now);
+ retryTime = now;
+
+ clock_Add(&retryTime, &call->rto);
+
+ /* If we're sending the last packet, and we're the client, then the server
+ * may wait up to an additional 400ms before returning the ACK; allow for
+ * that rather than hitting a premature timeout */
+ if (lastPacket && call->conn->type == RX_CLIENT_CONNECTION)
+ clock_Addmsec(&retryTime, 400);
+
+#ifdef RX_ENABLE_LOCKS
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
+ MUTEX_EXIT(&rx_refcnt_mutex);
+ call->resendEvent = rxevent_PostNow2(&retryTime, &now, rxi_StartUnlocked,
+ call, 0, istack);
+#else /* RX_ENABLE_LOCKS */
+ call->resendEvent = rxevent_PostNow2(&retryTime, &now, rxi_Start,
+ call, 0, istack);
+#endif /* RX_ENABLE_LOCKS */
+}
+
+/*!
+ * Cancel the RTT timer for a given call.
+ *
+ * @param[in] call
+ * the RX call to cancel the timer for
+ *
+ * @pre call must be locked before calling this function
+ *
+ */
+
+static_inline void
+rxi_rto_cancel(struct rx_call *call)
+{
+ if (!call->resendEvent)
+ return;
+
+ rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+}
+
+/*!
+ * Tell the RTO timer that we have sent a packet.
+ *
+ * If the timer isn't already running, then start it. If the timer is running,
+ * then do nothing.
+ *
+ * @param[in] call
+ * the RX call that the packet has been sent on
+ * @param[in] lastPacket
+ * a flag which is true if this is the last packet for the call
+ * @param[in] istack
+ * passed through to the RTO timer when it is started
+ *
+ * @pre The call must be locked before calling this function
+ *
+ */
+
+static_inline void
+rxi_rto_packet_sent(struct rx_call *call, int lastPacket, int istack)
+{
+ if (call->resendEvent)
+ return;
+
+ rxi_rto_startTimer(call, lastPacket, istack);
+}
+
+/*!
+ * Tell the RTO timer that we have received a new ACK message
+ *
+ * This function should be called whenever a call receives an ACK that
+ * acknowledges new packets. Whatever happens, we stop the current timer.
+ * If there are unacked packets in the queue which have been sent, then
+ * we restart the timer from now. Otherwise, we leave it stopped.
+ *
+ * @param[in] call
+ * the RX call that the ACK has been received on
+ * @param[in] istack
+ * passed through to the RTO timer if it is restarted
+ *
+ * @pre The call must be locked before calling this function
+ */
+
+static_inline void
+rxi_rto_packet_acked(struct rx_call *call, int istack)
+{
+ struct rx_packet *p, *nxp;
+
+ rxi_rto_cancel(call);
+
+ if (queue_IsEmpty(&call->tq))
+ return;
+
+ for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ if (p->header.seq > call->tfirst + call->twind)
+ return;
+
+ if (!(p->flags & RX_PKTFLAG_ACKED) && p->flags & RX_PKTFLAG_SENT) {
+ rxi_rto_startTimer(call, p->header.flags & RX_LAST_PACKET, istack);
+ return;
+ }
+ }
+}
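+
+/* Taken together, the intended driving pattern for these helpers (as used in
+ * the rest of this file) is: rxi_rto_packet_sent() whenever a data packet
+ * leaves the host, rxi_rto_packet_acked() whenever an ACK acknowledges new
+ * data, and rxi_rto_cancel() whenever the pending timer is no longer wanted,
+ * for example when the transmit queue is cleared or the call is torn down.
+ */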
+
+
/**
* Set an initial round trip timeout for a peer connection
*
} else {
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
rxevent_Cancel(call->keepAliveEvent, call,
RX_CALL_REFCOUNT_ALIVE);
}
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.06d len %d",
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d",
np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+ np->header.flags, np, np->length));
#endif
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%06d len %d",
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d",
np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->retryTime.sec, np->retryTime.usec, np->length));
+ np->header.flags, np, np->length));
#endif
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
* by the peer.
*
- * The four section is packets which have not yet been transmitted.
- * These packets will have a retryTime of 0.
+ * The fourth section is packets which have not yet been transmitted.
+ * These packets will have a header.serial of 0.
*/
/* First section - implicitly acknowledged packets that can be
missing = 1;
}
- /* If packet isn't yet acked, and it has been transmitted at least
- * once, reset retransmit time using latest timeout
- * ie, this should readjust the retransmit timer for all outstanding
- * packets... So we don't just retransmit when we should know better*/
-
- if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
- tp->retryTime = tp->timeSent;
- clock_Add(&tp->retryTime, &call->rto);
- /* shift by eight because one quarter-sec ~ 256 milliseconds */
- clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
- }
-
tp = queue_Next(tp, rx_packet);
}
- /* The third case, packets which the ack packet tells us
- * nothing about at all. We just need to adjust the retryTime to match
- * any new timeouts that have been calculated for this peer.
- * We use the fact that we send in order to terminate this loop as soon as
- * we find a packet that has not been sent.
+ /* We don't need to take any action with the third or fourth sections of
+ * the queue - they're not addressed by the contents of this ACK packet.
+ */
- while (!queue_IsEnd(&call->tq, tp) && !clock_IsZero(&tp->retryTime)) {
- tp->retryTime = tp->timeSent;
- clock_Add(&tp->retryTime, &call->rto);
- clock_Addmsec(&tp->retryTime, ((afs_uint32) tp->backoff) << 8);
- tp = queue_Next(tp, rx_packet);
- }
-
- /* The fourth set of packets - those which have yet to be transmitted,
- * we don't care about at all here */
-
/* If the window has been extended by this acknowledge packet,
* then wakeup a sender waiting in alloc for window space, or try
* sending packets now, if he's been sitting on packets due to
call->nNacks = 0;
}
+ /* If the packet contained new acknowledgements, rather than just
+ * being a duplicate of one we have previously seen, then we can restart
+ * the RTT timer
+ */
+ if (newAckCount > 0)
+ rxi_rto_packet_acked(call, istack);
+
if (call->flags & RX_CALL_FAST_RECOVER) {
if (nNacked) {
call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
peer->nDgramPackets = call->nDgramPackets;
peer->congestSeq++;
call->congestSeq = peer->congestSeq;
+
- /* Reset the resend times on the packets that were nacked
- * so we will retransmit as soon as the window permits*/
+ /* Mark the packets that were nacked as not yet sent, so that we
+ * will retransmit them as soon as the window permits
+ */
+
for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
if (acked) {
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
- clock_Zero(&tp->retryTime);
+ tp->flags &= ~RX_PKTFLAG_SENT;
}
} else if (tp->flags & RX_PKTFLAG_ACKED) {
acked = 1;
call->flags |= RX_CALL_TQ_SOME_ACKED;
}
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
+
call->tfirst = call->tnext;
call->nSoftAcked = 0;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
call->nSoftAcked = 0;
int i;
int requestAck = 0;
int lastPacket = 0;
- struct clock now, retryTime;
+ struct clock now;
struct rx_connection *conn = call->conn;
struct rx_peer *peer = conn->peer;
peer->nSent += xmit->len;
if (resending)
peer->reSends += xmit->len;
- retryTime = call->rto;
MUTEX_EXIT(&peer->peer_lock);
if (rx_stats_active) {
}
clock_GetTime(&now);
- clock_Add(&retryTime, &now);
if (xmit->list[xmit->len - 1]->header.flags & RX_LAST_PACKET) {
lastPacket = 1;
for (i = 0; i < xmit->len; i++) {
struct rx_packet *packet = xmit->list[i];
- packet->retryTime = retryTime;
- if (packet->header.serial) {
- /* Exponentially backoff retry times */
- if (packet->backoff < MAXBACKOFF) {
- /* so it can't stay == 0 */
- packet->backoff = (packet->backoff << 1) + 1;
- } else
- packet->backoff++;
- clock_Addmsec(&(packet->retryTime),
- ((afs_uint32) packet->backoff) << 8);
- }
-
- /* Wait a little extra for the ack on the last packet */
- if (lastPacket
- && !(packet->header.flags & RX_CLIENT_INITIATED)) {
- clock_Addmsec(&(packet->retryTime), 400);
- }
-
/* Record the time sent */
packet->timeSent = now;
+ packet->flags |= RX_PKTFLAG_SENT;
/* Ask for an ack on retransmitted packets, on every other packet
* if the peer doesn't support slow start. Ask for an ack on every
if (packet->header.serial) {
requestAck = 1;
} else {
- /* improved RTO calculation- not Karn */
packet->firstSent = now;
if (!lastPacket && (call->cwind <= (u_short) (conn->ackRate + 1)
|| (!(call->flags & RX_CALL_SLOW_START_OK)
CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
MUTEX_EXIT(&rx_refcnt_mutex);
+ /* Tell the RTO calculation engine that we have sent a packet, and
+ * whether it was the last one */
+ rxi_rto_packet_sent(call, lastPacket, istack);
+
/* Update last send time for this call (for keep-alive
* processing), and for the connection (so that we can discover
* idle connections) */
struct rx_packet *p;
struct rx_packet *nxp; /* Next pointer for queue_Scan */
- struct clock now, usenow, retryTime;
- int haveEvent;
int nXmitPackets;
int maxXmitPackets;
int resending = 0;
}
if (queue_IsEmpty(&call->tq)) {
- /* Nothing to do */
+ /* Nothing to do. This means that we've been raced, and that an
+ * ACK has come in between when we were triggered and when we
+ * actually got to run. */
return;
}
+
+ /* Mark all of the pending packets in the queue as being lost */
+ for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ if (!(p->flags & RX_PKTFLAG_ACKED))
+ p->flags &= ~RX_PKTFLAG_SENT;
+ }
}
if (call->error) {
if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
- clock_GetTime(&now);
- usenow = now;
-
/* Send (or resend) any packets that need it, subject to
* window restrictions and congestion burst control
* restrictions. Ask for an ack on the last packet sent in
#endif
if (p->flags & RX_PKTFLAG_ACKED) {
/* Since we may block, don't trust this */
- usenow.sec = usenow.usec = 0;
if (rx_stats_active)
rx_MutexIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex);
continue; /* Ignore this packet if it has been acknowledged */
}
/* Transmit the packet if it needs to be sent. */
- if (!clock_Lt(&now, &p->retryTime)) {
+ if (!(p->flags & RX_PKTFLAG_SENT)) {
if (nXmitPackets == maxXmitPackets) {
rxi_SendXmitList(call, call->xmitList,
nXmitPackets, istack,
resending);
goto restart;
}
- dpf(("call %d xmit packet %"AFS_PTR_FMT" now %u.%06u retryTime %u.%06u\n",
- *(call->callNumber), p,
- now.sec, now.usec,
- p->retryTime.sec, p->retryTime.usec));
+ dpf(("call %d xmit packet %"AFS_PTR_FMT"\n",
+ *(call->callNumber), p));
call->xmitList[nXmitPackets++] = p;
}
}
call->flags |= RX_CALL_TQ_CLEARME;
}
#endif /* RX_ENABLE_LOCKS */
- /* Don't bother doing retransmits if the TQ is cleared. */
- if (call->flags & RX_CALL_TQ_CLEARME) {
+ if (call->flags & RX_CALL_TQ_CLEARME)
rxi_ClearTransmitQueue(call, 1);
- } else
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- {
-
- /* Always post a resend event, if there is anything in the
- * queue, and resend is possible. There should be at least
- * one unacknowledged packet in the queue ... otherwise none
- * of these packets should be on the queue in the first place.
- */
- if (call->resendEvent) {
- /* Cancel the existing event and post a new one */
- rxevent_Cancel(call->resendEvent, call,
- RX_CALL_REFCOUNT_RESEND);
- }
-
- /* The retry time is the retry time on the first unacknowledged
- * packet inside the current window */
- for (haveEvent =
- 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
- /* Don't set timers for packets outside the window */
- if (p->header.seq >= call->tfirst + call->twind) {
- break;
- }
-
- if (!(p->flags & RX_PKTFLAG_ACKED)
- && !clock_IsZero(&p->retryTime)) {
- haveEvent = 1;
- retryTime = p->retryTime;
- break;
- }
- }
-
- /* Post a new event to re-run rxi_Start when retries may be needed */
- if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
-#ifdef RX_ENABLE_LOCKS
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
- call->resendEvent =
- rxevent_PostNow2(&retryTime, &usenow,
- rxi_StartUnlocked,
- (void *)call, 0, istack);
-#else /* RX_ENABLE_LOCKS */
- call->resendEvent =
- rxevent_PostNow2(&retryTime, &usenow, rxi_Start,
- (void *)call, 0, istack);
-#endif /* RX_ENABLE_LOCKS */
- }
- }
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
} while (call->flags & RX_CALL_NEED_START);
/*
* TQ references no longer protected by this flag; they must remain
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
} else {
- if (call->resendEvent) {
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
- }
+ rxi_rto_cancel(call);
}
}
/* Cancel pending events */
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
rxevent_Cancel(call->keepAliveEvent, call,
RX_CALL_REFCOUNT_ALIVE);
if (call->growMTUEvent)
/* send failed, so let's hurry up the resend, eh? */
if (rx_stats_active)
rx_MutexIncrement(rx_stats.netSendFailures, rx_stats_mutex);
- p->retryTime = p->timeSent; /* resend it very soon */
- clock_Addmsec(&(p->retryTime),
- 10 + (((afs_uint32) p->backoff) << 8));
+ p->flags &= ~RX_PKTFLAG_SENT; /* resend it very soon */
+
/* Some systems are nice and tell us right away that we cannot
* reach this recipient by returning an error code.
* So, when this happens let's "down" the host NOW so
#endif
#ifdef RXDEBUG
}
- dpf(("%c %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.3d len %d",
+ dpf(("%c %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d",
deliveryType, p->header.serial, rx_packetTypes[p->header.type - 1], ntohl(peer->host),
ntohs(peer->port), p->header.serial, p->header.epoch, p->header.cid, p->header.callNumber,
- p->header.seq, p->header.flags, p, p->retryTime.sec, p->retryTime.usec / 1000, p->length));
+ p->header.seq, p->header.flags, p, p->length));
#endif
if (rx_stats_active) {
rx_MutexIncrement(rx_stats.packetsSent[p->header.type - 1], rx_stats_mutex);
rx_MutexIncrement(rx_stats.netSendFailures, rx_stats_mutex);
for (i = 0; i < len; i++) {
p = list[i];
- p->retryTime = p->timeSent; /* resend it very soon */
- clock_Addmsec(&(p->retryTime),
- 10 + (((afs_uint32) p->backoff) << 8));
+ p->flags &= ~RX_PKTFLAG_SENT; /* resend it very soon */
}
/* Some systems are nice and tell us right away that we cannot
* reach this recipient by returning an error code.
osi_Assert(p != NULL);
- dpf(("%c %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.3d len %d",
+ dpf(("%c %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d",
deliveryType, p->header.serial, rx_packetTypes[p->header.type - 1], ntohl(peer->host),
ntohs(peer->port), p->header.serial, p->header.epoch, p->header.cid, p->header.callNumber,
- p->header.seq, p->header.flags, p, p->retryTime.sec, p->retryTime.usec / 1000, p->length));
+ p->header.seq, p->header.flags, p, p->length));
#endif
if (rx_stats_active) {
*call->callNumber = 1;
MUTEX_EXIT(&call->lock);
- p->flags &= ~RX_PKTFLAG_ACKED;
+ p->flags &= ~(RX_PKTFLAG_ACKED | RX_PKTFLAG_SENT);
+
p->header.cid = (conn->cid | call->channel);
p->header.serviceId = conn->serviceId;
p->header.securityIndex = conn->securityIndex;
if (last)
p->header.flags |= RX_LAST_PACKET;
- clock_Zero(&p->retryTime); /* Never yet transmitted */
clock_Zero(&p->firstSent); /* Never yet transmitted */
p->header.serial = 0; /* Another way of saying never transmitted... */
- p->backoff = 0;
/* Now that we're sure this is the last data on the call, make sure
* that the "length" and the sum of the iov_lens matches. */
#endif
for (p = rx_mallocedP; p; p = p->allNextp) {
- RXDPRINTF(RXDPRINTOUT, "%s - packet=0x%p, id=%u, firstSent=%u.%08u, timeSent=%u.%08u, retryTime=%u.%08u, firstSerial=%u, niovecs=%u, flags=0x%x, backoff=%u, length=%u header: epoch=%u, cid=%u, callNum=%u, seq=%u, serial=%u, type=%u, flags=0x%x, userStatus=%u, securityIndex=%u, serviceId=%u\r\n",
- cookie, p, p->packetId, p->firstSent.sec, p->firstSent.usec, p->timeSent.sec, p->timeSent.usec, p->retryTime.sec, p->retryTime.usec,
- p->firstSerial, p->niovecs, (afs_uint32)p->flags, (afs_uint32)p->backoff, (afs_uint32)p->length,
+ RXDPRINTF(RXDPRINTOUT, "%s - packet=0x%p, id=%u, firstSent=%u.%08u, timeSent=%u.%08u, firstSerial=%u, niovecs=%u, flags=0x%x, length=%u header: epoch=%u, cid=%u, callNum=%u, seq=%u, serial=%u, type=%u, flags=0x%x, userStatus=%u, securityIndex=%u, serviceId=%u\r\n",
+ cookie, p, p->packetId, p->firstSent.sec, p->firstSent.usec, p->timeSent.sec, p->timeSent.usec,
+ p->firstSerial, p->niovecs, (afs_uint32)p->flags, (afs_uint32)p->length,
p->header.epoch, p->header.cid, p->header.callNumber, p->header.seq, p->header.serial,
(afs_uint32)p->header.type, (afs_uint32)p->header.flags, (afs_uint32)p->header.userStatus,
(afs_uint32)p->header.securityIndex, (afs_uint32)p->header.serviceId);