blob: 9fb2c7c2609956b54fac74d9a84d2fcc607982c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
import org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner;
import org.apache.flink.runtime.jobmaster.SlotPoolServiceSchedulerFactory;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.util.Preconditions.checkArgument;
/** Factory which creates a {@link JobMasterServiceLeadershipRunner}. */
public enum JobMasterServiceLeadershipRunnerFactory implements JobManagerRunnerFactory {
INSTANCE;
@Override
public JobManagerRunner createJobManagerRunner(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp)
throws Exception {
checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
final JobMasterConfiguration jobMasterConfiguration =
JobMasterConfiguration.fromConfiguration(configuration);
final RunningJobsRegistry runningJobsRegistry =
highAvailabilityServices.getRunningJobsRegistry();
final LeaderElectionService jobManagerLeaderElectionService =
highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
configuration, jobGraph.getJobType());
if (jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
== SchedulerExecutionMode.REACTIVE) {
Preconditions.checkState(
slotPoolServiceSchedulerFactory.getSchedulerType()
== JobManagerOptions.SchedulerType.Adaptive,
"Adaptive Scheduler is required for reactive mode");
}
final ShuffleMaster<?> shuffleMaster =
ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
.createShuffleMaster(configuration);
final LibraryCacheManager.ClassLoaderLease classLoaderLease =
jobManagerServices
.getLibraryCacheManager()
.registerClassLoaderLease(jobGraph.getJobID());
final ClassLoader userCodeClassLoader =
classLoaderLease
.getOrResolveClassLoader(
jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths())
.asClassLoader();
final DefaultJobMasterServiceFactory jobMasterServiceFactory =
new DefaultJobMasterServiceFactory(
jobManagerServices.getScheduledExecutorService(),
rpcService,
jobMasterConfiguration,
jobGraph,
highAvailabilityServices,
slotPoolServiceSchedulerFactory,
jobManagerServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
userCodeClassLoader,
shuffleMaster,
initializationTimestamp);
final DefaultJobMasterServiceProcessFactory jobMasterServiceProcessFactory =
new DefaultJobMasterServiceProcessFactory(
jobGraph.getJobID(),
jobGraph.getName(),
jobGraph.getCheckpointingSettings(),
initializationTimestamp,
jobMasterServiceFactory);
return new JobMasterServiceLeadershipRunner(
jobMasterServiceProcessFactory,
jobManagerLeaderElectionService,
runningJobsRegistry,
classLoaderLease,
fatalErrorHandler);
}
}