HBASE-25824 IntegrationTestLoadCommonCrawl (#3208)

* HBASE-25824 IntegrationTestLoadCommonCrawl

This integration test loads successful resource retrieval records from
the Common Crawl (https://commoncrawl.org/) public dataset into an HBase
table and writes records that can be used to later verify the presence
and integrity of those records.

Run like:

  ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl \
    -Dfs.s3n.awsAccessKeyId=<AWS access key> \
    -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \
    /path/to/test-CC-MAIN-2021-10-warc.paths.gz \
    /path/to/tmp/warc-loader-output

Access to the Common Crawl dataset in S3 is made available to anyone by
Amazon AWS, but Hadoop's S3N filesystem still requires valid access
credentials to initialize.

The input path can either specify a directory or a file. The file may
optionally be compressed with gzip. If a directory, the loader expects
the directory to contain one or more WARC files from the Common Crawl
dataset. If a file, the loader expects a list of Hadoop S3N URIs which
point to S3 locations for one or more WARC files from the Common Crawl
dataset, one URI per line. Lines should be terminated with the UNIX line
terminator.

Included in hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz
is a list of all WARC files comprising the Q1 2021 crawl archive. There
are 64,000 WARC files in this data set, each containing ~1GB of gzipped
data. The WARC files contain several record types, such as metadata,
request, and response, but we only load the response record types. If
the HBase table schema does not specify compression (by default) there
is roughly a 10x expansion. Loading the full crawl archive results in a
table approximately 640 TB in size.

The hadoop-aws jar will be needed at runtime to instantiate the S3N
filesystem. Use the -files ToolRunner argument to add it.

You can also split the Loader and Verify stages:

Load with:

  ./bin/hbase 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Loader' \
    -files /path/to/hadoop-aws.jar \
    -Dfs.s3n.awsAccessKeyId=<AWS access key> \
    -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \
    /path/to/test-CC-MAIN-2021-10-warc.paths.gz \
    /path/to/tmp/warc-loader-output

Verify with:

  ./bin/hbase 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Verify' \
    /path/to/tmp/warc-loader-output

Signed-off-by: Michael Stack <stack@apache.org>
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
new file mode 100644
index 0000000..64a9540
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
@@ -0,0 +1,838 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.test;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IntegrationTestBase;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.test.util.CRC64;
+import org.apache.hadoop.hbase.test.util.warc.WARCInputFormat;
+import org.apache.hadoop.hbase.test.util.warc.WARCRecord;
+import org.apache.hadoop.hbase.test.util.warc.WARCWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+
+/**
+ * This integration test loads successful resource retrieval records from the Common Crawl
+ * (https://commoncrawl.org/) public dataset into an HBase table and writes records that can be
+ * used to later verify the presence and integrity of those records.
+ * <p>
+ * Run like:
+ * <blockquote>
+ * ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl \<br>
+ * &nbsp;&nbsp; -Dfs.s3n.awsAccessKeyId=&lt;AWS access key&gt; \<br>
+ * &nbsp;&nbsp; -Dfs.s3n.awsSecretAccessKey=&lt;AWS secret key&gt; \<br>
+ * &nbsp;&nbsp; /path/to/test-CC-MAIN-2021-10-warc.paths.gz \<br>
+ * &nbsp;&nbsp; /path/to/tmp/warc-loader-output
+ * </blockquote>
+ * <p>
+ * Access to the Common Crawl dataset in S3 is made available to anyone by Amazon AWS, but
+ * Hadoop's S3N filesystem still requires valid access credentials to initialize.
+ * <p>
+ * The input path can either specify a directory or a file. The file may optionally be
+ * compressed with gzip. If a directory, the loader expects the directory to contain one or more
+ * WARC files from the Common Crawl dataset. If a file, the loader expects a list of Hadoop S3N
+ * URIs which point to S3 locations for one or more WARC files from the Common Crawl dataset,
+ * one URI per line. Lines should be terminated with the UNIX line terminator.
+ * <p>
+ * Included in hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz is a list of all WARC
+ * files comprising the Q1 2021 crawl archive. There are 64,000 WARC files in this data set, each
+ * containing ~1GB of gzipped data. The WARC files contain several record types, such as metadata,
+ * request, and response, but we only load the response record types. If the HBase table schema
+ * does not specify compression (by default) there is roughly a 10x expansion. Loading the full
+ * crawl archive results in a table approximately 640 TB in size.
+ * <p>
+ * You can also split the Loader and Verify stages:
+ * <p>
+ * Load with:
+ * <blockquote>
+ * ./bin/hbase 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Loader' \<br>
+ * &nbsp;&nbsp; -files /path/to/hadoop-aws.jar \<br>
+ * &nbsp;&nbsp; -Dfs.s3n.awsAccessKeyId=&lt;AWS access key&gt; \<br>
+ * &nbsp;&nbsp; -Dfs.s3n.awsSecretAccessKey=&lt;AWS secret key&gt; \<br>
+ * &nbsp;&nbsp; /path/to/test-CC-MAIN-2021-10-warc.paths.gz \<br>
+ * &nbsp;&nbsp; /path/to/tmp/warc-loader-output
+ * </blockquote>
+ * <p>
+ * Note: The hadoop-aws jar will be needed at runtime to instantiate the S3N filesystem. Use
+ * the <tt>-files</tt> ToolRunner argument to add it.
+ * <p>
+ * Verify with:
+ * <blockquote>
+ * ./bin/hbase 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Verify' \<br>
+ * &nbsp;&nbsp; /path/to/tmp/warc-loader-output
+ * </blockquote>
+ * <p>
+ */
+public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class);
+
+  protected static String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
+  protected static String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";
+
+  protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
+  protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
+  protected static byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
+  protected static byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
+  protected static byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
+  protected static byte[] CRC_QUALIFIER = Bytes.toBytes("c");
+  protected static byte[] DATE_QUALIFIER = Bytes.toBytes("d");
+  protected static byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
+  protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
+  protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
+
+  public static enum Counts {
+    REFERENCED, UNREFERENCED, CORRUPT
+  }
+
+  Path warcFileInputDir = null;
+  Path outputDir = null;
+  String[] args;
+
+  protected int runLoader(Path warcFileInputDir, Path outputDir) throws Exception {
+    Loader loader = new Loader();
+    loader.setConf(conf);
+    return loader.run(warcFileInputDir, outputDir);
+  }
+
+  protected int runVerify(Path inputDir) throws Exception {
+    Verify verify = new Verify();
+    verify.setConf(conf);
+    return verify.run(inputDir);
+  }
+
+  @Override
+  public int run(String[] args) {
+    if (args.length > 0) {
+      warcFileInputDir = new Path(args[0]);
+      if (args.length > 1) {
+        outputDir = new Path(args[1]);
+      }
+    }
+    try {
+      if (warcFileInputDir == null) {
+        throw new IllegalArgumentException("WARC input file or directory not specified");
+      }
+      if (outputDir == null) {
+        throw new IllegalArgumentException("Output directory not specified");
+      }
+      int res = runLoader(warcFileInputDir, outputDir);
+      if (res != 0) {
+        LOG.error("Loader failed");
+        return -1;
+      }
+      res = runVerify(outputDir);
+    } catch (Exception e) {
+      LOG.error("Tool failed with exception", e);
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    processBaseOptions(cmd);
+    args = cmd.getArgs();
+  }
+
+  @Override
+  public void setUpCluster() throws Exception {
+    util = getTestingUtil(getConf());
+    boolean isDistributed = util.isDistributedCluster();
+    util.initializeCluster(isDistributed ? 1 : 3);
+    if (!isDistributed) {
+      util.startMiniMapReduceCluster();
+    }
+    this.setConf(util.getConfiguration());
+  }
+
+  @Override
+  public void cleanUpCluster() throws Exception {
+    super.cleanUpCluster();
+    if (util.isDistributedCluster()) {
+      util.shutdownMiniMapReduceCluster();
+    }
+  }
+
+  static TableName getTablename(Configuration c) {
+    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
+  }
+
+  @Override
+  public TableName getTablename() {
+    return getTablename(getConf());
+  }
+
+  @Override
+  protected Set<String> getColumnFamilies() {
+    Set<String> families = new HashSet<>();
+    families.add(Bytes.toString(CONTENT_FAMILY_NAME));
+    families.add(Bytes.toString(INFO_FAMILY_NAME));
+    return families;
+  }
+
+  @Override
+  public int runTestFromCommandLine() throws Exception {
+    return ToolRunner.run(getConf(), this, args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    int ret = ToolRunner.run(conf, new IntegrationTestLoadCommonCrawl(), args);
+    System.exit(ret);
+  }
+
+  public static class HBaseKeyWritable implements Writable {
+
+    private byte[] row;
+    private int rowOffset;
+    private int rowLength;
+    private byte[] family;
+    private int familyOffset;
+    private int familyLength;
+    private byte[] qualifier;
+    private int qualifierOffset;
+    private int qualifierLength;
+    private long ts;
+
+    public HBaseKeyWritable() { }
+
+    public HBaseKeyWritable(byte[] row, int rowOffset, int rowLength,
+        byte[] family, int familyOffset, int familyLength,
+        byte[] qualifier, int qualifierOffset, int qualifierLength, long ts) {
+      this.row = row;
+      this.rowOffset = rowOffset;
+      this.rowLength = rowLength;
+      this.family = family;
+      this.familyOffset = familyOffset;
+      this.familyLength = familyLength;
+      this.qualifier = qualifier;
+      this.qualifierOffset = qualifierOffset;
+      this.qualifierLength = qualifierLength;
+      this.ts = ts;
+    }
+
+    public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier, long ts) {
+      this(row, 0, row.length,
+        family, 0, family.length,
+        qualifier, 0, qualifier != null ? qualifier.length : 0, ts);
+    }
+
+    public HBaseKeyWritable(byte[] row, byte[] family, long ts) {
+      this(row, family, HConstants.EMPTY_BYTE_ARRAY, ts);
+    }
+
+    public HBaseKeyWritable(Cell cell) {
+      this(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+        cell.getTimestamp());
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.row = Bytes.toBytes(in.readUTF());
+      this.rowOffset = 0;
+      this.rowLength = row.length;
+      this.family = Bytes.toBytes(in.readUTF());
+      this.familyOffset = 0;
+      this.familyLength = family.length;
+      this.qualifier = Bytes.toBytes(in.readUTF());
+      this.qualifierOffset = 0;
+      this.qualifierLength = qualifier.length;
+      this.ts = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(new String(row, rowOffset, rowLength, StandardCharsets.UTF_8));
+      out.writeUTF(new String(family, familyOffset, familyLength, StandardCharsets.UTF_8));
+      if (qualifier != null) {
+        out.writeUTF(new String(qualifier, qualifierOffset, qualifierLength,
+          StandardCharsets.UTF_8));
+      } else {
+        out.writeUTF("");
+      }
+      out.writeLong(ts);
+    }
+
+    public byte[] getRowArray() {
+      return row;
+    }
+
+    public void setRow(byte[] row) {
+      this.row = row;
+    }
+
+    public int getRowOffset() {
+      return rowOffset;
+    }
+
+    public void setRowOffset(int rowOffset) {
+      this.rowOffset = rowOffset;
+    }
+
+    public int getRowLength() {
+      return rowLength;
+    }
+
+    public void setRowLength(int rowLength) {
+      this.rowLength = rowLength;
+    }
+
+    public byte[] getFamilyArray() {
+      return family;
+    }
+
+    public void setFamily(byte[] family) {
+      this.family = family;
+    }
+
+    public int getFamilyOffset() {
+      return familyOffset;
+    }
+
+    public void setFamilyOffset(int familyOffset) {
+      this.familyOffset = familyOffset;
+    }
+
+    public int getFamilyLength() {
+      return familyLength;
+    }
+
+    public void setFamilyLength(int familyLength) {
+      this.familyLength = familyLength;
+    }
+
+    public byte[] getQualifierArray() {
+      return qualifier;
+    }
+
+    public void setQualifier(byte[] qualifier) {
+      this.qualifier = qualifier;
+    }
+
+    public int getQualifierOffset() {
+      return qualifierOffset;
+    }
+
+    public void setQualifierOffset(int qualifierOffset) {
+      this.qualifierOffset = qualifierOffset;
+    }
+
+    public int getQualifierLength() {
+      return qualifierLength;
+    }
+
+    public void setQualifierLength(int qualifierLength) {
+      this.qualifierLength = qualifierLength;
+    }
+
+    public long getTimestamp() {
+      return ts;
+    }
+
+    public void setTimestamp(long ts) {
+      this.ts = ts;
+    }
+  }
+
+  public static class Loader extends Configured implements Tool {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
+    private static final String USAGE = "Loader <warInputDir | warFileList> <outputDir>";
+
+    void createSchema(TableName tableName) throws IOException {
+
+      try (Connection conn = ConnectionFactory.createConnection(getConf());
+           Admin admin = conn.getAdmin()) {
+        if (!admin.tableExists(tableName)) {
+
+          ColumnFamilyDescriptorBuilder contentFamilyBuilder =
+            ColumnFamilyDescriptorBuilder.newBuilder(CONTENT_FAMILY_NAME)
+              .setDataBlockEncoding(DataBlockEncoding.NONE)
+              .setBloomFilterType(BloomType.ROW)
+              .setMaxVersions(1000)
+              .setBlocksize(256 * 1024)
+              ;
+
+          ColumnFamilyDescriptorBuilder infoFamilyBuilder =
+            ColumnFamilyDescriptorBuilder.newBuilder(INFO_FAMILY_NAME)
+              .setDataBlockEncoding(DataBlockEncoding.NONE)
+              .setBloomFilterType(BloomType.ROWCOL)
+              .setMaxVersions(1000)
+              .setBlocksize(8 * 1024)
+              ;
+
+          Set<ColumnFamilyDescriptor> families = new HashSet<>();
+          families.add(contentFamilyBuilder.build());
+          families.add(infoFamilyBuilder.build());
+
+          TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
+              .setColumnFamilies(families)
+              .build();
+
+          if (getConf().getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
+            HBaseTestingUtility.PRESPLIT_TEST_TABLE)) {
+            int numberOfServers = admin.getRegionServers().size();
+            if (numberOfServers == 0) {
+              throw new IllegalStateException("No live regionservers");
+            }
+            int regionsPerServer = getConf().getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
+              HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
+            int totalNumberOfRegions = numberOfServers * regionsPerServer;
+            LOG.info("Creating test table: " + tableDescriptor);
+            LOG.info("Number of live regionservers: " + numberOfServers + ", " +
+                "pre-splitting table into " + totalNumberOfRegions + " regions " +
+                "(default regions per server: " + regionsPerServer + ")");
+            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
+            admin.createTable(tableDescriptor, splits);
+          } else {
+            LOG.info("Creating test table: " + tableDescriptor);
+            admin.createTable(tableDescriptor);
+          }
+        }
+      } catch (MasterNotRunningException e) {
+        LOG.error("Master not running", e);
+        throw new IOException(e);
+      }
+    }
+
+    int run(Path warcFileInput, Path outputDir)
+        throws IOException, ClassNotFoundException, InterruptedException {
+
+      createSchema(getTablename(getConf()));
+
+      Job job = Job.getInstance(getConf());
+      job.setJobName(Loader.class.getName());
+      job.setNumReduceTasks(0);
+      job.setJarByClass(getClass());
+      job.setMapperClass(LoaderMapper.class);
+      job.setInputFormatClass(WARCInputFormat.class);
+      FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf());
+      if (fs.getFileStatus(warcFileInput).isDirectory()) {
+        LOG.info("Using directory as WARC input path: " + warcFileInput);
+        FileInputFormat.setInputPaths(job, warcFileInput);
+      } else {
+        LOG.info("Getting WARC input paths from file: " + warcFileInput);
+        List<Path> paths = new LinkedList<Path>();
+        try (FSDataInputStream is = fs.open(warcFileInput)) {
+          InputStreamReader reader;
+          if (warcFileInput.getName().toLowerCase().endsWith(".gz")) {
+            reader = new InputStreamReader(new GZIPInputStream(is));
+          } else {
+            reader = new InputStreamReader(is);
+          }
+          try (BufferedReader br = new BufferedReader(reader)) {
+            String line;
+            while ((line = br.readLine()) != null) {
+              paths.add(new Path(line));
+            }
+          }
+        }
+        LOG.info("Read " + paths.size() + " WARC input paths from " + warcFileInput);
+        FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
+      }
+      job.setOutputFormatClass(SequenceFileOutputFormat.class);
+      SequenceFileOutputFormat.setOutputPath(job, outputDir);
+      SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+      job.setOutputKeyClass(HBaseKeyWritable.class);
+      job.setOutputValueClass(BytesWritable.class);
+      TableMapReduceUtil.addDependencyJars(job);
+
+      LOG.info("Submitting job." +
+        " This will take time proportional to the number of input files, please be patient.");
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        LOG.error("Failure during job " + job.getJobID());
+      }
+      return success ? 0 : 1;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      if (args.length < 2) {
+        System.err.println(USAGE);
+        return 1;
+      }
+      try {
+        Path warcFileInput = new Path(args[0]);
+        Path outputDir = new Path(args[1]);
+        return run(warcFileInput, outputDir);
+      } catch (NumberFormatException e) {
+        System.err.println("Parsing loader arguments failed: " + e.getMessage());
+        System.err.println(USAGE);
+        return 1;
+      }
+    }
+
+    public static void main(String[] args) throws Exception {
+      System.exit(ToolRunner.run(HBaseConfiguration.create(), new Loader(), args));
+    }
+
+    public static class LoaderMapper
+        extends Mapper<LongWritable, WARCWritable, HBaseKeyWritable, BytesWritable> {
+
+      Configuration conf;
+      Connection conn;
+      Table table;
+
+      @Override
+      protected void setup(Context context) throws IOException, InterruptedException {
+        conn = ConnectionFactory.createConnection(context.getConfiguration());
+        table = conn.getTable(getTablename(conn.getConfiguration()));
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+          table.close();
+        } catch (Exception e) {
+          LOG.warn("Exception closing Table", e);
+        }
+        try {
+          conn.close();
+        } catch (Exception e) {
+          LOG.warn("Exception closing Connection", e);
+        }
+      }
+
+      @Override
+      protected void map(LongWritable key, WARCWritable value, Context output)
+          throws IOException, InterruptedException {
+        WARCRecord.Header warcHeader = value.getRecord().getHeader();
+        String recordID = warcHeader.getRecordID();
+        String targetURI = warcHeader.getTargetURI();
+        if (warcHeader.getRecordType().equals("response") && targetURI != null) {
+          String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
+          if (contentType != null) {
+            LOG.debug("Processing record id=" + recordID + ", targetURI=\"" + targetURI + "\"");
+            long now = System.currentTimeMillis();
+
+            // Make row key
+
+            byte[] rowKey;
+            try {
+              rowKey = rowKeyFromTargetURI(targetURI);
+            } catch (IllegalArgumentException e) {
+              LOG.debug("Could not make a row key for record " + recordID + ", ignoring", e);
+              return;
+            } catch (URISyntaxException e) {
+              LOG.warn("Could not parse URI \"" + targetURI + "\" for record " + recordID +
+                ", ignoring");
+              return;
+            }
+
+            // Get the content and calculate the CRC64
+
+            byte[] content = value.getRecord().getContent();
+            CRC64 crc = new CRC64();
+            crc.update(content);
+            long crc64 = crc.getValue();
+
+            // Store to HBase
+
+            Put put = new Put(rowKey);
+            put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, now, content);
+            put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, now,
+              Bytes.toBytes(content.length));
+            put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, now,
+              Bytes.toBytes(contentType));
+            put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, now, Bytes.toBytes(crc64));
+            put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, now, Bytes.toBytes(recordID));
+            put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, now, Bytes.toBytes(targetURI));
+            put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, now,
+              Bytes.toBytes(warcHeader.getDateString()));
+            String ipAddr = warcHeader.getField("WARC-IP-Address");
+            if (ipAddr != null) {
+              put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr));
+            }
+            table.put(put);
+
+            // Write records out for later verification, one per HBase field except for the
+            // content record, which will be verified by CRC64.
+
+            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, now),
+              new BytesWritable(Bytes.toBytes(crc64)));
+            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER,
+              now), new BytesWritable(Bytes.toBytes(content.length)));
+            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER,
+              now), new BytesWritable(Bytes.toBytes(contentType)));
+            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER,
+              now), new BytesWritable(Bytes.toBytes(recordID)));
+            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER,
+              now), new BytesWritable(Bytes.toBytes(targetURI)));
+            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, now),
+              new BytesWritable(Bytes.toBytes(warcHeader.getDateString())));
+            if (ipAddr != null) {
+              output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER,
+                now), new BytesWritable(Bytes.toBytes(ipAddr)));
+            }
+          }
+        }
+      }
+
+      private byte[] rowKeyFromTargetURI(String targetUri)
+          throws URISyntaxException, IllegalArgumentException {
+        URI uri = new URI(targetUri);
+        StringBuffer sb = new StringBuffer();
+        // Ignore the scheme
+        // Reverse the components of the hostname
+        if (uri.getHost() != null) {
+          String[] hostComponents = uri.getHost().split("\\.");
+          for (int i = hostComponents.length - 1; i >= 0; i--) {
+            sb.append(hostComponents[i]);
+            if (i != 0) {
+              sb.append('.');
+            }
+          }
+        } else {
+          throw new IllegalArgumentException("URI is missing host component");
+        }
+        // Port
+        if (uri.getPort() != -1) {
+          sb.append(':');
+          sb.append(uri.getPort());
+        }
+        if (uri.getRawPath() != null) {
+          sb.append(uri.getRawPath());
+        }
+        if (uri.getRawQuery() != null) {
+          sb.append('?');
+          sb.append(uri.getRawQuery());
+        }
+        if (uri.getRawFragment() != null) {
+          sb.append('#');
+          sb.append(uri.getRawFragment());
+        }
+        // Constrain the key size to the maximum allowed row key length
+        if (sb.length() >  HConstants.MAX_ROW_LENGTH) {
+          sb.setLength(HConstants.MAX_ROW_LENGTH);
+        }
+        return Bytes.toBytes(sb.toString());
+      }
+
+    }
+  }
+
+  public static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
+    @Override
+    protected boolean isSplitable(JobContext context, Path filename) {
+      return false;
+    }
+  }
+
+  public static class Verify extends Configured implements Tool {
+
+    public static final Logger LOG = LoggerFactory.getLogger(Verify.class);
+    public static final String USAGE = "Verify <inputDir>";
+
+    int run(Path inputDir) throws IOException, ClassNotFoundException, InterruptedException {
+      Job job = Job.getInstance(getConf());
+      job.setJobName(Verify.class.getName());
+      job.setJarByClass(getClass());
+      job.setMapperClass(VerifyMapper.class);
+      job.setInputFormatClass(OneFilePerMapperSFIF.class);
+      FileInputFormat.setInputPaths(job, inputDir);
+      job.setOutputFormatClass(NullOutputFormat.class);
+      job.setOutputKeyClass(NullWritable.class);
+      job.setOutputValueClass(NullWritable.class);
+      TableMapReduceUtil.addDependencyJars(job);
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        LOG.error("Failure during job " + job.getJobID());
+      }
+      Counters counters = job.getCounters();
+      for (Counts c: Counts.values()) {
+        LOG.info(c + ": " + counters.findCounter(c).getValue());
+      }
+      return success ? 0 : 1;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      if (args.length < 2) {
+        System.err.println(USAGE);
+        return 1;
+      }
+      Path loaderOutput = new Path(args[0]);
+      return run(loaderOutput);
+    }
+
+    public static void main(String[] args) throws Exception {
+      System.exit(ToolRunner.run(HBaseConfiguration.create(), new Verify(), args));
+    }
+
+    public static class VerifyMapper
+        extends Mapper<HBaseKeyWritable, BytesWritable, NullWritable, NullWritable> {
+
+      Connection conn;
+      Table table;
+
+      @Override
+      protected void setup(Context context) throws IOException, InterruptedException {
+        conn = ConnectionFactory.createConnection(context.getConfiguration());
+        table = conn.getTable(getTablename(conn.getConfiguration()));
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException ,InterruptedException {
+        table.close();
+        conn.close();
+      }
+
+      @Override
+      protected void map(HBaseKeyWritable key, BytesWritable value, Context output)
+          throws IOException, InterruptedException {
+
+        byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength());
+        byte[] family = Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(),
+          key.getFamilyLength());
+        byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
+          key.getQualifierLength());
+        long ts = key.getTimestamp();
+
+        if (Bytes.equals(INFO_FAMILY_NAME, family) &&
+            Bytes.equals(CRC_QUALIFIER, qualifier)) {
+
+          long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
+
+          Result result =
+            table.get(new Get(row)
+              .addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
+              .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER)
+              .setTimestamp(ts));
+
+          byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
+          if (content == null) {
+            LOG.info("Row " + Bytes.toStringBinary(row) + ": missing content");
+            output.getCounter(Counts.UNREFERENCED).increment(1);
+            return;
+          } else {
+            CRC64 crc = new CRC64();
+            crc.update(content);
+            if (crc.getValue() != expectedCRC64) {
+              LOG.info("Row " + Bytes.toStringBinary(row) + ": corrupt content");
+              output.getCounter(Counts.CORRUPT).increment(1);
+              return;
+            }
+          }
+          byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
+          if (crc == null) {
+            LOG.info("Row " + Bytes.toStringBinary(row) + ": missing i:c");
+            output.getCounter(Counts.UNREFERENCED).increment(1);
+            return;
+          }
+          if (Bytes.toLong(crc) != expectedCRC64) {
+            LOG.info("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
+            output.getCounter(Counts.CORRUPT).increment(1);
+            return;
+          }
+
+        } else {
+
+          Result result =
+            table.get(new Get(row)
+              .addColumn(family, qualifier)
+              .setTimestamp(ts));
+
+          byte[] bytes = result.getValue(family, qualifier);
+          if (bytes == null) {
+            LOG.info("Row " + Bytes.toStringBinary(row) + ": missing " +
+              Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
+            output.getCounter(Counts.UNREFERENCED).increment(1);
+            return;
+          }
+          if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
+            LOG.info("Row " + Bytes.toStringBinary(row) + ": " +
+              Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
+              " mismatch");
+            output.getCounter(Counts.CORRUPT).increment(1);
+            return;
+          }
+
+        }
+
+        output.getCounter(Counts.REFERENCED).increment(1);
+      }
+
+    }
+
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/CRC64.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/CRC64.java
new file mode 100644
index 0000000..4b4eacb
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/CRC64.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.test.util;
+
+// Cribbed from
+// hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CRC64.java
+
+public class CRC64 {
+  private static final long POLY = 0x9a6c9329ac4bc9b5L;
+  private static final int TABLE_LENGTH = 256;
+  private static final long[] TABLE = new long[TABLE_LENGTH];
+  static {
+  /* Initialize a table constructed from POLY  */
+    for (int n = 0; n < TABLE_LENGTH; ++n) {
+      long crc = n;
+      for (int i = 0; i < 8; ++i) {
+        if ((crc & 1) == 1) {
+          crc = (crc >>> 1) ^ POLY;
+        } else {
+          crc >>>= 1;
+        }
+      }
+      TABLE[n] = crc;
+    }
+  }
+
+  private long value = -1;
+
+  public void reset() {
+    value = -1;
+  }
+
+  public void update(byte[] input, int off, int len) {
+    for (int i = off; i < off+len; i++) {
+      value = TABLE[(input[i] ^ (int) value) & 0xFF] ^ (value >>> 8);
+    }
+  }
+
+  public void update(byte[] input) {
+    update(input, 0, input.length);
+  }
+
+  public long getValue() {
+    // Return the compliment of 'value' to complete the calculation.
+    return ~value;
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileReader.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileReader.java
new file mode 100644
index 0000000..4e7ee5a
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileReader.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads {@link WARCRecord}s from a WARC file, using Hadoop's filesystem APIs. (This means you
+ * can read from HDFS, S3 or any other filesystem supported by Hadoop). This implementation is
+ * not tied to the MapReduce APIs -- that link is provided by the mapred
+ * {@link com.martinkl.warc.mapred.WARCInputFormat} and the mapreduce
+ * {@link com.martinkl.warc.mapreduce.WARCInputFormat}.
+ */
+public class WARCFileReader {
+  private static final Logger logger = LoggerFactory.getLogger(WARCFileReader.class);
+
+  private final long fileSize;
+  private CountingInputStream byteStream = null;
+  private DataInputStream dataStream = null;
+  private long bytesRead = 0, recordsRead = 0;
+
+  /**
+   * Opens a file for reading. If the filename ends in `.gz`, it is automatically decompressed
+   * on the fly.
+   * @param conf The Hadoop configuration.
+   * @param filePath The Hadoop path to the file that should be read.
+   */
+  public WARCFileReader(Configuration conf, Path filePath) throws IOException {
+    FileSystem fs = filePath.getFileSystem(conf);
+    this.fileSize = fs.getFileStatus(filePath).getLen();
+    logger.info("Reading from " + filePath);
+
+    CompressionCodec codec = filePath.getName().endsWith(".gz") ?
+        WARCFileWriter.getGzipCodec(conf) : null;
+    byteStream = new CountingInputStream(new BufferedInputStream(fs.open(filePath)));
+    dataStream = new DataInputStream(codec == null ? byteStream :
+      codec.createInputStream(byteStream));
+  }
+
+  /**
+   * Reads the next record from the file.
+   * @return The record that was read.
+   */
+  public WARCRecord read() throws IOException {
+    WARCRecord record = new WARCRecord(dataStream);
+    recordsRead++;
+    return record;
+  }
+
+  /**
+   * Closes the file. No more reading is possible after the file has been closed.
+   */
+  public void close() throws IOException {
+    if (dataStream != null) {
+      dataStream.close();
+    }
+    byteStream = null;
+    dataStream = null;
+  }
+
+  /**
+   * Returns the number of records that have been read since the file was opened.
+   */
+  public long getRecordsRead() {
+    return recordsRead;
+  }
+
+  /**
+   * Returns the number of bytes that have been read from file since it was opened.
+   * If the file is compressed, this refers to the compressed file size.
+   */
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Returns the proportion of the file that has been read, as a number between 0.0
+   * and 1.0.
+   */
+  public float getProgress() {
+    if (fileSize == 0) {
+      return 1.0f;
+    }
+    return (float) bytesRead / (float) fileSize;
+  }
+
+  private class CountingInputStream extends FilterInputStream {
+    public CountingInputStream(InputStream in) {
+      super(in);
+    }
+
+    @Override
+    public int read() throws IOException {
+      int result = in.read();
+      if (result != -1) {
+        bytesRead++;
+      }
+      return result;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int result = in.read(b, off, len);
+      if (result != -1) {
+        bytesRead += result;
+      }
+      return result;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      long result = in.skip(n);
+      bytesRead += result;
+      return result;
+    }
+  }
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileWriter.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileWriter.java
new file mode 100644
index 0000000..5f361cd
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileWriter.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes {@link WARCRecord}s to a WARC file, using Hadoop's filesystem APIs. (This means you
+ * can write to HDFS, S3 or any other filesystem supported by Hadoop). This implementation is
+ * not tied to the MapReduce APIs -- that link is provided by the mapred
+ * {@link com.martinkl.warc.mapred.WARCOutputFormat} and the mapreduce
+ * {@link com.martinkl.warc.mapreduce.WARCOutputFormat}.
+ *
+ * WARCFileWriter keeps track of how much data it has written (optionally gzip-compressed);
+ * when the file becomes larger than some threshold, it is automatically closed and a
+ * new segment is started. A segment number is appended to the filename for that purpose.
+ * The segment number always starts at 00000, and by default a new segment is started when
+ * the file size exceeds 1GB. To change the target size for a segment, you can set the
+ * `warc.output.segment.size` key in the Hadoop configuration to the number of bytes.
+ * (Files may actually be a bit larger than this threshold, since we finish writing the
+ * current record before opening a new file.)
+ */
+public class WARCFileWriter {
+  private static final Logger logger = LoggerFactory.getLogger(WARCFileWriter.class);
+  public static final long DEFAULT_MAX_SEGMENT_SIZE = 1000000000L; // 1 GB
+
+  private final Configuration conf;
+  private final CompressionCodec codec;
+  private final Path workOutputPath;
+  private final Progressable progress;
+  private final String extensionFormat;
+  private final long maxSegmentSize;
+  private long segmentsCreated = 0, segmentsAttempted = 0, bytesWritten = 0;
+  private CountingOutputStream byteStream;
+  private DataOutputStream dataStream;
+
+  /**
+   * Creates a WARC file, and opens it for writing. If a file with the same name already
+   * exists, an attempt number in the filename is incremented until we find a file that
+   * doesn't already exist.
+   *
+   * @param conf The Hadoop configuration.
+   * @param codec If null, the file is uncompressed. If non-null, this compression codec
+   *              will be used. The codec's default file extension is appended to the filename.
+   * @param workOutputPath The directory and filename prefix to which the data should be
+   *                       written. We append a segment number and filename extensions to it.
+   */
+  public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath)
+      throws IOException {
+    this(conf, codec, workOutputPath, null);
+  }
+
+  /**
+   * Creates a WARC file, and opens it for writing. If a file with the same name already
+   * exists, it is *overwritten*. Note that this is different behaviour from the other
+   * constructor. Yes, this sucks. It will probably change in a future version.
+   *
+   * @param conf The Hadoop configuration.
+   * @param codec If null, the file is uncompressed. If non-null, this compression codec
+   *              will be used. The codec's default file extension is appended to the filename.
+   * @param workOutputPath The directory and filename prefix to which the data should be
+   *                       written. We append a segment number and filename extensions to it.
+   * @param progress An object used by the mapred API for tracking a task's progress.
+   */
+  public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath,
+      Progressable progress) throws IOException {
+    this.conf = conf;
+    this.codec = codec;
+    this.workOutputPath = workOutputPath;
+    this.progress = progress;
+    this.extensionFormat = ".seg-%05d.attempt-%05d.warc" +
+        (codec == null ? "" : codec.getDefaultExtension());
+    this.maxSegmentSize = conf.getLong("warc.output.segment.size", DEFAULT_MAX_SEGMENT_SIZE);
+    createSegment();
+  }
+
+  /**
+   * Instantiates a Hadoop codec for compressing and decompressing Gzip files. This is the
+   * most common compression applied to WARC files.
+   *
+   * @param conf The Hadoop configuration.
+   */
+  public static CompressionCodec getGzipCodec(Configuration conf) {
+    try {
+      return (CompressionCodec) ReflectionUtils.newInstance(
+        conf.getClassByName("org.apache.hadoop.io.compress.GzipCodec")
+          .asSubclass(CompressionCodec.class),
+        conf);
+    } catch (ClassNotFoundException e) {
+      logger.warn("GzipCodec could not be instantiated", e);
+      return null;
+    }
+  }
+
+  /**
+   * Creates an output segment file and sets up the output streams to point at it.
+   * If the file already exists, retries with a different filename. This is a bit nasty --
+   * after all, {@link FileOutputFormat}'s work directory concept is supposed to prevent
+   * filename clashes -- but it looks like Amazon Elastic MapReduce prevents use of per-task
+   * work directories if the output of a job is on S3.
+   *
+   * TODO: Investigate this and find a better solution.
+   */
+  private void createSegment() throws IOException {
+    segmentsAttempted = 0;
+    bytesWritten = 0;
+    boolean success = false;
+
+    while (!success) {
+      Path path = workOutputPath.suffix(String.format(extensionFormat, segmentsCreated,
+        segmentsAttempted));
+      FileSystem fs = path.getFileSystem(conf);
+
+      try {
+        // The o.a.h.mapred OutputFormats overwrite existing files, whereas
+        // the o.a.h.mapreduce OutputFormats don't overwrite. Bizarre...
+        // Here, overwrite if progress != null, i.e. if using mapred API.
+        FSDataOutputStream fsStream = (progress == null) ? fs.create(path, false) :
+          fs.create(path, progress);
+        byteStream = new CountingOutputStream(new BufferedOutputStream(fsStream));
+        dataStream = new DataOutputStream(codec == null ? byteStream :
+          codec.createOutputStream(byteStream));
+        segmentsCreated++;
+        logger.info("Writing to output file: {}", path);
+        success = true;
+
+      } catch (IOException e) {
+        if (e.getMessage().startsWith("File already exists")) {
+          logger.warn("Tried to create file {} but it already exists; retrying.", path);
+          segmentsAttempted++; // retry
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * Appends a {@link WARCRecord} to the file, in WARC/1.0 format.
+   * @param record The record to be written.
+   */
+  public void write(WARCRecord record) throws IOException {
+    if (bytesWritten > maxSegmentSize) {
+      dataStream.close();
+      createSegment();
+    }
+    record.write(dataStream);
+  }
+
+  /**
+   * Appends a {@link WARCRecord} wrapped in a {@link WARCWritable} to the file.
+   * @param record The wrapper around the record to be written.
+   */
+  public void write(WARCWritable record) throws IOException {
+    if (record.getRecord() != null) {
+      write(record.getRecord());
+    }
+  }
+
+  /**
+   * Flushes any buffered data and closes the file.
+   */
+  public void close() throws IOException {
+    dataStream.close();
+  }
+
+  private class CountingOutputStream extends FilterOutputStream {
+    public CountingOutputStream(OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+      bytesWritten += len;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+      bytesWritten++;
+    }
+
+    // Overriding close() because FilterOutputStream's close() method pre-JDK8 has bad behavior:
+    // it silently ignores any exception thrown by flush(). Instead, just close the delegate
+    // stream. It should flush itself if necessary. (Thanks to the Guava project for noticing
+    // this.)
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCInputFormat.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCInputFormat.java
new file mode 100644
index 0000000..5d5f388
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCInputFormat.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * Hadoop InputFormat for mapreduce jobs ('new' API) that want to process data in WARC files.
+ *
+ * Usage:
+ *
+ * ```java
+ * Job job = new Job(getConf());
+ * job.setInputFormatClass(WARCInputFormat.class);
+ * ```
+ *
+ * Mappers should use a key of {@link org.apache.hadoop.io.LongWritable} (which is
+ * 1 for the first record in a file, 2 for the second record, etc.) and a value of
+ * {@link WARCWritable}.
+ */
+public class WARCInputFormat extends FileInputFormat<LongWritable, WARCWritable> {
+
+  /**
+   * Opens a WARC file (possibly compressed) for reading, and returns a RecordReader for
+   * accessing it.
+   */
+  @Override
+  public RecordReader<LongWritable, WARCWritable> createRecordReader(InputSplit split,
+      TaskAttemptContext context)
+          throws IOException, InterruptedException {
+    return new WARCReader();
+  }
+
+  /**
+   * Always returns false, as WARC files cannot be split.
+   */
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    return false;
+  }
+
+  private static class WARCReader extends RecordReader<LongWritable, WARCWritable> {
+    private final LongWritable key = new LongWritable();
+    private final WARCWritable value = new WARCWritable();
+    private WARCFileReader reader;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      reader = new WARCFileReader(context.getConfiguration(), ((FileSplit) split).getPath());
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException {
+      try {
+        WARCRecord record = reader.read();
+        key.set(reader.getRecordsRead());
+        value.setRecord(record);
+        return true;
+      } catch (EOFException eof) {
+        return false;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return reader.getProgress();
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+      return key;
+    }
+
+    @Override
+    public WARCWritable getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCOutputFormat.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCOutputFormat.java
new file mode 100644
index 0000000..52d505c
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCOutputFormat.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Hadoop OutputFormat for mapreduce jobs ('new' API) that want to write data to WARC files.
+ *
+ * Usage:
+ *
+ * ```java
+ * Job job = new Job(getConf());
+ * job.setOutputFormatClass(WARCOutputFormat.class);
+ * job.setOutputKeyClass(NullWritable.class);
+ * job.setOutputValueClass(WARCWritable.class);
+ * FileOutputFormat.setCompressOutput(job, true);
+ * ```
+ *
+ * The tasks generating the output (usually the reducers, but may be the mappers if there
+ * are no reducers) should use `NullWritable.get()` as the output key, and the
+ * {@link WARCWritable} as the output value.
+ */
+public class WARCOutputFormat extends FileOutputFormat<NullWritable, WARCWritable> {
+
+  /**
+   * Creates a new output file in WARC format, and returns a RecordWriter for writing to it.
+   */
+  @Override
+  public RecordWriter<NullWritable, WARCWritable> getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new WARCWriter(context);
+  }
+
+  private class WARCWriter extends RecordWriter<NullWritable, WARCWritable> {
+    private final WARCFileWriter writer;
+
+    public WARCWriter(TaskAttemptContext context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      CompressionCodec codec =
+        getCompressOutput(context) ? WARCFileWriter.getGzipCodec(conf) : null;
+      Path workFile = getDefaultWorkFile(context, "");
+      this.writer = new WARCFileWriter(conf, codec, workFile);
+    }
+
+    @Override
+    public void write(NullWritable key, WARCWritable value)
+        throws IOException, InterruptedException {
+      writer.write(value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      writer.close();
+    }
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCRecord.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCRecord.java
new file mode 100644
index 0000000..b2ff85b
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCRecord.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Immutable implementation of a record in a WARC file. You create a {@link WARCRecord}
+ * by parsing it out of a {@link DataInput} stream.
+ *
+ * The file format is documented in the
+ * [ISO Standard](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf).
+ * In a nutshell, it's a textual format consisting of lines delimited by `\r\n`.
+ * Each record has the following structure:
+ *
+ * 1. A line indicating the WARC version number, such as `WARC/1.0`.
+ * 2. Several header lines (in key-value format, similar to HTTP or email headers),
+ *    giving information about the record. The header is terminated by an empty line.
+ * 3. A body consisting of raw bytes (the number of bytes is indicated in one of the headers).
+ * 4. A final separator of `\r\n\r\n` before the next record starts.
+ *
+ * There are various different types of records, as documented on
+ * {@link Header#getRecordType()}.
+ */
+public class WARCRecord {
+
+  public static final String WARC_VERSION = "WARC/1.0";
+  private static final Pattern VERSION_PATTERN = Pattern.compile("WARC/[0-9\\.]+");
+  private static final Pattern CONTINUATION_PATTERN = Pattern.compile("^[\\t ]+.*");
+  private static final String CRLF = "\r\n";
+  private static final byte[] CRLF_BYTES = { 13, 10 };
+
+  private final Header header;
+  private final byte[] content;
+
+  /**
+   * Creates a new WARCRecord by parsing it out of a {@link DataInput} stream.
+   * @param in The input source from which one record will be read.
+   */
+  public WARCRecord(DataInput in) throws IOException {
+    header = readHeader(in);
+    content = new byte[header.getContentLength()];
+    in.readFully(content);
+    readSeparator(in);
+  }
+
+  private static Header readHeader(DataInput in) throws IOException {
+    String versionLine = readLine(in);
+    if (!VERSION_PATTERN.matcher(versionLine).matches()) {
+      throw new IllegalStateException("Expected WARC version, but got: " + versionLine);
+    }
+
+    LinkedHashMap<String, String> headers = new LinkedHashMap<String, String>();
+    String line, fieldName = null;
+
+    do {
+      line = readLine(in);
+      if (fieldName != null && CONTINUATION_PATTERN.matcher(line).matches()) {
+        headers.put(fieldName, headers.get(fieldName) + line);
+      } else if (!line.isEmpty()) {
+        String[] field = line.split(":", 2);
+        if (field.length < 2) {
+          throw new IllegalStateException("Malformed header line: " + line);
+        }
+        fieldName = field[0].trim();
+        headers.put(fieldName, field[1].trim());
+      }
+    } while (!line.isEmpty());
+
+    return new Header(headers);
+  }
+
+  private static String readLine(DataInput in) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    boolean seenCR = false, seenCRLF = false;
+    while (!seenCRLF) {
+      byte b = in.readByte();
+      if (!seenCR && b == 13) {
+        seenCR = true;
+      } else if (seenCR && b == 10) {
+        seenCRLF = true;
+      } else {
+        seenCR = false;
+        out.write(b);
+      }
+    }
+    return out.toString("UTF-8");
+  }
+
+  private static void readSeparator(DataInput in) throws IOException {
+    byte[] sep = new byte[4];
+    in.readFully(sep);
+    if (sep[0] != 13 || sep[1] != 10 || sep[2] != 13 || sep[3] != 10) {
+      throw new IllegalStateException(String.format(
+        "Expected final separator CR LF CR LF, but got: %d %d %d %d",
+        sep[0], sep[1], sep[2], sep[3]));
+    }
+  }
+
+  /**
+   * Returns the parsed header structure of the WARC record.
+   */
+  public Header getHeader() {
+    return header;
+  }
+
+  /**
+   * Returns the body of the record, as an unparsed raw array of bytes. The content
+   * of the body depends on the type of record (see {@link Header#getRecordType()}).
+   * For example, in the case of a `response` type header, the body consists of the
+   * full HTTP response returned by the server (HTTP headers followed by the body).
+   */
+  public byte[] getContent() {
+    return content;
+  }
+
+  /**
+   * Writes this record to a {@link DataOutput} stream. The output may, in some edge
+   * cases, be not byte-for-byte identical to what was parsed from a {@link DataInput}.
+   * However it has the same meaning and should not lose any information.
+   * @param out The output stream to which this record should be appended.
+   */
+  public void write(DataOutput out) throws IOException {
+    header.write(out);
+    out.write(CRLF_BYTES);
+    out.write(content);
+    out.write(CRLF_BYTES);
+    out.write(CRLF_BYTES);
+  }
+
+  /**
+   * Returns a human-readable string representation of the record.
+   */
+  @Override
+  public String toString() {
+    return header.toString();
+  }
+
+  /**
+   * Contains the parsed headers of a {@link WARCRecord}. Each record contains a number
+   * of headers in key-value format, where some header keys are standardised, but
+   * nonstandard ones can be added.
+   *
+   * The documentation of the methods in this class is excerpted from the
+   * [WARC 1.0 specification](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf).
+   * Please see the specification for more detail.
+   */
+  public final static class Header {
+    private final Map<String, String> fields;
+
+    private Header(Map<String, String> fields) {
+      this.fields = fields;
+    }
+
+    /**
+     * Returns the type of WARC record (the value of the `WARC-Type` header field).
+     * WARC 1.0 defines the following record types: (for full definitions, see the
+     * [spec](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf))
+     *
+     *  *  `warcinfo`: Describes the records that follow it, up through end of file,
+     *     end of input, or until next `warcinfo` record. Typically, this appears once and
+     *     at the beginning of a WARC file. For a web archive, it often contains information
+     *     about the web crawl which generated the following records.
+     *
+     *     The format of this descriptive record block may vary, though the use of the
+     *     `"application/warc-fields"` content-type is recommended. (...)
+     *
+     *  *  `response`: The record should contain a complete scheme-specific response, including
+     *     network protocol information where possible. For a target-URI of the `http` or
+     *     `https` schemes, a `response` record block should contain the full HTTP
+     *     response received over the network, including headers. That is, it contains the
+     *     'Response' message defined by section 6 of HTTP/1.1 (RFC2616).
+     *
+     *     The WARC record's Content-Type field should contain the value defined by HTTP/1.1,
+     *     `"application/http;msgtype=response"`. The payload of the record is defined as its
+     *     'entity-body' (per RFC2616), with any transfer-encoding removed.
+     *
+     *  *  `resource`: The record contains a resource, without full protocol response
+     *     information. For example: a file directly retrieved from a locally accessible
+     *     repository or the result of a networked retrieval where the protocol information
+     *     has been discarded. For a target-URI of the `http` or `https` schemes, a `resource`
+     *     record block shall contain the returned 'entity-body' (per RFC2616, with any
+     *     transfer-encodings removed), possibly truncated.
+     *
+     *  *  `request`: The record holds the details of a complete scheme-specific request,
+     *     including network protocol information where possible. For a target-URI of the
+     *     `http` or `https` schemes, a `request` record block should contain the full HTTP
+     *     request sent over the network, including headers. That is, it contains the
+     *     'Request' message defined by section 5 of HTTP/1.1 (RFC2616).
+     *
+     *     The WARC record's Content-Type field should contain the value defined by HTTP/1.1,
+     *     `"application/http;msgtype=request"`. The payload of a `request` record with a
+     *     target-URI of scheme `http` or `https` is defined as its 'entity-body' (per
+     *     RFC2616), with any transfer-encoding removed.
+     *
+     *  *  `metadata`: The record contains content created in order to further describe,
+     *     explain, or accompany a harvested resource, in ways not covered by other record
+     *     types. A `metadata` record will almost always refer to another record of another
+     *     type, with that other record holding original harvested or transformed content.
+     *
+     *     The format of the metadata record block may vary. The `"application/warc-fields"`
+     *     format may be used.
+     *
+     *  *  `revisit`: The record describes the revisitation of content already archived,
+     *     and might include only an abbreviated content body which has to be interpreted
+     *     relative to a previous record. Most typically, a `revisit` record is used
+     *     instead of a `response` or `resource` record to indicate that the content
+     *     visited was either a complete or substantial duplicate of material previously
+     *     archived.
+     *
+     *     A `revisit` record shall contain a WARC-Profile field which determines the
+     *     interpretation of the record's fields and record block. Please see the
+     *     specification for details.
+     *
+     *  *  `conversion`: The record shall contain an alternative version of another
+     *     record's content that was created as the result of an archival process.
+     *     Typically, this is used to hold content transformations that maintain viability
+     *     of content after widely available rendering tools for the originally stored
+     *     format disappear. As needed, the original content may be migrated (transformed)
+     *     to a more viable format in order to keep the information usable with current
+     *     tools while minimizing loss of information.
+     *
+     *  *  `continuation`: Record blocks from `continuation` records must be appended to
+     *     corresponding prior record blocks (eg. from other WARC files) to create the
+     *     logically complete full-sized original record. That is, `continuation`
+     *     records are used when a record that would otherwise cause a WARC file size to
+     *     exceed a desired limit is broken into segments. A continuation record shall
+     *     contain the named fields `WARC-Segment-Origin-ID` and `WARC-Segment-Number`,
+     *     and the last `continuation` record of a series shall contain a
+     *     `WARC-Segment-Total-Length` field. Please see the specification for details.
+     *
+     *  *  Other record types may be added in future, so this list is not exclusive.
+     *
+     * @return The record's `WARC-Type` header field, as a string.
+     */
+    public String getRecordType() {
+      return fields.get("WARC-Type");
+    }
+
+    /**
+     * A 14-digit UTC timestamp formatted according to YYYY-MM-DDThh:mm:ssZ, described
+     * in the W3C profile of ISO8601. The timestamp shall represent the instant that
+     * data capture for record creation began. Multiple records written as part of a
+     * single capture event shall use the same WARC-Date, even though the times of
+     * their writing will not be exactly synchronized.
+     *
+     * @return The record's `WARC-Date` header field, as a string.
+     */
+    public String getDateString() {
+      return fields.get("WARC-Date");
+    }
+
+    /**
+     * An identifier assigned to the current record that is globally unique for its
+     * period of intended use. No identifier scheme is mandated by this specification,
+     * but each record-id shall be a legal URI and clearly indicate a documented and
+     * registered scheme to which it conforms (e.g., via a URI scheme prefix such as
+     * `http:` or `urn:`).
+     *
+     * @return The record's `WARC-Record-ID` header field, as a string.
+     */
+    public String getRecordID() {
+      return fields.get("WARC-Record-ID");
+    }
+
+    /**
+     * The MIME type (RFC2045) of the information contained in the record's block. For
+     * example, in HTTP request and response records, this would be `application/http`
+     * as per section 19.1 of RFC2616 (or `application/http; msgtype=request` and
+     * `application/http; msgtype=response` respectively).
+     *
+     * In particular, the content-type is *not* the value of the HTTP Content-Type
+     * header in an HTTP response, but a MIME type to describe the full archived HTTP
+     * message (hence `application/http` if the block contains request or response
+     * headers).
+     *
+     * @return The record's `Content-Type` header field, as a string.
+     */
+    public String getContentType() {
+      return fields.get("Content-Type");
+    }
+
+    /**
+     * The original URI whose capture gave rise to the information content in this record.
+     * In the context of web harvesting, this is the URI that was the target of a
+     * crawler's retrieval request. For a `revisit` record, it is the URI that was the
+     * target of a retrieval request. Indirectly, such as for a `metadata`, or `conversion`
+     * record, it is a copy of the `WARC-Target-URI` appearing in the original record to
+     * which the newer record pertains. The URI in this value shall be properly escaped
+     * according to RFC3986, and written with no internal whitespace.
+     *
+     * @return The record's `WARC-Target-URI` header field, as a string.
+     */
+    public String getTargetURI() {
+      return fields.get("WARC-Target-URI");
+    }
+
+    /**
+     * The number of bytes in the body of the record, similar to RFC2616.
+     *
+     * @return The record's `Content-Length` header field, parsed into an int.
+     */
+    public int getContentLength() {
+      String lengthStr = fields.get("Content-Length");
+      if (lengthStr == null) {
+        throw new IllegalStateException("Missing Content-Length header");
+      }
+      try {
+        return Integer.parseInt(lengthStr);
+      } catch (NumberFormatException e) {
+        throw new IllegalStateException("Malformed Content-Length header: " + lengthStr);
+      }
+    }
+
+    /**
+     * Returns the value of a selected header field, or null if there is no header with
+     * that field name.
+     * @param field The name of the header to return (case-sensitive).
+     * @return The value associated with that field name, or null if not present.
+     */
+    public String getField(String field) {
+      return fields.get(field);
+    }
+
+    /**
+     * Appends this header to a {@link DataOutput} stream, in WARC/1.0 format.
+     * @param out The data output to which the header should be written.
+     */
+    public void write(DataOutput out) throws IOException {
+      out.write(toString().getBytes("UTF-8"));
+    }
+
+    /**
+     * Formats this header in WARC/1.0 format, consisting of a version line followed
+     * by colon-delimited key-value pairs, and `\r\n` line endings.
+     */
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      buf.append(WARC_VERSION);
+      buf.append(CRLF);
+      for (Map.Entry<String, String> field : fields.entrySet()) {
+        buf.append(field.getKey());
+        buf.append(": ");
+        buf.append(field.getValue());
+        buf.append(CRLF);
+      }
+      return buf.toString();
+    }
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCWritable.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCWritable.java
new file mode 100644
index 0000000..bf5fc6b
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCWritable.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A mutable wrapper around a {@link WARCRecord} implementing the Hadoop Writable interface.
+ * This allows WARC records to be used throughout Hadoop (e.g. written to sequence files
+ * when shuffling data between mappers and reducers). The record is encoded as a single
+ * record in standard WARC/1.0 format.
+ */
+public class WARCWritable implements Writable {
+
+  private WARCRecord record;
+
+  /** Creates an empty writable (with a null record). */
+  public WARCWritable() {
+    this.record = null;
+  }
+
+  /** Creates a writable wrapper around a given WARCRecord. */
+  public WARCWritable(WARCRecord record) {
+    this.record = record;
+  }
+
+  /** Returns the record currently wrapped by this writable. */
+  public WARCRecord getRecord() {
+    return record;
+  }
+
+  /** Updates the record held within this writable wrapper. */
+  public void setRecord(WARCRecord record) {
+    this.record = record;
+  }
+
+  /** Appends the current record to a {@link DataOutput} stream. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (record != null) {
+      record.write(out);
+    }
+  }
+
+  /**
+   * Parses a {@link WARCRecord} out of a {@link DataInput} stream, and makes it the
+   * writable's current record.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    record = new WARCRecord(in);
+  }
+
+}
diff --git a/hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz b/hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz
new file mode 100644
index 0000000..100dd3e
--- /dev/null
+++ b/hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz
Binary files differ
diff --git a/pom.xml b/pom.xml
index 96a7b9c..7827406 100755
--- a/pom.xml
+++ b/pom.xml
@@ -956,6 +956,8 @@
               <exclude>**/shaded/com/google/protobuf/**</exclude>
               <exclude>**/src/main/patches/**</exclude>
               <exclude>**/vote.tmpl</exclude>
+              <!-- gzipped list of S3N URIs for hbase-it -->
+              <exclude>**/CC-MAIN-2021-10-warc.paths.gz</exclude>
             </excludes>
           </configuration>
         </plugin>