From 249d65e72592b707fcc794be5f8af10a67c55e26 Mon Sep 17 00:00:00 2001 From: Marc Dionne Date: Sat, 22 Jan 2011 16:04:47 -0500 Subject: [PATCH] ubik: Abstract common code in ContactQuorum* functions These functions are mostly identical except for the arguments and the called operation. Move as much common code as possible to an iterator function and a return code check function. The DISK_WriteV case is treated a bit differently since it can fall back to using DISK_Write. This reduces code duplication and shoud simplify maintenance and future changes. There should be no functional changes. Change-Id: If2f88c670da47eaad4baa81975ecc307013f5ce8 Reviewed-on: http://gerrit.openafs.org/3970 Tested-by: BuildBot Reviewed-by: Andrew Deason Reviewed-by: Jeffrey Altman Reviewed-by: Derrick Brashear --- src/ubik/ubik.c | 405 ++++++++++++++++++------------------------------ 1 file changed, 150 insertions(+), 255 deletions(-) diff --git a/src/ubik/ubik.c b/src/ubik/ubik.c index 16584b60c..da5fa84b5 100644 --- a/src/ubik/ubik.c +++ b/src/ubik/ubik.c @@ -122,6 +122,68 @@ Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn) #endif /* AFS_PTHREAD_ENV */ } + +/* + * Iterate over all servers. Callers pass in *ts which is used to track + * the current server. + * - Returns 1 if there are no more servers + * - Returns 0 with conn set to the connection for the current server if + * it's up and current + */ +static int +ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts, + struct rx_connection **conn, afs_int32 *rcode, + afs_int32 *okcalls, afs_int32 code) +{ + if (!*ts) { + /* Initial call - start iterating over servers */ + *ts = ubik_servers; + *conn = NULL; + *rcode = 0; + *okcalls = 0; + } else { + if (*conn) { + Quorum_EndIO(atrans, *conn); + *conn = NULL; + if (code) { /* failure */ + *rcode = code; + (*ts)->up = 0; /* mark as down now; beacons will no longer be sent */ + (*ts)->beaconSinceDown = 0; + (*ts)->currentDB = 0; + urecovery_LostServer(*ts); /* tell recovery to try to resend dbase later */ + } else { /* success */ + if (!(*ts)->isClone) + (*okcalls)++; /* count up how many worked */ + if (aflags & CStampVersion) { + (*ts)->version = atrans->dbase->version; + } + } + } + *ts = (*ts)->next; + } + if (!(*ts)) + return 1; + if (!(*ts)->up || !(*ts)->currentDB) { + (*ts)->currentDB = 0; /* db is no longer current; we just missed an update */ + return 0; /* not up-to-date, don't bother. NULL conn will tell caller not to use */ + } + *conn = Quorum_StartIO(atrans, *ts); + return 0; +} + +static int +ContactQuorum_rcode(int okcalls, afs_int32 rcode) +{ + /* + * return 0 if we successfully contacted a quorum, otherwise return error code. + * We don't have to contact ourselves (that was done locally) + */ + if (okcalls + 1 >= ubik_quorum) + return 0; + else + return rcode; +} + /*! * \brief Perform an operation at a quorum, handling error conditions. * \return 0 if all worked and a quorum was contacted successfully @@ -140,314 +202,147 @@ afs_int32 ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *), struct ubik_trans *atrans, int aflags) { - struct ubik_server *ts; - afs_int32 code; - afs_int32 rcode, okcalls; + struct ubik_server *ts = NULL; + afs_int32 code = 0, rcode, okcalls; struct rx_connection *conn; + int done; - rcode = 0; - okcalls = 0; - for (ts = ubik_servers; ts; ts = ts->next) { - /* for each server */ - if (!ts->up || !ts->currentDB) { - ts->currentDB = 0; /* db is no longer current; we just missed an update */ - continue; /* not up-to-date, don't bother */ - } - - conn = Quorum_StartIO(atrans, ts); - - code = (*proc)(conn, &atrans->tid); - - Quorum_EndIO(atrans, conn); - conn = NULL; - - if (code) { /* failure */ - rcode = code; - ts->up = 0; /* mark as down now; beacons will no longer be sent */ - ts->currentDB = 0; - ts->beaconSinceDown = 0; - urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */ - } else { /* success */ - if (!ts->isClone) - okcalls++; /* count up how many worked */ - if (aflags & CStampVersion) { - ts->version = atrans->dbase->version; - } - } + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); + while (!done) { + if (conn) + code = (*proc)(conn, &atrans->tid); + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); } - /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */ - if (okcalls + 1 >= ubik_quorum) - return 0; - else - return rcode; + return ContactQuorum_rcode(okcalls, rcode); } + afs_int32 ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file, afs_int32 position, afs_int32 length, afs_int32 type) { - struct ubik_server *ts; - afs_int32 code; - afs_int32 rcode, okcalls; + struct ubik_server *ts = NULL; + afs_int32 code = 0, rcode, okcalls; struct rx_connection *conn; + int done; - rcode = 0; - okcalls = 0; - for (ts = ubik_servers; ts; ts = ts->next) { - /* for each server */ - if (!ts->up || !ts->currentDB) { - ts->currentDB = 0; /* db is no longer current; we just missed an update */ - continue; /* not up-to-date, don't bother */ - } - - conn = Quorum_StartIO(atrans, ts); - - code = DISK_Lock(conn, &atrans->tid, file, position, length, type); - - Quorum_EndIO(atrans, conn); - conn = NULL; - - if (code) { /* failure */ - rcode = code; - ts->up = 0; /* mark as down now; beacons will no longer be sent */ - ts->currentDB = 0; - ts->beaconSinceDown = 0; - urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */ - } else { /* success */ - if (!ts->isClone) - okcalls++; /* count up how many worked */ - if (aflags & CStampVersion) { - ts->version = atrans->dbase->version; - } - } + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); + while (!done) { + if (conn) + code = DISK_Lock(conn, &atrans->tid, file, position, length, type); + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); } - /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */ - if (okcalls + 1 >= ubik_quorum) - return 0; - else - return rcode; + return ContactQuorum_rcode(okcalls, rcode); } + afs_int32 ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags, afs_int32 file, afs_int32 position, bulkdata *data) { - struct ubik_server *ts; - afs_int32 code; - afs_int32 rcode, okcalls; + struct ubik_server *ts = NULL; + afs_int32 code = 0, rcode, okcalls; struct rx_connection *conn; + int done; - rcode = 0; - okcalls = 0; - for (ts = ubik_servers; ts; ts = ts->next) { - /* for each server */ - if (!ts->up || !ts->currentDB) { - ts->currentDB = 0; /* db is no longer current; we just missed an update */ - continue; /* not up-to-date, don't bother */ - } - - conn = Quorum_StartIO(atrans, ts); - - code = DISK_Write(conn, &atrans->tid, file, position, data); - - Quorum_EndIO(atrans, conn); - conn = NULL; - - if (code) { /* failure */ - rcode = code; - ts->up = 0; /* mark as down now; beacons will no longer be sent */ - ts->currentDB = 0; - ts->beaconSinceDown = 0; - urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */ - } else { /* success */ - if (!ts->isClone) - okcalls++; /* count up how many worked */ - if (aflags & CStampVersion) { - ts->version = atrans->dbase->version; - } - } + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); + while (!done) { + if (conn) + code = DISK_Write(conn, &atrans->tid, file, position, data); + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); } - /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */ - if (okcalls + 1 >= ubik_quorum) - return 0; - else - return rcode; + return ContactQuorum_rcode(okcalls, rcode); } + afs_int32 ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags, afs_int32 file, afs_int32 length) { - struct ubik_server *ts; - afs_int32 code; - afs_int32 rcode, okcalls; + struct ubik_server *ts = NULL; + afs_int32 code = 0, rcode, okcalls; struct rx_connection *conn; + int done; - rcode = 0; - okcalls = 0; - for (ts = ubik_servers; ts; ts = ts->next) { - /* for each server */ - if (!ts->up || !ts->currentDB) { - ts->currentDB = 0; /* db is no longer current; we just missed an update */ - continue; /* not up-to-date, don't bother */ - } - - conn = Quorum_StartIO(atrans, ts); - - code = DISK_Truncate(conn, &atrans->tid, file, length); - - Quorum_EndIO(atrans, conn); - conn = NULL; - - if (code) { /* failure */ - rcode = code; - ts->up = 0; /* mark as down now; beacons will no longer be sent */ - ts->currentDB = 0; - ts->beaconSinceDown = 0; - urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */ - } else { /* success */ - if (!ts->isClone) - okcalls++; /* count up how many worked */ - if (aflags & CStampVersion) { - ts->version = atrans->dbase->version; - } - } + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); + while (!done) { + if (conn) + code = DISK_Truncate(conn, &atrans->tid, file, length); + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); } - /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */ - if (okcalls + 1 >= ubik_quorum) - return 0; - else - return rcode; + return ContactQuorum_rcode(okcalls, rcode); } + afs_int32 ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags, iovec_wrt * io_vector, iovec_buf *io_buffer) { - struct ubik_server *ts; - afs_int32 code; - afs_int32 rcode, okcalls; + struct ubik_server *ts = NULL; + afs_int32 code = 0, rcode, okcalls; struct rx_connection *conn; - - rcode = 0; - okcalls = 0; - for (ts = ubik_servers; ts; ts = ts->next) { - /* for each server */ - if (!ts->up || !ts->currentDB) { - ts->currentDB = 0; /* db is no longer current; we just missed an update */ - continue; /* not up-to-date, don't bother */ - } - - conn = Quorum_StartIO(atrans, ts); - - code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer); - - Quorum_EndIO(atrans, conn); - conn = NULL; - - if ((code <= -450) && (code > -500)) { - /* An RPC interface mismatch (as defined in comerr/error_msg.c). - * Un-bulk the entries and do individual DISK_Write calls - * instead of DISK_WriteV. - */ - struct ubik_iovec *iovec = - (struct ubik_iovec *)io_vector->iovec_wrt_val; - char *iobuf = (char *)io_buffer->iovec_buf_val; - bulkdata tcbs; - afs_int32 i, offset; - - conn = Quorum_StartIO(atrans, ts); - - for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) { - /* Sanity check for going off end of buffer */ - if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) { - code = UINTERNAL; - break; + int done; + + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); + while (!done) { + if (conn) { + code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer); + if ((code <= -450) && (code > -500)) { + /* An RPC interface mismatch (as defined in comerr/error_msg.c). + * Un-bulk the entries and do individual DISK_Write calls + * instead of DISK_WriteV. + */ + struct ubik_iovec *iovec = + (struct ubik_iovec *)io_vector->iovec_wrt_val; + char *iobuf = (char *)io_buffer->iovec_buf_val; + bulkdata tcbs; + afs_int32 i, offset; + + Quorum_EndIO(atrans, conn); + conn = Quorum_StartIO(atrans, ts); + + for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) { + /* Sanity check for going off end of buffer */ + if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) { + code = UINTERNAL; + break; + } + tcbs.bulkdata_len = iovec[i].length; + tcbs.bulkdata_val = &iobuf[offset]; + code = DISK_Write(conn, &atrans->tid, iovec[i].file, + iovec[i].position, &tcbs); + if (code) + break; + offset += iovec[i].length; } - tcbs.bulkdata_len = iovec[i].length; - tcbs.bulkdata_val = &iobuf[offset]; - - code = - DISK_Write(conn, &atrans->tid, iovec[i].file, - iovec[i].position, &tcbs); - if (code) - break; - - offset += iovec[i].length; - } - - Quorum_EndIO(atrans, conn); - conn = NULL; - } - - if (code) { /* failure */ - rcode = code; - ts->up = 0; /* mark as down now; beacons will no longer be sent */ - ts->currentDB = 0; - ts->beaconSinceDown = 0; - urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */ - } else { /* success */ - if (!ts->isClone) - okcalls++; /* count up how many worked */ - if (aflags & CStampVersion) { - ts->version = atrans->dbase->version; } } + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); } - /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */ - if (okcalls + 1 >= ubik_quorum) - return 0; - else - return rcode; + return ContactQuorum_rcode(okcalls, rcode); } + afs_int32 ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags, ubik_version *OldVersion, ubik_version *NewVersion) { - struct ubik_server *ts; - afs_int32 code; - afs_int32 rcode, okcalls; + struct ubik_server *ts = NULL; + afs_int32 code = 0, rcode, okcalls; struct rx_connection *conn; + int done; - rcode = 0; - okcalls = 0; - for (ts = ubik_servers; ts; ts = ts->next) { - /* for each server */ - if (!ts->up || !ts->currentDB) { - ts->currentDB = 0; /* db is no longer current; we just missed an update */ - continue; /* not up-to-date, don't bother */ - } - - conn = Quorum_StartIO(atrans, ts); - - code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion); - - Quorum_EndIO(atrans, conn); - conn = NULL; - - if (code) { /* failure */ - rcode = code; - ts->up = 0; /* mark as down now; beacons will no longer be sent */ - ts->currentDB = 0; - ts->beaconSinceDown = 0; - urecovery_LostServer(ts); /* tell recovery to try to resend dbase later */ - } else { /* success */ - if (!ts->isClone) - okcalls++; /* count up how many worked */ - if (aflags & CStampVersion) { - ts->version = atrans->dbase->version; - } - } + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); + while (!done) { + if (conn) + code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion); + done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code); } - /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */ - if (okcalls + 1 >= ubik_quorum) - return 0; - else - return rcode; + return ContactQuorum_rcode(okcalls, rcode); } + /*! * \brief This routine initializes the ubik system for a set of servers. * \return 0 for success, or an error code on failure. -- 2.39.5