blob: 070d21b505b887e474fc8da604f38123eee24c1d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.stats;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.java.util.metrics.MonitorUtils;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import java.util.Map;
/**
 * Replaces the old RealtimeMetricsMonitor for indexing tasks that use a single FireDepartment, with changes to
 * read row ingestion stats from RowIngestionMeters (which supports moving averages) instead of FireDepartmentMetrics.
 * See comment on RowIngestionMeters for more information regarding relationship between RowIngestionMeters and
 * FireDepartmentMetrics.
 *
 * <p>Every {@link #doMonitor} call reports most metrics as the difference between the values read during that call
 * and the values remembered from the previous call. {@code ingest/sink/count} and {@code ingest/events/messageGap}
 * are the exceptions: they are reported as read, without subtracting a previous value.
 */
public class TaskRealtimeMetricsMonitor extends AbstractMonitor
{
  private static final EmittingLogger log = new EmittingLogger(TaskRealtimeMetricsMonitor.class);

  private final FireDepartment fireDepartment;
  private final RowIngestionMeters rowIngestionMeters;
  private final Map<String, String[]> dimensions;

  // Values observed by the most recent doMonitor call; they are the baseline for the next call's deltas.
  private FireDepartmentMetrics previousFireDepartmentMetrics;
  private RowIngestionMetersTotals previousRowIngestionMetersTotals;

  /**
   * @param fireDepartment     source of the datasource name and of the FireDepartmentMetrics snapshot
   * @param rowIngestionMeters source of the row-level totals (thrown away, unparseable, processed, ...)
   * @param dimensions         extra dimensions attached to every emitted event; copied via ImmutableMap.copyOf
   */
  public TaskRealtimeMetricsMonitor(
      FireDepartment fireDepartment,
      RowIngestionMeters rowIngestionMeters,
      Map<String, String[]> dimensions
  )
  {
    this.fireDepartment = fireDepartment;
    this.rowIngestionMeters = rowIngestionMeters;
    this.dimensions = ImmutableMap.copyOf(dimensions);
    // Start from empty baselines so that the first doMonitor call is measured against these initial values.
    this.previousFireDepartmentMetrics = new FireDepartmentMetrics();
    this.previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0);
  }

  /**
   * Reads the current metrics, emits one event per metric, and then stores the values just read as the baseline
   * for the next invocation.
   *
   * @param emitter destination for the metric events
   * @return always {@code true}
   */
  @Override
  public boolean doMonitor(ServiceEmitter emitter)
  {
    final FireDepartmentMetrics currentMetrics = fireDepartment.getMetrics().snapshot();
    final RowIngestionMetersTotals currentTotals = rowIngestionMeters.getTotals();

    final ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder()
        .setDimension(DruidMetrics.DATASOURCE, fireDepartment.getDataSchema().getDataSource());
    MonitorUtils.addDimensionsToBuilder(eventBuilder, dimensions);

    emitRowIngestionMetrics(emitter, eventBuilder, currentTotals);
    emitFireDepartmentMetrics(emitter, eventBuilder, currentMetrics);

    previousRowIngestionMetersTotals = currentTotals;
    previousFireDepartmentMetrics = currentMetrics;
    return true;
  }

  /**
   * Emits the row-level metrics read from RowIngestionMeters, logging whenever rows were thrown away,
   * unparseable, or processed with errors since the previous call.
   */
  private void emitRowIngestionMetrics(
      ServiceEmitter emitter,
      ServiceMetricEvent.Builder eventBuilder,
      RowIngestionMetersTotals currentTotals
  )
  {
    final RowIngestionMetersTotals baseline = previousRowIngestionMetersTotals;

    final long thrownAway = currentTotals.getThrownAway() - baseline.getThrownAway();
    if (thrownAway > 0) {
      log.warn(
          "[%,d] events thrown away. Possible causes: null events, events filtered out by transformSpec, or events outside earlyMessageRejectionPeriod / lateMessageRejectionPeriod.",
          thrownAway
      );
    }
    emitValue(emitter, eventBuilder, "ingest/events/thrownAway", thrownAway);

    final long unparseable = currentTotals.getUnparseable() - baseline.getUnparseable();
    if (unparseable > 0) {
      log.error(
          "[%,d] unparseable events discarded. Turn on debug logging to see exception stack trace.",
          unparseable
      );
    }
    emitValue(emitter, eventBuilder, "ingest/events/unparseable", unparseable);

    final long processedWithError = currentTotals.getProcessedWithError() - baseline.getProcessedWithError();
    if (processedWithError > 0) {
      log.error(
          "[%,d] events processed with errors! Set logParseExceptions to true in the ingestion spec to log these errors.",
          processedWithError
      );
    }
    emitValue(emitter, eventBuilder, "ingest/events/processedWithError", processedWithError);

    emitValue(
        emitter,
        eventBuilder,
        "ingest/events/processed",
        currentTotals.getProcessed() - baseline.getProcessed()
    );
  }

  /**
   * Emits the metrics read from the FireDepartmentMetrics snapshot: deltas against the previous snapshot for the
   * counters, followed by the sink count and message gap as read.
   */
  private void emitFireDepartmentMetrics(
      ServiceEmitter emitter,
      ServiceMetricEvent.Builder eventBuilder,
      FireDepartmentMetrics currentMetrics
  )
  {
    final FireDepartmentMetrics baseline = previousFireDepartmentMetrics;

    final long duplicates = currentMetrics.dedup() - baseline.dedup();
    if (duplicates > 0) {
      log.warn("[%,d] duplicate events!", duplicates);
    }
    emitValue(emitter, eventBuilder, "ingest/events/duplicate", duplicates);

    emitValue(emitter, eventBuilder, "ingest/rows/output", currentMetrics.rowOutput() - baseline.rowOutput());
    emitValue(emitter, eventBuilder, "ingest/persists/count", currentMetrics.numPersists() - baseline.numPersists());
    emitValue(
        emitter,
        eventBuilder,
        "ingest/persists/time",
        currentMetrics.persistTimeMillis() - baseline.persistTimeMillis()
    );
    emitValue(
        emitter,
        eventBuilder,
        "ingest/persists/cpu",
        currentMetrics.persistCpuTime() - baseline.persistCpuTime()
    );
    emitValue(
        emitter,
        eventBuilder,
        "ingest/persists/backPressure",
        currentMetrics.persistBackPressureMillis() - baseline.persistBackPressureMillis()
    );
    emitValue(
        emitter,
        eventBuilder,
        "ingest/persists/failed",
        currentMetrics.failedPersists() - baseline.failedPersists()
    );
    emitValue(
        emitter,
        eventBuilder,
        "ingest/handoff/failed",
        currentMetrics.failedHandoffs() - baseline.failedHandoffs()
    );
    emitValue(
        emitter,
        eventBuilder,
        "ingest/merge/time",
        currentMetrics.mergeTimeMillis() - baseline.mergeTimeMillis()
    );
    emitValue(emitter, eventBuilder, "ingest/merge/cpu", currentMetrics.mergeCpuTime() - baseline.mergeCpuTime());
    emitValue(
        emitter,
        eventBuilder,
        "ingest/handoff/count",
        currentMetrics.handOffCount() - baseline.handOffCount()
    );

    // These two are emitted as read from the snapshot; no previous value is subtracted.
    emitValue(emitter, eventBuilder, "ingest/sink/count", currentMetrics.sinkCount());
    emitValue(emitter, eventBuilder, "ingest/events/messageGap", currentMetrics.messageGap());
  }

  /**
   * Builds a single event for {@code metricName} from the shared builder and hands it to the emitter.
   * The value is typed as Number so it is boxed the same way as when passed straight to the builder.
   */
  private static void emitValue(
      ServiceEmitter emitter,
      ServiceMetricEvent.Builder eventBuilder,
      String metricName,
      Number value
  )
  {
    emitter.emit(eventBuilder.build(metricName, value));
  }
}