blob: 80a740fb11d65c250c600dd783a04c05ff033d31 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.recipes.test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper for creating integration tests that connect to a MiniFluo/MiniAccumuloCluster instance.
*
* @since 1.0.0
*/
public class FluoITHelper {
private static final Logger log = LoggerFactory.getLogger(FluoITHelper.class);
/**
* Prints list of RowColumnValue objects
*
* @param rcvList RowColumnValue list
*/
public static void printRowColumnValues(Collection<RowColumnValue> rcvList) {
System.out.println("== RDD start ==");
rcvList.forEach(rcv -> System.out.println("rc " + Hex.encNonAscii(rcv, " ")));
System.out.println("== RDD end ==");
}
public static void printFluoTable(FluoConfiguration conf) {
try (FluoClient client = FluoFactory.newClient(conf)) {
printFluoTable(client);
}
}
/**
* Prints Fluo table accessible using provided client
*
* @param client Fluo client to table
*/
public static void printFluoTable(FluoClient client) {
try (Snapshot s = client.newSnapshot()) {
System.out.println("== fluo start ==");
for (RowColumnValue rcv : s.scanner().build()) {
StringBuilder sb = new StringBuilder();
Hex.encNonAscii(sb, rcv.getRow());
sb.append(" ");
Hex.encNonAscii(sb, rcv.getColumn(), " ");
sb.append("\t");
Hex.encNonAscii(sb, rcv.getValue());
System.out.println(sb.toString());
}
System.out.println("=== fluo end ===");
}
}
// @formatter:off
public static boolean verifyFluoTable(FluoConfiguration conf,
Collection<RowColumnValue> expected) {
// @formatter:on
try (FluoClient client = FluoFactory.newClient(conf)) {
return verifyFluoTable(client, expected);
}
}
/**
* Verifies that the actual data in provided Fluo instance matches expected data
*
* @param client Fluo client to instance with actual data
* @param expected RowColumnValue list containing expected data
* @return True if actual data matches expected data
*/
public static boolean verifyFluoTable(FluoClient client, Collection<RowColumnValue> expected) {
expected = sort(expected);
try (Snapshot s = client.newSnapshot()) {
Iterator<RowColumnValue> fluoIter = s.scanner().build().iterator();
Iterator<RowColumnValue> rcvIter = expected.iterator();
while (fluoIter.hasNext() && rcvIter.hasNext()) {
RowColumnValue actualRcv = fluoIter.next();
RowColumnValue rcv = rcvIter.next();
boolean retval = diff("fluo row", rcv.getRow(), actualRcv.getRow());
retval |= diff("fluo fam", rcv.getColumn().getFamily(), actualRcv.getColumn().getFamily());
retval |=
diff("fluo qual", rcv.getColumn().getQualifier(), actualRcv.getColumn().getQualifier());
retval |= diff("fluo val", rcv.getValue(), actualRcv.getValue());
if (retval) {
log.error("Difference found - row {} cf {} cq {} val {}", rcv.getsRow(),
rcv.getColumn().getsFamily(), rcv.getColumn().getsQualifier(), rcv.getsValue());
return false;
}
log.debug("Verified {}", Hex.encNonAscii(rcv, " "));
}
if (fluoIter.hasNext() || rcvIter.hasNext()) {
log.error("An iterator still has more data");
return false;
}
log.debug("Actual data matched expected data");
return true;
}
}
/**
* Prints specified Accumulo table (accessible using Accumulo connector parameter)
*
* @param conn Accumulo connector of to instance with table to print
* @param accumuloTable Accumulo table to print
* @deprecated since 1.3.0 use {@link #printAccumuloTable(AccumuloClient, String)}
*/
@Deprecated(since = "1.3.0", forRemoval = true)
public static void printAccumuloTable(Connector conn, String accumuloTable) {
Scanner scanner = null;
try {
scanner = conn.createScanner(accumuloTable, Authorizations.EMPTY);
} catch (TableNotFoundException e) {
throw new IllegalStateException(e);
}
Iterator<Map.Entry<Key, Value>> iterator = scanner.iterator();
System.out.println("== accumulo start ==");
while (iterator.hasNext()) {
Map.Entry<Key, Value> entry = iterator.next();
System.out.println(entry.getKey() + " " + entry.getValue());
}
System.out.println("== accumulo end ==");
}
/**
* Prints specified Accumulo table
*
* @param client Accumulo clientto instance with table to print
* @param accumuloTable Accumulo table to print
*
* @since 1.3.0
*/
@SuppressWarnings("deprecation")
public static void printAccumuloTable(AccumuloClient client, String accumuloTable) {
try {
printAccumuloTable(Connector.from(client), accumuloTable);
} catch (AccumuloSecurityException | AccumuloException e) {
throw new RuntimeException(e);
}
}
private static boolean diff(String dataType, String expected, String actual) {
if (!expected.equals(actual)) {
log.error("Difference found in {} - expected {} actual {}", dataType, expected, actual);
return true;
}
return false;
}
private static boolean diff(String dataType, Bytes expected, Bytes actual) {
if (!expected.equals(actual)) {
log.error("Difference found in {} - expected {} actual {}", dataType,
Hex.encNonAscii(expected), Hex.encNonAscii(actual));
return true;
}
return false;
}
/**
* Verifies that actual data in Accumulo table matches expected data
*
* @param conn Connector to Accumulo instance with actual data
* @param accumuloTable Accumulo table with actual data
* @param expected RowColumnValue list containing expected data
* @return True if actual data matches expected data
* @deprecated since 1.3.0 use {@link #verifyAccumuloTable(AccumuloClient, String, Collection)}
*/
@Deprecated(since = "1.3.0", forRemoval = true)
public static boolean verifyAccumuloTable(Connector conn, String accumuloTable,
Collection<RowColumnValue> expected) {
expected = sort(expected);
Scanner scanner;
try {
scanner = conn.createScanner(accumuloTable, Authorizations.EMPTY);
} catch (TableNotFoundException e) {
throw new IllegalStateException(e);
}
Iterator<Map.Entry<Key, Value>> scanIter = scanner.iterator();
Iterator<RowColumnValue> rcvIter = expected.iterator();
while (scanIter.hasNext() && rcvIter.hasNext()) {
RowColumnValue rcv = rcvIter.next();
Map.Entry<Key, Value> kvEntry = scanIter.next();
Key key = kvEntry.getKey();
Column col = rcv.getColumn();
boolean retval = diff("row", rcv.getRow().toString(), key.getRow().toString());
retval |= diff("fam", col.getFamily().toString(), key.getColumnFamily().toString());
retval |= diff("qual", col.getQualifier().toString(), key.getColumnQualifier().toString());
retval |= diff("val", rcv.getValue().toString(), kvEntry.getValue().toString());
if (retval) {
log.error("Difference found - row {} cf {} cq {} val {}", rcv.getRow().toString(),
col.getFamily().toString(), col.getQualifier().toString(), rcv.getValue().toString());
return false;
}
log.debug("Verified row {} cf {} cq {} val {}", rcv.getRow().toString(),
col.getFamily().toString(), col.getQualifier().toString(), rcv.getValue().toString());
}
if (scanIter.hasNext() || rcvIter.hasNext()) {
log.error("An iterator still has more data");
return false;
}
log.debug("Actual data matched expected data");
return true;
}
/**
* Verifies that actual data in Accumulo table matches expected data
*
* @param client Client from Accumulo instance with actual data
* @param accumuloTable Accumulo table with actual data
* @param expected RowColumnValue list containing expected data
* @return True if actual data matches expected data
*
* @since 1.3.0
*/
@SuppressWarnings("deprecation")
public static boolean verifyAccumuloTable(AccumuloClient client, String accumuloTable,
Collection<RowColumnValue> expected) {
try {
return verifyAccumuloTable(Connector.from(client), accumuloTable, expected);
} catch (AccumuloSecurityException | AccumuloException e) {
throw new RuntimeException(e);
}
}
/**
* Verifies that expected list of RowColumnValues matches actual
*
* @param expected RowColumnValue list containing expected data
* @param actual RowColumnValue list containing actual data
* @return True if actual data matches expected data
*/
public static boolean verifyRowColumnValues(Collection<RowColumnValue> expected,
Collection<RowColumnValue> actual) {
expected = sort(expected);
actual = sort(actual);
Iterator<RowColumnValue> expectIter = expected.iterator();
Iterator<RowColumnValue> actualIter = actual.iterator();
while (expectIter.hasNext() && actualIter.hasNext()) {
RowColumnValue expRcv = expectIter.next();
RowColumnValue actRcv = actualIter.next();
boolean retval = diff("rcv row", expRcv.getRow(), actRcv.getRow());
retval |= diff("rcv fam", expRcv.getColumn().getFamily(), actRcv.getColumn().getFamily());
retval |=
diff("rcv qual", expRcv.getColumn().getQualifier(), actRcv.getColumn().getQualifier());
retval |= diff("rcv val", expRcv.getValue(), actRcv.getValue());
if (retval) {
log.error("Difference found in RowColumnValue lists - expected {} actual {}", expRcv,
actRcv);
return false;
}
log.debug("Verified row/col/val: {}", expRcv);
}
if (expectIter.hasNext() || actualIter.hasNext()) {
log.error("A RowColumnValue list iterator still has more data");
return false;
}
log.debug("Actual data matched expected data");
return true;
}
private static List<RowColumnValue> sort(Collection<RowColumnValue> input) {
ArrayList<RowColumnValue> copy = new ArrayList<>(input);
Collections.sort(copy);
return copy;
}
/**
* A helper method for parsing test data. Each string passed in is expected to have the following
* format {@literal <row>|<family>|<qualifier>|<value>}
*/
public static List<RowColumnValue> parse(String... data) {
return parse(Splitter.on('|'), data);
}
/**
* A helper method for parsing test data. Each string passed in is split using the specified
* splitter into four fields for row, family, qualifier, and value.
*/
public static List<RowColumnValue> parse(Splitter splitter, String... data) {
ArrayList<RowColumnValue> ret = new ArrayList<>();
for (String line : data) {
Iterable<String> cols = splitter.split(line);
if (Iterables.size(cols) != 4) {
throw new IllegalArgumentException("Bad input " + line);
}
Iterator<String> iter = cols.iterator();
RowColumnValue rcv = new RowColumnValue(Bytes.of(iter.next()),
new Column(iter.next(), iter.next()), Bytes.of(iter.next()));
ret.add(rcv);
}
return ret;
}
}