Checkpoint demo class
diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/CheckpointDemo.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/CheckpointDemo.java
new file mode 100644
index 0000000..6c9b399
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/CheckpointDemo.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright © 2015 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.tephra.hbase98;
+
+import co.cask.tephra.Transaction;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.distributed.TransactionServiceClient;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+import co.cask.tephra.runtime.ConfigModule;
+import co.cask.tephra.runtime.DiscoveryModules;
+import co.cask.tephra.runtime.TransactionClientModule;
+import co.cask.tephra.runtime.TransactionModules;
+import co.cask.tephra.runtime.ZKModule;
+import co.cask.tephra.util.ConfigurationFactory;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Command-line demo of transaction checkpoints: loads five rows in one transaction, then scans the table
+ * and inserts a derived row for each row read; with {@code -c} a checkpoint is taken before the scan.
+ */
+public class CheckpointDemo {
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointDemo.class);
+
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] COL = Bytes.toBytes("c");
+  private static final byte[] VAL = Bytes.toBytes("1");
+  private static final byte[][] SPLIT_KEYS =
+    { Bytes.toBytes(1), Bytes.toBytes(2), Bytes.toBytes(3), Bytes.toBytes(4) };
+
+  /** Runs the demo. Arguments: {@code <tablename> [-c]}; the table is created if it does not exist yet. */
+  public static void main(String[] args) {
+    if (args.length < 1 || args.length > 2) {
+      System.err.println("Usage: java " + CheckpointDemo.class.getName() + " <tablename> [-c]");
+      System.exit(1);
+    }
+    boolean checkpoint = args.length > 1 && "-c".equals(args[1]);
+    Configuration conf = new ConfigurationFactory().get();
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf), new ZKModule(), new DiscoveryModules().getDistributedModules(),
+      new TransactionModules().getDistributedModules(), new TransactionClientModule());
+    LOG.info("Running tx checkpoint demo.");
+    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+
+    HTable table = null;
+    TransactionContext txContext = null;
+    try {
+      TableName tableName = TableName.valueOf(args[0]);
+      TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class);
+      createTable(conf, tableName);
+      table = new HTable(conf, tableName);
+      TransactionAwareHTable txTable = new TransactionAwareHTable(table);
+      txContext = new TransactionContext(client, txTable);
+      txContext.start();
+      LOG.info("Tx write pointer={}", txContext.getCurrentTransaction().getWritePointer());
+      LOG.info("Loading 5 initial records");
+      for (int i = 0; i < 5; i++) {
+        txTable.put(new Put(Bytes.toBytes(i)).add(FAMILY, COL, VAL));
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      if (checkpoint) {
+        LOG.info("Performing checkpoint");
+        txContext.checkpoint();
+        txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+        LOG.info("Tx write pointer={}", txContext.getCurrentTransaction().getWritePointer());
+      }
+
+      LOG.info("Starting scan + insert over same table");
+      List<Scan> scans = new ArrayList<Scan>();
+      for (HRegionLocation loc : table.getRegionsInRange(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW)) {
+        scans.add(new Scan(loc.getRegionInfo().getStartKey(), loc.getRegionInfo().getEndKey()));
+      }
+      for (Scan scan : scans) {
+        LOG.info("Scanning from " + Bytes.toStringBinary(scan.getStartRow())
+                   + " to " + Bytes.toStringBinary(scan.getStopRow()));
+        ResultScanner scanner = txTable.getScanner(scan);
+        try {
+          for (Result row : scanner) {
+            int readRow = Bytes.toInt(row.getRow());
+            long readTs = row.getColumnLatestCell(FAMILY, COL).getTimestamp();
+            int rowToWrite = readRow * 10;
+            LOG.info("Inserting row={} for read row: key={}, ts={}", rowToWrite, readRow, readTs);
+            txTable.put(new Put(Bytes.toBytes(rowToWrite)).add(FAMILY, COL, VAL));
+          }
+        } finally {
+          scanner.close(); // release the scanner even if a read or put above fails
+        }
+      }
+      txContext.finish();
+    } catch (Exception e) {
+      LOG.error("Failed on: " + e.getMessage(), e);
+      // Abort the in-flight transaction (if any) rather than leaving it open after a failure.
+      try {
+        if (txContext != null) {
+          txContext.abort();
+        }
+      } catch (TransactionFailureException abortFailure) {
+        LOG.warn("Failed to abort transaction", abortFailure);
+      }
+    } finally {
+      if (table != null) {
+        try { table.close(); } catch (IOException ignored) { }
+      }
+      zkClient.stopAndWait();
+    }
+  }
+
+  /** Creates the pre-split demo table with the transaction coprocessor attached, unless it already exists. */
+  private static void createTable(Configuration conf, TableName tableName) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      if (!admin.tableExists(tableName)) {
+        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+        tableDesc.addCoprocessor(TransactionProcessor.class.getName());
+        HColumnDescriptor columnDesc = new HColumnDescriptor(FAMILY);
+        columnDesc.setMaxVersions(Integer.MAX_VALUE);
+        tableDesc.addFamily(columnDesc);
+        admin.createTable(tableDesc, SPLIT_KEYS);
+      }
+    } finally {
+      admin.close();
+    }
+  }
+}