// blob: 094e4a5e21b7a36610ee7008de255cc665de05d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.utils.io;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.hadoop.io.Writable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Implementations of {@link ExtendedDataOutput} are limited because they can
 * only handle up to 1GB of data. This {@link DataOutput} overcomes that
 * limitation, with almost no additional cost when data is not huge.
 *
 * Data is written to a single current {@link ExtendedDataOutput}; once a
 * write would bring it to {@link #getMaxSize()} the current output is moved
 * to a list of filled outputs and a fresh one is started. Every individual
 * write call goes entirely into one underlying output.
 *
 * Goes in pair with {@link BigDataInput}
 */
public class BigDataOutput implements DataOutput, Writable {
  /** Default initial size of the stream */
  private static final int DEFAULT_INITIAL_SIZE = 16;
  /** Max allowed size of the stream */
  private static final int MAX_SIZE = 1 << 25;
  /**
   * Create a new stream when we have less than this number of bytes left in
   * the stream. Should be larger than the largest serialized primitive.
   */
  private static final int SIZE_DELTA = 100;
  /** Upper bound of bytes a single char takes in modified UTF-8 */
  private static final int MAX_UTF_BYTES_PER_CHAR = 3;
  /** Number of bytes of the length prefix written by writeUTF */
  private static final int UTF_LENGTH_PREFIX_BYTES = 2;

  /** Data output which we are currently writing to */
  protected ExtendedDataOutput currentDataOutput;
  /** List of filled outputs, will be null until we get a lot of data */
  protected List<ExtendedDataOutput> dataOutputs;
  /** Configuration */
  protected final ImmutableClassesGiraphConfiguration conf;

  /**
   * Constructor
   *
   * @param conf Configuration
   */
  public BigDataOutput(ImmutableClassesGiraphConfiguration conf) {
    this(DEFAULT_INITIAL_SIZE, conf);
  }

  /**
   * Constructor
   *
   * Note that {@link #createOutput(int)} is invoked from here, so an
   * overriding implementation runs before the subclass constructor body.
   *
   * @param initialSize Initial size of data output
   * @param conf Configuration
   */
  public BigDataOutput(int initialSize,
      ImmutableClassesGiraphConfiguration conf) {
    this.conf = conf;
    dataOutputs = null;
    currentDataOutput = createOutput(initialSize);
  }

  /**
   * Get max size for single data output
   *
   * @return Max size for single data output
   */
  protected int getMaxSize() {
    return MAX_SIZE;
  }

  /**
   * Create next data output
   *
   * @param size Size of data output to create
   * @return Created data output
   */
  protected ExtendedDataOutput createOutput(int size) {
    return conf.createExtendedDataOutput(size);
  }

  /**
   * Get DataOutput which data should be written to. If current DataOutput is
   * full it will create a new one.
   *
   * @return DataOutput which data should be written to
   */
  private ExtendedDataOutput getDataOutputToWriteTo() {
    return getDataOutputToWriteTo(SIZE_DELTA);
  }

  /**
   * Get DataOutput which data should be written to. If current DataOutput is
   * full it will create a new one.
   *
   * The size is taken as a long so that callers can add a payload length and
   * {@link #SIZE_DELTA} without the sum wrapping around.
   *
   * @param additionalSize How many additional bytes we need space for
   * @return DataOutput which data should be written to
   */
  private ExtendedDataOutput getDataOutputToWriteTo(long additionalSize) {
    // int + long is evaluated as long, so this comparison cannot overflow
    if (currentDataOutput.getPos() + additionalSize >= getMaxSize()) {
      if (dataOutputs == null) {
        dataOutputs = new ArrayList<>(1);
      }
      dataOutputs.add(currentDataOutput);
      currentDataOutput = createOutput(getMaxSize());
    }
    return currentDataOutput;
  }

  /**
   * Get number of DataOutputs which contain written data.
   *
   * @return Number of DataOutputs which contain written data
   */
  public int getNumberOfDataOutputs() {
    return (dataOutputs == null) ? 1 : dataOutputs.size() + 1;
  }

  /**
   * Get DataOutputs which contain written data.
   *
   * @return DataOutputs which contain written data, filled ones first and
   *         the one currently written to last
   */
  public Iterable<ExtendedDataOutput> getDataOutputs() {
    List<ExtendedDataOutput> currentList =
        Lists.newArrayList(currentDataOutput);
    if (dataOutputs == null) {
      return currentList;
    } else {
      return Iterables.concat(dataOutputs, currentList);
    }
  }

  /**
   * Get the configuration this data output was created with
   *
   * @return Configuration
   */
  public ImmutableClassesGiraphConfiguration getConf() {
    return conf;
  }

  /**
   * Get number of bytes written to this data output
   *
   * @return Size in bytes
   */
  public long getSize() {
    long size = currentDataOutput.getPos();
    if (dataOutputs != null) {
      for (ExtendedDataOutput dataOutput : dataOutputs) {
        size += dataOutput.getPos();
      }
    }
    return size;
  }

  @Override
  public void write(int b) throws IOException {
    getDataOutputToWriteTo().write(b);
  }

  @Override
  public void write(byte[] b) throws IOException {
    getDataOutputToWriteTo((long) b.length + SIZE_DELTA).write(b);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    getDataOutputToWriteTo((long) len + SIZE_DELTA).write(b, off, len);
  }

  @Override
  public void writeBoolean(boolean v) throws IOException {
    getDataOutputToWriteTo().writeBoolean(v);
  }

  @Override
  public void writeByte(int v) throws IOException {
    getDataOutputToWriteTo().writeByte(v);
  }

  @Override
  public void writeShort(int v) throws IOException {
    getDataOutputToWriteTo().writeShort(v);
  }

  @Override
  public void writeChar(int v) throws IOException {
    getDataOutputToWriteTo().writeChar(v);
  }

  @Override
  public void writeInt(int v) throws IOException {
    getDataOutputToWriteTo().writeInt(v);
  }

  @Override
  public void writeLong(long v) throws IOException {
    getDataOutputToWriteTo().writeLong(v);
  }

  @Override
  public void writeFloat(float v) throws IOException {
    getDataOutputToWriteTo().writeFloat(v);
  }

  @Override
  public void writeDouble(double v) throws IOException {
    getDataOutputToWriteTo().writeDouble(v);
  }

  @Override
  public void writeBytes(String s) throws IOException {
    // One byte per char, reserved the same way as for byte arrays
    getDataOutputToWriteTo((long) s.length() + SIZE_DELTA).writeBytes(s);
  }

  @Override
  public void writeChars(String s) throws IOException {
    // Two bytes per char
    getDataOutputToWriteTo(2L * s.length() + SIZE_DELTA).writeChars(s);
  }

  @Override
  public void writeUTF(String s) throws IOException {
    // Upper bound of the encoded size: length prefix plus at most three
    // bytes per char
    long maxEncodedSize = (long) MAX_UTF_BYTES_PER_CHAR * s.length() +
        UTF_LENGTH_PREFIX_BYTES;
    getDataOutputToWriteTo(maxEncodedSize + SIZE_DELTA).writeUTF(s);
  }

  /**
   * Write one of data outputs to another data output, as its length
   * followed by that many bytes of its content
   *
   * @param dataOutput Data output to write
   * @param out Data output to write to
   * @throws IOException If writing to out fails
   */
  private void writeExtendedDataOutput(ExtendedDataOutput dataOutput,
      DataOutput out) throws IOException {
    out.writeInt(dataOutput.getPos());
    out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
  }

  /**
   * Read data output from data input, in the format produced by
   * {@link #writeExtendedDataOutput(ExtendedDataOutput, DataOutput)}
   *
   * @param in Data input to read from
   * @return Data output read
   * @throws IOException If reading fails or the stored length is negative
   */
  private ExtendedDataOutput readExtendedDataOutput(
      DataInput in) throws IOException {
    int length = in.readInt();
    if (length < 0) {
      throw new IOException(
          "readExtendedDataOutput: Negative data length " + length);
    }
    byte[] data = new byte[length];
    in.readFully(data);
    return conf.createExtendedDataOutput(data, data.length);
  }

  /**
   * Serialize the number of filled data outputs, then each filled data
   * output, then the data output currently written to.
   *
   * @param out Data output to write to
   * @throws IOException If writing to out fails
   */
  @Override
  public void write(DataOutput out) throws IOException {
    if (dataOutputs == null) {
      out.writeInt(0);
    } else {
      out.writeInt(dataOutputs.size());
      for (ExtendedDataOutput stream : dataOutputs) {
        writeExtendedDataOutput(stream, out);
      }
    }
    writeExtendedDataOutput(currentDataOutput, out);
  }

  /**
   * Replace the content of this object with data in the format produced by
   * {@link #write(DataOutput)}. The fields are only updated once everything
   * was read successfully.
   *
   * @param in Data input to read from
   * @throws IOException If reading fails or a stored count is negative
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    int size = in.readInt();
    if (size < 0) {
      throw new IOException(
          "readFields: Negative number of data outputs " + size);
    }
    List<ExtendedDataOutput> filledOutputs = null;
    if (size > 0) {
      filledOutputs = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        filledOutputs.add(readExtendedDataOutput(in));
      }
    }
    ExtendedDataOutput lastOutput = readExtendedDataOutput(in);
    dataOutputs = filledOutputs;
    currentDataOutput = lastOutput;
  }
}