| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.formats.sequencefile; |
| |
| import org.apache.flink.annotation.PublicEvolving; |
| import org.apache.flink.api.common.serialization.BulkWriter; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.core.fs.FSDataOutputStream; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.io.compress.CompressionCodecFactory; |
| |
| import java.io.IOException; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * A factory that creates a SequenceFile {@link BulkWriter}. |
| * |
| * @param <K> The type of key to write. It should be writable. |
| * @param <V> The type of value to write. It should be writable. |
| */ |
| @PublicEvolving |
| public class SequenceFileWriterFactory<K extends Writable, V extends Writable> |
| implements BulkWriter.Factory<Tuple2<K, V>> { |
| |
| private static final long serialVersionUID = 1L; |
| |
| /** A constant specifying that no compression is requested. */ |
| public static final String NO_COMPRESSION = "NO_COMPRESSION"; |
| |
| private final SerializableHadoopConfiguration serializableHadoopConfig; |
| private final Class<K> keyClass; |
| private final Class<V> valueClass; |
| private final String compressionCodecName; |
| private final SequenceFile.CompressionType compressionType; |
| |
| /** |
| * Creates a new SequenceFileWriterFactory using the given builder to assemble the |
| * SequenceFileWriter. |
| * |
| * @param hadoopConf The Hadoop configuration for Sequence File Writer. |
| * @param keyClass The class of key to write. |
| * @param valueClass The class of value to write. |
| */ |
| public SequenceFileWriterFactory( |
| Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) { |
| this(hadoopConf, keyClass, valueClass, NO_COMPRESSION, SequenceFile.CompressionType.BLOCK); |
| } |
| |
| /** |
| * Creates a new SequenceFileWriterFactory using the given builder to assemble the |
| * SequenceFileWriter. |
| * |
| * @param hadoopConf The Hadoop configuration for Sequence File Writer. |
| * @param keyClass The class of key to write. |
| * @param valueClass The class of value to write. |
| * @param compressionCodecName The name of compression codec. |
| */ |
| public SequenceFileWriterFactory( |
| Configuration hadoopConf, |
| Class<K> keyClass, |
| Class<V> valueClass, |
| String compressionCodecName) { |
| this( |
| hadoopConf, |
| keyClass, |
| valueClass, |
| compressionCodecName, |
| SequenceFile.CompressionType.BLOCK); |
| } |
| |
| /** |
| * Creates a new SequenceFileWriterFactory using the given builder to assemble the |
| * SequenceFileWriter. |
| * |
| * @param hadoopConf The Hadoop configuration for Sequence File Writer. |
| * @param keyClass The class of key to write. |
| * @param valueClass The class of value to write. |
| * @param compressionCodecName The name of compression codec. |
| * @param compressionType The type of compression level. |
| */ |
| public SequenceFileWriterFactory( |
| Configuration hadoopConf, |
| Class<K> keyClass, |
| Class<V> valueClass, |
| String compressionCodecName, |
| SequenceFile.CompressionType compressionType) { |
| this.serializableHadoopConfig = |
| new SerializableHadoopConfiguration(checkNotNull(hadoopConf)); |
| this.keyClass = checkNotNull(keyClass); |
| this.valueClass = checkNotNull(valueClass); |
| this.compressionCodecName = checkNotNull(compressionCodecName); |
| this.compressionType = checkNotNull(compressionType); |
| } |
| |
| @Override |
| public SequenceFileWriter<K, V> create(FSDataOutputStream out) throws IOException { |
| org.apache.hadoop.fs.FSDataOutputStream stream = |
| new org.apache.hadoop.fs.FSDataOutputStream(out, null); |
| CompressionCodec compressionCodec = |
| getCompressionCodec(serializableHadoopConfig.get(), compressionCodecName); |
| SequenceFile.Writer writer = |
| SequenceFile.createWriter( |
| serializableHadoopConfig.get(), |
| SequenceFile.Writer.stream(stream), |
| SequenceFile.Writer.keyClass(keyClass), |
| SequenceFile.Writer.valueClass(valueClass), |
| SequenceFile.Writer.compression(compressionType, compressionCodec)); |
| return new SequenceFileWriter<>(writer); |
| } |
| |
| private CompressionCodec getCompressionCodec(Configuration conf, String compressionCodecName) { |
| checkNotNull(conf); |
| checkNotNull(compressionCodecName); |
| |
| if (compressionCodecName.equals(NO_COMPRESSION)) { |
| return null; |
| } |
| |
| CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); |
| CompressionCodec codec = codecFactory.getCodecByName(compressionCodecName); |
| if (codec == null) { |
| throw new RuntimeException("Codec " + compressionCodecName + " not found."); |
| } |
| return codec; |
| } |
| } |