blob: 6c9b399b714e2926157bd3e64ccbdc358e6b2c39 [file] [log] [blame]
/*
* Copyright © 2015 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package co.cask.tephra.hbase98;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.distributed.TransactionServiceClient;
import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
import co.cask.tephra.runtime.ConfigModule;
import co.cask.tephra.runtime.DiscoveryModules;
import co.cask.tephra.runtime.TransactionClientModule;
import co.cask.tephra.runtime.TransactionModules;
import co.cask.tephra.runtime.ZKModule;
import co.cask.tephra.util.ConfigurationFactory;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class CheckpointDemo {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointDemo.class);
private static final byte[] FAMILY = Bytes.toBytes("f");
private static final byte[] COL = Bytes.toBytes("c");
private static final byte[] VAL = Bytes.toBytes("1");
private static final byte[][] SPLIT_KEYS =
{ Bytes.toBytes(1), Bytes.toBytes(2), Bytes.toBytes(3), Bytes.toBytes(4) };
public static void main(String[] args) {
if (args.length < 1 || args.length > 2) {
System.err.println("Usage: java " + CheckpointDemo.class.getName() + " <tablename> [-c]");
System.exit(1);
}
String tableString = args[0];
boolean checkpoint = false;
if (args.length > 1) {
checkpoint = "-c".equals(args[1]);
}
Configuration conf = new ConfigurationFactory().get();
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
new TransactionModules().getDistributedModules(),
new TransactionClientModule()
);
LOG.info("Running tx checkpoint demo.");
ZKClientService zkClient = injector.getInstance(ZKClientService.class);
zkClient.startAndWait();
HTable table = null;
try {
TableName tableName = TableName.valueOf(tableString);
TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class);
createTable(conf, tableName);
table = new HTable(conf, tableName);
TransactionAwareHTable txTable = new TransactionAwareHTable(table);
TransactionContext txContext = new TransactionContext(client, txTable);
txContext.start();
LOG.info("Tx write pointer={}", txContext.getCurrentTransaction().getWritePointer());
LOG.info("Loading 5 initial records");
for (int i = 0; i < 5; i++) {
txTable.put(new Put(Bytes.toBytes(i)).add(FAMILY, COL, VAL));
TimeUnit.SECONDS.sleep(1);
}
if (checkpoint) {
LOG.info("Performing checkpoint");
txContext.checkpoint();
Transaction tx = txContext.getCurrentTransaction();
tx.setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
LOG.info("Tx write pointer={}", txContext.getCurrentTransaction().getWritePointer());
}
LOG.info("Starting scan + insert over same table");
List<Scan> scans = new ArrayList<Scan>();
List<HRegionLocation> allRegions = table.getRegionsInRange(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
for (HRegionLocation loc : allRegions) {
scans.add(new Scan(loc.getRegionInfo().getStartKey(), loc.getRegionInfo().getEndKey()));
}
for (Scan scan : scans) {
LOG.info("Scanning from " + Bytes.toStringBinary(scan.getStartRow())
+ " to " + Bytes.toStringBinary(scan.getStopRow()));
ResultScanner scanner = txTable.getScanner(scan);
for (Result row : scanner) {
int readRow = Bytes.toInt(row.getRow());
long readTs = row.getColumnLatestCell(FAMILY, COL).getTimestamp();
int rowToWrite = readRow * 10;
LOG.info("Inserting row={} for read row: key={}, ts={}", rowToWrite, readRow, readTs);
txTable.put(new Put(Bytes.toBytes(rowToWrite)).add(FAMILY, COL, VAL));
}
}
txContext.finish();
} catch (Exception e) {
LOG.error("Failed on: " + e.getMessage(), e);
} finally {
if (table != null) {
try { table.close(); } catch (IOException ignored) { }
}
zkClient.stopAndWait();
}
}
private static void createTable(Configuration conf, TableName tableName) throws IOException {
HBaseAdmin admin = new HBaseAdmin(conf);
try {
if (!admin.tableExists(tableName)) {
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
tableDesc.addCoprocessor(TransactionProcessor.class.getName());
HColumnDescriptor columnDesc = new HColumnDescriptor(FAMILY);
columnDesc.setMaxVersions(Integer.MAX_VALUE);
tableDesc.addFamily(columnDesc);
admin.createTable(tableDesc, SPLIT_KEYS);
}
} finally {
admin.close();
}
}
}