blob: 65fbdf2af62043a6e50cf3bff7cdbf835b362bfb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;
import static org.apache.beam.model.pipeline.v1.MetricsApi.monitoringInfoSpec;
import java.time.Instant;
import java.util.HashMap;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpec;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
/**
* Simplified building of MonitoringInfo fields, allows setting one field at a time with simpler
* method calls, without needing to dive into the details of the nested protos.
*
* <p>There is no need to set the type field, by setting the appropriate value field: (i.e.
* setInt64Value), the typeUrn field is automatically set.
*
* <p>Additionally, if validateAndDropInvalid is set to true in the ctor, then MonitoringInfos will
* be returned as null when build() is called if any fields are not properly set. This is based on
* comparing the fields which are set to the MonitoringInfoSpec in beam_fn_api.proto.
*
* <p>Example Usage (ElementCount counter):
*
* <pre>{@code
* SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
* builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
* builder.setInt64Value(1);
* builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myTransform");
* MonitoringInfo mi = builder.build();
* }</pre>
*/
public class SimpleMonitoringInfoBuilder {
  /**
   * MonitoringInfoSpecs declared in the proto enum, keyed by URN. Populated once at class load; it
   * is not read anywhere else in this class.
   */
  private static final HashMap<String, MonitoringInfoSpec> specs = new HashMap<>();

  static {
    for (MonitoringInfoSpecs.Enum val : MonitoringInfoSpecs.Enum.values()) {
      // The enum iterator inserts an UNRECOGNIZED = -1 value which isn't explicitly added in
      // the proto files, so there is no spec to register for it. Compare the constant directly
      // rather than its name string.
      if (val != MonitoringInfoSpecs.Enum.UNRECOGNIZED) {
        MonitoringInfoSpec spec =
            val.getValueDescriptor().getOptions().getExtension(monitoringInfoSpec);
        specs.put(spec.getUrn(), spec);
      }
    }
  }

  /** When true, {@link #build()} returns null for MonitoringInfos that fail validation. */
  private final boolean validateAndDropInvalid;

  /** Validator consulted by {@link #build()} when validation is enabled. */
  private final SpecMonitoringInfoValidator validator = new SpecMonitoringInfoValidator();

  /** The proto builder being populated; replaced with a fresh one by {@link #clear()}. */
  private MonitoringInfo.Builder builder;

  /** Creates a builder with validation enabled (equivalent to passing {@code true}). */
  public SimpleMonitoringInfoBuilder() {
    this(true);
  }

  /**
   * Creates a builder.
   *
   * @param validateAndDropInvalid if true, {@link #build()} returns null whenever the validator
   *     reports a problem with the built MonitoringInfo
   */
  public SimpleMonitoringInfoBuilder(boolean validateAndDropInvalid) {
    this.builder = MonitoringInfo.newBuilder();
    this.validateAndDropInvalid = validateAndDropInvalid;
  }

  /**
   * Sets the urn of the MonitoringInfo.
   *
   * @param urn The urn of the MonitoringInfo
   * @return this builder, for chaining
   */
  public SimpleMonitoringInfoBuilder setUrn(String urn) {
    builder.setUrn(urn);
    return this;
  }

  /**
   * Sets the timestamp of the MonitoringInfo to the current time.
   *
   * @return this builder, for chaining
   */
  public SimpleMonitoringInfoBuilder setTimestampToNow() {
    Instant now = Instant.now();
    builder.getTimestampBuilder().setSeconds(now.getEpochSecond()).setNanos(now.getNano());
    return this;
  }

  /**
   * Sets the int64Value of the CounterData in the MonitoringInfo, and the appropriate type URN.
   *
   * @param value the counter value to store
   * @return this builder, for chaining
   */
  public SimpleMonitoringInfoBuilder setInt64Value(long value) {
    builder.getMetricBuilder().getCounterDataBuilder().setInt64Value(value);
    setInt64TypeUrn();
    return this;
  }

  /**
   * Sets the IntDistributionData of the DistributionData in the MonitoringInfo, and the appropriate
   * type URN.
   *
   * @param data source of the count, sum, min and max copied into the MonitoringInfo
   * @return this builder, for chaining
   */
  public SimpleMonitoringInfoBuilder setInt64DistributionValue(DistributionData data) {
    builder
        .getMetricBuilder()
        .getDistributionDataBuilder()
        .getIntDistributionDataBuilder()
        .setCount(data.count())
        .setSum(data.sum())
        .setMin(data.min())
        .setMax(data.max());
    setInt64DistributionTypeUrn();
    return this;
  }

  /**
   * Sets the appropriate type URN for int64 distribution tuples.
   *
   * @return this builder, for chaining
   */
  public SimpleMonitoringInfoBuilder setInt64DistributionTypeUrn() {
    builder.setType(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64);
    return this;
  }

  /**
   * Sets the appropriate type URN for sum int64 counters.
   *
   * @return this builder, for chaining
   */
  public SimpleMonitoringInfoBuilder setInt64TypeUrn() {
    builder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64);
    return this;
  }

  /**
   * Sets the MonitoringInfo label to the given name and value.
   *
   * @param labelName the label key
   * @param labelValue the label value
   * @return this builder, for chaining
   */
  public SimpleMonitoringInfoBuilder setLabel(String labelName, String labelValue) {
    builder.putLabels(labelName, labelValue);
    return this;
  }

  /** Discards everything set so far by starting over with a fresh MonitoringInfo builder. */
  public void clear() {
    builder = MonitoringInfo.newBuilder();
  }

  /**
   * Merges the provided monitoringInfo into this builder's current state.
   *
   * <p>The builder is NOT cleared first: anything already set on it is kept and combined with
   * {@code monitoringInfo} via the proto builder's {@code mergeFrom}. Call {@link #clear()}
   * beforehand to start from an empty MonitoringInfo.
   *
   * @param monitoringInfo the MonitoringInfo to merge in
   */
  public void merge(MonitoringInfo monitoringInfo) {
    builder.mergeFrom(monitoringInfo);
  }

  /**
   * @return A copy of the MonitoringInfo with the timestamp cleared, to allow comparing two
   *     MonitoringInfos.
   */
  @VisibleForTesting
  public static MonitoringInfo copyAndClearTimestamp(MonitoringInfo input) {
    MonitoringInfo.Builder copy = MonitoringInfo.newBuilder();
    copy.mergeFrom(input);
    copy.clearTimestamp();
    return copy.build();
  }

  /**
   * Builds the provided MonitoringInfo. Returns null if validateAndDropInvalid set and fields do
   * not match respecting MonitoringInfoSpec based on urn.
   *
   * @return the built MonitoringInfo, or null when validation is enabled and the validator returns
   *     a present result for it
   */
  @Nullable
  public MonitoringInfo build() {
    final MonitoringInfo result = builder.build();
    // A present value from the validator is treated as "invalid", so the result is dropped.
    if (validateAndDropInvalid && validator.validate(result).isPresent()) {
      return null;
    }
    return result;
  }
}