blob: 33f567e3f2db3df527658a543d21f5f79d3ec4ed [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.contrib.accumulo;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.common.util.DTThrowable;
/**
 * Static helpers used by the Accumulo tests: obtaining a connector, creating,
 * clearing, populating and deleting the test table, and looking up tuples.
 *
 * <p>All table operations go through the shared {@link #con} connector, which
 * must be initialised with {@link #getConnector()} before any other helper is
 * called.
 */
public class AccumuloTestHelper {
  /** Name of the single table every helper in this class operates on. */
  private static final String TABLE_NAME = "tab1";

  /**
   * Charset for every String-to-bytes conversion in this class. Reads go
   * through {@link Text}, which encodes as UTF-8, so writes use the same
   * encoding instead of the platform default.
   */
  private static final Charset UTF_8 = Charset.forName("UTF-8");

  /** Shared connector, assigned by {@link #getConnector()}. */
  static Connector con;
  public static final byte[] colfam0_bytes = "colfam0".getBytes(UTF_8);
  public static final byte[] col0_bytes = "col-0".getBytes(UTF_8);
  private static final Logger logger = LoggerFactory.getLogger(AccumuloTestHelper.class);

  /**
   * Creates the test table if it does not exist yet. Any failure is logged and
   * rethrown through {@link DTThrowable#rethrow}.
   */
  public static void createTable() {
    TableOperations tableoper = con.tableOperations();
    if (!tableoper.exists(TABLE_NAME)) {
      try {
        tableoper.create(TABLE_NAME);
      } catch (Exception e) {
        fail(e);
      }
    }
  }

  /**
   * Ensures the test table exists and then deletes every row in it. Failures
   * are logged and rethrown through {@link DTThrowable#rethrow}.
   */
  public static void clearTable() {
    createTable();
    try {
      // null start and end row: the delete covers the whole table.
      con.tableOperations().deleteRows(TABLE_NAME, null, null);
    } catch (AccumuloException e) {
      fail(e);
    } catch (AccumuloSecurityException e) {
      fail(e);
    } catch (TableNotFoundException e) {
      fail(e);
    }
  }

  /**
   * Writes 500 rows ("row0".."row499") to the test table, each holding 500
   * columns ("col-0".."col-499") in family "colfam0" with values of the form
   * "val-&lt;row&gt;-&lt;col&gt;", timestamped with the current time.
   *
   * @throws IOException declared for callers; not thrown by the visible code
   */
  public static void populateAccumulo() throws IOException {
    BatchWriter batchwriter = null;
    try {
      batchwriter = con.createBatchWriter(TABLE_NAME, new BatchWriterConfig());
    } catch (TableNotFoundException e) {
      // fail() is expected to always throw, so batchwriter is non-null below.
      fail(e);
    }
    try {
      try {
        for (int i = 0; i < 500; ++i) {
          String rowstr = "row" + i;
          Mutation mutation = new Mutation(rowstr.getBytes(UTF_8));
          for (int j = 0; j < 500; ++j) {
            String colstr = "col" + "-" + j;
            String valstr = "val" + "-" + i + "-" + j;
            mutation.put(colfam0_bytes, colstr.getBytes(UTF_8),
                System.currentTimeMillis(),
                valstr.getBytes(UTF_8));
          }
          batchwriter.addMutation(mutation);
        }
      } finally {
        // Close even when addMutation fails so the writer is never leaked.
        batchwriter.close();
      }
    } catch (MutationsRejectedException e) {
      fail(e);
    }
  }

  /**
   * Returns the first tuple in {@code tuples} whose row, column family, column
   * qualifier, column visibility and timestamp all equal the given values.
   *
   * <p>All comparisons are by value and null-safe (two nulls are considered
   * equal). The timestamp was previously compared with {@code ==} against the
   * boxed {@code Long} parameter, which either compares references or throws
   * on a null timestamp, depending on the getter's return type.
   *
   * @param tuples        tuples to search; {@code null} yields {@code null}
   * @param row           expected row
   * @param colFamily     expected column family
   * @param colQualifier  expected column qualifier
   * @param timestamp     expected timestamp
   * @param colVisibility expected column visibility
   * @return the first matching tuple, or {@code null} when none matches
   */
  public static AccumuloTuple findTuple(List<AccumuloTuple> tuples,
      String row, String colFamily, String colQualifier, Long timestamp, String colVisibility) {
    if (tuples == null) {
      return null;
    }
    for (AccumuloTuple tuple : tuples) {
      if (nullSafeEquals(tuple.getRow(), row)
          && nullSafeEquals(tuple.getColumnFamily(), colFamily)
          && nullSafeEquals(tuple.getColumnQualifier(), colQualifier)
          && nullSafeEquals(tuple.getColumnVisibility(), colVisibility)
          && nullSafeEquals(tuple.getTimestamp(), timestamp)) {
        return tuple;
      }
    }
    return null;
  }

  /**
   * Deletes the test table if it exists. Failures are logged and rethrown
   * through {@link DTThrowable#rethrow}.
   */
  public static void deleteTable() {
    TableOperations tableoper = con.tableOperations();
    if (tableoper.exists(TABLE_NAME)) {
      try {
        tableoper.delete(TABLE_NAME);
      } catch (AccumuloException e) {
        fail(e);
      } catch (AccumuloSecurityException e) {
        fail(e);
      } catch (TableNotFoundException e) {
        fail(e);
      }
    }
  }

  /**
   * Connects to the ZooKeeper-backed instance named "instance" at 127.0.0.1 as
   * user "root" with an empty password and stores the connector in
   * {@link #con}. Failures are logged and rethrown through
   * {@link DTThrowable#rethrow}.
   */
  public static void getConnector() {
    // Alternative kept from the original: Instance instance = new MockInstance();
    Instance instance = new ZooKeeperInstance("instance", "127.0.0.1");
    try {
      logger.debug("connecting..");
      con = instance.getConnector("root", "".getBytes(UTF_8));
      logger.debug("connection done..");
    } catch (AccumuloException e) {
      fail(e);
    } catch (AccumuloSecurityException e) {
      fail(e);
    }
  }

  /**
   * Scans the test table for the given row and column and returns the first
   * entry found as a tuple. Only row, column family, column qualifier and
   * value are populated on the returned tuple.
   *
   * @param row     row to look up
   * @param colFam  column family to fetch
   * @param colName column qualifier to fetch
   * @return the first matching entry, or {@code null} when the scan is empty
   */
  public static AccumuloTuple getAccumuloTuple(String row, String colFam,
      String colName) {
    Scanner scan = null;
    try {
      scan = con.createScanner(TABLE_NAME, new Authorizations());
    } catch (TableNotFoundException e) {
      // fail() is expected to always throw, so scan is non-null below.
      fail(e);
    }
    try {
      scan.setRange(new Range(new Text(row)));
      scan.fetchColumn(new Text(colFam), new Text(colName));
      // assuming only one row
      for (Entry<Key, Value> entry : scan) {
        Key key = entry.getKey();
        AccumuloTuple tuple = new AccumuloTuple();
        tuple.setRow(key.getRow().toString());
        tuple.setColumnFamily(key.getColumnFamily().toString());
        tuple.setColumnQualifier(key.getColumnQualifier().toString());
        tuple.setColumnValue(entry.getValue().toString());
        return tuple;
      }
      return null;
    } finally {
      // Release the scanner's resources on every exit path.
      scan.close();
    }
  }

  /** Logs the failure together with its cause and rethrows it via DTThrowable. */
  private static void fail(Exception e) {
    logger.error("error in test helper", e);
    DTThrowable.rethrow(e);
  }

  /** Value equality that treats two nulls as equal and never throws on null. */
  private static boolean nullSafeEquals(Object left, Object right) {
    return left == null ? right == null : left.equals(right);
  }
}