/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.hadoop.rdf.stats.jobs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.jena.hadoop.rdf.io.input.QuadsInputFormat;
import org.apache.jena.hadoop.rdf.io.input.TriplesInputFormat;
import org.apache.jena.hadoop.rdf.io.input.TriplesOrQuadsInputFormat;
import org.apache.jena.hadoop.rdf.io.input.nquads.NQuadsInputFormat;
import org.apache.jena.hadoop.rdf.io.input.ntriples.NTriplesInputFormat;
import org.apache.jena.hadoop.rdf.io.output.nquads.NQuadsOutputFormat;
import org.apache.jena.hadoop.rdf.io.output.ntriples.NTriplesNodeOutputFormat;
import org.apache.jena.hadoop.rdf.io.output.ntriples.NTriplesOutputFormat;
import org.apache.jena.hadoop.rdf.mapreduce.KeyMapper;
import org.apache.jena.hadoop.rdf.mapreduce.RdfMapReduceConstants;
import org.apache.jena.hadoop.rdf.mapreduce.TextCountReducer;
import org.apache.jena.hadoop.rdf.mapreduce.characteristics.CharacteristicSetReducer;
import org.apache.jena.hadoop.rdf.mapreduce.characteristics.QuadCharacteristicSetGeneratingReducer;
import org.apache.jena.hadoop.rdf.mapreduce.characteristics.TripleCharacteristicSetGeneratingReducer;
import org.apache.jena.hadoop.rdf.mapreduce.count.NodeCountReducer;
import org.apache.jena.hadoop.rdf.mapreduce.count.QuadNodeCountMapper;
import org.apache.jena.hadoop.rdf.mapreduce.count.TripleNodeCountMapper;
import org.apache.jena.hadoop.rdf.mapreduce.count.datatypes.QuadDataTypeCountMapper;
import org.apache.jena.hadoop.rdf.mapreduce.count.datatypes.TripleDataTypeCountMapper;
import org.apache.jena.hadoop.rdf.mapreduce.count.namespaces.QuadNamespaceCountMapper;
import org.apache.jena.hadoop.rdf.mapreduce.count.namespaces.TripleNamespaceCountMapper;
import org.apache.jena.hadoop.rdf.mapreduce.count.positional.QuadGraphCountMapper;
import org.apache.jena.hadoop.rdf.mapreduce.count.positional.QuadObjectCountMapper;
import org.apache.jena.hadoop.rdf.mapreduce.count.positional.TripleObjectCountMapper;
import org.apache.jena.hadoop.rdf.mapreduce.filter.positional.QuadFilterByPredicateMapper;
import org.apache.jena.hadoop.rdf.mapreduce.filter.positional.TripleFilterByPredicateUriMapper;
import org.apache.jena.hadoop.rdf.mapreduce.group.QuadGroupBySubjectMapper;
import org.apache.jena.hadoop.rdf.mapreduce.group.TripleGroupBySubjectMapper;
import org.apache.jena.hadoop.rdf.mapreduce.transform.TriplesToQuadsConstantGraphMapper;
import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable;
import org.apache.jena.hadoop.rdf.types.NodeWritable;
import org.apache.jena.hadoop.rdf.types.QuadWritable;
import org.apache.jena.hadoop.rdf.types.TripleWritable;
import org.apache.jena.vocabulary.RDF;
/**
* Factory that can produce {@link Job} instances for computing various RDF
* statistics
*
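* <p>
* A minimal usage sketch (paths are illustrative; exception handling is
* elided):
* </p>
* <pre>{@code
* Configuration config = new Configuration();
* Job job = JobFactory.getTripleNodeCountJob(config,
*         new String[] { "/data/triples" }, "/data/node-counts");
* job.waitForCompletion(true);
* }</pre>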
*/
public class JobFactory {
/**
* Private constructor prevents instantiation
*/
private JobFactory() {
}
/**
* Gets a job for computing node counts on RDF triple inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getTripleNodeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Triples Node Usage Count");
// Map/Reduce classes
job.setMapperClass(TripleNodeCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(TriplesInputFormat.class);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing node counts on RDF quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getQuadNodeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Quads Node Usage Count");
// Map/Reduce classes
job.setMapperClass(QuadNodeCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(QuadsInputFormat.class);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing node counts on RDF triple and/or quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getNodeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Node Usage Count");
// Map/Reduce classes
job.setMapperClass(QuadNodeCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(TriplesOrQuadsInputFormat.class);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing graph sizes on RDF triple inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getTripleGraphSizesJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Triples Graph Sizes");
// Map/Reduce classes
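// Chain two mappers: first rewrite each triple as a quad in a constant
// graph, then emit graph node usage so the reducer sums per-graph sizes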
ChainMapper.addMapper(job, TriplesToQuadsConstantGraphMapper.class, LongWritable.class, TripleWritable.class, LongWritable.class, QuadWritable.class, config);
ChainMapper.addMapper(job, QuadGraphCountMapper.class, LongWritable.class, QuadWritable.class, NodeWritable.class, LongWritable.class, config);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(TriplesInputFormat.class);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing graph sizes on RDF quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getQuadGraphSizesJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Quads Graph Sizes");
// Map/Reduce classes
job.setMapperClass(QuadGraphCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(QuadsInputFormat.class);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing graph sizes on RDF triple and/or quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getGraphSizesJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Graph Sizes");
// Map/Reduce classes
job.setMapperClass(QuadGraphCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(TriplesOrQuadsInputFormat.class);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a sequence of jobs that can be used to compute characteristic sets
* for RDF triples
*
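* <p>
* The jobs must be run in order: the first writes compressed sequence
* files to the intermediate path, which the second consumes. A minimal
* driver sketch (paths are illustrative):
* </p>
* <pre>{@code
* Job[] jobs = JobFactory.getTripleCharacteristicSetJobs(config,
*         new String[] { "/data/triples" }, "/tmp/csets", "/data/csets");
* for (Job job : jobs) {
*     if (!job.waitForCompletion(true)) {
*         throw new IllegalStateException("Job failed: " + job.getJobName());
*     }
* }
* }</pre>
*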
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param intermediateOutputPath
* Intermediate output path
* @param outputPath
* Final output path
* @return Sequence of jobs
* @throws IOException
*/
public static Job[] getTripleCharacteristicSetJobs(Configuration config, String[] inputPaths, String intermediateOutputPath,
String outputPath) throws IOException {
Job[] jobs = new Job[2];
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Triples Characteristic Set (Generation)");
// Map/Reduce classes
job.setMapperClass(TripleGroupBySubjectMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(TripleWritable.class);
job.setReducerClass(TripleCharacteristicSetGeneratingReducer.class);
job.setOutputKeyClass(CharacteristicSetWritable.class);
job.setOutputValueClass(NullWritable.class);
// Input and Output
job.setInputFormatClass(TriplesInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath));
SequenceFileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
jobs[0] = job;
job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Triples Characteristic Set (Reduction)");
// Map/Reduce classes
job.setMapperClass(KeyMapper.class);
job.setMapOutputKeyClass(CharacteristicSetWritable.class);
job.setMapOutputValueClass(CharacteristicSetWritable.class);
job.setReducerClass(CharacteristicSetReducer.class);
job.setOutputKeyClass(CharacteristicSetWritable.class);
job.setOutputValueClass(CharacteristicSetWritable.class);
// Input and Output
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, intermediateOutputPath);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
jobs[1] = job;
return jobs;
}
/**
* Gets a sequence of jobs that can be used to compute characteristic sets
* for RDF quads
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param intermediateOutputPath
* Intermediate output path
* @param outputPath
* Final output path
* @return Sequence of jobs
* @throws IOException
*/
public static Job[] getQuadCharacteristicSetJobs(Configuration config, String[] inputPaths, String intermediateOutputPath,
String outputPath) throws IOException {
Job[] jobs = new Job[2];
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Quads Characteristic Set (Generation)");
// Map/Reduce classes
job.setMapperClass(QuadGroupBySubjectMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(QuadWritable.class);
job.setReducerClass(QuadCharacteristicSetGeneratingReducer.class);
job.setOutputKeyClass(CharacteristicSetWritable.class);
job.setOutputValueClass(NullWritable.class);
// Input and Output
job.setInputFormatClass(QuadsInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath));
SequenceFileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
jobs[0] = job;
job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Quads Characteristic Set (Reduction)");
// Map/Reduce classes
job.setMapperClass(KeyMapper.class);
job.setMapOutputKeyClass(CharacteristicSetWritable.class);
job.setMapOutputValueClass(CharacteristicSetWritable.class);
job.setReducerClass(CharacteristicSetReducer.class);
job.setOutputKeyClass(CharacteristicSetWritable.class);
job.setOutputValueClass(CharacteristicSetWritable.class);
// Input and Output
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, intermediateOutputPath);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
jobs[1] = job;
return jobs;
}
/**
* Gets a sequence of jobs that can be used to compute characteristic sets
* for RDF triple and/or quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param intermediateOutputPath
* Intermediate output path
* @param outputPath
* Final output path
* @return Sequence of jobs
* @throws IOException
*/
public static Job[] getCharacteristicSetJobs(Configuration config, String[] inputPaths, String intermediateOutputPath,
String outputPath) throws IOException {
Job[] jobs = new Job[2];
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Characteristic Set (Generation)");
// Map/Reduce classes
job.setMapperClass(QuadGroupBySubjectMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(QuadWritable.class);
job.setReducerClass(QuadCharacteristicSetGeneratingReducer.class);
job.setOutputKeyClass(CharacteristicSetWritable.class);
job.setOutputValueClass(NullWritable.class);
// Input and Output
job.setInputFormatClass(TriplesOrQuadsInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath));
SequenceFileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
jobs[0] = job;
job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Characteristic Set (Reduction)");
// Map/Reduce classes
job.setMapperClass(KeyMapper.class);
job.setMapOutputKeyClass(CharacteristicSetWritable.class);
job.setMapOutputValueClass(CharacteristicSetWritable.class);
job.setReducerClass(CharacteristicSetReducer.class);
job.setOutputKeyClass(CharacteristicSetWritable.class);
job.setOutputValueClass(CharacteristicSetWritable.class);
// Input and Output
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, intermediateOutputPath);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
jobs[1] = job;
return jobs;
}
/**
* Gets a sequence of jobs that can be used to compute type counts on RDF
* triple inputs
*
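* <p>
* The first job extracts the rdf:type declarations to the intermediate
* path and the second counts the declared types. A driver sketch, with an
* optional cleanup of the intermediate data (paths and the cleanup step
* are illustrative; {@code FileSystem} is
* {@code org.apache.hadoop.fs.FileSystem}):
* </p>
* <pre>{@code
* Job[] jobs = JobFactory.getTripleTypeCountJobs(config,
*         new String[] { "/data/triples" }, "/tmp/types", "/data/type-counts");
* for (Job job : jobs) {
*     job.waitForCompletion(true);
* }
* FileSystem.get(config).delete(new Path("/tmp/types"), true);
* }</pre>
*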
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param intermediateOutputPath
* Path for intermediate output which will be all the type
* declaration triples present in the inputs
* @param outputPath
* Output path
* @return Sequence of jobs
* @throws IOException
*/
public static Job[] getTripleTypeCountJobs(Configuration config, String[] inputPaths, String intermediateOutputPath,
String outputPath) throws IOException {
Job[] jobs = new Job[2];
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Type Triples Extraction");
// Map/Reduce classes
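// Configure the filter mapper to pass through only rdf:type triples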
job.getConfiguration().setStrings(RdfMapReduceConstants.FILTER_PREDICATE_URIS, RDF.type.getURI());
job.setMapperClass(TripleFilterByPredicateUriMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(TripleWritable.class);
// Input and Output Format
job.setInputFormatClass(TriplesInputFormat.class);
job.setOutputFormatClass(NTriplesOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath));
jobs[0] = job;
// Object Node Usage count job
job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Triples Type Usage Count");
// Map/Reduce classes
job.setMapperClass(TripleObjectCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(NTriplesInputFormat.class);
// TODO Would be better if this was intelligently configured
NLineInputFormat.setNumLinesPerSplit(job, 10000);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, intermediateOutputPath);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
jobs[1] = job;
return jobs;
}
/**
* Gets a sequence of jobs that can be used to compute type counts on RDF
* quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param intermediateOutputPath
* Path for intermediate output which will be all the type
* declaration quads present in the inputs
* @param outputPath
* Output path
* @return Sequence of jobs
* @throws IOException
*/
public static Job[] getQuadTypeCountJobs(Configuration config, String[] inputPaths, String intermediateOutputPath,
String outputPath) throws IOException {
Job[] jobs = new Job[2];
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Type Quads Extraction");
// Map/Reduce classes
job.getConfiguration().setStrings(RdfMapReduceConstants.FILTER_PREDICATE_URIS, RDF.type.getURI());
job.setMapperClass(QuadFilterByPredicateMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(QuadWritable.class);
// Input and Output Format
job.setInputFormatClass(QuadsInputFormat.class);
job.setOutputFormatClass(NQuadsOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath));
jobs[0] = job;
// Object Node Usage count job
job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Quads Type Usage Count");
// Map/Reduce classes
job.setMapperClass(QuadObjectCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(NQuadsInputFormat.class);
// TODO Would be better if this was intelligently configured
NLineInputFormat.setNumLinesPerSplit(job, 10000);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, intermediateOutputPath);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
jobs[1] = job;
return jobs;
}
/**
* Gets a sequence of jobs that can be used to compute type counts on RDF
* triple and/or quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param intermediateOutputPath
* Path for intermediate output which will be all the type
* declaration quads present in the inputs
* @param outputPath
* Output path
* @return Sequence of jobs
* @throws IOException
*/
public static Job[] getTypeCountJobs(Configuration config, String[] inputPaths, String intermediateOutputPath,
String outputPath) throws IOException {
Job[] jobs = new Job[2];
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Type Extraction");
// Map/Reduce classes
job.getConfiguration().setStrings(RdfMapReduceConstants.FILTER_PREDICATE_URIS, RDF.type.getURI());
job.setMapperClass(QuadFilterByPredicateMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(QuadWritable.class);
// Input and Output Format
job.setInputFormatClass(TriplesOrQuadsInputFormat.class);
job.setOutputFormatClass(NQuadsOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath));
jobs[0] = job;
// Object Node Usage count job
job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Type Usage Count");
// Map/Reduce classes
job.setMapperClass(QuadObjectCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(NQuadsInputFormat.class);
// TODO Would be better if this was intelligently configured
NLineInputFormat.setNumLinesPerSplit(job, 10000);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, intermediateOutputPath);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
jobs[1] = job;
return jobs;
}
/**
* Gets a job for computing literal data type counts on RDF triple inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getTripleDataTypeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Triples Literal Data Type Usage Count");
// Map/Reduce classes
job.setMapperClass(TripleDataTypeCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(TriplesInputFormat.class);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing literal data type counts on RDF quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getQuadDataTypeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Quads Literal Data Type Usage Count");
// Map/Reduce classes
job.setMapperClass(QuadDataTypeCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(QuadsInputFormat.class);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing literal data type counts on RDF triple and/or
* quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getDataTypeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Literal Data Type Usage Count");
// Map/Reduce classes
job.setMapperClass(QuadDataTypeCountMapper.class);
job.setMapOutputKeyClass(NodeWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(NodeCountReducer.class);
// Input and Output
job.setInputFormatClass(TriplesOrQuadsInputFormat.class);
job.setOutputFormatClass(NTriplesNodeOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing namespace usage counts on RDF triple inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getTripleNamespaceCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Triples Namespace Usage Count");
// Map/Reduce classes
job.setMapperClass(TripleNamespaceCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(TextCountReducer.class);
// Input and Output
job.setInputFormatClass(TriplesInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing namespace usage counts on RDF quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getQuadNamespaceCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Quads Namespace Usage Count");
// Map/Reduce classes
job.setMapperClass(QuadNamespaceCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(TextCountReducer.class);
// Input and Output
job.setInputFormatClass(QuadsInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
/**
* Gets a job for computing namespace usage counts on RDF triple and/or
* quad inputs
*
* @param config
* Configuration
* @param inputPaths
* Input paths
* @param outputPath
* Output path
* @return Job
* @throws IOException
*/
public static Job getNamespaceCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException {
Job job = Job.getInstance(config);
job.setJarByClass(JobFactory.class);
job.setJobName("RDF Namespace Usage Count");
// Map/Reduce classes
job.setMapperClass(QuadNamespaceCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(TextCountReducer.class);
// Input and Output
job.setInputFormatClass(TriplesOrQuadsInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job;
}
}