/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.accumulo.core.client.mapreduce;

import java.io.IOException;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.client.summary.Summarizer;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

/**
* This class allows MapReduce jobs to write output in the Accumulo data file format.<br>
* Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important
* requirement of Accumulo data files.
*
* <p>
* The output path to be created must be specified via
* {@link AccumuloFileOutputFormat#setOutputPath(Job, Path)}. This is inherited from
* {@link FileOutputFormat#setOutputPath(Job, Path)}. Other methods from {@link FileOutputFormat}
* are not supported and may be ignored or cause failures. Setting other Hadoop configuration
* options that affect the behavior of the underlying files directly in the Job's configuration
* may work, but such options are not directly supported at this time.
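*
* <p>
* A minimal driver sketch (the output path and compression choice below are illustrative; the
* job's mapper or reducer is assumed to emit {@link Key}/{@link Value} pairs in sorted order):
*
* <pre>
* Job job = Job.getInstance(new Configuration());
* job.setOutputFormatClass(AccumuloFileOutputFormat.class);
* job.setOutputKeyClass(Key.class);
* job.setOutputValueClass(Value.class);
* AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp/bulk-output"));
* AccumuloFileOutputFormat.setCompressionType(job, "gz");
* </pre>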
*/
public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {

private static final Class<?> CLASS = AccumuloFileOutputFormat.class;
protected static final Logger log = Logger.getLogger(CLASS);

/**
* Sets the compression type to use for data blocks. Specifying a compression type may require
* additional libraries to be available to your job.
*
* @param job
* the Hadoop job instance to be configured
* @param compressionType
* one of "none", "gz", "lzo", or "snappy"
* @since 1.5.0
*/
public static void setCompressionType(Job job, String compressionType) {
FileOutputConfigurator.setCompressionType(CLASS, job.getConfiguration(), compressionType);
}

/**
* Sets the size for data blocks within each file.<br>
* Data blocks are spans of key/value pairs stored in the file that are compressed and indexed as
* a group.
*
* <p>
* Making this value smaller may increase seek performance, but at the cost of increasing the size
* of the indexes (which can also affect seek performance).
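*
* <p>
* For example (the size below is illustrative):
*
* <pre>
* AccumuloFileOutputFormat.setDataBlockSize(job, 64 * 1024); // 64 KB data blocks
* </pre>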
*
* @param job
* the Hadoop job instance to be configured
* @param dataBlockSize
* the block size, in bytes
* @since 1.5.0
*/
public static void setDataBlockSize(Job job, long dataBlockSize) {
FileOutputConfigurator.setDataBlockSize(CLASS, job.getConfiguration(), dataBlockSize);
}

/**
* Sets the size for file blocks in the file system; file blocks are managed and replicated by
* the underlying file system.
*
* @param job
* the Hadoop job instance to be configured
* @param fileBlockSize
* the block size, in bytes
* @since 1.5.0
*/
public static void setFileBlockSize(Job job, long fileBlockSize) {
FileOutputConfigurator.setFileBlockSize(CLASS, job.getConfiguration(), fileBlockSize);
}

/**
* Sets the size for index blocks within each file; smaller blocks mean a deeper index hierarchy
* within the file, while larger blocks mean a shallower one. This can affect the performance of
* queries.
*
* @param job
* the Hadoop job instance to be configured
* @param indexBlockSize
* the block size, in bytes
* @since 1.5.0
*/
public static void setIndexBlockSize(Job job, long indexBlockSize) {
FileOutputConfigurator.setIndexBlockSize(CLASS, job.getConfiguration(), indexBlockSize);
}

/**
* Sets the file system replication factor for the resulting file, overriding the file system
* default.
*
* @param job
* the Hadoop job instance to be configured
* @param replication
* the number of replicas for produced files
* @since 1.5.0
*/
public static void setReplication(Job job, int replication) {
FileOutputConfigurator.setReplication(CLASS, job.getConfiguration(), replication);
}

/**
* Specify a sampler to be used when writing out data. This will result in the output file having
* sample data.
*
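* <p>
* A sketch using the RowSampler that ships with Accumulo (the option values below are
* illustrative):
*
* <pre>
* SamplerConfiguration sc = new SamplerConfiguration(RowSampler.class.getName());
* sc.addOption("hasher", "murmur3_32");
* sc.addOption("modulus", "1009");
* AccumuloFileOutputFormat.setSampler(job, sc);
* </pre>
*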
* @param job
* The Hadoop job instance to be configured
* @param samplerConfig
* The configuration for creating sample data in the output file.
* @since 1.8.0
*/
public static void setSampler(Job job, SamplerConfiguration samplerConfig) {
FileOutputConfigurator.setSampler(CLASS, job.getConfiguration(), samplerConfig);
}

/**
* Specifies a list of summarizer configurations to create summary data in the output file. Each
* key/value pair written will be passed to the configured {@link Summarizer}s.
*
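* <p>
* A sketch using the VisibilitySummarizer that ships with Accumulo (in
* org.apache.accumulo.core.client.summary.summarizers):
*
* <pre>
* AccumuloFileOutputFormat.setSummarizers(job,
*     SummarizerConfiguration.builder(VisibilitySummarizer.class).build());
* </pre>
*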
* @param job
* The Hadoop job instance to be configured
* @param summarizerConfigs
* summarizer configurations
* @since 2.0.0
*/
public static void setSummarizers(Job job, SummarizerConfiguration... summarizerConfigs) {
FileOutputConfigurator.setSummarizers(CLASS, job.getConfiguration(), summarizerConfigs);
}

@Override
public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException {
// get the path of the temporary output file
final Configuration conf = context.getConfiguration();
final AccumuloConfiguration acuConf =
FileOutputConfigurator.getAccumuloConfiguration(CLASS, conf);
final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
final Path file = this.getDefaultWorkFile(context, "." + extension);
final int visCacheSize = ConfiguratorBase.getVisibilityCacheSize(conf);
return new RecordWriter<Key,Value>() {
// The RFileWriter is created lazily on the first write() so empty tasks do not create a file.
RFileWriter out = null;

@Override
public void close(TaskAttemptContext context) throws IOException {
if (out != null) {
out.close();
}
}

@Override
public void write(Key key, Value value) throws IOException {
if (out == null) {
// Open the RFile on first use, applying the table properties and visibility cache size
// derived from the job configuration, then start the default locality group.
out = RFile.newWriter().to(file.toString()).withFileSystem(file.getFileSystem(conf))
.withTableProperties(acuConf).withVisibilityCacheSize(visCacheSize).build();
out.startDefaultLocalityGroup();
}
out.append(key, value);
}
};
}
}