| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.nemo.examples.beam; |
| |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.io.TextIO; |
| import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.MapElements; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.SimpleFunction; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PDone; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.UUID; |
| |
| /** |
| * Helper class for handling sources and sinks in a generic way. |
| * Paths with a Hadoop-supported scheme (hdfs://, s3a://, file://) are handled |
| * with the Hadoop FileSystem API; all other paths use Beam's {@link TextIO}. |
| * Assumes String-typed PCollections. |
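| * |
| * <p>A minimal usage sketch (assuming an existing Beam {@link Pipeline} {@code p} and example HDFS paths): |
| * <pre>{@code |
| * PCollection<String> lines = GenericSourceSink.read(p, "hdfs://cluster/input"); |
| * GenericSourceSink.write(lines, "hdfs://cluster/output"); |
| * }</pre> |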
| */ |
| final class GenericSourceSink { |
| /** |
| * Private constructor to prevent instantiation of this utility class. |
| */ |
| private GenericSourceSink() { |
| } |
| |
| /** |
| * Read data from the given path into a PCollection of lines. |
| * @param pipeline the Beam pipeline to read into |
| * @param path the path to read from |
| * @return the lines read, as a {@link PCollection} of Strings |
| */ |
| public static PCollection<String> read(final Pipeline pipeline, |
| final String path) { |
| if (isHDFSPath(path)) { |
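| // Configure HadoopFormatIO: the input directory, the InputFormat to use, and the key/value types it emits. |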
| final Configuration hadoopConf = new Configuration(true); |
| hadoopConf.set("mapreduce.input.fileinputformat.inputdir", path); |
| hadoopConf.setClass("mapreduce.job.inputformat.class", TextInputFormat.class, InputFormat.class); |
| hadoopConf.setClass("key.class", LongWritable.class, Object.class); |
| hadoopConf.setClass("value.class", Text.class, Object.class); |
| |
| // Translate Hadoop's mutable Writables into immutable Java types; |
| // without these translations, Beam has to defensively clone the Writable instances internally. |
| final HadoopFormatIO.Read<Long, String> read = HadoopFormatIO.<Long, String>read() |
| .withConfiguration(hadoopConf) |
| .withKeyTranslation(new SimpleFunction<LongWritable, Long>() { |
| @Override |
| public Long apply(final LongWritable longWritable) { |
| return longWritable.get(); |
| } |
| }) |
| .withValueTranslation(new SimpleFunction<Text, String>() { |
| @Override |
| public String apply(final Text text) { |
| return text.toString(); |
| } |
| }); |
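| // Drop the keys (byte offsets of each line) and keep only the values (the lines themselves). |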
| return pipeline.apply(read).apply(MapElements.into(TypeDescriptor.of(String.class)).via(KV::getValue)); |
| } else { |
| return pipeline.apply(TextIO.read().from(path)); |
| } |
| } |
| |
| /** |
| * Write data to the given path. |
| * NEMO-365: This method could later be replaced using the HadoopFormatIO class. |
| * @param dataToWrite the data to write |
| * @param path the path to write the data to |
| * @return {@link PDone} |
| */ |
| public static PDone write(final PCollection<String> dataToWrite, |
| final String path) { |
| if (isHDFSPath(path)) { |
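| // Each parallel instance of HDFSWrite writes all of its elements to its own file, |
| // so the write finishes inside the DoFn and we simply signal completion with PDone. |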
| dataToWrite.apply(ParDo.of(new HDFSWrite(path))); |
| return PDone.in(dataToWrite.getPipeline()); |
| } else { |
| return dataToWrite.apply(TextIO.write().to(path)); |
| } |
| } |
| |
| /** |
| * Check whether the given path should be handled with the Hadoop FileSystem API. |
| * Despite the name, this matches not only hdfs:// but also s3a:// and file:// URIs, |
| * all of which Hadoop's FileSystem abstraction supports. |
| * @param path the path to check |
| * @return whether the path should be handled with the Hadoop FileSystem API |
| */ |
| private static boolean isHDFSPath(final String path) { |
| return path.startsWith("hdfs://") || path.startsWith("s3a://") || path.startsWith("file://"); |
| } |
| } |
| |
| /** |
| * Writes output with the Hadoop FileSystem API. |
| * Each parallel instance of this DoFn writes to exactly one file, |
| * so the total number of output files matches the parallelism. |
| */ |
| final class HDFSWrite extends DoFn<String, Void> { |
| private static final Logger LOG = LoggerFactory.getLogger(HDFSWrite.class); |
| |
| private final String path; |
| private Path fileName; |
| private FileSystem fileSystem; |
| private FSDataOutputStream outputStream; |
| |
| /** |
| * Constructor. |
| * |
| * @param path HDFS path |
| */ |
| HDFSWrite(final String path) { |
| this.path = path; |
| } |
| |
| /** |
| * Opens the single file that this DoFn instance writes to. |
| * (The total number of output files is determined by the parallelism; |
| * i.e., if the parallelism is 2, there are 2 output files in total.) |
| */ |
| @Setup |
| public void setup() { |
| // Creating a side effect in @Setup is discouraged, but we do it anyway for now, as we are extending DoFn. |
| // TODO #273: Our custom HDFSWrite should implement WriteOperation |
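| // Append a random UUID directly to the base path (no separator is inserted), |
| // so that parallel instances write to distinct files. |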
| fileName = new Path(path + UUID.randomUUID().toString()); |
| try { |
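| // Resolve the FileSystem from the path's scheme (e.g. HDFS, S3A, or the local file system). |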
| fileSystem = fileName.getFileSystem(new JobConf()); |
| outputStream = fileSystem.create(fileName, false); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Writes a single element to the output stream, followed by a newline. |
| * On failure, closes the stream and deletes the partial file before rethrowing. |
| * @param c the {@link ProcessContext} |
| * @throws Exception if the write or the cleanup fails |
| */ |
| @ProcessElement |
| public void processElement(final ProcessContext c) throws Exception { |
| try { |
| outputStream.writeBytes(c.element() + "\n"); |
| } catch (Exception e) { |
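| // Best-effort cleanup: close the stream and delete the partially written file before failing. |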
| outputStream.close(); |
| fileSystem.delete(fileName, true); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Closes the output stream, flushing any buffered data. |
| * @throws IOException if closing the output stream fails |
| */ |
| @Teardown |
| public void tearDown() throws IOException { |
| outputStream.close(); |
| } |
| } |