blob: c8476a74b91f3b52233809bee6fd086a0e2fc573 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.protorouter;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import java.util.List;
import java.util.Objects;
import org.apache.flink.statefun.flink.common.protopath.ProtobufPath;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
final class AddressResolver {
/**
* Creates an address resolver that is able to produce {@link Address} from an address template
* and a Protocol Buffers message.
*
* <p>An address template is an address of the form function-namespace/function-type/function-id.
* where each component can contain multiple {@link ProtobufPath} expressions.
*
* @param messageDescriptor the protocol buffers message descriptor that would be used to extract
* the target address from.
* @param addressTemplate the template that would be used to extract the target address by.
* @return an instance of an address evaluator, that is able to produce an {@link Address} given a
* Protocol Buffers message.
*/
static AddressResolver fromAddressTemplate(
Descriptors.Descriptor messageDescriptor, String addressTemplate) {
Objects.requireNonNull(messageDescriptor);
Objects.requireNonNull(addressTemplate);
int lastSlash = addressTemplate.lastIndexOf("/");
if (lastSlash <= 0) {
throw new IllegalArgumentException(
"The address template is not of the form <function type>/<id>");
}
String functionTypeTemplate = addressTemplate.substring(0, lastSlash);
String idTemplate = addressTemplate.substring(lastSlash + 1);
if (idTemplate.isEmpty()) {
throw new IllegalArgumentException(
"The address template is not of the form <function type>/<id>");
}
lastSlash = functionTypeTemplate.lastIndexOf("/");
if (lastSlash <= 0) {
throw new IllegalArgumentException(
"The function type template is not of the form <function namespace>/<function name>");
}
String functionNamespaceTemplate = functionTypeTemplate.substring(0, lastSlash);
String functionNameIdTemplate = functionTypeTemplate.substring(lastSlash + 1);
if (functionNameIdTemplate.isEmpty()) {
throw new IllegalArgumentException(
"The address template is not of the form <function type>/<id>");
}
return new AddressResolver(
evaluator(messageDescriptor, functionNamespaceTemplate),
evaluator(messageDescriptor, functionNameIdTemplate),
evaluator(messageDescriptor, idTemplate));
}
static TemplateEvaluator evaluator(Descriptors.Descriptor descriptor, String template) {
List<TemplateParser.TextFragment> fragments = TemplateParser.parseTemplateString(template);
return new TemplateEvaluator(descriptor, fragments);
}
private final TemplateEvaluator functionNamespace;
private final TemplateEvaluator functionName;
private final TemplateEvaluator functionId;
private AddressResolver(
TemplateEvaluator functionNamespace,
TemplateEvaluator functionName,
TemplateEvaluator functionId) {
this.functionNamespace = Objects.requireNonNull(functionNamespace);
this.functionName = Objects.requireNonNull(functionName);
this.functionId = Objects.requireNonNull(functionId);
}
Address evaluate(Message message) {
FunctionType functionType =
new FunctionType(functionNamespace.evaluate(message), functionName.evaluate(message));
return new Address(functionType, functionId.evaluate(message));
}
}