blob: fb7e729d85d1cbaa150184d0d199fcad6fb1f8da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client.mapreduce.impl;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.accumulo.core.client.impl.Table;
import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
/**
* The Class BatchInputSplit. Encapsulates a set of Accumulo ranges on a single tablet for use in
* Map Reduce jobs. Can contain several Ranges per split.
*/
public class BatchInputSplit extends RangeInputSplit {
private Collection<Range> ranges;
private float[] rangeProgress = null;
public BatchInputSplit() {
ranges = Collections.emptyList();
}
public BatchInputSplit(BatchInputSplit split) throws IOException {
super(split);
this.setRanges(split.getRanges());
}
public BatchInputSplit(String table, Table.ID tableId, Collection<Range> ranges,
String[] locations) {
super(table, tableId.canonicalID(), new Range(), locations);
this.ranges = ranges;
}
/**
* Save progress on each call to this function, implied by value of currentKey, and return average
* ranges in the split
*/
@Override
public float getProgress(Key currentKey) {
if (null == rangeProgress)
rangeProgress = new float[ranges.size()];
float total = 0; // progress per range could be on different scales, this number is "fuzzy"
if (currentKey == null) {
for (float progress : rangeProgress)
total += progress;
} else {
int i = 0;
for (Range range : ranges) {
if (range.contains(currentKey)) {
// find the current range and report as if that is the single range
if (range.getStartKey() != null && range.getEndKey() != null) {
if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
// just look at the row progress
rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getRowData(),
range.getEndKey().getRowData(), currentKey.getRowData());
} else if (range.getStartKey().compareTo(range.getEndKey(),
PartialKey.ROW_COLFAM) != 0) {
// just look at the column family progress
rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getColumnFamilyData(),
range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
} else if (range.getStartKey().compareTo(range.getEndKey(),
PartialKey.ROW_COLFAM_COLQUAL) != 0) {
// just look at the column qualifier progress
rangeProgress[i] = SplitUtils.getProgress(
range.getStartKey().getColumnQualifierData(),
range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
}
}
total += rangeProgress[i];
}
i++;
}
}
return total / ranges.size();
}
/**
* This implementation of length is only an estimate, it does not provide exact values. Do not
* have your code rely on this return value.
*/
@Override
public long getLength() throws IOException {
long sum = 0;
for (Range range : ranges)
sum += SplitUtils.getRangeLength(range);
return sum;
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
int numRanges = in.readInt();
ranges = new ArrayList<>(numRanges);
for (int i = 0; i < numRanges; ++i) {
Range r = new Range();
r.readFields(in);
ranges.add(r);
}
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(ranges.size());
for (Range r : ranges)
r.write(out);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(256);
sb.append("BatchInputSplit:");
sb.append(" Ranges: ").append(Arrays.asList(ranges));
sb.append(super.toString());
return sb.toString();
}
public void setRanges(Collection<Range> ranges) {
this.ranges = ranges;
}
public Collection<Range> getRanges() {
return ranges;
}
@Override
public Range getRange() {
throw new UnsupportedOperationException();
}
@Override
public void setRange(Range range) {
throw new UnsupportedOperationException();
}
@Override
public Boolean isIsolatedScan() {
throw new UnsupportedOperationException();
}
@Override
public void setIsolatedScan(Boolean isolatedScan) {
throw new UnsupportedOperationException();
}
@Override
public Boolean isOffline() {
throw new UnsupportedOperationException();
}
@Override
public void setOffline(Boolean offline) {
throw new UnsupportedOperationException();
}
@Override
public Boolean usesLocalIterators() {
throw new UnsupportedOperationException();
}
@Override
public void setUsesLocalIterators(Boolean localIterators) {
throw new UnsupportedOperationException();
}
}