blob: 58fbcb6c3737516ba031d6ee3023afae60121923 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.ooc.persistence;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.io.BigDataInput;
import org.apache.giraph.utils.io.BigDataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
/**
* Implementation of data accessor which keeps all the data serialized but in
* memory. Useful to keep the number of used objects under control.
*/
public class InMemoryDataAccessor implements OutOfCoreDataAccessor {
/** Configuration */
private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
/** Factory for data outputs */
private final PooledBigDataOutputFactory outputFactory;
/** DataInputOutput for each DataIndex used */
private final ConcurrentHashMap<
DataIndex, PooledBigDataOutputFactory.Output> data;
/**
* Constructor
*
* @param conf Configuration
*/
public InMemoryDataAccessor(
ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
this.conf = conf;
outputFactory = new PooledBigDataOutputFactory(conf);
data = new ConcurrentHashMap<>();
}
@Override
public void initialize() {
// No-op
}
@Override
public void shutdown() {
// No-op
}
@Override
public int getNumAccessorThreads() {
return GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf);
}
@Override
public DataInputWrapper prepareInput(int threadId,
DataIndex index) throws IOException {
return new InMemoryDataInputWrapper(
new BigDataInput(data.get(index)), index);
}
@Override
public DataOutputWrapper prepareOutput(int threadId,
DataIndex index, boolean shouldAppend) throws IOException {
// Don't need to worry about synchronization here since only one thread
// can deal with one index
PooledBigDataOutputFactory.Output output = data.get(index);
if (output == null || !shouldAppend) {
output = outputFactory.createOutput();
data.put(index, output);
}
return new InMemoryDataOutputWrapper(output);
}
@Override
public boolean dataExist(int threadId, DataIndex index) {
return data.containsKey(index);
}
/**
* {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}
*/
public static class InMemoryDataOutputWrapper implements DataOutputWrapper {
/** Output to write data to */
private final BigDataOutput output;
/** Size of output at the moment it was created */
private final long initialSize;
/**
* Constructor
*
* @param output Output to write data to
*/
public InMemoryDataOutputWrapper(BigDataOutput output) {
this.output = output;
initialSize = output.getSize();
}
@Override
public DataOutput getDataOutput() {
return output;
}
@Override
public long finalizeOutput() {
return output.getSize() - initialSize;
}
}
/**
* {@link DataInputWrapper} implementation for {@link InMemoryDataAccessor}
*/
public class InMemoryDataInputWrapper implements DataInputWrapper {
/** Input to read data from */
private final BigDataInput input;
/** DataIndex which this wrapper belongs to */
private final DataIndex index;
/**
* Constructor
*
* @param input Input to read data from
* @param index DataIndex which this wrapper belongs to
*/
public InMemoryDataInputWrapper(
BigDataInput input, DataIndex index) {
this.input = input;
this.index = index;
}
@Override
public DataInput getDataInput() {
return input;
}
@Override
public long finalizeInput(boolean deleteOnClose) {
if (deleteOnClose) {
data.remove(index).returnData();
}
return input.getPos();
}
}
/**
* Factory for pooled big data outputs
*/
private static class PooledBigDataOutputFactory {
/** How big pool of byte arrays to keep */
public static final IntConfOption BYTE_ARRAY_POOL_SIZE =
new IntConfOption("giraph.inMemoryDataAccessor.poolSize", 1024,
"How big pool of byte arrays to keep");
/** How big byte arrays to make */
public static final IntConfOption BYTE_ARRAY_SIZE =
new IntConfOption("giraph.inMemoryDataAccessor.byteArraySize", 1 << 21,
"How big byte arrays to make");
/** Configuration */
private final ImmutableClassesGiraphConfiguration conf;
/** Pool of reusable byte[] */
private final LinkedBlockingDeque<byte[]> byteArrayPool;
/** How big byte arrays to make */
private final int byteArraySize;
/**
* Constructor
*
* @param conf Configuration
*/
public PooledBigDataOutputFactory(
ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
byteArrayPool = new LinkedBlockingDeque<>(BYTE_ARRAY_POOL_SIZE.get(conf));
byteArraySize = BYTE_ARRAY_SIZE.get(conf);
}
/**
* Create new output to write to
*
* @return Output to write to
*/
public Output createOutput() {
return new Output(conf);
}
/**
* Implementation of BigDataOutput
*/
private class Output extends BigDataOutput {
/**
* Constructor
*
* @param conf Configuration
*/
public Output(ImmutableClassesGiraphConfiguration conf) {
super(conf);
}
/**
* Return all data structures related to this data output.
* Can't use the same instance after this call anymore.
*/
protected void returnData() {
if (dataOutputs != null) {
for (ExtendedDataOutput dataOutput : dataOutputs) {
byteArrayPool.offer(dataOutput.getByteArray());
}
}
byteArrayPool.offer(currentDataOutput.getByteArray());
}
@Override
protected ExtendedDataOutput createOutput(int size) {
byte[] data = byteArrayPool.pollLast();
return conf.createExtendedDataOutput(
data == null ? new byte[byteArraySize] : data, 0);
}
@Override
protected int getMaxSize() {
return byteArraySize;
}
}
}
}