blob: 7226c30e5ac296f6822cf29b152f9a90a886b1b7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.dao;
import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.RetrieveLatestDao;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
import org.apache.metron.indexing.dao.search.GroupResponse;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
import org.apache.metron.indexing.dao.update.PatchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class ElasticsearchDao implements IndexDao {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private transient ElasticsearchClient client;
private ElasticsearchSearchDao searchDao;
private ElasticsearchUpdateDao updateDao;
private ElasticsearchRetrieveLatestDao retrieveLatestDao;
/**
* Retrieves column metadata about search indices.
*/
private ElasticsearchColumnMetadataDao columnMetadataDao;
/**
* Handles the submission of search requests to Elasticsearch.
*/
private ElasticsearchRequestSubmitter requestSubmitter;
private AccessConfig accessConfig;
private WriteRequest.RefreshPolicy refreshPolicy;
public ElasticsearchDao(ElasticsearchClient client,
AccessConfig config,
ElasticsearchSearchDao searchDao,
ElasticsearchUpdateDao updateDao,
ElasticsearchRetrieveLatestDao retrieveLatestDao,
ElasticsearchColumnMetadataDao columnMetadataDao,
ElasticsearchRequestSubmitter requestSubmitter
) {
this.client = client;
this.searchDao = searchDao;
this.updateDao = updateDao;
this.retrieveLatestDao = retrieveLatestDao;
this.columnMetadataDao = columnMetadataDao;
this.requestSubmitter = requestSubmitter;
this.accessConfig = config;
}
public ElasticsearchDao() {
//uninitialized.
refreshPolicy = WriteRequest.RefreshPolicy.NONE;
}
public AccessConfig getAccessConfig() {
return accessConfig;
}
public void setAccessConfig(AccessConfig accessConfig) {
this.accessConfig = accessConfig;
}
@Override
public synchronized void init(AccessConfig config) {
if (this.client == null) {
this.client = ElasticsearchClientFactory.create(config.getGlobalConfigSupplier().get());
this.accessConfig = config;
this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client);
this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao, requestSubmitter);
this.retrieveLatestDao = new ElasticsearchRetrieveLatestDao(client);
this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, retrieveLatestDao)
.withRefreshPolicy(refreshPolicy);
}
if (columnMetadataDao == null) {
throw new IllegalArgumentException("No ColumnMetadataDao available");
}
if (requestSubmitter == null) {
throw new IllegalArgumentException("No ElasticsearchRequestSubmitter available");
}
}
@Override
public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
return this.searchDao.search(searchRequest);
}
@Override
public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
return this.searchDao.group(groupRequest);
}
@Override
public Document getLatest(final String guid, final String sensorType) throws IOException {
return retrieveLatestDao.getLatest(guid, sensorType);
}
@Override
public Iterable<Document> getAllLatest(
final List<GetRequest> getRequests) throws IOException {
return retrieveLatestDao.getAllLatest(getRequests);
}
@Override
public Document update(Document update, Optional<String> index) throws IOException {
return updateDao.update(update, index);
}
@Override
public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
return updateDao.batchUpdate(updates);
}
@Override
public Document patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request, Optional<Long> timestamp)
throws OriginalNotFoundException, IOException {
return updateDao.patch(retrieveLatestDao, request, timestamp);
}
@Override
public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
return updateDao.addCommentToAlert(request);
}
@Override
public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException {
return updateDao.removeCommentFromAlert(request);
}
@Override
public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
return this.columnMetadataDao.getColumnMetadata(indices);
}
@Override
public Optional<Map<String, Object>> getLatestResult(GetRequest request) throws IOException {
return retrieveLatestDao.getLatestResult(request);
}
@Override
public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
return this.updateDao.addCommentToAlert(request, latest);
}
@Override
public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
return this.updateDao.removeCommentFromAlert(request, latest);
}
public ElasticsearchDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}
protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
return updateDao.findIndexNameByGUID(guid, sensorType);
}
protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder)
throws InvalidSearchException {
return searchDao.search(request, queryBuilder);
}
protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder)
throws InvalidSearchException {
return searchDao.group(groupRequest, queryBuilder);
}
public ElasticsearchClient getClient() {
return this.client;
}
}