]> git.wh0rd.org - dump.git/blobdiff - dump/tape.c
Use sys_clone under Linux to share I/O contexts between dump processes.
[dump.git] / dump / tape.c
index 6f61210c6da4f0957b31d7c6286a7f408d044fbf..edcf5d691925a6532dd71e8b30f75514d8190109 100644 (file)
@@ -37,7 +37,7 @@
 
 #ifndef lint
 static const char rcsid[] =
-       "$Id: tape.c,v 1.74 2003/03/30 15:40:37 stelian Exp $";
+       "$Id: tape.c,v 1.91 2009/06/18 09:50:54 stelian Exp $";
 #endif /* not lint */
 
 #include <config.h>
@@ -60,6 +60,11 @@ int    write(), read();
 #ifdef __linux__
 #include <sys/types.h>
 #include <sys/time.h>
+#include <sys/ioctl.h>
+#include <sys/mount.h> /* for definition of BLKFLSBUF */
+#ifndef BLKFLSBUF      /* last resort... */
+#define BLKFLSBUF _IO(0x12, 97) /* Flush buffer cache.  */
+#endif
 #include <time.h>
 #endif
 #include <sys/param.h>
@@ -95,22 +100,28 @@ int    write(), read();
 #include <bzlib.h>
 #endif /* HAVE_BZLIB */
 
+#ifdef HAVE_LZO
+#include <minilzo.h>
+#endif /* HAVE_LZO */
+
 #include "dump.h"
 
 int    writesize;              /* size of malloc()ed buffer for tape */
 long   lastspclrec = -1;       /* tape block number of last written header */
 int    trecno = 0;             /* next record to write in current block */
-extern long blocksperfile;     /* number of blocks per output file */
+extern long *blocksperfiles;   /* number of blocks per output file(s) */
+long   blocksperfiles_current; /* current position in blocksperfiles */
 long   blocksthisvol;          /* number of blocks on current output file */
 extern int ntrec;              /* blocking factor on tape */
 extern int cartridge;
 char   *nexttape;
 extern  pid_t rshpid;
+int    eot_code = 1;
 long long tapea_bytes = 0;     /* bytes_written at start of current volume */
 static int magtapeout;         /* output is really a tape */
 
-static ssize_t dump_atomic_read __P((int, void *, size_t));
-static ssize_t dump_atomic_write __P((int, const void *, size_t));
+static ssize_t dump_atomic_read __P((int, char *, size_t));
+static ssize_t dump_atomic_write __P((int, const char *, size_t));
 #ifdef WRITEDEBUG
 static void doslave __P((int, int, int));
 #else
@@ -122,7 +133,8 @@ static      void killall __P((void));
 static void rollforward __P((void));
 #ifdef USE_QFA
 static int GetTapePos __P((long long *));
-static void MkTapeString __P((struct s_spcl *, long long));
+static int MkTapeString __P((struct s_spcl *, long long));
+#define FILESQFAPOS    20
 #endif
 
 /*
@@ -134,7 +146,7 @@ static void MkTapeString __P((struct s_spcl *, long long));
  * The following structure defines the instruction packets sent to slaves.
  */
 struct req {
-       daddr_t dblk;
+       ext2_loff_t dblk;
        int count;
 };
 int reqsiz;
@@ -175,6 +187,41 @@ static sigjmp_buf jmpbuf;  /* where to jump to if we are ready when the */
 static int gtperr = 0;
 #endif
 
+/*
+ * Determine if we can use Linux' clone system call.  If so, call it
+ * with the CLONE_IO flag so that all processes will share the same I/O
+ * context, allowing the I/O schedulers to make better scheduling decisions.
+ */
+#ifdef __linux__
+/* first, pull in the header files that define sys_clone and CLONE_IO */
+#include <syscall.h>
+#define _GNU_SOURCE
+#include <sched.h>
+#include <unistd.h>
+#undef _GNU_SOURCE
+
+/* If either is not present, fall back on the fork behaviour */
+#if ! defined(SYS_clone) || ! defined (CLONE_IO)
+#define fork_clone_io fork
+#else /* SYS_clone */
+/* CLONE_IO is available, determine which version of sys_clone to use */
+#include <linux/version.h>
+/*
+ * Kernel 2.5.49 introduced two extra parameters to the clone system call.
+ * Neither is useful in our case, so this is easy to handle.
+ */
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,49)
+/* clone_flags, child_stack, parent_tidptr, child_tidptr */
+#define CLONE_ARGS SIGCHLD|CLONE_IO, 0, NULL, NULL
+#else
+#define CLONE_ARGS SIGCHLD|CLONE_IO, 0
+#endif /* LINUX_VERSION_CODE */
+pid_t fork_clone_io(void);
+#endif /* SYS_clone */
+#else /* __linux__ not defined */
+#define fork_clone_io fork
+#endif /* __linux__ */
+
 int
 alloctape(void)
 {
@@ -190,7 +237,7 @@ alloctape(void)
         * repositioning after stopping, i.e, streaming mode, where the gap is
         * variable, 0.30" to 0.45".  The gap is maximal when the tape stops.
         */
-       if (blocksperfile == 0 && !unlimited)
+       if (!blocksperfiles && !unlimited)
                tenths = (cartridge ? 16 : density == 625 ? 5 : 8);
        else {
                tenths = 0;
@@ -226,7 +273,7 @@ void
 writerec(const void *dp, int isspcl)
 {
 
-       slp->req[trecno].dblk = (daddr_t)0;
+       slp->req[trecno].dblk = (ext2_loff_t)0;
        slp->req[trecno].count = 1;
        /* XXX post increment triggers an egcs-1.1.2-12 bug on alpha/sparc */
        *(union u_spcl *)(*(nextblock)) = *(union u_spcl *)dp;
@@ -234,7 +281,7 @@ writerec(const void *dp, int isspcl)
        /* Need to write it to the archive file */
        if (! AfileActive && isspcl && (spcl.c_type == TS_END))
                AfileActive = 1;
-       if (AfileActive && Afile >= 0) {
+       if (AfileActive && Afile >= 0 && !(spcl.c_flags & DR_EXTATTRIBUTES)) {
                /* When we dump an inode which is not a directory,
                 * it means we ended the archive contents */
                if (isspcl && (spcl.c_type == TS_INODE) &&
@@ -264,9 +311,10 @@ writerec(const void *dp, int isspcl)
 }
 
 void
-dumpblock(daddr_t blkno, int size)
+dumpblock(blk_t blkno, int size)
 {
-       int avail, tpblks, dblkno;
+       int avail, tpblks;
+       ext2_loff_t dblkno;
 
        dblkno = fsbtodb(sblock, blkno);
        tpblks = size >> tp_bshift;
@@ -407,6 +455,10 @@ flushtape(void)
 
        int siz = (char *)nextblock - (char *)slp->req;
 
+       /* make sure returned has sane values in case we don't read 
+        * them from the slave in this pass */
+       returned.unclen = returned.clen = writesize;
+
        slp->req[trecno].count = 0;                     /* Sentinel */
 
        if (dump_atomic_write( slp->fd, (char *)slp->req, siz) != siz)
@@ -462,10 +514,14 @@ flushtape(void)
        }
 
        blks = 0;
-       if (spcl.c_type != TS_END) {
-               for (i = 0; i < spcl.c_count; i++)
-                       if (spcl.c_addr[i] != 0)
-                               blks++;
+       if (spcl.c_type == TS_CLRI || spcl.c_type == TS_BITS)
+               blks = spcl.c_count;
+       else {
+               if (spcl.c_type != TS_END) {
+                       for (i = 0; i < spcl.c_count; i++)
+                               if (spcl.c_addr[i] != 0)
+                                       blks++;
+               }
        }
        slp->count = lastspclrec + blks + 1 - spcl.c_tapea;
        slp->tapea = spcl.c_tapea;
@@ -477,9 +533,9 @@ flushtape(void)
        blockswritten += ntrec;
        blocksthisvol += ntrec;
        if (!pipeout && !unlimited) {
-               if (blocksperfile) {
-                       if ( compressed ? (bytes_written - tapea_bytes + SLAVES * (writesize + sizeof(struct tapebuf))) >= (((long long)blocksperfile) * 1024)
-                                       : blocksthisvol >= blocksperfile ) {
+               if (blocksperfiles && blocksperfiles[blocksperfiles_current]) {
+                       if ( compressed ? (bytes_written - tapea_bytes + SLAVES * (writesize + sizeof(struct tapebuf))) >= (((long long)blocksperfiles[blocksperfiles_current]) * 1024)
+                                       : blocksthisvol >= blocksperfiles[blocksperfiles_current] ) {
                                close_rewind();
                                startnewtape(0);
                        }
@@ -565,8 +621,6 @@ close_rewind(void)
 {
        int eot_code = 1;
        (void)trewind();
-       if (nexttape || Mflag)
-               return;
        if (eot_script) {
                msg("Launching %s\n", eot_script);
                eot_code = system_command(eot_script, tape, tapeno);
@@ -577,6 +631,8 @@ close_rewind(void)
        }
        if (eot_code == 0)
                return;
+       if (nexttape || Mflag)
+               return;
        if (!nogripe) {
                msg("Change Volumes: Mount volume #%d\n", tapeno+1);
                broadcast("CHANGE DUMP VOLUMES!\7\7\n");
@@ -603,6 +659,10 @@ rollforward(void)
        tslp = &slaves[SLAVES];
        ntb = (union u_spcl *)tslp->tblock[1];
 
+       /* make sure returned has sane values in case we don't read 
+        * them from the slave in this pass */
+       returned.unclen = returned.clen = writesize;
+
        /*
         * Each of the N slaves should have requests that need to
         * be replayed on the next tape.  Use the extra slave buffers
@@ -730,6 +790,16 @@ rollforward(void)
 #endif
 }
 
+#ifdef __linux__
+#if defined(SYS_clone) && defined(CLONE_IO)
+pid_t
+fork_clone_io(void)
+{
+       return syscall(SYS_clone, CLONE_ARGS);
+}
+#endif
+#endif
+
 /*
  * We implement taking and restoring checkpoints on the tape level.
  * When each tape is opened, a new process is created by forking; this
@@ -776,7 +846,7 @@ restore_check_point:
        /*
         *      All signals are inherited...
         */
-       childpid = fork();
+       childpid = fork_clone_io();
        if (childpid < 0) {
                msg("Context save fork fails in parent %d\n", parentpid);
                Exit(X_ABORT);
@@ -864,6 +934,8 @@ restore_check_point:
                        tape[MAXPATHLEN - 1] = '\0';
                        msg("Dumping volume %d on %s\n", tapeno, tape);
                }
+               if (blocksperfiles && blocksperfiles_current < *blocksperfiles)
+                       blocksperfiles_current++;
 #ifdef RDUMP
                while ((tapefd = (host ? rmtopen(tape, O_WRONLY|O_CREAT|O_TRUNC) : pipeout ? 
                        fileno(stdout) : 
@@ -990,7 +1062,7 @@ enslave(void)
                }
 
                if (socketpair(AF_UNIX, SOCK_STREAM, 0, cmd) < 0 ||
-                   (slaves[i].pid = fork()) < 0)
+                   (slaves[i].pid = fork_clone_io()) < 0)
                        quit("too many slaves, %d (recompile smaller): %s\n",
                            i, strerror(errno));
 
@@ -1072,15 +1144,15 @@ doslave(int cmd,
        int nextslave;
        volatile int wrote = 0, size, eot_count, bufsize;
        char * volatile buffer;
-#if defined(HAVE_ZLIB) || defined(HAVE_BZLIB)
+#if defined(HAVE_ZLIB) || defined(HAVE_BZLIB) || defined(HAVE_LZO)
        struct tapebuf * volatile comp_buf = NULL;
        int compresult;
        volatile int do_compress = !first;
        unsigned long worklen;
-#ifdef HAVE_BZLIB
-       unsigned int worklen2;
+#ifdef HAVE_LZO
+       lzo_align_t __LZO_MMODEL *LZO_WorkMem;
 #endif
-#endif /* HAVE_ZLIB || HAVE_BZLIB */
+#endif /* HAVE_ZLIB || HAVE_BZLIB || HAVE_LZO */
        struct slave_results returns;
 #ifdef __linux__
        errcode_t retval;
@@ -1089,6 +1161,9 @@ doslave(int cmd,
        long long curtapepos;
        union u_spcl *uspclptr;
        struct s_spcl *spclptr;
+       /* long         maxntrecs = 300000000 / (ntrec * 1024);  last tested: 50 000 000 */
+       long            maxntrecs = 50000;      /* every 50MB */
+       long            cntntrecs = maxntrecs;
 #endif /* USE_QFA */
        sigset_t set;
 
@@ -1105,7 +1180,7 @@ doslave(int cmd,
                quit("slave couldn't reopen disk: %s\n", strerror(errno));
 #ifdef __linux__
 #ifdef BLKFLSBUF
-       (void)ioctl(diskfd, BLKFLSBUF);
+       (void)ioctl(diskfd, BLKFLSBUF, 0);
 #endif
        ext2fs_close(fs);
        retval = dump_fs_open(disk, &fs);
@@ -1121,18 +1196,34 @@ doslave(int cmd,
                quit("master/slave protocol botched - didn't get pid of next slave.\n");
        }
 
-#if defined(HAVE_ZLIB) || defined(HAVE_BZLIB)
+#if defined(HAVE_ZLIB) || defined(HAVE_BZLIB) || defined(HAVE_LZO)
        /* if we're doing a compressed dump, allocate the compress buffer */
        if (compressed) {
-               comp_buf = malloc(sizeof(struct tapebuf) + TP_BSIZE + writesize);
+               int bsiz = sizeof(struct tapebuf) + writesize;
+               /* Add extra space to deal with compression enlarging the buffer */
+               if (TP_BSIZE > writesize/16 + 67)
+                       bsiz += TP_BSIZE;
+               else
+                       bsiz += writesize/16 + 67;
+               comp_buf = malloc(bsiz);
                if (comp_buf == NULL)
                        quit("couldn't allocate a compress buffer.\n");
-               if (bzipflag)
-                       comp_buf->flags = COMPRESS_BZLIB;
-               else
+               if (zipflag == COMPRESS_ZLIB)
                        comp_buf->flags = COMPRESS_ZLIB;
+               else if (zipflag == COMPRESS_BZLIB)
+                       comp_buf->flags = COMPRESS_BZLIB;
+                else if (zipflag == COMPRESS_LZO) {
+                       comp_buf->flags = COMPRESS_LZO;
+                       if (lzo_init() != LZO_E_OK) quit("lzo_init failed\n");
+                } else 
+                       quit("internal error - unknown compression method: %d\n", zipflag);
        }
-#endif /* HAVE_ZLIB || HAVE_BZLIB */
+#ifdef HAVE_LZO
+       LZO_WorkMem = malloc(LZO1X_1_MEM_COMPRESS);
+       if (!LZO_WorkMem)
+               quit("couldn't allocate a compress buffer.\n");
+#endif
+#endif /* HAVE_ZLIB || HAVE_BZLIB || HAVE_LZO */
 
        /*
         * Get list of blocks to dump, read the blocks into tape buffer
@@ -1161,7 +1252,7 @@ doslave(int cmd,
                bufsize = writesize;                    /* length to write */
                returns.clen = returns.unclen = bufsize;
 
-#if defined(HAVE_ZLIB) || defined(HAVE_BZLIB)
+#if defined(HAVE_ZLIB) || defined(HAVE_BZLIB) || defined(HAVE_LZO)
                /* 
                 * When writing a compressed dump, each block except
                 * the first one on each tape is written
@@ -1180,7 +1271,7 @@ doslave(int cmd,
                        worklen = TP_BSIZE + writesize;
                        compresult = 1;
 #ifdef HAVE_ZLIB
-                       if (!bzipflag) {
+                       if (zipflag == COMPRESS_ZLIB) {
                                compresult = compress2(comp_buf->buf, 
                                                       &worklen,
                                                       (char *)slp->tblock[0],
@@ -1193,8 +1284,8 @@ doslave(int cmd,
                        }
 #endif /* HAVE_ZLIB */
 #ifdef HAVE_BZLIB
-                       if (bzipflag) {
-                               worklen2 = worklen;
+                       if (zipflag == COMPRESS_BZLIB) {
+                               unsigned int worklen2 = worklen;
                                compresult = BZ2_bzBuffToBuffCompress(
                                                       comp_buf->buf,
                                                       &worklen2,
@@ -1210,6 +1301,20 @@ doslave(int cmd,
                        }
 
 #endif /* HAVE_BZLIB */
+#ifdef HAVE_LZO
+                       if (zipflag == COMPRESS_LZO) {
+                               lzo_uint worklen2 = worklen;
+                               compresult = lzo1x_1_compress((char *)slp->tblock[0],writesize,
+                                                              comp_buf->buf,
+                                                             &worklen2,
+                                                              LZO_WorkMem);
+                               worklen = worklen2;
+                               if (compresult == LZO_E_OK)
+                                       compresult = 1;
+                               else
+                                       compresult = 0;
+                       }
+#endif /* HAVE_LZO */
                        if (compresult && worklen <= ((unsigned long)writesize - 16)) {
                                /* write the compressed buffer */
                                comp_buf->length = worklen;
@@ -1229,7 +1334,7 @@ doslave(int cmd,
                }
                /* compress the remaining blocks if we're compressing */
                do_compress = compressed;
-#endif /* HAVE_ZLIB  || HAVE_BZLIB */
+#endif /* HAVE_ZLIB  || HAVE_BZLIB || HAVE_LZO */
 
                if (sigsetjmp(jmpbuf, 1) == 0) {
                        ready = 1;
@@ -1242,23 +1347,31 @@ doslave(int cmd,
 #ifdef USE_QFA
                if (gTapeposfd >= 0) {
                        int i;
-                       int firstpass = 1;
-                       for (i = 0; i < ntrec; ++i) {
+                       int foundone = 0;
+
+                       for (i = 0; (i < ntrec) && !foundone; ++i) {
                                uspclptr = (union u_spcl *)&slp->tblock[i];
                                spclptr = &uspclptr->s_spcl;
                                if ((spclptr->c_magic == NFS_MAGIC) && 
-                                   (spclptr->c_type == TS_INODE) &&
-                                   ((spclptr->c_dinode.di_mode & S_IFMT) != IFDIR) &&
-                                   (spclptr->c_date == gThisDumpDate)) {
-                                       /* if an error occured previously don't
-                                        * try again */
-                                       if (firstpass) {
-                                               firstpass = 0;
+                                                       (spclptr->c_type == TS_INODE) &&
+                                                       (spclptr->c_date == gThisDumpDate) &&
+                                                       !(spclptr->c_dinode.di_mode & S_IFDIR) &&
+                                                       !(spclptr->c_flags & DR_EXTATTRIBUTES)
+                                               ) {
+                                       foundone = 1;
+                                       /* if (cntntrecs >= maxntrecs) {         only write every maxntrecs amount of data */
+                                               cntntrecs = 0;
                                                if (gtperr == 0) 
                                                        gtperr = GetTapePos(&curtapepos);
-                                       }
-                                       if (gtperr == 0)
-                                                       MkTapeString(spclptr, curtapepos);
+                                               /* if an error occured previously don't
+                                                * try again */
+                                               if (gtperr == 0) {
+#ifdef DEBUG_QFA
+                                                       msg("inode %ld at tapepos %ld\n", spclptr->c_inumber, curtapepos);
+#endif
+                                                       gtperr = MkTapeString(spclptr, curtapepos);
+                                               }
+                                       /* } */
                                }
                        }
                }
@@ -1274,11 +1387,12 @@ doslave(int cmd,
 #ifdef WRITEDEBUG
                        printf("slave %d wrote %d\n", slave_number, wrote);
 #endif
-                       if (wrote < 0)
+                       if (wrote < 0 && errno != ENOSPC)
                                break;
-                       if (wrote == 0)
+                       if (wrote == 0 || (wrote < 0 && errno == ENOSPC))
                                eot_count++;
-                       size += wrote;
+                       else
+                               size += wrote;
                }
 
 #ifdef WRITEDEBUG
@@ -1314,6 +1428,11 @@ doslave(int cmd,
                 * Signal the next slave to go.
                 */
                (void) kill(nextslave, SIGUSR2);
+#ifdef USE_QFA
+               if (gTapeposfd >= 0) {
+                       cntntrecs += ntrec;
+               }
+#endif /* USE_QFA */
        }
        if (nread != 0)
                quit("error reading command pipe: %s\n", strerror(errno));
@@ -1325,13 +1444,13 @@ doslave(int cmd,
  * loop until the count is satisfied (or error).
  */
 static ssize_t
-dump_atomic_read(int fd, void *buf, size_t count)
+dump_atomic_read(int fd, char *buf, size_t count)
 {
        int got, need = count;
 
        do {
                while ((got = read(fd, buf, need)) > 0 && (need -= got) > 0)
-                       (char *)buf += got;
+                       buf += got;
        } while (got == -1 && errno == EINTR);
        return (got < 0 ? got : (ssize_t)count - need);
 }
@@ -1342,19 +1461,39 @@ dump_atomic_read(int fd, void *buf, size_t count)
  * loop until the count is satisfied (or error).
  */
 static ssize_t
-dump_atomic_write(int fd, const void *buf, size_t count)
+dump_atomic_write(int fd, const char *buf, size_t count)
 {
        int got, need = count;
 
        do {
                while ((got = write(fd, buf, need)) > 0 && (need -= got) > 0)
-                       (char *)buf += got;
+                       buf += got;
        } while (got == -1 && errno == EINTR);
        return (got < 0 ? got : (ssize_t)count - need);
 }
 
 
+/*
+int
+SetLogicalPos(void)
+{
+       int     err = 0;
+       struct mt_pos buf;
+
+       buf.mt_op = MTSETDRVBUFFER;
+       buf.mt_count = MT_ST_BOOLEANS | MT_ST_SCSI2LOGICAL;
+       if (ioctl(tapefd, MTIOCTOP, &buf) == -1) {
+               err = errno;
+               msg("[%ld] error: %d (setting logical)\n", 
+                       (unsigned long)getpid(), err);
+       }
+       return err;
+}
+*/
+
 #ifdef USE_QFA
+#define LSEEK_GET_TAPEPOS      10
+#define LSEEK_GO2_TAPEPOS      11
 /*
  * read the current tape position
  */
@@ -1365,7 +1504,7 @@ GetTapePos(long long *pos)
 
 #ifdef RDUMP
        if (host) {
-               *pos = (long long) rmtseek(0, SEEK_CUR);
+               *pos = (long long) rmtseek((OFF_T)0, (int)LSEEK_GET_TAPEPOS);
                err = *pos < 0;
        }
        else 
@@ -1391,8 +1530,10 @@ GetTapePos(long long *pos)
        return err;
 }
 
-static void 
-MkTapeString(struct s_spcl *spclptr, long long curtapepos) {
+static int 
+MkTapeString(struct s_spcl *spclptr, long long curtapepos)
+{
+       int     err = 0;
 
 #ifdef DEBUG_QFA
        msg("inode %ld at tapepos %lld\n", spclptr->c_inumber, curtapepos);
@@ -1404,7 +1545,9 @@ MkTapeString(struct s_spcl *spclptr, long long curtapepos) {
                 curtapepos);
        gTps[sizeof(gTps) - 1] = '\0';
        if (write(gTapeposfd, gTps, strlen(gTps)) != (ssize_t)strlen(gTps)) {
-               warn("error writing tapepos file.\n");
+               err = errno;
+       warn("error writing tapepos file. (error %d)\n", errno);
        }
+       return err;
 }
 #endif /* USE_QFA */