/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.filedata;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
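
/**
 * Integration test for {@link ChunkInputFormat}: runs local MapReduce jobs against the test
 * cluster and verifies that chunked file data is presented to mappers as a list of refs entries
 * plus an InputStream over the reassembled chunk contents.
 */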
public class ChunkInputFormatIT extends AccumuloClusterHarness {
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
}

// Tracks assertion errors from the MapReduce job. Each job seeds a dummy error for the map and
// cleanup tasks (to prove the tracking itself works), so tests should assert that at least one
// error exists (possibly more, depending on the test) rather than asserting zero.
private static Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
private static List<Entry<Key,Value>> data;
private static List<Entry<Key,Value>> baddata;
private AccumuloClient client;
private String tableName;
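
/** Creates a client and a unique table name, and grants this user the test scan authorizations. */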
@Before
public void setupInstance() throws Exception {
client = createAccumuloClient();
tableName = getUniqueNames(1)[0];
client.securityOperations().changeUserAuthorizations(client.whoami(), AUTHS);
}
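
/** Closes the client created for the test. */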
@After
public void teardown() {
client.close();
}
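
/**
 * Builds the shared test data: files "a" and "b" with refs entries and ~chunk entries under
 * various column visibilities, plus "bad" data for file "c" that has refs but no chunks.
 */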
@BeforeClass
public static void setupClass() {
System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
data = new ArrayList<>();
ChunkInputStreamIT.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
ChunkInputStreamIT.addData(data, "a", "refs", "ida\0name", "A&B", "name");
ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 1, "A&B", "");
ChunkInputStreamIT.addData(data, "b", "refs", "ida\0ext", "A&B", "ext");
ChunkInputStreamIT.addData(data, "b", "refs", "ida\0name", "A&B", "name");
ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "A&B", "");
ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "B&C", "");
ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "D", "");
baddata = new ArrayList<>();
ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
}
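
/** Asserts that two entries have equal keys and equal values. */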
public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
assertEquals(e1.getKey(), e2.getKey());
assertEquals(e1.getValue(), e2.getValue());
}
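
/** Hadoop {@link Tool} that runs a {@link ChunkInputFormat} job with a given mapper class. */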
public static class CIFTester extends Configured implements Tool {
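/**
 * Expects the two chunked files ("a" then "b") and verifies both the refs entries in each key
 * and the contents read from each stream.
 */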
public static class TestMapper
extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
int count = 0;
@Override
protected void map(List<Entry<Key,Value>> key, InputStream value, Context context)
throws IOException, InterruptedException {
String table = context.getConfiguration().get("MRTester_tableName");
assertNotNull(table);
byte[] b = new byte[20];
int read;
try {
switch (count) {
case 0:
assertEquals(2, key.size());
entryEquals(key.get(0), data.get(0));
entryEquals(key.get(1), data.get(1));
assertEquals(8, read = value.read(b));
assertEquals("asdfjkl;", new String(b, 0, read));
assertEquals(-1, read = value.read(b));
break;
case 1:
assertEquals(2, key.size());
entryEquals(key.get(0), data.get(4));
entryEquals(key.get(1), data.get(5));
assertEquals(10, read = value.read(b));
assertEquals("qwertyuiop", new String(b, 0, read));
assertEquals(-1, read = value.read(b));
break;
default:
fail();
}
} catch (AssertionError e) {
assertionErrors.put(table, e);
} finally {
value.close();
}
count++;
}
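
/** Verifies in cleanup that exactly two files were seen by this mapper. */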
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
String table = context.getConfiguration().get("MRTester_tableName");
assertNotNull(table);
try {
assertEquals(2, count);
} catch (AssertionError e) {
assertionErrors.put(table, e);
}
}
}
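
/**
 * Reads part of the first stream, then calls nextKeyValue() without closing it; the expected
 * IOException is recorded under a separate key so the test can assert it occurred.
 */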
public static class TestNoClose
extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
int count = 0;
@Override
protected void map(List<Entry<Key,Value>> key, InputStream value, Context context)
throws IOException, InterruptedException {
String table = context.getConfiguration().get("MRTester_tableName");
assertNotNull(table);
byte[] b = new byte[5];
int read;
try {
switch (count) {
case 0:
assertEquals(5, read = value.read(b));
assertEquals("asdfj", new String(b, 0, read));
break;
default:
fail();
}
} catch (AssertionError e) {
assertionErrors.put(table, e);
}
count++;
try {
context.nextKeyValue();
fail();
} catch (IOException ioe) {
assertionErrors.put(table + "_map_ioexception", new AssertionError(toString(), ioe));
}
}
}
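
/**
 * Maps the "bad" data (refs without chunks): verifies the key entries and expects read() and
 * close() on the empty stream to throw.
 */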
public static class TestBadData
extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
@Override
protected void map(List<Entry<Key,Value>> key, InputStream value, Context context)
throws IOException, InterruptedException {
String table = context.getConfiguration().get("MRTester_tableName");
assertNotNull(table);
byte[] b = new byte[20];
try {
assertEquals(2, key.size());
entryEquals(key.get(0), baddata.get(0));
entryEquals(key.get(1), baddata.get(1));
} catch (AssertionError e) {
assertionErrors.put(table, e);
}
// reading from a stream without chunks should throw; if read() returns normally instead,
// record an extra error so the test's error count check fails
try {
assertFalse(value.read(b) > 0);
try {
fail();
} catch (AssertionError e) {
assertionErrors.put(table, e);
}
} catch (Exception e) {
// expected, ignore
}
// closing the stream should also throw; again, a normal return records an extra error
try {
value.close();
try {
fail();
} catch (AssertionError e) {
assertionErrors.put(table, e);
}
} catch (Exception e) {
// expected, ignore
}
}
}
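
/**
 * Seeds the dummy assertion errors, then configures and runs a MapReduce job over
 * {@link ChunkInputFormat}. Args: table name and mapper class name.
 */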
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException(
"Usage : " + CIFTester.class.getName() + " <table> <mapperClass>");
}
String table = args[0];
assertionErrors.put(table, new AssertionError("Dummy"));
assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception"));
getConf().set("MRTester_tableName", table);
Job job = Job.getInstance(getConf());
job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
job.setInputFormatClass(ChunkInputFormat.class);
ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
ChunkInputFormat.setInputTableName(job, table);
ChunkInputFormat.setScanAuthorizations(job, AUTHS);
@SuppressWarnings("unchecked")
Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class
.forName(args[1]);
job.setMapperClass(forName);
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
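
/** Entry point for the tests: runs the tool on the local MapReduce framework and returns its exit code. */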
public static int main(String... args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");
conf.set("mapreduce.cluster.local.dir",
new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
return ToolRunner.run(conf, new CIFTester(), args);
}
}
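
/** Writes the good data and runs TestMapper; only the seeded dummy error should remain. */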
@Test
public void test() throws Exception {
client.tableOperations().create(tableName);
BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
for (Entry<Key,Value> e : data) {
Key k = e.getKey();
Mutation m = new Mutation(k.getRow());
m.put(k.getColumnFamily(), k.getColumnQualifier(),
new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
bw.addMutation(m);
}
bw.close();
assertEquals(0, CIFTester.main(tableName, CIFTester.TestMapper.class.getName()));
assertEquals(1, assertionErrors.get(tableName).size());
}
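
/**
 * Runs TestNoClose, which advances to the next key/value without closing its stream. The job is
 * expected to fail, and the recorded IOException is verified alongside the dummy entry.
 */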
@Test
public void testErrorOnNextWithoutClose() throws Exception {
client.tableOperations().create(tableName);
BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
for (Entry<Key,Value> e : data) {
Key k = e.getKey();
Mutation m = new Mutation(k.getRow());
m.put(k.getColumnFamily(), k.getColumnQualifier(),
new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
bw.addMutation(m);
}
bw.close();
assertEquals(1, CIFTester.main(tableName, CIFTester.TestNoClose.class.getName()));
assertEquals(1, assertionErrors.get(tableName).size());
// the IOException recorded by the mapper should exist, in addition to the dummy entry
assertEquals(2, assertionErrors.get(tableName + "_map_ioexception").size());
}
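
/** Writes refs entries with no chunks and runs TestBadData; only the dummy error should remain. */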
@Test
public void testInfoWithoutChunks() throws Exception {
client.tableOperations().create(tableName);
BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
for (Entry<Key,Value> e : baddata) {
Key k = e.getKey();
Mutation m = new Mutation(k.getRow());
m.put(k.getColumnFamily(), k.getColumnQualifier(),
new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
bw.addMutation(m);
}
bw.close();
assertEquals(0, CIFTester.main(tableName, CIFTester.TestBadData.class.getName()));
assertEquals(1, assertionErrors.get(tableName).size());
}
}