blob: 6eda37dd3d752cb8787f221a04b3a9865ee1016b [file] [log] [blame]
package org.apache.s4.core;
import java.io.InputStream;
import java.util.HashMap;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.name.Names;
public class BaseModule extends AbstractModule {
private static Logger logger = LoggerFactory.getLogger(BaseModule.class);
private PropertiesConfiguration config;
InputStream baseConfigInputStream;
String clusterName;
public BaseModule(InputStream baseConfigInputStream, String clusterName) {
super();
this.baseConfigInputStream = baseConfigInputStream;
this.clusterName = clusterName;
}
@Override
protected void configure() {
if (config == null) {
loadProperties(binder());
}
// a node holds a single partition assignment
// ==> Assignment is a singleton so it shared between base, comm and app layers.
// it is eager so that the node is able to join a cluster immediately
bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
// bind(Cluster.class).to(ClusterFromZK.class);
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(S4Bootstrap.class);
// ZkClientProvider singleton shares the Zookeeper connection
bind(ZkClient.class).toProvider(ZkClientProvider.class);
}
@SuppressWarnings("serial")
private void loadProperties(Binder binder) {
try {
config = new PropertiesConfiguration();
config.load(baseConfigInputStream);
// TODO - validate properties.
/* Make all properties injectable. Do we need this? */
Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
if (clusterName != null) {
if (config.containsKey("s4.cluster.name")) {
logger.warn(
"cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
clusterName, config.getProperty("s4.cluster.name"));
} else {
Names.bindProperties(binder, new HashMap<String, String>() {
{
put("s4.cluster.name", clusterName);
}
});
}
}
} catch (ConfigurationException e) {
binder.addError(e);
e.printStackTrace();
}
}
}