blob: a6f771bf0e62bb38671be23015f1956fe914186c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.cassandra.context;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Simple class used to register singletons within a storm worker.
*/
public class WorkerCtx implements Serializable {
private static final ConcurrentMap<Class, BeanFactory<?>> workerCtx = new ConcurrentHashMap<>();
private Map<Class, BeanFactory<?>> componentCtx = new HashMap<>();
/**
* Creates a new {@link WorkerCtx} instance.
*/
public WorkerCtx() {
super();
}
/**
* Register a bean provider for a specified type.
*/
public <T> void register(Class<T> clazz, BeanFactory<T> provider) {
componentCtx.put(clazz, provider);
}
/**
* Return an instance provider of the specified type.
* @throws RuntimeException if no bean provider can be resolve for the given class
*/
protected <T> BeanFactory<T> getBeanfactory(Class<T> clazz) {
BeanFactory<T> factory = (BeanFactory<T>) this.componentCtx.get(clazz);
if (factory == null) {
throw new RuntimeException("Cannot resolve bean factory for class : " + clazz.getCanonicalName());
}
factory.setStormContext(this);
return factory;
}
/**
* Return an instance, which is shared (within a Worker), of the specified type.
*/
public <T, K, V> T getWorkerBean(Class<T> clazz, Map<K, V> topoConf) {
return getWorkerBean(clazz, topoConf, false);
}
/**
* Return an instance, which is shared (within a Worker), of the specified type.
*
* @param clazz the class of the bean.
* @param topoConf the storm configuration
* @param force if <code>true</code>= create a new instance even if one already exist.
*
* @return a instance of type {@link T}.
*/
public <T, K, V> T getWorkerBean(Class<T> clazz, Map<K, V> topoConf, boolean force) {
if (force) {
workerCtx.remove(clazz);
}
BeanFactory<T> factory = (BeanFactory<T>) this.workerCtx.get(clazz);
if (factory == null) {
BeanFactory<T> instance = getBeanfactory(clazz).newInstance();
workerCtx.putIfAbsent(clazz, instance);
factory = (BeanFactory<T>) this.workerCtx.get(clazz);
}
return factory.get((Map<String, Object>) topoConf);
}
}