/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.impl;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import com.google.common.base.Joiner;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Ignore;
import org.junit.Test;
/**
* Tests integration with the Hadoop client via the command-line interface.
*/
public class HadoopCommandLineTest extends GridCommonAbstractTest {
/** IGFS instance. */
private static IgfsEx igfs;
/** */
private static final String igfsName = "igfs";
/** */
private static File testWorkDir;
/** */
private static String hadoopHome;
/** */
private static String hiveHome;
/** */
private static File examplesJar;
/**
* Generates a test file containing the given words repeated the given number of times.
*
* @param path Path of the file to write.
* @param wordCounts Alternating words and counts.
* @throws Exception If failed.
*/
private void generateTestFile(File path, Object... wordCounts) throws Exception {
List<String> wordsArr = new ArrayList<>();
// Generate the words.
for (int i = 0; i < wordCounts.length; i += 2) {
String word = (String) wordCounts[i];
int cnt = (Integer) wordCounts[i + 1];
while (cnt-- > 0)
wordsArr.add(word);
}
// Shuffle the words.
for (int i = 0; i < wordsArr.size(); i++) {
int j = (int)(Math.random() * wordsArr.size());
Collections.swap(wordsArr, i, j);
}
// Write the file, 5 to 9 words per line.
try (PrintWriter writer = new PrintWriter(path)) {
int j = 0;
while (j < wordsArr.size()) {
int i = 5 + (int)(Math.random() * 5);
List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
j += i;
writer.println(Joiner.on(' ').join(subList));
}
writer.flush();
}
}
/**
* Generates two data files to be joined with Hive.
*
* @throws FileNotFoundException If failed.
*/
private void generateHiveTestFiles() throws FileNotFoundException {
try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a"));
PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) {
char sep = '\t';
int idB = 0;
int idA = 0;
int v = 1000;
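// Each loop iteration writes one row to data-a (id_a -> even id_b) and two rows to data-b
// (id_b -> a monotonically increasing even value), so every data-a row joins exactly one
// of the 2000 data-b rows.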
for (int i = 0; i < 1000; i++) {
writerA.print(idA++);
writerA.print(sep);
writerA.println(idB);
writerB.print(idB++);
writerB.print(sep);
writerB.println(v += 2);
writerB.print(idB++);
writerB.print(sep);
writerB.println(v += 2);
}
writerA.flush();
writerB.flush();
}
}
/** */
private void beforeHadoopCommandLineTest() throws Exception {
hiveHome = IgniteSystemProperties.getString("HIVE_HOME");
assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome));
hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME");
assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome));
String mapredHome = hadoopHome + "/share/hadoop/mapreduce";
File[] fileList = new File(mapredHome).listFiles(new FileFilter() {
@Override public boolean accept(File pathname) {
return pathname.getName().startsWith("hadoop-mapreduce-examples-") &&
pathname.getName().endsWith(".jar");
}
});
assertEquals("Invalid hadoop distribution.", 1, fileList.length);
examplesJar = fileList[0];
testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile();
U.copy(resolveHadoopConfig("core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false);
File srcFile = resolveHadoopConfig("mapred-site.ignite.xml");
File dstFile = new File(testWorkDir, "mapred-site.xml");
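// Copy the mapred-site template verbatim, injecting the Ignite job counter writer property
// just before the closing </configuration> tag.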
try (BufferedReader in = new BufferedReader(new FileReader(srcFile));
PrintWriter out = new PrintWriter(dstFile)) {
String line;
while ((line = in.readLine()) != null) {
if (line.startsWith("</configuration>"))
out.println(
" <property>\n" +
" <name>" + HadoopCommonUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" +
" <value>" + IgniteHadoopFileSystemCounterWriter.class.getName() + "</value>\n" +
" </property>\n");
out.println(line);
}
out.flush();
}
generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50);
generateHiveTestFiles();
}
/**
* Resolves a Hadoop configuration file.
*
* @param name File name.
* @return Resolved file, or {@code null} if not found.
*/
private static File resolveHadoopConfig(String name) {
File path = U.resolveIgnitePath("modules/hadoop/config/" + name);
return path != null ? path : U.resolveIgnitePath("config/hadoop/" + name);
}
/** */
private void afterHadoopCommandLineTest() {
U.delete(testWorkDir);
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
beforeHadoopCommandLineTest();
String cfgPath = "config/hadoop/default-config.xml";
IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath);
IgniteConfiguration cfg = tup.get1();
cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes.
igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName);
}
/** {@inheritDoc} */
@Override protected void afterTest() {
stopAllGrids(true);
afterHadoopCommandLineTest();
}
/**
* Creates a process builder with the environment required to run the Hadoop CLI.
*
* @return Process builder.
*/
private ProcessBuilder createProcessBuilder() {
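// ':' assumes a POSIX classpath separator; the test launches bin/hadoop shell scripts,
// so it is not expected to run on Windows.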
String sep = ":";
String ggClsPath = HadoopJobEx.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep +
HadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath();
ProcessBuilder res = new ProcessBuilder();
res.environment().put("HADOOP_HOME", hadoopHome);
res.environment().put("HADOOP_CLASSPATH", ggClsPath);
res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath());
res.environment().put("HADOOP_OPTS", filteredJvmArgs());
res.redirectErrorStream(true);
return res;
}
/**
* Creates the list of JVM arguments used to start the Hadoop process.
*
* @return JVM arguments.
*/
private String filteredJvmArgs() {
StringBuilder filteredJvmArgs = new StringBuilder();
filteredJvmArgs.append("-ea");
for (String arg : U.jvmArgs()) {
if (arg.startsWith("--add-opens") || arg.startsWith("--add-exports") || arg.startsWith("--add-modules") ||
arg.startsWith("--patch-module") || arg.startsWith("--add-reads") ||
arg.startsWith("-XX:+IgnoreUnrecognizedVMOptions"))
filteredJvmArgs.append(' ').append(arg);
}
return filteredJvmArgs.toString();
}
/**
* Waits for the process to exit and prints its output.
*
* @param proc Process.
* @return Exit code.
* @throws Exception If failed.
*/
private int watchProcess(Process proc) throws Exception {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null)
log().info(line);
}
return proc.waitFor();
}
/**
* Executes Hadoop command line tool.
*
* @param args Arguments for Hadoop command line tool.
* @return Process exit code.
* @throws Exception If failed.
*/
private int executeHadoopCmd(String... args) throws Exception {
ProcessBuilder procBuilder = createProcessBuilder();
List<String> cmd = new ArrayList<>();
cmd.add(hadoopHome + "/bin/hadoop");
cmd.addAll(Arrays.asList(args));
procBuilder.command(cmd);
log().info("Execute: " + procBuilder.command());
return watchProcess(procBuilder.start());
}
/**
* Executes Hive query.
*
* @param qry Query.
* @return Process exit code.
* @throws Exception If failed.
*/
private int executeHiveQuery(String qry) throws Exception {
ProcessBuilder procBuilder = createProcessBuilder();
List<String> cmd = new ArrayList<>();
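// Use an embedded Derby metastore inside the per-test work directory so test runs stay isolated.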
cmd.add(hiveHome + "/bin/hive");
cmd.add("--hiveconf");
cmd.add("hive.rpc.query.plan=true");
cmd.add("--hiveconf");
cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" +
"databaseName=metastore_db;create=true");
cmd.add("-e");
cmd.add(qry);
procBuilder.command(cmd);
log().info("Execute: " + procBuilder.command());
return watchProcess(procBuilder.start());
}
/**
* Tests Hadoop command line integration.
*/
@Test
public void testHadoopCommandLine() throws Exception {
assertEquals(0, executeHadoopCmd("fs", "-ls", "/"));
assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input"));
assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input"));
assertTrue(igfs.exists(new IgfsPath("/input/test-data")));
assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output"));
IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/");
assertTrue(igfs.exists(path));
IgfsPath jobStatPath = null;
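// Exactly one job statistics directory is expected under the user's home directory.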
for (IgfsPath jobPath : igfs.listPaths(path)) {
assertNull(jobStatPath);
jobStatPath = jobPath;
}
File locStatFile = new File(testWorkDir, "performance");
assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString()));
long evtCnt;
try (BufferedReader reader = new BufferedReader(new FileReader(locStatFile))) {
evtCnt = HadoopTestUtils.simpleCheckJobStatFile(reader);
}
assertTrue(evtCnt >= 22); // Minimum number of events for a job with a combiner.
assertTrue(igfs.exists(new IgfsPath("/output")));
BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000"))));
List<String> res = new ArrayList<>();
String line;
while ((line = in.readLine()) != null)
res.add(line);
Collections.sort(res);
assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString());
}
/**
* Runs a query and checks the result.
*
* @param expRes Expected result.
* @param qry Query.
* @throws Exception If failed.
*/
private void checkQuery(String expRes, String qry) throws Exception {
assertEquals(0, executeHiveQuery("drop table if exists result"));
assertEquals(0, executeHiveQuery(
"create table result " +
"row format delimited fields terminated by ' ' " +
"stored as textfile " +
"location '/result' as " + qry
));
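// Hive materializes the "create table ... as" result under /result in IGFS; read its first output file.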
IgfsInputStream in = igfs.open(new IgfsPath("/result/000000_0"));
byte[] buf = new byte[(int) in.length()];
in.read(buf);
assertEquals(expRes, new String(buf));
}
/**
* Tests Hive integration.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-9920")
@Test
public void testHiveCommandLine() throws Exception {
assertEquals(0, executeHiveQuery(
"create table table_a (" +
"id_a int," +
"id_b int" +
") " +
"row format delimited fields terminated by '\\t'" +
"stored as textfile " +
"location '/table-a'"
));
assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a"));
assertEquals(0, executeHiveQuery(
"create table table_b (" +
"id_b int," +
"rndv int" +
") " +
"row format delimited fields terminated by '\\t'" +
"stored as textfile " +
"location '/table-b'"
));
assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b"));
checkQuery(
"0 0\n" +
"1 2\n" +
"2 4\n" +
"3 6\n" +
"4 8\n" +
"5 10\n" +
"6 12\n" +
"7 14\n" +
"8 16\n" +
"9 18\n",
"select * from table_a order by id_a limit 10"
);
checkQuery("2000\n", "select count(id_b) from table_b");
checkQuery(
"250 500 2002\n" +
"251 502 2006\n" +
"252 504 2010\n" +
"253 506 2014\n" +
"254 508 2018\n" +
"255 510 2022\n" +
"256 512 2026\n" +
"257 514 2030\n" +
"258 516 2034\n" +
"259 518 2038\n",
"select a.id_a, a.id_b, b.rndv" +
" from table_a a" +
" inner join table_b b on a.id_b = b.id_b" +
" where b.rndv > 2000" +
" order by a.id_a limit 10"
);
checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b");
}
}