| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.java.operators; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang3.Validate; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.wayang.basic.channels.FileChannel; |
| import org.apache.wayang.core.api.exception.WayangException; |
| import org.apache.wayang.core.optimizer.OptimizationContext; |
| import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; |
| import org.apache.wayang.core.plan.wayangplan.Operator; |
| import org.apache.wayang.core.plan.wayangplan.UnarySource; |
| import org.apache.wayang.core.platform.ChannelDescriptor; |
| import org.apache.wayang.core.platform.ChannelInstance; |
| import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; |
| import org.apache.wayang.core.types.DataSetType; |
| import org.apache.wayang.core.util.Tuple; |
| import org.apache.wayang.core.util.fs.FileSystems; |
| import org.apache.wayang.java.channels.StreamChannel; |
| import org.apache.wayang.java.execution.JavaExecutor; |
| import org.apache.wayang.java.platform.JavaPlatform; |
| import org.apache.logging.log4j.LogManager; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.ObjectInputStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Spliterators; |
| import java.util.stream.Stream; |
| import java.util.stream.StreamSupport; |
| |
| /** |
| * {@link Operator} for the {@link JavaPlatform} that creates a sequence file. Consistent with Spark's object files. |
| * |
| * @see JavaObjectFileSink |
| */ |
| public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecutionOperator { |
| |
| private final String sourcePath; |
| |
| public JavaObjectFileSource(DataSetType<T> type) { |
| this(null, type); |
| } |
| |
| public JavaObjectFileSource(String sourcePath, DataSetType<T> type) { |
| super(type); |
| this.sourcePath = sourcePath; |
| } |
| |
| @Override |
| public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( |
| ChannelInstance[] inputs, |
| ChannelInstance[] outputs, |
| JavaExecutor javaExecutor, |
| OptimizationContext.OperatorContext operatorContext) { |
| assert outputs.length == this.getNumOutputs(); |
| |
| SequenceFileIterator sequenceFileIterator; |
| final String path; |
| if (this.sourcePath == null) { |
| final FileChannel.Instance input = (FileChannel.Instance) inputs[0]; |
| path = input.getSinglePath(); |
| } else { |
| assert inputs.length == 0; |
| path = this.sourcePath; |
| } |
| try { |
| final String actualInputPath = FileSystems.findActualSingleInputPath(path); |
| sequenceFileIterator = new SequenceFileIterator<>(actualInputPath); |
| Stream<?> sequenceFileStream = |
| StreamSupport.stream(Spliterators.spliteratorUnknownSize(sequenceFileIterator, 0), false); |
| ((StreamChannel.Instance) outputs[0]).accept(sequenceFileStream); |
| } catch (IOException e) { |
| throw new WayangException(String.format("%s failed to read from %s.", this, path), e); |
| } |
| |
| return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); |
| } |
| |
| @Override |
| public String getLoadProfileEstimatorConfigurationKey() { |
| return "wayang.java.objectfilesource.load"; |
| } |
| |
| @Override |
| protected ExecutionOperator createCopy() { |
| return new JavaObjectFileSource<>(this.sourcePath, this.getType()); |
| } |
| |
| @Override |
| public List<ChannelDescriptor> getSupportedInputChannels(int index) { |
| assert index <= this.getNumInputs() || (index == 0 && this.getNumInputs() == 0); |
| return Collections.singletonList(FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR); |
| } |
| |
| @Override |
| public List<ChannelDescriptor> getSupportedOutputChannels(int index) { |
| assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0); |
| return Collections.singletonList(StreamChannel.DESCRIPTOR); |
| } |
| |
| private static class SequenceFileIterator<T> implements Iterator<T>, AutoCloseable, Closeable { |
| |
| private SequenceFile.Reader sequenceFileReader; |
| |
| private final NullWritable nullWritable = NullWritable.get(); |
| |
| private final BytesWritable bytesWritable = new BytesWritable(); |
| |
| private Object[] nextElements; |
| |
| private ArrayList nextElements_cole; |
| |
| private int nextIndex; |
| |
| SequenceFileIterator(String path) throws IOException { |
| final SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(new Path(path)); |
| this.sequenceFileReader = new SequenceFile.Reader(new Configuration(true), fileOption); |
| Validate.isTrue(this.sequenceFileReader.getKeyClass().equals(NullWritable.class)); |
| Validate.isTrue(this.sequenceFileReader.getValueClass().equals(BytesWritable.class)); |
| this.tryAdvance(); |
| } |
| |
| private void tryAdvance() { |
| if (this.nextElements != null && ++this.nextIndex < this.nextElements.length) return; |
| if (this.nextElements_cole != null && ++this.nextIndex < this.nextElements_cole.size()) return; |
| try { |
| if (!this.sequenceFileReader.next(this.nullWritable, this.bytesWritable)) { |
| this.nextElements = null; |
| return; |
| } |
| Object tmp = new ObjectInputStream(new ByteArrayInputStream(this.bytesWritable.getBytes())).readObject(); |
| if(tmp instanceof Collection) { |
| this.nextElements = null; |
| this.nextElements_cole = (ArrayList) tmp; |
| }else if(tmp instanceof Object[]){ |
| this.nextElements = (Object[]) tmp; |
| this.nextElements_cole = null; |
| }else { |
| this.nextElements = new Object[1]; |
| this.nextElements[0] = tmp; |
| |
| } |
| this.nextIndex = 0; |
| } catch (IOException | ClassNotFoundException e) { |
| this.nextElements = null; |
| IOUtils.closeQuietly(this); |
| throw new WayangException("Reading failed.", e); |
| } |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return this.nextElements != null || this.nextElements_cole != null; |
| } |
| |
| @Override |
| public T next() { |
| Validate.isTrue(this.hasNext()); |
| @SuppressWarnings("unchecked") |
| final T result; |
| if(this.nextElements_cole != null){ |
| result = (T) this.nextElements_cole.get(this.nextIndex); |
| }else if (this.nextElements != null) { |
| result = (T) this.nextElements[this.nextIndex]; |
| }else{ |
| result = null; |
| } |
| |
| this.tryAdvance(); |
| return result; |
| } |
| |
| @Override |
| public void close() { |
| if (this.sequenceFileReader != null) { |
| try { |
| this.sequenceFileReader.close(); |
| } catch (Throwable t) { |
| LogManager.getLogger(this.getClass()).error("Closing failed.", t); |
| } |
| this.sequenceFileReader = null; |
| } |
| } |
| } |
| |
| } |