/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.Security;
import java.util.logging.Handler;
import java.util.logging.LogManager;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.TextFormat;
import org.conscrypt.OpenSSLProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A helper class for initialization of the Dataflow worker harness. */
public final class DataflowWorkerHarnessHelper {
private static final Logger LOG = LoggerFactory.getLogger(DataflowWorkerHarnessHelper.class);
private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
private static final String ROOT_LOGGER_NAME = "";
private static final String PIPELINE_PATH = "PIPELINE_PATH";
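/**
 * Creates {@link DataflowWorkerHarnessOptions} from the worker's system properties, sets the app
 * name to the harness class's simple name, tags the logging MDC with the job and worker IDs, and
 * installs the Conscrypt security provider when the corresponding experiment is enabled.
 *
 * <p>A harness entry point might call the helpers in this class roughly as follows (illustrative
 * sketch only; {@code MyWorkerHarness} is a placeholder, and actual callers may differ):
 *
 * <pre>{@code
 * DataflowWorkerHarnessHelper.initializeLogging(MyWorkerHarness.class);
 * DataflowWorkerHarnessOptions options =
 *     DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(MyWorkerHarness.class);
 * DataflowWorkerHarnessHelper.configureLogging(options);
 * }</pre>
 */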
public static DataflowWorkerHarnessOptions initializeGlobalStateAndPipelineOptions(
Class<?> workerHarnessClass) throws Exception {
/* Extract pipeline options. */
DataflowWorkerHarnessOptions pipelineOptions =
WorkerPipelineOptionsFactory.createFromSystemProperties();
pipelineOptions.setAppName(workerHarnessClass.getSimpleName());
/* Configure logging with job-specific properties. */
DataflowWorkerLoggingMDC.setJobId(pipelineOptions.getJobId());
DataflowWorkerLoggingMDC.setWorkerId(pipelineOptions.getWorkerId());
ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions);
String experimentName = Experiment.EnableConscryptSecurityProvider.getName();
if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) {
/* Enable fast SSL provider. */
LOG.info(
"Dataflow runner is using conscrypt SSL. To disable this feature, "
+ "remove the pipeline option --experiments={}",
experimentName);
Security.insertProviderAt(new OpenSSLProvider(), 1);
} else {
LOG.info(
"Not using conscrypt SSL. Note this is the default Java behavior, but may "
+ "have reduced performance. To use conscrypt SSL pass pipeline option "
+ "--experiments={}",
experimentName);
}
return pipelineOptions;
}
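/**
 * Installs a default uncaught exception handler that logs to {@code workerHarnessClass}'s logger,
 * resets {@code java.util.logging}, removes the root logger's handlers, and then runs
 * {@link DataflowWorkerLoggingInitializer#initialize()}.
 */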
public static void initializeLogging(Class<?> workerHarnessClass) {
/* Set up exception handling tied to the workerHarnessClass. */
Thread.setDefaultUncaughtExceptionHandler(
new WorkerUncaughtExceptionHandler(LoggerFactory.getLogger(workerHarnessClass)));
// Reset the global log manager, get the root logger and remove the default log handlers.
LogManager logManager = LogManager.getLogManager();
logManager.reset();
java.util.logging.Logger rootLogger = logManager.getLogger(ROOT_LOGGER_NAME);
for (Handler handler : rootLogger.getHandlers()) {
rootLogger.removeHandler(handler);
}
DataflowWorkerLoggingInitializer.initialize();
}
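/**
 * Applies the job-specific logging configuration carried by the pipeline options via
 * {@link DataflowWorkerLoggingInitializer#configure}.
 */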
public static void configureLogging(DataflowWorkerHarnessOptions pipelineOptions) {
DataflowWorkerLoggingInitializer.configure(pipelineOptions);
}
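/** Parses a text-format {@link Endpoints.ApiServiceDescriptor} proto from {@code descriptorText}. */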
public static Endpoints.ApiServiceDescriptor parseApiServiceDescriptorFromText(
String descriptorText) throws TextFormat.ParseException {
Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
Endpoints.ApiServiceDescriptor.newBuilder();
TextFormat.merge(descriptorText, apiServiceDescriptorBuilder);
return apiServiceDescriptorBuilder.build();
}
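/**
 * Reads the logging service descriptor from the {@code LOGGING_API_SERVICE_DESCRIPTOR} environment
 * variable.
 */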
public static Endpoints.ApiServiceDescriptor getLoggingDescriptor()
throws TextFormat.ParseException {
return parseApiServiceDescriptorFromText(System.getenv().get(LOGGING_API_SERVICE_DESCRIPTOR));
}
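/**
 * Reads the control service descriptor from the {@code CONTROL_API_SERVICE_DESCRIPTOR} environment
 * variable.
 */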
public static Endpoints.ApiServiceDescriptor getControlDescriptor()
throws TextFormat.ParseException {
return parseApiServiceDescriptorFromText(System.getenv().get(CONTROL_API_SERVICE_DESCRIPTOR));
}
// TODO: make env logic private to main() so it is never done outside of initializing the process
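/**
 * Loads the portable {@link RunnerApi.Pipeline} proto from the file named by the
 * {@code PIPELINE_PATH} environment variable, or returns {@code null} (with a warning) when the
 * variable is unset or the file does not exist.
 */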
public static @Nullable RunnerApi.Pipeline getPipelineFromEnv() throws IOException {
String pipelinePath = System.getenv(PIPELINE_PATH);
if (pipelinePath == null) {
LOG.warn("Missing pipeline environment variable '{}'", PIPELINE_PATH);
return null;
}
File pipelineFile = new File(pipelinePath);
if (!pipelineFile.exists()) {
LOG.warn("Pipeline path '{}' does not exist", pipelineFile);
return null;
}
try (FileInputStream inputStream = new FileInputStream(pipelineFile)) {
RunnerApi.Pipeline pipelineProto = RunnerApi.Pipeline.parseFrom(inputStream);
LOG.info("Found portable pipeline:\n{}", TextFormat.printToString(pipelineProto));
return pipelineProto;
}
}
}