blob: a9955d843c8e34652c98fc5f8e8f161424ff4bbb [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.db;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.MapUtils;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.master.rules.PeriodLoadRule;
import com.metamx.druid.master.rules.Rule;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class DatabaseRuleManager
{
public static void createDefaultRule(
final DBI dbi,
final String ruleTable,
final String defaultDatasource,
final ObjectMapper jsonMapper
)
{
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> existing = handle.select(
String.format(
"SELECT id from %s where datasource=%s;",
ruleTable,
defaultDatasource
)
);
if (!existing.isEmpty()) {
return null;
}
final List<Rule> defaultRules = Arrays.<Rule>asList(
new PeriodLoadRule(
new Period("P5000Y"),
2,
"_default_tier"
)
);
final String version = new DateTime().toString();
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, version, payload) VALUES (:id, :dataSource, :version, :payload)",
ruleTable
)
)
.bind("id", String.format("%s_%s", defaultDatasource, version))
.bind("dataSource", defaultDatasource)
.bind("version", version)
.bind("payload", jsonMapper.writeValueAsString(defaultRules))
.execute();
return null;
}
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private static final Logger log = new Logger(DatabaseRuleManager.class);
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService exec;
private final DatabaseRuleManagerConfig config;
private final DBI dbi;
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
private final Object lock = new Object();
private volatile boolean started = false;
public DatabaseRuleManager(
ObjectMapper jsonMapper,
ScheduledExecutorService exec,
DatabaseRuleManagerConfig config,
DBI dbi
)
{
this.jsonMapper = jsonMapper;
this.exec = exec;
this.config = config;
this.dbi = dbi;
this.rules = new AtomicReference<ConcurrentHashMap<String, List<Rule>>>(
new ConcurrentHashMap<String, List<Rule>>()
);
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
config.getRulesPollDuration(),
new Runnable()
{
@Override
public void run()
{
poll();
}
}
);
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
rules.set(new ConcurrentHashMap<String, List<Rule>>());
started = false;
}
}
public void poll()
{
try {
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<String, List<Rule>>(
dbi.withHandle(
new HandleCallback<Map<String, List<Rule>>>()
{
@Override
public Map<String, List<Rule>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT dataSource, payload FROM %s", config.getRuleTable())
).fold(
Maps.<String, List<Rule>>newHashMap(),
new Folder3<Map<String, List<Rule>>, Map<String, Object>>()
{
@Override
public Map<String, List<Rule>> fold(
Map<String, List<Rule>> retVal,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
String dataSource = MapUtils.getString(stringObjectMap, "dataSource");
List<Rule> rules = jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "payload"), new TypeReference<List<Rule>>()
{
}
);
retVal.put(dataSource, rules);
return retVal;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
)
);
log.info("Polled and found rules for %,d datasource(s)", newRules.size());
rules.set(newRules);
}
catch (Exception e) {
log.error(e, "Exception while polling for rules");
}
}
public Map<String, List<Rule>> getAllRules()
{
return rules.get();
}
public List<Rule> getRules(final String dataSource)
{
return rules.get().get(dataSource);
}
public List<Rule> getRulesWithDefault(final String dataSource)
{
List<Rule> retVal = Lists.newArrayList();
Map<String, List<Rule>> theRules = rules.get();
if (theRules.get(dataSource) != null) {
retVal.addAll(theRules.get(dataSource));
}
if (theRules.get(config.getDefaultDatasource()) != null) {
retVal.addAll(theRules.get(config.getDefaultDatasource()));
}
return retVal;
}
public boolean overrideRule(final String dataSource, final List<Rule> newRules)
{
synchronized (lock) {
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
final String version = new DateTime().toString();
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, version, payload) VALUES (:id, :dataSource, :version, :payload)",
config.getRuleTable()
)
)
.bind("id", String.format("%s_%s", dataSource, version))
.bind("dataSource", dataSource)
.bind("version", version)
.bind("payload", jsonMapper.writeValueAsString(newRules))
.execute();
return null;
}
}
);
ConcurrentHashMap<String, List<Rule>> existingRules = rules.get();
if (existingRules == null) {
existingRules = new ConcurrentHashMap<String, List<Rule>>();
}
existingRules.put(dataSource, newRules);
}
catch (Exception e) {
log.error(e, String.format("Exception while overriding rule for %s", dataSource));
return false;
}
}
return true;
}
}