From 1cca08960b263baabcf7f86b1596f07fb8449223 Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Wed, 28 Jan 2009 20:35:50 +0000 Subject: [PATCH] rx-atomic-macros-and-variables-20090127 LICENSE BSD adds macros to support accessing some variables as atomics, when atomic support is available; otherwise falls back to mutex-protected access. --- src/rx/rx.c | 204 +++++++++++++++++++++++-------------------- src/rx/rx.h | 83 +++++++++--------- src/rx/rx_internal.h | 161 ++++++++++++++++++---------------- src/rx/rx_kcommon.c | 11 +-- src/rx/rx_lwp.c | 4 +- src/rx/rx_multi.c | 2 + src/rx/rx_packet.c | 56 ++++++------ src/rx/rx_user.c | 4 +- 8 files changed, 274 insertions(+), 251 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index 3f80a3795..9476aea8a 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -867,11 +867,11 @@ rx_NewConnection(afs_uint32 shost, u_short sport, u_short sservice, hashindex = CONN_HASH(shost, sport, tconn->cid, tconn->epoch, RX_CLIENT_CONNECTION); - tconn->refCount++; /* no lock required since only this thread knows */ + rx_AtomicIncrement_NL(tconn->refCount); /* no lock required since only this thread knows */ tconn->next = rx_connHashTable[hashindex]; rx_connHashTable[hashindex] = tconn; if (rx_stats_active) - rx_MutexIncrement(rx_stats.nClientConns, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.nClientConns, rx_stats_mutex); } MUTEX_EXIT(&rx_connHashTable_lock); @@ -891,8 +891,8 @@ rx_SetConnDeadTime(struct rx_connection *conn, int seconds) tconn->secondsUntilPing = rx_ConnSecondsUntilDead(tconn) / 6; } -int rxi_lowPeerRefCount = 0; -int rxi_lowConnRefCount = 0; +rx_atomic_t rxi_lowPeerRefCount = 0; +rx_atomic_t rxi_lowConnRefCount = 0; /* * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock. @@ -914,23 +914,23 @@ rxi_CleanupConnection(struct rx_connection *conn) * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed. */ MUTEX_ENTER(&rx_peerHashTable_lock); - if (conn->peer->refCount < 2) { + if (rx_AtomicDecrement_NL(conn->peer->refCount) < 1) { conn->peer->idleWhen = clock_Sec(); - if (conn->peer->refCount < 1) { - conn->peer->refCount = 1; + if (rx_AtomicPeek_NL(conn->peer->refCount) < 0) { + rx_AtomicSwap_NL(&conn->peer->refCount, 0); + dpf(("UNDERCOUNT(peer %x)\n", conn->peer)); if (rx_stats_active) - rx_MutexIncrement(rxi_lowPeerRefCount, rx_stats_mutex); + rx_AtomicIncrement(rxi_lowPeerRefCount, rx_stats_mutex); } } - conn->peer->refCount--; MUTEX_EXIT(&rx_peerHashTable_lock); if (rx_stats_active) { if (conn->type == RX_SERVER_CONNECTION) - rx_MutexDecrement(rx_stats.nServerConns, rx_stats_mutex); + rx_AtomicDecrement(rx_stats.nServerConns, rx_stats_mutex); else - rx_MutexDecrement(rx_stats.nClientConns, rx_stats_mutex); + rx_AtomicDecrement(rx_stats.nClientConns, rx_stats_mutex); } #ifndef KERNEL if (conn->specific) { @@ -1014,17 +1014,15 @@ rxi_DestroyConnectionNoLock(register struct rx_connection *conn) NETPRI; MUTEX_ENTER(&conn->conn_data_lock); - if (conn->refCount > 0) - conn->refCount--; - else { + /* This requires the atomic type to be signed */ + if (rx_AtomicDecrement_NL(conn->refCount) < 0) { + dpf(("UNDERCOUNT(conn %x)\n", conn)); if (rx_stats_active) { - MUTEX_ENTER(&rx_stats_mutex); - rxi_lowConnRefCount++; - MUTEX_EXIT(&rx_stats_mutex); + rx_AtomicIncrement(rxi_lowConnRefCount, rx_stats_mutex); } } - if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) { + if ((rx_AtomicPeek_NL(conn->refCount) > 0) || (conn->flags & RX_CONN_BUSY)) { /* Busy; wait till the last guy before proceeding */ MUTEX_EXIT(&conn->conn_data_lock); USERPRI; @@ -1145,7 +1143,7 @@ rx_GetConnection(register struct rx_connection *conn) SPLVAR; NETPRI; - rx_MutexIncrement(conn->refCount, conn->conn_data_lock); + rx_AtomicIncrement(conn->refCount, conn->conn_data_lock); USERPRI; } @@ -2144,7 +2142,7 @@ rx_Finalize(void) next = conn->next; if (conn->type == RX_CLIENT_CONNECTION) { /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */ - conn->refCount++; + rx_AtomicIncrement(conn->refCount, conn->conn_data_lock); /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */ #ifdef RX_ENABLE_LOCKS rxi_DestroyConnectionNoLock(conn); @@ -2261,7 +2259,7 @@ rxi_NewCall(register struct rx_connection *conn, register int channel) #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ queue_Remove(call); if (rx_stats_active) - rx_MutexDecrement(rx_stats.nFreeCallStructs, rx_stats_mutex); + rx_AtomicDecrement(rx_stats.nFreeCallStructs, rx_stats_mutex); MUTEX_EXIT(&rx_freeCallQueue_lock); MUTEX_ENTER(&call->lock); CLEAR_CALL_QUEUE_LOCK(call); @@ -2283,7 +2281,7 @@ rxi_NewCall(register struct rx_connection *conn, register int channel) rx_allCallsp = call; call->call_id = #endif /* RXDEBUG_PACKET */ - rx_MutexIncrement(rx_stats.nCallStructs, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.nCallStructs, rx_stats_mutex); MUTEX_EXIT(&rx_freeCallQueue_lock); MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL); @@ -2357,7 +2355,7 @@ rxi_FreeCall(register struct rx_call *call) queue_Append(&rx_freeCallQueue, call); #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ if (rx_stats_active) - rx_MutexIncrement(rx_stats.nFreeCallStructs, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.nFreeCallStructs, rx_stats_mutex); MUTEX_EXIT(&rx_freeCallQueue_lock); /* Destroy the connection if it was previously slated for @@ -2373,7 +2371,7 @@ rxi_FreeCall(register struct rx_call *call) * call lock held or are going through this section of code. */ if (conn->flags & RX_CONN_DESTROY_ME && !(conn->flags & RX_CONN_MAKECALL_WAITING)) { - rx_MutexIncrement(conn->refCount, conn->conn_data_lock); + rx_AtomicIncrement(conn->refCount, conn->conn_data_lock); #ifdef RX_ENABLE_LOCKS if (haveCTLock) rxi_DestroyConnectionNoLock(conn); @@ -2481,14 +2479,14 @@ rxi_FindPeer(register afs_uint32 host, register u_short port, rx_peerHashTable[hashIndex] = pp; rxi_InitPeerParams(pp); if (rx_stats_active) - rx_MutexIncrement(rx_stats.nPeerStructs, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.nPeerStructs, rx_stats_mutex); } } if (pp && create) { - pp->refCount++; + rx_AtomicIncrement_NL(pp->refCount); } if (origPeer) - origPeer->refCount--; + rx_AtomicDecrement_NL(origPeer->refCount); MUTEX_EXIT(&rx_peerHashTable_lock); return pp; } @@ -2592,10 +2590,10 @@ rxi_FindConnection(osi_socket socket, register afs_int32 host, if (service->newConnProc) (*service->newConnProc) (conn); if (rx_stats_active) - rx_MutexIncrement(rx_stats.nServerConns, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.nServerConns, rx_stats_mutex); } - rx_MutexIncrement(conn->refCount, conn->conn_data_lock); + rx_AtomicIncrement(conn->refCount, conn->conn_data_lock); rxLastConn = conn; /* store this connection as the last conn used */ MUTEX_EXIT(&rx_connHashTable_lock); @@ -2706,7 +2704,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, MUTEX_ENTER(&conn->conn_data_lock); if (np->header.type != RX_PACKET_TYPE_ABORT) np = rxi_SendConnectionAbort(conn, np, 1, 0); - conn->refCount--; + rx_AtomicDecrement_NL(conn->refCount); MUTEX_EXIT(&conn->conn_data_lock); return np; } @@ -2719,22 +2717,22 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, afs_int32 errcode = ntohl(rx_GetInt32(np, 0)); dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d", errcode)); rxi_ConnectionError(conn, errcode); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; } case RX_PACKET_TYPE_CHALLENGE: tnp = rxi_ReceiveChallengePacket(conn, np, 1); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return tnp; case RX_PACKET_TYPE_RESPONSE: tnp = rxi_ReceiveResponsePacket(conn, np, 1); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return tnp; case RX_PACKET_TYPE_PARAMS: case RX_PACKET_TYPE_PARAMS + 1: case RX_PACKET_TYPE_PARAMS + 2: /* ignore these packet types for now */ - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; @@ -2744,7 +2742,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, rxi_ConnectionError(conn, RX_PROTOCOL_ERROR); MUTEX_ENTER(&conn->conn_data_lock); tnp = rxi_SendConnectionAbort(conn, np, 1, 0); - conn->refCount--; + rx_AtomicDecrement_NL(conn->refCount); MUTEX_EXIT(&conn->conn_data_lock); return tnp; } @@ -2781,8 +2779,8 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, * it must be for the previous call. */ if (rx_stats_active) - rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; } } @@ -2792,12 +2790,12 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, if (type == RX_SERVER_CONNECTION) { /* We're the server */ if (np->header.callNumber < currentCallNumber) { if (rx_stats_active) - rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex); #ifdef RX_ENABLE_LOCKS if (call) MUTEX_EXIT(&call->lock); #endif - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; } if (!call) { @@ -2823,9 +2821,9 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, rxi_CallError(call, rx_BusyError); tp = rxi_SendCallAbort(call, np, 1, 0); MUTEX_EXIT(&call->lock); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); if (rx_stats_active) - rx_MutexIncrement(rx_stats.nBusies, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.nBusies, rx_stats_mutex); return tp; } rxi_KeepAliveOn(call); @@ -2861,7 +2859,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, NULL, 0, 1); MUTEX_EXIT(&call->lock); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return tp; } rxi_ResetCall(call, 0); @@ -2884,9 +2882,9 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, rxi_CallError(call, rx_BusyError); tp = rxi_SendCallAbort(call, np, 1, 0); MUTEX_EXIT(&call->lock); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); if (rx_stats_active) - rx_MutexIncrement(rx_stats.nBusies, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.nBusies, rx_stats_mutex); return tp; } rxi_KeepAliveOn(call); @@ -2898,13 +2896,13 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, if (call && (call->state == RX_STATE_DALLY) && (np->header.type == RX_PACKET_TYPE_ACK)) { if (rx_stats_active) - rx_MutexIncrement(rx_stats.ignorePacketDally, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.ignorePacketDally, rx_stats_mutex); #ifdef RX_ENABLE_LOCKS if (call) { MUTEX_EXIT(&call->lock); } #endif - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; } @@ -2912,13 +2910,13 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, * isn't a current call, then no packet is relevant. */ if (!call || (np->header.callNumber != currentCallNumber)) { if (rx_stats_active) - rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex); #ifdef RX_ENABLE_LOCKS if (call) { MUTEX_EXIT(&call->lock); } #endif - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; } /* If the service security object index stamped in the packet does not @@ -2927,7 +2925,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, #ifdef RX_ENABLE_LOCKS MUTEX_EXIT(&call->lock); #endif - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; } @@ -2974,9 +2972,9 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, * XXX code in receiveackpacket. */ if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) { if (rx_stats_active) - rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex); MUTEX_EXIT(&call->lock); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; } } @@ -3038,7 +3036,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d", errdata)); rxi_CallError(call, errdata); MUTEX_EXIT(&call->lock); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; /* xmitting; drop packet */ } case RX_PACKET_TYPE_BUSY: @@ -3084,7 +3082,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, * (if not, then the time won't actually be re-evaluated here). */ call->lastReceiveTime = clock_Sec(); MUTEX_EXIT(&call->lock); - rx_MutexDecrement(conn->refCount, conn->conn_data_lock); + rx_AtomicDecrement(conn->refCount, conn->conn_data_lock); return np; } @@ -3151,7 +3149,7 @@ rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2) conn->checkReachEvent = NULL; waiting = conn->flags & RX_CONN_ATTACHWAIT; if (event) - conn->refCount--; + rx_AtomicDecrement_NL(conn->refCount); MUTEX_EXIT(&conn->conn_data_lock); if (waiting) { @@ -3188,7 +3186,7 @@ rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2) when.sec += RX_CHECKREACH_TIMEOUT; MUTEX_ENTER(&conn->conn_data_lock); if (!conn->checkReachEvent) { - conn->refCount++; + rx_AtomicIncrement_NL(conn->refCount); conn->checkReachEvent = rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn, NULL); @@ -3270,7 +3268,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call, struct rx_packet *tnp; struct clock when, now; if (rx_stats_active) - rx_MutexIncrement(rx_stats.dataPacketsRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.dataPacketsRead, rx_stats_mutex); #ifdef KERNEL /* If there are no packet buffers, drop this new packet, unless we can find @@ -3281,7 +3279,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call, rxi_NeedMorePackets = TRUE; MUTEX_EXIT(&rx_freePktQ_lock); if (rx_stats_active) - rx_MutexIncrement(rx_stats.noPacketBuffersOnRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.noPacketBuffersOnRead, rx_stats_mutex); call->rprev = np->header.serial; rxi_calltrace(RX_TRACE_DROP, call); dpf(("packet %x dropped on receipt - quota problems", np)); @@ -3347,7 +3345,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call, if (queue_IsNotEmpty(&call->rq) && queue_First(&call->rq, rx_packet)->header.seq == seq) { if (rx_stats_active) - rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.dupPacketsRead, rx_stats_mutex); dpf(("packet %x dropped on receipt - duplicate", np)); rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); @@ -3436,7 +3434,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call, * application already, then this is a duplicate */ if (seq < call->rnext) { if (rx_stats_active) - rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.dupPacketsRead, rx_stats_mutex); rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack); @@ -3464,7 +3462,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call, /*Check for duplicate packet */ if (seq == tp->header.seq) { if (rx_stats_active) - rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.dupPacketsRead, rx_stats_mutex); rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, @@ -3709,7 +3707,7 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np, int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */ if (rx_stats_active) - rx_MutexIncrement(rx_stats.ackPacketsRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.ackPacketsRead, rx_stats_mutex); ap = (struct rx_ackPacket *)rx_DataOf(np); nbytes = rx_Contiguous(np) - (int)((ap->acks) - (u_char *) ap); if (nbytes < 0) @@ -4591,7 +4589,7 @@ rxi_ConnectionError(register struct rx_connection *conn, rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0); conn->checkReachEvent = 0; conn->flags &= ~RX_CONN_ATTACHWAIT; - conn->refCount--; + rx_AtomicDecrement_NL(conn->refCount); } MUTEX_EXIT(&conn->conn_data_lock); @@ -4609,7 +4607,7 @@ rxi_ConnectionError(register struct rx_connection *conn, } rx_SetConnError(conn, error); if (rx_stats_active) - rx_MutexIncrement(rx_stats.fatalErrors, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.fatalErrors, rx_stats_mutex); } } @@ -5048,7 +5046,7 @@ rxi_SendAck(register struct rx_call *call, } } if (rx_stats_active) - rx_MutexIncrement(rx_stats.ackPacketsSent, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.ackPacketsSent, rx_stats_mutex); #ifndef RX_ENABLE_TSFPQ if (!optionalPacket) rxi_FreePacket(p); @@ -5073,7 +5071,7 @@ rxi_SendList(struct rx_call *call, struct rx_packet **list, int len, if (resending) peer->reSends += len; if (rx_stats_active) - rx_MutexIncrement(rx_stats.dataPacketsSent, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.dataPacketsSent, rx_stats_mutex); MUTEX_EXIT(&peer->peer_lock); if (list[len - 1]->header.flags & RX_LAST_PACKET) { @@ -5109,7 +5107,7 @@ rxi_SendList(struct rx_call *call, struct rx_packet **list, int len, if (list[i]->header.serial) { requestAck = 1; if (rx_stats_active) - rx_MutexIncrement(rx_stats.dataPacketsReSent, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.dataPacketsReSent, rx_stats_mutex); } else { /* improved RTO calculation- not Karn */ list[i]->firstSent = *now; @@ -5125,7 +5123,7 @@ rxi_SendList(struct rx_call *call, struct rx_packet **list, int len, if (resending) peer->reSends++; if (rx_stats_active) - rx_MutexIncrement(rx_stats.dataPacketsSent, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.dataPacketsSent, rx_stats_mutex); MUTEX_EXIT(&peer->peer_lock); /* Tag this packet as not being the last in this group, @@ -5426,7 +5424,7 @@ rxi_Start(struct rxevent *event, /* Since we may block, don't trust this */ usenow.sec = usenow.usec = 0; if (rx_stats_active) - rx_MutexIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex); continue; /* Ignore this packet if it has been acknowledged */ } @@ -5991,7 +5989,7 @@ rxi_ComputeRoundTripTime(register struct rx_packet *p, rx_stats.maxRtt = *rttp; } clock_Add(&rx_stats.totalRtt, rttp); - rx_stats.nRttSamples++; + rx_AtomicIncrement_NL(rx_stats.nRttSamples); MUTEX_EXIT(&rx_stats_mutex); } @@ -6106,10 +6104,10 @@ rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2) /* This only actually destroys the connection if * there are no outstanding calls */ MUTEX_ENTER(&conn->conn_data_lock); - if (!havecalls && !conn->refCount + if (!havecalls && (rx_AtomicPeek_NL(conn->refCount) == 0) && ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) { - conn->refCount++; /* it will be decr in rx_DestroyConn */ + rx_AtomicIncrement_NL(conn->refCount); /* it will be decr in rx_DestroyConn */ MUTEX_EXIT(&conn->conn_data_lock); #ifdef RX_ENABLE_LOCKS rxi_DestroyConnectionNoLock(conn); @@ -6152,7 +6150,7 @@ rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2) for (prev = peer = *peer_ptr; peer; peer = next) { next = peer->next; code = MUTEX_TRYENTER(&peer->peer_lock); - if ((code) && (peer->refCount == 0) + if ((code) && (rx_AtomicPeek_NL(peer->refCount) == 0) && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) { rx_interface_stat_p rpc_stat, nrpc_stat; size_t space; @@ -6177,7 +6175,7 @@ rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2) } rxi_FreePeer(peer); if (rx_stats_active) - rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex); + rx_AtomicDecrement(rx_stats.nPeerStructs, rx_stats_mutex); if (peer == *peer_ptr) { *peer_ptr = next; prev = next; @@ -6492,57 +6490,70 @@ rx_PrintTheseStats(FILE * file, struct rx_statistics *s, int size, } fprintf(file, "rx stats: free packets %d, allocs %d, ", (int)freePackets, - s->packetRequests); + rx_AtomicPeek_NL(s->packetRequests)); if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) { fprintf(file, "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n", - s->receivePktAllocFailures, s->receiveCbufPktAllocFailures, - s->sendPktAllocFailures, s->sendCbufPktAllocFailures, - s->specialPktAllocFailures); + rx_AtomicPeek_NL(s->receivePktAllocFailures), + rx_AtomicPeek_NL(s->receiveCbufPktAllocFailures), + rx_AtomicPeek_NL(s->sendPktAllocFailures), + rx_AtomicPeek_NL(s->sendCbufPktAllocFailures), + rx_AtomicPeek_NL(s->specialPktAllocFailures)); } else { fprintf(file, "alloc-failures(rcv %d,send %d,ack %d)\n", - s->receivePktAllocFailures, s->sendPktAllocFailures, - s->specialPktAllocFailures); + rx_AtomicPeek_NL(s->receivePktAllocFailures), + rx_AtomicPeek_NL(s->sendPktAllocFailures), + rx_AtomicPeek_NL(s->specialPktAllocFailures)); } fprintf(file, " greedy %d, " "bogusReads %d (last from host %x), " "noPackets %d, " "noBuffers %d, " "selects %d, " - "sendSelects %d\n", s->socketGreedy, s->bogusPacketOnRead, - s->bogusHost, s->noPacketOnRead, s->noPacketBuffersOnRead, - s->selects, s->sendSelects); + "sendSelects %d\n", + rx_AtomicPeek_NL(s->socketGreedy), + rx_AtomicPeek_NL(s->bogusPacketOnRead), + rx_AtomicPeek_NL(s->bogusHost), + rx_AtomicPeek_NL(s->noPacketOnRead), + rx_AtomicPeek_NL(s->noPacketBuffersOnRead), + rx_AtomicPeek_NL(s->selects), + rx_AtomicPeek_NL(s->sendSelects)); fprintf(file, " packets read: "); for (i = 0; i < RX_N_PACKET_TYPES; i++) { - fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsRead[i]); + fprintf(file, "%s %d ", rx_packetTypes[i], rx_AtomicPeek_NL(s->packetsRead[i])); } fprintf(file, "\n"); fprintf(file, " other read counters: data %d, " "ack %d, " "dup %d " - "spurious %d " "dally %d\n", s->dataPacketsRead, - s->ackPacketsRead, s->dupPacketsRead, s->spuriousPacketsRead, - s->ignorePacketDally); + "spurious %d " "dally %d\n", rx_AtomicPeek_NL(s->dataPacketsRead), + rx_AtomicPeek_NL(s->ackPacketsRead), + rx_AtomicPeek_NL(s->dupPacketsRead), + rx_AtomicPeek_NL(s->spuriousPacketsRead), + rx_AtomicPeek_NL(s->ignorePacketDally)); fprintf(file, " packets sent: "); for (i = 0; i < RX_N_PACKET_TYPES; i++) { - fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsSent[i]); + fprintf(file, "%s %d ", rx_packetTypes[i], rx_AtomicPeek_NL(s->packetsSent[i])); } fprintf(file, "\n"); fprintf(file, " other send counters: ack %d, " "data %d (not resends), " "resends %d, " "pushed %d, " "acked&ignored %d\n", - s->ackPacketsSent, s->dataPacketsSent, s->dataPacketsReSent, - s->dataPacketsPushed, s->ignoreAckedPacket); + rx_AtomicPeek_NL(s->ackPacketsSent), + rx_AtomicPeek_NL(s->dataPacketsSent), + rx_AtomicPeek_NL(s->dataPacketsReSent), + rx_AtomicPeek_NL(s->dataPacketsPushed), + rx_AtomicPeek_NL(s->ignoreAckedPacket)); fprintf(file, " \t(these should be small) sendFailed %d, " "fatalErrors %d\n", - s->netSendFailures, (int)s->fatalErrors); + rx_AtomicPeek_NL(s->netSendFailures), rx_AtomicPeek_NL(s->fatalErrors)); - if (s->nRttSamples) { + if (rx_AtomicPeek_NL(s->nRttSamples)) { fprintf(file, " Average rtt is %0.3f, with %d samples\n", - clock_Float(&s->totalRtt) / s->nRttSamples, s->nRttSamples); + clock_Float(&s->totalRtt) / rx_AtomicPeek_NL(s->nRttSamples), rx_AtomicPeek_NL(s->nRttSamples)); fprintf(file, " Minimum rtt is %0.3f, maximum is %0.3f\n", clock_Float(&s->minRtt), clock_Float(&s->maxRtt)); @@ -6551,8 +6562,11 @@ rx_PrintTheseStats(FILE * file, struct rx_statistics *s, int size, fprintf(file, " %d server connections, " "%d client connections, " "%d peer structs, " "%d call structs, " "%d free call structs\n", - s->nServerConns, s->nClientConns, s->nPeerStructs, - s->nCallStructs, s->nFreeCallStructs); + rx_AtomicPeek_NL(s->nServerConns), + rx_AtomicPeek_NL(s->nClientConns), + rx_AtomicPeek_NL(s->nPeerStructs), + rx_AtomicPeek_NL(s->nCallStructs), + rx_AtomicPeek_NL(s->nFreeCallStructs)); #if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY) fprintf(file, " %d clock updates\n", clock_nUpdates); @@ -7056,7 +7070,7 @@ shutdown_rx(void) next = peer->next; rxi_FreePeer(peer); if (rx_stats_active) - rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex); + rx_AtomicDecrement(rx_stats.nPeerStructs, rx_stats_mutex); } } } diff --git a/src/rx/rx.h b/src/rx/rx.h index 3f63d04d4..bfe8c6a0c 100644 --- a/src/rx/rx.h +++ b/src/rx/rx.h @@ -255,6 +255,8 @@ do {\ rx_max_clones_per_connection = v; \ } while(0); +typedef afs_int32 rx_atomic_t; + #define rx_PutConnection(conn) rx_DestroyConnection(conn) /* A connection is an authenticated communication path, allowing @@ -296,7 +298,7 @@ struct rx_connection { /* client-- to retransmit the challenge */ struct rx_service *service; /* used by servers only */ u_short serviceId; /* To stamp on requests (clients only) */ - afs_uint32 refCount; /* Reference count */ + rx_atomic_t refCount; /* Reference count */ u_char flags; /* Defined below */ u_char type; /* Type of connection, defined below */ u_char secondsUntilPing; /* how often to ping for each active call */ @@ -417,7 +419,8 @@ struct rx_peer { /* For garbage collection */ afs_uint32 idleWhen; /* When the refcountwent to zero */ - afs_uint32 refCount; /* Reference count for this structure */ + rx_atomic_t refCount; /* Reference count */ + /* Congestion control parameters */ u_char burstSize; /* Reinitialization size for the burst parameter */ @@ -852,47 +855,47 @@ struct rx_securityClass { * must equal sizeof(afs_int32). */ struct rx_statistics { /* General rx statistics */ - int packetRequests; /* Number of packet allocation requests */ - int receivePktAllocFailures; - int sendPktAllocFailures; - int specialPktAllocFailures; - int socketGreedy; /* Whether SO_GREEDY succeeded */ - int bogusPacketOnRead; /* Number of inappropriately short packets received */ - int bogusHost; /* Host address from bogus packets */ - int noPacketOnRead; /* Number of read packets attempted when there was actually no packet to read off the wire */ - int noPacketBuffersOnRead; /* Number of dropped data packets due to lack of packet buffers */ - int selects; /* Number of selects waiting for packet or timeout */ - int sendSelects; /* Number of selects forced when sending packet */ - int packetsRead[RX_N_PACKET_TYPES]; /* Total number of packets read, per type */ - int dataPacketsRead; /* Number of unique data packets read off the wire */ - int ackPacketsRead; /* Number of ack packets read */ - int dupPacketsRead; /* Number of duplicate data packets read */ - int spuriousPacketsRead; /* Number of inappropriate data packets */ - int packetsSent[RX_N_PACKET_TYPES]; /* Number of rxi_Sends: packets sent over the wire, per type */ - int ackPacketsSent; /* Number of acks sent */ - int pingPacketsSent; /* Total number of ping packets sent */ - int abortPacketsSent; /* Total number of aborts */ - int busyPacketsSent; /* Total number of busies sent received */ - int dataPacketsSent; /* Number of unique data packets sent */ - int dataPacketsReSent; /* Number of retransmissions */ - int dataPacketsPushed; /* Number of retransmissions pushed early by a NACK */ - int ignoreAckedPacket; /* Number of packets with acked flag, on rxi_Start */ + rx_atomic_t packetRequests; /* Number of packet allocation requests */ + rx_atomic_t receivePktAllocFailures; + rx_atomic_t sendPktAllocFailures; + rx_atomic_t specialPktAllocFailures; + rx_atomic_t socketGreedy; /* Whether SO_GREEDY succeeded */ + rx_atomic_t bogusPacketOnRead; /* Number of inappropriately short packets received */ + rx_atomic_t bogusHost; /* Host address from bogus packets */ + rx_atomic_t noPacketOnRead; /* Number of read packets attempted when there was actually no packet to read off the wire */ + rx_atomic_t noPacketBuffersOnRead; /* Number of dropped data packets due to lack of packet buffers */ + rx_atomic_t selects; /* Number of selects waiting for packet or timeout */ + rx_atomic_t sendSelects; /* Number of selects forced when sending packet */ + rx_atomic_t packetsRead[RX_N_PACKET_TYPES]; /* Total number of packets read, per type */ + rx_atomic_t dataPacketsRead; /* Number of unique data packets read off the wire */ + rx_atomic_t ackPacketsRead; /* Number of ack packets read */ + rx_atomic_t dupPacketsRead; /* Number of duplicate data packets read */ + rx_atomic_t spuriousPacketsRead; /* Number of inappropriate data packets */ + rx_atomic_t packetsSent[RX_N_PACKET_TYPES]; /* Number of rxi_Sends: packets sent over the wire, per type */ + rx_atomic_t ackPacketsSent; /* Number of acks sent */ + rx_atomic_t pingPacketsSent; /* Total number of ping packets sent */ + rx_atomic_t abortPacketsSent; /* Total number of aborts */ + rx_atomic_t busyPacketsSent; /* Total number of busies sent received */ + rx_atomic_t dataPacketsSent; /* Number of unique data packets sent */ + rx_atomic_t dataPacketsReSent; /* Number of retransmissions */ + rx_atomic_t dataPacketsPushed; /* Number of retransmissions pushed early by a NACK */ + rx_atomic_t ignoreAckedPacket; /* Number of packets with acked flag, on rxi_Start */ struct clock totalRtt; /* Total round trip time measured (use to compute average) */ struct clock minRtt; /* Minimum round trip time measured */ struct clock maxRtt; /* Maximum round trip time measured */ - int nRttSamples; /* Total number of round trip samples */ - int nServerConns; /* Total number of server connections */ - int nClientConns; /* Total number of client connections */ - int nPeerStructs; /* Total number of peer structures */ - int nCallStructs; /* Total number of call structures allocated */ - int nFreeCallStructs; /* Total number of previously allocated free call structures */ - int netSendFailures; - afs_int32 fatalErrors; - int ignorePacketDally; /* packets dropped because call is in dally state */ - int receiveCbufPktAllocFailures; - int sendCbufPktAllocFailures; - int nBusies; - int spares[4]; + rx_atomic_t nRttSamples; /* Total number of round trip samples */ + rx_atomic_t nServerConns; /* Total number of server connections */ + rx_atomic_t nClientConns; /* Total number of client connections */ + rx_atomic_t nPeerStructs; /* Total number of peer structures */ + rx_atomic_t nCallStructs; /* Total number of call structures allocated */ + rx_atomic_t nFreeCallStructs; /* Total number of previously allocated free call structures */ + rx_atomic_t netSendFailures; + rx_atomic_t fatalErrors; + rx_atomic_t ignorePacketDally; /* packets dropped because call is in dally state */ + rx_atomic_t receiveCbufPktAllocFailures; + rx_atomic_t sendCbufPktAllocFailures; + rx_atomic_t nBusies; + rx_atomic_t spares[4]; }; /* structures for debug input and output packets */ diff --git a/src/rx/rx_internal.h b/src/rx/rx_internal.h index f03f62b47..6e716d3e1 100644 --- a/src/rx/rx_internal.h +++ b/src/rx/rx_internal.h @@ -43,80 +43,76 @@ #include #pragma intrinsic(_InterlockedOr) #pragma intrinsic(_InterlockedAnd) -#define rx_MutexOr(object, operand, mutex) _InterlockedOr(&object, operand) -#define rx_MutexAnd(object, operand, mutex) _InterlockedAnd(&object, operand) -#endif -#else -#define rx_MutexOr(object, operand, mutex) InterlockedOr(&object, operand) -#define rx_MutexAnd(object, operand, mutex) InterlockedAnd(&object, operand) -#endif -#define rx_MutexIncrement(object, mutex) InterlockedIncrement(&object) -#define rx_MutexXor(object, operand, mutex) InterlockedXor(&object, operand) -#define rx_MutexAdd(object, addend, mutex) InterlockedExchangeAdd(&object, addend) -#define rx_MutexDecrement(object, mutex) InterlockedDecrement(&object) -#define rx_MutexAdd1Increment2(object1, addend, object2, mutex) \ - do { \ - MUTEX_ENTER(&mutex); \ - object1 += addend; \ - InterlockedIncrement(&object2); \ - MUTEX_EXIT(&mutex); \ - } while (0) -#define rx_MutexAdd1Decrement2(object1, addend, object2, mutex) \ - do { \ - MUTEX_ENTER(&mutex); \ - object1 += addend; \ - InterlockedDecrement(&object2); \ - MUTEX_EXIT(&mutex); \ - } while (0) +#define rx_AtomicOr(object, operand, mutex) _InterlockedOr(&object, operand) +#define rx_AtomicAnd(object, operand, mutex) _InterlockedAnd(&object, operand) +#endif /* __cplusplus */ +#else /* !WIN64 */ +#define rx_AtomicOr(object, operand, mutex) InterlockedOr(&object, operand) +#define rx_AtomicAnd(object, operand, mutex) InterlockedAnd(&object, operand) +#endif /* WIN64 */ +#define rx_AtomicIncrement_NL(object) InterlockedIncrement(&object) +#define rx_AtomicIncrement(object, mutex) InterlockedIncrement(&object) +#define rx_AtomicXor(object, operand, mutex) InterlockedXor(&object, operand) +#define rx_AtomicAdd_NL(object, addend) InterlockedExchangeAdd(&object, addend) +#define rx_AtomicAdd(object, addend, mutex) InterlockedExchangeAdd(&object, addend) +#define rx_AtomicDecrement_NL(object) InterlockedDecrement(&object) +#define rx_AtomicDecrement(object, mutex) InterlockedDecrement(&object) +#define rx_AtomicSwap_NL(object1, object2) InterlockedExchange ((volatile LONG *) object1, object2); +#define rx_AtomicSwap(object1, object2, mutex) InterlockedExchange ((volatile LONG *) object1, object2); #elif defined(AFS_DARWIN80_ENV) -#define rx_MutexIncrement(object, mutex) OSAtomicIncrement32(&object) -#define rx_MutexOr(object, operand, mutex) OSAtomicOr32(operand, &object) -#define rx_MutexAnd(object, operand, mutex) OSAtomicAnd32(operand, &object) -#define rx_MutexXor(object, operand, mutex) OSAtomicXor32(operand, &object) -#define rx_MutexAdd(object, addend, mutex) OSAtomicAdd32(addend, &object) -#define rx_MutexDecrement(object, mutex) OSAtomicDecrement32(&object) -#define rx_MutexAdd1Increment2(object1, addend, object2, mutex) \ - do { \ - MUTEX_ENTER(&mutex); \ - object1 += addend; \ - OSAtomicIncrement32(&object2); \ - MUTEX_EXIT(&mutex); \ - } while (0) -#define rx_MutexAdd1Decrement2(object1, addend, object2, mutex) \ - do { \ - MUTEX_ENTER(&mutex); \ - object1 += addend; \ - OSAtomicDecrement32(&object2); \ - MUTEX_EXIT(&mutex); \ - } while (0) +#define rx_AtomicIncrement_NL(object) OSAtomicIncrement32(&object) +#define rx_AtomicIncrement(object, mutex) OSAtomicIncrement32(&object) +#define rx_AtomicOr(object, operand, mutex) OSAtomicOr32(operand, &object) +#define rx_AtomicAnd(object, operand, mutex) OSAtomicAnd32(operand, &object) +#define rx_AtomicXor(object, operand, mutex) OSAtomicXor32(operand, &object) +#define rx_AtomicAdd_NL(object, addend) OSAtomicAdd32(addend, &object) +#define rx_AtomicAdd(object, addend, mutex) OSAtomicAdd32(addend, &object) +#define rx_AtomicDecrement_NL(object) OSAtomicDecrement32(&object) +#define rx_AtomicDecrement(object, mutex) OSAtomicDecrement32(&object) +#define rx_AtomicSwap_NL(oldval, newval) rx_AtomicSwap_int(oldval, newval) +#define rx_AtomicSwap(oldval, newval, mutex) rx_AtomicSwap_int(oldval, newval) +static inline afs_int32 rx_AtomicSwap_int(afs_int32 *oldval, afs_int32 newval) { + afs_int32 ret = *oldval; + OSAtomicCompareAndSwap32 ((afs_int32) *oldval,(afs_int32) newval, + (afs_int32*) oldval); + return ret; +} #elif defined(AFS_SUN58_ENV) -#define rx_MutexIncrement(object, mutex) atomic_inc_32(&object) -#define rx_MutexOr(object, operand, mutex) atomic_or_32(&object, operand) -#define rx_MutexAnd(object, operand, mutex) atomic_and_32(&object, operand) -#define rx_MutexXor(object, operand, mutex) \ - do { \ - MUTEX_ENTER(&mutex); \ - object ^= operand; \ - MUTEX_EXIT(&mutex); \ - } while(0) -#define rx_MutexXor(object, operand, mutex) OSAtomicXor32Barrier(operand, &object) -#define rx_MutexAdd(object, addend, mutex) atomic_add_32(&object, addend) -#define rx_MutexDecrement(object, mutex) atomic_dec_32(&object) -#define rx_MutexAdd1Increment2(object1, addend, object2, mutex) \ - do { \ - MUTEX_ENTER(&mutex); \ - object1 += addend; \ - atomic_inc_32(&object2); \ - MUTEX_EXIT(&mutex); \ - } while (0) -#define rx_MutexAdd1Decrement2(object1, addend, object2, mutex) \ - do { \ - MUTEX_ENTER(&mutex); \ - object1 += addend; \ - atomic_dec_32(&object2); \ - MUTEX_EXIT(&mutex); \ - } while (0) +#define rx_AtomicIncrement_NL(object) atomic_inc_32(&object) +#define rx_AtomicIncrement(object, mutex) atomic_inc_32(&object) +#define rx_AtomicOr(object, operand, mutex) atomic_or_32(&object, operand) +#define rx_AtomicAnd(object, operand, mutex) atomic_and_32(&object, operand) +#define rx_AtomicAdd_NL(object, addend) atomic_add_32(&object, addend) +#define rx_AtomicAdd(object, addend, mutex) atomic_add_32(&object, addend) +#define rx_AtomicDecrement_NL(object) atomic_dec_32(&object) +#define rx_AtomicDecrement(object, mutex) atomic_dec_32(&object) +#define rx_AtomicSwap_NL(oldval, newval) rx_AtomicSwap_int(oldval, newval) +#define rx_AtomicSwap(oldval, newval, mutex) rx_AtomicSwap_int(oldval, newval) +static inline afs_int32 rx_AtomicSwap_int(afs_int32 *oldval, afs_int32 newval) { + afs_int32 ret = *oldval; + atomic_cas_32((afs_int32) *oldval,(afs_int32) newval, + (afs_int32*) oldval); + return ret; +} #else +#define rx_AtomicIncrement_NL(object) (object)++ +#define rx_AtomicIncrement(object, mutex) rx_MutexIncrement(object, mutex) +#define rx_AtomicOr(object, operand, mutex) rx_MutexOr(object, operand, mutex) +#define rx_AtomicAnd(object, operand, mutex) rx_MutexAnd(object, operand, mutex) +#define rx_AtomicAdd_NL(object, addend) object += addend +#define rx_AtomicAdd(object, addend, mutex) rx_MutexAdd(object, addand, mutex) +#define rx_AtomicDecrement_NL(object) (object)-- +#define rx_AtomicDecrement(object, mutex) rx_MutexDecrement(object, mutex) +#define rx_AtomicSwap_NL(oldval, newval) rx_AtomicSwap_int(oldval, newval) +#define rx_AtomicSwap(oldval, newval, mutex) rx_AtomicSwap_int(oldval, newval) +static inline afs_int32 rx_AtomicSwap_int(afs_int32 *oldval, afs_int32 newval) { + afs_int32 ret = *oldval; + *oldval = newval; + return ret; +} +#endif +#define rx_AtomicPeek_NL(object) rx_AtomicAdd_NL(object, 0) +#define rx_AtomicPeek(object, mutex) rx_AtomicAdd(object, 0, mutex) #define rx_MutexIncrement(object, mutex) \ do { \ MUTEX_ENTER(&mutex); \ @@ -147,6 +143,12 @@ object += addend; \ MUTEX_EXIT(&mutex); \ } while(0) +#define rx_MutexDecrement(object, mutex) \ + do { \ + MUTEX_ENTER(&mutex); \ + object--; \ + MUTEX_EXIT(&mutex); \ + } while(0) #define rx_MutexAdd1Increment2(object1, addend, object2, mutex) \ do { \ MUTEX_ENTER(&mutex); \ @@ -161,12 +163,19 @@ object2--; \ MUTEX_EXIT(&mutex); \ } while(0) -#define rx_MutexDecrement(object, mutex) \ + +#define rx_MutexAdd1AtomicIncrement2(object1, addend, object2, mutex) \ do { \ MUTEX_ENTER(&mutex); \ - object--; \ + object1 += addend; \ + rx_AtomicIncrement(&object2); \ MUTEX_EXIT(&mutex); \ - } while(0) -#endif - + } while (0) +#define rx_MutexAdd1AtomicDecrement2(object1, addend, object2, mutex) \ + do { \ + MUTEX_ENTER(&mutex); \ + object1 += addend; \ + rx_AtomicDecrement(&object2); \ + MUTEX_EXIT(&mutex); \ + } while (0) #endif /* _RX_INTERNAL_H */ diff --git a/src/rx/rx_kcommon.c b/src/rx/rx_kcommon.c index 0b56e0f32..9dac1121b 100644 --- a/src/rx/rx_kcommon.c +++ b/src/rx/rx_kcommon.c @@ -1197,8 +1197,8 @@ rxk_ReadPacket(osi_socket so, struct rx_packet *p, int *host, int *port) if (nbytes <= 0) { if (rx_stats_active) { MUTEX_ENTER(&rx_stats_mutex); - rx_stats.bogusPacketOnRead++; - rx_stats.bogusHost = from.sin_addr.s_addr; + rx_AtomicIncrement_NL(rx_stats.bogusPacketOnRead); + rx_AtomicSwap_NL(&rx_stats.bogusHost, from.sin_addr.s_addr); MUTEX_EXIT(&rx_stats_mutex); } dpf(("B: bogus packet from [%x,%d] nb=%d", @@ -1212,11 +1212,8 @@ rxk_ReadPacket(osi_socket so, struct rx_packet *p, int *host, int *port) *host = from.sin_addr.s_addr; *port = from.sin_port; if (p->header.type > 0 && p->header.type < RX_N_PACKET_TYPES) { - if (rx_stats_active) { - MUTEX_ENTER(&rx_stats_mutex); - rx_stats.packetsRead[p->header.type - 1]++; - MUTEX_EXIT(&rx_stats_mutex); - } + if (rx_stats_active) + rx_AtomicIncrement(rx_stats.packetsRead[p->header.type - 1], rx_stats_mutex); } /* Free any empty packet buffers at the end of this packet */ diff --git a/src/rx/rx_lwp.c b/src/rx/rx_lwp.c index bef6846d6..f4c80a948 100644 --- a/src/rx/rx_lwp.c +++ b/src/rx/rx_lwp.c @@ -212,7 +212,7 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp) tv.tv_usec = cv.usec; tvp = &tv; } - rx_stats.selects++; + rx_AtomicIncrement(rx_stats.selects, rx_stats_mutex); *rfds = rx_selectMask; @@ -435,7 +435,7 @@ rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags) fd_set *sfds = (fd_set *) 0; while (sendmsg(socket, msg_p, flags) == -1) { int err; - rx_stats.sendSelects++; + rx_AtomicIncrement(rx_stats.sendSelects, rx_stats_mutex); if (!sfds) { if (!(sfds = IOMGR_AllocFDSet())) { diff --git a/src/rx/rx_multi.c b/src/rx/rx_multi.c index d15fc920f..9201081ee 100644 --- a/src/rx/rx_multi.c +++ b/src/rx/rx_multi.c @@ -17,9 +17,11 @@ RCSID #include "afs/sysincludes.h" #include "rx/rx_internal.h" #include "rx/rx.h" +#include "rx/rx_globals.h" #else /* KERNEL */ # include "rx_internal.h" # include "rx.h" +# include "rx_globals.h" #endif /* KERNEL */ /* diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c index c11fbb9c6..10fb2e3ab 100644 --- a/src/rx/rx_packet.c +++ b/src/rx/rx_packet.c @@ -316,19 +316,19 @@ AllocPacketBufs(int class, int num_pkts, struct rx_queue * q) if (rx_stats_active) { switch (class) { case RX_PACKET_CLASS_RECEIVE: - rx_MutexIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_SEND: - rx_MutexIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_SPECIAL: - rx_MutexIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_RECV_CBUF: - rx_MutexIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_SEND_CBUF: - rx_MutexIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex); break; } } @@ -1127,19 +1127,19 @@ rxi_AllocPacketNoLock(int class) if (rx_stats_active) { switch (class) { case RX_PACKET_CLASS_RECEIVE: - rx_MutexIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_SEND: - rx_MutexIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_SPECIAL: - rx_MutexIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_RECV_CBUF: - rx_MutexIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_SEND_CBUF: - rx_MutexIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex); break; } } @@ -1148,7 +1148,7 @@ rxi_AllocPacketNoLock(int class) #endif /* KERNEL */ if (rx_stats_active) - rx_MutexIncrement(rx_stats.packetRequests, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.packetRequests, rx_stats_mutex); if (queue_IsEmpty(&rx_ts_info->_FPQ)) { #ifdef KERNEL @@ -1187,19 +1187,19 @@ rxi_AllocPacketNoLock(int class) if (rx_stats_active) { switch (class) { case RX_PACKET_CLASS_RECEIVE: - rx_MutexIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_SEND: - rx_MutexIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_SPECIAL: - rx_MutexIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_RECV_CBUF: - rx_MutexIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex); break; case RX_PACKET_CLASS_SEND_CBUF: - rx_MutexIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex); break; } } @@ -1208,7 +1208,7 @@ rxi_AllocPacketNoLock(int class) #endif /* KERNEL */ if (rx_stats_active) - rx_MutexIncrement(rx_stats.packetRequests, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.packetRequests, rx_stats_mutex); #ifdef KERNEL if (queue_IsEmpty(&rx_freePacketQueue)) @@ -1245,7 +1245,7 @@ rxi_AllocPacketTSFPQ(int class, int pull_global) RX_TS_INFO_GET(rx_ts_info); if (rx_stats_active) - rx_MutexIncrement(rx_stats.packetRequests, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.packetRequests, rx_stats_mutex); if (pull_global && queue_IsEmpty(&rx_ts_info->_FPQ)) { MUTEX_ENTER(&rx_freePktQ_lock); @@ -1466,12 +1466,12 @@ rxi_ReadPacket(osi_socket socket, register struct rx_packet *p, afs_uint32 * hos if ((nbytes > tlen) || (p->length & 0x8000)) { /* Bogus packet */ if (nbytes < 0 && errno == EWOULDBLOCK) { if (rx_stats_active) - rx_MutexIncrement(rx_stats.noPacketOnRead, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.noPacketOnRead, rx_stats_mutex); } else if (nbytes <= 0) { if (rx_stats_active) { MUTEX_ENTER(&rx_stats_mutex); - rx_stats.bogusPacketOnRead++; - rx_stats.bogusHost = from.sin_addr.s_addr; + rx_AtomicIncrement_NL(rx_stats.bogusPacketOnRead); + rx_AtomicSwap(&rx_stats.bogusHost, from.sin_addr.s_addr, rx_stats_mutex); MUTEX_EXIT(&rx_stats_mutex); } dpf(("B: bogus packet from [%x,%d] nb=%d", ntohl(from.sin_addr.s_addr), @@ -1504,7 +1504,7 @@ rxi_ReadPacket(osi_socket socket, register struct rx_packet *p, afs_uint32 * hos if (p->header.type > 0 && p->header.type < RX_N_PACKET_TYPES) { struct rx_peer *peer; if (rx_stats_active) - rx_MutexIncrement(rx_stats.packetsRead[p->header.type - 1], rx_stats_mutex); + rx_AtomicIncrement(rx_stats.packetsRead[p->header.type - 1], rx_stats_mutex); /* * Try to look up this peer structure. If it doesn't exist, * don't create a new one - @@ -1521,7 +1521,7 @@ rxi_ReadPacket(osi_socket socket, register struct rx_packet *p, afs_uint32 * hos * it may have no refCount, meaning we could race with * ReapConnections */ - if (peer && (peer->refCount > 0)) { + if (peer && (rx_AtomicPeek_NL(peer->refCount) > 0)) { MUTEX_ENTER(&peer->peer_lock); hadd32(peer->bytesReceived, p->length); MUTEX_EXIT(&peer->peer_lock); @@ -1984,7 +1984,7 @@ rxi_ReceiveDebugPacket(register struct rx_packet *ap, osi_socket asocket, tpeer.port = tp->port; tpeer.ifMTU = htons(tp->ifMTU); tpeer.idleWhen = htonl(tp->idleWhen); - tpeer.refCount = htons(tp->refCount); + tpeer.refCount = htons(rx_AtomicPeek_NL(tp->refCount)); tpeer.burstSize = tp->burstSize; tpeer.burst = tp->burst; tpeer.burstWait.sec = htonl(tp->burstWait.sec); @@ -2273,7 +2273,7 @@ rxi_SendPacket(struct rx_call *call, struct rx_connection *conn, p->length + RX_HEADER_SIZE, istack)) != 0) { /* send failed, so let's hurry up the resend, eh? */ if (rx_stats_active) - rx_MutexIncrement(rx_stats.netSendFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.netSendFailures, rx_stats_mutex); p->retryTime = p->timeSent; /* resend it very soon */ clock_Addmsec(&(p->retryTime), 10 + (((afs_uint32) p->backoff) << 8)); @@ -2314,7 +2314,7 @@ rxi_SendPacket(struct rx_call *call, struct rx_connection *conn, dpf(("%c %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", deliveryType, p->header.serial, rx_packetTypes[p->header.type - 1], ntohl(peer->host), ntohs(peer->port), p->header.serial, p->header.epoch, p->header.cid, p->header.callNumber, p->header.seq, p->header.flags, (unsigned long)p, p->retryTime.sec, p->retryTime.usec / 1000, p->length)); #endif if (rx_stats_active) - rx_MutexIncrement(rx_stats.packetsSent[p->header.type - 1], rx_stats_mutex); + rx_AtomicIncrement(rx_stats.packetsSent[p->header.type - 1], rx_stats_mutex); MUTEX_ENTER(&peer->peer_lock); hadd32(peer->bytesSent, p->length); MUTEX_EXIT(&peer->peer_lock); @@ -2460,7 +2460,7 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn, istack)) != 0) { /* send failed, so let's hurry up the resend, eh? */ if (rx_stats_active) - rx_MutexIncrement(rx_stats.netSendFailures, rx_stats_mutex); + rx_AtomicIncrement(rx_stats.netSendFailures, rx_stats_mutex); for (i = 0; i < len; i++) { p = list[i]; p->retryTime = p->timeSent; /* resend it very soon */ @@ -2498,7 +2498,7 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn, #endif if (rx_stats_active) - rx_MutexIncrement(rx_stats.packetsSent[p->header.type - 1], rx_stats_mutex); + rx_AtomicIncrement(rx_stats.packetsSent[p->header.type - 1], rx_stats_mutex); MUTEX_ENTER(&peer->peer_lock); hadd32(peer->bytesSent, p->length); MUTEX_EXIT(&peer->peer_lock); diff --git a/src/rx/rx_user.c b/src/rx/rx_user.c index 1a059d90a..000caf77f 100644 --- a/src/rx/rx_user.c +++ b/src/rx/rx_user.c @@ -200,9 +200,7 @@ rxi_GetHostUDPSocket(u_int ahost, u_short port) (osi_Msg "%s*WARNING* Unable to increase buffering on socket\n", name); if (rx_stats_active) { - MUTEX_ENTER(&rx_stats_mutex); - rx_stats.socketGreedy = greedy; - MUTEX_EXIT(&rx_stats_mutex); + rx_AtomicSwap(&rx_stats.socketGreedy, greedy, rx_stats_mutex); } } -- 2.39.5