/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.io;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.List;

/**
 * A {@link HoodieCreateHandle} that supports CREATE writes in incremental mini-batches.
 *
 * <p>For the first mini-batch, the handle initializes the next file path to write to,
 * writes the batch and closes the file writer. Each subsequent mini-batch is written to a
 * file with a new name, and that new file is finally renamed to the original file name,
 * so the data of all mini-batches appear to have been appended to the same file.
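 *
 * <p>Illustration (file names are schematic and assume a parquet base file with the usual
 * {@code <fileId>_<writeToken>_<instantTime>} naming; the concrete names are illustrative only):
 * <pre>{@code
 *   // mini-batch 1 creates and closes:          f1_0-1-0_20230901101530.parquet
 *   // mini-batch 2 rolls over to a new token:   f1_0-1-0-0_20230901101530.parquet
 *   // the rolled-over file finally takes the original name, so readers observe a
 *   // single base file that contains the data of all mini-batches
 * }</pre>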
 *
 * @see FlinkMergeAndReplaceHandle
 */
public class FlinkCreateHandle<T, I, K, O>
    extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {

  private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);

  private boolean isClosed = false;

  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                           String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
    this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
        taskContextSupplier);
  }

  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                           String partitionPath, String fileId, Option<Schema> schemaOption,
                           TaskContextSupplier taskContextSupplier) {
    super(config, instantTime, hoodieTable, partitionPath, fileId, schemaOption,
        taskContextSupplier);
    // Delete invalid data files generated by task retry.
    if (getAttemptId() > 0) {
      deleteInvalidDataFile(getAttemptId() - 1);
    }
  }

  /**
   * The Flink checkpoints are triggered in sequence but complete asynchronously. When a write task
   * finishes checkpoint A (so the file system view sees the written data files, some of which may be
   * invalid), it immediately starts writing for the next checkpoint B. If that write tries to reuse the
   * last small data bucket (small file) of an invalid data file, then, once the coordinator receives the
   * success event of checkpoint A and cleans the invalid data file, the writer hits a
   * FileNotFoundException when it closes the write file handle.
   *
   * <p>To avoid this, the invalid data file is deleted eagerly
   * so that its small bucket is never reused.
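   *
   * <p>Illustration (names are schematic and assume a parquet base file; the write token is
   * {@code <partitionId>-<stageId>-<attemptId>}):
   * <pre>{@code
   *   // attempt 0 (failed) wrote:   f1_2-0-0_20230901101530.parquet   <- invalid, deleted eagerly here
   *   // attempt 1 (current) writes: f1_2-0-1_20230901101530.parquet
   * }</pre>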
   *
   * @param lastAttemptId The last attempt ID
   */
  private void deleteInvalidDataFile(long lastAttemptId) {
    final String lastWriteToken = FSUtils.makeWriteToken(getPartitionId(), getStageId(), lastAttemptId);
    final String lastDataFileName = FSUtils.makeBaseFileName(instantTime,
        lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension());
    final Path path = makeNewFilePath(partitionPath, lastDataFileName);
    try {
      if (fs.exists(path)) {
        LOG.info("Deleting invalid INSERT file due to task retry: " + lastDataFileName);
        fs.delete(path, false);
      }
    } catch (IOException e) {
      throw new HoodieException("Error while deleting the INSERT file due to task retry: " + lastDataFileName, e);
    }
  }

  @Override
  protected void createMarkerFile(String partitionPath, String dataFileName) {
    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
    writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType());
  }

  @Override
  public Path makeNewPath(String partitionPath) {
    Path path = super.makeNewPath(partitionPath);
    // If the data file already exists, the write task has written the data bucket multiple times
    // within one hoodie commit; roll over to a new name instead.

    // Writing to a new file behaves like a write from a different task.
    try {
      int rollNumber = 0;
      while (fs.exists(path)) {
        Path existing = path;
        path = newFilePathWithRollover(rollNumber++);
        LOG.warn("Duplicate write for INSERT bucket with path: " + existing + ", rolls over to new path: " + path);
      }
      return path;
    } catch (IOException e) {
      throw new HoodieException("Error while checking the existing path for the create handle: " + path, e);
    }
  }

  @Override
  public boolean canWrite(HoodieRecord record) {
    return true;
  }

  /**
   * Uses {@code writeToken + "-" + rollNumber} as the new write token of a mini-batch write.
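   *
   * <p>For example (schematic, assuming a parquet base file), the first rollover turns
   * {@code f1_0-1-0_20230901101530.parquet} into {@code f1_0-1-0-0_20230901101530.parquet}.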
   */
  private Path newFilePathWithRollover(int rollNumber) {
    final String dataFileName = FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber, fileId,
        hoodieTable.getBaseFileExtension());
    return makeNewFilePath(partitionPath, dataFileName);
  }

  @Override
  public List<WriteStatus> close() {
    try {
      return super.close();
    } finally {
      this.isClosed = true;
    }
  }

  @Override
  public void closeGracefully() {
    if (isClosed) {
      return;
    }
    try {
      close();
    } catch (Throwable throwable) {
      LOG.warn("Error while trying to dispose the CREATE handle", throwable);
      try {
        fs.delete(path, false);
        LOG.info("Deleting the intermediate CREATE data file: " + path + " success!");
      } catch (IOException e) {
        // Log a warning and ignore the exception.
        LOG.warn("Deleting the intermediate CREATE data file: " + path + " failed", e);
      }
    }
  }

  @Override
  public Path getWritePath() {
    return path;
  }
}