| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.elasticsearch.bulk; |
| |
| import org.apache.commons.lang3.exception.ExceptionUtils; |
| import org.apache.metron.elasticsearch.client.ElasticsearchClient; |
| import org.apache.metron.indexing.dao.update.Document; |
| import org.elasticsearch.action.DocWriteRequest; |
| import org.elasticsearch.action.bulk.BulkItemResponse; |
| import org.elasticsearch.action.bulk.BulkRequest; |
| import org.elasticsearch.action.bulk.BulkResponse; |
| import org.elasticsearch.action.index.IndexRequest; |
| import org.elasticsearch.action.support.WriteRequest; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Writes documents to an Elasticsearch index in bulk. |
| * |
| * @param <D> The type of document to write. |
| */ |
| public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> { |
| |
| /** |
| * A {@link Document} along with the index it will be written to. |
| */ |
| private class Indexable { |
| D document; |
| String index; |
| |
| public Indexable(D document, String index) { |
| this.document = document; |
| this.index = index; |
| } |
| } |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| private ElasticsearchClient client; |
| private List<Indexable> documents; |
| private WriteRequest.RefreshPolicy refreshPolicy; |
| |
| public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) { |
| this.client = client; |
| this.documents = new ArrayList<>(); |
| this.refreshPolicy = WriteRequest.RefreshPolicy.NONE; |
| } |
| |
| @Override |
| public void addDocument(D document, String indexName) { |
| documents.add(new Indexable(document, indexName)); |
| LOG.debug("Adding document to batch; document={}, index={}", document, indexName); |
| } |
| |
| @Override |
| public BulkDocumentWriterResults<D> write() { |
| BulkDocumentWriterResults<D> results = new BulkDocumentWriterResults<>(); |
| try { |
| // create an index request for each document |
| BulkRequest bulkRequest = new BulkRequest(); |
| bulkRequest.setRefreshPolicy(refreshPolicy); |
| for(Indexable doc: documents) { |
| DocWriteRequest request = createRequest(doc.document, doc.index); |
| bulkRequest.add(request); |
| } |
| |
| // submit the request and handle the response |
| BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest); |
| handleBulkResponse(bulkResponse, documents, results); |
| if (LOG.isDebugEnabled()) { |
| String shards = Arrays.stream(bulkResponse.getItems()) |
| .map(bulkItemResponse -> bulkItemResponse.getResponse().getShardId().toString()) |
| .collect(Collectors.joining(",")); |
| LOG.debug("{} results written to shards {} in {} ms; batchSize={}, success={}, failed={}", |
| bulkResponse.getItems().length, shards, bulkResponse.getTookInMillis(), |
| documents.size(), results.getSuccesses().size(), results.getFailures().size()); |
| } |
| } catch(IOException e) { |
| // assume all documents have failed |
| for(Indexable indexable: documents) { |
| D failed = indexable.document; |
| results.addFailure(failed, e, ExceptionUtils.getRootCauseMessage(e)); |
| } |
| LOG.error("Failed to submit bulk request; all documents failed", e); |
| |
| } finally { |
| // flush all documents no matter which ones succeeded or failed |
| documents.clear(); |
| } |
| return results; |
| } |
| |
| @Override |
| public int size() { |
| return documents.size(); |
| } |
| |
| public ElasticsearchBulkDocumentWriter<D> withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { |
| this.refreshPolicy = refreshPolicy; |
| return this; |
| } |
| |
| private IndexRequest createRequest(D document, String index) { |
| if(document.getTimestamp() == null) { |
| throw new IllegalArgumentException("Document must contain the timestamp"); |
| } |
| |
| // if updating an existing document, the doc ID should be defined. |
| // if creating a new document, set the doc ID to null to allow Elasticsearch to generate one. |
| String docId = document.getDocumentID().orElse(null); |
| if(LOG.isDebugEnabled() && document.getDocumentID().isPresent()) { |
| LOG.debug("Creating/Updating a document with known doc ID; docID={}, guid={}, sensorType={}", |
| docId, document.getGuid(), document.getSensorType()); |
| } else if(LOG.isDebugEnabled()) { |
| LOG.debug("Creating a new document, doc ID not yet known; guid={}, sensorType={}", |
| document.getGuid(), document.getSensorType()); |
| } |
| |
| return new IndexRequest() |
| .source(document.getDocument()) |
| .type(document.getSensorType() + "_doc") |
| .index(index) |
| .id(docId) |
| .index(index) |
| .timestamp(document.getTimestamp().toString()); |
| } |
| |
| /** |
| * Handles the {@link BulkResponse} received from Elasticsearch. |
| * @param bulkResponse The response received from Elasticsearch. |
| * @param documents The documents included in the bulk request. |
| * @param results The writer results. |
| */ |
| private void handleBulkResponse(BulkResponse bulkResponse, List<Indexable> documents, BulkDocumentWriterResults<D> results) { |
| if (bulkResponse.hasFailures()) { |
| |
| // interrogate the response to distinguish between those that succeeded and those that failed |
| for(BulkItemResponse response: bulkResponse) { |
| if(response.isFailed()) { |
| // request failed |
| D failed = getDocument(response.getItemId()); |
| Exception cause = response.getFailure().getCause(); |
| String message = response.getFailureMessage(); |
| results.addFailure(failed, cause, message); |
| |
| } else { |
| // request succeeded |
| D success = getDocument(response.getItemId()); |
| success.setDocumentID(response.getResponse().getId()); |
| results.addSuccess(success); |
| } |
| } |
| } else { |
| // all requests succeeded |
| for(Indexable success: documents) { |
| results.addSuccess(success.document); |
| } |
| } |
| } |
| |
| private D getDocument(int index) { |
| return documents.get(index).document; |
| } |
| } |