]> git.wh0rd.org - dump.git/blobdiff - dump/tape.c
Regenerate configure.
[dump.git] / dump / tape.c
index 3e41622b3f51057daa35ac73855ae17245f250f9..bab3379782ca445b26caa219d4a244522c6ecc87 100644 (file)
@@ -37,7 +37,7 @@
 
 #ifndef lint
 static const char rcsid[] =
-       "$Id: tape.c,v 1.90 2008/06/04 19:27:48 stelian Exp $";
+       "$Id: tape.c,v 1.96 2011/06/10 13:41:41 stelian Exp $";
 #endif /* not lint */
 
 #include <config.h>
@@ -92,19 +92,9 @@ int    write(), read();
 
 #include <protocols/dumprestore.h>
 
-#ifdef HAVE_ZLIB
-#include <zlib.h>
-#endif /* HAVE_ZLIB */
-
-#ifdef HAVE_BZLIB
-#include <bzlib.h>
-#endif /* HAVE_BZLIB */
-
-#ifdef HAVE_LZO
-#include <minilzo.h>
-#endif /* HAVE_LZO */
-
 #include "dump.h"
+#include "indexer.h"
+#include "slave.h"
 
 int    writesize;              /* size of malloc()ed buffer for tape */
 long   lastspclrec = -1;       /* tape block number of last written header */
@@ -131,24 +121,7 @@ static     void enslave __P((void));
 static void flushtape __P((void));
 static void killall __P((void));
 static void rollforward __P((void));
-#ifdef USE_QFA
-static int GetTapePos __P((long long *));
-static int MkTapeString __P((struct s_spcl *, long long));
-#define FILESQFAPOS    20
-#endif
 
-/*
- * Concurrent dump mods (Caltech) - disk block reading and tape writing
- * are exported to several slave processes.  While one slave writes the
- * tape, the others read disk blocks; they pass control of the tape in
- * a ring via signals. The parent process traverses the filesystem and
- * sends writeheader()'s and lists of daddr's to the slaves via pipes.
- * The following structure defines the instruction packets sent to slaves.
- */
-struct req {
-       ext2_loff_t dblk;
-       int count;
-};
 int reqsiz;
 
 struct slave_results {
@@ -156,19 +129,7 @@ struct slave_results {
        ssize_t clen;           /* compressed length */
 };
 
-#define SLAVES 3               /* 1 slave writing, 1 reading, 1 for slack */
-struct slave {
-       int tapea;              /* header number at start of this chunk */
-       int count;              /* count to next header (used for TS_TAPE */
-                               /* after EOT) */
-       int inode;              /* inode that we are currently dealing with */
-       int fd;                 /* FD for this slave */
-       int pid;                /* PID for this slave */
-       int sent;               /* 1 == we've sent this slave requests */
-       int firstrec;           /* record number of this block */
-       char (*tblock)[TP_BSIZE]; /* buffer for data blocks */
-       struct req *req;        /* buffer for requests */
-} slaves[SLAVES+1];
+struct slave slaves[SLAVES+1];
 struct slave *slp;
 
 char   (*nextblock)[TP_BSIZE];
@@ -178,14 +139,60 @@ static int tapea_volume;  /* value of spcl.c_tapea at volume start */
 
 int master;            /* pid of master, for sending error signals */
 int tenths;            /* length of tape overhead per block written */
-static int caught;     /* have we caught the signal to proceed? */
-static int ready;      /* have we reached the lock point without having */
+static int caught1;    /* have we caught the signal to proceed? */
+static int ready1;     /* have we reached the lock point without having */
+                       /* received the SIGUSR1 signal from the prev slave? */
+static sigjmp_buf jmpbuf1;     /* where to jump to if we are ready when the */
+                       /* SIGUSR1 arrives from the previous slave */
+static int caught2;    /* have we caught the signal to proceed? */
+static int ready2;     /* have we reached the lock point without having */
                        /* received the SIGUSR2 signal from the prev slave? */
-static sigjmp_buf jmpbuf     /* where to jump to if we are ready when the */
+static sigjmp_buf jmpbuf2;     /* where to jump to if we are ready when the */
                        /* SIGUSR2 arrives from the previous slave */
-#ifdef USE_QFA
-static int gtperr = 0;
+
+/*
+ * Determine if we can use Linux' clone system call.  If so, call it
+ * with the CLONE_IO flag so that all processes will share the same I/O
+ * context, allowing the I/O schedulers to make better scheduling decisions.
+ */
+#ifdef __linux__
+/* first, pull in the header files that define sys_clone and CLONE_IO */
+#include <syscall.h>
+#define _GNU_SOURCE
+#include <sched.h>
+#include <unistd.h>
+#undef _GNU_SOURCE
+
+/* If either is not present, fall back on the fork behaviour */
+#if ! defined(SYS_clone) || ! defined (CLONE_IO)
+#define fork_clone_io fork
+#else /* SYS_clone */
+/* CLONE_IO is available, determine which version of sys_clone to use */
+#include <linux/version.h>
+/*
+ * Kernel 2.5.49 introduced two extra parameters to the clone system call.
+ * Neither is useful in our case, so this is easy to handle.
+ */
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,49)
+/*
+ * Parameters of the sys_clone syscall are
+ * clone_flags, child_stack, parent_tidptr, child_tidptr
+ * on all architectures except s390 and s390x
+ * s390* have child_stack, clone_flags, parent_tidptr, child_tidptr
+ */
+#if defined(__s390__) || defined(__s390x__)
+#define CLONE_ARGS 0, SIGCHLD|CLONE_IO, NULL, NULL
+#else
+#define CLONE_ARGS SIGCHLD|CLONE_IO, 0, NULL, NULL
 #endif
+#else
+#define CLONE_ARGS SIGCHLD|CLONE_IO, 0
+#endif /* LINUX_VERSION_CODE */
+pid_t fork_clone_io(void);
+#endif /* SYS_clone */
+#else /* __linux__ not defined */
+#define fork_clone_io fork
+#endif /* __linux__ */
 
 int
 alloctape(void)
@@ -237,34 +244,13 @@ alloctape(void)
 void
 writerec(const void *dp, int isspcl)
 {
-
        slp->req[trecno].dblk = (ext2_loff_t)0;
        slp->req[trecno].count = 1;
        /* XXX post increment triggers an egcs-1.1.2-12 bug on alpha/sparc */
        *(union u_spcl *)(*(nextblock)) = *(union u_spcl *)dp;
 
        /* Need to write it to the archive file */
-       if (! AfileActive && isspcl && (spcl.c_type == TS_END))
-               AfileActive = 1;
-       if (AfileActive && Afile >= 0 && !(spcl.c_flags & DR_EXTATTRIBUTES)) {
-               /* When we dump an inode which is not a directory,
-                * it means we ended the archive contents */
-               if (isspcl && (spcl.c_type == TS_INODE) &&
-                   ((spcl.c_dinode.di_mode & S_IFMT) != IFDIR))
-                       AfileActive = 0;
-               else {
-                       union u_spcl tmp;
-                       tmp = *(union u_spcl *)dp;
-                       /* Write the record, _uncompressed_ */
-                       if (isspcl) {
-                               tmp.s_spcl.c_flags &= ~DR_COMPRESSED;
-                               mkchecksum(&tmp);
-                       }
-                       if (write(Afile, &tmp, TP_BSIZE) != TP_BSIZE)
-                               msg("error writing archive file: %s\n", 
-                               strerror(errno));
-               }
-       }
+       indexer->writerec(dp, isspcl);
 
        nextblock++;
        if (isspcl)
@@ -755,6 +741,16 @@ rollforward(void)
 #endif
 }
 
+#ifdef __linux__
+#if defined(SYS_clone) && defined(CLONE_IO)
+pid_t
+fork_clone_io(void)
+{
+       return syscall(SYS_clone, CLONE_ARGS);
+}
+#endif
+#endif
+
 /*
  * We implement taking and restoring checkpoints on the tape level.
  * When each tape is opened, a new process is created by forking; this
@@ -801,7 +797,7 @@ restore_check_point:
        /*
         *      All signals are inherited...
         */
-       childpid = fork();
+       childpid = fork_clone_io();
        if (childpid < 0) {
                msg("Context save fork fails in parent %d\n", parentpid);
                Exit(X_ABORT);
@@ -944,6 +940,7 @@ restore_check_point:
                                tapeno, slp->inode);
                if (tapeno < (int)TP_NINOS)
                        volinfo[tapeno] = slp->inode;
+               transformation->startNewTape(transformation, NULL, 0);
        }
 }
 
@@ -974,15 +971,26 @@ Exit(int status)
        exit(status);
 }
 
+/*
+ * proceed1 - handler for SIGUSR1, used to synchronize IO between the slaves.
+ */
+static void
+proceed1(UNUSED(int signo))
+{
+       if (ready1)
+               siglongjmp(jmpbuf1, 1);
+       caught1++;
+}
+
 /*
  * proceed - handler for SIGUSR2, used to synchronize IO between the slaves.
  */
 static void
-proceed(UNUSED(int signo))
+proceed2(UNUSED(int signo))
 {
-       if (ready)
-               siglongjmp(jmpbuf, 1);
-       caught++;
+       if (ready2)
+               siglongjmp(jmpbuf2, 1);
+       caught2++;
 }
 
 void
@@ -1004,20 +1012,25 @@ enslave(void)
        sigaction(SIGTERM, &sa, NULL); /* Slave sends SIGTERM on dumpabort() */
        sa.sa_handler = sigpipe;
        sigaction(SIGPIPE, &sa, NULL);
-       sa.sa_handler = proceed;
+       sa.sa_handler = proceed1;
+       sa.sa_flags = SA_RESTART;
+       sigaction(SIGUSR1, &sa, NULL); /* Slave sends SIGUSR1 to next slave */
+       sa.sa_handler = proceed2;
        sa.sa_flags = SA_RESTART;
        sigaction(SIGUSR2, &sa, NULL); /* Slave sends SIGUSR2 to next slave */
    }
 
        for (i = 0; i < SLAVES; i++) {
                if (i == slp - &slaves[0]) {
-                       caught = 1;
+                       caught1 = 1;
+                       caught2 = 1;
                } else {
-                       caught = 0;
+                       caught1 = 0;
+                       caught2 = 0;
                }
 
                if (socketpair(AF_UNIX, SOCK_STREAM, 0, cmd) < 0 ||
-                   (slaves[i].pid = fork()) < 0)
+                   (slaves[i].pid = fork_clone_io()) < 0)
                        quit("too many slaves, %d (recompile smaller): %s\n",
                            i, strerror(errno));
 
@@ -1086,7 +1099,9 @@ killall(void)
  * previous process before writing to the tape, and sends SIGUSR2
  * to the next process when the tape write completes. On tape errors
  * a SIGUSR1 is sent to the master which then terminates all of the
- * slaves.
+ * slaves.  Each process sends SIGUSR1 to the next to signal that it
+ * is time to start reading from the disk, after it finishes reading
+ * and moves to the compression phase.
  */
 static void
 doslave(int cmd, 
@@ -1099,34 +1114,34 @@ doslave(int cmd,
        int nextslave;
        volatile int wrote = 0, size, eot_count, bufsize;
        char * volatile buffer;
-#if defined(HAVE_ZLIB) || defined(HAVE_BZLIB) || defined(HAVE_LZO)
+#if defined(HAVE_BLOCK_TRANSFORMATION)
        struct tapebuf * volatile comp_buf = NULL;
        int compresult;
        volatile int do_compress = !first;
        unsigned long worklen;
-#ifdef HAVE_LZO
-       lzo_align_t __LZO_MMODEL *LZO_WorkMem;
-#endif
-#endif /* HAVE_ZLIB || HAVE_BZLIB || HAVE_LZO */
+#endif /* HAVE_BLOCK_TRANSFORMATION */
+
        struct slave_results returns;
 #ifdef __linux__
        errcode_t retval;
 #endif
 #ifdef USE_QFA
-       long long curtapepos;
-       union u_spcl *uspclptr;
-       struct s_spcl *spclptr;
-       /* long         maxntrecs = 300000000 / (ntrec * 1024);  last tested: 50 000 000 */
-       long            maxntrecs = 50000;      /* every 50MB */
-       long            cntntrecs = maxntrecs;
+    QFA_State qfa_state;
 #endif /* USE_QFA */
        sigset_t set;
 
        sigemptyset(&set);
+       sigaddset(&set, SIGUSR1);
        sigaddset(&set, SIGUSR2);
        sigprocmask(SIG_BLOCK, &set, NULL);
        sigemptyset(&set);
 
+#ifdef HAVE_BLOCK_TRANSFORMATION
+       transformation->startDiskIOProcess(transformation);
+#endif /* HAVE_BLOCK_TRANSFORMATION */
+
+       indexer->openQfaState(&qfa_state);
+
        /*
         * Need our own seek pointer.
         */
@@ -1151,34 +1166,29 @@ doslave(int cmd,
                quit("master/slave protocol botched - didn't get pid of next slave.\n");
        }
 
-#if defined(HAVE_ZLIB) || defined(HAVE_BZLIB) || defined(HAVE_LZO)
+#if defined(HAVE_BLOCK_TRANSFORMATION)
        /* if we're doing a compressed dump, allocate the compress buffer */
        if (compressed) {
                int bsiz = sizeof(struct tapebuf) + writesize;
-               /* Add extra space to deal with compression enlarging the buffer */
-               if (TP_BSIZE > writesize/16 + 67)
+               /* Add extra space to deal with compression or encryption enlarging the buffer */
+               if (TP_BSIZE > writesize/16 + 200)
                        bsiz += TP_BSIZE;
                else
-                       bsiz += writesize/16 + 67;
+                       bsiz += writesize/16 + 200;
                comp_buf = malloc(bsiz);
                if (comp_buf == NULL)
                        quit("couldn't allocate a compress buffer.\n");
+               transformation->initialize(transformation, 1);
                if (zipflag == COMPRESS_ZLIB)
                        comp_buf->flags = COMPRESS_ZLIB;
                else if (zipflag == COMPRESS_BZLIB)
                        comp_buf->flags = COMPRESS_BZLIB;
-                else if (zipflag == COMPRESS_LZO) {
+        else if (zipflag == COMPRESS_LZO)
                        comp_buf->flags = COMPRESS_LZO;
-                       if (lzo_init() != LZO_E_OK) quit("lzo_init failed\n");
-                } else 
+        else
                        quit("internal error - unknown compression method: %d\n", zipflag);
        }
-#ifdef HAVE_LZO
-       LZO_WorkMem = malloc(LZO1X_1_MEM_COMPRESS);
-       if (!LZO_WorkMem)
-               quit("couldn't allocate a compress buffer.\n");
-#endif
-#endif /* HAVE_ZLIB || HAVE_BZLIB || HAVE_LZO */
+#endif /* HAVE_BLOCK_TRANSFORMATION */
 
        /*
         * Get list of blocks to dump, read the blocks into tape buffer
@@ -1186,8 +1196,18 @@ doslave(int cmd,
        while ((nread = dump_atomic_read( cmd, (char *)slp->req, reqsiz)) == reqsiz) {
                struct req *p = slp->req;
 
+               /* wait for previous slave to finish reading */
+               if (sigsetjmp(jmpbuf1, 1) == 0) {
+                       ready1 = 1;
+                       if (!caught1)
+                               sigsuspend(&set);
+               }
+               ready1 = 0;
+               caught1 = 0;
+
                for (trecno = 0; trecno < ntrec;
                     trecno += p->count, p += p->count) {
+                        
                        if (p->dblk) {  /* read a disk block */
                                bread(p->dblk, slp->tblock[trecno],
                                        p->count * TP_BSIZE);
@@ -1198,6 +1218,8 @@ doslave(int cmd,
                                       quit("master/slave protocol botched.\n");
                        }
                }
+               /* signal next slave to start reading */
+               (void) kill(nextslave, SIGUSR1);
 
                /* Try to write the data... */
                wrote = 0;
@@ -1207,7 +1229,7 @@ doslave(int cmd,
                bufsize = writesize;                    /* length to write */
                returns.clen = returns.unclen = bufsize;
 
-#if defined(HAVE_ZLIB) || defined(HAVE_BZLIB) || defined(HAVE_LZO)
+#if defined(HAVE_BLOCK_TRANSFORMATION)
                /* 
                 * When writing a compressed dump, each block except
                 * the first one on each tape is written
@@ -1220,56 +1242,15 @@ doslave(int cmd,
                 * The first block written by each slave is not compressed
                 * and does not have a prefix.
                 */
-
                if (compressed && do_compress) {
                        comp_buf->length = bufsize;
                        worklen = TP_BSIZE + writesize;
                        compresult = 1;
-#ifdef HAVE_ZLIB
-                       if (zipflag == COMPRESS_ZLIB) {
-                               compresult = compress2(comp_buf->buf, 
-                                                      &worklen,
-                                                      (char *)slp->tblock[0],
-                                                      writesize, 
-                                                      compressed);
-                               if (compresult == Z_OK)
-                                       compresult = 1;
-                               else
-                                       compresult = 0;
-                       }
-#endif /* HAVE_ZLIB */
-#ifdef HAVE_BZLIB
-                       if (zipflag == COMPRESS_BZLIB) {
-                               unsigned int worklen2 = worklen;
-                               compresult = BZ2_bzBuffToBuffCompress(
-                                                      comp_buf->buf,
-                                                      &worklen2,
-                                                      (char *)slp->tblock[0],
-                                                      writesize,
-                                                      compressed,
-                                                      0, 30);
-                               worklen = worklen2;
-                               if (compresult == BZ_OK)
-                                       compresult = 1;
-                               else
-                                       compresult = 0;
-                       }
 
-#endif /* HAVE_BZLIB */
-#ifdef HAVE_LZO
-                       if (zipflag == COMPRESS_LZO) {
-                               lzo_uint worklen2 = worklen;
-                               compresult = lzo1x_1_compress((char *)slp->tblock[0],writesize,
-                                                              comp_buf->buf,
-                                                             &worklen2,
-                                                              LZO_WorkMem);
-                               worklen = worklen2;
-                               if (compresult == LZO_E_OK)
-                                       compresult = 1;
-                               else
-                                       compresult = 0;
-                       }
-#endif /* HAVE_LZO */
+                       // tapebuf: compressed, flags, length
+                       compresult = transformation->compress(transformation, comp_buf,
+                                       &worklen, slp->tblock[0], writesize);
+
                        if (compresult && worklen <= ((unsigned long)writesize - 16)) {
                                /* write the compressed buffer */
                                comp_buf->length = worklen;
@@ -1289,48 +1270,17 @@ doslave(int cmd,
                }
                /* compress the remaining blocks if we're compressing */
                do_compress = compressed;
-#endif /* HAVE_ZLIB  || HAVE_BZLIB || HAVE_LZO */
+#endif /* HAVE_BLOCK_TRANSFORMATION */
 
-               if (sigsetjmp(jmpbuf, 1) == 0) {
-                       ready = 1;
-                       if (!caught)
+               if (sigsetjmp(jmpbuf2, 1) == 0) {
+                       ready2 = 1;
+                       if (!caught2)
                                sigsuspend(&set);
                }
-               ready = 0;
-               caught = 0;
+               ready2 = 0;
+               caught2 = 0;
 
-#ifdef USE_QFA
-               if (gTapeposfd >= 0) {
-                       int i;
-                       int foundone = 0;
-
-                       for (i = 0; (i < ntrec) && !foundone; ++i) {
-                               uspclptr = (union u_spcl *)&slp->tblock[i];
-                               spclptr = &uspclptr->s_spcl;
-                               if ((spclptr->c_magic == NFS_MAGIC) && 
-                                                       (spclptr->c_type == TS_INODE) &&
-                                                       (spclptr->c_date == gThisDumpDate) &&
-                                                       !(spclptr->c_dinode.di_mode & S_IFDIR) &&
-                                                       !(spclptr->c_flags & DR_EXTATTRIBUTES)
-                                               ) {
-                                       foundone = 1;
-                                       /* if (cntntrecs >= maxntrecs) {         only write every maxntrecs amount of data */
-                                               cntntrecs = 0;
-                                               if (gtperr == 0) 
-                                                       gtperr = GetTapePos(&curtapepos);
-                                               /* if an error occured previously don't
-                                                * try again */
-                                               if (gtperr == 0) {
-#ifdef DEBUG_QFA
-                                                       msg("inode %ld at tapepos %ld\n", spclptr->c_inumber, curtapepos);
-#endif
-                                                       gtperr = MkTapeString(spclptr, curtapepos);
-                                               }
-                                       /* } */
-                               }
-                       }
-               }
-#endif /* USE_QFA */
+               indexer->updateQfa(&qfa_state);
                                                
                while (eot_count < 10 && size < bufsize) {
 #ifdef RDUMP
@@ -1383,12 +1333,14 @@ doslave(int cmd,
                 * Signal the next slave to go.
                 */
                (void) kill(nextslave, SIGUSR2);
-#ifdef USE_QFA
-               if (gTapeposfd >= 0) {
-                       cntntrecs += ntrec;
-               }
-#endif /* USE_QFA */
+
+               indexer->updateQfaState(&qfa_state);
        }
+
+#ifdef HAVE_BLOCK_TRANSFORMATION
+       transformation->endDiskIOProcess(transformation);
+#endif /* HAVE_BLOCK_TRANSFORMATION */
+
        if (nread != 0)
                quit("error reading command pipe: %s\n", strerror(errno));
 }
@@ -1445,64 +1397,3 @@ SetLogicalPos(void)
        return err;
 }
 */
-
-#ifdef USE_QFA
-#define LSEEK_GET_TAPEPOS      10
-#define LSEEK_GO2_TAPEPOS      11
-/*
- * read the current tape position
- */
-static int
-GetTapePos(long long *pos)
-{
-       int err = 0;
-
-#ifdef RDUMP
-       if (host) {
-               *pos = (long long) rmtseek((OFF_T)0, (int)LSEEK_GET_TAPEPOS);
-               err = *pos < 0;
-       }
-       else 
-#endif
-       {
-       if (magtapeout) {
-               long mtpos;
-               *pos = 0;
-               err = (ioctl(tapefd, MTIOCPOS, &mtpos) < 0);
-               *pos = (long long)mtpos;
-       }
-       else {
-               *pos = LSEEK(tapefd, 0, SEEK_CUR);
-               err = (*pos < 0);
-       }
-       }
-       if (err) {
-               err = errno;
-               msg("[%ld] error: %d (getting tapepos: %lld)\n", getpid(), 
-                       err, *pos);
-               return err;
-       }
-       return err;
-}
-
-static int 
-MkTapeString(struct s_spcl *spclptr, long long curtapepos)
-{
-       int     err = 0;
-
-#ifdef DEBUG_QFA
-       msg("inode %ld at tapepos %lld\n", spclptr->c_inumber, curtapepos);
-#endif
-
-       snprintf(gTps, sizeof(gTps), "%ld\t%d\t%lld\n", 
-                (unsigned long)spclptr->c_inumber, 
-                tapeno, 
-                curtapepos);
-       gTps[sizeof(gTps) - 1] = '\0';
-       if (write(gTapeposfd, gTps, strlen(gTps)) != (ssize_t)strlen(gTps)) {
-               err = errno;
-       warn("error writing tapepos file. (error %d)\n", errno);
-       }
-       return err;
-}
-#endif /* USE_QFA */