Currently, when we write ubik i/o operations to the db log, we tend to
issue several syscalls involving small writes and fstat()s. This is
because each "log" operation involves at least one write, and each log
operation tends to be pretty small.
Each logged operation hitting disk separately is unnecessary, since
the db log does not need to hit the disk at all until we are ready to
commit the transaction. So to reduce the number of syscalls when
writing to the db, change our log writes to be buffered in memory
(using stdio calls). This also avoids needing to fstat() the
underlying log file, since we open the underlying file in append-only
mode, since we only ever append to (and truncate) the log file.
To implement this, we introduce a new 'buffered_append' phys
operation, to explicitly separate our buffered and non-buffered
operations, to try to avoid any bugs from mixing buffered and
non-buffered i/o. This new operation is only used for the db log.
Reviewed-on: https://gerrit.openafs.org/13070
Reviewed-by: Mark Vitale <mvitale@sinenomine.net>
Reviewed-by: Michael Meffie <mmeffie@sinenomine.net>
Reviewed-by: Joe Gorse <jhgorse@gmail.com>
Reviewed-by: Benjamin Kaduk <kaduk@mit.edu>
Reviewed-by: Marcio Brito Barbosa <mbarbosa@sinenomine.net>
Tested-by: BuildBot <buildbot@rampaginggeek.com>
(cherry picked from commit
800318b43fdf461ad95cd7f3940718f3f0a609a7)
Change-Id: Ia40d75e7bdeb6a9f6c316aaea6fd20d5c8d80625
Reviewed-on: https://gerrit.openafs.org/13353
Reviewed-by: Michael Meffie <mmeffie@sinenomine.net>
Reviewed-by: Mark Vitale <mvitale@sinenomine.net>
Reviewed-by: Marcio Brito Barbosa <mbarbosa@sinenomine.net>
Reviewed-by: Cheyenne Wills <cwills@sinenomine.net>
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Stephan Wiesand <stephan.wiesand@desy.de>
static int
udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
{
- struct ubik_stat ustat;
afs_int32 code;
- /* figure out where to write */
- code = (*adbase->stat) (adbase, LOGFILE, &ustat);
- if (code < 0)
- return code;
-
/* setup data and do write */
aopcode = htonl(aopcode);
- code =
- (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
- sizeof(afs_int32));
+ code = (*adbase->buffered_append)(adbase, LOGFILE, &aopcode, sizeof(afs_int32));
if (code != sizeof(afs_int32))
return UIOERROR;
{
afs_int32 code;
afs_int32 data[3];
- struct ubik_stat ustat;
-
- /* figure out where to write */
- code = (*adbase->stat) (adbase, LOGFILE, &ustat);
- if (code)
- return code;
/* setup data */
data[0] = htonl(LOGEND);
/* do write */
code =
- (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
- 3 * sizeof(afs_int32));
+ (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
if (code != 3 * sizeof(afs_int32))
return UIOERROR;
{
afs_int32 code;
afs_int32 data[3];
- struct ubik_stat ustat;
-
- /* figure out where to write */
- code = (*adbase->stat) (adbase, LOGFILE, &ustat);
- if (code < 0)
- return code;
/* setup data */
data[0] = htonl(LOGTRUNCATE);
/* do write */
code =
- (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
- 3 * sizeof(afs_int32));
+ (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
if (code != 3 * sizeof(afs_int32))
return UIOERROR;
return 0;
udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
afs_int32 apos, afs_int32 alen)
{
- struct ubik_stat ustat;
afs_int32 code;
afs_int32 data[4];
- afs_int32 lpos;
-
- /* find end of log */
- code = (*adbase->stat) (adbase, LOGFILE, &ustat);
- lpos = ustat.size;
- if (code < 0)
- return code;
/* setup header */
data[0] = htonl(LOGDATA);
/* write header */
code =
- (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
+ (*adbase->buffered_append)(adbase, LOGFILE, data, 4 * sizeof(afs_int32));
if (code != 4 * sizeof(afs_int32))
return UIOERROR;
- lpos += 4 * sizeof(afs_int32);
/* write data */
- code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
+ code = (*adbase->buffered_append)(adbase, LOGFILE, abuffer, alen);
if (code != alen)
return UIOERROR;
return 0;
int refCount;
} fdcache[MAXFDCACHE];
+/* Cache a stdio handle for a given database file, for uphys_buf_append
+ * operations. We only use buf_append for one file at a time, so only try to
+ * cache a single file handle, since that's all we should need. */
+static struct buf_fdcache {
+ int fileID;
+ FILE *stream;
+} buf_fdcache;
+
static char pbuffer[1024];
+static int uphys_buf_flush(struct ubik_dbase *adbase, afs_int32 afid);
+
/*!
* \warning Beware, when using this function, of the header in front of most files.
*/
afs_int32 asize)
{
afs_int32 code, fd;
+
+ /* Just in case there's memory-buffered data for this file, flush it before
+ * truncating. */
+ if (uphys_buf_flush(adbase, afile) < 0) {
+ return UIOERROR;
+ }
+
fd = uphys_open(adbase, afile);
if (fd < 0)
return UNOENT;
uphys_sync(struct ubik_dbase *adbase, afs_int32 afile)
{
afs_int32 code, fd;
+
+ /* Flush any in-memory data, so we can sync it. */
+ if (uphys_buf_flush(adbase, afile) < 0) {
+ return -1;
+ }
+
fd = uphys_open(adbase, afile);
code = fsync(fd);
uphys_close(fd);
}
}
}
+
+static FILE *
+uphys_buf_append_open(struct ubik_dbase *adbase, afs_int32 afid)
+{
+ /* If we have a cached handle open for this file, just return it. */
+ if (buf_fdcache.stream && buf_fdcache.fileID == afid) {
+ return buf_fdcache.stream;
+ }
+
+ /* Otherwise, close the existing handle, and open a new handle for the
+ * given file. */
+
+ if (buf_fdcache.stream) {
+ fclose(buf_fdcache.stream);
+ }
+
+ snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d", adbase->pathName,
+ (afid<0)?"SYS":"", (afid<0)?-afid:afid);
+ buf_fdcache.stream = fopen(pbuffer, "a");
+ buf_fdcache.fileID = afid;
+ return buf_fdcache.stream;
+}
+
+static int
+uphys_buf_flush(struct ubik_dbase *adbase, afs_int32 afid)
+{
+ if (buf_fdcache.stream && buf_fdcache.fileID == afid) {
+ int code = fflush(buf_fdcache.stream);
+ if (code) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+/* Append the given data to the given database file, allowing the data to be
+ * buffered in memory. */
+int
+uphys_buf_append(struct ubik_dbase *adbase, afs_int32 afid, void *adata,
+ afs_int32 alength)
+{
+ FILE *stream;
+ size_t code;
+
+ stream = uphys_buf_append_open(adbase, afid);
+ if (!stream) {
+ return -1;
+ }
+
+ code = fwrite(adata, alength, 1, stream);
+ if (code != 1) {
+ return -1;
+ }
+ return alength;
+}
tdb->getlabel = uphys_getlabel;
tdb->setlabel = uphys_setlabel;
tdb->getnfiles = uphys_getnfiles;
+ tdb->buffered_append = uphys_buf_append;
tdb->readers = 0;
tdb->tidCounter = tdb->writeTidCounter = 0;
*dbase = tdb;
int (*setlabel) (struct ubik_dbase * adbase, afs_int32 afile, struct ubik_version * aversion); /*!< set the version label */
int (*getlabel) (struct ubik_dbase * adbase, afs_int32 afile, struct ubik_version * aversion); /*!< retrieve the version label */
int (*getnfiles) (struct ubik_dbase * adbase); /*!< find out number of files */
+ int (*buffered_append)(struct ubik_dbase *adbase, afs_int32 afid, void *adata, afs_int32 alength);
short readers; /*!< number of current read transactions */
struct ubik_version cachedVersion; /*!< version of caller's cached data */
struct Lock cache_lock; /*!< protects cached application data */
extern int uphys_sync(struct ubik_dbase *adbase, afs_int32 afile);
extern void uphys_invalidate(struct ubik_dbase *adbase,
afs_int32 afid);
+extern int uphys_buf_append(struct ubik_dbase *adbase, afs_int32 afid,
+ void *buf, afs_int32 alength);
/*! \name recovery.c */
extern int urecovery_ResetState(void);