blob: 94e557c1f2ad41c207cb163a3df1995e91f33ca9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.metamodel.elasticsearch.rest;
import java.util.List;
import org.apache.metamodel.AbstractUpdateCallback;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.UpdateCallback;
import org.apache.metamodel.create.TableCreationBuilder;
import org.apache.metamodel.delete.RowDeletionBuilder;
import org.apache.metamodel.drop.TableDropBuilder;
import org.apache.metamodel.insert.RowInsertionBuilder;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.elasticsearch.action.bulk.BulkRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.searchbox.action.Action;
import io.searchbox.action.BulkableAction;
import io.searchbox.client.JestResult;
import io.searchbox.core.Bulk;
import io.searchbox.core.Bulk.Builder;
import io.searchbox.core.BulkResult;
import io.searchbox.core.BulkResult.BulkResultItem;
import io.searchbox.indices.Refresh;
/**
* {@link UpdateCallback} implementation for
* {@link ElasticSearchRestDataContext}.
*/
final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchUpdateCallback.class);
private static final int BULK_BUFFER_SIZE = 1000;
private Bulk.Builder bulkBuilder;
private int bulkActionCount = 0;
private final boolean isBatch;
public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext, boolean isBatch) {
super(dataContext);
this.isBatch = isBatch;
}
private boolean isBatch() {
return isBatch;
}
@Override
public ElasticSearchRestDataContext getDataContext() {
return (ElasticSearchRestDataContext) super.getDataContext();
}
@Override
public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
IllegalStateException {
return new JestElasticSearchCreateTableBuilder(this, schema, name);
}
@Override
public boolean isDropTableSupported() {
return true;
}
@Override
public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
UnsupportedOperationException {
return new JestElasticSearchDropTableBuilder(this, table);
}
@Override
public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
UnsupportedOperationException {
return new JestElasticSearchInsertBuilder(this, table);
}
@Override
public boolean isDeleteSupported() {
return true;
}
@Override
public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
UnsupportedOperationException {
return new JestElasticSearchDeleteBuilder(this, table);
}
public void onExecuteUpdateFinished() {
if (isBatch()) {
flushBulkActions();
}
final String indexName = getDataContext().getIndexName();
final Refresh refresh = new Refresh.Builder().addIndex(indexName).build();
JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);
}
private void flushBulkActions() {
if (bulkBuilder == null || bulkActionCount == 0) {
// nothing to flush
return;
}
final Bulk bulk = getBulkBuilder().build();
logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName());
executeBlocking(bulk);
bulkActionCount = 0;
bulkBuilder = null;
}
public void execute(Action<?> action) {
if (isBatch() && action instanceof BulkableAction) {
final Bulk.Builder bulkBuilder = getBulkBuilder();
bulkBuilder.addAction((BulkableAction<?>) action);
bulkActionCount++;
if (bulkActionCount == BULK_BUFFER_SIZE) {
flushBulkActions();
}
} else {
executeBlocking(action);
}
}
private void executeBlocking(Action<?> action) {
final JestResult result = JestClientExecutor.execute(getDataContext().getElasticSearchClient(), action);
if (!result.isSucceeded()) {
if (result instanceof BulkResult) {
final List<BulkResultItem> failedItems = ((BulkResult) result).getFailedItems();
for (int i = 0; i < failedItems.size(); i++) {
final BulkResultItem failedItem = failedItems.get(i);
logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i+1, failedItems.size(), failedItem.id, failedItem.operation, failedItem.status, failedItem.error);
}
}
throw new MetaModelException(result.getResponseCode() + " - " + result.getErrorMessage());
}
}
private Builder getBulkBuilder() {
if (bulkBuilder == null) {
bulkBuilder = new Bulk.Builder();
bulkBuilder.defaultIndex(getDataContext().getIndexName());
}
return bulkBuilder;
}
}