blob: 57cde987e3b9cfd8ba31d1446b4f16f533b5304d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.java.operators;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.logging.log4j.LogManager;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.operators.ObjectFileSource;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.java.channels.StreamChannel;
import org.apache.wayang.java.execution.JavaExecutor;
import org.apache.wayang.java.platform.JavaPlatform;
/**
 * {@link Operator} for the {@link JavaPlatform} that reads a sequence file. Consistent with Spark's object files.
 *
 * <p>Every record of the sequence file is expected to have a {@link NullWritable} key and a {@link BytesWritable}
 * value holding one Java-serialized object. That object may be a {@link Collection} or an {@code Object[]} of
 * data quanta, or a single data quantum.
 *
 * @see JavaObjectFileSink
 */
public class JavaObjectFileSource<T> extends ObjectFileSource<T> implements JavaExecutionOperator {

    /**
     * Copies the given {@link ObjectFileSource}.
     *
     * @param that the instance to copy
     */
    public JavaObjectFileSource(ObjectFileSource<T> that) {
        super(that);
    }

    /**
     * Creates an instance without an input URL; the path is then taken from the input channel in
     * {@link #evaluate(ChannelInstance[], ChannelInstance[], JavaExecutor, OptimizationContext.OperatorContext)}.
     *
     * @param type the type of the data quanta to be read
     */
    public JavaObjectFileSource(DataSetType<T> type) {
        super(null, type);
    }

    /**
     * Creates an instance that reads from the given path.
     *
     * @param sourcePath the path of the object file
     * @param type       the type of the data quanta to be read
     */
    public JavaObjectFileSource(String sourcePath, DataSetType<T> type) {
        super(sourcePath, type);
    }

    /**
     * Opens the object file and feeds its content as a sequential {@link Stream} into {@code outputs[0]}.
     *
     * <p>If no input URL is configured, the path is obtained from {@code inputs[0]}, which must then be a
     * {@link FileChannel.Instance}; otherwise the configured URL is used and no inputs are expected.
     *
     * @throws WayangException if the file cannot be opened
     */
    @Override
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
            ChannelInstance[] inputs,
            ChannelInstance[] outputs,
            JavaExecutor javaExecutor,
            OptimizationContext.OperatorContext operatorContext) {
        assert outputs.length == this.getNumOutputs();

        final String path;
        if (this.getInputUrl() == null) {
            final FileChannel.Instance input = (FileChannel.Instance) inputs[0];
            path = input.getSinglePath();
        } else {
            assert inputs.length == 0;
            path = this.getInputUrl();
        }

        try {
            final String actualInputPath = FileSystems.findActualSingleInputPath(path);
            final SequenceFileIterator<T> sequenceFileIterator = new SequenceFileIterator<>(actualInputPath);
            // The iterator closes itself once exhausted; the close handler additionally releases the
            // underlying reader if the consumer closes the stream before reading it to the end.
            final Stream<T> sequenceFileStream = StreamSupport
                    .stream(Spliterators.spliteratorUnknownSize(sequenceFileIterator, 0), false)
                    .onClose(sequenceFileIterator::close);
            ((StreamChannel.Instance) outputs[0]).accept(sequenceFileStream);
        } catch (IOException e) {
            throw new WayangException(String.format("%s failed to read from %s.", this, path), e);
        }

        return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
    }

    @Override
    public String getLoadProfileEstimatorConfigurationKey() {
        return "wayang.java.objectfilesource.load";
    }

    @Override
    protected ExecutionOperator createCopy() {
        return new JavaObjectFileSource<>(this.getInputUrl(), this.getType());
    }

    @Override
    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
        assert index <= this.getNumInputs() || (index == 0 && this.getNumInputs() == 0);
        return Collections.singletonList(FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR);
    }

    @Override
    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
        assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0);
        return Collections.singletonList(StreamChannel.DESCRIPTOR);
    }

    /**
     * {@link Iterator} over the data quanta stored in a sequence file.
     *
     * <p>Invariant between calls: {@link #nextElements} is either {@code null} (no more data) or a non-empty
     * array with {@link #nextIndex} pointing at the element that {@link #next()} will return.
     */
    private static class SequenceFileIterator<T> implements Iterator<T>, Closeable {

        /** Reader on the sequence file; {@code null} once this instance has been closed. */
        private SequenceFile.Reader sequenceFileReader;

        private final NullWritable nullWritable = NullWritable.get();

        private final BytesWritable bytesWritable = new BytesWritable();

        /** Elements of the current record or {@code null} if there is no more data. */
        private Object[] nextElements;

        /** Index into {@link #nextElements} of the element to be returned next. */
        private int nextIndex;

        /**
         * Opens the sequence file at the given path and loads its first element.
         *
         * @param path the path of the sequence file
         * @throws IOException if the file cannot be opened
         */
        SequenceFileIterator(String path) throws IOException {
            final SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(new Path(path));
            this.sequenceFileReader = new SequenceFile.Reader(new Configuration(true), fileOption);
            try {
                Validate.isTrue(this.sequenceFileReader.getKeyClass().equals(NullWritable.class));
                Validate.isTrue(this.sequenceFileReader.getValueClass().equals(BytesWritable.class));
                this.tryAdvance();
            } catch (RuntimeException e) {
                // Do not leak the reader when the file has an unexpected layout or cannot be read.
                this.close();
                throw e;
            }
        }

        /**
         * Moves to the next element, reading further records from the file as necessary. Records that contain
         * no elements are skipped. When the file is exhausted, the reader is closed.
         */
        private void tryAdvance() {
            // Fast path: the current record still has elements left.
            if (this.nextElements != null && ++this.nextIndex < this.nextElements.length) {
                return;
            }

            this.nextElements = null;
            this.nextIndex = 0;
            try {
                while (this.sequenceFileReader != null
                        && this.sequenceFileReader.next(this.nullWritable, this.bytesWritable)) {
                    final Object[] elements = toElements(this.deserializeCurrentValue());
                    if (elements.length > 0) {
                        this.nextElements = elements;
                        return;
                    }
                }
                // End of file (or already closed): release the reader right away.
                this.close();
            } catch (IOException | ClassNotFoundException e) {
                this.nextElements = null;
                IOUtils.closeQuietly(this);
                throw new WayangException("Reading failed.", e);
            }
        }

        /**
         * Deserializes the object stored in {@link #bytesWritable}.
         *
         * <p>NOTE: This uses Java-native deserialization, so the file must come from a trusted source.
         */
        private Object deserializeCurrentValue() throws IOException, ClassNotFoundException {
            // The backing array may be larger than the payload, so restrict reading to the valid length.
            final ByteArrayInputStream bytes = new ByteArrayInputStream(
                    this.bytesWritable.getBytes(), 0, this.bytesWritable.getLength());
            try (ObjectInputStream objectInputStream = new ObjectInputStream(bytes)) {
                return objectInputStream.readObject();
            }
        }

        /**
         * Unpacks a deserialized record: {@link Collection}s and {@code Object[]}s are treated as batches of
         * elements, anything else as a single element.
         */
        private static Object[] toElements(Object record) {
            if (record instanceof Collection) {
                return ((Collection<?>) record).toArray();
            }
            if (record instanceof Object[]) {
                return (Object[]) record;
            }
            return new Object[]{record};
        }

        @Override
        public boolean hasNext() {
            return this.nextElements != null;
        }

        @Override
        public T next() {
            Validate.isTrue(this.hasNext());
            @SuppressWarnings("unchecked")
            final T result = (T) this.nextElements[this.nextIndex];
            this.tryAdvance();
            return result;
        }

        /**
         * Closes the underlying reader. Failures are logged, not propagated. Calling this method repeatedly
         * has no further effect.
         */
        @Override
        public void close() {
            if (this.sequenceFileReader != null) {
                try {
                    this.sequenceFileReader.close();
                } catch (Throwable t) {
                    LogManager.getLogger(this.getClass()).error("Closing failed.", t);
                }
                this.sequenceFileReader = null;
            }
        }
    }
}