blob: 252d902d7379db9541115509291c95d2a1123066 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.fn.harness;
import java.util.EnumMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.control.RegisterHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builder;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Main entry point into the Beam SDK Fn Harness for Java.
 *
 * <p>When launched through {@link #main(String[])}, the harness reads its configuration from the
 * following environment variables:
 *
 * <ul>
 *   <li>HARNESS_ID: A String representing the ID of this FnHarness. This will be added to the
 *       headers of calls to the Beam Control Service.
 *   <li>LOGGING_API_SERVICE_DESCRIPTOR: A {@link
 *       org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor} encoded as text
 *       representing the endpoint that is to be connected to for the Beam Fn Logging service.
 *   <li>CONTROL_API_SERVICE_DESCRIPTOR: A {@link Endpoints.ApiServiceDescriptor} encoded as text
 *       representing the endpoint that is to be connected to for the Beam Fn Control service.
 *   <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. See {@link
 *       PipelineOptions} for further details.
 * </ul>
 */
public class FnHarness {
  private static final String HARNESS_ID = "HARNESS_ID";
  private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
  private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
  private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
  private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);

  /**
   * Parses the protobuf text-format encoding of an {@link Endpoints.ApiServiceDescriptor}.
   *
   * @param descriptor the text-format representation to parse
   * @return the descriptor built from the parsed text
   * @throws TextFormat.ParseException if {@code descriptor} is not valid text format
   */
  private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String descriptor)
      throws TextFormat.ParseException {
    Endpoints.ApiServiceDescriptor.Builder parsed = Endpoints.ApiServiceDescriptor.newBuilder();
    TextFormat.merge(descriptor, parsed);
    return parsed.build();
  }

  /**
   * Starts the harness using the environment variables of the current process.
   *
   * @param args ignored; all configuration comes from the environment
   * @throws Exception if the harness cannot be configured or fails while running
   */
  public static void main(String[] args) throws Exception {
    Function<String, String> processEnvironment = System::getenv;
    main(processEnvironment);
  }

  /**
   * Starts the harness using configuration looked up through the supplied function instead of
   * the process environment directly.
   *
   * <p>Invokes {@link JvmInitializers#runOnStartup()}, echoes the raw configuration values to
   * standard output, and then parses them before delegating to the overload that takes parsed
   * values.
   *
   * @param environmentVarGetter maps an environment variable name to its value
   * @throws Exception if a configuration value cannot be parsed or the harness fails while
   *     running
   */
  @VisibleForTesting
  public static void main(Function<String, String> environmentVarGetter) throws Exception {
    JvmInitializers.runOnStartup();

    // Echo the raw configuration to stdout; the logging client is not connected yet.
    System.out.printf("SDK Fn Harness started%n");
    System.out.printf("Harness ID %s%n", environmentVarGetter.apply(HARNESS_ID));
    System.out.printf(
        "Logging location %s%n", environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
    System.out.printf(
        "Control location %s%n", environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
    System.out.printf("Pipeline options %s%n", environmentVarGetter.apply(PIPELINE_OPTIONS));

    String harnessId = environmentVarGetter.apply(HARNESS_ID);

    String serializedOptions = environmentVarGetter.apply(PIPELINE_OPTIONS);
    PipelineOptions pipelineOptions = PipelineOptionsTranslation.fromJson(serializedOptions);

    String loggingDescriptorText = environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR);
    Endpoints.ApiServiceDescriptor loggingEndpoint = getApiServiceDescriptor(loggingDescriptorText);

    String controlDescriptorText = environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR);
    Endpoints.ApiServiceDescriptor controlEndpoint = getApiServiceDescriptor(controlDescriptorText);

    main(harnessId, pipelineOptions, loggingEndpoint, controlEndpoint);
  }

  /**
   * Run a FnHarness with the given id and options that attaches to the specified logging and
   * control API service descriptors.
   *
   * <p>The channel factory is chosen from the pipeline's experiments (epoll when {@code
   * beam_fn_api_epoll} is listed, the default factory otherwise) and is decorated with an {@link
   * AddHarnessIdInterceptor} created for {@code id}.
   *
   * @param id Harness ID
   * @param options The options for this pipeline
   * @param loggingApiServiceDescriptor endpoint of the Beam Fn Logging service
   * @param controlApiServiceDescriptor endpoint of the Beam Fn Control service
   * @throws Exception if the harness fails to start or fails while running
   */
  public static void main(
      String id,
      PipelineOptions options,
      Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
      Endpoints.ApiServiceDescriptor controlApiServiceDescriptor)
      throws Exception {
    ManagedChannelFactory baseChannelFactory = selectChannelFactory(options);
    OutboundObserverFactory observerFactory = HarnessStreamObserverFactories.fromOptions(options);
    ManagedChannelFactory identifiedChannelFactory =
        baseChannelFactory.withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)));

    main(
        id,
        options,
        loggingApiServiceDescriptor,
        controlApiServiceDescriptor,
        identifiedChannelFactory,
        observerFactory);
  }

  /**
   * Run a FnHarness with the given id and options that attaches to the specified logging and
   * control API service descriptors using the given channel factory and outbound observer factory.
   *
   * <p>A logging client is kept open for the duration of the call, handlers for the {@code
   * REGISTER} and {@code PROCESS_BUNDLE} instructions are installed, and instruction requests are
   * processed on the executor service obtained from {@link GcsOptions}. That executor service is
   * shut down before this method returns or propagates an exception.
   *
   * @param id Harness ID
   * @param options The options for this pipeline
   * @param loggingApiServiceDescriptor endpoint of the Beam Fn Logging service
   * @param controlApiServiceDescriptor endpoint of the Beam Fn Control service
   * @param channelFactory factory used to create the channels to the Fn API services
   * @param outboundObserverFactory factory for the outbound observers used by the data, state and
   *     control clients
   * @throws Exception if the harness fails to start or fails while running
   */
  public static void main(
      String id,
      PipelineOptions options,
      Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
      Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
      ManagedChannelFactory channelFactory,
      OutboundObserverFactory outboundObserverFactory)
      throws Exception {
    IdGenerator stateIdGenerator = IdGenerators.decrementingLongs();
    ExecutorService instructionExecutor = options.as(GcsOptions.class).getExecutorService();

    // The logging client is never referenced below; while it is open (until close()) it
    // intercepts logging and forwards it to the logging service.
    try (BeamFnLoggingClient loggingClient =
        new BeamFnLoggingClient(
            options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) {
      LOG.info("Fn Harness started");

      // Register standard file systems.
      FileSystems.setDefaultPipelineOptions(options);

      RegisterHandler registry = new RegisterHandler();
      BeamFnDataGrpcClient dataClient =
          new BeamFnDataGrpcClient(options, channelFactory::forDescriptor, outboundObserverFactory);
      BeamFnStateGrpcClientCache stateClientCache =
          new BeamFnStateGrpcClientCache(
              stateIdGenerator, channelFactory::forDescriptor, outboundObserverFactory);
      ProcessBundleHandler bundleHandler =
          new ProcessBundleHandler(options, registry::getById, dataClient, stateClientCache);

      EnumMap<
              BeamFnApi.InstructionRequest.RequestCase,
              ThrowingFunction<InstructionRequest, Builder>>
          instructionHandlers = createInstructionHandlers(registry, bundleHandler);

      BeamFnControlClient controlClient =
          new BeamFnControlClient(
              id,
              controlApiServiceDescriptor,
              channelFactory,
              outboundObserverFactory,
              instructionHandlers);

      JvmInitializers.runBeforeProcessing(options);

      LOG.info("Entering instruction processing loop");
      controlClient.processInstructionRequests(instructionExecutor);
    } finally {
      // The logging client has already been closed at this point, so report on stdout.
      System.out.println("Shutting SDK harness down.");
      instructionExecutor.shutdown();
    }
  }

  /**
   * Picks the channel factory for this harness: the epoll variant when the {@code
   * beam_fn_api_epoll} experiment is listed in the options, the default one otherwise.
   */
  private static ManagedChannelFactory selectChannelFactory(PipelineOptions options) {
    List<String> enabledExperiments = options.as(ExperimentalOptions.class).getExperiments();
    boolean epollRequested =
        enabledExperiments != null && enabledExperiments.contains("beam_fn_api_epoll");
    return epollRequested
        ? ManagedChannelFactory.createEpoll()
        : ManagedChannelFactory.createDefault();
  }

  /** Builds the dispatch table from instruction request type to the handler that serves it. */
  private static EnumMap<
          BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<InstructionRequest, Builder>>
      createInstructionHandlers(RegisterHandler registry, ProcessBundleHandler bundleHandler) {
    EnumMap<
            BeamFnApi.InstructionRequest.RequestCase,
            ThrowingFunction<InstructionRequest, Builder>>
        handlersByCase = new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);
    handlersByCase.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, registry::register);
    // TODO(BEAM-6597): Collect MonitoringInfos in ProcessBundleProgressResponses.
    handlersByCase.put(
        BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, bundleHandler::processBundle);
    return handlersByCase;
  }
}