blob: 292039b83ad41421af6f15368ef68b7087973b35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import com.codahale.metrics.Timer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * Wrapper for metrics-related operations.
 *
 * <p>Every operation is a no-op (or returns {@code null}) when metrics are disabled in the
 * supplied {@link HoodieWriteConfig}. Metric names have the form
 * {@code <tableName>.<action>.<metric>}.
 *
 * <p>Timers are created lazily on first use; the lazy initialization is not synchronized.
 */
public class HoodieMetrics {

  private static final Logger LOG = LogManager.getLogger(HoodieMetrics.class);

  // Registry names of the timers. They stay null when metrics are disabled.
  public String rollbackTimerName = null;
  public String cleanTimerName = null;
  public String commitTimerName = null;
  public String deltaCommitTimerName = null;
  public String finalizeTimerName = null;
  public String compactionTimerName = null;
  public String indexTimerName = null;

  private final HoodieWriteConfig config;
  private final String tableName;

  // Lazily created timers, one per action. They stay null when metrics are disabled.
  private Timer rollbackTimer = null;
  private Timer cleanTimer = null;
  private Timer commitTimer = null;
  private Timer deltaCommitTimer = null;
  private Timer finalizeTimer = null;
  private Timer compactionTimer = null;
  private Timer indexTimer = null;

  /**
   * Creates the wrapper and, if metrics are enabled, initializes the metrics subsystem and
   * computes the timer names for this table.
   *
   * @param config write config; consulted for {@code isMetricsOn()} and passed to {@code Metrics.init}
   * @param tableName table name used as the prefix of every metric name
   */
  public HoodieMetrics(HoodieWriteConfig config, String tableName) {
    this.config = config;
    this.tableName = tableName;
    if (config.isMetricsOn()) {
      Metrics.init(config);
      this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
      this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
      this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION);
      this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION);
      this.finalizeTimerName = getMetricsName("timer", "finalize");
      this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
      this.indexTimerName = getMetricsName("timer", "index");
    }
  }

  /**
   * Looks up (or registers) the timer with the given name in the metrics registry.
   *
   * @param name registry name of the timer
   * @return the timer, or {@code null} when metrics are disabled
   */
  private Timer createTimer(String name) {
    return config.isMetricsOn() ? Metrics.getInstance().getRegistry().timer(name) : null;
  }

  /**
   * Starts timing a rollback.
   *
   * @return a running timer context, or {@code null} when metrics are disabled
   */
  public Timer.Context getRollbackCtx() {
    if (config.isMetricsOn() && rollbackTimer == null) {
      rollbackTimer = createTimer(rollbackTimerName);
    }
    return rollbackTimer == null ? null : rollbackTimer.time();
  }

  /**
   * Starts timing a compaction.
   *
   * @return a running timer context, or {@code null} when metrics are disabled
   */
  public Timer.Context getCompactionCtx() {
    if (config.isMetricsOn() && compactionTimer == null) {
      // Must use the compaction timer name; using the commit timer name here would record
      // compaction timings under the commit timer.
      compactionTimer = createTimer(compactionTimerName);
    }
    return compactionTimer == null ? null : compactionTimer.time();
  }

  /**
   * Starts timing a clean.
   *
   * @return a running timer context, or {@code null} when metrics are disabled
   */
  public Timer.Context getCleanCtx() {
    if (config.isMetricsOn() && cleanTimer == null) {
      cleanTimer = createTimer(cleanTimerName);
    }
    return cleanTimer == null ? null : cleanTimer.time();
  }

  /**
   * Starts timing a commit.
   *
   * @return a running timer context, or {@code null} when metrics are disabled
   */
  public Timer.Context getCommitCtx() {
    if (config.isMetricsOn() && commitTimer == null) {
      commitTimer = createTimer(commitTimerName);
    }
    return commitTimer == null ? null : commitTimer.time();
  }

  /**
   * Starts timing a finalize step.
   *
   * @return a running timer context, or {@code null} when metrics are disabled
   */
  public Timer.Context getFinalizeCtx() {
    if (config.isMetricsOn() && finalizeTimer == null) {
      finalizeTimer = createTimer(finalizeTimerName);
    }
    return finalizeTimer == null ? null : finalizeTimer.time();
  }

  /**
   * Starts timing a delta commit.
   *
   * @return a running timer context, or {@code null} when metrics are disabled
   */
  public Timer.Context getDeltaCommitCtx() {
    if (config.isMetricsOn() && deltaCommitTimer == null) {
      deltaCommitTimer = createTimer(deltaCommitTimerName);
    }
    return deltaCommitTimer == null ? null : deltaCommitTimer.time();
  }

  /**
   * Starts timing an index operation.
   *
   * @return a running timer context, or {@code null} when metrics are disabled
   */
  public Timer.Context getIndexCtx() {
    if (config.isMetricsOn() && indexTimer == null) {
      indexTimer = createTimer(indexTimerName);
    }
    return indexTimer == null ? null : indexTimer.time();
  }

  /**
   * Registers one gauge per commit statistic under {@code <tableName>.<actionType>.<metric>}.
   * Does nothing when metrics are disabled.
   *
   * @param commitEpochTimeInMs value reported as the {@code commitTime} gauge
   * @param durationInMs value reported as the {@code duration} gauge
   * @param metadata commit metadata the remaining statistics are read from
   * @param actionType action segment of the metric names
   */
  public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
      String actionType) {
    if (config.isMetricsOn()) {
      long totalPartitionsWritten = metadata.fetchTotalPartitionsWritten();
      long totalFilesInsert = metadata.fetchTotalFilesInsert();
      long totalFilesUpdate = metadata.fetchTotalFilesUpdated();
      long totalRecordsWritten = metadata.fetchTotalRecordsWritten();
      long totalUpdateRecordsWritten = metadata.fetchTotalUpdateRecordsWritten();
      long totalInsertRecordsWritten = metadata.fetchTotalInsertRecordsWritten();
      long totalBytesWritten = metadata.fetchTotalBytesWritten();
      long totalTimeTakenByScanner = metadata.getTotalScanTime();
      long totalTimeTakenForInsert = metadata.getTotalCreateTime();
      long totalTimeTakenForUpsert = metadata.getTotalUpsertTime();
      long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
      long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
      long totalLogFilesSize = metadata.getTotalLogFilesSize();
      Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
      Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
      Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
      Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
      Metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten);
      Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
      Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
      Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
      Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
      Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
      Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
      Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
      Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
      Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
      Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
    }
  }

  /**
   * Logs and registers the rollback gauges. Does nothing when metrics are disabled.
   *
   * @param durationInMs value reported as {@code rollback.duration}
   * @param numFilesDeleted value reported as {@code rollback.numFilesDeleted}
   */
  public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
    if (config.isMetricsOn()) {
      LOG.info(
          String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
      Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
      Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
    }
  }

  /**
   * Logs and registers the clean gauges. Does nothing when metrics are disabled.
   *
   * @param durationInMs value reported as {@code clean.duration}
   * @param numFilesDeleted value reported as {@code clean.numFilesDeleted}
   */
  public void updateCleanMetrics(long durationInMs, int numFilesDeleted) {
    if (config.isMetricsOn()) {
      LOG.info(
          String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
      Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
      Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
    }
  }

  /**
   * Logs and registers the finalize-write gauges. Does nothing when metrics are disabled.
   *
   * @param durationInMs value reported as {@code finalize.duration}
   * @param numFilesFinalized value reported as {@code finalize.numFilesFinalized}
   */
  public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
    if (config.isMetricsOn()) {
      LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs,
          numFilesFinalized));
      Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
      Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
    }
  }

  /**
   * Logs and registers the duration gauge of an index action under
   * {@code <tableName>.index.<action>.duration}. Does nothing when metrics are disabled.
   *
   * @param action index action name embedded in the metric name
   * @param durationInMs value reported for the gauge
   */
  public void updateIndexMetrics(final String action, final long durationInMs) {
    if (config.isMetricsOn()) {
      LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
      Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
    }
  }

  /**
   * Builds a metric name of the form {@code <tableName>.<action>.<metric>}.
   *
   * @param action action segment of the name
   * @param metric metric segment of the name
   * @return the metric name, or {@code null} if this instance has no config
   */
  String getMetricsName(String action, String metric) {
    return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
  }

  /**
   * By default, the timer context returns duration with nano seconds. Convert it to millisecond.
   *
   * @param ctxDuration duration in nanoseconds
   * @return the duration in whole milliseconds (integer division by 1,000,000)
   */
  public long getDurationInMs(long ctxDuration) {
    return ctxDuration / 1000000;
  }
}