// blob: 11d52ff0d2da1e00f3eada884d09c63f164d037a
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.junit.Ignore;
import org.junit.Test;

import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
/**
 * Submits simple Hadoop map-reduce jobs through {@code grid(0).hadoop()} with IGFS enabled:
 * one job that is expected to complete and one whose mapper always throws.
 * <p>
 * The whole suite is disabled until IGNITE-404 is resolved; for the same reason the
 * {@code setExternalExecution(true)} call in {@link #hadoopConfiguration(String)} is still commented out.
 */
@Ignore("https://issues.apache.org/jira/browse/IGNITE-404")
public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest {
    /** IGFS path of the input file consumed by every test job. */
    private static final String TEST_INPUT_FILE = "/test";

    /** Number of lines written to the input file. */
    private static final int TEST_LINES_CNT = 1000;

    /** Message of the exception thrown by {@link TestFailingMapper}. */
    private static final String MAPPER_FAILURE_MSG = "Test failure";

    /** {@inheritDoc} */
    @Override protected boolean igfsEnabled() {
        return true;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        startGrids(gridCount());
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();
    }

    /** {@inheritDoc} */
    @Override public HadoopConfiguration hadoopConfiguration(String igniteInstanceName) {
        HadoopConfiguration cfg = super.hadoopConfiguration(igniteInstanceName);

        // TODO: IGNITE-404: Uncomment when fixed.
        //cfg.setExternalExecution(true);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        // Every node started by this test uses the JDK marshaller.
        cfg.setMarshaller(new JdkMarshaller());

        return cfg;
    }

    /**
     * Submits a job with a well-behaved mapper and waits for it to finish.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSimpleTaskSubmit() throws Exception {
        submitJob(TestMapper.class).get();
    }

    /**
     * Submits a job whose mapper always throws and checks that the job future fails with
     * the mapper's {@link IOException} somewhere in the cause chain.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMapperException() throws Exception {
        IgniteInternalFuture<?> fut = submitJob(TestFailingMapper.class);

        try {
            fut.get();
        }
        catch (IgniteCheckedException e) {
            IOException cause = X.cause(e, IOException.class);

            assertNotNull(cause);
            assertEquals(MAPPER_FAILURE_MSG, cause.getMessage());

            return;
        }

        // Reaching this point means the job completed normally, i.e. the mapper failure was lost.
        // Thrown directly so the check does not rely on any particular assertion helper.
        throw new AssertionError("Job with a failing mapper completed without an error.");
    }

    /**
     * Writes the input file, configures a single-reducer job that uses {@link TestReducer} both as
     * combiner and as reducer, and submits it through the first grid.
     *
     * @param mapperCls Mapper class to run.
     * @return Future of the submitted job.
     * @throws Exception If failed.
     */
    private IgniteInternalFuture<?> submitJob(Class<? extends Mapper<?, ?, ?, ?>> mapperCls) throws Exception {
        prepareTestFile(TEST_INPUT_FILE);

        Configuration cfg = new Configuration();

        setupFileSystems(cfg);

        Job job = Job.getInstance(cfg);

        job.setMapperClass(mapperCls);
        job.setCombinerClass(TestReducer.class);
        job.setReducerClass(TestReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(1);

        // Same URIs as before: "igfs://:<instance name>@/" followed by the file name.
        String fsUri = "igfs://:" + getTestIgniteInstanceName(0) + "@/";

        FileInputFormat.setInputPaths(job, new Path(fsUri + TEST_INPUT_FILE));
        FileOutputFormat.setOutputPath(job, new Path(fsUri + "output"));

        job.setJarByClass(getClass());

        return grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
            createJobInfo(job.getConfiguration(), null));
    }

    /**
     * Creates the given file in IGFS of the first grid and fills it with {@link #TEST_LINES_CNT}
     * lines of the form {@code "Hello, world: <i>"}.
     *
     * @param filePath File path to prepare.
     * @throws Exception If failed.
     */
    private void prepareTestFile(String filePath) throws Exception {
        IgniteFileSystem igfs = grid(0).fileSystem(igfsName);

        try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) {
            // Explicit charset: the written bytes must not depend on the platform default encoding.
            PrintWriter wr = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));

            for (int i = 0; i < TEST_LINES_CNT; i++)
                wr.println("Hello, world: " + i);

            // PrintWriter swallows I/O errors; checkError() flushes and reports whether one occurred.
            if (wr.checkError())
                throw new IOException("Failed to write test input file: " + filePath);
        }
    }

    /**
     * Mapper that emits the pair {@code ("line", 1)} for every input record.
     */
    private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
        /** One constant. */
        private final IntWritable one = new IntWritable(1);

        /** Line constant. */
        private final Text line = new Text("line");

        /** {@inheritDoc} */
        @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
            ctx.write(line, one);
        }
    }

    /**
     * Failing mapper: throws an {@link IOException} for every input record.
     */
    private static class TestFailingMapper extends Mapper<Object, Text, Text, IntWritable> {
        /** {@inheritDoc} */
        @Override protected void map(Object key, Text val, Context c) throws IOException, InterruptedException {
            throw new IOException("Test failure");
        }
    }

    /**
     * Reducer (also used as combiner) that sums all values of a key and writes the total
     * under the constant key {@code "line"}.
     */
    private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /** Line constant. */
        private final Text line = new Text("line");

        /** {@inheritDoc} */
        @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
            throws IOException, InterruptedException {
            int s = 0;

            for (IntWritable val : values)
                s += val.get();

            System.out.println(">>>> Reduced: " + s);

            ctx.write(line, new IntWritable(s));
        }
    }
}