/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.view;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * {@link TableFileSystemView} implementation backed by in-memory storage.
*
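 * <p>
 * A minimal usage sketch (illustrative only; how the {@code metaClient} is obtained and
 * which timeline is used are assumptions, not requirements of this class):
 *
 * <pre>{@code
 * // 'metaClient' is assumed to be an already-initialized HoodieTableMetaClient.
 * HoodieTimeline timeline =
 *     metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
 * HoodieTableFileSystemView view = new HoodieTableFileSystemView(metaClient, timeline);
 * view.getAllFileGroups().forEach(fg -> System.out.println(fg.getFileGroupId()));
 * view.close();
 * }</pre>
 *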
* @see TableFileSystemView
* @since 0.3.0
*/
public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystemView {
private static final Logger LOG = LogManager.getLogger(HoodieTableFileSystemView.class);
// mapping from partition paths to file groups contained within them
protected Map<String, List<HoodieFileGroup>> partitionToFileGroupsMap;
  /**
   * Mapping from file-group id (partition path + file id) to the pending compaction instant time
   * and operation.
   */
protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction;
/**
* Flag to determine if closed.
*/
private boolean closed = false;
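  /**
   * Package-private constructor that defers initialization;
   * {@link #init(HoodieTableMetaClient, HoodieTimeline)} must be called before the view is used.
   */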
HoodieTableFileSystemView(boolean enableIncrementalTimelineSync) {
super(enableIncrementalTimelineSync);
}
/**
 * Create a file system view as of the given timeline.
*/
public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) {
this(metaClient, visibleActiveTimeline, false);
}
/**
 * Create a file system view as of the given timeline.
*/
public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
boolean enableIncrementalTimelineSync) {
super(enableIncrementalTimelineSync);
init(metaClient, visibleActiveTimeline);
}
@Override
public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) {
this.partitionToFileGroupsMap = createPartitionToFileGroups();
super.init(metaClient, visibleActiveTimeline);
}
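  /**
   * Initialize the view as of the given timeline and pre-populate it with the provided file
   * statuses.
   */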
public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
FileStatus[] fileStatuses) {
init(metaClient, visibleActiveTimeline);
addFilesToView(fileStatuses);
}
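  /**
   * Drop the in-memory state; it is rebuilt when the view is re-initialized.
   */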
@Override
protected void resetViewState() {
this.fgIdToPendingCompaction = null;
this.partitionToFileGroupsMap = null;
}
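  /**
   * Create the map backing the partition-to-file-groups index. Subclasses may override this to
   * use alternative storage (e.g. a spillable map).
   */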
protected Map<String, List<HoodieFileGroup>> createPartitionToFileGroups() {
return new ConcurrentHashMap<>();
}
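  /**
   * Create the map backing the pending-compaction index. The default implementation returns the
   * passed-in map as-is; subclasses may copy it into alternative storage.
   */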
protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> createFileIdToPendingCompactionMap(
Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fileIdToPendingCompaction) {
return fileIdToPendingCompaction;
}
/**
 * Create a file system view as of the given timeline, pre-populated with the provided file statuses.
*/
public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
FileStatus[] fileStatuses) {
this(metaClient, visibleActiveTimeline);
addFilesToView(fileStatuses);
}
/**
 * This method is only used when this object is deserialized in a Spark executor.
*
* @deprecated
*/
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
}
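  /**
   * Counterpart of {@link #readObject}; only used when this object is serialized in a Spark
   * executor.
   */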
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
}
@Override
protected boolean isPendingCompactionScheduledForFileId(HoodieFileGroupId fgId) {
return fgIdToPendingCompaction.containsKey(fgId);
}
@Override
protected void resetPendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations) {
// Build fileId to Pending Compaction Instants
    this.fgIdToPendingCompaction = createFileIdToPendingCompactionMap(operations
        .map(entry -> Pair.of(entry.getValue().getFileGroupId(), Pair.of(entry.getKey(), entry.getValue())))
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
}
@Override
protected void addPendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations) {
operations.forEach(opInstantPair -> {
      ValidationUtils.checkArgument(!fgIdToPendingCompaction.containsKey(opInstantPair.getValue().getFileGroupId()),
          "Duplicate FileGroupId found in pending compaction operations. FgId: "
              + opInstantPair.getValue().getFileGroupId());
fgIdToPendingCompaction.put(opInstantPair.getValue().getFileGroupId(),
Pair.of(opInstantPair.getKey(), opInstantPair.getValue()));
});
}
@Override
protected void removePendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations) {
operations.forEach(opInstantPair -> {
      ValidationUtils.checkArgument(fgIdToPendingCompaction.containsKey(opInstantPair.getValue().getFileGroupId()),
          "Trying to remove a FileGroupId which is not found in pending compaction operations. FgId: "
              + opInstantPair.getValue().getFileGroupId());
fgIdToPendingCompaction.remove(opInstantPair.getValue().getFileGroupId());
});
}
  /**
   * Given a partition path, obtain all file groups within it. All methods that work at the
   * partition level go through this; the partition is expected to already be loaded into the view.
   */
@Override
Stream<HoodieFileGroup> fetchAllStoredFileGroups(String partition) {
final List<HoodieFileGroup> fileGroups = new ArrayList<>(partitionToFileGroupsMap.get(partition));
return fileGroups.stream();
}
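  /**
   * Stream all file groups across all partitions currently loaded in the view.
   */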
public Stream<HoodieFileGroup> getAllFileGroups() {
return fetchAllStoredFileGroups();
}
@Override
Stream<Pair<String, CompactionOperation>> fetchPendingCompactionOperations() {
return fgIdToPendingCompaction.values().stream();
}
@Override
protected Option<Pair<String, CompactionOperation>> getPendingCompactionOperationWithInstant(HoodieFileGroupId fgId) {
return Option.ofNullable(fgIdToPendingCompaction.get(fgId));
}
@Override
protected boolean isPartitionAvailableInStore(String partitionPath) {
return partitionToFileGroupsMap.containsKey(partitionPath);
}
@Override
protected void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups) {
LOG.info("Adding file-groups for partition :" + partitionPath + ", #FileGroups=" + fileGroups.size());
List<HoodieFileGroup> newList = new ArrayList<>(fileGroups);
partitionToFileGroupsMap.put(partitionPath, newList);
}
@Override
public Stream<HoodieFileGroup> fetchAllStoredFileGroups() {
return partitionToFileGroupsMap.values().stream().flatMap(Collection::stream);
}
@Override
public void close() {
closed = true;
super.reset();
partitionToFileGroupsMap = null;
fgIdToPendingCompaction = null;
}
@Override
public boolean isClosed() {
return closed;
}
}