blob: a0572601e8f80960dd44c896525e0e311fbb835d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct.portable;
import static java.util.Arrays.asList;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.runners.core.metrics.MetricsMap;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricFiltering;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** Implementation of {@link MetricResults} for the Direct Runner. */
class DirectMetrics extends MetricResults {
// TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
private static final ExecutorService COUNTER_COMMITTER =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setThreadFactory(MoreExecutors.platformThreadFactory())
.setDaemon(true)
.setNameFormat("direct-metrics-counter-committer")
.build());
private interface MetricAggregation<UpdateT, ResultT> {
UpdateT zero();
UpdateT combine(Iterable<UpdateT> updates);
ResultT extract(UpdateT data);
}
/**
* Implementation of a metric in the direct runner.
*
* @param <UpdateT> The type of raw data received and aggregated across updates.
* @param <ResultT> The type of result extracted from the data.
*/
private static class DirectMetric<UpdateT, ResultT> {
private final MetricAggregation<UpdateT, ResultT> aggregation;
private final AtomicReference<UpdateT> finishedCommitted;
private final Object attemptedLock = new Object();
@GuardedBy("attemptedLock")
private volatile UpdateT finishedAttempted;
private final ConcurrentMap<CommittedBundle<?>, UpdateT> inflightAttempted =
new ConcurrentHashMap<>();
public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation) {
this.aggregation = aggregation;
finishedCommitted = new AtomicReference<>(aggregation.zero());
finishedAttempted = aggregation.zero();
}
/**
* Add the given {@code tentativeCumulative} update to the physical aggregate.
*
* @param bundle The bundle receiving an update.
* @param tentativeCumulative The new cumulative value for the given bundle.
*/
public void updatePhysical(CommittedBundle<?> bundle, UpdateT tentativeCumulative) {
// Add (or update) the cumulatiev value for the given bundle.
inflightAttempted.put(bundle, tentativeCumulative);
}
/**
* Commit a physical value for the given {@code bundle}.
*
* @param bundle The bundle being committed.
* @param finalCumulative The final cumulative value for the given bundle.
*/
@SuppressWarnings("FutureReturnValueIgnored") // direct runner metrics are best-effort;
// we choose not to block on async commit
public void commitPhysical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) {
// To prevent a query from blocking the commit, we perform the commit in two steps.
// 1. We perform a non-blocking write to the uncommitted table to make the new value
// available immediately.
// 2. We submit a runnable that will commit the update and remove the tentative value in
// a synchronized block.
inflightAttempted.put(bundle, finalCumulative);
COUNTER_COMMITTER.submit(
() -> {
synchronized (attemptedLock) {
finishedAttempted = aggregation.combine(asList(finishedAttempted, finalCumulative));
inflightAttempted.remove(bundle);
}
});
}
/** Extract the latest values from all attempted and in-progress bundles. */
public ResultT extractLatestAttempted() {
ArrayList<UpdateT> updates = new ArrayList<>(inflightAttempted.size() + 1);
// Within this block we know that will be consistent. Specifically, the only change that can
// happen concurrently is the addition of new (larger) values to inflightAttempted.
synchronized (attemptedLock) {
updates.add(finishedAttempted);
updates.addAll(inflightAttempted.values());
}
return aggregation.extract(aggregation.combine(updates));
}
/**
* Commit a logical value for the given {@code bundle}.
*
* @param bundle The bundle being committed.
* @param finalCumulative The final cumulative value for the given bundle.
*/
public void commitLogical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) {
UpdateT current;
do {
current = finishedCommitted.get();
} while (!finishedCommitted.compareAndSet(
current, aggregation.combine(asList(current, finalCumulative))));
}
/** Extract the value from all successfully committed bundles. */
public ResultT extractCommitted() {
return aggregation.extract(finishedCommitted.get());
}
}
private static final MetricAggregation<Long, Long> COUNTER =
new MetricAggregation<Long, Long>() {
@Override
public Long zero() {
return 0L;
}
@Override
public Long combine(Iterable<Long> updates) {
long value = 0;
for (long update : updates) {
value += update;
}
return value;
}
@Override
public Long extract(Long data) {
return data;
}
};
private static final MetricAggregation<DistributionData, DistributionResult> DISTRIBUTION =
new MetricAggregation<DistributionData, DistributionResult>() {
@Override
public DistributionData zero() {
return DistributionData.EMPTY;
}
@Override
public DistributionData combine(Iterable<DistributionData> updates) {
DistributionData result = DistributionData.EMPTY;
for (DistributionData update : updates) {
result = result.combine(update);
}
return result;
}
@Override
public DistributionResult extract(DistributionData data) {
return data.extractResult();
}
};
private static final MetricAggregation<GaugeData, GaugeResult> GAUGE =
new MetricAggregation<GaugeData, GaugeResult>() {
@Override
public GaugeData zero() {
return GaugeData.empty();
}
@Override
public GaugeData combine(Iterable<GaugeData> updates) {
GaugeData result = GaugeData.empty();
for (GaugeData update : updates) {
result = result.combine(update);
}
return result;
}
@Override
public GaugeResult extract(GaugeData data) {
return data.extractResult();
}
};
/** The current values of counters in memory. */
private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters =
new MetricsMap<>(unusedKey -> new DirectMetric<>(COUNTER));
private MetricsMap<MetricKey, DirectMetric<DistributionData, DistributionResult>> distributions =
new MetricsMap<>(unusedKey -> new DirectMetric<>(DISTRIBUTION));
private MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauges =
new MetricsMap<>(unusedKey -> new DirectMetric<>(GAUGE));
@Override
public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
for (Entry<MetricKey, DirectMetric<Long, Long>> counter : counters.entries()) {
maybeExtractResult(filter, counterResults, counter);
}
ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
ImmutableList.builder();
for (Entry<MetricKey, DirectMetric<DistributionData, DistributionResult>> distribution :
distributions.entries()) {
maybeExtractResult(filter, distributionResults, distribution);
}
ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder();
for (Entry<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauge : gauges.entries()) {
maybeExtractResult(filter, gaugeResults, gauge);
}
return MetricQueryResults.create(
counterResults.build(), distributionResults.build(), gaugeResults.build());
}
private <ResultT> void maybeExtractResult(
MetricsFilter filter,
ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder,
Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) {
if (MetricFiltering.matches(filter, entry.getKey())) {
resultsBuilder.add(
MetricResult.create(
entry.getKey(),
entry.getValue().extractCommitted(),
entry.getValue().extractLatestAttempted()));
}
}
/** Apply metric updates that represent physical counter deltas to the current metric values. */
public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
for (MetricUpdate<Long> counter : updates.counterUpdates()) {
counters.get(counter.getKey()).updatePhysical(bundle, counter.getUpdate());
}
for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) {
distributions.get(distribution.getKey()).updatePhysical(bundle, distribution.getUpdate());
}
for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) {
gauges.get(gauge.getKey()).updatePhysical(bundle, gauge.getUpdate());
}
}
public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
for (MetricUpdate<Long> counter : updates.counterUpdates()) {
counters.get(counter.getKey()).commitPhysical(bundle, counter.getUpdate());
}
for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) {
distributions.get(distribution.getKey()).commitPhysical(bundle, distribution.getUpdate());
}
for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) {
gauges.get(gauge.getKey()).commitPhysical(bundle, gauge.getUpdate());
}
}
/** Apply metric updates that represent new logical values from a bundle being committed. */
public void commitLogical(CommittedBundle<?> bundle, MetricUpdates updates) {
for (MetricUpdate<Long> counter : updates.counterUpdates()) {
counters.get(counter.getKey()).commitLogical(bundle, counter.getUpdate());
}
for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) {
distributions.get(distribution.getKey()).commitLogical(bundle, distribution.getUpdate());
}
for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) {
gauges.get(gauge.getKey()).commitLogical(bundle, gauge.getUpdate());
}
}
}