From 5b9b0767966b81a5522a3dade28b6a4d58d4f91e Mon Sep 17 00:00:00 2001
From: Tom Keiser
Date: Mon, 30 May 2005 04:41:45 +0000
Subject: [PATCH] STABLE14-rx-fpq-bulk-free-20050529

FIXES 19027

After profiling RX for a while, I've found a few more bottlenecks in the
packet handling code.  This patch addresses a couple of these issues.

The major change in this patch is a new API to allow bulk packet
alloc/free operations on rx_queues of packets.  Benefits include reduced
lock contention on rx_freePktQ_lock, elimination of a lot of unnecessary
cache line invalidations, and reduced register window thrashing on sparc.

In addition, this patch dedicates one rx_packet per thread to
rxi_SendAck, since that function is in the critical path and represents
a large percentage of execution time.

(cherry picked from commit d049ca2ae4e7708df028fca739c2a35ccb906bfb)
---
 src/rx/rx.c            |  61 +++++++-------
 src/rx/rx_globals.h    |  36 +++++++++
 src/rx/rx_packet.c     | 180 +++++++++++++++++++++++++++++------
 src/rx/rx_packet.h     |   1 +
 src/rx/rx_prototypes.h |   3 +-
 src/rx/rx_rdwr.c       |  96 ++++++----------------
 6 files changed, 226 insertions(+), 151 deletions(-)
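
Usage sketch (notes only; this text sits between the diffstat and the
diff, so "git am" ignores it).  The helper name and the packet count
below are made up for illustration, and the rx-internal headers are
assumed to be on the include path the way they are for code in src/rx.
This is a sketch of the intended calling convention for the new bulk
API, not code from the patch itself:

    #include "rx.h"
    #include "rx_queue.h"
    #include "rx_packet.h"
    #include "rx_prototypes.h"

    /* Hypothetical consumer of the bulk packet alloc/free API. */
    static void
    example_bulk_packet_cycle(void)
    {
        struct rx_queue q;
        int n;

        queue_Init(&q);

        /* Ask for up to 8 send-class packets in one call; the return
         * value is the number of packets actually chained onto q. */
        n = rxi_AllocPackets(RX_PACKET_CLASS_SEND, 8, &q);
        if (n == 0)
            return;                     /* pool exhausted; nothing to do */

        /* ... fill and transmit the n packets chained on q ... */

        /* Hand the whole queue back to the free pool in one operation.
         * Passing 0 makes rxi_FreePackets count the queue itself;
         * passing n back would skip that extra walk. */
        rxi_FreePackets(0, &q);
    }

The rx_rdwr.c hunks below follow the same pattern: call->iovq is drained
with a single rxi_FreePackets(0, &call->iovq) instead of a
queue_Scan/queue_Remove/rxi_FreePacket loop.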
diff --git a/src/rx/rx.c b/src/rx/rx.c
index abe659151..bc3fe35cd 100644
--- a/src/rx/rx.c
+++ b/src/rx/rx.c
@@ -1929,17 +1929,14 @@ rx_EndCall(register struct rx_call *call, afs_int32 rc)
          * kernel version, and may interrupt the macros rx_Read or
          * rx_Write, which run at normal priority for efficiency. */
         if (call->currentPacket) {
-            rxi_FreePacket(call->currentPacket);
+            queue_Prepend(&call->iovq, call->currentPacket);
             call->currentPacket = (struct rx_packet *)0;
-            call->nLeft = call->nFree = call->curlen = 0;
-        } else
-            call->nLeft = call->nFree = call->curlen = 0;
+        }
+
+        call->nLeft = call->nFree = call->curlen = 0;
 
         /* Free any packets from the last call to ReadvProc/WritevProc */
-        for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-            queue_Remove(tp);
-            rxi_FreePacket(tp);
-        }
+        rxi_FreePackets(0, &call->iovq);
 
         CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
         MUTEX_EXIT(&call->lock);
@@ -4111,8 +4108,6 @@ rxi_SetAcksInTransmitQueue(register struct rx_call *call)
     int someAcked = 0;
 
     for (queue_Scan(&call->tq, p, tp, rx_packet)) {
-        if (!p)
-            break;
         p->flags |= RX_PKTFLAG_ACKED;
         someAcked = 1;
     }
@@ -4147,8 +4142,6 @@ rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
     if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
         int someAcked = 0;
         for (queue_Scan(&call->tq, p, tp, rx_packet)) {
-            if (!p)
-                break;
             p->flags |= RX_PKTFLAG_ACKED;
             someAcked = 1;
         }
@@ -4158,12 +4151,7 @@ rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
         }
     } else {
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-        for (queue_Scan(&call->tq, p, tp, rx_packet)) {
-            if (!p)
-                break;
-            queue_Remove(p);
-            rxi_FreePacket(p);
-        }
+        rxi_FreePackets(0, &call->tq);
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
         call->flags &= ~RX_CALL_TQ_CLEARME;
     }
@@ -4188,15 +4176,8 @@ rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
 void
 rxi_ClearReceiveQueue(register struct rx_call *call)
 {
-    register struct rx_packet *p, *tp;
     if (queue_IsNotEmpty(&call->rq)) {
-        for (queue_Scan(&call->rq, p, tp, rx_packet)) {
-            if (!p)
-                break;
-            queue_Remove(p);
-            rxi_FreePacket(p);
-            rx_packetReclaims++;
-        }
+        rx_packetReclaims += rxi_FreePackets(0, &call->rq);
         call->flags &= ~(RX_CALL_RECEIVE_DONE | RX_CALL_HAVE_LAST);
     }
     if (call->state == RX_STATE_PRECALL) {
@@ -4524,6 +4505,9 @@ rxi_SendAck(register struct rx_call *call,
     register struct rx_packet *p;
     u_char offset;
     afs_int32 templ;
+#ifdef RX_ENABLE_TSFPQ
+    struct rx_ts_info_t * rx_ts_info;
+#endif
 
     /*
      * Open the receive window once a thread starts reading packets
@@ -4541,24 +4525,41 @@ rxi_SendAck(register struct rx_call *call,
     if (p) {
         rx_computelen(p, p->length);    /* reset length, you never know */
     }                           /* where that's been... */
+#ifdef RX_ENABLE_TSFPQ
+    else {
+        RX_TS_INFO_GET(rx_ts_info);
+        if ((p = rx_ts_info->local_special_packet)) {
+            rx_computelen(p, p->length);
+        } else if ((p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
+            rx_ts_info->local_special_packet = p;
+        } else {                /* We won't send the ack, but don't panic. */
+            return optionalPacket;
+        }
+    }
+#else
     else if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
         /* We won't send the ack, but don't panic. */
         return optionalPacket;
     }
+#endif
 
     templ =
         rx_AckDataSize(call->rwind) + 4 * sizeof(afs_int32) -
         rx_GetDataSize(p);
     if (templ > 0) {
-        if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
+        if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL) > 0) {
+#ifndef RX_ENABLE_TSFPQ
             if (!optionalPacket)
                 rxi_FreePacket(p);
+#endif
             return optionalPacket;
         }
         templ = rx_AckDataSize(call->rwind) + 2 * sizeof(afs_int32);
         if (rx_Contiguous(p) < templ) {
+#ifndef RX_ENABLE_TSFPQ
             if (!optionalPacket)
                 rxi_FreePacket(p);
+#endif
             return optionalPacket;
         }
     }
@@ -4586,8 +4587,10 @@ rxi_SendAck(register struct rx_call *call,
     for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
         if (!rqp || !call->rq.next
             || (rqp->header.seq > (call->rnext + call->rwind))) {
+#ifndef RX_ENABLE_TSFPQ
             if (!optionalPacket)
                 rxi_FreePacket(p);
+#endif
             rxi_CallError(call, RX_CALL_DEAD);
             return optionalPacket;
         }
@@ -4597,8 +4600,10 @@ rxi_SendAck(register struct rx_call *call,
         ap->acks[offset++] = RX_ACK_TYPE_ACK;
         if ((offset > (u_char) rx_maxReceiveWindow)
             || (offset > call->rwind)) {
+#ifndef RX_ENABLE_TSFPQ
             if (!optionalPacket)
                 rxi_FreePacket(p);
+#endif
             rxi_CallError(call, RX_CALL_DEAD);
             return optionalPacket;
         }
@@ -4678,8 +4683,10 @@ rxi_SendAck(register struct rx_call *call,
     MUTEX_ENTER(&rx_stats_mutex);
     rx_stats.ackPacketsSent++;
     MUTEX_EXIT(&rx_stats_mutex);
 
+#ifndef RX_ENABLE_TSFPQ
     if (!optionalPacket)
         rxi_FreePacket(p);
+#endif
     return optionalPacket;      /* Return packet for re-use by caller */
 }
diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h
index 2cb72941c..a2a11729a 100644
--- a/src/rx/rx_globals.h
+++ b/src/rx/rx_globals.h
@@ -162,7 +162,9 @@ typedef struct rx_ts_info_t {
 
         /* FPQ stats */
         int checkin_ops;
+        int checkin_xfer;
         int checkout_ops;
+        int checkout_xfer;
         int gtol_ops;
         int gtol_xfer;
         int ltog_ops;
@@ -170,6 +172,7 @@ typedef struct rx_ts_info_t {
         int alloc_ops;
         int alloc_xfer;
     } _FPQ;
+    struct rx_packet * local_special_packet;
 } rx_ts_info_t;
 EXT struct rx_ts_info_t * rx_ts_info_init();   /* init function for thread-specific data struct */
 #define RX_TS_INFO_GET(ts_info_p) \
@@ -216,6 +219,7 @@ EXT struct rx_queue rx_freePacketQueue;
         (p)->niovecs = 2; \
         (p)->length = RX_FIRSTBUFFERSIZE; \
     } while(0)
+
 #ifdef RX_ENABLE_LOCKS
 EXT afs_kmutex_t rx_freePktQ_lock;
 #endif /* RX_ENABLE_LOCKS */
@@ -329,6 +333,22 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to
         RX_FPQ_MARK_USED(p); \
         (rx_ts_info_p)->_FPQ.len--; \
         (rx_ts_info_p)->_FPQ.checkout_ops++; \
+        (rx_ts_info_p)->_FPQ.checkout_xfer++; \
+    } while(0)
+/* checkout multiple packets from the thread-specific free packet queue */
+#define RX_TS_FPQ_CHECKOUT2(rx_ts_info_p,num_transfer,q) \
+    do { \
+        register int i; \
+        register struct rx_packet *p; \
+        for (i=0, p=queue_First(&((rx_ts_info_p)->_FPQ), rx_packet); \
+             i < (num_transfer); \
+             i++, p=queue_Next(p, rx_packet)) { \
+            RX_FPQ_MARK_USED(p); \
+        } \
+        queue_SplitBeforeAppend(&((rx_ts_info_p)->_FPQ),(q),p); \
+        (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
+        (rx_ts_info_p)->_FPQ.checkout_ops++; \
+        (rx_ts_info_p)->_FPQ.checkout_xfer += (num_transfer); \
     } while(0)
 /* check a packet into the thread-specific free packet queue */
 #define RX_TS_FPQ_CHECKIN(rx_ts_info_p,p) \
@@ -337,6 +357,22 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to
         RX_FPQ_MARK_FREE(p); \
         (rx_ts_info_p)->_FPQ.len++; \
         (rx_ts_info_p)->_FPQ.checkin_ops++; \
+        (rx_ts_info_p)->_FPQ.checkin_xfer++; \
+    } while(0)
+/* check multiple packets into the thread-specific free packet queue */
+/* num_transfer must equal length of (q); it is not a means of checking
+ * in part of (q).  passing num_transfer just saves us instructions
+ * since caller already knows length of (q) for other reasons */
+#define RX_TS_FPQ_CHECKIN2(rx_ts_info_p,num_transfer,q) \
+    do { \
+        register struct rx_packet *p, *np; \
+        for (queue_Scan((q), p, np, rx_packet)) { \
+            RX_FPQ_MARK_FREE(p); \
+        } \
+        queue_SplicePrepend(&((rx_ts_info_p)->_FPQ),(q)); \
+        (rx_ts_info_p)->_FPQ.len += (num_transfer); \
+        (rx_ts_info_p)->_FPQ.checkin_ops++; \
+        (rx_ts_info_p)->_FPQ.checkin_xfer += (num_transfer); \
     } while(0)
 #endif /* AFS_PTHREAD_ENV */
 
diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c
index 482f54bc5..b1e468655 100644
--- a/src/rx/rx_packet.c
+++ b/src/rx/rx_packet.c
@@ -108,6 +108,8 @@ struct rx_packet *rx_mallocedP = 0;
 extern char cml_version_number[];
 extern int (*rx_almostSent) ();
 
+static int AllocPacketBufs(int class, int num_pkts, struct rx_queue *q);
+
 static void rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
                                 afs_int32 ahost, short aport,
                                 afs_int32 istack);
@@ -244,39 +246,63 @@ rx_SlowWritePacket(struct rx_packet * packet, int offset, int resid, char *in)
     return (resid ? (r - resid) : r);
 }
 
+int
+rxi_AllocPackets(int class, int num_pkts, struct rx_queue * q)
+{
+    register struct rx_packet *p, *np;
+
+    num_pkts = AllocPacketBufs(class, num_pkts, q);
+
+    for (queue_Scan(q, p, np, rx_packet)) {
+        RX_PACKET_IOV_FULLINIT(p);
+    }
+
+    return num_pkts;
+}
+
 #ifdef RX_ENABLE_TSFPQ
-static struct rx_packet *
-allocCBuf(int class)
+static int
+AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
 {
-    struct rx_packet *c;
+    register struct rx_packet *c;
     register struct rx_ts_info_t * rx_ts_info;
+    int transfer, alloc;
     SPLVAR;
 
     RX_TS_INFO_GET(rx_ts_info);
 
-    if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+    transfer = num_pkts - rx_ts_info->_FPQ.len;
+    if (transfer > 0) {
         NETPRI;
         MUTEX_ENTER(&rx_freePktQ_lock);
-
-        if (queue_IsEmpty(&rx_freePacketQueue)) {
-            rxi_MorePacketsNoLock(rx_initSendWindow);
+
+        if ((transfer + rx_TSFPQGlobSize) <= rx_nFreePackets) {
+            transfer += rx_TSFPQGlobSize;
+        } else if (transfer <= rx_nFreePackets) {
+            transfer = rx_nFreePackets;
+        } else {
+            /* alloc enough for us, plus a few globs for other threads */
+            alloc = transfer + (3 * rx_TSFPQGlobSize) - rx_nFreePackets;
+            rxi_MorePacketsNoLock(MAX(alloc, rx_initSendWindow));
+            transfer += rx_TSFPQGlobSize;
         }
 
-        RX_TS_FPQ_GTOL(rx_ts_info);
+        RX_TS_FPQ_GTOL2(rx_ts_info, transfer);
 
         MUTEX_EXIT(&rx_freePktQ_lock);
         USERPRI;
     }
 
-    RX_TS_FPQ_CHECKOUT(rx_ts_info, c);
+    RX_TS_FPQ_CHECKOUT2(rx_ts_info, num_pkts, q);
 
-    return c;
+    return num_pkts;
 }
 #else /* RX_ENABLE_TSFPQ */
-static struct rx_packet *
-allocCBuf(int class)
+static int
+AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
 {
     struct rx_packet *c;
+    int i, overq = 0;
     SPLVAR;
 
     NETPRI;
@@ -284,8 +310,10 @@ allocCBuf(int class)
     MUTEX_ENTER(&rx_freePktQ_lock);
 
 #ifdef KERNEL
-    if (rxi_OverQuota(class)) {
-        c = NULL;
+    for (; (num_pkts > 0) && (rxi_OverQuota2(class,num_pkts));
+         num_pkts--, overq++);
+
+    if (overq) {
         rxi_NeedMorePackets = TRUE;
         MUTEX_ENTER(&rx_stats_mutex);
         switch (class) {
@@ -306,27 +334,30 @@ allocCBuf(int class)
             break;
         }
         MUTEX_EXIT(&rx_stats_mutex);
-        goto done;
     }
 
-    if (queue_IsEmpty(&rx_freePacketQueue)) {
-        c = NULL;
+    if (rx_nFreePackets < num_pkts)
+        num_pkts = rx_nFreePackets;
+
+    if (!num_pkts) {
         rxi_NeedMorePackets = TRUE;
         goto done;
     }
 #else /* KERNEL */
-    if (queue_IsEmpty(&rx_freePacketQueue)) {
-        rxi_MorePacketsNoLock(rx_initSendWindow);
+    if (rx_nFreePackets < num_pkts) {
+        rxi_MorePacketsNoLock(MAX((num_pkts-rx_nFreePackets), rx_initSendWindow));
     }
 #endif /* KERNEL */
 
-    rx_nFreePackets--;
-    c = queue_First(&rx_freePacketQueue, rx_packet);
-    queue_Remove(c);
-    if (!(c->flags & RX_PKTFLAG_FREE))
-        osi_Panic("rxi_AllocPacket: packet not free\n");
-    c->flags = 0;               /* clear RX_PKTFLAG_FREE, initialize the rest */
-    c->header.flags = 0;
+    for (i=0, c=queue_First(&rx_freePacketQueue, rx_packet);
+         i < num_pkts;
+         i++, c=queue_Next(c, rx_packet)) {
+        RX_FPQ_MARK_USED(c);
+    }
+
+    queue_SplitBeforeAppend(&rx_freePacketQueue,q,c);
+
+    rx_nFreePackets -= num_pkts;
 
 #ifdef KERNEL
   done:
@@ -334,7 +365,7 @@ allocCBuf(int class)
     MUTEX_EXIT(&rx_freePktQ_lock);
     USERPRI;
 
-    return c;
+    return num_pkts;
 }
 #endif /* RX_ENABLE_TSFPQ */
 
@@ -342,15 +373,22 @@ allocCBuf(int class)
  * Free a packet currently used as a continuation buffer
  */
 #ifdef RX_ENABLE_TSFPQ
-void
-rxi_freeCBuf(struct rx_packet *c)
+/* num_pkts=0 means queue length is unknown */
+int
+rxi_FreePackets(int num_pkts, struct rx_queue * q)
 {
     register struct rx_ts_info_t * rx_ts_info;
-    register int i;
+    register struct rx_packet *c, *nc;
     SPLVAR;
 
+    if (!num_pkts) {
+        queue_Count(q, c, nc, rx_packet, num_pkts);
+        if (!num_pkts)
+            return 0;
+    }
+
     RX_TS_INFO_GET(rx_ts_info);
-    RX_TS_FPQ_CHECKIN(rx_ts_info,c);
+    RX_TS_FPQ_CHECKIN2(rx_ts_info, num_pkts, q);
 
     if (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax) {
         NETPRI;
@@ -364,22 +402,42 @@ rxi_freeCBuf(struct rx_packet *c)
         MUTEX_EXIT(&rx_freePktQ_lock);
         USERPRI;
     }
+
+    return num_pkts;
 }
 #else /* RX_ENABLE_TSFPQ */
-void
-rxi_freeCBuf(struct rx_packet *c)
+/* num_pkts=0 means queue length is unknown */
+int
+rxi_FreePackets(int num_pkts, struct rx_queue *q)
 {
+    register struct rx_packet *p, *np;
     SPLVAR;
 
+    if (!num_pkts) {
+        for (queue_Scan(q, p, np, rx_packet), num_pkts++) {
+            RX_FPQ_MARK_FREE(p);
+        }
+        if (!num_pkts)
+            return 0;
+    } else {
+        for (queue_Scan(q, p, np, rx_packet)) {
+            RX_FPQ_MARK_FREE(p);
+        }
+    }
+
     NETPRI;
     MUTEX_ENTER(&rx_freePktQ_lock);
 
-    rxi_FreePacketNoLock(c);
+    queue_SpliceAppend(&rx_freePacketQueue, q);
+    rx_nFreePackets += num_pkts;
+
     /* Wakeup anyone waiting for packets */
     rxi_PacketsUnWait();
 
     MUTEX_EXIT(&rx_freePktQ_lock);
     USERPRI;
+
+    return num_pkts;
 }
 #endif /* RX_ENABLE_TSFPQ */
 
@@ -414,24 +472,38 @@ rxi_RoundUpPacket(struct rx_packet *p, unsigned int nb)
  * returns the number of bytes >0 which it failed to come up with.
  * Don't need to worry about locking on packet, since only
  * one thread can manipulate one at a time. Locking on continution
- * packets is handled by allocCBuf */
+ * packets is handled by AllocPacketBufs */
 /* MTUXXX don't need to go throught the for loop if we can trust niovecs */
 int
 rxi_AllocDataBuf(struct rx_packet *p, int nb, int class)
 {
-    int i;
-
-    for (i = p->niovecs; nb > 0 && i < RX_MAXWVECS; i++) {
-        register struct rx_packet *cb;
-        if ((cb = allocCBuf(class))) {
-            p->wirevec[i].iov_base = (caddr_t) cb->localdata;
-            p->wirevec[i].iov_len = RX_CBUFFERSIZE;
-            nb -= RX_CBUFFERSIZE;
-            p->length += RX_CBUFFERSIZE;
-            p->niovecs++;
-        } else
-            break;
-    }
+    int i, nv;
+    struct rx_queue q;
+    register struct rx_packet *cb, *ncb;
+
+    /* compute the number of cbuf's we need */
+    nv = nb / RX_CBUFFERSIZE;
+    if ((nv * RX_CBUFFERSIZE) < nb)
+        nv++;
+    if ((nv + p->niovecs) > RX_MAXWVECS)
+        nv = RX_MAXWVECS - p->niovecs;
+    if (nv < 1)
+        return nb;
+
+    /* allocate buffers */
+    queue_Init(&q);
+    nv = AllocPacketBufs(class, nv, &q);
+
+    /* setup packet iovs */
+    for (i = p->niovecs, queue_Scan(&q, cb, ncb, rx_packet), i++) {
+        queue_Remove(cb);
+        p->wirevec[i].iov_base = (caddr_t) cb->localdata;
+        p->wirevec[i].iov_len = RX_CBUFFERSIZE;
+    }
+
+    nb -= (nv * RX_CBUFFERSIZE);
+    p->length += (nv * RX_CBUFFERSIZE);
+    p->niovecs += nv;
 
     return nb;
 }
@@ -2455,10 +2527,18 @@ rxi_PrepareSendPacket(register struct rx_call *call,
         if (len > 0) {
             osi_Panic("PrepareSendPacket 1\n"); /* MTUXXX */
         } else {
+            struct rx_queue q;
+            int nb;
+
+            queue_Init(&q);
+
             /* Free any extra elements in the wirevec */
-            for (j = MAX(2, i); j < p->niovecs; j++) {
-                rxi_freeCBuf(RX_CBUF_TO_PACKET(p->wirevec[j].iov_base, p));
+            for (j = MAX(2, i), nb = j - p->niovecs; j < p->niovecs; j++) {
+                queue_Append(&q,RX_CBUF_TO_PACKET(p->wirevec[j].iov_base, p));
             }
+            if (nb)
+                rxi_FreePackets(nb, &q);
+
             p->niovecs = i;
             p->wirevec[i - 1].iov_len += len;
         }
diff --git a/src/rx/rx_packet.h b/src/rx/rx_packet.h
index 851749946..61f16809a 100644
--- a/src/rx/rx_packet.h
+++ b/src/rx/rx_packet.h
@@ -293,6 +293,7 @@ struct rx_packet {
 
 #ifdef KERNEL
 #define rxi_OverQuota(packetclass) (rx_nFreePackets - 1 < rx_packetQuota[packetclass])
+#define rxi_OverQuota2(packetclass,num_alloc) (rx_nFreePackets - (num_alloc) < rx_packetQuota[packetclass])
 #endif /* KERNEL */
 
 /* this returns an afs_int32 from byte offset o in packet p.  offset must
diff --git a/src/rx/rx_prototypes.h b/src/rx/rx_prototypes.h
index 7dade4b31..fd354b9fb 100644
--- a/src/rx/rx_prototypes.h
+++ b/src/rx/rx_prototypes.h
@@ -439,7 +439,6 @@ extern afs_int32 rx_SlowReadPacket(struct rx_packet *packet,
                                    unsigned int offset, int resid, char *out);
 extern afs_int32 rx_SlowWritePacket(struct rx_packet *packet, int offset,
                                     int resid, char *in);
-extern void rxi_freeCBuf(struct rx_packet *c);
 extern int rxi_RoundUpPacket(struct rx_packet *p, unsigned int nb);
 extern int rxi_AllocDataBuf(struct rx_packet *p, int nb, int cla_ss);
 extern void rxi_MorePackets(int apackets);
@@ -453,6 +452,8 @@ extern int rxi_TrimDataBufs(struct rx_packet *p, int first);
 extern void rxi_FreePacket(struct rx_packet *p);
 extern struct rx_packet *rxi_AllocPacketNoLock(int cla_ss);
 extern struct rx_packet *rxi_AllocPacket(int cla_ss);
+extern int rxi_AllocPackets(int cla_ss, int num_pkts, struct rx_queue *q);
+extern int rxi_FreePackets(int num_pkts, struct rx_queue *q);
 extern struct rx_packet *rxi_AllocSendPacket(register struct rx_call *call,
                                              int want);
 extern int rxi_ReadPacket(int socket, register struct rx_packet *p,
diff --git a/src/rx/rx_rdwr.c b/src/rx/rx_rdwr.c
index 82dc947f5..5e29eeb4b 100644
--- a/src/rx/rx_rdwr.c
+++ b/src/rx/rx_rdwr.c
@@ -107,18 +107,15 @@ rxi_ReadProc(register struct rx_call *call, register char *buf,
 {
     register struct rx_packet *cp = call->currentPacket;
     register struct rx_packet *rp;
-    register struct rx_packet *nxp;     /* Next packet pointer, for queue_Scan */
     register int requestCount;
     register unsigned int t;
+
     /* XXXX took out clock_NewTime from here.  Was it needed? */
     requestCount = nbytes;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    if (!queue_IsEmpty(&call->iovq)) {
-        for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-            queue_Remove(rp);
-            rxi_FreePacket(rp);
-        }
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     do {
@@ -310,12 +307,7 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
      * ReadvProc/WritevProc.
      */
     if (!queue_IsEmpty(&call->iovq)) {
-        register struct rx_packet *rp;
-        register struct rx_packet *nxp;
-        for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-            queue_Remove(rp);
-            rxi_FreePacket(rp);
-        }
+        rxi_FreePackets(0, &call->iovq);
     }
 
     /*
@@ -362,12 +354,7 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
      * ReadvProc/WritevProc.
      */
     if (!queue_IsEmpty(&call->iovq)) {
-        register struct rx_packet *rp;
-        register struct rx_packet *nxp;
-        for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-            queue_Remove(rp);
-            rxi_FreePacket(rp);
-        }
+        rxi_FreePackets(0, &call->iovq);
     }
 
     /*
@@ -570,7 +557,6 @@ rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
               int nbytes)
 {
     struct rx_packet *rp;
-    struct rx_packet *nxp;      /* Next packet pointer, for queue_Scan */
     int requestCount;
     int nextio;
 
@@ -578,9 +564,8 @@ rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     nextio = 0;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-        queue_Remove(rp);
-        rxi_FreePacket(rp);
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode == RX_MODE_SENDING) {
@@ -657,17 +642,12 @@ rxi_WriteProc(register struct rx_call *call, register char *buf,
 {
     struct rx_connection *conn = call->conn;
     register struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *tp;      /* Temporary packet pointer */
-    register struct rx_packet *nxp;     /* Next packet pointer, for queue_Scan */
     register unsigned int t;
     int requestCount = nbytes;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    if (!queue_IsEmpty(&call->iovq)) {
-        for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-            queue_Remove(tp);
-            rxi_FreePacket(tp);
-        }
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode != RX_MODE_SENDING) {
@@ -833,13 +813,8 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
      * RX_CALL_IOVEC_WAIT is always cleared before returning from
      * ReadvProc/WritevProc.
      */
-    if (!queue_IsEmpty(&call->iovq)) {
-        register struct rx_packet *rp;
-        register struct rx_packet *nxp;
-        for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-            queue_Remove(rp);
-            rxi_FreePacket(rp);
-        }
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     /*
@@ -885,13 +860,8 @@ rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
      * RX_CALL_IOVEC_WAIT is always cleared before returning from
      * ReadvProc/WritevProc.
      */
-    if (!queue_IsEmpty(&call->iovq)) {
-        register struct rx_packet *rp;
-        register struct rx_packet *nxp;
-        for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
-            queue_Remove(rp);
-            rxi_FreePacket(rp);
-        }
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     /*
@@ -938,8 +908,6 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
 {
     struct rx_connection *conn = call->conn;
     struct rx_packet *cp = call->currentPacket;
-    struct rx_packet *tp;       /* temporary packet pointer */
-    struct rx_packet *nxp;      /* Next packet pointer, for queue_Scan */
     int requestCount;
     int nextio;
     /* Temporary values, real work is done in rxi_WritevProc */
@@ -952,9 +920,8 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     nextio = 0;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-        queue_Remove(tp);
-        rxi_FreePacket(tp);
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode != RX_MODE_SENDING) {
@@ -1066,8 +1033,6 @@ int
 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 {
     struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *tp;      /* Temporary packet pointer */
-    register struct rx_packet *nxp;     /* Next packet pointer, for queue_Scan */
     int nextio;
     int requestCount;
     struct rx_queue tmpq;
@@ -1092,14 +1057,11 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
     if (call->error) {
-        for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-            queue_Remove(tp);
-            rxi_FreePacket(tp);
-        }
         if (cp) {
-            rxi_FreePacket(cp);
+            queue_Prepend(&call->iovq, cp);
             cp = call->currentPacket = NULL;
         }
+        rxi_FreePackets(0, &call->iovq);
         return 0;
     }
 
@@ -1126,10 +1088,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
         if (queue_IsEmpty(&call->iovq)) {
             call->error = RX_PROTOCOL_ERROR;
             cp = call->currentPacket = NULL;
-            for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
-                queue_Remove(tp);
-                rxi_FreePacket(tp);
-            }
+            rxi_FreePackets(0, &tmpq);
             return 0;
         }
         cp = queue_First(&call->iovq, rx_packet);
@@ -1150,14 +1109,11 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
         if (iov[nextio].iov_base != call->curpos
             || iov[nextio].iov_len > (int)call->curlen) {
             call->error = RX_PROTOCOL_ERROR;
-            for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
-                queue_Remove(tp);
-                rxi_FreePacket(tp);
-            }
             if (cp) {
-                rxi_FreePacket(cp);
+                queue_Prepend(&tmpq, cp);
                 call->currentPacket = NULL;
             }
+            rxi_FreePackets(0, &tmpq);
             return 0;
         }
         nbytes -= iov[nextio].iov_len;
@@ -1178,10 +1134,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
     /* Move the packets from the temporary queue onto the transmit queue.
      * We may end up with more than call->twind packets on the queue. */
-    for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
-        queue_Remove(tp);
-        queue_Append(&call->tq, tp);
-    }
+    queue_SpliceAppend(&call->tq, &tmpq);
 
     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
         rxi_Start(0, call, 0, 0);
     }
@@ -1231,13 +1184,10 @@ void
 rxi_FlushWrite(register struct rx_call *call)
 {
     register struct rx_packet *cp = call->currentPacket;
-    register struct rx_packet *tp;      /* Temporary packet pointer */
-    register struct rx_packet *nxp;     /* Next packet pointer, for queue_Scan */
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-        queue_Remove(tp);
-        rxi_FreePacket(tp);
+    if (queue_IsNotEmpty(&call->iovq)) {
+        rxi_FreePackets(0, &call->iovq);
     }
 
     if (call->mode == RX_MODE_SENDING) {
-- 
2.39.5
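
Reviewer note, appended after the patch (text below the version trailer is
ignored by "git am"): the new rxi_AllocDataBuf computes the number of
continuation buffers as a ceiling division up front and then does a single
bulk allocation, instead of one allocCBuf() per iovec.  The fragment below
merely restates that arithmetic outside the diff; RX_CBUFFERSIZE and
RX_MAXWVECS are the existing rx_packet.h constants, the helper name is
hypothetical, and the concrete numbers are made-up examples:

    #include "rx_packet.h"

    /* Hypothetical restatement of the cbuf-count computation. */
    static int
    example_cbufs_needed(int nb)
    {
        int nv = nb / RX_CBUFFERSIZE;       /* e.g. nb = 2500 -> nv = 2   */
        if ((nv * RX_CBUFFERSIZE) < nb)     /* 2 * cbufsize still < 2500  */
            nv++;                           /* ... so round up to 3       */
        return nv;                          /* caller clamps to RX_MAXWVECS */
    }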