| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.storm; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import com.esotericsoftware.kryo.Serializer; |
| |
| import org.apache.storm.serialization.IKryoDecorator; |
| import org.apache.storm.serialization.IKryoFactory; |
| |
| /** |
| * Topology configs are specified as a plain old map. This class provides a |
| * convenient way to create a topology config map by providing setter methods for |
| * all the configs that can be set. It also makes it easier to do things like add |
| * serializations. |
| * <p> |
| * <p>This class also provides constants for all the configurations possible on a Storm |
| * cluster and Storm topology. Default values for these configs can be found in |
| * defaults.yaml.</p> |
| * <p> |
| * <p>Note that you may put other configurations in any of the configs. Storm |
| * will ignore anything it doesn't recognize, but your topologies are free to make |
| * use of them by reading them in the prepare method of Bolts or the open method of |
| * Spouts. .</p> |
| */ |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| public class Config extends HashMap<String, Object> { |
| private static final long serialVersionUID = 4781760255471579334L; |
| |
| /** |
| * True if Storm should timeout messages or not. Defaults to true. This is meant to be used |
| * in unit tests to prevent tuples from being accidentally timed out during the test. |
| * Same functionality in Heron. |
| */ |
| public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts"; |
| /** |
| * Whether or not the master should optimize topologies by running multiple |
| * tasks in a single thread where appropriate. |
| * This has no impact in Heron |
| */ |
| public static final String TOPOLOGY_OPTIMIZE = "topology.optimize"; |
| /** |
| * How many instances to create for a spout/bolt. A task runs on a thread with zero or more |
| * other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always |
| * the same throughout the lifetime of a topology, but the number of executors (threads) for |
| * a spout/bolt can change over time. This allows a topology to scale to more or less resources |
| * without redeploying the topology or violating the constraints of Storm (such as a fields grouping |
| * guaranteeing that the same value goes to the same task). |
| * This has no impact on Heron. |
| */ |
| public static final String TOPOLOGY_TASKS = "topology.tasks"; |
| /** |
| * A list of serialization registrations for Kryo ( http://code.google.com/p/kryo/ ), |
| * the underlying serialization framework for Storm. A serialization can either |
| * be the name of a class (in which case Kryo will automatically create a serializer for the class |
| * that saves all the object's fields), or an implementation of com.esotericsoftware.kryo.Serializer. |
| * <p> |
| * See Kryo's documentation for more information about writing custom serializers. |
| * Same in Heron. |
| */ |
| public static final String TOPOLOGY_KRYO_REGISTER = "topology.kryo.register"; |
| /** |
| * A list of classes that customize storm's kryo instance during start-up. |
| * Each listed class name must implement IKryoDecorator. During start-up the |
| * listed class is instantiated with 0 arguments, then its 'decorate' method |
| * is called with storm's kryo instance as the only argument. |
| * Same in Heron. |
| */ |
| public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators"; |
| /** |
| * Class that specifies how to create a Kryo instance for serialization. Storm will then apply |
| * topology.kryo.register and topology.kryo.decorators on top of this. The default implementation |
| * implements topology.fall.back.on.java.serialization and turns references off. |
| * Same in Heron. |
| */ |
| public static final String TOPOLOGY_KRYO_FACTORY = "topology.kryo.factory"; |
| /** |
| * Whether or not Storm should skip the loading of kryo registrations for which it |
| * does not know the class or have the serializer implementation. Otherwise, the task will |
| * fail to load and will throw an error at runtime. The use case of this is if you want to |
| * declare your serializations on the storm.yaml files on the cluster rather than every single |
| * time you submit a topology. Different applications may use different serializations and so |
| * a single application may not have the code for the other serializers used by other apps. |
| * By setting this config to true, Storm will ignore that it doesn't have those other serializations |
| * rather than throw an error. |
| * Same in Heron. |
| */ |
| public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS = |
| "topology.skip.missing.kryo.registrations"; |
| /** |
| * The maximum amount of time a component gives a source of state to synchronize before it requests |
| * synchronization again. |
| * This is not implemented in Heron. |
| */ |
| public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS = |
| "topology.state.synchronization.timeout.secs"; |
| /** |
| * Whether or not to use Java serialization in a topology. |
| * This has the same meaning in Heron. |
| */ |
| public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION = |
| "topology.fall.back.on.java.serialization"; |
| /** |
| * Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS. |
| * Not yet implemented in Heron. |
| */ |
| public static final String TOPOLOGY_WORKER_CHILDOPTS = "topology.worker.childopts"; |
| /** |
| * This config is available for TransactionalSpouts, and contains the id ( a String) for |
| * the transactional topology. This id is used to store the state of the transactional |
| * topology in Zookeeper. |
| * This is not implemented in Heron. |
| */ |
| public static final String TOPOLOGY_TRANSACTIONAL_ID = "topology.transactional.id"; |
| /** |
| * How often a tick tuple from the "__system" component and "__tick" stream should be sent |
| * to tasks. Meant to be used as a component-specific configuration. |
| * Same meaning in Heron. |
| */ |
| public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs"; |
| /** |
| * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example, |
| * an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be |
| * reported to Zookeeper per task for every 10 second interval of time. |
| * This is not supported in Heron. |
| */ |
| public static final String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS = |
| "topology.error.throttle.interval.secs"; |
| /** |
| * See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS. |
| * This is not supported in Heron |
| */ |
| public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL = |
| "topology.max.error.report.per.interval"; |
| /** |
| * When set to true, Storm will log every message that's emitted. |
| */ |
| public static final String TOPOLOGY_DEBUG = "topology.debug"; |
| /** |
| * This currently gets translated to TOPOLOGY_STMGRS. Please see the |
| * documentation for TOPOLOGY_STMGRS. |
| */ |
| public static final String TOPOLOGY_WORKERS = "topology.workers"; |
| /** |
| * How many executors to spawn for ackers. |
| * <p>If this is set to 0, then Storm will immediately ack tuples as soon |
| * as they come off the spout, effectively disabling reliability. |
| * In Heron any values of > 0 means to enable acking. |
| */ |
| public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors"; |
| /** |
| * The maximum amount of time given to the topology to fully process a message |
| * emitted by a spout. If the message is not acked within this time frame, Storm |
| * will fail the message on the spout. Some spouts implementations will then replay |
| * the message at a later time. |
| * This has the same meaning in Heron. |
| */ |
| public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs"; |
| /* |
| * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format). |
| * Each listed class will be routed all the metrics data generated by the storm metrics API. |
| * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable. |
| * This is not supported by Heron. |
| */ |
| public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = |
| "topology.metrics.consumer.register"; |
| /** |
| * The maximum parallelism allowed for a component in this topology. This configuration is |
| * typically used in testing to limit the number of threads spawned in simulator. |
| * This does not have any impact in Heron |
| */ |
| public static final String TOPOLOGY_MAX_TASK_PARALLELISM = "topology.max.task.parallelism"; |
| /** |
| * The maximum number of tuples that can be pending on a spout task at any given time. |
| * This config applies to individual tasks, not to spouts or topologies as a whole. |
| * <p> |
| * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. |
| * Note that this config parameter has no effect for unreliable spouts that don't tag |
| * their tuples with a message id. |
| * This has same meaning in Heron. |
| */ |
| public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending"; |
| /** |
| * A class that implements a strategy for what to do when a spout needs to wait. Waiting is |
| * triggered in one of two conditions: |
| * <p> |
| * 1. nextTuple emits no tuples |
| * 2. The spout has hit maxSpoutPending and can't emit any more tuples |
| * This is not yet implemented in Heron. |
| */ |
| public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY = "topology.spout.wait.strategy"; |
| /** |
| * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for. |
| * This is not yet implemented in Heron |
| */ |
| public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS = |
| "topology.sleep.spout.wait.strategy.time.ms"; |
| /** |
| * The percentage of tuples to sample to produce stats for a task. |
| * This is not implemented in Heron. |
| */ |
| public static final String TOPOLOGY_STATS_SAMPLE_RATE = "topology.stats.sample.rate"; |
| /** |
| * A list of task hooks that are automatically added to every spout and bolt in the topology. An example |
| * of when you'd do this is to add a hook that integrates with your internal |
| * monitoring system. These hooks are instantiated using the zero-arg constructor. |
| */ |
| public static final String TOPOLOGY_AUTO_TASK_HOOKS = "topology.auto.task.hooks"; |
| /** |
| * Name of the topology. This config is automatically set by Storm when the topology is submitted. |
| * Same in Heron |
| */ |
| public static final String TOPOLOGY_NAME = "topology.name"; |
| |
| |
| /** |
| * Name of the team which owns this topology. |
| * Same in Heron |
| */ |
| public static final String TOPOLOGY_TEAM_NAME = "topology.team.name"; |
| |
| /** |
| * Email of the team which owns this topology. |
| * Same in Heron |
| */ |
| public static final String TOPOLOGY_TEAM_EMAIL = "topology.team.email"; |
| |
| /** |
| * Cap ticket (if filed) for the topology. If the topology is in prod this has to be set or it |
| * cannot be deployed. |
| * Same in Heron |
| */ |
| public static final String TOPOLOGY_CAP_TICKET = "topology.cap.ticket"; |
| |
| /** |
| * Project name of the topology, to help us with tagging which topologies are part of which project. For example, if topology A and |
| * Topology B are part of the same project, we will like to aggregate them as part of the same project. This is required by Cap team. |
| * Same in Heron |
| */ |
| public static final String TOPOLOGY_PROJECT_NAME = "topology.project.name"; |
| |
| /** |
| * A list of hosts of ZooKeeper servers used to manage the cluster. |
| */ |
| public static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers"; |
| |
| /** |
| * The port Storm will use to connect to each of the ZooKeeper servers. |
| */ |
| public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port"; |
| |
| /** |
| * The root directory in ZooKeeper for metadata about TransactionalSpouts. |
| */ |
| public static final String TRANSACTIONAL_ZOOKEEPER_ROOT = "transactional.zookeeper.root"; |
| |
| /** |
| * The session timeout for clients to ZooKeeper. |
| */ |
| public static final String STORM_ZOOKEEPER_SESSION_TIMEOUT = "storm.zookeeper.session.timeout"; |
| |
| /** |
| * The connection timeout for clients to ZooKeeper. |
| */ |
| public static final String STORM_ZOOKEEPER_CONNECTION_TIMEOUT = |
| "storm.zookeeper.connection.timeout"; |
| |
| /** |
| * The number of times to retry a Zookeeper operation. |
| */ |
| public static final String STORM_ZOOKEEPER_RETRY_TIMES = "storm.zookeeper.retry.times"; |
| |
| /** |
| * The interval between retries of a Zookeeper operation. |
| */ |
| public static final String STORM_ZOOKEEPER_RETRY_INTERVAL = "storm.zookeeper.retry.interval"; |
| |
| /** |
| * The list of zookeeper servers in which to keep the transactional state. If null (which is default), |
| * will use storm.zookeeper.servers |
| */ |
| public static final String TRANSACTIONAL_ZOOKEEPER_SERVERS = "transactional.zookeeper.servers"; |
| |
| /** |
| * The port to use to connect to the transactional zookeeper servers. If null (which is default), |
| * will use storm.zookeeper.port |
| */ |
| public static final String TRANSACTIONAL_ZOOKEEPER_PORT = "transactional.zookeeper.port"; |
| |
| /* |
| * Bolt-specific configuration for windowed bolts to specify the window length as a count of number of tuples |
| * in the window. |
| */ |
| public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT |
| = "topology.bolts.window.length.count"; |
| |
| /* |
| * Bolt-specific configuration for windowed bolts to specify the window length in time duration. |
| */ |
| public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS |
| = "topology.bolts.window.length.duration.ms"; |
| |
| /* |
| * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of number of tuples. |
| */ |
| public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT |
| = "topology.bolts.window.sliding.interval.count"; |
| |
| /* |
| * Bolt-specific configuration for windowed bolts to specify the sliding interval in time duration. |
| */ |
| public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS |
| = "topology.bolts.window.sliding.interval.duration.ms"; |
| |
| /** |
| * Bolt-specific configuration for windowed bolts to specify the name of the stream on which late tuples are |
| * going to be emitted. This configuration should only be used from the BaseWindowedBolt.withLateTupleStream builder |
| * method, and not as global parameter, otherwise IllegalArgumentException is going to be thrown. |
| */ |
| public static final String TOPOLOGY_BOLTS_LATE_TUPLE_STREAM |
| = "topology.bolts.late.tuple.stream"; |
| |
| /** |
| * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp |
| * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount. |
| * This config will be effective only if {@link org.apache.storm.windowing.TimestampExtractor} is specified. |
| */ |
| public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS |
| = "topology.bolts.tuple.timestamp.max.lag.ms"; |
| |
| /* |
| * Bolt-specific configuration for windowed bolts to specify the time interval for generating |
| * watermark events. Watermark event tracks the progress of time when tuple timestamp is used. |
| * This config is effective only if {@link org.apache.storm.windowing.TimestampExtractor} is specified. |
| */ |
| public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS |
| = "topology.bolts.watermark.event.interval.ms"; |
| |
| /** |
| * Topology-specific environment variables for the worker child process. |
| * This is added to the existing environment (that of the supervisor) |
| */ |
| public static final String TOPOLOGY_ENVIRONMENT = "topology.environment"; |
| |
| /** |
| * ---- DO NOT USE ----- |
| * This variable is used to rewrite the TOPOLOGY_AUTO_TASK_HOOKS variable. |
| * As such this is a strictly internal config variable that is not exposed the user |
| */ |
| public static final String STORMCOMPAT_TOPOLOGY_AUTO_TASK_HOOKS = |
| "stormcompat.topology.auto.task.hooks"; |
| |
| public static void setDebug(Map conf, boolean isOn) { |
| conf.put(Config.TOPOLOGY_DEBUG, isOn); |
| } |
| |
| public static void setTeamName(Map conf, String teamName) { |
| conf.put(Config.TOPOLOGY_TEAM_NAME, teamName); |
| } |
| |
| public static void setTeamEmail(Map conf, String teamEmail) { |
| conf.put(Config.TOPOLOGY_TEAM_EMAIL, teamEmail); |
| } |
| |
| public static void setTopologyCapTicket(Map conf, String ticket) { |
| conf.put(Config.TOPOLOGY_CAP_TICKET, ticket); |
| } |
| |
| public static void setTopologyProjectName(Map conf, String project) { |
| conf.put(Config.TOPOLOGY_PROJECT_NAME, project); |
| } |
| |
| public static void setNumWorkers(Map conf, int workers) { |
| conf.put(Config.TOPOLOGY_WORKERS, workers); |
| } |
| |
| public static void setNumAckers(Map conf, int numExecutors) { |
| conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, numExecutors); |
| } |
| |
| public static void setMessageTimeoutSecs(Map conf, int secs) { |
| conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, secs); |
| } |
| |
| public static void registerSerialization(Map conf, Class klass) { |
| getRegisteredSerializations(conf).add(klass.getName()); |
| } |
| |
| public static void registerSerialization( |
| Map conf, Class klass, Class<? extends Serializer> serializerClass) { |
| Map<String, String> register = new HashMap<>(); |
| register.put(klass.getName(), serializerClass.getName()); |
| getRegisteredSerializations(conf).add(register); |
| } |
| |
| @SuppressWarnings("rawtypes") |
| public static void registerDecorator( |
| Map conf, |
| Class<? extends IKryoDecorator> klass) { |
| getRegisteredDecorators(conf).add(klass.getName()); |
| } |
| |
| public static void setKryoFactory(Map conf, Class<? extends IKryoFactory> klass) { |
| conf.put(Config.TOPOLOGY_KRYO_FACTORY, klass.getName()); |
| } |
| |
| public static void setSkipMissingKryoRegistrations(Map conf, boolean skip) { |
| conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, skip); |
| } |
| |
| public static void setMaxTaskParallelism(Map conf, int max) { |
| conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, max); |
| } |
| |
| public static void setMaxSpoutPending(Map conf, int max) { |
| conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, max); |
| } |
| |
| public static void setStatsSampleRate(Map conf, double rate) { |
| conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, rate); |
| } |
| |
| public static void setFallBackOnJavaSerialization(Map conf, boolean fallback) { |
| conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, fallback); |
| } |
| |
| @SuppressWarnings("rawtypes") // List can contain strings or maps |
| private static List getRegisteredSerializations(Map conf) { |
| List ret; |
| if (!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { |
| ret = new ArrayList(); |
| } else { |
| ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_REGISTER)); |
| } |
| conf.put(Config.TOPOLOGY_KRYO_REGISTER, ret); |
| return ret; |
| } |
| |
| @SuppressWarnings("rawtypes") |
| private static List<String> getRegisteredDecorators(Map conf) { |
| List<String> ret; |
| if (!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) { |
| ret = new ArrayList<>(); |
| } else { |
| ret = new ArrayList<>((List<String>) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)); |
| } |
| conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret); |
| return ret; |
| } |
| |
| public static void setEnvironment(Map<String, Object> conf, Map env) { |
| conf.put(Config.TOPOLOGY_ENVIRONMENT, env); |
| } |
| |
| public void setDebug(boolean isOn) { |
| setDebug(this, isOn); |
| } |
| |
| public void setTeamName(String teamName) { |
| setTeamName(this, teamName); |
| } |
| |
| public void setTeamEmail(String teamEmail) { |
| setTeamEmail(this, teamEmail); |
| } |
| |
| public void setTopologyCapTicket(String ticket) { |
| setTopologyCapTicket(this, ticket); |
| } |
| |
| public void setTopologyProjectName(String project) { |
| setTopologyProjectName(this, project); |
| } |
| |
| /** |
| * Set topology optimization |
| * @param isOn |
| * @deprecated we don't have optimization |
| */ |
| @Deprecated |
| public void setOptimize(boolean isOn) { |
| put(Config.TOPOLOGY_OPTIMIZE, isOn); |
| } |
| |
| public void setNumWorkers(int workers) { |
| setNumWorkers(this, workers); |
| } |
| |
| public void setNumAckers(int numExecutors) { |
| setNumAckers(this, numExecutors); |
| } |
| |
| public void setMessageTimeoutSecs(int secs) { |
| setMessageTimeoutSecs(this, secs); |
| } |
| |
| public void registerSerialization(Class klass) { |
| registerSerialization(this, klass); |
| } |
| |
| public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass) { |
| registerSerialization(this, klass, serializerClass); |
| } |
| |
| public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) { |
| HashMap m = new HashMap<>(); |
| m.put("class", klass.getCanonicalName()); |
| m.put("parallelism.hint", parallelismHint); |
| m.put("argument", argument); |
| |
| List l = (List) this.get(TOPOLOGY_METRICS_CONSUMER_REGISTER); |
| if (l == null) { |
| l = new ArrayList<>(); |
| } |
| l.add(m); |
| this.put(TOPOLOGY_METRICS_CONSUMER_REGISTER, l); |
| } |
| |
| public void registerMetricsConsumer(Class klass, long parallelismHint) { |
| registerMetricsConsumer(klass, null, parallelismHint); |
| } |
| |
| public void registerMetricsConsumer(Class klass) { |
| registerMetricsConsumer(klass, null, 1L); |
| } |
| |
| public void registerDecorator(Class<? extends IKryoDecorator> klass) { |
| registerDecorator(this, klass); |
| } |
| |
| public void setKryoFactory(Class<? extends IKryoFactory> klass) { |
| setKryoFactory(this, klass); |
| } |
| |
| public void setSkipMissingKryoRegistrations(boolean skip) { |
| setSkipMissingKryoRegistrations(this, skip); |
| } |
| |
| public void setMaxTaskParallelism(int max) { |
| setMaxTaskParallelism(this, max); |
| } |
| |
| public void setMaxSpoutPending(int max) { |
| setMaxSpoutPending(this, max); |
| } |
| |
| public void setStatsSampleRate(double rate) { |
| setStatsSampleRate(this, rate); |
| } |
| |
| public void setFallBackOnJavaSerialization(boolean fallback) { |
| setFallBackOnJavaSerialization(this, fallback); |
| } |
| } |
| |