blob: 9bd4cb6302f54a4535990dfa01cdb2995e9dad58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch
import groovy.json.JsonSlurper
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.elasticsearch.DeleteOperationResponse
import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.elasticsearch.UpdateOperationResponse
class TestElasticsearchClientService extends AbstractControllerService implements ElasticSearchClientService {
private boolean returnAggs
private boolean throwErrorInSearch
private boolean throwErrorInDelete
private boolean throwErrorInPit
private boolean throwErrorInUpdate
private int pageCount = 0
private int maxPages = 1
private String query
private Map<String, String> requestParameters
TestElasticsearchClientService(boolean returnAggs) {
this.returnAggs = returnAggs
}
private void common(boolean throwError, Map<String, String> requestParameters) {
if (throwError) {
throw new IOException("Simulated IOException")
}
this.requestParameters = requestParameters
}
@Override
IndexOperationResponse add(IndexOperationRequest operation, Map<String, String> requestParameters) {
return bulk(Arrays.asList(operation) as List<IndexOperationRequest>, requestParameters)
}
@Override
IndexOperationResponse bulk(List<IndexOperationRequest> operations, Map<String, String> requestParameters) {
common(false, requestParameters)
return new IndexOperationResponse(100L)
}
@Override
Long count(String query, String index, String type, Map<String, String> requestParameters) {
common(false, requestParameters)
return null
}
@Override
DeleteOperationResponse deleteById(String index, String type, String id, Map<String, String> requestParameters) {
return deleteById(index, type, Arrays.asList(id), requestParameters)
}
@Override
DeleteOperationResponse deleteById(String index, String type, List<String> ids, Map<String, String> requestParameters) {
common(throwErrorInDelete, requestParameters)
return new DeleteOperationResponse(100L)
}
@Override
DeleteOperationResponse deleteByQuery(String query, String index, String type, Map<String, String> requestParameters) {
return deleteById(index, type, Arrays.asList("1"), requestParameters)
}
@Override
UpdateOperationResponse updateByQuery(String query, String index, String type, Map<String, String> requestParameters) {
common(throwErrorInUpdate, requestParameters)
return new UpdateOperationResponse(100L)
}
@Override
Map<String, Object> get(String index, String type, String id, Map<String, String> requestParameters) {
common(false, requestParameters)
return [ "msg": "one" ]
}
@Override
SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
common(throwErrorInSearch, requestParameters)
this.query = query
final SearchResponse response
if (pageCount++ < maxPages) {
def mapper = new JsonSlurper()
def hits = mapper.parseText(HITS_RESULT)
def aggs = returnAggs && pageCount == 1 ? mapper.parseText(AGGS_RESULT) : null
response = new SearchResponse((hits as List<Map<String, Object>>), aggs as Map<String, Object>, "pitId-${pageCount}", "scrollId-${pageCount}", "[\"searchAfter-${pageCount}\"]", 15, 5, false, null)
} else {
response = new SearchResponse([], [:], "pitId-${pageCount}", "scrollId-${pageCount}", "[\"searchAfter-${pageCount}\"]", 0, 1, false, null)
}
return response
}
@Override
SearchResponse scroll(String scroll) {
if (throwErrorInSearch) {
throw new IOException("Simulated IOException - scroll")
}
return search(null, null, null, requestParameters)
}
@Override
String initialisePointInTime(String index, String keepAlive) {
if (throwErrorInPit) {
throw new IOException("Simulated IOException - initialisePointInTime")
}
pageCount = 0
return "123"
}
@Override
DeleteOperationResponse deletePointInTime(String pitId) {
if (throwErrorInDelete) {
throw new IOException("Simulated IOException - deletePointInTime")
}
return new DeleteOperationResponse(100L)
}
@Override
DeleteOperationResponse deleteScroll(String scrollId) {
if (throwErrorInDelete) {
throw new IOException("Simulated IOException - deleteScroll")
}
return new DeleteOperationResponse(100L)
}
@Override
String getTransitUrl(String index, String type) {
"http://localhost:9400/${index}/${type}"
}
private static final String AGGS_RESULT = "{\n" +
" \"term_agg\": {\n" +
" \"doc_count_error_upper_bound\": 0,\n" +
" \"sum_other_doc_count\": 0,\n" +
" \"buckets\": [\n" +
" {\n" +
" \"key\": \"five\",\n" +
" \"doc_count\": 5\n" +
" },\n" +
" {\n" +
" \"key\": \"four\",\n" +
" \"doc_count\": 4\n" +
" },\n" +
" {\n" +
" \"key\": \"three\",\n" +
" \"doc_count\": 3\n" +
" },\n" +
" {\n" +
" \"key\": \"two\",\n" +
" \"doc_count\": 2\n" +
" },\n" +
" {\n" +
" \"key\": \"one\",\n" +
" \"doc_count\": 1\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"term_agg2\": {\n" +
" \"doc_count_error_upper_bound\": 0,\n" +
" \"sum_other_doc_count\": 0,\n" +
" \"buckets\": [\n" +
" {\n" +
" \"key\": \"five\",\n" +
" \"doc_count\": 5\n" +
" },\n" +
" {\n" +
" \"key\": \"four\",\n" +
" \"doc_count\": 4\n" +
" },\n" +
" {\n" +
" \"key\": \"three\",\n" +
" \"doc_count\": 3\n" +
" },\n" +
" {\n" +
" \"key\": \"two\",\n" +
" \"doc_count\": 2\n" +
" },\n" +
" {\n" +
" \"key\": \"one\",\n" +
" \"doc_count\": 1\n" +
" }\n" +
" ]\n" +
" }\n" +
" }"
private static final String HITS_RESULT = "[\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"14\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"five\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"5\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"three\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"8\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"four\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"9\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"four\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"10\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"four\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"12\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"five\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"2\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"two\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"4\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"three\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"6\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"three\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"_index\": \"messages\",\n" +
" \"_type\": \"message\",\n" +
" \"_id\": \"15\",\n" +
" \"_score\": 1,\n" +
" \"_source\": {\n" +
" \"msg\": \"five\"\n" +
" }\n" +
" }\n" +
" ]"
void setThrowErrorInSearch(boolean throwErrorInSearch) {
this.throwErrorInSearch = throwErrorInSearch
}
void setThrowErrorInDelete(boolean throwErrorInDelete) {
this.throwErrorInDelete = throwErrorInDelete
}
void setThrowErrorInPit(boolean throwErrorInPit) {
this.throwErrorInPit = throwErrorInPit
}
void setThrowErrorInUpdate(boolean throwErrorInUpdate) {
this.throwErrorInUpdate = throwErrorInUpdate
}
void resetPageCount() {
this.pageCount = 0
}
void setMaxPages(int maxPages) {
this.maxPages = maxPages
}
Map<String, String> getRequestParameters() {
return this.requestParameters
}
}