blob: ee9e47ca50c98e1fa2be85373dab458efa61b965 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.elasticsearch;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
/**
 * Test utilities to use with {@link ElasticsearchIO}.
 *
 * <p>All HTTP calls go through a single private helper built on the low-level client's {@link
 * Request} API.
 */
class ElasticsearchIOTestUtils {
  static final String[] FAMOUS_SCIENTISTS = {
    "Einstein",
    "Darwin",
    "Copernicus",
    "Pasteur",
    "Curie",
    "Faraday",
    "Newton",
    "Bohr",
    "Galilei",
    "Maxwell"
  };
  static final int NUM_SCIENTISTS = FAMOUS_SCIENTISTS.length;

  /** Query parameters ({@code refresh=wait_for}) attached to the write requests issued here. */
  private static final Map<String, String> WAIT_FOR_REFRESH =
      Collections.singletonMap("refresh", "wait_for");

  /** Enumeration that specifies whether to insert malformed documents. */
  public enum InjectionMode {
    INJECT_SOME_INVALID_DOCS,
    DO_NOT_INJECT_INVALID_DOCS
  }

  /** Deletes the index named in {@code connectionConfiguration} synchronously. */
  static void deleteIndex(ConnectionConfiguration connectionConfiguration, RestClient restClient)
      throws IOException {
    deleteIndex(restClient, connectionConfiguration.getIndex());
  }

  /** Issues {@code POST /<index>/_close}. */
  private static void closeIndex(RestClient restClient, String index) throws IOException {
    performRequest(restClient, "POST", String.format("/%s/_close", index));
  }

  /**
   * Closes and then deletes {@code index}.
   *
   * <p>An error whose message mentions {@code index_not_found_exception} is ignored. Any other
   * {@link IOException} is rethrown.
   */
  private static void deleteIndex(RestClient restClient, String index) throws IOException {
    try {
      closeIndex(restClient, index);
      performRequest(restClient, "DELETE", String.format("/%s", index), WAIT_FOR_REFRESH, null);
    } catch (IOException e) {
      // It is fine to ignore this exception because deleteIndex is called in @Before, so when the
      // first test runs the index does not exist yet.
      if (!isIndexNotFound(e)) {
        throw e;
      }
    }
  }

  /**
   * Synchronously deletes the target if it exists and then (re)creates it as a copy of source
   * synchronously.
   */
  static void copyIndex(RestClient restClient, String source, String target) throws IOException {
    deleteIndex(restClient, target);
    HttpEntity entity =
        new NStringEntity(
            String.format(
                "{\"source\" : { \"index\" : \"%s\" }, \"dest\" : { \"index\" : \"%s\" } }",
                source, target),
            ContentType.APPLICATION_JSON);
    performRequest(restClient, "POST", "/_reindex", WAIT_FOR_REFRESH, entity);
  }

  /**
   * Inserts {@code numDocs} valid test documents into the configured index/type with a single bulk
   * request, using ids {@code 0..numDocs-1}.
   *
   * @throws IOException on communication failure, or if the bulk response reports errors
   */
  static void insertTestDocuments(
      ConnectionConfiguration connectionConfiguration, long numDocs, RestClient restClient)
      throws IOException {
    List<String> data = createDocuments(numDocs, InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
    StringBuilder bulkRequest = new StringBuilder();
    int id = 0;
    for (String document : data) {
      // The bulk body is newline-delimited JSON. Use an explicit "\n" instead of the
      // platform-dependent %n so the payload is the same on every OS.
      bulkRequest.append(
          String.format(
              "{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n%s\n",
              connectionConfiguration.getIndex(),
              connectionConfiguration.getType(),
              id++,
              document));
    }
    String endPoint =
        String.format(
            "/%s/%s/_bulk", connectionConfiguration.getIndex(), connectionConfiguration.getType());
    HttpEntity requestBody =
        new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
    Response response =
        performRequest(restClient, "POST", endPoint, WAIT_FOR_REFRESH, requestBody);
    ElasticsearchIO.checkForErrors(
        response.getEntity(), getBackendVersion(connectionConfiguration), false);
  }

  /**
   * Forces a refresh of the given index to make recently inserted documents available for search
   * using the index and type named in the connectionConfiguration.
   *
   * @param connectionConfiguration providing the index and type
   * @param restClient To use for issuing queries
   * @return The number of docs in the index, or 0 if the index does not exist yet
   * @throws IOException On error communicating with Elasticsearch
   */
  static long refreshIndexAndGetCurrentNumDocs(
      ConnectionConfiguration connectionConfiguration, RestClient restClient) throws IOException {
    return refreshIndexAndGetCurrentNumDocs(
        restClient, connectionConfiguration.getIndex(), connectionConfiguration.getType());
  }

  /**
   * Forces a refresh of the given index to make recently inserted documents available for search.
   *
   * @param restClient To use for issuing queries
   * @param index The Elasticsearch index
   * @param type The Elasticsearch type
   * @return The number of docs in the index ({@code hits.total} of a {@code _search} on the
   *     index/type), or 0 if the index does not exist yet
   * @throws IOException On error communicating with Elasticsearch
   */
  static long refreshIndexAndGetCurrentNumDocs(RestClient restClient, String index, String type)
      throws IOException {
    try {
      performRequest(restClient, "POST", String.format("/%s/_refresh", index));
      Response response =
          performRequest(restClient, "GET", String.format("/%s/%s/_search", index, type));
      return getTotalHits(parseResponse(response.getEntity()));
    } catch (IOException e) {
      // It is fine to ignore the exceptions below. In testWriteWithBatchSize* this method is
      // sometimes called before any doc has been written (when fewer docs were processed than
      // batchSize). In that case the index/type has not been created yet, because it is created
      // on first doc insertion.
      if (!isIndexNotFound(e)) {
        throw e;
      }
      return 0;
    }
  }

  /**
   * Generates a list of test documents for insertion.
   *
   * @param numDocs Number of docs to generate (values {@code <= 0} yield an empty list)
   * @param injectionMode {@link InjectionMode} that specifies whether to insert malformed documents
   * @return the list of json String representing the documents
   */
  static List<String> createDocuments(long numDocs, InjectionMode injectionMode) {
    List<String> data = new ArrayList<>();
    // Use a long counter: an int counter compared against a long bound could overflow and never
    // terminate.
    for (long i = 0; i < numDocs; i++) {
      String scientist = FAMOUS_SCIENTISTS[(int) (i % FAMOUS_SCIENTISTS.length)];
      // When injection is requested, documents 6 and 7 use ';' instead of ':' so they are
      // malformed JSON.
      boolean malformed =
          injectionMode == InjectionMode.INJECT_SOME_INVALID_DOCS && (i == 6 || i == 7);
      String format =
          malformed ? "{\"scientist\";\"%s\", \"id\":%s}" : "{\"scientist\":\"%s\", \"id\":%s}";
      data.add(String.format(format, scientist, i));
    }
    return data;
  }

  /**
   * Executes a query for the named scientist and returns the count from the result.
   *
   * @param connectionConfiguration Specifies the index and type
   * @param restClient To use to execute the call
   * @param scientistName The scientist to query for
   * @return The count of documents found
   * @throws IOException On error talking to Elasticsearch
   */
  static int countByScientistName(
      ConnectionConfiguration connectionConfiguration, RestClient restClient, String scientistName)
      throws IOException {
    return countByMatch(connectionConfiguration, restClient, "scientist", scientistName);
  }

  /**
   * Executes a match query for given field/value and returns the count of results.
   *
   * <p>{@code field} and {@code value} are interpolated into the JSON body verbatim. They must not
   * contain characters that need JSON escaping.
   *
   * @param connectionConfiguration Specifies the index and type
   * @param restClient To use to execute the call
   * @param field The field to query
   * @param value The value to match
   * @return The count of documents in the search result
   * @throws IOException On error communicating with Elasticsearch
   */
  static int countByMatch(
      ConnectionConfiguration connectionConfiguration,
      RestClient restClient,
      String field,
      String value)
      throws IOException {
    String requestBody =
        String.format("{\n  \"query\" : {\"match\": {\n    \"%s\": \"%s\"\n  }}\n}\n", field, value);
    String endPoint =
        String.format(
            "/%s/%s/_search",
            connectionConfiguration.getIndex(), connectionConfiguration.getType());
    HttpEntity httpEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
    Response response =
        performRequest(restClient, "GET", endPoint, Collections.emptyMap(), httpEntity);
    return Math.toIntExact(getTotalHits(parseResponse(response.getEntity())));
  }

  /**
   * Creates the configured index with a mapping for {@code age}, {@code scientist} and {@code id}
   * under the configured type.
   */
  public static void setIndexMapping(
      ConnectionConfiguration connectionConfiguration, RestClient restClient) throws IOException {
    String endpoint = String.format("/%s", connectionConfiguration.getIndex());
    // Backend version 2 gets the "string" field type for scientist; other versions get "text".
    String requestString =
        String.format(
            "{\"mappings\":{\"%s\":{\"properties\":{\"age\":{\"type\":\"long\"},"
                + " \"scientist\":{\"type\":\"%s\"}, \"id\":{\"type\":\"long\"}}}}}",
            connectionConfiguration.getType(),
            getBackendVersion(connectionConfiguration) == 2 ? "string" : "text");
    HttpEntity requestBody = new NStringEntity(requestString, ContentType.APPLICATION_JSON);
    performRequest(restClient, "PUT", endpoint, Collections.emptyMap(), requestBody);
  }

  /**
   * Reads {@code hits.total} from a parsed search response.
   *
   * <p>Accepts both a plain number and an object carrying the count in {@code value} (the form
   * returned by Elasticsearch 7+). Reading the object form with {@code asLong()} would silently
   * yield 0.
   */
  private static long getTotalHits(JsonNode searchResult) {
    JsonNode total = searchResult.path("hits").path("total");
    return total.isObject() ? total.path("value").asLong() : total.asLong();
  }

  /**
   * Returns true if {@code e}'s message mentions {@code index_not_found_exception}.
   *
   * <p>A null message returns false instead of throwing a NullPointerException.
   */
  private static boolean isIndexNotFound(IOException e) {
    String message = e.getMessage();
    return message != null && message.contains("index_not_found_exception");
  }

  /** Performs a request with no query parameters and no body. */
  private static Response performRequest(RestClient restClient, String method, String endpoint)
      throws IOException {
    return performRequest(restClient, method, endpoint, Collections.emptyMap(), null);
  }

  /**
   * Performs a request through the {@link Request} API.
   *
   * @param restClient client used to send the request
   * @param method HTTP method
   * @param endpoint request path
   * @param params query parameters to add to the request
   * @param entity request body, or {@code null} for none
   * @return the response returned by {@link RestClient#performRequest(Request)}
   * @throws IOException on communication failure or an error response
   */
  private static Response performRequest(
      RestClient restClient,
      String method,
      String endpoint,
      Map<String, String> params,
      HttpEntity entity)
      throws IOException {
    Request request = new Request(method, endpoint);
    for (Map.Entry<String, String> param : params.entrySet()) {
      request.addParameter(param.getKey(), param.getValue());
    }
    if (entity != null) {
      request.setEntity(entity);
    }
    return restClient.performRequest(request);
  }
}