blob: 720242466e4792f84f5f8abc30d7aebdad687abd [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexer;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/**
 * Job that publishes segment descriptors to the segment table.
 *
 * <p>{@link #run()} lists the files under the directory returned by
 * {@code config.makeDescriptorInfoDir()}, deserializes each one into a {@link DataSegment},
 * and inserts one row per segment into the table named by {@code spec.getSegmentTable()}.
 * The segments inserted by the last successful run are available from
 * {@link #getPublishedSegments()}.
 */
public class DbUpdaterJob implements Jobby
{
  private static final Logger log = new Logger(DbUpdaterJob.class);
  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();

  private final HadoopDruidIndexerConfig config;
  private final DbUpdaterJobSpec spec;
  private final DBI dbi;

  // Segments published by the last successful run(), in case a client is interested.
  // Stays null until run() completes without throwing.
  private volatile ImmutableList<DataSegment> publishedSegments = null;

  /**
   * Builds the job from the indexer config.
   *
   * @param config indexer config; its updater job spec is cast to {@link DbUpdaterJobSpec}
   *               and used to create the database connection
   */
  public DbUpdaterJob(
      HadoopDruidIndexerConfig config
  )
  {
    this.config = config;
    this.spec = (DbUpdaterJobSpec) config.getUpdaterJobSpec();
    this.dbi = new DbConnector(spec).getDBI();
  }

  /**
   * Reads every descriptor file in the descriptor info directory and inserts the segment it
   * describes into the segment table.
   *
   * <p>Rows are inserted one segment at a time, so a failure part-way through leaves the
   * earlier segments inserted; in that case the list returned by
   * {@link #getPublishedSegments()} is not updated.
   *
   * @return always {@code true} when the method returns normally
   * @throws RuntimeException wrapping the {@link IOException} if the directory cannot be
   *                          listed or a descriptor cannot be read or parsed
   */
  @Override
  public boolean run()
  {
    final Configuration conf = new Configuration();
    final ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();

    // Forward every "hadoop."-prefixed system property to the Hadoop configuration,
    // with the prefix stripped.
    for (String propName : System.getProperties().stringPropertyNames()) {
      if (propName.startsWith("hadoop.")) {
        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
      }
    }

    final Path descriptorInfoDir = config.makeDescriptorInfoDir();

    // The statement text only depends on the table name, so build it once rather than per segment.
    final String insertSql = String.format(
        "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
        + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
        spec.getSegmentTable()
    );

    try {
      final FileSystem fs = descriptorInfoDir.getFileSystem(conf);

      // Some FileSystem implementations report a missing directory by returning null instead
      // of throwing; fail with a message that names the directory rather than a bare NPE.
      final FileStatus[] descriptors = fs.listStatus(descriptorInfoDir);
      if (descriptors == null) {
        throw new IOException(String.format("Descriptor info dir[%s] does not exist", descriptorInfoDir));
      }

      for (FileStatus status : descriptors) {
        final DataSegment segment = readSegment(fs, status.getPath());
        insertSegment(insertSql, segment);

        publishedSegmentsBuilder.add(segment);
        log.info("Published %s", segment.getIdentifier());
      }
    }
    catch (IOException e) {
      throw Throwables.propagate(e);
    }

    publishedSegments = publishedSegmentsBuilder.build();
    return true;
  }

  /**
   * Returns the segments published by the most recent call to run() that completed
   * without throwing.
   *
   * @return the published segments
   * @throws IllegalStateException if run() has never completed
   */
  public List<DataSegment> getPublishedSegments()
  {
    if (publishedSegments == null) {
      log.error("getPublishedSegments called before run!");
      throw new IllegalStateException("DbUpdaterJob has not run yet");
    } else {
      return publishedSegments;
    }
  }

  /**
   * Deserializes one descriptor file into a DataSegment, always closing the file stream.
   */
  private DataSegment readSegment(FileSystem fs, Path descriptorPath) throws IOException
  {
    final InputStream in = fs.open(descriptorPath);
    try {
      return jsonMapper.readValue(in, DataSegment.class);
    }
    finally {
      // Close explicitly instead of depending on the mapper being configured to close its source.
      in.close();
    }
  }

  /**
   * Inserts a single row for the given segment using the prepared statement text.
   */
  private void insertSegment(final String insertSql, final DataSegment segment)
  {
    dbi.withHandle(
        new HandleCallback<Void>()
        {
          @Override
          public Void withHandle(Handle handle) throws Exception
          {
            handle.createStatement(insertSql)
                  .bind("id", segment.getIdentifier())
                  .bind("dataSource", segment.getDataSource())
                  .bind("created_date", new DateTime().toString())
                  .bind("start", segment.getInterval().getStart().toString())
                  .bind("end", segment.getInterval().getEnd().toString())
                  .bind("partitioned", segment.getShardSpec().getPartitionNum())
                  .bind("version", segment.getVersion())
                  .bind("used", true)
                  .bind("payload", jsonMapper.writeValueAsString(segment))
                  .execute();

            return null;
          }
        }
    );
  }
}