| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.rest.proxy.task; |
| |
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.grouper.task.TaskAssignmentManager;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.installation.InstallationFinder;
import org.apache.samza.rest.proxy.installation.InstallationRecord;
import org.apache.samza.rest.proxy.job.JobInstance;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
| |
| |
| /** |
| * {@link TaskProxy} interface implementation for samza jobs running in yarn execution environment. |
| * getTasks implementation reads the jobModel of the job specified by {@link JobInstance} from coordinator stream. |
| */ |
| public class SamzaTaskProxy implements TaskProxy { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(SamzaTaskProxy.class); |
| |
| private static final MetricsRegistryMap METRICS_REGISTRY = new MetricsRegistryMap(); |
| |
| private final TaskResourceConfig taskResourceConfig; |
| |
| private final InstallationFinder installFinder; |
| |
| public SamzaTaskProxy(TaskResourceConfig taskResourceConfig, InstallationFinder installFinder) { |
| this.taskResourceConfig = taskResourceConfig; |
| this.installFinder = installFinder; |
| } |
| |
| /** |
| * Fetches the complete job model from the coordinator stream based upon the provided {@link JobInstance} |
| * param, transforms it to a list of {@link Task} and returns it. |
| * {@inheritDoc} |
| */ |
| @Override |
| public List<Task> getTasks(JobInstance jobInstance) throws IOException, InterruptedException { |
| Preconditions.checkArgument(installFinder.isInstalled(jobInstance), |
| String.format("Invalid job instance : %s", jobInstance)); |
| CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer = null; |
| try { |
| coordinatorStreamSystemConsumer = initializeCoordinatorStreamConsumer(jobInstance); |
| return readTasksFromCoordinatorStream(coordinatorStreamSystemConsumer); |
| } finally { |
| if (coordinatorStreamSystemConsumer != null) { |
| coordinatorStreamSystemConsumer.stop(); |
| } |
| } |
| } |
| |
| /** |
| * Initialize {@link CoordinatorStreamSystemConsumer} based upon {@link JobInstance} parameter. |
| * @param jobInstance the job instance to get CoordinatorStreamSystemConsumer for. |
| * @return built and initialized CoordinatorStreamSystemConsumer. |
| */ |
| protected CoordinatorStreamSystemConsumer initializeCoordinatorStreamConsumer(JobInstance jobInstance) { |
| Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance); |
| LOG.debug("Using config: {} to create coordinatorStream consumer.", coordinatorSystemConfig); |
| CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY); |
| LOG.debug("Registering coordinator system stream consumer."); |
| consumer.register(); |
| LOG.debug("Starting coordinator system stream consumer."); |
| consumer.start(); |
| LOG.debug("Bootstrapping coordinator system stream consumer."); |
| consumer.bootstrap(); |
| return consumer; |
| } |
| |
| /** |
| * Builds coordinator system config for the {@param jobInstance}. |
| * @param jobInstance the job instance to get the jobModel for. |
| * @return the constructed coordinator system config. |
| */ |
| private Config getCoordinatorSystemConfig(JobInstance jobInstance) { |
| try { |
| InstallationRecord record = installFinder.getAllInstalledJobs().get(jobInstance); |
| ConfigFactory configFactory = |
| ReflectionUtil.getObj(taskResourceConfig.getJobConfigFactory(), ConfigFactory.class); |
| Config config = configFactory.getConfig(new URI(String.format("file://%s", record.getConfigFilePath()))); |
| Map<String, String> configMap = ImmutableMap.of(JobConfig.JOB_ID, jobInstance.getJobId(), |
| JobConfig.JOB_NAME, jobInstance.getJobName()); |
| return CoordinatorStreamUtil.buildCoordinatorStreamConfig(new MapConfig(ImmutableList.of(config, configMap))); |
| } catch (Exception e) { |
| LOG.error(String.format("Failed to get coordinator stream config for job : %s", jobInstance), e); |
| throw new SamzaException(e); |
| } |
| } |
| |
| /** |
| * Builds list of {@link Task} from job model in coordinator stream. |
| * @param consumer system consumer associated with a job's coordinator stream. |
| * @return list of {@link Task} constructed from job model in coordinator stream. |
| */ |
| protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) { |
| CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(consumer.getConfig(), new MetricsRegistryMap()); |
| LocalityManager localityManager = new LocalityManager(coordinatorStreamStore); |
| Map<String, Map<String, String>> containerIdToHostMapping = localityManager.readContainerLocality(); |
| TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskModeMapping.TYPE)); |
| Map<String, String> taskNameToContainerIdMapping = taskAssignmentManager.readTaskAssignment(); |
| StorageConfig storageConfig = new StorageConfig(consumer.getConfig()); |
| List<String> storeNames = storageConfig.getStoreNames(); |
| return taskNameToContainerIdMapping.entrySet() |
| .stream() |
| .map(entry -> { |
| String hostName = containerIdToHostMapping.get(entry.getValue()).get(SetContainerHostMapping.HOST_KEY); |
| return new Task(hostName, entry.getKey(), entry.getValue(), new ArrayList<>(), storeNames); |
| }).collect(Collectors.toList()); |
| } |
| } |