| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.heron.scheduler.utils; |
| |
| import java.util.logging.Logger; |
| |
| import org.apache.heron.api.generated.TopologyAPI; |
| import org.apache.heron.api.utils.TopologyUtils; |
| import org.apache.heron.common.basics.SysUtils; |
| import org.apache.heron.spi.common.Config; |
| import org.apache.heron.spi.common.Context; |
| import org.apache.heron.spi.common.Key; |
| import org.apache.heron.spi.packing.IPacking; |
| import org.apache.heron.spi.packing.PackingException; |
| import org.apache.heron.spi.packing.PackingPlan; |
| import org.apache.heron.spi.scheduler.IScheduler; |
| import org.apache.heron.spi.scheduler.SchedulerException; |
| import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor; |
| import org.apache.heron.spi.utils.ReflectionUtils; |
| |
| /** |
| * {@link LauncherUtils} contains helper methods used by the server and client side launch |
| * controllers; {@link org.apache.heron.scheduler.LaunchRunner} and |
| * {@link org.apache.heron.scheduler.SchedulerMain} |
| */ |
| public class LauncherUtils { |
| private static final Logger LOG = Logger.getLogger(LauncherUtils.class.getName()); |
| |
| private static LauncherUtils instance = new LauncherUtils(); |
| |
| public static LauncherUtils getInstance() { |
| return instance; |
| } |
| |
| /** |
| * Returns a packing plan generated by configured packing class |
| */ |
| public PackingPlan createPackingPlan(final Config config, final Config runtime) |
| throws PackingException { |
| // Create an instance of the packing class |
| String packingClass = Context.packingClass(config); |
| IPacking packing; |
| try { |
| // create an instance of the packing class |
| packing = ReflectionUtils.newInstance(packingClass); |
| } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { |
| throw new PackingException(String.format( |
| "Failed to instantiate packing instance using packing class %s", packingClass), e); |
| } |
| |
| try { |
| TopologyAPI.Topology topology = Runtime.topology(runtime); |
| packing.initialize(config, topology); |
| return packing.pack(); |
| } finally { |
| SysUtils.closeIgnoringExceptions(packing); |
| } |
| } |
| |
| /** |
| * Invoke the onScheduler() in IScheduler directly as a library |
| * |
| * @param config The Config to initialize IScheduler |
| * @param runtime The runtime Config to initialize IScheduler |
| * @param scheduler the IScheduler to invoke |
| * @param packing The PackingPlan to scheduler for OnSchedule() |
| * @return true if scheduling successfully |
| */ |
| public boolean onScheduleAsLibrary( |
| Config config, |
| Config runtime, |
| IScheduler scheduler, |
| PackingPlan packing) { |
| boolean ret = false; |
| |
| try { |
| scheduler.initialize(config, runtime); |
| ret = scheduler.onSchedule(packing); |
| |
| if (ret) { |
| // Set the SchedulerLocation at last step, |
| // since some methods in IScheduler will provide correct values |
| // only after IScheduler.onSchedule is invoked correctly |
| ret = SchedulerUtils.setLibSchedulerLocation(runtime, scheduler, false); |
| } else { |
| LOG.severe("Failed to invoke IScheduler as library"); |
| } |
| } finally { |
| scheduler.close(); |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Creates and initializes scheduler instance |
| * |
| * @return initialized scheduler instances |
| */ |
| public IScheduler getSchedulerInstance(Config config, Config runtime) |
| throws SchedulerException { |
| String schedulerClass = Context.schedulerClass(config); |
| IScheduler scheduler; |
| try { |
| // create an instance of scheduler |
| scheduler = ReflectionUtils.newInstance(schedulerClass); |
| } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { |
| throw new SchedulerException(String.format("Failed to instantiate scheduler using class '%s'", |
| schedulerClass)); |
| } |
| |
| scheduler.initialize(config, runtime); |
| return scheduler; |
| } |
| |
| /** |
| * Creates initial runtime config instance using topology information. |
| * |
| * @return initial runtime config instance |
| */ |
| public Config createPrimaryRuntime(TopologyAPI.Topology topology) { |
| return Config.newBuilder() |
| .put(Key.TOPOLOGY_ID, topology.getId()) |
| .put(Key.TOPOLOGY_NAME, topology.getName()) |
| .put(Key.TOPOLOGY_DEFINITION, topology) |
| .put(Key.NUM_CONTAINERS, 1 + TopologyUtils.getNumContainers(topology)) |
| .build(); |
| } |
| |
| /** |
| * Creates initial runtime config of scheduler state manager adaptor |
| * |
| * @return adaptor config |
| */ |
| public Config createAdaptorRuntime(SchedulerStateManagerAdaptor adaptor) { |
| return Config.newBuilder() |
| .put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, adaptor).build(); |
| } |
| |
| /** |
| * Creates a config instance with packing plan info added to runtime config |
| * |
| * @return packing details config |
| */ |
| public Config createConfigWithPackingDetails(Config runtime, PackingPlan packing) { |
| return Config.newBuilder() |
| .putAll(runtime) |
| .put(Key.COMPONENT_RAMMAP, packing.getComponentRamDistribution()) |
| .put(Key.NUM_CONTAINERS, 1 + packing.getContainers().size()) |
| .build(); |
| } |
| } |