| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.io.storage; |
| |
| import org.apache.hudi.avro.HoodieAvroUtils; |
| import org.apache.hudi.client.common.TaskContextSupplier; |
| import org.apache.hudi.common.bloom.BloomFilter; |
| import org.apache.hudi.common.fs.FSUtils; |
| import org.apache.hudi.common.fs.HoodieWrapperFileSystem; |
| import org.apache.hudi.common.model.HoodieRecord; |
| import org.apache.hudi.common.model.HoodieRecordPayload; |
| import org.apache.avro.Schema; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.generic.IndexedRecord; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.io.hfile.CacheConfig; |
| import org.apache.hadoop.hbase.io.hfile.HFile; |
| import org.apache.hadoop.hbase.io.hfile.HFileContext; |
| import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; |
| import org.apache.hadoop.io.Writable; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| /** |
| * HoodieHFileWriter writes IndexedRecords into an HFile. The record's key is used as the key and the |
| * AVRO encoded record bytes are saved as the value. |
| * |
| * Limitations (compared to columnar formats like Parquet or ORC): |
| * 1. Records should be added in order of keys |
| * 2. There are no column stats |
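 *
 * A minimal usage sketch ({@code keyOf} and the surrounding variables are illustrative, not part of
 * this API):
 *
 * <pre>{@code
 * HoodieHFileWriter<HoodieAvroPayload, GenericRecord> writer =
 *     new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier);
 * while (writer.canWrite() && records.hasNext()) {
 *   GenericRecord rec = records.next();
 *   writer.writeAvro(keyOf(rec), rec); // keys must arrive in ascending order
 * }
 * writer.close();
 * }</pre>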
| */ |
| public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedRecord> |
| implements HoodieFileWriter<R> { |
  // Monotonically increasing counter used to build per-record sequence ids in writeAvroWithMetadata.
  private static final AtomicLong recordIndex = new AtomicLong(1);
| |
| private final Path file; |
  private final HoodieHFileConfig hfileConfig;
| private final HoodieWrapperFileSystem fs; |
| private final long maxFileSize; |
| private final String instantTime; |
| private final TaskContextSupplier taskContextSupplier; |
| private HFile.Writer writer; |
| private String minRecordKey; |
| private String maxRecordKey; |
| |
  // This key is private in CacheConfig, so it has been copied here.
  private static final String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";
| |
| public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileConfig, Schema schema, |
| TaskContextSupplier taskContextSupplier) throws IOException { |
| |
| Configuration conf = FSUtils.registerFileSystem(file, hfileConfig.getHadoopConf()); |
| this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf); |
| this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf); |
| this.hfileConfig = hfileConfig; |
| |
| // TODO - compute this compression ratio dynamically by looking at the bytes written to the |
| // stream and the actual file size reported by HDFS |
| // this.maxFileSize = hfileConfig.getMaxFileSize() |
| // + Math.round(hfileConfig.getMaxFileSize() * hfileConfig.getCompressionRatio()); |
| this.maxFileSize = hfileConfig.getMaxFileSize(); |
| this.instantTime = instantTime; |
| this.taskContextSupplier = taskContextSupplier; |
| |
    // Block-level settings (block size and compression codec) for the HFile being written.
    HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
        .withCompression(hfileConfig.getCompressionAlgorithm())
        .build();
| |
    // CacheConfig reads these settings from the Hadoop Configuration, so pass the caching knobs
    // through it before constructing the writer.
    conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
    conf.set(HColumnDescriptor.CACHE_DATA_IN_L1, String.valueOf(hfileConfig.shouldCacheDataInL1()));
    conf.set(DROP_BEHIND_CACHE_COMPACTION_KEY, String.valueOf(hfileConfig.shouldDropBehindCacheCompaction()));
    CacheConfig cacheConfig = new CacheConfig(conf);
    this.writer = HFile.getWriterFactory(conf, cacheConfig)
        .withPath(this.fs, this.file)
        .withFileContext(context)
        .create();
| |
    // Store the Avro schema in the HFile's file-info block so readers can decode the values.
    writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
| } |
| |
| @Override |
| public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException { |
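    // Populate the Hudi metadata fields (record key, partition path, file name, commit time and
    // commit seq id) on the record before handing it off to writeAvro().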
| String seqId = |
| HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement()); |
| HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), |
| file.getName()); |
| HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId); |
| |
    writeAvro(record.getRecordKey(), avroRecord);
| } |
| |
| @Override |
| public boolean canWrite() { |
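    // HoodieWrapperFileSystem tracks the bytes written to this file, letting callers roll over to
    // a new file once the configured max size is reached.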
| return fs.getBytesWritten(file) < maxFileSize; |
| } |
| |
| @Override |
| public void writeAvro(String recordKey, IndexedRecord object) throws IOException { |
    byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord) object);
| KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, value); |
| writer.append(kv); |
| |
    if (hfileConfig.useBloomFilter()) {
      // Add the key to the bloom filter and track the min/max record keys seen; both are persisted
      // as footer metadata in close().
      hfileConfig.getBloomFilter().add(recordKey);
      if (minRecordKey == null || recordKey.compareTo(minRecordKey) < 0) {
        minRecordKey = recordKey;
      }
      if (maxRecordKey == null || recordKey.compareTo(maxRecordKey) > 0) {
        maxRecordKey = recordKey;
      }
    }
| } |
| |
| @Override |
| public void close() throws IOException { |
| if (hfileConfig.useBloomFilter()) { |
| final BloomFilter bloomFilter = hfileConfig.getBloomFilter(); |
      // If no records were written, fall back to empty min/max keys.
      if (minRecordKey == null) {
        minRecordKey = "";
      }
      if (maxRecordKey == null) {
        maxRecordKey = "";
      }
| writer.appendFileInfo(HoodieHFileReader.KEY_MIN_RECORD.getBytes(), minRecordKey.getBytes()); |
| writer.appendFileInfo(HoodieHFileReader.KEY_MAX_RECORD.getBytes(), maxRecordKey.getBytes()); |
| writer.appendFileInfo(HoodieHFileReader.KEY_BLOOM_FILTER_TYPE_CODE.getBytes(), |
| bloomFilter.getBloomFilterTypeCode().toString().getBytes()); |
| writer.appendMetaBlock(HoodieHFileReader.KEY_BLOOM_FILTER_META_BLOCK, new Writable() { |
| @Override |
| public void write(DataOutput out) throws IOException { |
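          // Only serialization is needed; the reader parses the bloom filter straight from the
          // meta block bytes, so readFields below stays a no-op.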
| out.write(bloomFilter.serializeToString().getBytes()); |
| } |
| |
| @Override |
| public void readFields(DataInput in) throws IOException { } |
| }); |
| } |
| |
| writer.close(); |
| writer = null; |
| } |
| } |