/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.OutputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;

/**
 * A {@link DataSegmentPusher} that writes segments to HDFS. The segment directory is zipped into a single
 * index.zip under the configured storage directory, and a descriptor.json holding the segment metadata is
 * written next to it. The returned segment's load spec points at the zip, e.g.
 * {"type": "hdfs", "path": ".../index.zip"}.
 */
public class HdfsDataSegmentPusher implements DataSegmentPusher
{
  private static final Logger log = new Logger(HdfsDataSegmentPusher.class);

  private final HdfsDataSegmentPusherConfig config;
  private final Configuration hadoopConfig;
  private final ObjectMapper jsonMapper;

  @Inject
  public HdfsDataSegmentPusher(
      HdfsDataSegmentPusherConfig config,
      Configuration hadoopConfig,
      ObjectMapper jsonMapper
  )
  {
    this.config = config;
    this.hadoopConfig = hadoopConfig;
    this.jsonMapper = jsonMapper;
  }
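
  /**
   * Returns the HDFS path for the given data source, rooted at the configured storage directory,
   * as a URI string.
   */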
  @Override
  public String getPathForHadoop(String dataSource)
  {
    return new Path(config.getStorageDirectory(), dataSource).toUri().toString();
  }
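
  /**
   * Pushes a local segment directory to HDFS: zips {@code inDir} into an index.zip under the segment's
   * storage directory, writes a descriptor.json beside it, and returns the segment updated with its new
   * load spec, size, and binary version.
   */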
  @Override
  public DataSegment push(File inDir, DataSegment segment) throws IOException
  {
    final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
    Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
    FileSystem fs = outFile.getFileSystem(hadoopConfig);

    fs.mkdirs(outFile.getParent());
log.info("Compressing files from[%s] to [%s]", inDir, outFile);
FSDataOutputStream out = null;
long size;
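    // Zip the segment directory straight onto the HDFS output stream. close() inside the try block
    // surfaces any write failure; closeQuietly() in the finally block covers the exception path.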
    try {
      out = fs.create(outFile);
      size = CompressionUtils.zip(inDir, out);
      out.close();
    }
    finally {
      Closeables.closeQuietly(out);
    }

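    // Update the segment with its HDFS load spec, zipped size, and binary version, and persist
    // that metadata as descriptor.json next to index.zip.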
    return createDescriptorFile(
        segment.withLoadSpec(makeLoadSpec(outFile))
               .withSize(size)
               .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
        outFile.getParent(),
        fs
    );
  }
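
  /**
   * Serializes the segment metadata to JSON and writes it as descriptor.json in {@code outDir}.
   */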
  private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs) throws IOException
  {
    final Path descriptorFile = new Path(outDir, "descriptor.json");
    log.info("Creating descriptor file at [%s]", descriptorFile);
    ByteStreams.copy(
        ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)),
        new HdfsOutputStreamSupplier(fs, descriptorFile)
    );
    return segment;
  }
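
  /**
   * Builds the load spec for the pushed segment: an "hdfs" type plus the full path to index.zip,
   * which segment loaders use to locate and fetch the segment.
   */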
  private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
  {
    return ImmutableMap.<String, Object>of("type", "hdfs", "path", outFile.toString());
  }
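
  /**
   * OutputSupplier that opens a new file on the given FileSystem, so ByteStreams.copy can
   * create the descriptor stream and close it itself.
   */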
  private static class HdfsOutputStreamSupplier implements OutputSupplier<OutputStream>
  {
    private final FileSystem fs;
    private final Path descriptorFile;

    public HdfsOutputStreamSupplier(FileSystem fs, Path descriptorFile)
    {
      this.fs = fs;
      this.descriptorFile = descriptorFile;
    }

    @Override
    public OutputStream getOutput() throws IOException
    {
      return fs.create(descriptorFile);
    }
  }
}