Comment out portions of filedata example (#108)
With the example repo now targeting Accumulo 3.0.0-SNAPSHOT, the
filedata example no longer works. In addition, it prevents the
project from building. Several files have been commented out in
order to allow the project to build.
A follow-on issue will be created to address the non-working example.
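For reference, a minimal sketch of the direction that fix might take.
The broken code extends the deprecated
org.apache.accumulo.core.client.mapreduce.InputFormatBase (note the
@SuppressWarnings("deprecation") blocks below), while the supported
replacement is the builder-style org.apache.accumulo.hadoop.mapreduce
API that CharacterHistogram already uses for output. The class and
method names in the sketch (JobConfigSketch, configureInput) are
hypothetical, and a ported ChunkInputFormat would still need its
chunk-grouping RecordReader rebuilt on the new API; this is an
illustration only, not the planned change.

    // Hypothetical sketch only -- not part of this commit. Assumes the
    // filedata input side is rebuilt on the supported
    // org.apache.accumulo.hadoop.mapreduce API.
    import java.util.Properties;

    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class JobConfigSketch {
      static void configureInput(Job job, Properties clientProps, String table,
          Authorizations auths) {
        // Replaces the removed static setters (setZooKeeperInstance,
        // setConnectorInfo, setInputTableName, setScanAuthorizations) with
        // the same builder pattern AccumuloOutputFormat.configure() uses.
        job.setInputFormatClass(AccumuloInputFormat.class);
        AccumuloInputFormat.configure()
            .clientProperties(clientProps) // instance, zookeepers, principal, token
            .table(table)                  // input table to scan
            .auths(auths)                  // scan authorizations
            .store(job);                   // serialize settings into the job config
      }
    }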
diff --git a/src/main/java/org/apache/accumulo/examples/filedata/CharacterHistogram.java b/src/main/java/org/apache/accumulo/examples/filedata/CharacterHistogram.java
index 01c08bc..d24c4c9 100644
--- a/src/main/java/org/apache/accumulo/examples/filedata/CharacterHistogram.java
+++ b/src/main/java/org/apache/accumulo/examples/filedata/CharacterHistogram.java
@@ -1,112 +1,112 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.filedata;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.SummingArrayCombiner;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.examples.cli.ClientOpts;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.beust.jcommander.Parameter;
-
-/**
- * A MapReduce that computes a histogram of byte frequency for each file and stores the histogram
- * alongside the file data. The {@link ChunkInputFormat} is used to read the file data from
- * Accumulo.
- */
-public class CharacterHistogram {
-
- private static final String VIS = "vis";
-
- public static class HistMapper extends Mapper<List<Entry<Key,Value>>,InputStream,Text,Mutation> {
- private ColumnVisibility cv;
-
- @Override
- public void map(List<Entry<Key,Value>> k, InputStream v, Context context)
- throws IOException, InterruptedException {
- Long[] hist = new Long[256];
- Arrays.fill(hist, 0L);
- int b = v.read();
- while (b >= 0) {
- hist[b] += 1L;
- b = v.read();
- }
- v.close();
- Mutation m = new Mutation(k.get(0).getKey().getRow());
- m.put("info", "hist", cv,
- new Value(SummingArrayCombiner.STRING_ARRAY_ENCODER.encode(Arrays.asList(hist))));
- context.write(new Text(), m);
- }
-
- @Override
- protected void setup(Context context) {
- cv = new ColumnVisibility(context.getConfiguration().get(VIS, ""));
- }
- }
-
- static class Opts extends ClientOpts {
- @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
- String tableName;
- @Parameter(names = "--vis")
- String visibilities = "";
- }
-
- @SuppressWarnings("deprecation")
- public static void main(String[] args) throws Exception {
- Opts opts = new Opts();
- opts.parseArgs(CharacterHistogram.class.getName(), args);
-
- Job job = Job.getInstance(opts.getHadoopConfig());
- job.setJobName(CharacterHistogram.class.getSimpleName());
- job.setJarByClass(CharacterHistogram.class);
- job.setInputFormatClass(ChunkInputFormat.class);
- job.getConfiguration().set(VIS, opts.visibilities);
- job.setMapperClass(HistMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Mutation.class);
-
- job.setNumReduceTasks(0);
-
- Properties props = opts.getClientProperties();
- ChunkInputFormat.setZooKeeperInstance(job, props.getProperty("instance.name"),
- props.getProperty("instance.zookeepers"));
- PasswordToken token = new PasswordToken(props.getProperty("auth.token"));
- ChunkInputFormat.setConnectorInfo(job, props.getProperty("auth.principal"), token);
- ChunkInputFormat.setInputTableName(job, opts.tableName);
- ChunkInputFormat.setScanAuthorizations(job, opts.auths);
-
- job.setOutputFormatClass(AccumuloOutputFormat.class);
- AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties())
- .defaultTable(opts.tableName).createTables(true).store(job);
-
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
-}
+/// *
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+// package org.apache.accumulo.examples.filedata;
+//
+// import java.io.IOException;
+// import java.io.InputStream;
+// import java.util.Arrays;
+// import java.util.List;
+// import java.util.Map.Entry;
+// import java.util.Properties;
+//
+// import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+// import org.apache.accumulo.core.data.Key;
+// import org.apache.accumulo.core.data.Mutation;
+// import org.apache.accumulo.core.data.Value;
+// import org.apache.accumulo.core.iterators.user.SummingArrayCombiner;
+// import org.apache.accumulo.core.security.ColumnVisibility;
+// import org.apache.accumulo.examples.cli.ClientOpts;
+// import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
+// import org.apache.hadoop.io.Text;
+// import org.apache.hadoop.mapreduce.Job;
+// import org.apache.hadoop.mapreduce.Mapper;
+//
+// import com.beust.jcommander.Parameter;
+//
+/// **
+// * A MapReduce that computes a histogram of byte frequency for each file and stores the histogram
+// * alongside the file data. The {@link ChunkInputFormat} is used to read the file data from
+// * Accumulo.
+// */
+// public class CharacterHistogram {
+//
+// private static final String VIS = "vis";
+//
+// public static class HistMapper extends Mapper<List<Entry<Key,Value>>,InputStream,Text,Mutation> {
+// private ColumnVisibility cv;
+//
+// @Override
+// public void map(List<Entry<Key,Value>> k, InputStream v, Context context)
+// throws IOException, InterruptedException {
+// Long[] hist = new Long[256];
+// Arrays.fill(hist, 0L);
+// int b = v.read();
+// while (b >= 0) {
+// hist[b] += 1L;
+// b = v.read();
+// }
+// v.close();
+// Mutation m = new Mutation(k.get(0).getKey().getRow());
+// m.put("info", "hist", cv,
+// new Value(SummingArrayCombiner.STRING_ARRAY_ENCODER.encode(Arrays.asList(hist))));
+// context.write(new Text(), m);
+// }
+//
+// @Override
+// protected void setup(Context context) {
+// cv = new ColumnVisibility(context.getConfiguration().get(VIS, ""));
+// }
+// }
+//
+// static class Opts extends ClientOpts {
+// @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
+// String tableName;
+// @Parameter(names = "--vis")
+// String visibilities = "";
+// }
+//
+// @SuppressWarnings("deprecation")
+// public static void main(String[] args) throws Exception {
+// Opts opts = new Opts();
+// opts.parseArgs(CharacterHistogram.class.getName(), args);
+//
+// Job job = Job.getInstance(opts.getHadoopConfig());
+// job.setJobName(CharacterHistogram.class.getSimpleName());
+// job.setJarByClass(CharacterHistogram.class);
+// job.setInputFormatClass(ChunkInputFormat.class);
+// job.getConfiguration().set(VIS, opts.visibilities);
+// job.setMapperClass(HistMapper.class);
+// job.setMapOutputKeyClass(Text.class);
+// job.setMapOutputValueClass(Mutation.class);
+//
+// job.setNumReduceTasks(0);
+//
+// Properties props = opts.getClientProperties();
+// ChunkInputFormat.setZooKeeperInstance(job, props.getProperty("instance.name"),
+// props.getProperty("instance.zookeepers"));
+// PasswordToken token = new PasswordToken(props.getProperty("auth.token"));
+// ChunkInputFormat.setConnectorInfo(job, props.getProperty("auth.principal"), token);
+// ChunkInputFormat.setInputTableName(job, opts.tableName);
+// ChunkInputFormat.setScanAuthorizations(job, opts.auths);
+//
+// job.setOutputFormatClass(AccumuloOutputFormat.class);
+// AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties())
+// .defaultTable(opts.tableName).createTables(true).store(job);
+//
+// System.exit(job.waitForCompletion(true) ? 0 : 1);
+// }
+// }
diff --git a/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java b/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java
index 914375f..00965a2 100644
--- a/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java
+++ b/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java
@@ -1,84 +1,85 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.filedata;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.examples.util.FormatUtil;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-
-/**
- * An InputFormat that turns the file data ingested with {@link FileDataIngest} into an InputStream
- * using {@link ChunkInputStream}. Mappers used with this InputFormat must close the InputStream.
- */
-@SuppressWarnings("deprecation")
-public class ChunkInputFormat extends
- org.apache.accumulo.core.client.mapreduce.InputFormatBase<List<Entry<Key,Value>>,InputStream> {
- @Override
- public RecordReader<List<Entry<Key,Value>>,InputStream> createRecordReader(InputSplit split,
- TaskAttemptContext context) {
- return new RecordReaderBase<>() {
- private PeekingIterator<Entry<Key,Value>> peekingScannerIterator;
-
- @Override
- public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
- super.initialize(inSplit, attempt);
- peekingScannerIterator = Iterators.peekingIterator(scannerIterator);
- currentK = new ArrayList<>();
- currentV = new ChunkInputStream();
- }
-
- @Override
- public boolean nextKeyValue() throws IOException {
- log.debug("nextKeyValue called");
-
- currentK.clear();
- if (peekingScannerIterator.hasNext()) {
- ++numKeysRead;
- Entry<Key,Value> entry = peekingScannerIterator.peek();
- while (!entry.getKey().getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
- currentK.add(entry);
- peekingScannerIterator.next();
- if (!peekingScannerIterator.hasNext()) {
- return true;
- }
- entry = peekingScannerIterator.peek();
- }
- currentKey = entry.getKey();
- ((ChunkInputStream) currentV).setSource(peekingScannerIterator);
- if (log.isTraceEnabled()) {
- log.trace("Processing key/value pair: " + FormatUtil.formatTableEntry(entry, true));
- }
-
- return true;
- }
- return false;
- }
- };
- }
-}
+/// *
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+// package org.apache.accumulo.examples.filedata;
+//
+// import java.io.IOException;
+// import java.io.InputStream;
+// import java.util.ArrayList;
+// import java.util.List;
+// import java.util.Map.Entry;
+//
+// import org.apache.accumulo.core.data.Key;
+// import org.apache.accumulo.core.data.Value;
+// import org.apache.accumulo.examples.util.FormatUtil;
+// import org.apache.hadoop.mapreduce.InputSplit;
+// import org.apache.hadoop.mapreduce.RecordReader;
+// import org.apache.hadoop.mapreduce.TaskAttemptContext;
+//
+// import com.google.common.collect.Iterators;
+// import com.google.common.collect.PeekingIterator;
+//
+/// **
+// * An InputFormat that turns the file data ingested with {@link FileDataIngest} into an
+/// InputStream
+// * using {@link ChunkInputStream}. Mappers used with this InputFormat must close the InputStream.
+// */
+// @SuppressWarnings("deprecation")
+// public class ChunkInputFormat extends
+// org.apache.accumulo.core.client.mapreduce.InputFormatBase<List<Entry<Key,Value>>,InputStream> {
+// @Override
+// public RecordReader<List<Entry<Key,Value>>,InputStream> createRecordReader(InputSplit split,
+// TaskAttemptContext context) {
+// return new RecordReaderBase<>() {
+// private PeekingIterator<Entry<Key,Value>> peekingScannerIterator;
+//
+// @Override
+// public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
+// super.initialize(inSplit, attempt);
+// peekingScannerIterator = Iterators.peekingIterator(scannerIterator);
+// currentK = new ArrayList<>();
+// currentV = new ChunkInputStream();
+// }
+//
+// @Override
+// public boolean nextKeyValue() throws IOException {
+// log.debug("nextKeyValue called");
+//
+// currentK.clear();
+// if (peekingScannerIterator.hasNext()) {
+// ++numKeysRead;
+// Entry<Key,Value> entry = peekingScannerIterator.peek();
+// while (!entry.getKey().getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
+// currentK.add(entry);
+// peekingScannerIterator.next();
+// if (!peekingScannerIterator.hasNext()) {
+// return true;
+// }
+// entry = peekingScannerIterator.peek();
+// }
+// currentKey = entry.getKey();
+// ((ChunkInputStream) currentV).setSource(peekingScannerIterator);
+// if (log.isTraceEnabled()) {
+// log.trace("Processing key/value pair: " + FormatUtil.formatTableEntry(entry, true));
+// }
+//
+// return true;
+// }
+// return false;
+// }
+// };
+// }
+// }
diff --git a/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java
index f24aa15..d5263aa 100644
--- a/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java
+++ b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java
@@ -1,343 +1,345 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.accumulo.examples.filedata;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.harness.AccumuloClusterHarness;
-import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-
-public class ChunkInputFormatIT extends AccumuloClusterHarness {
- @Override
- public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
- }
-
- // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks (to
- // ensure test correctness), so error tests should check to see if there is at least one error
- // (could be more depending on the test) rather than zero
- private static final Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
-
- private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
-
- private static List<Entry<Key,Value>> data;
- private static List<Entry<Key,Value>> baddata;
-
- private AccumuloClient client;
- private String tableName;
-
- @BeforeEach
- public void setupInstance() throws Exception {
- client = Accumulo.newClient().from(getClientProps()).build();
- tableName = getUniqueNames(1)[0];
- client.securityOperations().changeUserAuthorizations(client.whoami(), AUTHS);
- }
-
- @AfterEach
- public void teardown() {
- client.close();
- }
-
- @BeforeAll
- public static void setupClass() {
- System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
-
- data = new ArrayList<>();
- ChunkInputStreamIT.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
- ChunkInputStreamIT.addData(data, "a", "refs", "ida\0name", "A&B", "name");
- ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
- ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 1, "A&B", "");
- ChunkInputStreamIT.addData(data, "b", "refs", "ida\0ext", "A&B", "ext");
- ChunkInputStreamIT.addData(data, "b", "refs", "ida\0name", "A&B", "name");
- ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
- ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
- ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "A&B", "");
- ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "B&C", "");
- ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "D", "");
- baddata = new ArrayList<>();
- ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
- ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
- }
-
- public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
- assertEquals(e1.getKey(), e2.getKey());
- assertEquals(e1.getValue(), e2.getValue());
- }
-
- public static class CIFTester extends Configured implements Tool {
- public static class TestMapper
- extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
- int count = 0;
-
- @Override
- protected void map(List<Entry<Key,Value>> key, InputStream value, Context context)
- throws IOException {
- String table = context.getConfiguration().get("MRTester_tableName");
- assertNotNull(table);
-
- byte[] b = new byte[20];
- int read;
- try (value) {
- switch (count) {
- case 0:
- assertEquals(key.size(), 2);
- entryEquals(key.get(0), data.get(0));
- entryEquals(key.get(1), data.get(1));
- assertEquals(read = value.read(b), 8);
- assertEquals(new String(b, 0, read), "asdfjkl;");
- assertEquals(read = value.read(b), -1);
- break;
- case 1:
- assertEquals(key.size(), 2);
- entryEquals(key.get(0), data.get(4));
- entryEquals(key.get(1), data.get(5));
- assertEquals(read = value.read(b), 10);
- assertEquals(new String(b, 0, read), "qwertyuiop");
- assertEquals(read = value.read(b), -1);
- break;
- default:
- fail();
- }
- } catch (AssertionError e) {
- assertionErrors.put(table, e);
- }
- count++;
- }
-
- @Override
- protected void cleanup(Context context) {
- String table = context.getConfiguration().get("MRTester_tableName");
- assertNotNull(table);
-
- try {
- assertEquals(2, count);
- } catch (AssertionError e) {
- assertionErrors.put(table, e);
- }
- }
- }
-
- public static class TestNoClose
- extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
- int count = 0;
-
- @Override
- protected void map(List<Entry<Key,Value>> key, InputStream value, Context context)
- throws IOException, InterruptedException {
- String table = context.getConfiguration().get("MRTester_tableName");
- assertNotNull(table);
-
- byte[] b = new byte[5];
- int read;
- try {
- switch (count) {
- case 0:
- assertEquals(read = value.read(b), 5);
- assertEquals(new String(b, 0, read), "asdfj");
- break;
- default:
- fail();
- }
- } catch (AssertionError e) {
- assertionErrors.put(table, e);
- }
- count++;
- try {
- context.nextKeyValue();
- fail();
- } catch (IOException ioe) {
- assertionErrors.put(table + "_map_ioexception", new AssertionError(toString(), ioe));
- }
- }
- }
-
- public static class TestBadData
- extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
- @Override
- protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) {
- String table = context.getConfiguration().get("MRTester_tableName");
- assertNotNull(table);
-
- byte[] b = new byte[20];
- try {
- assertEquals(key.size(), 2);
- entryEquals(key.get(0), baddata.get(0));
- entryEquals(key.get(1), baddata.get(1));
- } catch (AssertionError e) {
- assertionErrors.put(table, e);
- }
- try {
- assertFalse(value.read(b) > 0);
- try {
- fail();
- } catch (AssertionError e) {
- assertionErrors.put(table, e);
- }
- } catch (Exception e) {
- // expected, ignore
- }
- try {
- value.close();
- try {
- fail();
- } catch (AssertionError e) {
- assertionErrors.put(table, e);
- }
- } catch (Exception e) {
- // expected, ignore
- }
- }
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public int run(String[] args) throws Exception {
- if (args.length != 2) {
- throw new IllegalArgumentException(
- "Usage : " + CIFTester.class.getName() + " <table> <mapperClass>");
- }
-
- String table = args[0];
- assertionErrors.put(table, new AssertionError("Dummy"));
- assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception"));
- getConf().set("MRTester_tableName", table);
-
- Job job = Job.getInstance(getConf());
- job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
- job.setJarByClass(this.getClass());
-
- job.setInputFormatClass(ChunkInputFormat.class);
-
- ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
- ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
- ChunkInputFormat.setInputTableName(job, table);
- ChunkInputFormat.setScanAuthorizations(job, AUTHS);
-
- @SuppressWarnings("unchecked")
- Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class
- .forName(args[1]);
- job.setMapperClass(forName);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
- job.setOutputFormatClass(NullOutputFormat.class);
-
- job.setNumReduceTasks(0);
-
- job.waitForCompletion(true);
-
- return job.isSuccessful() ? 0 : 1;
- }
-
- public static int main(String... args) throws Exception {
- Configuration conf = new Configuration();
- conf.set("mapreduce.framework.name", "local");
- conf.set("mapreduce.cluster.local.dir",
- new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
- return ToolRunner.run(conf, new CIFTester(), args);
- }
- }
-
- @Test
- public void test() throws Exception {
- client.tableOperations().create(tableName);
- BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
-
- for (Entry<Key,Value> e : data) {
- Key k = e.getKey();
- Mutation m = new Mutation(k.getRow());
- m.put(k.getColumnFamily(), k.getColumnQualifier(),
- new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
- bw.addMutation(m);
- }
- bw.close();
-
- assertEquals(0, CIFTester.main(tableName, CIFTester.TestMapper.class.getName()));
- assertEquals(1, assertionErrors.get(tableName).size());
- }
-
- @Test
- public void testErrorOnNextWithoutClose() throws Exception {
- client.tableOperations().create(tableName);
- BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
-
- for (Entry<Key,Value> e : data) {
- Key k = e.getKey();
- Mutation m = new Mutation(k.getRow());
- m.put(k.getColumnFamily(), k.getColumnQualifier(),
- new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
- bw.addMutation(m);
- }
- bw.close();
-
- assertEquals(1, CIFTester.main(tableName, CIFTester.TestNoClose.class.getName()));
- assertEquals(1, assertionErrors.get(tableName).size());
- // this should actually exist, in addition to the dummy entry
- assertEquals(2, assertionErrors.get(tableName + "_map_ioexception").size());
- }
-
- @Test
- public void testInfoWithoutChunks() throws Exception {
- client.tableOperations().create(tableName);
- BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
- for (Entry<Key,Value> e : baddata) {
- Key k = e.getKey();
- Mutation m = new Mutation(k.getRow());
- m.put(k.getColumnFamily(), k.getColumnQualifier(),
- new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
- bw.addMutation(m);
- }
- bw.close();
-
- assertEquals(0, CIFTester.main(tableName, CIFTester.TestBadData.class.getName()));
- assertEquals(1, assertionErrors.get(tableName).size());
- }
-}
+/// *
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+// package org.apache.accumulo.examples.filedata;
+//
+// import static org.junit.jupiter.api.Assertions.assertEquals;
+// import static org.junit.jupiter.api.Assertions.assertFalse;
+// import static org.junit.jupiter.api.Assertions.assertNotNull;
+// import static org.junit.jupiter.api.Assertions.fail;
+//
+// import java.io.File;
+// import java.io.IOException;
+// import java.io.InputStream;
+// import java.util.ArrayList;
+// import java.util.List;
+// import java.util.Map.Entry;
+//
+// import org.apache.accumulo.core.client.Accumulo;
+// import org.apache.accumulo.core.client.AccumuloClient;
+// import org.apache.accumulo.core.client.BatchWriter;
+// import org.apache.accumulo.core.client.BatchWriterConfig;
+// import org.apache.accumulo.core.conf.Property;
+// import org.apache.accumulo.core.data.Key;
+// import org.apache.accumulo.core.data.Mutation;
+// import org.apache.accumulo.core.data.Value;
+// import org.apache.accumulo.core.security.Authorizations;
+// import org.apache.accumulo.core.security.ColumnVisibility;
+// import org.apache.accumulo.harness.AccumuloClusterHarness;
+// import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+// import org.apache.hadoop.conf.Configuration;
+// import org.apache.hadoop.conf.Configured;
+// import org.apache.hadoop.mapreduce.Job;
+// import org.apache.hadoop.mapreduce.Mapper;
+// import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+// import org.apache.hadoop.util.Tool;
+// import org.apache.hadoop.util.ToolRunner;
+// import org.junit.jupiter.api.AfterEach;
+// import org.junit.jupiter.api.BeforeAll;
+// import org.junit.jupiter.api.BeforeEach;
+// import org.junit.jupiter.api.Test;
+//
+// import com.google.common.collect.ArrayListMultimap;
+// import com.google.common.collect.Multimap;
+//
+// public class ChunkInputFormatIT extends AccumuloClusterHarness {
+// @Override
+// public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+// cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+// }
+//
+// // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks
+/// (to
+// // ensure test correctness), so error tests should check to see if there is at least one error
+// // (could be more depending on the test) rather than zero
+// private static final Multimap<String,AssertionError> assertionErrors =
+/// ArrayListMultimap.create();
+//
+// private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
+//
+// private static List<Entry<Key,Value>> data;
+// private static List<Entry<Key,Value>> baddata;
+//
+// private AccumuloClient client;
+// private String tableName;
+//
+// @BeforeEach
+// public void setupInstance() throws Exception {
+// client = Accumulo.newClient().from(getClientProps()).build();
+// tableName = getUniqueNames(1)[0];
+// client.securityOperations().changeUserAuthorizations(client.whoami(), AUTHS);
+// }
+//
+// @AfterEach
+// public void teardown() {
+// client.close();
+// }
+//
+// @BeforeAll
+// public static void setupClass() {
+// System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
+//
+// data = new ArrayList<>();
+// ChunkInputStreamIT.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
+// ChunkInputStreamIT.addData(data, "a", "refs", "ida\0name", "A&B", "name");
+// ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+// ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 1, "A&B", "");
+// ChunkInputStreamIT.addData(data, "b", "refs", "ida\0ext", "A&B", "ext");
+// ChunkInputStreamIT.addData(data, "b", "refs", "ida\0name", "A&B", "name");
+// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "A&B", "");
+// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "B&C", "");
+// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "D", "");
+// baddata = new ArrayList<>();
+// ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
+// ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
+// }
+//
+// public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
+// assertEquals(e1.getKey(), e2.getKey());
+// assertEquals(e1.getValue(), e2.getValue());
+// }
+//
+// public static class CIFTester extends Configured implements Tool {
+// public static class TestMapper
+// extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+// int count = 0;
+//
+// @Override
+// protected void map(List<Entry<Key,Value>> key, InputStream value, Context context)
+// throws IOException {
+// String table = context.getConfiguration().get("MRTester_tableName");
+// assertNotNull(table);
+//
+// byte[] b = new byte[20];
+// int read;
+// try (value) {
+// switch (count) {
+// case 0:
+// assertEquals(key.size(), 2);
+// entryEquals(key.get(0), data.get(0));
+// entryEquals(key.get(1), data.get(1));
+// assertEquals(read = value.read(b), 8);
+// assertEquals(new String(b, 0, read), "asdfjkl;");
+// assertEquals(read = value.read(b), -1);
+// break;
+// case 1:
+// assertEquals(key.size(), 2);
+// entryEquals(key.get(0), data.get(4));
+// entryEquals(key.get(1), data.get(5));
+// assertEquals(read = value.read(b), 10);
+// assertEquals(new String(b, 0, read), "qwertyuiop");
+// assertEquals(read = value.read(b), -1);
+// break;
+// default:
+// fail();
+// }
+// } catch (AssertionError e) {
+// assertionErrors.put(table, e);
+// }
+// count++;
+// }
+//
+// @Override
+// protected void cleanup(Context context) {
+// String table = context.getConfiguration().get("MRTester_tableName");
+// assertNotNull(table);
+//
+// try {
+// assertEquals(2, count);
+// } catch (AssertionError e) {
+// assertionErrors.put(table, e);
+// }
+// }
+// }
+//
+// public static class TestNoClose
+// extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+// int count = 0;
+//
+// @Override
+// protected void map(List<Entry<Key,Value>> key, InputStream value, Context context)
+// throws IOException, InterruptedException {
+// String table = context.getConfiguration().get("MRTester_tableName");
+// assertNotNull(table);
+//
+// byte[] b = new byte[5];
+// int read;
+// try {
+// switch (count) {
+// case 0:
+// assertEquals(read = value.read(b), 5);
+// assertEquals(new String(b, 0, read), "asdfj");
+// break;
+// default:
+// fail();
+// }
+// } catch (AssertionError e) {
+// assertionErrors.put(table, e);
+// }
+// count++;
+// try {
+// context.nextKeyValue();
+// fail();
+// } catch (IOException ioe) {
+// assertionErrors.put(table + "_map_ioexception", new AssertionError(toString(), ioe));
+// }
+// }
+// }
+//
+// public static class TestBadData
+// extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+// @Override
+// protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) {
+// String table = context.getConfiguration().get("MRTester_tableName");
+// assertNotNull(table);
+//
+// byte[] b = new byte[20];
+// try {
+// assertEquals(key.size(), 2);
+// entryEquals(key.get(0), baddata.get(0));
+// entryEquals(key.get(1), baddata.get(1));
+// } catch (AssertionError e) {
+// assertionErrors.put(table, e);
+// }
+// try {
+// assertFalse(value.read(b) > 0);
+// try {
+// fail();
+// } catch (AssertionError e) {
+// assertionErrors.put(table, e);
+// }
+// } catch (Exception e) {
+// // expected, ignore
+// }
+// try {
+// value.close();
+// try {
+// fail();
+// } catch (AssertionError e) {
+// assertionErrors.put(table, e);
+// }
+// } catch (Exception e) {
+// // expected, ignore
+// }
+// }
+// }
+//
+// @SuppressWarnings("deprecation")
+// @Override
+// public int run(String[] args) throws Exception {
+// if (args.length != 2) {
+// throw new IllegalArgumentException(
+// "Usage : " + CIFTester.class.getName() + " <table> <mapperClass>");
+// }
+//
+// String table = args[0];
+// assertionErrors.put(table, new AssertionError("Dummy"));
+// assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception"));
+// getConf().set("MRTester_tableName", table);
+//
+// Job job = Job.getInstance(getConf());
+// job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+// job.setJarByClass(this.getClass());
+//
+// job.setInputFormatClass(ChunkInputFormat.class);
+//
+// ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+// ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+// ChunkInputFormat.setInputTableName(job, table);
+// ChunkInputFormat.setScanAuthorizations(job, AUTHS);
+//
+// @SuppressWarnings("unchecked")
+// Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class
+// .forName(args[1]);
+// job.setMapperClass(forName);
+// job.setMapOutputKeyClass(Key.class);
+// job.setMapOutputValueClass(Value.class);
+// job.setOutputFormatClass(NullOutputFormat.class);
+//
+// job.setNumReduceTasks(0);
+//
+// job.waitForCompletion(true);
+//
+// return job.isSuccessful() ? 0 : 1;
+// }
+//
+// public static int main(String... args) throws Exception {
+// Configuration conf = new Configuration();
+// conf.set("mapreduce.framework.name", "local");
+// conf.set("mapreduce.cluster.local.dir",
+// new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+// return ToolRunner.run(conf, new CIFTester(), args);
+// }
+// }
+//
+// @Test
+// public void test() throws Exception {
+// client.tableOperations().create(tableName);
+// BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
+//
+// for (Entry<Key,Value> e : data) {
+// Key k = e.getKey();
+// Mutation m = new Mutation(k.getRow());
+// m.put(k.getColumnFamily(), k.getColumnQualifier(),
+// new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+// bw.addMutation(m);
+// }
+// bw.close();
+//
+// assertEquals(0, CIFTester.main(tableName, CIFTester.TestMapper.class.getName()));
+// assertEquals(1, assertionErrors.get(tableName).size());
+// }
+//
+// @Test
+// public void testErrorOnNextWithoutClose() throws Exception {
+// client.tableOperations().create(tableName);
+// BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
+//
+// for (Entry<Key,Value> e : data) {
+// Key k = e.getKey();
+// Mutation m = new Mutation(k.getRow());
+// m.put(k.getColumnFamily(), k.getColumnQualifier(),
+// new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+// bw.addMutation(m);
+// }
+// bw.close();
+//
+// assertEquals(1, CIFTester.main(tableName, CIFTester.TestNoClose.class.getName()));
+// assertEquals(1, assertionErrors.get(tableName).size());
+// // this should actually exist, in addition to the dummy entry
+// assertEquals(2, assertionErrors.get(tableName + "_map_ioexception").size());
+// }
+//
+// @Test
+// public void testInfoWithoutChunks() throws Exception {
+// client.tableOperations().create(tableName);
+// BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
+// for (Entry<Key,Value> e : baddata) {
+// Key k = e.getKey();
+// Mutation m = new Mutation(k.getRow());
+// m.put(k.getColumnFamily(), k.getColumnQualifier(),
+// new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+// bw.addMutation(m);
+// }
+// bw.close();
+//
+// assertEquals(0, CIFTester.main(tableName, CIFTester.TestBadData.class.getName()));
+// assertEquals(1, assertionErrors.get(tableName).size());
+// }
+// }