From 615eb81a496d20a3532f0a68e56781b2171ad4ac Mon Sep 17 00:00:00 2001 From: Andrew Deason Date: Wed, 9 Jun 2010 12:45:57 -0500 Subject: [PATCH] ubik: Drop dbase versionLock during I/O and sleeps Currently we hold versionLock during all ubik network I/O and while we are sleeping for whatever reason. For pthreaded ubik, to allow other things to happen during those times, drop the lock and reacquire when we hit the net or sleep. Change-Id: I414b3adfadc0bb506b98e8677ec5dfbb269e0129 Reviewed-on: http://gerrit.openafs.org/2107 Reviewed-by: Derrick Brashear Tested-by: Derrick Brashear --- src/ubik/ubik.c | 100 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 91 insertions(+), 9 deletions(-) diff --git a/src/ubik/ubik.c b/src/ubik/ubik.c index b8088a568..4b608c9cd 100644 --- a/src/ubik/ubik.c +++ b/src/ubik/ubik.c @@ -94,6 +94,30 @@ struct rx_securityClass *ubik_sc[3]; #define CStampVersion 1 /* meaning set ts->version */ +static_inline struct rx_connection * +Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as) +{ + struct rx_connection *conn; + + conn = as->disk_rxcid; + +#ifdef AFS_PTHREAD_ENV + rx_GetConnection(conn); + DBRELE(atrans->dbase); +#endif /* AFS_PTHREAD_ENV */ + + return conn; +} + +static_inline void +Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn) +{ +#ifdef AFS_PTHREAD_ENV + DBHOLD(atrans->dbase); + rx_PutConnection(aconn); +#endif /* AFS_PTHREAD_ENV */ +} + /*! * \brief Perform an operation at a quorum, handling error conditions. * \return 0 if all worked and a quorum was contacted successfully @@ -115,6 +139,7 @@ ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *), struct ubik_server *ts; afs_int32 code; afs_int32 rcode, okcalls; + struct rx_connection *conn; rcode = 0; okcalls = 0; @@ -124,7 +149,14 @@ ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *), ts->currentDB = 0; /* db is no longer current; we just missed an update */ continue; /* not up-to-date, don't bother */ } - code = (*proc)(ts->disk_rxcid, &atrans->tid); + + conn = Quorum_StartIO(atrans, ts); + + code = (*proc)(conn, &atrans->tid); + + Quorum_EndIO(atrans, conn); + conn = NULL; + if (code) { /* failure */ rcode = code; ts->up = 0; /* mark as down now; beacons will no longer be sent */ @@ -153,6 +185,7 @@ ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file, struct ubik_server *ts; afs_int32 code; afs_int32 rcode, okcalls; + struct rx_connection *conn; rcode = 0; okcalls = 0; @@ -162,8 +195,14 @@ ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file, ts->currentDB = 0; /* db is no longer current; we just missed an update */ continue; /* not up-to-date, don't bother */ } - code = DISK_Lock(ts->disk_rxcid, &atrans->tid, file, position, length, - type); + + conn = Quorum_StartIO(atrans, ts); + + code = DISK_Lock(conn, &atrans->tid, file, position, length, type); + + Quorum_EndIO(atrans, conn); + conn = NULL; + if (code) { /* failure */ rcode = code; ts->up = 0; /* mark as down now; beacons will no longer be sent */ @@ -192,6 +231,7 @@ ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags, struct ubik_server *ts; afs_int32 code; afs_int32 rcode, okcalls; + struct rx_connection *conn; rcode = 0; okcalls = 0; @@ -201,7 +241,14 @@ ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags, ts->currentDB = 0; /* db is no longer current; we just missed an update */ continue; /* not up-to-date, don't bother */ } - code = DISK_Write(ts->disk_rxcid, &atrans->tid, file, position, data); + + conn = Quorum_StartIO(atrans, ts); + + code = DISK_Write(conn, &atrans->tid, file, position, data); + + Quorum_EndIO(atrans, conn); + conn = NULL; + if (code) { /* failure */ rcode = code; ts->up = 0; /* mark as down now; beacons will no longer be sent */ @@ -230,6 +277,7 @@ ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags, struct ubik_server *ts; afs_int32 code; afs_int32 rcode, okcalls; + struct rx_connection *conn; rcode = 0; okcalls = 0; @@ -239,7 +287,14 @@ ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags, ts->currentDB = 0; /* db is no longer current; we just missed an update */ continue; /* not up-to-date, don't bother */ } - code = DISK_Truncate(ts->disk_rxcid, &atrans->tid, file, length); + + conn = Quorum_StartIO(atrans, ts); + + code = DISK_Truncate(conn, &atrans->tid, file, length); + + Quorum_EndIO(atrans, conn); + conn = NULL; + if (code) { /* failure */ rcode = code; ts->up = 0; /* mark as down now; beacons will no longer be sent */ @@ -268,6 +323,7 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags, struct ubik_server *ts; afs_int32 code; afs_int32 rcode, okcalls; + struct rx_connection *conn; rcode = 0; okcalls = 0; @@ -278,7 +334,12 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags, continue; /* not up-to-date, don't bother */ } - code = DISK_WriteV(ts->disk_rxcid, &atrans->tid, io_vector, io_buffer); + conn = Quorum_StartIO(atrans, ts); + + code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer); + + Quorum_EndIO(atrans, conn); + conn = NULL; if ((code <= -450) && (code > -500)) { /* An RPC interface mismatch (as defined in comerr/error_msg.c). @@ -291,6 +352,8 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags, bulkdata tcbs; afs_int32 i, offset; + conn = Quorum_StartIO(atrans, ts); + for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) { /* Sanity check for going off end of buffer */ if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) { @@ -299,14 +362,18 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags, } tcbs.bulkdata_len = iovec[i].length; tcbs.bulkdata_val = &iobuf[offset]; + code = - DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file, + DISK_Write(conn, &atrans->tid, iovec[i].file, iovec[i].position, &tcbs); if (code) break; offset += iovec[i].length; } + + Quorum_EndIO(atrans, conn); + conn = NULL; } if (code) { /* failure */ @@ -338,6 +405,7 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags, struct ubik_server *ts; afs_int32 code; afs_int32 rcode, okcalls; + struct rx_connection *conn; rcode = 0; okcalls = 0; @@ -347,8 +415,14 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags, ts->currentDB = 0; /* db is no longer current; we just missed an update */ continue; /* not up-to-date, don't bother */ } - code = DISK_SetVersion(ts->disk_rxcid, &atrans->tid, OldVersion, - NewVersion); + + conn = Quorum_StartIO(atrans, ts); + + code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion); + + Quorum_EndIO(atrans, conn); + conn = NULL; + if (code) { /* failure */ rcode = code; ts->up = 0; /* mark as down now; beacons will no longer be sent */ @@ -947,15 +1021,23 @@ ubik_EndTrans(struct ubik_trans *transPtr) } for (ts = ubik_servers; ts; ts = ts->next) { if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) { + /* this guy could have some damaged data, wait for him */ code = 1; tv.tv_sec = 1; /* try again after a while (ha ha) */ tv.tv_usec = 0; + #ifdef AFS_PTHREAD_ENV + /* we could release the dbase outside of the loop, but we do + * it here, in the loop, to avoid an unnecessary RELE/HOLD + * if all sites are up */ + DBRELE(dbase); select(0, 0, 0, 0, &tv); + DBHOLD(dbase); #else IOMGR_Select(0, 0, 0, 0, &tv); /* poll, should we wait on something? */ #endif + break; } } -- 2.39.5