| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.cli.commands; |
| |
| import org.apache.hudi.cli.HoodieCLI; |
| import org.apache.hudi.cli.HoodiePrintHelper; |
| import org.apache.hudi.cli.HoodieTableHeaderFields; |
| import org.apache.hudi.cli.TableHeader; |
| import org.apache.hudi.cli.utils.CommitUtil; |
| import org.apache.hudi.cli.utils.InputStreamConsumer; |
| import org.apache.hudi.cli.utils.SparkUtil; |
| import org.apache.hudi.common.model.HoodieCommitMetadata; |
| import org.apache.hudi.common.model.HoodieWriteStat; |
| import org.apache.hudi.common.table.HoodieTableMetaClient; |
| import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; |
| import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; |
| import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; |
| import org.apache.hudi.common.table.timeline.HoodieInstant; |
| import org.apache.hudi.common.table.timeline.HoodieTimeline; |
| import org.apache.hudi.common.util.NumericUtils; |
| import org.apache.hudi.common.util.StringUtils; |
| |
| import org.apache.spark.launcher.SparkLauncher; |
| import org.springframework.shell.core.CommandMarker; |
| import org.springframework.shell.core.annotation.CliCommand; |
| import org.springframework.shell.core.annotation.CliOption; |
| import org.springframework.stereotype.Component; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| |
| /** |
| * CLI command to display commits options. |
| */ |
| @Component |
| public class CommitsCommand implements CommandMarker { |
| |
| private String printCommits(HoodieDefaultTimeline timeline, |
| final Integer limit, final String sortByField, |
| final boolean descending, |
| final boolean headerOnly, |
| final String tempTableName) throws IOException { |
| final List<Comparable[]> rows = new ArrayList<>(); |
| |
| final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants() |
| .getInstants().collect(Collectors.toList()); |
| // timeline can be read from multiple files. So sort is needed instead of reversing the collection |
| Collections.sort(commits, HoodieInstant.COMPARATOR.reversed()); |
| |
| for (int i = 0; i < commits.size(); i++) { |
| final HoodieInstant commit = commits.get(i); |
| final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( |
| timeline.getInstantDetails(commit).get(), |
| HoodieCommitMetadata.class); |
| rows.add(new Comparable[]{commit.getTimestamp(), |
| commitMetadata.fetchTotalBytesWritten(), |
| commitMetadata.fetchTotalFilesInsert(), |
| commitMetadata.fetchTotalFilesUpdated(), |
| commitMetadata.fetchTotalPartitionsWritten(), |
| commitMetadata.fetchTotalRecordsWritten(), |
| commitMetadata.fetchTotalUpdateRecordsWritten(), |
| commitMetadata.fetchTotalWriteErrors()}); |
| } |
| |
| final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); |
| fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> { |
| return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); |
| }); |
| |
| final TableHeader header = new TableHeader() |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); |
| |
| return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, |
| limit, headerOnly, rows, tempTableName); |
| } |
| |
| private String printCommitsWithMetadata(HoodieDefaultTimeline timeline, |
| final Integer limit, final String sortByField, |
| final boolean descending, |
| final boolean headerOnly, |
| final String tempTableName) throws IOException { |
| final List<Comparable[]> rows = new ArrayList<>(); |
| |
| final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants() |
| .getInstants().collect(Collectors.toList()); |
| // timeline can be read from multiple files. So sort is needed instead of reversing the collection |
| Collections.sort(commits, HoodieInstant.COMPARATOR.reversed()); |
| |
| for (int i = 0; i < commits.size(); i++) { |
| final HoodieInstant commit = commits.get(i); |
| final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( |
| timeline.getInstantDetails(commit).get(), |
| HoodieCommitMetadata.class); |
| |
| for (Map.Entry<String, List<HoodieWriteStat>> partitionWriteStat : |
| commitMetadata.getPartitionToWriteStats().entrySet()) { |
| for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) { |
| rows.add(new Comparable[]{ commit.getAction(), commit.getTimestamp(), hoodieWriteStat.getPartitionPath(), |
| hoodieWriteStat.getFileId(), hoodieWriteStat.getPrevCommit(), hoodieWriteStat.getNumWrites(), |
| hoodieWriteStat.getNumInserts(), hoodieWriteStat.getNumDeletes(), |
| hoodieWriteStat.getNumUpdateWrites(), hoodieWriteStat.getTotalWriteErrors(), |
| hoodieWriteStat.getTotalLogBlocks(), hoodieWriteStat.getTotalCorruptLogBlock(), |
| hoodieWriteStat.getTotalRollbackBlocks(), hoodieWriteStat.getTotalLogRecords(), |
| hoodieWriteStat.getTotalUpdatedRecordsCompacted(), hoodieWriteStat.getTotalWriteBytes() |
| }); |
| } |
| } |
| } |
| |
| final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); |
| fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> { |
| return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); |
| }); |
| |
| TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_WRITES) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_INSERTS) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELETES) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_UPDATE_WRITES) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_BLOCKS) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_CORRUPT_LOG_BLOCKS) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ROLLBACK_BLOCKS) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_RECORDS) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATED_RECORDS_COMPACTED) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN); |
| |
| return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, |
| limit, headerOnly, rows, tempTableName); |
| } |
| |
| @CliCommand(value = "commits show", help = "Show the commits") |
| public String showCommits( |
| @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata", |
| unspecifiedDefaultValue = "false") final boolean includeExtraMetadata, |
| @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", |
| unspecifiedDefaultValue = "") final String exportTableName, |
| @CliOption(key = {"limit"}, help = "Limit commits", |
| unspecifiedDefaultValue = "-1") final Integer limit, |
| @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, |
| @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, |
| @CliOption(key = {"headeronly"}, help = "Print Header Only", |
| unspecifiedDefaultValue = "false") final boolean headerOnly) |
| throws IOException { |
| |
| HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); |
| if (includeExtraMetadata) { |
| return printCommitsWithMetadata(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName); |
| } else { |
| return printCommits(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName); |
| } |
| } |
| |
| @CliCommand(value = "commits showarchived", help = "Show the archived commits") |
| public String showArchivedCommits( |
| @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata", |
| unspecifiedDefaultValue = "false") final boolean includeExtraMetadata, |
| @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", |
| unspecifiedDefaultValue = "") final String exportTableName, |
| @CliOption(key = {"startTs"}, mandatory = false, help = "start time for commits, default: now - 10 days") |
| String startTs, |
| @CliOption(key = {"endTs"}, mandatory = false, help = "end time for commits, default: now - 1 day") |
| String endTs, |
| @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1") |
| final Integer limit, |
| @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") |
| final String sortByField, |
| @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") |
| final boolean descending, |
| @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") |
| final boolean headerOnly) |
| throws IOException { |
| if (StringUtils.isNullOrEmpty(startTs)) { |
| startTs = CommitUtil.getTimeDaysAgo(10); |
| } |
| if (StringUtils.isNullOrEmpty(endTs)) { |
| endTs = CommitUtil.getTimeDaysAgo(1); |
| } |
| HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline(); |
| try { |
| archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); |
| HoodieDefaultTimeline timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs); |
| if (includeExtraMetadata) { |
| return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly, exportTableName); |
| } else { |
| return printCommits(timelineRange, limit, sortByField, descending, headerOnly, exportTableName); |
| } |
| } finally { |
| // clear the instant details from memory after printing to reduce usage |
| archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs); |
| } |
| } |
| |
| @CliCommand(value = "commit rollback", help = "Rollback a commit") |
| public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String instantTime, |
| @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, |
| @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, |
| @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", |
| help = "Spark executor memory") final String sparkMemory) |
| throws Exception { |
| HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); |
| HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); |
| HoodieTimeline filteredTimeline = completedTimeline.filter(instant -> instant.getTimestamp().equals(instantTime)); |
| if (filteredTimeline.empty()) { |
| return "Commit " + instantTime + " not found in Commits " + completedTimeline; |
| } |
| |
| SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); |
| sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime, |
| HoodieCLI.getTableMetaClient().getBasePath()); |
| Process process = sparkLauncher.launch(); |
| InputStreamConsumer.captureOutput(process); |
| int exitCode = process.waitFor(); |
| // Refresh the current |
| HoodieCLI.refreshTableMetadata(); |
| if (exitCode != 0) { |
| return "Commit " + instantTime + " failed to roll back"; |
| } |
| return "Commit " + instantTime + " rolled back"; |
| } |
| |
| @CliCommand(value = "commit showpartitions", help = "Show partition level details of a commit") |
| public String showCommitPartitions( |
| @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", |
| unspecifiedDefaultValue = "") final String exportTableName, |
| @CliOption(key = {"commit"}, help = "Commit to show") final String instantTime, |
| @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, |
| @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, |
| @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, |
| @CliOption(key = {"headeronly"}, help = "Print Header Only", |
| unspecifiedDefaultValue = "false") final boolean headerOnly) |
| throws Exception { |
| |
| HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); |
| HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); |
| HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime); |
| |
| if (!timeline.containsInstant(commitInstant)) { |
| return "Commit " + instantTime + " not found in Commits " + timeline; |
| } |
| HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(), |
| HoodieCommitMetadata.class); |
| List<Comparable[]> rows = new ArrayList<>(); |
| for (Map.Entry<String, List<HoodieWriteStat>> entry : meta.getPartitionToWriteStats().entrySet()) { |
| String path = entry.getKey(); |
| List<HoodieWriteStat> stats = entry.getValue(); |
| long totalFilesAdded = 0; |
| long totalFilesUpdated = 0; |
| long totalRecordsUpdated = 0; |
| long totalRecordsInserted = 0; |
| long totalBytesWritten = 0; |
| long totalWriteErrors = 0; |
| for (HoodieWriteStat stat : stats) { |
| if (stat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) { |
| totalFilesAdded += 1; |
| } else { |
| totalFilesUpdated += 1; |
| totalRecordsUpdated += stat.getNumUpdateWrites(); |
| } |
| totalRecordsInserted += stat.getNumInserts(); |
| totalBytesWritten += stat.getTotalWriteBytes(); |
| totalWriteErrors += stat.getTotalWriteErrors(); |
| } |
| rows.add(new Comparable[] {path, totalFilesAdded, totalFilesUpdated, totalRecordsInserted, totalRecordsUpdated, |
| totalBytesWritten, totalWriteErrors}); |
| } |
| |
| Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); |
| fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> |
| NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString())))); |
| |
| TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); |
| |
| return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, |
| limit, headerOnly, rows, exportTableName); |
| } |
| |
| @CliCommand(value = "commit show_write_stats", help = "Show write stats of a commit") |
| public String showWriteStats( |
| @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", |
| unspecifiedDefaultValue = "") final String exportTableName, |
| @CliOption(key = {"commit"}, help = "Commit to show") final String instantTime, |
| @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, |
| @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, |
| @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, |
| @CliOption(key = {"headeronly"}, help = "Print Header Only", |
| unspecifiedDefaultValue = "false") final boolean headerOnly) |
| throws Exception { |
| |
| HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); |
| HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); |
| HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime); |
| |
| if (!timeline.containsInstant(commitInstant)) { |
| return "Commit " + instantTime + " not found in Commits " + timeline; |
| } |
| HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(), |
| HoodieCommitMetadata.class); |
| long recordsWritten = meta.fetchTotalRecordsWritten(); |
| long bytesWritten = meta.fetchTotalBytesWritten(); |
| long avgRecSize = (long) Math.ceil((1.0 * bytesWritten) / recordsWritten); |
| List<Comparable[]> rows = new ArrayList<>(); |
| rows.add(new Comparable[] {bytesWritten, recordsWritten, avgRecSize}); |
| |
| Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); |
| fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> |
| NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString())))); |
| |
| TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN_COMMIT) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN_COMMIT) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_AVG_REC_SIZE_COMMIT); |
| |
| return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, |
| limit, headerOnly, rows, exportTableName); |
| } |
| |
| @CliCommand(value = "commit showfiles", help = "Show file level details of a commit") |
| public String showCommitFiles( |
| @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", |
| unspecifiedDefaultValue = "") final String exportTableName, |
| @CliOption(key = {"commit"}, help = "Commit to show") final String instantTime, |
| @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, |
| @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, |
| @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, |
| @CliOption(key = {"headeronly"}, help = "Print Header Only", |
| unspecifiedDefaultValue = "false") final boolean headerOnly) |
| throws Exception { |
| |
| HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); |
| HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); |
| HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime); |
| |
| if (!timeline.containsInstant(commitInstant)) { |
| return "Commit " + instantTime + " not found in Commits " + timeline; |
| } |
| HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(), |
| HoodieCommitMetadata.class); |
| List<Comparable[]> rows = new ArrayList<>(); |
| for (Map.Entry<String, List<HoodieWriteStat>> entry : meta.getPartitionToWriteStats().entrySet()) { |
| String path = entry.getKey(); |
| List<HoodieWriteStat> stats = entry.getValue(); |
| for (HoodieWriteStat stat : stats) { |
| rows.add(new Comparable[] {path, stat.getFileId(), stat.getPrevCommit(), stat.getNumUpdateWrites(), |
| stat.getNumWrites(), stat.getTotalWriteBytes(), stat.getTotalWriteErrors(), stat.getFileSizeInBytes()}); |
| } |
| } |
| |
| TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS) |
| .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_SIZE); |
| |
| return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, |
| limit, headerOnly, rows, exportTableName); |
| } |
| |
| @CliCommand(value = "commits compare", help = "Compare commits with another Hoodie table") |
| public String compareCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) { |
| |
| HoodieTableMetaClient source = HoodieCLI.getTableMetaClient(); |
| HoodieTableMetaClient target = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build(); |
| HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); |
| HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); |
| String targetLatestCommit = |
| targetTimeline.getInstants().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0"; |
| String sourceLatestCommit = |
| sourceTimeline.getInstants().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0"; |
| |
| if (sourceLatestCommit != null |
| && HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) { |
| // source is behind the target |
| List<String> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE) |
| .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); |
| return "Source " + source.getTableConfig().getTableName() + " is behind by " + commitsToCatchup.size() |
| + " commits. Commits to catch up - " + commitsToCatchup; |
| } else { |
| List<String> commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE) |
| .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); |
| return "Source " + source.getTableConfig().getTableName() + " is ahead by " + commitsToCatchup.size() |
| + " commits. Commits to catch up - " + commitsToCatchup; |
| } |
| } |
| |
| @CliCommand(value = "commits sync", help = "Compare commits with another Hoodie table") |
| public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) { |
| HoodieCLI.syncTableMetadata = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build(); |
| HoodieCLI.state = HoodieCLI.CLIState.SYNC; |
| return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and " |
| + HoodieCLI.syncTableMetadata.getTableConfig().getTableName(); |
| } |
| } |