blob: 19bda804f74326446173058192768963b179a40c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.elasticsearch;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.UPDATE_INDEX;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.UPDATE_TYPE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Tests for {@link ElasticsearchIO} against an embedded Elasticsearch 2.x node. */
@RunWith(JUnit4.class)
public class ElasticsearchIOTest implements Serializable {
  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTest.class);
  private static final String ES_IP = "127.0.0.1";
  private static final int MAX_STARTUP_WAITING_TIME_MSEC = 5000;
  /** Delay between two availability checks while waiting for the embedded node to answer. */
  private static final int HEALTH_CHECK_INTERVAL_MSEC = 500;

  private static int esHttpPort;
  private static Node node;
  private static RestClient restClient;
  private static ConnectionConfiguration connectionConfiguration;
  // Composition instead of inheritance: the ES5 test already extends ESIntegTestCase, so the
  // shared test logic lives in ElasticsearchIOTestCommon and is delegated to.
  private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;

  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
  @Rule public TestPipeline pipeline = TestPipeline.create();

  /**
   * Starts an embedded Elasticsearch node on a free local port, creates the shared connection
   * configuration and REST client, then waits until the node answers {@code HEAD /} with HTTP 200.
   *
   * @throws IOException if the node does not become available within {@code
   *     MAX_STARTUP_WAITING_TIME_MSEC}, if a health-check request fails, or if the waiting thread
   *     is interrupted
   */
  @BeforeClass
  public static void beforeClass() throws IOException {
    esHttpPort = NetworkTestHelper.getAvailableLocalPort();
    LOG.info("Starting embedded Elasticsearch instance ({})", esHttpPort);
    Settings.Builder settingsBuilder =
        Settings.settingsBuilder()
            .put("cluster.name", "beam")
            .put("http.enabled", "true")
            .put("node.data", "true")
            .put("path.data", TEMPORARY_FOLDER.getRoot().getPath())
            .put("path.home", TEMPORARY_FOLDER.getRoot().getPath())
            .put("node.name", "beam")
            .put("network.host", ES_IP)
            .put("http.port", esHttpPort)
            .put("index.store.stats_refresh_interval", 0)
            // With some JDKs the embedded node was too slow for bulk insertion and a bulk queue
            // of 50 filled up. A real ES instance does not have this problem (cf. the testWrite
            // integration test).
            .put("threadpool.bulk.queue_size", 400);
    node = new Node(settingsBuilder.build());
    LOG.info("Elasticsearch node created");
    node.start();
    connectionConfiguration =
        ConnectionConfiguration.create(esAddresses(), getEsIndex(), ES_TYPE)
            .withSocketAndRetryTimeout(120000)
            .withConnectTimeout(5000);
    restClient = connectionConfiguration.createClient();
    elasticsearchIOTestCommon =
        new ElasticsearchIOTestCommon(connectionConfiguration, restClient, false);
    waitForNodeAvailability();
  }

  /**
   * Polls the embedded node until it answers {@code HEAD /} with HTTP 200, or fails once {@code
   * MAX_STARTUP_WAITING_TIME_MSEC} has elapsed.
   *
   * <p>The node is re-checked after every sleep, including the last one. A node that becomes
   * available right at the deadline is therefore not reported as a timeout.
   *
   * @throws IOException on timeout, on a failed health-check request, or on interruption
   */
  private static void waitForNodeAvailability() throws IOException {
    int waitingTime = 0;
    boolean available = isNodeAvailable();
    while (!available && waitingTime < MAX_STARTUP_WAITING_TIME_MSEC) {
      try {
        Thread.sleep(HEALTH_CHECK_INTERVAL_MSEC);
      } catch (InterruptedException e) {
        // Restore the interrupt flag so the test runner can observe it, and abort rather than
        // keep looping: with the flag set, every further sleep would fail immediately.
        Thread.currentThread().interrupt();
        throw new IOException(
            "Waiting thread was interrupted while waiting for connection to Elasticsearch to be available",
            e);
      }
      waitingTime += HEALTH_CHECK_INTERVAL_MSEC;
      available = isNodeAvailable();
    }
    if (!available) {
      throw new IOException("Max startup waiting for embedded Elasticsearch to start was exceeded");
    }
  }

  /** Returns true when the embedded node answers {@code HEAD /} with HTTP 200. */
  private static boolean isNodeAvailable() throws IOException {
    return restClient.performRequest("HEAD", "/").getStatusLine().getStatusCode() == 200;
  }

  /** Returns the HTTP address(es) of the embedded node, for building connection configurations. */
  private static String[] esAddresses() {
    return new String[] {"http://" + ES_IP + ":" + esHttpPort};
  }

  /**
   * Releases the REST client and the embedded node.
   *
   * <p>JUnit runs {@code @AfterClass} even when {@code @BeforeClass} failed part-way. Each
   * resource is therefore null-checked, and the node is closed in a {@code finally} so it is
   * released even if closing the client throws.
   */
  @AfterClass
  public static void afterClass() throws IOException {
    try {
      if (restClient != null) {
        restClient.close();
      }
    } finally {
      if (node != null) {
        node.close();
      }
    }
  }

  /** Deletes the test index before each test so every test starts from an empty index. */
  @Before
  public void before() throws Exception {
    ElasticSearchIOTestUtils.deleteIndex(connectionConfiguration, restClient);
  }

  @Test
  public void testSizes() throws Exception {
    elasticsearchIOTestCommon.testSizes();
  }

  @Test
  public void testRead() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testRead();
  }

  @Test
  public void testReadWithQuery() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testReadWithQuery();
  }

  @Test
  public void testWrite() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWrite();
  }

  @Rule public ExpectedException expectedException = ExpectedException.none();

  @Test
  public void testWriteWithErrors() throws Exception {
    elasticsearchIOTestCommon.setExpectedException(expectedException);
    elasticsearchIOTestCommon.testWriteWithErrors();
  }

  @Test
  public void testWriteWithMaxBatchSize() throws Exception {
    elasticsearchIOTestCommon.testWriteWithMaxBatchSize();
  }

  @Test
  public void testWriteWithMaxBatchSizeBytes() throws Exception {
    elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes();
  }

  @Test
  public void testSplit() throws Exception {
    elasticsearchIOTestCommon.testSplit(0);
  }

  @Test
  public void testWriteWithIdFn() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteWithIdFn();
  }

  @Test
  public void testWriteWithIndexFn() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteWithIndexFn();
  }

  @Test
  public void testWriteWithTypeFn() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteWithTypeFn2x5x();
  }

  @Test
  public void testWriteFullAddressing() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteWithFullAddressing();
  }

  @Test
  public void testWritePartialUpdate() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWritePartialUpdate();
  }

  @Test
  public void testWritePartialUpdateWithErrors() throws Exception {
    // Cannot share elasticsearchIOTestCommon because tests run in parallel. The local name
    // differs from the static field to avoid shadowing it.
    ConnectionConfiguration updateConnectionConfiguration =
        ConnectionConfiguration.create(esAddresses(), UPDATE_INDEX, UPDATE_TYPE);
    ElasticsearchIOTestCommon elasticsearchIOTestCommonWithErrors =
        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, false);
    elasticsearchIOTestCommonWithErrors.setPipeline(pipeline);
    elasticsearchIOTestCommonWithErrors.testWritePartialUpdateWithErrors();
  }

  @Test
  public void testReadWithMetadata() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testReadWithMetadata();
  }

  @Test
  public void testDefaultRetryPredicate() throws IOException {
    elasticsearchIOTestCommon.testDefaultRetryPredicate(restClient);
  }

  @Test
  public void testWriteRetry() throws Throwable {
    elasticsearchIOTestCommon.setExpectedException(expectedException);
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteRetry();
  }

  @Test
  public void testWriteRetryValidRequest() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteRetryValidRequest();
  }
}