Checkpoint demo class
diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/CheckpointDemo.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/CheckpointDemo.java
new file mode 100644
index 0000000..6c9b399
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/CheckpointDemo.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright © 2015 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.tephra.hbase98;
+
+import co.cask.tephra.Transaction;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.distributed.TransactionServiceClient;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+import co.cask.tephra.runtime.ConfigModule;
+import co.cask.tephra.runtime.DiscoveryModules;
+import co.cask.tephra.runtime.TransactionClientModule;
+import co.cask.tephra.runtime.TransactionModules;
+import co.cask.tephra.runtime.ZKModule;
+import co.cask.tephra.util.ConfigurationFactory;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class CheckpointDemo {
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointDemo.class);
+
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] COL = Bytes.toBytes("c");
+  private static final byte[] VAL = Bytes.toBytes("1");
+  private static final byte[][] SPLIT_KEYS =
+      { Bytes.toBytes(1), Bytes.toBytes(2), Bytes.toBytes(3), Bytes.toBytes(4) };
+
+  public static void main(String[] args) {
+    if (args.length < 1 || args.length > 2) {
+      System.err.println("Usage: java " + CheckpointDemo.class.getName() + " <tablename> [-c]");
+      System.exit(1);
+    }
+    String tableString = args[0];
+    boolean checkpoint = false;
+    if (args.length > 1) {
+      checkpoint = "-c".equals(args[1]);
+    }
+    Configuration conf = new ConfigurationFactory().get();
+    Injector injector = Guice.createInjector(
+        new ConfigModule(conf),
+        new ZKModule(),
+        new DiscoveryModules().getDistributedModules(),
+        new TransactionModules().getDistributedModules(),
+        new TransactionClientModule()
+    );
+    LOG.info("Running tx checkpoint demo.");
+
+    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+
+    HTable table = null;
+    try {
+      TableName tableName = TableName.valueOf(tableString);
+      TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class);
+
+      createTable(conf, tableName);
+
+      table = new HTable(conf, tableName);
+      TransactionAwareHTable txTable = new TransactionAwareHTable(table);
+
+      TransactionContext txContext = new TransactionContext(client, txTable);
+
+      txContext.start();
+      LOG.info("Tx write pointer={}", txContext.getCurrentTransaction().getWritePointer());
+      LOG.info("Loading 5 initial records");
+      for (int i = 0; i < 5; i++) {
+        txTable.put(new Put(Bytes.toBytes(i)).add(FAMILY, COL, VAL));
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      if (checkpoint) {
+        LOG.info("Performing checkpoint");
+        txContext.checkpoint();
+        Transaction tx = txContext.getCurrentTransaction();
+        tx.setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+        LOG.info("Tx write pointer={}", txContext.getCurrentTransaction().getWritePointer());
+      }
+
+      LOG.info("Starting scan + insert over same table");
+      List<Scan> scans = new ArrayList<Scan>();
+      List<HRegionLocation> allRegions = table.getRegionsInRange(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      for (HRegionLocation loc : allRegions) {
+        scans.add(new Scan(loc.getRegionInfo().getStartKey(), loc.getRegionInfo().getEndKey()));
+      }
+      for (Scan scan : scans) {
+        LOG.info("Scanning from " + Bytes.toStringBinary(scan.getStartRow())
+            + " to " + Bytes.toStringBinary(scan.getStopRow()));
+        ResultScanner scanner = txTable.getScanner(scan);
+        for (Result row : scanner) {
+          int readRow = Bytes.toInt(row.getRow());
+          long readTs = row.getColumnLatestCell(FAMILY, COL).getTimestamp();
+          int rowToWrite = readRow * 10;
+          LOG.info("Inserting row={} for read row: key={}, ts={}", rowToWrite, readRow, readTs);
+          txTable.put(new Put(Bytes.toBytes(rowToWrite)).add(FAMILY, COL, VAL));
+        }
+      }
+      txContext.finish();
+
+    } catch (Exception e) {
+      LOG.error("Failed on: " + e.getMessage(), e);
+    } finally {
+      if (table != null) {
+        try { table.close(); } catch (IOException ignored) { }
+      }
+      zkClient.stopAndWait();
+    }
+  }
+
+  private static void createTable(Configuration conf, TableName tableName) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      if (!admin.tableExists(tableName)) {
+        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+        tableDesc.addCoprocessor(TransactionProcessor.class.getName());
+        HColumnDescriptor columnDesc = new HColumnDescriptor(FAMILY);
+        columnDesc.setMaxVersions(Integer.MAX_VALUE);
+        tableDesc.addFamily(columnDesc);
+        admin.createTable(tableDesc, SPLIT_KEYS);
+      }
+    } finally {
+      admin.close();
+    }
+  }
+}