| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.direct.portable; |
| |
| import static java.util.Arrays.asList; |
| |
| import java.util.ArrayList; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.GuardedBy; |
| import org.apache.beam.runners.core.metrics.DistributionData; |
| import org.apache.beam.runners.core.metrics.GaugeData; |
| import org.apache.beam.runners.core.metrics.MetricUpdates; |
| import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; |
| import org.apache.beam.runners.core.metrics.MetricsMap; |
| import org.apache.beam.sdk.metrics.DistributionResult; |
| import org.apache.beam.sdk.metrics.GaugeResult; |
| import org.apache.beam.sdk.metrics.MetricFiltering; |
| import org.apache.beam.sdk.metrics.MetricKey; |
| import org.apache.beam.sdk.metrics.MetricQueryResults; |
| import org.apache.beam.sdk.metrics.MetricResult; |
| import org.apache.beam.sdk.metrics.MetricResults; |
| import org.apache.beam.sdk.metrics.MetricsFilter; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** Implementation of {@link MetricResults} for the Direct Runner. */ |
| class DirectMetrics extends MetricResults { |
| |
| // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner. |
| private static final ExecutorService COUNTER_COMMITTER = |
| Executors.newCachedThreadPool( |
| new ThreadFactoryBuilder() |
| .setThreadFactory(MoreExecutors.platformThreadFactory()) |
| .setDaemon(true) |
| .setNameFormat("direct-metrics-counter-committer") |
| .build()); |
| |
| private interface MetricAggregation<UpdateT, ResultT> { |
| UpdateT zero(); |
| |
| UpdateT combine(Iterable<UpdateT> updates); |
| |
| ResultT extract(UpdateT data); |
| } |
| |
| /** |
| * Implementation of a metric in the direct runner. |
| * |
| * @param <UpdateT> The type of raw data received and aggregated across updates. |
| * @param <ResultT> The type of result extracted from the data. |
| */ |
| private static class DirectMetric<UpdateT, ResultT> { |
| private final MetricAggregation<UpdateT, ResultT> aggregation; |
| |
| private final AtomicReference<UpdateT> finishedCommitted; |
| |
| private final Object attemptedLock = new Object(); |
| |
| @GuardedBy("attemptedLock") |
| private volatile UpdateT finishedAttempted; |
| |
| private final ConcurrentMap<CommittedBundle<?>, UpdateT> inflightAttempted = |
| new ConcurrentHashMap<>(); |
| |
| public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation) { |
| this.aggregation = aggregation; |
| finishedCommitted = new AtomicReference<>(aggregation.zero()); |
| finishedAttempted = aggregation.zero(); |
| } |
| |
| /** |
| * Add the given {@code tentativeCumulative} update to the physical aggregate. |
| * |
| * @param bundle The bundle receiving an update. |
| * @param tentativeCumulative The new cumulative value for the given bundle. |
| */ |
| public void updatePhysical(CommittedBundle<?> bundle, UpdateT tentativeCumulative) { |
| // Add (or update) the cumulatiev value for the given bundle. |
| inflightAttempted.put(bundle, tentativeCumulative); |
| } |
| |
| /** |
| * Commit a physical value for the given {@code bundle}. |
| * |
| * @param bundle The bundle being committed. |
| * @param finalCumulative The final cumulative value for the given bundle. |
| */ |
| @SuppressWarnings("FutureReturnValueIgnored") // direct runner metrics are best-effort; |
| // we choose not to block on async commit |
| public void commitPhysical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) { |
| // To prevent a query from blocking the commit, we perform the commit in two steps. |
| // 1. We perform a non-blocking write to the uncommitted table to make the new value |
| // available immediately. |
| // 2. We submit a runnable that will commit the update and remove the tentative value in |
| // a synchronized block. |
| inflightAttempted.put(bundle, finalCumulative); |
| COUNTER_COMMITTER.submit( |
| () -> { |
| synchronized (attemptedLock) { |
| finishedAttempted = aggregation.combine(asList(finishedAttempted, finalCumulative)); |
| inflightAttempted.remove(bundle); |
| } |
| }); |
| } |
| |
| /** Extract the latest values from all attempted and in-progress bundles. */ |
| public ResultT extractLatestAttempted() { |
| ArrayList<UpdateT> updates = new ArrayList<>(inflightAttempted.size() + 1); |
| // Within this block we know that will be consistent. Specifically, the only change that can |
| // happen concurrently is the addition of new (larger) values to inflightAttempted. |
| synchronized (attemptedLock) { |
| updates.add(finishedAttempted); |
| updates.addAll(inflightAttempted.values()); |
| } |
| return aggregation.extract(aggregation.combine(updates)); |
| } |
| |
| /** |
| * Commit a logical value for the given {@code bundle}. |
| * |
| * @param bundle The bundle being committed. |
| * @param finalCumulative The final cumulative value for the given bundle. |
| */ |
| public void commitLogical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) { |
| UpdateT current; |
| do { |
| current = finishedCommitted.get(); |
| } while (!finishedCommitted.compareAndSet( |
| current, aggregation.combine(asList(current, finalCumulative)))); |
| } |
| |
| /** Extract the value from all successfully committed bundles. */ |
| public ResultT extractCommitted() { |
| return aggregation.extract(finishedCommitted.get()); |
| } |
| } |
| |
| private static final MetricAggregation<Long, Long> COUNTER = |
| new MetricAggregation<Long, Long>() { |
| @Override |
| public Long zero() { |
| return 0L; |
| } |
| |
| @Override |
| public Long combine(Iterable<Long> updates) { |
| long value = 0; |
| for (long update : updates) { |
| value += update; |
| } |
| return value; |
| } |
| |
| @Override |
| public Long extract(Long data) { |
| return data; |
| } |
| }; |
| |
| private static final MetricAggregation<DistributionData, DistributionResult> DISTRIBUTION = |
| new MetricAggregation<DistributionData, DistributionResult>() { |
| @Override |
| public DistributionData zero() { |
| return DistributionData.EMPTY; |
| } |
| |
| @Override |
| public DistributionData combine(Iterable<DistributionData> updates) { |
| DistributionData result = DistributionData.EMPTY; |
| for (DistributionData update : updates) { |
| result = result.combine(update); |
| } |
| return result; |
| } |
| |
| @Override |
| public DistributionResult extract(DistributionData data) { |
| return data.extractResult(); |
| } |
| }; |
| |
| private static final MetricAggregation<GaugeData, GaugeResult> GAUGE = |
| new MetricAggregation<GaugeData, GaugeResult>() { |
| @Override |
| public GaugeData zero() { |
| return GaugeData.empty(); |
| } |
| |
| @Override |
| public GaugeData combine(Iterable<GaugeData> updates) { |
| GaugeData result = GaugeData.empty(); |
| for (GaugeData update : updates) { |
| result = result.combine(update); |
| } |
| return result; |
| } |
| |
| @Override |
| public GaugeResult extract(GaugeData data) { |
| return data.extractResult(); |
| } |
| }; |
| |
| /** The current values of counters in memory. */ |
| private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters = |
| new MetricsMap<>(unusedKey -> new DirectMetric<>(COUNTER)); |
| |
| private MetricsMap<MetricKey, DirectMetric<DistributionData, DistributionResult>> distributions = |
| new MetricsMap<>(unusedKey -> new DirectMetric<>(DISTRIBUTION)); |
| private MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauges = |
| new MetricsMap<>(unusedKey -> new DirectMetric<>(GAUGE)); |
| |
| @Override |
| public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { |
| ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder(); |
| for (Entry<MetricKey, DirectMetric<Long, Long>> counter : counters.entries()) { |
| maybeExtractResult(filter, counterResults, counter); |
| } |
| ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults = |
| ImmutableList.builder(); |
| for (Entry<MetricKey, DirectMetric<DistributionData, DistributionResult>> distribution : |
| distributions.entries()) { |
| maybeExtractResult(filter, distributionResults, distribution); |
| } |
| ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder(); |
| for (Entry<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauge : gauges.entries()) { |
| maybeExtractResult(filter, gaugeResults, gauge); |
| } |
| |
| return MetricQueryResults.create( |
| counterResults.build(), distributionResults.build(), gaugeResults.build()); |
| } |
| |
| private <ResultT> void maybeExtractResult( |
| MetricsFilter filter, |
| ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder, |
| Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) { |
| if (MetricFiltering.matches(filter, entry.getKey())) { |
| resultsBuilder.add( |
| MetricResult.create( |
| entry.getKey(), |
| entry.getValue().extractCommitted(), |
| entry.getValue().extractLatestAttempted())); |
| } |
| } |
| |
| /** Apply metric updates that represent physical counter deltas to the current metric values. */ |
| public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) { |
| for (MetricUpdate<Long> counter : updates.counterUpdates()) { |
| counters.get(counter.getKey()).updatePhysical(bundle, counter.getUpdate()); |
| } |
| for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) { |
| distributions.get(distribution.getKey()).updatePhysical(bundle, distribution.getUpdate()); |
| } |
| for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) { |
| gauges.get(gauge.getKey()).updatePhysical(bundle, gauge.getUpdate()); |
| } |
| } |
| |
| public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) { |
| for (MetricUpdate<Long> counter : updates.counterUpdates()) { |
| counters.get(counter.getKey()).commitPhysical(bundle, counter.getUpdate()); |
| } |
| for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) { |
| distributions.get(distribution.getKey()).commitPhysical(bundle, distribution.getUpdate()); |
| } |
| for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) { |
| gauges.get(gauge.getKey()).commitPhysical(bundle, gauge.getUpdate()); |
| } |
| } |
| |
| /** Apply metric updates that represent new logical values from a bundle being committed. */ |
| public void commitLogical(CommittedBundle<?> bundle, MetricUpdates updates) { |
| for (MetricUpdate<Long> counter : updates.counterUpdates()) { |
| counters.get(counter.getKey()).commitLogical(bundle, counter.getUpdate()); |
| } |
| for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) { |
| distributions.get(distribution.getKey()).commitLogical(bundle, distribution.getUpdate()); |
| } |
| for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) { |
| gauges.get(gauge.getKey()).commitLogical(bundle, gauge.getUpdate()); |
| } |
| } |
| } |