/// *
// * Licensed to the Apache Software Foundation (ASF) under one or more
// * contributor license agreements. See the NOTICE file distributed with
// * this work for additional information regarding copyright ownership.
// * The ASF licenses this file to You under the Apache License, Version 2.0
// * (the "License"); you may not use this file except in compliance with
// * the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
// package org.apache.accumulo.examples.filedata;
//
// import static org.junit.jupiter.api.Assertions.assertEquals;
// import static org.junit.jupiter.api.Assertions.assertFalse;
// import static org.junit.jupiter.api.Assertions.assertNotNull;
// import static org.junit.jupiter.api.Assertions.fail;
//
// import java.io.File;
// import java.io.IOException;
// import java.io.InputStream;
// import java.util.ArrayList;
// import java.util.List;
// import java.util.Map.Entry;
//
// import org.apache.accumulo.core.client.Accumulo;
// import org.apache.accumulo.core.client.AccumuloClient;
// import org.apache.accumulo.core.client.BatchWriter;
// import org.apache.accumulo.core.client.BatchWriterConfig;
// import org.apache.accumulo.core.conf.Property;
// import org.apache.accumulo.core.data.Key;
// import org.apache.accumulo.core.data.Mutation;
// import org.apache.accumulo.core.data.Value;
// import org.apache.accumulo.core.security.Authorizations;
// import org.apache.accumulo.core.security.ColumnVisibility;
// import org.apache.accumulo.harness.AccumuloClusterHarness;
// import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
// import org.apache.hadoop.conf.Configuration;
// import org.apache.hadoop.conf.Configured;
// import org.apache.hadoop.mapreduce.Job;
// import org.apache.hadoop.mapreduce.Mapper;
// import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
// import org.apache.hadoop.util.Tool;
// import org.apache.hadoop.util.ToolRunner;
// import org.junit.jupiter.api.AfterEach;
// import org.junit.jupiter.api.BeforeAll;
// import org.junit.jupiter.api.BeforeEach;
// import org.junit.jupiter.api.Test;
//
// import com.google.common.collect.ArrayListMultimap;
// import com.google.common.collect.Multimap;
//
// public class ChunkInputFormatIT extends AccumuloClusterHarness {
// @Override
// public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
// cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
// }
//
// // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks
/// (to
// // ensure test correctness), so error tests should check to see if there is at least one error
// // (could be more depending on the test) rather than zero
// private static final Multimap<String,AssertionError> assertionErrors =
/// ArrayListMultimap.create();
//
// private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
//
// private static List<Entry<Key,Value>> data;
// private static List<Entry<Key,Value>> baddata;
//
// private AccumuloClient client;
// private String tableName;
//
// @BeforeEach
// public void setupInstance() throws Exception {
// client = Accumulo.newClient().from(getClientProps()).build();
// tableName = getUniqueNames(1)[0];
// client.securityOperations().changeUserAuthorizations(client.whoami(), AUTHS);
// }
//
// @AfterEach
// public void teardown() {
// client.close();
// }
//
// @BeforeAll
// public static void setupClass() {
// System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
//
// data = new ArrayList<>();
// ChunkInputStreamIT.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
// ChunkInputStreamIT.addData(data, "a", "refs", "ida\0name", "A&B", "name");
// ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
// ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 1, "A&B", "");
// ChunkInputStreamIT.addData(data, "b", "refs", "ida\0ext", "A&B", "ext");
// ChunkInputStreamIT.addData(data, "b", "refs", "ida\0name", "A&B", "name");
// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "A&B", "");
// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "B&C", "");
// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "D", "");
// baddata = new ArrayList<>();
// ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
// ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
// }
//
// public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
// assertEquals(e1.getKey(), e2.getKey());
// assertEquals(e1.getValue(), e2.getValue());
// }
//
// public static class CIFTester extends Configured implements Tool {
// public static class TestMapper
// extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
// int count = 0;
//
// @Override
// protected void map(List<Entry<Key,Value>> key, InputStream value, Context context)
// throws IOException {
// String table = context.getConfiguration().get("MRTester_tableName");
// assertNotNull(table);
//
// byte[] b = new byte[20];
// int read;
// try (value) {
// switch (count) {
// case 0:
// assertEquals(key.size(), 2);
// entryEquals(key.get(0), data.get(0));
// entryEquals(key.get(1), data.get(1));
// assertEquals(read = value.read(b), 8);
// assertEquals(new String(b, 0, read), "asdfjkl;");
// assertEquals(read = value.read(b), -1);
// break;
// case 1:
// assertEquals(key.size(), 2);
// entryEquals(key.get(0), data.get(4));
// entryEquals(key.get(1), data.get(5));
// assertEquals(read = value.read(b), 10);
// assertEquals(new String(b, 0, read), "qwertyuiop");
// assertEquals(read = value.read(b), -1);
// break;
// default:
// fail();
// }
// } catch (AssertionError e) {
// assertionErrors.put(table, e);
// }
// count++;
// }
//
// @Override
// protected void cleanup(Context context) {
// String table = context.getConfiguration().get("MRTester_tableName");
// assertNotNull(table);
//
// try {
// assertEquals(2, count);
// } catch (AssertionError e) {
// assertionErrors.put(table, e);
// }
// }
// }
//
// public static class TestNoClose
// extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
// int count = 0;
//
// @Override
// protected void map(List<Entry<Key,Value>> key, InputStream value, Context context)
// throws IOException, InterruptedException {
// String table = context.getConfiguration().get("MRTester_tableName");
// assertNotNull(table);
//
// byte[] b = new byte[5];
// int read;
// try {
// switch (count) {
// case 0:
// assertEquals(read = value.read(b), 5);
// assertEquals(new String(b, 0, read), "asdfj");
// break;
// default:
// fail();
// }
// } catch (AssertionError e) {
// assertionErrors.put(table, e);
// }
// count++;
// try {
// context.nextKeyValue();
// fail();
// } catch (IOException ioe) {
// assertionErrors.put(table + "_map_ioexception", new AssertionError(toString(), ioe));
// }
// }
// }
//
// public static class TestBadData
// extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
// @Override
// protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) {
// String table = context.getConfiguration().get("MRTester_tableName");
// assertNotNull(table);
//
// byte[] b = new byte[20];
// try {
// assertEquals(key.size(), 2);
// entryEquals(key.get(0), baddata.get(0));
// entryEquals(key.get(1), baddata.get(1));
// } catch (AssertionError e) {
// assertionErrors.put(table, e);
// }
// try {
// assertFalse(value.read(b) > 0);
// try {
// fail();
// } catch (AssertionError e) {
// assertionErrors.put(table, e);
// }
// } catch (Exception e) {
// // expected, ignore
// }
// try {
// value.close();
// try {
// fail();
// } catch (AssertionError e) {
// assertionErrors.put(table, e);
// }
// } catch (Exception e) {
// // expected, ignore
// }
// }
// }
//
// @SuppressWarnings("deprecation")
// @Override
// public int run(String[] args) throws Exception {
// if (args.length != 2) {
// throw new IllegalArgumentException(
// "Usage : " + CIFTester.class.getName() + " <table> <mapperClass>");
// }
//
// String table = args[0];
// assertionErrors.put(table, new AssertionError("Dummy"));
// assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception"));
// getConf().set("MRTester_tableName", table);
//
// Job job = Job.getInstance(getConf());
// job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
// job.setJarByClass(this.getClass());
//
// job.setInputFormatClass(ChunkInputFormat.class);
//
// ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
// ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
// ChunkInputFormat.setInputTableName(job, table);
// ChunkInputFormat.setScanAuthorizations(job, AUTHS);
//
// @SuppressWarnings("unchecked")
// Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class
// .forName(args[1]);
// job.setMapperClass(forName);
// job.setMapOutputKeyClass(Key.class);
// job.setMapOutputValueClass(Value.class);
// job.setOutputFormatClass(NullOutputFormat.class);
//
// job.setNumReduceTasks(0);
//
// job.waitForCompletion(true);
//
// return job.isSuccessful() ? 0 : 1;
// }
//
// public static int main(String... args) throws Exception {
// Configuration conf = new Configuration();
// conf.set("mapreduce.framework.name", "local");
// conf.set("mapreduce.cluster.local.dir",
// new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
// return ToolRunner.run(conf, new CIFTester(), args);
// }
// }
//
// @Test
// public void test() throws Exception {
// client.tableOperations().create(tableName);
// BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
//
// for (Entry<Key,Value> e : data) {
// Key k = e.getKey();
// Mutation m = new Mutation(k.getRow());
// m.put(k.getColumnFamily(), k.getColumnQualifier(),
// new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
// bw.addMutation(m);
// }
// bw.close();
//
// assertEquals(0, CIFTester.main(tableName, CIFTester.TestMapper.class.getName()));
// assertEquals(1, assertionErrors.get(tableName).size());
// }
//
// @Test
// public void testErrorOnNextWithoutClose() throws Exception {
// client.tableOperations().create(tableName);
// BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
//
// for (Entry<Key,Value> e : data) {
// Key k = e.getKey();
// Mutation m = new Mutation(k.getRow());
// m.put(k.getColumnFamily(), k.getColumnQualifier(),
// new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
// bw.addMutation(m);
// }
// bw.close();
//
// assertEquals(1, CIFTester.main(tableName, CIFTester.TestNoClose.class.getName()));
// assertEquals(1, assertionErrors.get(tableName).size());
// // this should actually exist, in addition to the dummy entry
// assertEquals(2, assertionErrors.get(tableName + "_map_ioexception").size());
// }
//
// @Test
// public void testInfoWithoutChunks() throws Exception {
// client.tableOperations().create(tableName);
// BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
// for (Entry<Key,Value> e : baddata) {
// Key k = e.getKey();
// Mutation m = new Mutation(k.getRow());
// m.put(k.getColumnFamily(), k.getColumnQualifier(),
// new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
// bw.addMutation(m);
// }
// bw.close();
//
// assertEquals(0, CIFTester.main(tableName, CIFTester.TestBadData.class.getName()));
// assertEquals(1, assertionErrors.get(tableName).size());
// }
// }
