// blob: dfa3133ce617502a9d7cbf5c221d0eed3f0eb510 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.MetricShortId;
import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Mapping from counter names to short IDs.
 *
 * <p>This cache is non-evicting, and lives for the lifetime of the worker harness. This behavior is
 * fine because the total number of unique counters is expected to be small and limited by the
 * backend.
 *
 * <p>The mappings are kept in {@link ConcurrentHashMap}s and are only ever added with {@code
 * putIfAbsent}; an existing mapping is never replaced or removed. The {@link CounterUpdate} objects
 * handed to {@link #shortenIdsIfAvailable} are mutated in place and are not synchronized by this
 * class.
 */
public class CounterShortIdCache {
  private static final Logger LOG = LoggerFactory.getLogger(CounterShortIdCache.class);

  private final Cache cache = new Cache();

  /**
   * Add any new short ids received to the table. The outgoing request will have the full counter
   * updates, and the incoming responses have the associated short ids. By matching up short ids
   * with the counters in order we can build a mapping of name -> short_id for future use.
   *
   * @param request the status report that was sent; its i-th work item status is paired with the
   *     i-th service state of {@code reply}
   * @param reply the response to {@code request}; service states without short ids are skipped
   * @throws IllegalArgumentException if the request and reply do not contain the same number of
   *     entries, if a state carries short ids but the matching status has no counter updates, if a
   *     metric index does not address one of the sent counter updates, if the addressed counter
   *     update has neither a name-and-kind nor a structured name, or if a counter was previously
   *     stored with a different short id
   * @throws NullPointerException if a received entry has no short id, or the addressed counter
   *     update has a name container without a name
   */
  public void storeNewShortIds(
      final ReportWorkItemStatusRequest request, final ReportWorkItemStatusResponse reply) {
    final java.util.List<WorkItemStatus> statuses = request.getWorkItemStatuses();
    final java.util.List<WorkItemServiceState> states = reply.getWorkItemServiceStates();
    checkArgument(
        statuses != null && states != null && statuses.size() == states.size(),
        "RequestWorkItemStatus request and response are unbalanced, status: %s, states: %s",
        statuses,
        states);

    for (int i = 0; i < statuses.size(); i++) {
      WorkItemServiceState state = states.get(i);
      WorkItemStatus status = statuses.get(i);
      if (state.getMetricShortId() == null) {
        // Nothing new was assigned for this work item.
        continue;
      }

      final java.util.List<CounterUpdate> updates = status.getCounterUpdates();
      checkArgument(updates != null, "Response has shortids but no corresponding CounterUpdate");

      for (MetricShortId shortIdMsg : state.getMetricShortId()) {
        // A missing metric index is treated as index 0.
        int metricIndex = MoreObjects.firstNonNull(shortIdMsg.getMetricIndex(), 0);
        // Reject negative indices here so that a malformed response is reported as an
        // IllegalArgumentException like every other validation failure in this method, rather
        // than escaping as an IndexOutOfBoundsException from List.get below.
        checkArgument(
            metricIndex >= 0,
            "Received aggregate index outside range of sent update %s < 0",
            metricIndex);
        checkArgument(
            metricIndex < updates.size(),
            "Received aggregate index outside range of sent update %s >= %s",
            metricIndex,
            updates.size());

        CounterUpdate update = updates.get(metricIndex);
        cache.insert(update, checkNotNull(shortIdMsg.getShortId(), "Shortid should be non-null"));
      }
    }
  }

  /**
   * If any aggregates match a short id in the table, replace their name and type with the short id.
   *
   * @param counters the counter updates to shorten in place; {@code null} is accepted and ignored
   * @throws IllegalArgumentException if an update has neither a name-and-kind nor a structured name
   * @throws NullPointerException if an update has a name container without a name
   */
  public void shortenIdsIfAvailable(@Nullable java.util.List<CounterUpdate> counters) {
    if (counters == null) {
      return;
    }
    for (CounterUpdate update : counters) {
      cache.shortenIdsIfAvailable(update);
    }
  }

  /**
   * Holds the name -> short id tables: one keyed by the plain counter name and one keyed by the
   * structured counter name.
   */
  private static final class Cache {
    private final ConcurrentHashMap<String, Long> unstructuredShortIdMap =
        new ConcurrentHashMap<>();
    private final ConcurrentHashMap<CounterStructuredName, Long> structuredShortIdMap =
        new ConcurrentHashMap<>();

    /**
     * Looks up the short id stored for the update's name and, when one exists, clears both name
     * fields on the update and sets the short id instead. The update is left untouched when no
     * short id is known.
     */
    public void shortenIdsIfAvailable(CounterUpdate update) {
      Long shortId;
      if (update.getNameAndKind() != null) {
        String name =
            checkNotNull(
                update.getNameAndKind().getName(), "Counter update name should be non-null");
        shortId = unstructuredShortIdMap.get(name);
      } else if (update.getStructuredNameAndMetadata() != null) {
        CounterStructuredName name =
            checkNotNull(
                update.getStructuredNameAndMetadata().getName(),
                "Counter update structured-name should be non-null");
        shortId = structuredShortIdMap.get(name);
      } else {
        throw new IllegalArgumentException(
            "CounterUpdate should have nameAndKind or structuredNameAndmetadata");
      }

      if (shortId != null) {
        update.setNameAndKind(null);
        update.setStructuredNameAndMetadata(null);
        update.setShortId(shortId);
      }
    }

    /**
     * Records {@code shortId} for the update's name unless a mapping already exists. An existing
     * mapping is kept; it must agree with {@code shortId}, otherwise an {@link
     * IllegalArgumentException} is thrown.
     */
    public void insert(CounterUpdate update, long shortId) {
      Long oldValue;
      if (update.getNameAndKind() != null) {
        String name =
            checkNotNull(
                update.getNameAndKind().getName(), "Counter update name should be non-null");
        oldValue = unstructuredShortIdMap.putIfAbsent(name, shortId);
      } else if (update.getStructuredNameAndMetadata() != null) {
        CounterStructuredName name =
            checkNotNull(
                update.getStructuredNameAndMetadata().getName(),
                "Counter update structured-name should be non-null");
        oldValue = structuredShortIdMap.putIfAbsent(name, shortId);
      } else {
        throw new IllegalArgumentException(
            "CounterUpdate should have nameAndKind or structuredNameAndmetadata");
      }

      // putIfAbsent returns the previous mapping (or null); a differing previous value means the
      // service handed out two different short ids for the same counter name.
      checkArgument(
          oldValue == null || oldValue.equals(shortId),
          "Received counter %s with incompatible short IDs. %s first ID and then %s",
          update,
          oldValue,
          shortId);
    }
  }
}