blob: 2190a8fa14bc66896a036210b192ee89705404a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.wire;
import java.util.Map.Entry;
import java.util.function.Predicate;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
/** Utilities for replacing or wrapping unknown coders with {@link LengthPrefixCoder}. */
public class LengthPrefixUnknownCoders {
/**
* Recursively traverses the coder tree and wraps the first unknown coder in every branch with a
* {@link LengthPrefixCoder} unless an ancestor coder is itself a {@link LengthPrefixCoder}. If
* {@code replaceWithByteArrayCoder} is set, then replaces that unknown coder with a {@link
* ByteArrayCoder}. Registers the new coders in the given {@link Components.Builder}. Note that no
* ids that are generated will collide with the ids supplied within the {@link
* Components#getCodersMap() coder map} key space.
*
* @param coderId The root coder contained within {@code coders} to start the recursive descent
* from.
* @param components Components builder that initially contains the root coder and all component
* coders, and will be modified to contain all the necessary additional coders (including the
* resulting coder).
* @param replaceWithByteArrayCoder whether to replace an unknown coder with a {@link
* ByteArrayCoder}.
* @return Id of the original coder (if unchanged) or the newly generated length-prefixed coder.
*/
public static String addLengthPrefixedCoder(
String coderId, RunnerApi.Components.Builder components, boolean replaceWithByteArrayCoder) {
String lengthPrefixedByteArrayCoderId = addLengthPrefixByteArrayCoder(components);
String urn = components.getCodersOrThrow(coderId).getSpec().getSpec().getUrn();
// We handle three cases:
// 1) the requested coder is already a length prefix coder. In this case we just honor the
// request to replace the coder with a byte array coder.
// 2) the requested coder is a known coder but not a length prefix coder. In this case we
// rebuild the coder by recursively length prefixing any unknown component coders.
// 3) the requested coder is an unknown coder. In this case we either wrap the requested coder
// with a length prefix coder or replace it with a length prefix byte array coder.
if (ModelCoders.LENGTH_PREFIX_CODER_URN.equals(urn)) {
return replaceWithByteArrayCoder ? lengthPrefixedByteArrayCoderId : coderId;
} else if (ModelCoders.urns().contains(urn)) {
return addForModelCoder(coderId, components, replaceWithByteArrayCoder);
} else {
return replaceWithByteArrayCoder
? lengthPrefixedByteArrayCoderId
: addWrappedWithLengthPrefixCoder(coderId, components);
}
}
private static String addForModelCoder(
String coderId, RunnerApi.Components.Builder components, boolean replaceWithByteArrayCoder) {
Coder coder = components.getCodersOrThrow(coderId);
RunnerApi.Coder.Builder builder = coder.toBuilder().clearComponentCoderIds();
for (String componentCoderId : coder.getComponentCoderIdsList()) {
builder.addComponentCoderIds(
addLengthPrefixedCoder(componentCoderId, components, replaceWithByteArrayCoder));
}
return addCoder(builder.build(), components, coderId + "-length_prefix");
}
// If we are handling an unknown URN then we need to wrap it with a length prefix coder.
// If requested we also replace the unknown coder with a byte array coder.
private static String addWrappedWithLengthPrefixCoder(
String coderId, RunnerApi.Components.Builder components) {
Coder.Builder lengthPrefixed = Coder.newBuilder().addComponentCoderIds(coderId);
lengthPrefixed
.getSpecBuilder()
.getSpecBuilder()
.setUrn(ModelCoders.LENGTH_PREFIX_CODER_URN)
.build();
return addCoder(lengthPrefixed.build(), components, coderId + "-length_prefix");
}
/** Adds the (singleton) length-prefixed byte array coder. */
private static String addLengthPrefixByteArrayCoder(RunnerApi.Components.Builder components) {
// Add byte array coder
Coder.Builder byteArrayCoder = Coder.newBuilder();
byteArrayCoder.getSpecBuilder().getSpecBuilder().setUrn(ModelCoders.BYTES_CODER_URN);
String byteArrayCoderId = addCoder(byteArrayCoder.build(), components, "byte_array");
// Wrap it into length-prefixed coder
Coder.Builder lengthPrefixByteArrayCoder = Coder.newBuilder();
lengthPrefixByteArrayCoder
.addComponentCoderIds(byteArrayCoderId)
.getSpecBuilder()
.getSpecBuilder()
.setUrn(ModelCoders.LENGTH_PREFIX_CODER_URN);
return addCoder(lengthPrefixByteArrayCoder.build(), components, "length_prefix_byte_array");
}
private static String addCoder(
RunnerApi.Coder coder, RunnerApi.Components.Builder components, String uniqueIdPrefix) {
for (Entry<String, Coder> entry : components.getCodersMap().entrySet()) {
if (entry.getValue().equals(coder)) {
return entry.getKey();
}
}
String id = generateUniqueId(uniqueIdPrefix, components::containsCoders);
components.putCoders(id, coder);
return id;
}
/** Generates a unique id given a prefix and the set of existing ids. */
static String generateUniqueId(String prefix, Predicate<String> isExistingId) {
int i = 0;
while (isExistingId.test(prefix + i)) {
i += 1;
}
return prefix + i;
}
}