/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
| |
| package org.apache.heron.streamlet; |
| |
| import java.io.Serializable; |
| |
| import org.apache.heron.common.basics.ByteAmount; |
| import org.apache.heron.streamlet.impl.KryoSerializer; |
| |
| /** |
| * Config is the way users configure the execution of the topology. |
| * Things like streamlet delivery semantics, resources used, as well as |
| * user-defined key/value pairs are passed on to the topology runner via |
| * this class. |
| */ |
| public final class Config implements Serializable { |
| private static final long serialVersionUID = 6204498077403076352L; |
| private final double cpu; |
| private final ByteAmount ram; |
| private final DeliverySemantics deliverySemantics; |
| private final Serializer serializer; |
| private org.apache.heron.api.Config heronConfig; |
| private static final long MB = 1024 * 1024; |
| private static final long GB = 1024 * MB; |
| |
| /** |
| * An enum encapsulating the delivery semantics that can be applied to Heron topologies. The |
| * options are currently: at most once, at least once, or effectively once. |
| */ |
| public enum DeliverySemantics { |
| ATMOST_ONCE, |
| ATLEAST_ONCE, |
| EFFECTIVELY_ONCE |
| } |
| |
| /** |
| * An enum encapsulating the serializers that can be used for data in the topology. The options |
| * are currently: the Kryo serializer or the native Java serializer. |
| */ |
| public enum Serializer { |
| JAVA, |
| KRYO |
| } |
| |
| private static class Defaults { |
| static final boolean USE_KRYO = true; |
| static final double CPU = -1.0; // -1 means undefined |
| static final ByteAmount RAM = ByteAmount.fromBytes(-1); // -1 means undefined |
| static final DeliverySemantics SEMANTICS = DeliverySemantics.ATMOST_ONCE; |
| static final Serializer SERIALIZER = Serializer.KRYO; |
| } |
| |
| private Config(Builder builder) { |
| serializer = builder.serializer; |
| heronConfig = builder.config; |
| cpu = builder.cpu; |
| ram = builder.ram; |
| deliverySemantics = builder.deliverySemantics; |
| } |
| |
| /** |
| * Sets the topology to use the default configuration: 100 megabytes of RAM per container, 1.0 |
| * CPUs per container, at-most-once delivery semantics, and the Kryo serializer. |
| */ |
| public static Config defaultConfig() { |
| return new Builder() |
| .build(); |
| } |
| |
| /** |
| * Returns a new {@link Builder} that can be used to create a configuration object for Streamlet |
| * API topologies |
| */ |
| public static Builder newBuilder() { |
| return new Builder(); |
| } |
| |
| public org.apache.heron.api.Config getHeronConfig() { |
| return heronConfig; |
| } |
| |
| /** |
| * Gets the CPU used per topology container |
| * @return the per-container CPU as a double |
| */ |
| public double getPerContainerCpu() { |
| return cpu; |
| } |
| |
| /** |
| * Gets the RAM used per topology container as a number of bytes |
| * @return the per-container RAM in bytes |
| */ |
| public long getPerContainerRam() { |
| return getPerContainerRamAsBytes(); |
| } |
| |
| /** |
| * Gets the RAM used per topology container as a number of bytes |
| * @return the per-container RAM in bytes |
| */ |
| public long getPerContainerRamAsBytes() { |
| return ram.asBytes(); |
| } |
| |
| /** |
| * Gets the RAM used per topology container as a number of megabytes |
| * @return the per-container RAM in megabytes |
| */ |
| public long getPerContainerRamAsMegabytes() { |
| return ram.asMegabytes(); |
| } |
| |
| /** |
| * Gets the RAM used per topology container as a number of gigabytes |
| * @return the per-container RAM in gigabytes |
| */ |
| public long getPerContainerRamAsGigabytes() { |
| return ram.asGigabytes(); |
| } |
| |
| /** |
| * Gets the delivery semantics applied to the topology |
| * @return the delivery semantics as an enum |
| */ |
| public DeliverySemantics getDeliverySemantics() { |
| return deliverySemantics; |
| } |
| |
| /** |
| * Gets the serializer used by the topology |
| * @return the serializer as an enum |
| */ |
| public Serializer getSerializer() { |
| return serializer; |
| } |
| |
| private static org.apache.heron.api.Config.TopologyReliabilityMode translateSemantics( |
| DeliverySemantics semantics) { |
| switch (semantics) { |
| case ATMOST_ONCE: |
| return org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE; |
| case ATLEAST_ONCE: |
| return org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE; |
| case EFFECTIVELY_ONCE: |
| return org.apache.heron.api.Config.TopologyReliabilityMode.EFFECTIVELY_ONCE; |
| default: |
| return org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE; |
| } |
| } |
| |
| public static final class Builder { |
| private org.apache.heron.api.Config config; |
| private double cpu; |
| private ByteAmount ram; |
| private DeliverySemantics deliverySemantics; |
| private Serializer serializer; |
| |
| private Builder() { |
| config = new org.apache.heron.api.Config(); |
| cpu = Defaults.CPU; |
| ram = Defaults.RAM; |
| deliverySemantics = Defaults.SEMANTICS; |
| serializer = Serializer.KRYO; |
| } |
| |
| /** |
| * Sets the per-container (per-instance) CPU to be used by this topology |
| * @param perContainerCpu Per-container (per-instance) CPU as a double |
| */ |
| public Builder setPerContainerCpu(double perContainerCpu) { |
| this.cpu = perContainerCpu; |
| // Different packing algorithm might use different configs. Set all of them here. |
| config.setContainerCpuRequested(perContainerCpu); |
| config.setContainerMaxCpuHint(perContainerCpu); |
| return this; |
| } |
| |
| /** |
| * Sets the per-container (per-instance) RAM to be used by this topology |
| * @param perContainerRam Per-container (per-instance) RAM expressed as a Long. |
| */ |
| public Builder setPerContainerRam(long perContainerRam) { |
| return setPerContainerRamInBytes(perContainerRam); |
| } |
| |
| /** |
| * Sets the per-container (per-instance) RAM to be used by this topology as a number of bytes |
| * @param perContainerRam Per-container (per-instance) RAM expressed as a Long. |
| */ |
| public Builder setPerContainerRamInBytes(long perContainerRam) { |
| this.ram = ByteAmount.fromBytes(perContainerRam); |
| // Different packing algorithm might use different configs. Set all of them here. |
| config.setContainerRamRequested(ram); |
| config.setContainerMaxRamHint(ram); |
| return this; |
| } |
| |
| /** |
| * Sets the per-container (per-instance) RAM to be used by this topology in megabytes |
| * @param perContainerRamMB Per-container (per-instance) RAM expressed as a Long. |
| */ |
| public Builder setPerContainerRamInMegabytes(long perContainerRamMB) { |
| return setPerContainerRam(perContainerRamMB * MB); |
| } |
| |
| /** |
| * Sets the per-container (per-instance) RAM to be used by this topology in gigabytes |
| * @param perContainerRamGB Per-container (per-instance) RAM expressed as a Long. |
| */ |
| public Builder setPerContainerRamInGigabytes(long perContainerRamGB) { |
| return setPerContainerRam(perContainerRamGB * GB); |
| } |
| |
| /** |
| * Sets the number of containers to run this topology |
| * @param numContainers The number of containers across which to distribute this topology |
| */ |
| public Builder setNumContainers(int numContainers) { |
| config.setNumStmgrs(numContainers); |
| return this; |
| } |
| |
| /** |
| * Sets the delivery semantics of the topology |
| * @param semantics The delivery semantic to be enforced |
| */ |
| public Builder setDeliverySemantics(DeliverySemantics semantics) { |
| this.deliverySemantics = semantics; |
| config.setTopologyReliabilityMode(Config.translateSemantics(semantics)); |
| return this; |
| } |
| |
| /** |
| * Sets some user-defined key/value mapping |
| * @param key The user-defined key |
| * @param value The user-defined value |
| */ |
| public Builder setUserConfig(String key, Object value) { |
| config.put(key, value); |
| return this; |
| } |
| |
| private void useKryo() { |
| try { |
| config.setSerializationClassName(KryoSerializer.class.getName()); |
| } catch (NoClassDefFoundError e) { |
| throw new RuntimeException("Linking with kryo is needed because useKryoSerializer is used"); |
| } |
| } |
| |
| /** |
| * Sets the {@link Serializer} to be used by the topology (current options are {@link |
| * KryoSerializer} and the native Java serializer. |
| * @param topologySerializer The data serializer to use for streamlet elements in the topology. |
| */ |
| public Builder setSerializer(Serializer topologySerializer) { |
| this.serializer = topologySerializer; |
| return this; |
| } |
| |
| public Config build() { |
| if (serializer.equals(Serializer.KRYO)) { |
| useKryo(); |
| } |
| return new Config(this); |
| } |
| } |
| } |