/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.hadoop.format;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests to validate HadoopFormatIO reads against an embedded Elasticsearch instance.
*
* <p>{@link EsInputFormat} can be used to read data from Elasticsearch. By default, EsInputFormat
* returns Text as the key class and LinkedMapWritable as the value class. You can also use
* MapWritable as the value class, provided that you set the property
* "mapred.mapoutput.value.class" to MapWritable.class. If this property is not set, using
* MapWritable as the value class may cause an org.apache.beam.sdk.coders.CoderException due to
* unexpected extra bytes after decoding.
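*
* <p>A minimal sketch of that alternative configuration (property names as noted above; not used
* by these tests):
*
* <pre>{@code
* Configuration conf = new Configuration();
* conf.setClass("mapred.mapoutput.value.class", MapWritable.class, Object.class);
* conf.setClass("value.class", MapWritable.class, Object.class);
* }</pre>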
*/
@RunWith(JUnit4.class)
public class HadoopFormatIOElasticTest implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIOElasticTest.class);
private static final String ELASTIC_IN_MEM_HOSTNAME = "127.0.0.1";
private static int port;
private static final String ELASTIC_INTERNAL_VERSION = "5.x";
private static final String TRUE = "true";
private static final String ELASTIC_INDEX_NAME = "beamdb";
private static final String ELASTIC_TYPE_NAME = "scientists";
private static final String ELASTIC_RESOURCE = "/" + ELASTIC_INDEX_NAME + "/" + ELASTIC_TYPE_NAME;
private static final int TEST_DATA_ROW_COUNT = 10;
private static final String ELASTIC_TYPE_ID_PREFIX = "s";
@ClassRule public static TemporaryFolder elasticTempFolder = new TemporaryFolder();
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
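/** Starts the embedded Elasticsearch server on an available local port before any test runs. */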
@BeforeClass
public static void startServer() throws NodeValidationException, IOException {
port = NetworkTestHelper.getAvailableLocalPort();
ElasticEmbeddedServer.startElasticEmbeddedServer();
}
/**
* Test that reads data from the embedded Elasticsearch instance and verifies that the data is
* read successfully.
*/
@Test
public void testHifIOWithElastic() {
// The expected hashcode was computed once at insertion time and is hardcoded here.
String expectedHashCode = "a62a85f5f081e3840baf1028d4d6c6bc";
Configuration conf = getConfiguration();
PCollection<KV<Text, LinkedMapWritable>> esData =
pipeline.apply(HadoopFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
PCollection<Long> count = esData.apply(Count.globally());
// Verify that the count of objects fetched using HadoopFormatIO is correct.
PAssert.thatSingleton(count).isEqualTo((long) TEST_DATA_ROW_COUNT);
PCollection<LinkedMapWritable> values = esData.apply(Values.create());
PCollection<String> textValues = values.apply(transformFunc);
// Verify the output values using checksum comparison.
PCollection<String> consolidatedHashcode =
textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
pipeline.run().waitUntilFinish();
}
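/** Maps each LinkedMapWritable row to a pipe-delimited "id|scientist" string for hashing. */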
private final MapElements<LinkedMapWritable, String> transformFunc =
MapElements.via(
new SimpleFunction<LinkedMapWritable, String>() {
@Override
public String apply(LinkedMapWritable mapw) {
return mapw.get(new Text("id")) + "|" + mapw.get(new Text("scientist"));
}
});
/**
* Test that reads data from the embedded Elasticsearch instance based on a query and verifies
* that the data is read successfully.
*/
@Test
public void testHifIOWithElasticQuery() {
long expectedRowCount = 1L;
String expectedHashCode = "cfbf3e5c993d44e57535a114e25f782d";
Configuration conf = getConfiguration();
String fieldValue = ELASTIC_TYPE_ID_PREFIX + "2";
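// Match query that selects only the document whose "id" field equals fieldValue ("s2" here).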
String query =
"{"
+ " \"query\": {"
+ " \"match\" : {"
+ " \"id\" : {"
+ " \"query\" : \""
+ fieldValue
+ "\","
+ " \"type\" : \"boolean\""
+ " }"
+ " }"
+ " }"
+ "}";
conf.set(ConfigurationOptions.ES_QUERY, query);
PCollection<KV<Text, LinkedMapWritable>> esData =
pipeline.apply(HadoopFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
PCollection<Long> count = esData.apply(Count.globally());
// Verify that the count of objects fetched using HadoopFormatIO is correct.
PAssert.thatSingleton(count).isEqualTo(expectedRowCount);
PCollection<LinkedMapWritable> values = esData.apply(Values.create());
PCollection<String> textValues = values.apply(transformFunc);
// Verify the output values using checksum comparison.
PCollection<String> consolidatedHashcode =
textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
pipeline.run().waitUntilFinish();
}
/**
* Sets the Elasticsearch configuration parameters in the Hadoop configuration object. The
* configuration object must have the InputFormat class, key class, and value class set. The
* mandatory fields to set for EsInputFormat are es.resource, es.nodes, es.port, and
* es.internal.es.version. Please refer to <a
* href="https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html"
* >Elasticsearch Configuration</a> for more details.
*/
private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set(ConfigurationOptions.ES_NODES, ELASTIC_IN_MEM_HOSTNAME);
conf.set(ConfigurationOptions.ES_PORT, String.valueOf(port));
conf.set(ConfigurationOptions.ES_RESOURCE, ELASTIC_RESOURCE);
conf.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION);
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, TRUE);
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, TRUE);
conf.setClass("mapreduce.job.inputformat.class", EsInputFormat.class, InputFormat.class);
conf.setClass("key.class", Text.class, Object.class);
conf.setClass("value.class", LinkedMapWritable.class, Object.class);
return conf;
}
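/** Creates a simple two-field document ("id", "scientist") to index as test data. */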
private static Map<String, String> createElasticRow(String id, String name) {
Map<String, String> data = new HashMap<>();
data.put("id", id);
data.put("scientist", name);
return data;
}
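/** Shuts down the embedded Elasticsearch server after all tests have run. */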
@AfterClass
public static void shutdownServer() throws IOException {
ElasticEmbeddedServer.shutdown();
}
/** Class for the in-memory Elasticsearch server. */
static class ElasticEmbeddedServer implements Serializable {
private static final long serialVersionUID = 1L;
private static Node node;
static void startElasticEmbeddedServer() throws NodeValidationException {
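// Minimal node settings: local transport with HTTP enabled, storing data in the temp folder.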
Settings settings =
Settings.builder()
.put("node.data", TRUE)
.put("network.host", ELASTIC_IN_MEM_HOSTNAME)
.put("http.port", port)
.put("path.data", elasticTempFolder.getRoot().getPath())
.put("path.home", elasticTempFolder.getRoot().getPath())
.put("transport.type", "local")
.put("http.enabled", TRUE)
.put("node.ingest", TRUE)
.build();
node = new PluginNode(settings);
node.start();
LOG.info("Elastic in memory server started.");
prepareElasticIndex();
LOG.info(
"Prepared index "
+ ELASTIC_INDEX_NAME
+ "and populated data on elastic in memory server.");
}
/** Prepares the Elastic index by adding rows. */
private static void prepareElasticIndex() {
CreateIndexRequest indexRequest = new CreateIndexRequest(ELASTIC_INDEX_NAME);
node.client().admin().indices().create(indexRequest).actionGet();
for (int i = 0; i < TEST_DATA_ROW_COUNT; i++) {
node.client()
.prepareIndex(ELASTIC_INDEX_NAME, ELASTIC_TYPE_NAME, String.valueOf(i))
.setSource(createElasticRow(ELASTIC_TYPE_ID_PREFIX + i, "Faraday" + i))
.execute()
.actionGet();
}
node.client().admin().indices().prepareRefresh(ELASTIC_INDEX_NAME).get();
}
/** Shuts down the embedded instance and cleans up its index and data directory. */
static void shutdown() throws IOException {
DeleteIndexRequest indexRequest = new DeleteIndexRequest(ELASTIC_INDEX_NAME);
node.client().admin().indices().delete(indexRequest).actionGet();
LOG.info("Deleted index " + ELASTIC_INDEX_NAME + " from elastic in memory server");
node.close();
LOG.info("Closed elastic in memory server node.");
deleteElasticDataDirectory();
}
private static void deleteElasticDataDirectory() {
try {
FileUtils.deleteDirectory(new File(elasticTempFolder.getRoot().getPath()));
} catch (IOException e) {
throw new RuntimeException("Could not delete elastic data directory: " + e.getMessage(), e);
}
}
}
/**
* Node subclass that handles the "http.enabled" property being set to "true" for the embedded
* Elasticsearch node by loading the Netty4 transport plugin.
*/
static class PluginNode extends Node implements Serializable {
private static final long serialVersionUID = 1L;
private static final ImmutableList<Class<? extends Plugin>> PLUGINS =
ImmutableList.of(Netty4Plugin.class);
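// The Netty4 plugin supplies the HTTP transport required when "http.enabled" is true.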
PluginNode(final Settings settings) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), PLUGINS);
}
}
}