blob: 716891976624a7ad49a524a4d63c0cca0559de1a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hooks;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.storm.Config;
import org.apache.storm.hooks.info.BoltAckInfo;
import org.apache.storm.hooks.info.BoltExecuteInfo;
import org.apache.storm.hooks.info.BoltFailInfo;
import org.apache.storm.hooks.info.EmitInfo;
import org.apache.storm.hooks.info.SpoutAckInfo;
import org.apache.storm.hooks.info.SpoutFailInfo;
import org.apache.storm.task.TopologyContext;
/**
* There would be types of task hooks inside ITaskHookDelegate:
* 1. task hook's classes specified in config statically.
* The task hooks' objects would be instantiated by using reflection and added into
* the list of ITaskHook when the method
* prepare(Map conf, org.apache.heron.api.topology.TopologyContext context)
* is invoked.
* 2. task hook added dynamically by invoking addHook(ITaskHook)
*/
@SuppressWarnings("rawtypes")
public class ITaskHookDelegate implements org.apache.heron.api.hooks.ITaskHook {
private List<ITaskHook> hooks;
private Map conf;
// zero arg constructor
public ITaskHookDelegate() {
hooks = new LinkedList<ITaskHook>();
}
public void addHook(ITaskHook hook) {
hooks.add(hook);
}
public List<ITaskHook> getHooks() {
return hooks;
}
public Map getConf() {
return conf;
}
@Override
public void prepare(Map<String, Object> newConf,
org.apache.heron.api.topology.TopologyContext context) {
this.conf = newConf;
if (!newConf.containsKey(Config.STORMCOMPAT_TOPOLOGY_AUTO_TASK_HOOKS)) {
throw new RuntimeException("StormCompat Translation not done for task hooks");
}
List<String> hookClassNames =
TypeUtils.getListOfStrings(newConf.get(Config.STORMCOMPAT_TOPOLOGY_AUTO_TASK_HOOKS));
for (String className : hookClassNames) {
ITaskHook hook;
try {
hook = (ITaskHook) Class.forName(className).newInstance();
} catch (ClassNotFoundException ex) {
throw new RuntimeException(ex + " ITaskHook class must be in class path.");
} catch (InstantiationException ex) {
throw new RuntimeException(ex + " ITaskHook class must be concrete.");
} catch (IllegalAccessException ex) {
throw new RuntimeException(ex + " ITaskHook class must have a no-arg constructor.");
}
hooks.add(hook);
}
// Invoke the prepare() for all ITaskHooks
TopologyContext ctxt = new TopologyContext(context);
for (ITaskHook hook : hooks) {
hook.prepare(newConf, ctxt);
}
}
@Override
public void cleanup() {
for (ITaskHook hook : hooks) {
hook.cleanup();
}
}
@Override
public void emit(org.apache.heron.api.hooks.info.EmitInfo info) {
EmitInfo emit = new EmitInfo(info);
for (ITaskHook hook : hooks) {
hook.emit(emit);
}
}
@Override
public void spoutAck(org.apache.heron.api.hooks.info.SpoutAckInfo info) {
SpoutAckInfo ack = new SpoutAckInfo(info);
for (ITaskHook hook : hooks) {
hook.spoutAck(ack);
}
}
@Override
public void spoutFail(org.apache.heron.api.hooks.info.SpoutFailInfo info) {
SpoutFailInfo fail = new SpoutFailInfo(info);
for (ITaskHook hook : hooks) {
hook.spoutFail(fail);
}
}
@Override
public void boltAck(org.apache.heron.api.hooks.info.BoltAckInfo info) {
BoltAckInfo ack = new BoltAckInfo(info);
for (ITaskHook hook : hooks) {
hook.boltAck(ack);
}
}
@Override
public void boltFail(org.apache.heron.api.hooks.info.BoltFailInfo info) {
BoltFailInfo fail = new BoltFailInfo(info);
for (ITaskHook hook : hooks) {
hook.boltFail(fail);
}
}
@Override
public void boltExecute(org.apache.heron.api.hooks.info.BoltExecuteInfo info) {
BoltExecuteInfo execute = new BoltExecuteInfo(info);
for (ITaskHook hook : hooks) {
hook.boltExecute(execute);
}
}
}