/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.accumulo.testing.continuous;

import static org.apache.accumulo.testing.continuous.ContinuousIngest.genCol;
import static org.apache.accumulo.testing.continuous.ContinuousIngest.genLong;
import static org.apache.accumulo.testing.continuous.ContinuousIngest.genRow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.testing.TestProps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Generates a continuous ingest linked list per map reduce split. Each linked list is of
 * configurable length.
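 *
 * <p>
 * A minimal usage sketch (assuming a Hadoop {@code Job} has already been created and a
 * {@link ContinuousEnv} is available; the uuid argument is a caller-chosen ingest instance id):
 *
 * <pre>{@code
 * job.setInputFormatClass(ContinuousInputFormat.class);
 * ContinuousInputFormat.configure(job.getConfiguration(), UUID.randomUUID().toString(), env);
 * }</pre>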
 */
public class ContinuousInputFormat extends InputFormat<Key,Value> {

  private static final String PROP_UUID = "mrbulk.uuid";
  private static final String PROP_MAP_TASK = "mrbulk.map.task";
  private static final String PROP_MAP_NODES = "mrbulk.map.nodes";
  private static final String PROP_ROW_MIN = "mrbulk.row.min";
  private static final String PROP_ROW_MAX = "mrbulk.row.max";
  private static final String PROP_FAM_MAX = "mrbulk.fam.max";
  private static final String PROP_QUAL_MAX = "mrbulk.qual.max";
  private static final String PROP_CHECKSUM = "mrbulk.checksum";
  private static final String PROP_VIS = "mrbulk.vis";
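
  // A placeholder split that carries no data and no locality hints; it exists only so the
  // framework schedules the configured number of map tasks, each generating its own linked list.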
  private static class RandomSplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput dataOutput) {}

    @Override
    public void readFields(DataInput dataInput) {}

    @Override
    public long getLength() {
      return 0;
    }

    @Override
    public String[] getLocations() {
      return new String[0];
    }
  }
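
  // Creates one interchangeable split per requested map task; the data is generated rather than
  // read from storage, so no per-split metadata is needed.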
  @Override
  public List<InputSplit> getSplits(JobContext jobContext) {
    int numTask = jobContext.getConfiguration().getInt(PROP_MAP_TASK, 1);
    List<InputSplit> splits = new ArrayList<>();
    for (int i = 0; i < numTask; i++) {
      splits.add(new RandomSplit());
    }
    return splits;
  }
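
  // Copies the ingest settings from the test environment into the job configuration so each
  // record reader can reconstruct them in initialize().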
  public static void configure(Configuration conf, String uuid, ContinuousEnv env) {
    conf.set(PROP_UUID, uuid);
    conf.setInt(PROP_MAP_TASK, env.getBulkMapTask());
    conf.setLong(PROP_MAP_NODES, env.getBulkMapNodes());
    conf.setLong(PROP_ROW_MIN, env.getRowMin());
    conf.setLong(PROP_ROW_MAX, env.getRowMax());
    conf.setInt(PROP_FAM_MAX, env.getMaxColF());
    conf.setInt(PROP_QUAL_MAX, env.getMaxColQ());
    conf.setBoolean(PROP_CHECKSUM,
        Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM)));
    conf.set(PROP_VIS, env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES));
  }

  @Override
  public RecordReader<Key,Value> createRecordReader(InputSplit inputSplit,
      TaskAttemptContext taskAttemptContext) {
    return new RecordReader<Key,Value>() {
      long numNodes;
      long nodeCount;
      private Random random;
      private byte[] uuid;
      long minRow;
      long maxRow;
      int maxFam;
      int maxQual;
      List<ColumnVisibility> visibilities;
      boolean checksum;

      Key prevKey;
      Key currKey;
      Value currValue;

      @Override
      public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
        numNodes = job.getConfiguration().getLong(PROP_MAP_NODES, 1000000);
        uuid = job.getConfiguration().get(PROP_UUID).getBytes(StandardCharsets.UTF_8);
        minRow = job.getConfiguration().getLong(PROP_ROW_MIN, 0);
        maxRow = job.getConfiguration().getLong(PROP_ROW_MAX, Long.MAX_VALUE);
        maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE);
        maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE);
        checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false);
        visibilities = ContinuousIngest.parseVisibilities(job.getConfiguration().get(PROP_VIS));

        random = new Random(new SecureRandom().nextLong());
        nodeCount = 0;
      }
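
      // Builds a random key within the configured row range and column family/qualifier bounds;
      // when a checksum is supplied, the row, family, qualifier, and visibility bytes are folded
      // into it.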
      private Key genKey(CRC32 cksum) {
        byte[] row = genRow(genLong(minRow, maxRow, random));
        byte[] fam = genCol(random.nextInt(maxFam));
        byte[] qual = genCol(random.nextInt(maxQual));
        byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten();

        if (cksum != null) {
          cksum.update(row);
          cksum.update(fam);
          cksum.update(qual);
          cksum.update(cv);
        }
        return new Key(row, fam, qual, cv);
      }

      private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {
        return ContinuousIngest.createValue(ingestInstanceId, nodeCount, prevRow, cksum);
      }
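
      // Each generated value embeds the row of the previously generated key, so the entries
      // emitted by this reader chain together into a linked list of numNodes nodes; the first
      // entry has no previous row (null).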
      @Override
      public boolean nextKeyValue() {
        if (nodeCount < numNodes) {
          CRC32 cksum = checksum ? new CRC32() : null;
          prevKey = currKey;
          byte[] prevRow = prevKey != null ? prevKey.getRowData().toArray() : null;
          currKey = genKey(cksum);
          currValue = new Value(createValue(uuid, prevRow, cksum));
          nodeCount++;
          return true;
        } else {
          return false;
        }
      }

      @Override
      public Key getCurrentKey() {
        return currKey;
      }

      @Override
      public Value getCurrentValue() {
        return currValue;
      }

      @Override
      public float getProgress() {
        return nodeCount * 1.0f / numNodes;
      }

      @Override
      public void close() throws IOException {}
    };
  }
}