From e29f391bded8b0ec097414300dbdebbd8f5ec0a2 Mon Sep 17 00:00:00 2001 From: Simon Wilkinson Date: Fri, 1 Oct 2010 23:17:56 -0400 Subject: [PATCH] rx: Reduce dependence on call->lock This patch reduces our dependence on call->lock, by allowing more of the reader thread to run lock free. Doing so requires that call->mode only be set by the reader thread. As a result, call->mode can no longer be set to RX_MODE_ERROR by rxi_CallError(). The mode is instead set to RX_MODE_ERROR by the reader thread immediately after regaining the call->lock when it has been dropped. Reviewed-on: http://gerrit.openafs.org/2880 Tested-by: BuildBot Reviewed-by: Jeffrey Altman Tested-by: Jeffrey Altman Reviewed-by: Simon Wilkinson Reviewed-by: Derrick Brashear Tested-by: Derrick Brashear (cherry picked from commit e445faa68c5ec6e47d3fd9d7318ade71d98703a9) Change-Id: I9042caf4364ce65704e3ddfde159be50d0c11c1b Reviewed-on: http://gerrit.openafs.org/3049 --- src/rx/rx.c | 8 +++- src/rx/rx_packet.c | 22 +++++++--- src/rx/rx_rdwr.c | 106 +++++++++++++++++++++++++++++---------------- 3 files changed, 92 insertions(+), 44 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index bacea6b88..bceda5cad 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -2043,6 +2043,7 @@ rx_EndCall(struct rx_call *call, afs_int32 rc) call->arrivalProc = (void (*)())0; if (rc && call->error == 0) { rxi_CallError(call, rc); + call->mode = RX_MODE_ERROR; /* Send an abort message to the peer if this error code has * only just been set. 
If it was set previously, assume the * peer has already been sent the error code or will request it @@ -2052,10 +2053,14 @@ rx_EndCall(struct rx_call *call, afs_int32 rc) if (conn->type == RX_SERVER_CONNECTION) { /* Make sure reply or at least dummy reply is sent */ if (call->mode == RX_MODE_RECEIVING) { + MUTEX_EXIT(&call->lock); rxi_WriteProc(call, 0, 0); + MUTEX_ENTER(&call->lock); } if (call->mode == RX_MODE_SENDING) { + MUTEX_EXIT(&call->lock); rxi_FlushWrite(call); + MUTEX_ENTER(&call->lock); } rxi_calltrace(RX_CALL_END, call); /* Call goes to hold state until reply packets are acknowledged */ @@ -2074,7 +2079,9 @@ rx_EndCall(struct rx_call *call, afs_int32 rc) * no reply arguments are expected */ if ((call->mode == RX_MODE_SENDING) || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) { + MUTEX_EXIT(&call->lock); (void)rxi_ReadProc(call, &dummy, 1); + MUTEX_ENTER(&call->lock); } /* If we had an outstanding delayed ack, be nice to the server @@ -4824,7 +4831,6 @@ rxi_CallError(struct rx_call *call, afs_int32 error) rxi_ResetCall(call, 0); #endif call->error = error; - call->mode = RX_MODE_ERROR; } /* Reset various fields in a call structure, and wakeup waiting diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c index c5ba1621b..a8b84cb34 100644 --- a/src/rx/rx_packet.c +++ b/src/rx/rx_packet.c @@ -2689,25 +2689,36 @@ rxi_DecodePacketHeader(struct rx_packet *p) /* Note: top 16 bits of this last word are the security checksum */ } +/* + * LOCKS HELD: called with call->lock held. + * + * PrepareSendPacket is the only place in the code that + * can increment call->tnext. This could become an atomic + * in the future. Beyond that there is nothing in this + * function that requires the call being locked. This + * function can only be called by the application thread. 
+ */ void rxi_PrepareSendPacket(struct rx_call *call, struct rx_packet *p, int last) { struct rx_connection *conn = call->conn; + afs_uint32 seq = call->tnext++; unsigned int i; afs_int32 len; /* len must be a signed type; it can go negative */ + /* No data packets on call 0. Where do these come from? */ + if (*call->callNumber == 0) + *call->callNumber = 1; + + MUTEX_EXIT(&call->lock); p->flags &= ~RX_PKTFLAG_ACKED; p->header.cid = (conn->cid | call->channel); p->header.serviceId = conn->serviceId; p->header.securityIndex = conn->securityIndex; - /* No data packets on call 0. Where do these come from? */ - if (*call->callNumber == 0) - *call->callNumber = 1; - p->header.callNumber = *call->callNumber; - p->header.seq = call->tnext++; + p->header.seq = seq; p->header.epoch = conn->epoch; p->header.type = RX_PACKET_TYPE_DATA; p->header.flags = 0; @@ -2747,6 +2758,7 @@ rxi_PrepareSendPacket(struct rx_call *call, if (len) p->wirevec[i - 1].iov_len += len; RXS_PreparePacket(conn->securityObject, call, p); + MUTEX_ENTER(&call->lock); } /* Given an interface MTU size, calculate an adjusted MTU size that diff --git a/src/rx/rx_rdwr.c b/src/rx/rx_rdwr.c index 27acf1bf9..d0d6a37fc 100644 --- a/src/rx/rx_rdwr.c +++ b/src/rx/rx_rdwr.c @@ -95,7 +95,7 @@ static int rxdb_fileID = RXDB_FILE_RX_RDWR; #endif /* RX_LOCKS_DB */ /* rxi_ReadProc -- internal version. * - * LOCKS USED -- called at netpri with rx global lock and call->lock held. 
+ * LOCKS USED -- called at netpri */ int rxi_ReadProc(struct rx_call *call, char *buf, @@ -120,13 +120,18 @@ rxi_ReadProc(struct rx_call *call, char *buf, do { if (call->nLeft == 0) { /* Get next packet */ + MUTEX_ENTER(&call->lock); for (;;) { if (call->error || (call->mode != RX_MODE_RECEIVING)) { if (call->error) { + call->mode = RX_MODE_ERROR; + MUTEX_EXIT(&call->lock); return 0; } if (call->mode == RX_MODE_SENDING) { + MUTEX_EXIT(&call->lock); rxi_FlushWrite(call); + MUTEX_ENTER(&call->lock); continue; } } @@ -161,7 +166,6 @@ rxi_ReadProc(struct rx_call *call, char *buf, rp = rxi_SendConnectionAbort(conn, rp, 0, 0); MUTEX_EXIT(&conn->conn_data_lock); rxi_FreePacket(rp); - MUTEX_ENTER(&call->lock); return 0; } @@ -233,6 +237,7 @@ rxi_ReadProc(struct rx_call *call, char *buf, /* Are there ever going to be any more packets? */ if (call->flags & RX_CALL_RECEIVE_DONE) { + MUTEX_EXIT(&call->lock); return requestCount - nbytes; } /* Wait for in-sequence packet */ @@ -251,10 +256,12 @@ rxi_ReadProc(struct rx_call *call, char *buf, call->startWait = 0; #ifdef RX_ENABLE_LOCKS if (call->error) { + MUTEX_EXIT(&call->lock); return 0; } #endif /* RX_ENABLE_LOCKS */ } + MUTEX_EXIT(&call->lock); } else /* assert(cp); */ /* MTUXXX this should be replaced by some error-recovery code before shipping */ @@ -350,9 +357,7 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes) } NETPRI; - MUTEX_ENTER(&call->lock); bytes = rxi_ReadProc(call, buf, nbytes); - MUTEX_EXIT(&call->lock); USERPRI; return bytes; } @@ -399,9 +404,7 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value) } NETPRI; - MUTEX_ENTER(&call->lock); bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32)); - MUTEX_EXIT(&call->lock); USERPRI; return bytes; @@ -593,12 +596,14 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) * except the last packet (new current packet) are moved to the iovq * while the application is processing the data. 
* - * LOCKS USED -- called at netpri with rx global lock and call->lock held. + * LOCKS USED -- called at netpri. */ int rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, int nbytes) { + int bytes; + /* Free any packets from the last call to ReadvProc/WritevProc */ if (queue_IsNotEmpty(&call->iovq)) { #ifdef RXDEBUG_PACKET @@ -611,9 +616,9 @@ rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, rxi_FlushWrite(call); } - if (call->error) { - return 0; - } + MUTEX_ENTER(&call->lock); + if (call->error) + goto error; /* Get whatever data is currently available in the receive queue. * If rxi_FillReadVec sends an ack packet then it is possible @@ -645,15 +650,20 @@ rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, call->startWait = 0; } call->flags &= ~RX_CALL_IOVEC_WAIT; -#ifdef RX_ENABLE_LOCKS - if (call->error) { - return 0; - } -#endif /* RX_ENABLE_LOCKS */ + + if (call->error) + goto error; call->iov = NULL; *nio = call->iovNext; - return nbytes - call->iovNBytes; + bytes = nbytes - call->iovNBytes; + MUTEX_EXIT(&call->lock); + return bytes; + + error: + MUTEX_EXIT(&call->lock); + call->mode = RX_MODE_ERROR; + return 0; } int @@ -664,16 +674,15 @@ rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, SPLVAR; NETPRI; - MUTEX_ENTER(&call->lock); bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes); - MUTEX_EXIT(&call->lock); USERPRI; return bytes; } /* rxi_WriteProc -- internal version. * - * LOCKS USED -- called at netpri with rx global lock and call->lock held. */ + * LOCKS USED -- called at netpri + */ int rxi_WriteProc(struct rx_call *call, char *buf, @@ -716,6 +725,9 @@ rxi_WriteProc(struct rx_call *call, char *buf, * anyway. 
*/ do { if (call->nFree == 0) { + MUTEX_ENTER(&call->lock); + if (call->error) + call->mode = RX_MODE_ERROR; if (!call->error && cp) { /* Clear the current packet now so that if * we are forced to wait and drop the lock @@ -787,6 +799,8 @@ rxi_WriteProc(struct rx_call *call, char *buf, call->startWait = 0; #ifdef RX_ENABLE_LOCKS if (call->error) { + call->mode = RX_MODE_ERROR; + MUTEX_EXIT(&call->lock); return 0; } #endif /* RX_ENABLE_LOCKS */ @@ -807,6 +821,7 @@ rxi_WriteProc(struct rx_call *call, char *buf, cp->wirevec[1].iov_len - call->conn->securityHeaderSize; } if (call->error) { + call->mode = RX_MODE_ERROR; if (cp) { #ifdef RX_TRACK_PACKETS cp->flags &= ~RX_PKTFLAG_CP; @@ -814,8 +829,10 @@ rxi_WriteProc(struct rx_call *call, char *buf, rxi_FreePacket(cp); call->currentPacket = NULL; } + MUTEX_EXIT(&call->lock); return 0; } + MUTEX_EXIT(&call->lock); } if (cp && (int)call->nFree < nbytes) { @@ -907,9 +924,7 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes) } NETPRI; - MUTEX_ENTER(&call->lock); bytes = rxi_WriteProc(call, buf, nbytes); - MUTEX_EXIT(&call->lock); USERPRI; return bytes; } @@ -953,9 +968,7 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value) } NETPRI; - MUTEX_ENTER(&call->lock); bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32)); - MUTEX_EXIT(&call->lock); USERPRI; return bytes; } @@ -965,7 +978,8 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value) * Fill in an iovec to point to data in packet buffers. The application * calls rxi_WritevProc when the buffers are full. * - * LOCKS USED -- called at netpri with rx global lock and call->lock held. */ + * LOCKS USED -- called at netpri. 
+ */ int rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, @@ -1020,7 +1034,9 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, if (tnFree == 0) { /* current packet is full, allocate a new one */ + MUTEX_ENTER(&call->lock); cp = rxi_AllocSendPacket(call, nbytes); + MUTEX_EXIT(&call->lock); if (cp == NULL) { /* out of space, return what we have */ *nio = nextio; @@ -1093,9 +1109,7 @@ rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, SPLVAR; NETPRI; - MUTEX_ENTER(&call->lock); bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes); - MUTEX_EXIT(&call->lock); USERPRI; return bytes; } @@ -1104,8 +1118,8 @@ rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, * * Send buffers allocated in rxi_WritevAlloc. * - * LOCKS USED -- called at netpri with rx global lock and call->lock held. */ - + * LOCKS USED -- called at netpri. + */ int rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) { @@ -1123,7 +1137,10 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) requestCount = nbytes; nextio = 0; - if (call->mode != RX_MODE_SENDING) { + MUTEX_ENTER(&call->lock); + if (call->error) { + call->mode = RX_MODE_ERROR; + } else if (call->mode != RX_MODE_SENDING) { call->error = RX_PROTOCOL_ERROR; } #ifdef AFS_GLOBAL_RXLOCK_KERNEL @@ -1142,10 +1159,11 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) call->flags &= ~RX_CALL_TQ_WAIT; } #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ - /* cp is no longer valid since we may have given up the lock */ cp = call->currentPacket; if (call->error) { + call->mode = RX_MODE_ERROR; + MUTEX_EXIT(&call->lock); if (cp) { #ifdef RX_TRACK_PACKETS cp->flags &= ~RX_PKTFLAG_CP; @@ -1155,7 +1173,6 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) #ifdef RXDEBUG_PACKET call->iovqc++; #endif /* RXDEBUG_PACKET */ - cp = call->currentPacket = (struct 
rx_packet *)0; } #ifdef RXDEBUG_PACKET call->iovqc -= @@ -1192,6 +1209,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) /* The head of the iovq is now the current packet */ if (nbytes) { if (queue_IsEmpty(&call->iovq)) { + MUTEX_EXIT(&call->lock); call->error = RX_PROTOCOL_ERROR; #ifdef RXDEBUG_PACKET tmpqc -= @@ -1226,6 +1244,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) if (iov[nextio].iov_base != call->curpos || iov[nextio].iov_len > (int)call->curlen) { call->error = RX_PROTOCOL_ERROR; + MUTEX_EXIT(&call->lock); if (cp) { #ifdef RX_TRACK_PACKETS cp->flags &= ~RX_PKTFLAG_CP; @@ -1267,6 +1286,10 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) p->flags |= RX_PKTFLAG_TQ; } #endif + + if (call->error) + call->mode = RX_MODE_ERROR; + queue_SpliceAppend(&call->tq, &tmpq); if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) { @@ -1285,19 +1308,23 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) #endif call->startWait = 0; } + /* cp is no longer valid since we may have given up the lock */ cp = call->currentPacket; if (call->error) { + call->mode = RX_MODE_ERROR; + call->currentPacket = NULL; + MUTEX_EXIT(&call->lock); if (cp) { #ifdef RX_TRACK_PACKETS cp->flags &= ~RX_PKTFLAG_CP; #endif rxi_FreePacket(cp); - cp = call->currentPacket = (struct rx_packet *)0; } return 0; } + MUTEX_EXIT(&call->lock); return requestCount - nbytes; } @@ -1309,15 +1336,16 @@ rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) SPLVAR; NETPRI; - MUTEX_ENTER(&call->lock); bytes = rxi_WritevProc(call, iov, nio, nbytes); - MUTEX_EXIT(&call->lock); USERPRI; return bytes; } /* Flush any buffered data to the stream, switch to read mode - * (clients) or to EOF mode (servers) */ + * (clients) or to EOF mode (servers) + * + * LOCKS HELD: called at netpri. 
+ */ void rxi_FlushWrite(struct rx_call *call) { @@ -1350,6 +1378,7 @@ rxi_FlushWrite(struct rx_call *call) } #endif + MUTEX_ENTER(&call->lock); #ifdef AFS_GLOBAL_RXLOCK_KERNEL /* Wait until TQ_BUSY is reset before adding any * packets to the transmit queue @@ -1368,7 +1397,9 @@ rxi_FlushWrite(struct rx_call *call) } #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ - /* cp is no longer valid since we may have given up the lock */ + if (call->error) + call->mode = RX_MODE_ERROR; + cp = call->currentPacket; if (cp) { @@ -1407,6 +1438,7 @@ rxi_FlushWrite(struct rx_call *call) flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) { rxi_Start(0, call, 0, 0); } + MUTEX_EXIT(&call->lock); } } @@ -1417,8 +1449,6 @@ rx_FlushWrite(struct rx_call *call) { SPLVAR; NETPRI; - MUTEX_ENTER(&call->lock); rxi_FlushWrite(call); - MUTEX_EXIT(&call->lock); USERPRI; } -- 2.39.5