| /* |
| * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.datatorrent.contrib.hbase; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.LocalHBaseCluster; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.HBaseAdmin; |
| import org.apache.hadoop.hbase.client.HTable; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.zookeeper.server.NIOServerCnxnFactory; |
| import org.apache.zookeeper.server.ZooKeeperServer; |
| |
| //import org.apache.hadoop.hbase.EmptyWatcher; |
| |
| /** |
| * |
| */ |
| public class HBaseTestHelper |
| { |
| |
| public static final byte[] table_bytes = Bytes.toBytes("table1"); |
| public static final byte[] colfam0_bytes = Bytes.toBytes("colfam0"); |
| public static final byte[] col0_bytes = Bytes.toBytes("col-0"); |
| |
| public static void startLocalCluster() throws IOException, InterruptedException { |
| startZooKeeperServer(); |
| //Configuration conf = HBaseConfiguration.create(); |
| Configuration conf = getConfiguration(); |
| LocalHBaseCluster lc = new LocalHBaseCluster(conf); |
| lc.startup(); |
| } |
| |
| private static void startZooKeeperServer() throws IOException, InterruptedException { |
| String zooLocation = System.getProperty("java.io.tmpdir"); |
| File zooFile = new File(zooLocation, "zookeeper-malhartest"); |
| ZooKeeperServer zooKeeper = new ZooKeeperServer(zooFile, zooFile, 2000); |
| |
| NIOServerCnxnFactory serverFactory = new NIOServerCnxnFactory(); |
| serverFactory.configure(new InetSocketAddress(2181),10); |
| serverFactory.startup(zooKeeper); |
| } |
| |
| public static void populateHBase() throws IOException { |
| Configuration conf = getConfiguration(); |
| clearHBase(conf); |
| HTable table1 = new HTable(conf, "table1"); |
| for (int i=0; i < 500; ++i) { |
| Put put = new Put(Bytes.toBytes("row" + i)); |
| for (int j=0; j < 500; ++j) { |
| put.add(colfam0_bytes, Bytes.toBytes("col" + "-" + j ), Bytes.toBytes("val" + "-" + i + "-" + j)); |
| } |
| table1.put(put); |
| } |
| } |
| |
| public static void clearHBase() throws IOException { |
| Configuration conf = getConfiguration(); |
| clearHBase(conf); |
| } |
| |
| private static void clearHBase(Configuration conf) throws IOException { |
| HBaseAdmin admin = new HBaseAdmin(conf); |
| if (admin.tableExists(table_bytes)) { |
| try { |
| admin.disableTable(table_bytes); |
| } catch (Exception ex) { |
| } |
| try { |
| admin.deleteTable(table_bytes); |
| } catch (Exception ex) { |
| } |
| } |
| HTableDescriptor tdesc = new HTableDescriptor(table_bytes); |
| HColumnDescriptor cdesc = new HColumnDescriptor(colfam0_bytes); |
| tdesc.addFamily(cdesc); |
| admin.createTable(tdesc); |
| } |
| |
| public static HBaseTuple getHBaseTuple(String row, String colFamily, String colName) throws IOException { |
| Configuration conf = getConfiguration(); |
| HTable table1 = new HTable(conf, "table1"); |
| Get get = new Get(Bytes.toBytes(row)); |
| get.addFamily(Bytes.toBytes(colFamily)); |
| get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName)); |
| Result result = table1.get(get); |
| return getHBaseTuple(result); |
| } |
| |
| public static HBaseTuple getHBaseTuple(Result result) { |
| HBaseTuple tuple = null; |
| KeyValue[] kvs = result.raw(); |
| for (KeyValue kv : kvs) { |
| tuple = getHBaseTuple(kv); |
| break; |
| } |
| return tuple; |
| } |
| |
| public static HBaseTuple getHBaseTuple(KeyValue kv) { |
| HBaseTuple tuple = new HBaseTuple(); |
| tuple.setRow(new String(kv.getRow())); |
| tuple.setColFamily(new String(kv.getFamily())); |
| tuple.setColName(new String(kv.getQualifier())); |
| tuple.setColValue(new String(kv.getValue())); |
| return tuple; |
| } |
| |
| public static HBaseTuple findTuple(List<HBaseTuple> tuples, String row, String colFamily, String colName) { |
| HBaseTuple mtuple = null; |
| for (HBaseTuple tuple : tuples) { |
| if (tuple.getRow().equals(row) && tuple.getColFamily().equals(colFamily) |
| && tuple.getColName().equals(colName)) { |
| mtuple = tuple; |
| break; |
| } |
| } |
| return mtuple; |
| } |
| |
| private static Configuration getConfiguration() { |
| Configuration conf = HBaseConfiguration.create(); |
| conf.set("hbase.zookeeper.quorum", "127.0.0.1"); |
| conf.set("hbase.zookeeper.property.clientPort", "2181"); |
| return conf; |
| } |
| |
| } |
| |