METAMODEL-1179: Upgraded ElasticSearch REST module to new client.
Using the official elastic REST high level client for ElasticSearch.
Closes #177
diff --git a/.travis.yml b/.travis.yml
index 1b1e342..65df979 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,7 +15,10 @@
services:
- couchdb
- mongodb
-
+ - docker
+
+script: "mvn clean verify -P integration-test"
+
after_success:
- mvn test javadoc:javadoc
diff --git a/CHANGES.md b/CHANGES.md
index 5af3fc4..7ab1d22 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,6 @@
-### WIP
+### Apache MetaModel 5.1.0 (WIP)
+ * [METAMODEL-1179] - Refactored ElasticSearch REST module to use new official REST based client from Elastic.
* [METAMODEL-1177] - Made TableType.TABLE the default table type, replacing null values.
### Apache MetaModel 5.0.1
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
new file mode 100644
index 0000000..c8ffae3
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.common.unit.TimeValue;
+
+public abstract class AbstractElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext,
+ UpdateableDataContext {
+
+ public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
+
+ protected final String indexName;
+
+ // Table definitions that are set from the beginning, not supposed to be
+ // changed.
+ protected final List<SimpleTableDef> staticTableDefinitions;
+
+ // Table definitions that are discovered, these can change
+ protected final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
+
+ /**
+ * Constructs a {@link ElasticSearchRestDataContext}. This constructor
+ * accepts a custom array of {@link SimpleTableDef}s which allows the user
+ * to define his own view on the indexes in the engine.
+ *
+ * @param indexName
+ * the name of the ElasticSearch index to represent
+ * @param tableDefinitions
+ * an array of {@link SimpleTableDef}s, which define the table
+ * and column model of the ElasticSearch index.
+ */
+ public AbstractElasticSearchDataContext(String indexName, SimpleTableDef... tableDefinitions) {
+ super(false);
+ if (indexName == null || indexName.trim().length() == 0) {
+ throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
+ }
+ this.indexName = indexName;
+ this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections
+ .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions));
+ }
+
+ /**
+ * Performs an analysis of the available indexes in an ElasticSearch cluster
+ * {@link JestClient} instance and detects the elasticsearch types structure
+ * based on the metadata provided by the ElasticSearch java client.
+ *
+ * @see {@link #detectTable(JsonObject, String)}
+ * @return a mutable schema instance, useful for further fine tuning by the
+ * user.
+ */
+ protected abstract SimpleTableDef[] detectSchema();
+
+ @Override
+ protected Schema getMainSchema() throws MetaModelException {
+ final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
+ for (final SimpleTableDef tableDef : staticTableDefinitions) {
+ addTable(theSchema, tableDef);
+ }
+
+ final SimpleTableDef[] tables = detectSchema();
+ synchronized (this) {
+ dynamicTableDefinitions.clear();
+ dynamicTableDefinitions.addAll(Arrays.asList(tables));
+ for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
+ final List<String> tableNames = theSchema.getTableNames();
+
+ if (!tableNames.contains(tableDef.getName())) {
+ addTable(theSchema, tableDef);
+ }
+ }
+ }
+
+ return theSchema;
+ }
+
+ private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
+ final MutableTable table = tableDef.toTable().setSchema(theSchema);
+ final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
+ if (idColumn != null && idColumn instanceof MutableColumn) {
+ final MutableColumn mutableColumn = (MutableColumn) idColumn;
+ mutableColumn.setPrimaryKey(true);
+ }
+ theSchema.addTable(table);
+ }
+
+ @Override
+ protected String getMainSchemaName() throws MetaModelException {
+ return indexName;
+ }
+
+ /**
+ * Gets the name of the index that this {@link DataContext} is working on.
+ */
+ public String getIndexName() {
+ return indexName;
+ }
+
+ protected boolean limitMaxRowsIsSet(int maxRows) {
+ return (maxRows != -1);
+ }
+
+ protected static SimpleTableDef[] sortTables(final List<SimpleTableDef> result) {
+ final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
+ Arrays.sort(tableDefArray, (o1, o2) -> o1.getName().compareTo(o2.getName()));
+ return tableDefArray;
+ }
+
+}
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
new file mode 100644
index 0000000..fea2190
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DataSet} implementation for ElasticSearch
+ */
+public abstract class AbstractElasticSearchDataSet extends AbstractDataSet {
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractElasticSearchDataSet.class);
+
+ protected final AtomicBoolean _closed;
+
+ protected SearchResponse _searchResponse;
+ protected SearchHit _currentHit;
+ protected int _hitIndex = 0;
+
+ public AbstractElasticSearchDataSet(final SearchResponse searchResponse, final List<SelectItem> selectItems) {
+ super(selectItems);
+ _searchResponse = searchResponse;
+ _closed = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ boolean closeNow = _closed.compareAndSet(true, false);
+ if (closeNow) {
+ closeNow();
+ }
+ }
+
+ protected abstract void closeNow();
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!_closed.get()) {
+ logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
+ close();
+ }
+ }
+
+ @Override
+ public boolean next() {
+ final SearchHit[] hits = _searchResponse.getHits().getHits();
+ if (hits.length == 0) {
+ // break condition for the scroll
+ _currentHit = null;
+ return false;
+ }
+
+ if (_hitIndex < hits.length) {
+ // pick the next hit within this search response
+ _currentHit = hits[_hitIndex];
+ _hitIndex++;
+ return true;
+ }
+
+ final String scrollId = _searchResponse.getScrollId();
+ if (scrollId == null) {
+ // this search response is not scrolleable - then it's the end.
+ _currentHit = null;
+ return false;
+ }
+
+ // try to scroll to the next set of hits
+ try {
+ _searchResponse = scrollSearchResponse(scrollId);
+ } catch (IOException e) {
+ logger.warn("Failed to scroll to the next search response set.", e);
+ return false;
+ }
+
+ // start over (recursively)
+ _hitIndex = 0;
+ return next();
+ }
+
+ protected abstract SearchResponse scrollSearchResponse(final String scrollId) throws IOException;
+
+ @Override
+ public Row getRow() {
+ if (_currentHit == null) {
+ return null;
+ }
+
+ final Map<String, Object> source = _currentHit.getSource();
+ final String documentId = _currentHit.getId();
+ return ElasticSearchUtils.createRow(source, documentId, getHeader());
+ }
+}
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
index a6ce656..652fbe6 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
@@ -20,6 +20,7 @@
import org.apache.metamodel.util.TimeComparator;
+import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -30,12 +31,22 @@
*/
public final class ElasticSearchDateConverter {
+ private static final DateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+ private static final DateFormat FALLBACK_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+
public static Date tryToConvert(String dateAsString) {
+ if (dateAsString == null) {
+ return null;
+ }
+
try {
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
- return dateFormat.parse(dateAsString);
+ return DEFAULT_DATE_FORMAT.parse(dateAsString);
} catch (ParseException e) {
- return TimeComparator.toDate(dateAsString);
+ try {
+ return FALLBACK_DATE_FORMAT.parse(dateAsString);
+ } catch (ParseException e1) {
+ return TimeComparator.toDate(dateAsString);
+ }
}
}
}
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
similarity index 93%
rename from elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
rename to elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
index c0a1232..32f07ff 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
@@ -16,13 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.metamodel.elasticsearch.nativeclient;
+package org.apache.metamodel.elasticsearch.common;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.schema.ColumnType;
/**
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
index b298d11..9128182 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
@@ -18,72 +18,43 @@
*/
package org.apache.metamodel.elasticsearch.common;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.OperatorType;
+import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.ColumnType;
import org.apache.metamodel.schema.MutableColumn;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.util.CollectionUtils;
-import org.elasticsearch.common.base.Strings;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ElasticSearchUtils {
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class);
-
public static final String FIELD_ID = "_id";
public static final String SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS = "metamodel.elasticsearch.strip_invalid_field_chars";
- /**
- * Gets a "filter" query which is both 1.x and 2.x compatible.
- */
- private static QueryBuilder getFilteredQuery(String prefix, String fieldName) {
- // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null,
- // FilterBuilders.missingFilter(fieldName));
- // 2.x: itemQueryBuilder =
- // QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
- try {
- try {
- Method method = QueryBuilders.class.getDeclaredMethod(prefix + "Query", String.class);
- method.setAccessible(true);
- return QueryBuilders.boolQuery().must((QueryBuilder) method.invoke(null, fieldName));
- } catch (NoSuchMethodException e) {
- Class<?> clazz = ElasticSearchUtils.class.getClassLoader().loadClass(
- "org.elasticsearch.index.query.FilterBuilders");
- Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + "Filter", String.class);
- filterBuilderMethod.setAccessible(true);
- Method queryBuildersFilteredQueryMethod = QueryBuilders.class.getDeclaredMethod("filteredQuery",
- QueryBuilder.class, FilterBuilder.class);
- return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null, filterBuilderMethod.invoke(
- null, fieldName));
- }
- } catch (Exception e) {
- logger.error("Failed to resolve/invoke filtering method", e);
- throw new IllegalStateException("Failed to resolve filtering method", e);
- }
- }
-
public static QueryBuilder getMissingQuery(String fieldName) {
- return getFilteredQuery("missing", fieldName);
+ return new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(fieldName));
}
public static QueryBuilder getExistsQuery(String fieldName) {
- return getFilteredQuery("exists", fieldName);
+ return new ExistsQueryBuilder(fieldName);
}
public static Map<String, ?> getMappingSource(final MutableTable table) {
@@ -170,7 +141,7 @@
}
if (type.isLiteral()) {
- return "string";
+ return "text";
} else if (type == ColumnType.FLOAT) {
return "float";
} else if (type == ColumnType.DOUBLE || type == ColumnType.NUMERIC || type == ColumnType.NUMBER) {
@@ -294,4 +265,35 @@
}
return columnType;
}
+
+ public static Row createRow(final Map<String, Object> sourceMap, final String documentId, final DataSetHeader header) {
+ final Object[] values = new Object[header.size()];
+ for (int i = 0; i < values.length; i++) {
+ final SelectItem selectItem = header.getSelectItem(i);
+ final Column column = selectItem.getColumn();
+
+ assert column != null;
+ assert selectItem.getAggregateFunction() == null;
+ assert selectItem.getScalarFunction() == null;
+
+ if (column.isPrimaryKey()) {
+ values[i] = documentId;
+ } else {
+ Object value = sourceMap.get(column.getName());
+
+ if (column.getType() == ColumnType.DATE) {
+ Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value);
+ if (valueToDate == null) {
+ values[i] = value;
+ } else {
+ values[i] = valueToDate;
+ }
+ } else {
+ values[i] = value;
+ }
+ }
+ }
+
+ return new DefaultRow(header, values);
+ }
}
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
similarity index 91%
rename from elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
rename to elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
index 9ffc6b8..9fb7e03 100644
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
+++ b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.metamodel.elasticsearch.nativeclient;
+package org.apache.metamodel.elasticsearch.common;
import junit.framework.TestCase;
import org.apache.metamodel.data.DataSetHeader;
@@ -38,7 +38,7 @@
DataSetHeader header = new SimpleDataSetHeader(selectItems1);
Map<String, Object> values = new HashMap<>();
values.put("value1", "theValue");
- Row row = NativeElasticSearchUtils.createRow(values, documentId, header);
+ Row row = ElasticSearchUtils.createRow(values, documentId, header);
String primaryKeyValue = (String) row.getValue(primaryKeyItem);
assertEquals(primaryKeyValue, documentId);
@@ -53,7 +53,7 @@
Map<String, Object> values = new HashMap<>();
values.put("value1", "theValue");
values.put("value2", "2013-01-04T15:55:51.217+01:00");
- Row row = NativeElasticSearchUtils.createRow(values, documentId, header);
+ Row row = ElasticSearchUtils.createRow(values, documentId, header);
Object stringValue = row.getValue(item1);
Object dateValue = row.getValue(item2);
diff --git a/elasticsearch/native/pom.xml b/elasticsearch/native/pom.xml
index 4c1abcf..6adca36 100644
--- a/elasticsearch/native/pom.xml
+++ b/elasticsearch/native/pom.xml
@@ -43,6 +43,17 @@
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>${elasticsearch.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- test -->
<dependency>
@@ -53,7 +64,32 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <version>4.12</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.test</groupId>
+ <artifactId>framework</artifactId>
+ <version>${elasticsearch.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.9.1</version>
+ <scope>test</scope>
</dependency>
</dependencies>
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
index f27e8ac..4e5873c 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
@@ -27,6 +27,7 @@
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.client.IndicesAdminClient;
@@ -50,13 +51,16 @@
final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices();
final String indexName = dataContext.getIndexName();
- final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName)
- .setType(table.getName());
+ final PutMappingRequestBuilder requestBuilder =
+ new PutMappingRequestBuilder(indicesAdmin, PutMappingAction.INSTANCE).setIndices(indexName)
+ .setType(table.getName());
requestBuilder.setSource(source);
final PutMappingResponse result = requestBuilder.execute().actionGet();
logger.debug("PutMapping response: acknowledged={}", result.isAcknowledged());
+ dataContext.getElasticSearchClient().admin().indices().prepareRefresh(indexName).get();
+
final MutableSchema schema = (MutableSchema) getSchema();
schema.addTable(table);
return table;
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
index d2dfe4b..3df0ce1 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
@@ -18,39 +18,30 @@
*/
package org.apache.metamodel.elasticsearch.nativeclient;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.metamodel.DataContext;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.UpdateScript;
import org.apache.metamodel.UpdateSummary;
-import org.apache.metamodel.UpdateableDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetHeader;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
-import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
-import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@@ -59,14 +50,16 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.hppc.ObjectLookupContainer;
-import org.elasticsearch.common.hppc.cursors.ObjectCursor;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.carrotsearch.hppc.ObjectLookupContainer;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+
/**
* DataContext implementation for ElasticSearch analytics engine.
*
@@ -84,20 +77,11 @@
* This implementation supports either automatic discovery of a schema or manual
* specification of a schema, through the {@link SimpleTableDef} class.
*/
-public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext {
+public class ElasticSearchDataContext extends AbstractElasticSearchDataContext {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
- public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
-
private final Client elasticSearchClient;
- private final String indexName;
- // Table definitions that are set from the beginning, not supposed to be
- // changed.
- private final List<SimpleTableDef> staticTableDefinitions;
-
- // Table definitions that are discovered, these can change
- private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
/**
* Constructs a {@link ElasticSearchDataContext}. This constructor accepts a
@@ -113,16 +97,12 @@
* and column model of the ElasticSearch index.
*/
public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) {
- super(false);
+ super(indexName, tableDefinitions);
+
if (client == null) {
throw new IllegalArgumentException("ElasticSearch Client cannot be null");
}
- if (indexName == null || indexName.trim().length() == 0) {
- throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
- }
this.elasticSearchClient = client;
- this.indexName = indexName;
- this.staticTableDefinitions = Arrays.asList(tableDefinitions);
this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
}
@@ -140,40 +120,14 @@
this(client, indexName, new SimpleTableDef[0]);
}
- /**
- * Performs an analysis of the available indexes in an ElasticSearch cluster
- * {@link Client} instance and detects the elasticsearch types structure
- * based on the metadata provided by the ElasticSearch java client.
- *
- * @see {@link #detectTable(ClusterState, String, String)}
- * @return a mutable schema instance, useful for further fine tuning by the
- * user.
- */
- private SimpleTableDef[] detectSchema() {
+ @Override
+ protected SimpleTableDef[] detectSchema() {
logger.info("Detecting schema for index '{}'", indexName);
- final ClusterState cs;
final ClusterStateRequestBuilder clusterStateRequestBuilder = getElasticSearchClient().admin().cluster()
- .prepareState();
-
- // different methods here to set the index name, so we have to use
- // reflection :-/
- try {
- final byte majorVersion = Version.CURRENT.major;
- final Object methodArgument = new String[] { indexName };
- if (majorVersion == 0) {
- final Method method = ClusterStateRequestBuilder.class.getMethod("setFilterIndices", String[].class);
- method.invoke(clusterStateRequestBuilder, methodArgument);
- } else {
- final Method method = ClusterStateRequestBuilder.class.getMethod("setIndices", String[].class);
- method.invoke(clusterStateRequestBuilder, methodArgument);
- }
- } catch (Exception e) {
- logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e);
- throw new MetaModelException("Failed to create request for index information needed to detect schema", e);
- }
- cs = clusterStateRequestBuilder.execute().actionGet().getState();
-
+ .prepareState().setIndices(indexName);
+ final ClusterState cs = clusterStateRequestBuilder.execute().actionGet().getState();
+
final List<SimpleTableDef> result = new ArrayList<>();
final IndexMetaData imd = cs.getMetaData().index(indexName);
@@ -183,9 +137,8 @@
} else {
final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
final ObjectLookupContainer<String> documentTypes = mappings.keys();
-
- for (final Object documentTypeCursor : documentTypes) {
- final String documentType = ((ObjectCursor<?>) documentTypeCursor).value.toString();
+ for (final ObjectCursor<?> documentTypeCursor : documentTypes) {
+ final String documentType = documentTypeCursor.value.toString();
try {
final SimpleTableDef table = detectTable(cs, indexName, documentType);
result.add(table);
@@ -194,15 +147,7 @@
}
}
}
- final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
- Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
- @Override
- public int compare(SimpleTableDef o1, SimpleTableDef o2) {
- return o1.getName().compareTo(o2.getName());
- }
- });
-
- return tableDefArray;
+ return sortTables(result);
}
/**
@@ -225,7 +170,8 @@
// index does not exist
throw new IllegalArgumentException("No such index: " + indexName);
}
- final MappingMetaData mappingMetaData = imd.mapping(documentType);
+ final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
+ final MappingMetaData mappingMetaData = mappings.get(documentType);
if (mappingMetaData == null) {
throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType);
}
@@ -244,44 +190,6 @@
}
@Override
- protected Schema getMainSchema() throws MetaModelException {
- final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
- for (final SimpleTableDef tableDef : staticTableDefinitions) {
- addTable(theSchema, tableDef);
- }
-
- final SimpleTableDef[] tables = detectSchema();
- synchronized (this) {
- dynamicTableDefinitions.clear();
- dynamicTableDefinitions.addAll(Arrays.asList(tables));
- for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
- final List<String> tableNames = theSchema.getTableNames();
-
- if (!tableNames.contains(tableDef.getName())) {
- addTable(theSchema, tableDef);
- }
- }
- }
-
- return theSchema;
- }
-
- private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
- final MutableTable table = tableDef.toTable().setSchema(theSchema);
- final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
- if (idColumn != null && idColumn instanceof MutableColumn) {
- final MutableColumn mutableColumn = (MutableColumn) idColumn;
- mutableColumn.setPrimaryKey(true);
- }
- theSchema.addTable(table);
- }
-
- @Override
- protected String getMainSchemaName() throws MetaModelException {
- return indexName;
- }
-
- @Override
protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
List<FilterItem> whereItems, int firstRow, int maxRows) {
final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
@@ -290,7 +198,7 @@
// where clause can be pushed down to an ElasticSearch query
final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder);
final SearchResponse response = searchRequest.execute().actionGet();
- return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false);
+ return new ElasticSearchDataSet(getElasticSearchClient(), response, selectItems);
}
return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
}
@@ -299,12 +207,13 @@
protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null);
final SearchResponse response = searchRequest.execute().actionGet();
- return new ElasticSearchDataSet(elasticSearchClient, response, columns.stream().map(SelectItem::new).collect(Collectors.toList()), false);
+ return new ElasticSearchDataSet(getElasticSearchClient(), response, columns.stream().map(SelectItem::new)
+ .collect(Collectors.toList()));
}
private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) {
final String documentType = table.getName();
- final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
+ final SearchRequestBuilder searchRequest = getElasticSearchClient().prepareSearch(indexName).setTypes(documentType);
if (firstRow > 1) {
final int zeroBasedFrom = firstRow - 1;
searchRequest.setFrom(zeroBasedFrom);
@@ -332,7 +241,7 @@
final String documentType = table.getName();
final String id = keyValue.toString();
- final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet();
+ final GetResponse response = getElasticSearchClient().prepareGet(indexName, documentType, id).execute().actionGet();
if (!response.isExists()) {
return null;
@@ -343,7 +252,7 @@
final DataSetHeader header = new SimpleDataSetHeader(selectItems);
- return NativeElasticSearchUtils.createRow(source, documentId, header);
+ return ElasticSearchUtils.createRow(source, documentId, header);
}
@Override
@@ -353,13 +262,11 @@
return null;
}
final String documentType = table.getName();
- final CountResponse response = elasticSearchClient.prepareCount(indexName)
- .setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet();
- return response.getCount();
- }
-
- private boolean limitMaxRowsIsSet(int maxRows) {
- return (maxRows != -1);
+ final TermQueryBuilder query = QueryBuilders.termQuery("_type", documentType);
+ final SearchResponse searchResponse =
+ getElasticSearchClient().prepareSearch(indexName).setSource(new SearchSourceBuilder().size(0).query(query))
+ .execute().actionGet();
+ return searchResponse.getHits().getTotalHits();
}
@Override
@@ -370,6 +277,13 @@
return callback.getUpdateSummary();
}
+ @Override
+ protected void onSchemaCacheRefreshed() {
+ getElasticSearchClient().admin().indices().prepareRefresh(indexName).get();
+
+ detectSchema();
+ }
+
/**
* Gets the {@link Client} that this {@link DataContext} is wrapping.
*
@@ -378,13 +292,4 @@
public Client getElasticSearchClient() {
return elasticSearchClient;
}
-
- /**
- * Gets the name of the index that this {@link DataContext} is working on.
- *
- * @return
- */
- public String getIndexName() {
- return indexName;
- }
}
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
index 94359c4..a6f6953 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
@@ -18,6 +18,9 @@
*/
package org.apache.metamodel.elasticsearch.nativeclient;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import org.apache.metamodel.ConnectionException;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.factory.DataContextFactory;
@@ -27,12 +30,11 @@
import org.apache.metamodel.util.SimpleTableDef;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Factory for ElasticSearch data context of native type.
@@ -59,6 +61,7 @@
* </ul>
*/
public class ElasticSearchDataContextFactory implements DataContextFactory {
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContextFactory.class);
@Override
public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) {
@@ -120,46 +123,22 @@
@Override
public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
throws UnsupportedDataContextPropertiesException, ConnectionException {
- final String clientType = getClientType(properties);
final Client client;
- if ("node".equals(clientType)) {
- client = createNodeClient(properties);
- } else {
client = createTransportClient(properties);
- }
final String indexName = getIndex(properties);
final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
return new ElasticSearchDataContext(client, indexName, tableDefinitions);
}
private Client createTransportClient(DataContextProperties properties) {
- final Builder settingsBuilder = ImmutableSettings.builder();
- settingsBuilder.put("name", "MetaModel");
- settingsBuilder.put("cluster.name", getCluster(properties));
- if (properties.getUsername() != null && properties.getPassword() != null) {
- settingsBuilder.put("shield.user", properties.getUsername() + ":" + properties.getPassword());
- if ("true".equals(properties.toMap().get("ssl"))) {
- if (properties.toMap().get("keystorePath") != null) {
- settingsBuilder.put("shield.ssl.keystore.path", properties.toMap().get("keystorePath"));
- settingsBuilder.put("shield.ssl.keystore.password", properties.toMap().get("keystorePassword"));
- }
- settingsBuilder.put("shield.transport.ssl", "true");
- }
+ final Settings settings = Settings.builder().put().put("name", "MetaModel").put("cluster.name", getCluster(properties)).build();
+ final TransportClient client = new PreBuiltTransportClient(settings);
+ try {
+ client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(properties.getHostname()), properties.getPort()));
+ } catch (UnknownHostException e) {
+ logger.warn("no IP address for the host with name \"{}\" could be found.", properties.getHostname());
}
- final Settings settings = settingsBuilder.build();
-
- final TransportClient client = new TransportClient(settings);
- client.addTransportAddress(new InetSocketTransportAddress(properties.getHostname(), properties.getPort()));
return client;
}
- private Client createNodeClient(DataContextProperties properties) {
- final Builder settingsBuilder = ImmutableSettings.builder();
- settingsBuilder.put("name", "MetaModel");
- settingsBuilder.put("shield.enabled", false);
- final Settings settings = settingsBuilder.build();
- final Node node = NodeBuilder.nodeBuilder().clusterName(getCluster(properties)).client(true).settings(settings)
- .node();
- return node.client();
- }
}
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
index 4ced2c8..b616eb2 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
@@ -19,104 +19,38 @@
package org.apache.metamodel.elasticsearch.nativeclient;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.metamodel.data.AbstractDataSet;
import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet;
import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.ClearScrollRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@link DataSet} implementation for ElasticSearch
*/
-final class ElasticSearchDataSet extends AbstractDataSet {
-
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class);
+final class ElasticSearchDataSet extends AbstractElasticSearchDataSet {
private final Client _client;
- private final AtomicBoolean _closed;
- private SearchResponse _searchResponse;
- private SearchHit _currentHit;
- private int _hitIndex = 0;
-
- public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems,
- boolean queryPostProcessed) {
- super(selectItems);
+ public ElasticSearchDataSet(final Client client, final SearchResponse searchResponse,
+ final List<SelectItem> selectItems) {
+ super(searchResponse, selectItems);
_client = client;
- _searchResponse = searchResponse;
- _closed = new AtomicBoolean(false);
- }
-
-
- @Override
- public void close() {
- super.close();
- boolean closeNow = _closed.compareAndSet(true, false);
- if (closeNow) {
- ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client)
- .addScrollId(_searchResponse.getScrollId());
- scrollRequestBuilder.execute();
- }
}
@Override
- protected void finalize() throws Throwable {
- super.finalize();
- if (!_closed.get()) {
- logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
- close();
- }
+ public void closeNow() {
+ ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client,
+ ClearScrollAction.INSTANCE).addScrollId(_searchResponse.getScrollId());
+ scrollRequestBuilder.execute();
}
@Override
- public boolean next() {
- final SearchHit[] hits = _searchResponse.getHits().hits();
- if (hits.length == 0) {
- // break condition for the scroll
- _currentHit = null;
- return false;
- }
-
- if (_hitIndex < hits.length) {
- // pick the next hit within this search response
- _currentHit = hits[_hitIndex];
- _hitIndex++;
- return true;
- }
-
- final String scrollId = _searchResponse.getScrollId();
- if (scrollId == null) {
- // this search response is not scrolleable - then it's the end.
- _currentHit = null;
- return false;
- }
-
- // try to scroll to the next set of hits
- _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
+ protected SearchResponse scrollSearchResponse(final String scrollId) {
+ return _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
.execute().actionGet();
-
- // start over (recursively)
- _hitIndex = 0;
- return next();
- }
-
- @Override
- public Row getRow() {
- if (_currentHit == null) {
- return null;
- }
-
- final Map<String, Object> source = _currentHit.getSource();
- final String documentId = _currentHit.getId();
- final Row row = NativeElasticSearchUtils.createRow(source, documentId, getHeader());
- return row;
}
}
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
index 0de2a71..2db8e8c 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.metamodel.elasticsearch.nativeclient;
+import java.util.Iterator;
import java.util.List;
import org.apache.metamodel.MetaModelException;
@@ -27,9 +28,11 @@
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.schema.Table;
-import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,10 +60,6 @@
final Client client = dataContext.getElasticSearchClient();
final String indexName = dataContext.getIndexName();
- final DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client);
- deleteByQueryRequestBuilder.setIndices(indexName);
- deleteByQueryRequestBuilder.setTypes(documentType);
-
final List<FilterItem> whereItems = getWhereItems();
// delete by query - note that creteQueryBuilderForSimpleWhere may
@@ -74,9 +73,21 @@
throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
+ whereItems);
}
- deleteByQueryRequestBuilder.setQuery(queryBuilder);
- deleteByQueryRequestBuilder.execute().actionGet();
- logger.debug("Deleted documents by query.");
+ final SearchResponse response =
+ client.prepareSearch(indexName).setQuery(queryBuilder).setTypes(documentType).execute()
+ .actionGet();
+
+ client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+ final Iterator<SearchHit> iterator = response.getHits().iterator();
+ while (iterator.hasNext()) {
+ final SearchHit hit = iterator.next();
+ final String typeId = hit.getId();
+ final DeleteResponse deleteResponse =
+ client.prepareDelete().setIndex(indexName).setType(documentType).setId(typeId).execute()
+ .actionGet();
+ logger.debug("Deleted documents by query." + deleteResponse.getResult());
+ }
+ client.admin().indices().prepareRefresh(indexName).execute().actionGet();
}
}
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
deleted file mode 100644
index d66b240..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.lang.reflect.Method;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.drop.AbstractTableDropBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.Table;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link TableDropBuilder} for dropping tables (document types) in an
- * ElasticSearch index.
- */
-final class ElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
-
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDropTableBuilder.class);
-
- private final ElasticSearchUpdateCallback _updateCallback;
-
- public ElasticSearchDropTableBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
- super(table);
- _updateCallback = updateCallback;
- }
-
- @Override
- public void execute() throws MetaModelException {
- final ElasticSearchDataContext dataContext = _updateCallback.getDataContext();
- final Table table = getTable();
- final String documentType = table.getName();
- logger.info("Deleting mapping / document type: {}", documentType);
- final Client client = dataContext.getElasticSearchClient();
- final IndicesAdminClient indicesAdminClient = client.admin().indices();
- final String indexName = dataContext.getIndexName();
-
- final DeleteMappingRequestBuilder requestBuilder = new DeleteMappingRequestBuilder(indicesAdminClient)
- .setIndices(indexName);
- setType(requestBuilder, documentType);
-
- final DeleteMappingResponse result = requestBuilder.execute().actionGet();
- logger.debug("Delete mapping response: acknowledged={}", result.isAcknowledged());
-
- final MutableSchema schema = (MutableSchema) table.getSchema();
- schema.removeTable(table);
- }
-
- /**
- * Invokes the {@link DeleteMappingRequestBuilder#setType(String...)} method
- * using reflection. This is done because the API of ElasticSearch was
- * changed and the method signature differes between different versions.
- *
- * @param requestBuilder
- * @param documentType
- */
- private void setType(DeleteMappingRequestBuilder requestBuilder, String documentType) {
- Object argument;
- Method method;
- try {
- try {
- method = requestBuilder.getClass().getDeclaredMethod("setType", String[].class);
- argument = new String[] {documentType};
- } catch (NoSuchMethodException e) {
- logger.debug("No setType(String[]) method found, trying with a single String instead", e);
- method = requestBuilder.getClass().getDeclaredMethod("setType", String.class);
- argument = documentType;
- }
- } catch (Exception e) {
- logger.error("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
- throw new IllegalStateException("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
- }
- try {
- method.setAccessible(true);
- method.invoke(requestBuilder, argument);
- } catch (Exception e) {
- logger.error("Failed to invoke {}", method, e);
- throw new IllegalStateException("Failed to invoke " + method, e);
- }
- }
-}
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
index 70d31b4..a1c7f69 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
@@ -26,6 +26,7 @@
import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
@@ -46,7 +47,8 @@
final Client client = dataContext.getElasticSearchClient();
final String indexName = dataContext.getIndexName();
final String documentType = getTable().getName();
- final IndexRequestBuilder requestBuilder = new IndexRequestBuilder(client, indexName).setType(documentType);
+ final IndexRequestBuilder requestBuilder =
+ new IndexRequestBuilder(client, IndexAction.INSTANCE).setIndex(indexName).setType(documentType);
final Map<String, Object> valueMap = new HashMap<>();
final Column[] columns = getColumns();
@@ -68,11 +70,11 @@
assert !valueMap.isEmpty();
requestBuilder.setSource(valueMap);
- requestBuilder.setCreate(true);
final IndexResponse result = requestBuilder.execute().actionGet();
logger.debug("Inserted document: id={}", result.getId());
- }
+ client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+ }
}
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
new file mode 100644
index 0000000..eeee6fc
--- /dev/null
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.AbstractRowUpdationBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateAction;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchUpdateBuilder extends AbstractRowUpdationBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUpdateBuilder.class);
+
+ private final ElasticSearchUpdateCallback _updateCallback;
+
+ public ElasticSearchUpdateBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
+ super(table);
+ _updateCallback = updateCallback;
+ }
+
+ @Override
+ public void execute() throws MetaModelException {
+
+ final Table table = getTable();
+ final String documentType = table.getName();
+
+ final ElasticSearchDataContext dataContext = _updateCallback.getDataContext();
+ final Client client = dataContext.getElasticSearchClient();
+ final String indexName = dataContext.getIndexName();
+ final List<FilterItem> whereItems = getWhereItems();
+
+ // delete by query - note that creteQueryBuilderForSimpleWhere may
+ // return matchAllQuery() if no where items are present.
+ final QueryBuilder queryBuilder =
+ ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND);
+ if (queryBuilder == null) {
+ // TODO: The where items could not be pushed down to a query. We
+ // could solve this by running a query first, gather all
+ // document IDs and then delete by IDs.
+ throw new UnsupportedOperationException(
+ "Could not push down WHERE items to delete by query request: " + whereItems);
+ }
+
+ final SearchResponse response = client.prepareSearch(indexName).setQuery(queryBuilder).execute().actionGet();
+
+ final Iterator<SearchHit> iterator = response.getHits().iterator();
+ while (iterator.hasNext()) {
+ final SearchHit hit = iterator.next();
+ final String typeId = hit.getId();
+
+ final UpdateRequestBuilder requestBuilder =
+ new UpdateRequestBuilder(client, UpdateAction.INSTANCE).setIndex(indexName).setType(documentType)
+ .setId(typeId);
+
+ final Map<String, Object> valueMap = new HashMap<>();
+ final Column[] columns = getColumns();
+ final Object[] values = getValues();
+ for (int i = 0; i < columns.length; i++) {
+ if (isSet(columns[i])) {
+ final String name = columns[i].getName();
+ final Object value = values[i];
+ if (ElasticSearchUtils.FIELD_ID.equals(name)) {
+ if (value != null) {
+ requestBuilder.setId(value.toString());
+ }
+ } else {
+ valueMap.put(name, value);
+ }
+ }
+ }
+
+ assert !valueMap.isEmpty();
+
+ requestBuilder.setDoc(valueMap);
+
+ final UpdateResponse updateResponse = requestBuilder.execute().actionGet();
+
+ logger.debug("Update document: id={}", updateResponse.getId());
+
+ client.admin().indices().prepareRefresh(indexName).get();
+ }
+ }
+}
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
index b81c9c7..c4cbbac 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
@@ -26,6 +26,7 @@
import org.apache.metamodel.insert.RowInsertionBuilder;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.RowUpdationBuilder;
import org.elasticsearch.client.Client;
/**
@@ -50,13 +51,13 @@
@Override
public boolean isDropTableSupported() {
- return true;
+ return false;
}
@Override
public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
UnsupportedOperationException {
- return new ElasticSearchDropTableBuilder(this, table);
+ throw new UnsupportedOperationException();
}
@Override
@@ -76,6 +77,11 @@
return new ElasticSearchDeleteBuilder(this, table);
}
+ @Override
+ public RowUpdationBuilder update(final Table table) {
+ return new ElasticSearchUpdateBuilder(this, table);
+ }
+
public void onExecuteUpdateFinished() {
// force refresh of the index
final ElasticSearchDataContext dataContext = getDataContext();
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
deleted file mode 100644
index 822ef1b..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.DefaultRow;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-
-/**
- * Shared/common util functions for the ElasticSearch MetaModel module.
- */
-final class NativeElasticSearchUtils {
-
- public static Row createRow(Map<String, Object> sourceMap, String documentId, DataSetHeader header) {
- final Object[] values = new Object[header.size()];
- for (int i = 0; i < values.length; i++) {
- final SelectItem selectItem = header.getSelectItem(i);
- final Column column = selectItem.getColumn();
-
- assert column != null;
- assert selectItem.getAggregateFunction() == null;
- assert selectItem.getScalarFunction() == null;
-
- if (column.isPrimaryKey()) {
- values[i] = documentId;
- } else {
- Object value = sourceMap.get(column.getName());
-
- if (column.getType() == ColumnType.DATE) {
- Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value);
- if (valueToDate == null) {
- values[i] = value;
- } else {
- values[i] = valueToDate;
- }
- } else {
- values[i] = value;
- }
- }
- }
-
- return new DefaultRow(header, values);
- }
-}
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
index ec5ecba..983ba5e 100644
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
+++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
@@ -19,12 +19,6 @@
package org.apache.metamodel.elasticsearch.nativeclient;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
@@ -45,9 +39,7 @@
import org.apache.metamodel.data.InMemoryDataSet;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.delete.DeleteFrom;
-import org.apache.metamodel.drop.DropTable;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.elasticsearch.nativeclient.utils.EmbeddedElasticsearchServer;
import org.apache.metamodel.query.FunctionType;
import org.apache.metamodel.query.Query;
import org.apache.metamodel.query.SelectItem;
@@ -58,20 +50,18 @@
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.update.Update;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-public class ElasticSearchDataContextTest {
+public class ElasticSearchDataContextTest extends ESSingleNodeTestCase {
private static final String indexName = "twitter";
private static final String indexType1 = "tweet1";
@@ -81,14 +71,15 @@
private static final String bulkIndexType = "bulktype";
private static final String peopleIndexType = "peopletype";
private static final String mapping = "{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}";
- private static EmbeddedElasticsearchServer embeddedElasticsearchServer;
- private static Client client;
- private static UpdateableDataContext dataContext;
+ private Client client;
+ private UpdateableDataContext dataContext;
- @BeforeClass
- public static void beforeTests() throws Exception {
- embeddedElasticsearchServer = new EmbeddedElasticsearchServer();
- client = embeddedElasticsearchServer.getClient();
+ @Before
+ public void beforeTests() throws Exception {
+ client = client();
+
+ dataContext = new ElasticSearchDataContext(client, indexName);
+
indexTweeterDocument(indexType1, 1);
indexTweeterDocument(indexType2, 1);
indexTweeterDocument(indexType2, 2, null);
@@ -96,15 +87,10 @@
indexTweeterDocument(indexType2, 1);
indexBulkDocuments(indexName, bulkIndexType, 10);
- // The refresh API allows to explicitly refresh one or more index,
- // making all operations performed since the last refresh available for
- // search
- embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet();
- dataContext = new ElasticSearchDataContext(client, indexName);
- System.out.println("Embedded ElasticSearch server created!");
+ dataContext.refreshSchemas();
}
- private static void insertPeopleDocuments() throws IOException {
+ private void insertPeopleDocuments() throws IOException {
indexOnePeopleDocument("female", 20, 5);
indexOnePeopleDocument("female", 17, 8);
indexOnePeopleDocument("female", 18, 9);
@@ -116,10 +102,9 @@
indexOnePeopleDocument("male", 18, 4);
}
- @AfterClass
- public static void afterTests() {
- embeddedElasticsearchServer.shutdown();
- System.out.println("Embedded ElasticSearch server shut down!");
+ @After
+ public void afterTests() {
+ client.admin().indices().delete(new DeleteIndexRequest("_all")).actionGet();
}
@Test
@@ -128,9 +113,15 @@
Arrays.toString(dataContext.getDefaultSchema().getTableNames().toArray()));
Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+ try (DataSet ds = dataContext.query().from(indexType1).select("_id").execute()) {
+ assertEquals(ElasticSearchDataSet.class, ds.getClass());
+ assertTrue(ds.next());
+ assertEquals("Row[values=[tweet_tweet1_1]]",ds.getRow().toString());
+ }
assertEquals("[_id, message, postDate, user]", Arrays.toString(table.getColumnNames().toArray()));
+ assertEquals(ColumnType.STRING, table.getColumnByName("_id").getType());
assertEquals(ColumnType.STRING, table.getColumnByName("user").getType());
assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType());
assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType());
@@ -201,7 +192,7 @@
}
@Test
- public void testCreateTableInsertQueryAndDrop() throws Exception {
+ public void testCreateTableAndInsertQuery() throws Exception {
final Schema schema = dataContext.getDefaultSchema();
final CreateTable createTable = new CreateTable(schema, "testCreateTable");
createTable.withColumn("foo").ofType(ColumnType.STRING);
@@ -235,42 +226,42 @@
assertNotNull(ds.getRow().getValue(idColumn));
assertFalse(ds.next());
}
-
- dataContext.executeUpdate(new DropTable(table));
-
- dataContext.refreshSchemas();
-
- assertNull(dataContext.getTableByQualifiedLabel(table.getName()));
}
@Test
- public void testDetectOutsideChanges() throws Exception {
- ElasticSearchDataContext elasticSearchDataContext = (ElasticSearchDataContext) dataContext;
+ public void testDeleteFromWithWhere() throws Exception {
+ final Schema schema = dataContext.getDefaultSchema();
+ final String tableName = "testCreateTableDelete";
+ final CreateTable createTable = new CreateTable(schema, tableName);
+ createTable.withColumn("foo").ofType(ColumnType.STRING);
+ createTable.withColumn("bar").ofType(ColumnType.DOUBLE);
+ dataContext.executeUpdate(createTable);
- // Create the type in ES
- final IndicesAdminClient indicesAdmin = elasticSearchDataContext.getElasticSearchClient().admin().indices();
- final String tableType = "outsideTable";
+ final Table table = schema.getTableByName(tableName);
- Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" };
+ dataContext.executeUpdate(new UpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+ callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+ }
+ });
- new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties)
- .execute().actionGet();
+ dataContext.executeUpdate(new DeleteFrom(table).where("bar").eq(42));
- dataContext.refreshSchemas();
+ final Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
+ .toQuery());
- assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType));
+ assertEquals("Row[values=[1]]", row.toString());
- new DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet();
- dataContext.refreshSchemas();
- assertNull(dataContext.getTableByQualifiedLabel(tableType));
}
@Test
- public void testDeleteAll() throws Exception {
+ public void testDeleteNoWhere() throws Exception {
final Schema schema = dataContext.getDefaultSchema();
final CreateTable createTable = new CreateTable(schema, "testCreateTable");
createTable.withColumn("foo").ofType(ColumnType.STRING);
- createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+ createTable.withColumn("bar").ofType(ColumnType.DOUBLE);
dataContext.executeUpdate(createTable);
final Table table = schema.getTableByName("testCreateTable");
@@ -288,8 +279,6 @@
Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
.toQuery());
assertEquals("Row[values=[0]]", row.toString());
-
- dataContext.executeUpdate(new DropTable(table));
}
@Test
@@ -315,8 +304,6 @@
Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
dataContext.query().from(table).select("foo", "bar").toQuery());
assertEquals("Row[values=[world, 43]]", row.toString());
-
- dataContext.executeUpdate(new DropTable(table));
}
@Test
@@ -328,27 +315,22 @@
dataContext.executeUpdate(createTable);
final Table table = schema.getTableByName("testCreateTable");
- try {
- dataContext.executeUpdate(new UpdateScript() {
- @Override
- public void run(UpdateCallback callback) {
- callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
- callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
- }
- });
-
- // greater than is not yet supported
- try {
- dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
- fail("Exception expected");
- } catch (UnsupportedOperationException e) {
- assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
- e.getMessage());
+ dataContext.executeUpdate(new UpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+ callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
}
+ });
- } finally {
- dataContext.executeUpdate(new DropTable(table));
+ // greater than is not yet supported
+ try {
+ dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
+ fail("Exception expected");
+ } catch (UnsupportedOperationException e) {
+ assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
+ e.getMessage());
}
}
@@ -361,54 +343,24 @@
dataContext.executeUpdate(createTable);
final Table table = schema.getTableByName("testCreateTable");
- try {
- dataContext.executeUpdate(new UpdateScript() {
- @Override
- public void run(UpdateCallback callback) {
- callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
- callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
- }
- });
+ dataContext.executeUpdate(new UpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+ callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+ }
+ });
- dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
+ dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
- DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
- assertTrue(dataSet.next());
- assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
- assertTrue(dataSet.next());
- assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
- assertFalse(dataSet.next());
- dataSet.close();
- } finally {
- dataContext.executeUpdate(new DropTable(table));
- }
- }
-
- @Test
- public void testDropTable() throws Exception {
- Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
-
- // assert that the table was there to begin with
- {
- DataSet ds = dataContext.query().from(table).selectCount().execute();
- ds.next();
- assertEquals("Row[values=[9]]", ds.getRow().toString());
- ds.close();
- }
-
- dataContext.executeUpdate(new DropTable(table));
- try {
- DataSet ds = dataContext.query().from(table).selectCount().execute();
- ds.next();
- assertEquals("Row[values=[0]]", ds.getRow().toString());
- ds.close();
- } finally {
- // restore the people documents for the next tests
- insertPeopleDocuments();
- client.admin().indices().prepareRefresh().execute().actionGet();
- dataContext = new ElasticSearchDataContext(client, indexName);
- }
+ DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
+ assertTrue(dataSet.next());
+ assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
+ assertTrue(dataSet.next());
+ assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
+ assertFalse(dataSet.next());
+ dataSet.close();
}
@Test
@@ -549,26 +501,19 @@
@Test
public void testNonDynamicMapingTableNames() throws Exception {
- createIndex();
+ CreateIndexRequest cir = new CreateIndexRequest(indexName2);
+ client.admin().indices().create(cir).actionGet();
+
+ PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping, XContentType.JSON);
+
+ client.admin().indices().putMapping(pmr).actionGet();
ElasticSearchDataContext dataContext2 = new ElasticSearchDataContext(client, indexName2);
assertEquals("[tweet3]", Arrays.toString(dataContext2.getDefaultSchema().getTableNames().toArray()));
}
- private static void createIndex() {
- CreateIndexRequest cir = new CreateIndexRequest(indexName2);
- CreateIndexResponse response = client.admin().indices().create(cir).actionGet();
-
- System.out.println("create index: " + response.isAcknowledged());
-
- PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping);
-
- PutMappingResponse response2 = client.admin().indices().putMapping(pmr).actionGet();
- System.out.println("put mapping: " + response2.isAcknowledged());
- }
-
- private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
+ private void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < numberOfDocuments; i++) {
@@ -578,17 +523,17 @@
bulkRequest.execute().actionGet();
}
- private static void indexTweeterDocument(String indexType, int id, Date date) {
- client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date))
- .setId("tweet_" + indexType + "_" + id).execute().actionGet();
+ private void indexTweeterDocument(String indexType, int id, Date date) {
+ final String id1 = "tweet_" + indexType + "_" + id;
+ client.prepareIndex(indexName, indexType, id1).setSource(buildTweeterJson(id, date)).execute().actionGet();
}
- private static void indexTweeterDocument(String indexType, int id) {
+ private void indexTweeterDocument(String indexType, int id) {
client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id))
.setId("tweet_" + indexType + "_" + id).execute().actionGet();
}
- private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
+ private void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
client.prepareIndex(indexName, peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute()
.actionGet();
}
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
index 8b5eb50..e08f715 100644
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
+++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
@@ -24,6 +24,7 @@
import junit.framework.TestCase;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
import org.apache.metamodel.schema.ColumnType;
import org.elasticsearch.common.collect.MapBuilder;
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
deleted file mode 100644
index b94d0ab..0000000
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient.utils;
-
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.node.Node;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
-public class EmbeddedElasticsearchServer {
-
- private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data";
-
- private final Node node;
- private final String dataDirectory;
-
- public EmbeddedElasticsearchServer() {
- this(DEFAULT_DATA_DIRECTORY);
- }
-
- public EmbeddedElasticsearchServer(String dataDirectory) {
- this.dataDirectory = dataDirectory;
-
- ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder()
- .put("http.enabled", "true")
- .put("path.data", dataDirectory);
-
- node = nodeBuilder()
- .local(true)
- .settings(elasticsearchSettings.build())
- .node();
- }
-
- public Client getClient() {
- return node.client();
- }
-
- public void shutdown() {
- node.close();
- deleteDataDirectory();
- }
-
- private void deleteDataDirectory() {
- try {
- FileUtils.deleteDirectory(new File(dataDirectory));
- } catch (IOException e) {
- throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e);
- }
- }
-}
\ No newline at end of file
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index 9a3b1d8..46f930c 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -21,7 +21,7 @@
<name>MetaModel module for Elasticsearch</name>
<properties>
- <elasticsearch.version>1.4.4</elasticsearch.version>
+ <elasticsearch.version>5.6.3</elasticsearch.version>
</properties>
<modules>
diff --git a/elasticsearch/rest/pom.xml b/elasticsearch/rest/pom.xml
index 936cf4c..8f556e6 100644
--- a/elasticsearch/rest/pom.xml
+++ b/elasticsearch/rest/pom.xml
@@ -26,11 +26,6 @@
<modelVersion>4.0.0</modelVersion>
- <properties>
- <jest.version>2.0.2</jest.version>
- <elasticsearch.version>1.4.4</elasticsearch.version>
- </properties>
-
<artifactId>MetaModel-elasticsearch-rest</artifactId>
<name>MetaModel module for ElasticSearch via REST client</name>
@@ -52,11 +47,15 @@
<artifactId>commons-io</artifactId>
</dependency>
- <!-- Jest -->
<dependency>
- <groupId>io.searchbox</groupId>
- <artifactId>jest</artifactId>
- <version>${jest.version}</version>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
+ <!-- elasticsearch -->
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <version>${elasticsearch.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
@@ -65,21 +64,11 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </dependency>
- <!-- elasticsearch -->
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
-
<!-- test -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
+ <dependency><!-- required by elasticsearch -->
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -87,5 +76,84 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>integration-test</id>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.19.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>${docker-maven-plugin.version}</version>
+ <configuration>
+ <logDate>default</logDate>
+ <autoPull>true</autoPull>
+ <images>
+ <image>
+ <name>elasticsearch-metamodel</name>
+ <build>
+ <dockerFileDir>${project.build.directory}/test-classes</dockerFileDir>
+ </build>
+ <run>
+ <ports>
+ <port>9200:9200</port>
+ <port>9300:9300</port>
+ </ports>
+ <env>
+ <ES_JAVA_OPTS>-Xms1g -Xmx1g</ES_JAVA_OPTS>
+ <cluster.name>docker-cluster</cluster.name>
+ <bootstrap.memory_lock>true</bootstrap.memory_lock>
+ <xpack.security.enabled>false</xpack.security.enabled>
+ </env>
+ <wait>
+ <url>http://${docker.host.address}:9200</url>
+ <time>300000</time>
+ </wait>
+ </run>
+ </image>
+ </images>
+ </configuration>
+ <executions>
+ <execution>
+ <id>start</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>build</goal>
+ <goal>start</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>stop</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>stop</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
\ No newline at end of file
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
new file mode 100644
index 0000000..ddd7e17
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import static java.util.Collections.emptySet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.http.Header;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.main.MainRequest;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchRestClient extends RestHighLevelClient {
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class);
+
+ public ElasticSearchRestClient(final RestClient restClient) {
+ super(restClient);
+ }
+
+ public final boolean refresh(final String indexName, final Header... headers) {
+ try {
+ return performRequest(new MainRequest(), request -> refresh(indexName),
+ ElasticSearchRestClient::convertResponse, emptySet(), headers);
+ } catch (IOException e) {
+ logger.info("Failed to refresh index \"{}\"", indexName, e);
+ }
+ return false;
+ }
+
+ private static Request refresh(final String indexName) {
+ return new Request(HttpPost.METHOD_NAME, "/" + indexName + "/_refresh", Collections.emptyMap(), null);
+ }
+
+ public final boolean delete(final String indexName, final Header... headers) throws IOException {
+ return performRequest(new MainRequest(), request -> delete(indexName),
+ ElasticSearchRestClient::convertResponse, emptySet(), headers);
+ }
+
+ private static Request delete(final String indexName) {
+ return new Request(HttpDelete.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null);
+ }
+
+ public Set<Entry<String, Object>> getMappings(final String indexName, final Header... headers) throws IOException {
+ return performRequestAndParseEntity(new GetIndexRequest(), request -> getMappings(indexName), (
+ response) -> parseMappings(response, indexName), emptySet(), headers);
+ }
+
+ private static Request getMappings(final String indexName) {
+ return new Request(HttpGet.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null);
+ }
+
+ public final boolean createMapping(final PutMappingRequest putMappingRequest, final Header... headers)
+ throws IOException {
+ return performRequest(putMappingRequest, request -> putMapping(putMappingRequest),
+ ElasticSearchRestClient::convertResponse, emptySet(), headers);
+ }
+
+ private static Request putMapping(final PutMappingRequest putMappingRequest) {
+ final String endpoint = "/" + putMappingRequest.indices()[0] + "/_mapping/" + putMappingRequest.type();
+ final ByteArrayEntity entity = new ByteArrayEntity(putMappingRequest.source().getBytes(),
+ ContentType.APPLICATION_JSON);
+ return new Request(HttpPut.METHOD_NAME, endpoint, Collections.emptyMap(), entity);
+ }
+
+ // Carbon copy of RestHighLevelClient#convertExistsResponse(Response) method, which is unaccessible from this class.
+ private static boolean convertResponse(final Response response) {
+ return response.getStatusLine().getStatusCode() == 200;
+ }
+
+ @SuppressWarnings("unchecked")
+ static Set<Entry<String, Object>> parseMappings(final XContentParser response, final String indexName) throws IOException {
+ Map<String, Object> schema = (Map<String, Object>) response.map().get(indexName);
+ Map<String, Object> tables = (Map<String, Object>) schema.get("mappings");
+
+ return tables.entrySet();
+ }
+
+ ActionResponse execute(final ActionRequest request) throws IOException {
+ if (request instanceof BulkRequest) {
+ return bulk((BulkRequest) request);
+ } else if (request instanceof IndexRequest) {
+ return index((IndexRequest) request);
+ } else if (request instanceof DeleteRequest) {
+ return delete((DeleteRequest) request);
+ } else if (request instanceof ClearScrollRequest) {
+ return clearScroll((ClearScrollRequest) request);
+ } else if (request instanceof SearchScrollRequest) {
+ return searchScroll((SearchScrollRequest) request);
+ }
+
+ return null;
+ }
+}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
similarity index 79%
rename from elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
rename to elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
index 3e71c4d..91842f5 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
@@ -27,13 +27,12 @@
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import io.searchbox.indices.mapping.PutMapping;
+final class ElasticSearchRestCreateTableBuilder extends AbstractTableCreationBuilder<ElasticSearchRestUpdateCallback> {
-final class JestElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<JestElasticSearchUpdateCallback> {
-
- public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema,
- String name) {
+ public ElasticSearchRestCreateTableBuilder(final ElasticSearchRestUpdateCallback updateCallback,
+ final Schema schema, final String name) {
super(updateCallback, schema, name);
}
@@ -45,7 +44,7 @@
final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
final String indexName = dataContext.getIndexName();
- final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), source).build();
+ final PutMappingRequest putMapping = new PutMappingRequest(indexName).type(table.getName()).source(source);
getUpdateCallback().execute(putMapping);
final MutableSchema schema = (MutableSchema) getSchema();
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
index c5a5696..5b32d14 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
@@ -18,57 +18,43 @@
*/
package org.apache.metamodel.elasticsearch.rest;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.metamodel.BatchUpdateScript;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.UpdateScript;
import org.apache.metamodel.UpdateSummary;
-import org.apache.metamodel.UpdateableDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetHeader;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.Count;
-import io.searchbox.core.CountResult;
-import io.searchbox.core.Get;
-import io.searchbox.core.Search;
-import io.searchbox.core.SearchResult;
-import io.searchbox.indices.mapping.GetMapping;
-import io.searchbox.params.Parameters;
-
/**
* DataContext implementation for ElasticSearch analytics engine.
*
@@ -86,28 +72,14 @@
* This implementation supports either automatic discovery of a schema or manual
* specification of a schema, through the {@link SimpleTableDef} class.
*/
-public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements DataContext,
- UpdateableDataContext {
+public class ElasticSearchRestDataContext extends AbstractElasticSearchDataContext {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataContext.class);
- public static final String FIELD_ID = "_id";
-
- // 1 minute timeout
- public static final String TIMEOUT_SCROLL = "1m";
-
// we scroll when more than 400 rows are expected
private static final int SCROLL_THRESHOLD = 400;
- private final JestClient elasticSearchClient;
-
- private final String indexName;
- // Table definitions that are set from the beginning, not supposed to be
- // changed.
- private final List<SimpleTableDef> staticTableDefinitions;
-
- // Table definitions that are discovered, these can change
- private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
+ private final ElasticSearchRestClient elasticSearchClient;
/**
* Constructs a {@link ElasticSearchRestDataContext}. This constructor
@@ -122,18 +94,14 @@
* an array of {@link SimpleTableDef}s, which define the table
* and column model of the ElasticSearch index.
*/
- public ElasticSearchRestDataContext(JestClient client, String indexName, SimpleTableDef... tableDefinitions) {
- super(false);
+ public ElasticSearchRestDataContext(final ElasticSearchRestClient client, final String indexName,
+ final SimpleTableDef... tableDefinitions) {
+ super(indexName, tableDefinitions);
+
if (client == null) {
throw new IllegalArgumentException("ElasticSearch Client cannot be null");
}
- if (indexName == null || indexName.trim().length() == 0) {
- throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
- }
this.elasticSearchClient = client;
- this.indexName = indexName;
- this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections
- .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions));
this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
}
@@ -147,65 +115,51 @@
* @param indexName
* the name of the ElasticSearch index to represent
*/
- public ElasticSearchRestDataContext(JestClient client, String indexName) {
+ public ElasticSearchRestDataContext(final ElasticSearchRestClient client, String indexName) {
this(client, indexName, new SimpleTableDef[0]);
}
- /**
- * Performs an analysis of the available indexes in an ElasticSearch cluster
- * {@link JestClient} instance and detects the elasticsearch types structure
- * based on the metadata provided by the ElasticSearch java client.
- *
- * @see {@link #detectTable(JsonObject, String)}
- * @return a mutable schema instance, useful for further fine tuning by the
- * user.
- */
- private SimpleTableDef[] detectSchema() {
+ @Override
+ protected SimpleTableDef[] detectSchema() {
logger.info("Detecting schema for index '{}'", indexName);
- final JestResult jestResult;
+ final Set<Entry<String, Object>> mappings;
try {
- final GetMapping getMapping = new GetMapping.Builder().addIndex(indexName).build();
- jestResult = elasticSearchClient.execute(getMapping);
- } catch (Exception e) {
+ mappings = getElasticSearchClient().getMappings(indexName);
+ } catch (IOException e) {
logger.error("Failed to retrieve mappings", e);
throw new MetaModelException("Failed to execute request for index information needed to detect schema", e);
}
- if (!jestResult.isSucceeded()) {
- logger.error("Failed to retrieve mappings; {}", jestResult.getErrorMessage());
- throw new MetaModelException("Failed to retrieve mappings; " + jestResult.getErrorMessage());
- }
-
final List<SimpleTableDef> result = new ArrayList<>();
- final Set<Map.Entry<String, JsonElement>> mappings = jestResult.getJsonObject().getAsJsonObject(indexName)
- .getAsJsonObject("mappings").entrySet();
- if (mappings.size() == 0) {
+ if (mappings.isEmpty()) {
logger.warn("No metadata returned for index name '{}' - no tables will be detected.");
} else {
+ for (Entry<String, Object> mapping : mappings) {
+ final String documentType = mapping.getKey();
- for (Map.Entry<String, JsonElement> entry : mappings) {
- final String documentType = entry.getKey();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> mappingConfiguration = (Map<String, Object>) mapping.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> properties = (Map<String, Object>) mappingConfiguration.get("properties");
try {
- final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties")
- .getAsJsonObject(), documentType);
+ final SimpleTableDef table = detectTable(properties, documentType);
result.add(table);
} catch (Exception e) {
logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
}
}
}
- final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
- Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
- @Override
- public int compare(SimpleTableDef o1, SimpleTableDef o2) {
- return o1.getName().compareTo(o2.getName());
- }
- });
+ return sortTables(result);
+ }
- return tableDefArray;
+ @Override
+ protected void onSchemaCacheRefreshed() {
+ getElasticSearchClient().refresh(indexName);
+
+ detectSchema();
}
/**
@@ -219,60 +173,22 @@
* the name of the index type
* @return a table definition for ElasticSearch.
*/
- private static SimpleTableDef detectTable(JsonObject metadataProperties, String documentType) {
- final ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser.parse(metadataProperties);
+ private static SimpleTableDef detectTable(final Map<String, Object> metadataProperties, final String documentType) {
+ final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataProperties);
return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes());
}
@Override
- protected Schema getMainSchema() throws MetaModelException {
- final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
- for (final SimpleTableDef tableDef : staticTableDefinitions) {
- addTable(theSchema, tableDef);
- }
-
- final SimpleTableDef[] tables = detectSchema();
- synchronized (this) {
- dynamicTableDefinitions.clear();
- dynamicTableDefinitions.addAll(Arrays.asList(tables));
- for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
- final List<String> tableNames = theSchema.getTableNames();
-
- if (!tableNames.contains(tableDef.getName())) {
- addTable(theSchema, tableDef);
- }
- }
- }
-
- return theSchema;
- }
-
- private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
- final MutableTable table = tableDef.toTable().setSchema(theSchema);
- final Column idColumn = table.getColumnByName(FIELD_ID);
- if (idColumn != null && idColumn instanceof MutableColumn) {
- final MutableColumn mutableColumn = (MutableColumn) idColumn;
- mutableColumn.setPrimaryKey(true);
- }
- theSchema.addTable(table);
- }
-
- @Override
- protected String getMainSchemaName() throws MetaModelException {
- return indexName;
- }
-
- @Override
- protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
- List<FilterItem> whereItems, int firstRow, int maxRows) {
+ protected DataSet materializeMainSchemaTable(final Table table, final List<SelectItem> selectItems,
+ final List<FilterItem> whereItems, final int firstRow, final int maxRows) {
final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
LogicalOperator.AND);
if (queryBuilder != null) {
// where clause can be pushed down to an ElasticSearch query
SearchSourceBuilder searchSourceBuilder = createSearchRequest(firstRow, maxRows, queryBuilder);
- SearchResult result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows));
+ SearchResponse result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows));
- return new JestElasticSearchDataSet(elasticSearchClient, result, selectItems);
+ return new ElasticSearchRestDataSet(getElasticSearchClient(), result, selectItems);
}
return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
}
@@ -282,30 +198,30 @@
return !limitMaxRowsIsSet(maxRows) || maxRows > SCROLL_THRESHOLD;
}
- private SearchResult executeSearch(Table table, SearchSourceBuilder searchSourceBuilder, boolean scroll) {
- Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(
- table.getName());
+ private SearchResponse executeSearch(final Table table, final SearchSourceBuilder searchSourceBuilder,
+ final boolean scroll) {
+ final SearchRequest searchRequest = new SearchRequest(new String[] { getIndexName() }, searchSourceBuilder)
+ .types(table.getName());
+
if (scroll) {
- builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL);
+ searchRequest.scroll(TIMEOUT_SCROLL);
}
- Search search = builder.build();
- SearchResult result;
try {
- result = elasticSearchClient.execute(search);
- } catch (Exception e) {
+ return getElasticSearchClient().search(searchRequest);
+ } catch (IOException e) {
logger.warn("Could not execute ElasticSearch query", e);
throw new MetaModelException("Could not execute ElasticSearch query", e);
}
- return result;
}
@Override
protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
- SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded(
+ SearchResponse searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded(
maxRows));
- return new JestElasticSearchDataSet(elasticSearchClient, searchResult, columns.stream().map(SelectItem::new).collect(Collectors.toList()));
+ return new ElasticSearchRestDataSet(getElasticSearchClient(), searchResult, columns.stream()
+ .map(SelectItem::new).collect(Collectors.toList()));
}
private SearchSourceBuilder createSearchRequest(int firstRow, int maxRows, QueryBuilder queryBuilder) {
@@ -317,7 +233,7 @@
if (limitMaxRowsIsSet(maxRows)) {
searchRequest.size(maxRows);
} else {
- searchRequest.size(Integer.MAX_VALUE);
+ searchRequest.size(SCROLL_THRESHOLD);
}
if (queryBuilder != null) {
@@ -337,12 +253,16 @@
final String documentType = table.getName();
final String id = keyValue.toString();
- final Get get = new Get.Builder(indexName, id).type(documentType).build();
- final JestResult getResult = JestClientExecutor.execute(elasticSearchClient, get);
-
final DataSetHeader header = new SimpleDataSetHeader(selectItems);
- return JestElasticSearchUtils.createRow(getResult.getJsonObject().get("_source").getAsJsonObject(), id, header);
+ try {
+ return ElasticSearchUtils.createRow(getElasticSearchClient()
+ .get(new GetRequest(getIndexName(), documentType, id))
+ .getSource(), id, header);
+ } catch (IOException e) {
+ logger.warn("Could not execute ElasticSearch query", e);
+ throw new MetaModelException("Could not execute ElasticSearch query", e);
+ }
}
@Override
@@ -352,30 +272,23 @@
return null;
}
final String documentType = table.getName();
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+ final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("_type", documentType));
+ sourceBuilder.size(0);
- Count count = new Count.Builder().addIndex(indexName).query(sourceBuilder.toString()).build();
-
- CountResult countResult;
try {
- countResult = elasticSearchClient.execute(count);
+ return getElasticSearchClient().search(new SearchRequest(new String[] { getIndexName() }, sourceBuilder))
+ .getHits().getTotalHits();
} catch (Exception e) {
logger.warn("Could not execute ElasticSearch get query", e);
throw new MetaModelException("Could not execute ElasticSearch get query", e);
}
-
- return countResult.getCount();
- }
-
- private boolean limitMaxRowsIsSet(int maxRows) {
- return (maxRows != -1);
}
@Override
public UpdateSummary executeUpdate(UpdateScript update) {
final boolean isBatch = update instanceof BatchUpdateScript;
- final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this, isBatch);
+ final ElasticSearchRestUpdateCallback callback = new ElasticSearchRestUpdateCallback(this, isBatch);
update.run(callback);
callback.onExecuteUpdateFinished();
return callback.getUpdateSummary();
@@ -384,14 +297,7 @@
/**
* Gets the {@link JestClient} that this {@link DataContext} is wrapping.
*/
- public JestClient getElasticSearchClient() {
+ public ElasticSearchRestClient getElasticSearchClient() {
return elasticSearchClient;
}
-
- /**
- * Gets the name of the index that this {@link DataContext} is working on.
- */
- public String getIndexName() {
- return indexName;
- }
}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
index b2dc4c3..b1756b7 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
@@ -18,6 +18,14 @@
*/
package org.apache.metamodel.elasticsearch.rest;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.metamodel.ConnectionException;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.factory.DataContextFactory;
@@ -25,10 +33,8 @@
import org.apache.metamodel.factory.ResourceFactoryRegistry;
import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException;
import org.apache.metamodel.util.SimpleTableDef;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestClientFactory;
-import io.searchbox.client.config.HttpClientConfig;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
/**
* Factory for ElasticSearch data context of REST type.
@@ -72,18 +78,20 @@
return true;
}
- private JestClient createClient(DataContextProperties properties) {
- final String serverUri = properties.getUrl();
- final HttpClientConfig.Builder builder = new HttpClientConfig.Builder(serverUri);
+ private ElasticSearchRestClient createClient(final DataContextProperties properties) throws MalformedURLException {
+ final URL url = new URL(properties.getUrl());
+ final RestClientBuilder builder = RestClient.builder(new HttpHost(url.getHost(), url.getPort()));
+
if (properties.getUsername() != null) {
- builder.defaultCredentials(properties.getUsername(), properties.getPassword());
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(properties.getUsername(),
+ properties.getPassword()));
+
+ builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
+ credentialsProvider));
}
- final JestClientFactory clientFactory = new JestClientFactory();
- final HttpClientConfig httpClientConfig = new HttpClientConfig(builder);
- clientFactory.setHttpClientConfig(httpClientConfig);
- final JestClient client = clientFactory.getObject();
- return client;
+ return new ElasticSearchRestClient(builder.build());
}
private String getIndex(DataContextProperties properties) {
@@ -97,10 +105,14 @@
@Override
public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
throws UnsupportedDataContextPropertiesException, ConnectionException {
- final JestClient client = createClient(properties);
- final String indexName = getIndex(properties);
- final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
- return new ElasticSearchRestDataContext(client, indexName, tableDefinitions);
+ try {
+ ElasticSearchRestClient client = createClient(properties);
+ final String indexName = getIndex(properties);
+ final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
+ return new ElasticSearchRestDataContext(client, indexName, tableDefinitions);
+ } catch (MalformedURLException e) {
+ throw new UnsupportedDataContextPropertiesException(e);
+ }
}
}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
new file mode 100644
index 0000000..d79b271
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet;
+import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DataSet} implementation for ElasticSearch
+ */
+final class ElasticSearchRestDataSet extends AbstractElasticSearchDataSet {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataSet.class);
+
+ private final ElasticSearchRestClient _client;
+
+ public ElasticSearchRestDataSet(final ElasticSearchRestClient client, final SearchResponse searchResponse, final List<SelectItem> selectItems) {
+ super(searchResponse, selectItems);
+ _client = client;
+ }
+
+ @Override
+ public void closeNow() {
+ final String scrollId = _searchResponse.getScrollId();
+ final ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+ clearScrollRequest.addScrollId(scrollId);
+ try {
+ _client.execute(clearScrollRequest);
+ } catch (IOException e) {
+ logger.warn("Could not clear scroll.", e);
+ }
+ }
+
+ @Override
+ protected SearchResponse scrollSearchResponse(final String scrollId) throws IOException {
+ return _client.searchScroll(new SearchScrollRequest(scrollId).scroll(
+ AbstractElasticSearchDataContext.TIMEOUT_SCROLL));
+ }
+}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
similarity index 68%
rename from elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
rename to elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
index cc1c3e7..f8caa2d 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
@@ -18,7 +18,10 @@
*/
package org.apache.metamodel.elasticsearch.rest;
-import io.searchbox.core.DeleteByQuery;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
import org.apache.metamodel.delete.RowDeletionBuilder;
@@ -26,19 +29,21 @@
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
-import java.util.List;
-
/**
* {@link RowDeletionBuilder} implementation for
* {@link ElasticSearchRestDataContext}.
*/
-final class JestElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
- private final JestElasticSearchUpdateCallback _updateCallback;
+final class ElasticSearchRestDeleteBuilder extends AbstractRowDeletionBuilder {
+ private final ElasticSearchRestUpdateCallback _updateCallback;
- public JestElasticSearchDeleteBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
+ public ElasticSearchRestDeleteBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) {
super(table);
_updateCallback = updateCallback;
}
@@ -64,13 +69,28 @@
throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
+ whereItems);
}
+
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
- final DeleteByQuery deleteByQuery =
- new DeleteByQuery.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType(
- documentType).build();
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ searchRequest.types(documentType);
+ searchRequest.source(searchSourceBuilder);
- _updateCallback.execute(deleteByQuery);
+ try {
+ final SearchResponse response = dataContext.getElasticSearchClient().search(searchRequest);
+
+ final Iterator<SearchHit> iterator = response.getHits().iterator();
+ while (iterator.hasNext()) {
+ final SearchHit hit = iterator.next();
+ final String typeId = hit.getId();
+
+ DeleteRequest deleteRequest = new DeleteRequest(indexName, documentType, typeId);
+
+ _updateCallback.execute(deleteRequest);
+ }
+ } catch (IOException e) {
+ throw new MetaModelException(e);
+ }
}
}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
similarity index 77%
rename from elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
rename to elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
index 746538d..0ba4f66 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
@@ -26,19 +26,17 @@
import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.index.IndexRequest;
-import io.searchbox.core.Index;
-import io.searchbox.params.Parameters;
+final class ElasticSearchRestInsertBuilder extends AbstractRowInsertionBuilder<ElasticSearchRestUpdateCallback> {
-final class JestElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<JestElasticSearchUpdateCallback> {
-
- public JestElasticSearchInsertBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
+ public ElasticSearchRestInsertBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) {
super(updateCallback, table);
}
@Override
public void execute() throws MetaModelException {
- final JestElasticSearchUpdateCallback updateCallback = getUpdateCallback();
+ final ElasticSearchRestUpdateCallback updateCallback = getUpdateCallback();
final ElasticSearchRestDataContext dataContext = updateCallback.getDataContext();
final String indexName = dataContext.getIndexName();
final String documentType = getTable().getName();
@@ -52,7 +50,7 @@
final String columnName = columns[i].getName();
final Object value = values[i];
- if (ElasticSearchRestDataContext.FIELD_ID.equals(columnName)) {
+ if (ElasticSearchUtils.FIELD_ID.equals(columnName)) {
if (value != null) {
id = value.toString();
}
@@ -65,10 +63,10 @@
assert !source.isEmpty();
- final Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter(
- Parameters.OP_TYPE, "create").build();
+ IndexRequest indexRequest = new IndexRequest(indexName, documentType, id);
+ indexRequest.source(source);
- getUpdateCallback().execute(index);
+ getUpdateCallback().execute(indexRequest);
}
}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
new file mode 100644
index 0000000..defd18f
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link UpdateCallback} implementation for
+ * {@link ElasticSearchRestDataContext}.
+ */
+final class ElasticSearchRestUpdateCallback extends AbstractUpdateCallback {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestUpdateCallback.class);
+
+ private static final int BULK_BUFFER_SIZE = 1000;
+
+ private BulkRequest bulkRequest;
+ private int bulkActionCount = 0;
+ private final boolean isBatch;
+
+ public ElasticSearchRestUpdateCallback(final ElasticSearchRestDataContext dataContext, final boolean isBatch) {
+ super(dataContext);
+ this.isBatch = isBatch;
+ }
+
+ private boolean isBatch() {
+ return isBatch;
+ }
+
+ @Override
+ public ElasticSearchRestDataContext getDataContext() {
+ return (ElasticSearchRestDataContext) super.getDataContext();
+ }
+
+ @Override
+ public TableCreationBuilder createTable(final Schema schema, final String name) throws IllegalArgumentException,
+ IllegalStateException {
+ return new ElasticSearchRestCreateTableBuilder(this, schema, name);
+ }
+
+ @Override
+ public boolean isDropTableSupported() {
+ return false;
+ }
+
+ @Override
+ public TableDropBuilder dropTable(final Table table) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RowInsertionBuilder insertInto(final Table table) throws IllegalArgumentException, IllegalStateException,
+ UnsupportedOperationException {
+ return new ElasticSearchRestInsertBuilder(this, table);
+ }
+
+ @Override
+ public boolean isDeleteSupported() {
+ return true;
+ }
+
+ @Override
+ public RowDeletionBuilder deleteFrom(final Table table) throws IllegalArgumentException, IllegalStateException,
+ UnsupportedOperationException {
+ return new ElasticSearchRestDeleteBuilder(this, table);
+ }
+
+ public void onExecuteUpdateFinished() {
+ if (isBatch()) {
+ flushBulkActions();
+ }
+
+ getDataContext().refreshSchemas();
+ }
+
+ private void flushBulkActions() {
+ if (bulkRequest == null || bulkActionCount == 0) {
+ // nothing to flush
+ return;
+ }
+
+ logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName());
+ executeBlocking(bulkRequest);
+
+ bulkActionCount = 0;
+ bulkRequest = null;
+ }
+
+ public void execute(final ActionRequest action) {
+ if (isBatch() && (action instanceof DocWriteRequest<?>)) {
+ getBulkRequest().add((DocWriteRequest<?>) action);
+ bulkActionCount++;
+ if (bulkActionCount == BULK_BUFFER_SIZE) {
+ flushBulkActions();
+ }
+ } else {
+ executeBlocking(action);
+ }
+ }
+
+ private void executeBlocking(final ActionRequest action) {
+ try {
+ if (action instanceof PutMappingRequest) {
+ getDataContext().getElasticSearchClient().createMapping((PutMappingRequest) action);
+ } else {
+ final ActionResponse result = getDataContext().getElasticSearchClient().execute(action);
+
+ if (result instanceof BulkResponse && ((BulkResponse) result).hasFailures()) {
+ BulkItemResponse[] failedItems = ((BulkResponse) result).getItems();
+ for (int i = 0; i < failedItems.length; i++) {
+ if (failedItems[i].isFailed()) {
+ final BulkItemResponse failedItem = failedItems[i];
+ logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i + 1,
+ failedItems.length, failedItem.getId(), failedItem.getOpType(), failedItem.status(),
+ failedItem.getFailureMessage());
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ logger.warn("Could not execute command {} ", action, e);
+ throw new MetaModelException("Could not execute " + action, e);
+ }
+ }
+
+ private BulkRequest getBulkRequest() {
+ if (bulkRequest == null) {
+ bulkRequest = new BulkRequest();
+ }
+ return bulkRequest;
+ }
+}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
deleted file mode 100644
index 1bb026d..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.action.Action;
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import org.apache.metamodel.MetaModelException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-final class JestClientExecutor {
- private static final Logger logger = LoggerFactory.getLogger(JestClientExecutor.class);
-
- static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest) {
- return execute(jestClient, clientRequest, true);
- }
-
- static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest, boolean doThrow) {
- try {
- final T result = jestClient.execute(clientRequest);
- logger.debug("{} response: acknowledged={}", clientRequest, result.isSucceeded());
- return result;
- } catch (IOException e) {
- logger.warn("Could not execute command {} ", clientRequest, e);
- if (doThrow) {
- throw new MetaModelException("Could not execute command " + clientRequest, e);
- }
- }
-
- return null;
- }
-}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
deleted file mode 100644
index cc42b07..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.action.GenericResultAbstractAction;
-
-public class JestDeleteScroll extends GenericResultAbstractAction {
- private JestDeleteScroll(Builder builder) {
- super(builder);
- this.payload = builder.getScrollId();
- setURI(buildURI());
- }
-
- @Override
- public String getRestMethodName() {
- return "DELETE";
- }
-
- @Override
- protected String buildURI() {
- return super.buildURI() + "/_search/scroll";
- }
-
- public static class Builder extends GenericResultAbstractAction.Builder<JestDeleteScroll, Builder> {
- private final String scrollId;
-
- public Builder(String scrollId) {
- this.scrollId = scrollId;
- }
-
- @Override
- public JestDeleteScroll build() {
- return new JestDeleteScroll(this);
- }
-
- public String getScrollId() {
- return scrollId;
- }
- }
-
-}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
deleted file mode 100644
index 37e06dc..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.metamodel.data.AbstractDataSet;
-import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.query.SelectItem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.SearchScroll;
-
-/**
- * {@link DataSet} implementation for ElasticSearch
- */
-final class JestElasticSearchDataSet extends AbstractDataSet {
-
- private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDataSet.class);
-
- private final JestClient _client;
- private final AtomicBoolean _closed;
-
- private JestResult _searchResponse;
- private JsonObject _currentHit;
- private int _hitIndex = 0;
-
- public JestElasticSearchDataSet(JestClient client, JestResult searchResponse, List<SelectItem> selectItems) {
- super(selectItems);
- _client = client;
- _searchResponse = searchResponse;
- _closed = new AtomicBoolean(false);
- }
-
-
- @Override
- public void close() {
- super.close();
- boolean closeNow = _closed.compareAndSet(true, false);
- if (closeNow) {
- final String scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id").getAsString();
- JestClientExecutor.execute(_client, new JestDeleteScroll.Builder(scrollId).build(), false);
- }
- }
-
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- if (!_closed.get()) {
- logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
- close();
- }
- }
-
- @Override
- public boolean next() {
- final JsonArray hits = _searchResponse.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
- if (hits.size() == 0) {
- // break condition for the scroll
- _currentHit = null;
- return false;
- }
-
- if (_hitIndex < hits.size()) {
- // pick the next hit within this search response
- _currentHit = hits.get(_hitIndex).getAsJsonObject();
- _hitIndex++;
- return true;
- }
-
- final JsonPrimitive scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id");
- if (scrollId == null) {
- // this search response is not scrollable - then it's the end.
- _currentHit = null;
- return false;
- }
-
- // try to scroll to the next set of hits
- final SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
-
- _searchResponse = JestClientExecutor.execute(_client, scroll);
-
- // start over (recursively)
- _hitIndex = 0;
- return next();
- }
-
- @Override
- public Row getRow() {
- if (_currentHit == null) {
- return null;
- }
-
- final JsonObject source = _currentHit.getAsJsonObject("_source");
- final String documentId = _currentHit.get("_id").getAsString();
- return JestElasticSearchUtils.createRow(source, documentId, getHeader());
-
- }
-}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
deleted file mode 100644
index 8a1ac71..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.drop.AbstractTableDropBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.searchbox.indices.mapping.DeleteMapping;
-
-/**
- * {@link TableDropBuilder} for dropping tables (document types) in an
- * ElasticSearch index.
- */
-final class JestElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
-
- private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDropTableBuilder.class);
-
- private final JestElasticSearchUpdateCallback _updateCallback;
-
- public JestElasticSearchDropTableBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
- super(table);
- _updateCallback = updateCallback;
- }
-
- @Override
- public void execute() throws MetaModelException {
-
- final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext();
- final Table table = getTable();
- final String documentType = table.getName();
- logger.info("Deleting mapping / document type: {}", documentType);
-
- final DeleteMapping deleteIndex = new DeleteMapping.Builder(dataContext.getIndexName(), documentType).build();
-
- _updateCallback.execute(deleteIndex);
-
- final MutableSchema schema = (MutableSchema) table.getSchema();
- schema.removeTable(table);
-
- }
-}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
deleted file mode 100644
index 074de2e..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.ColumnType;
-
-import java.util.Map.Entry;
-
-/**
- * Parser that transforms the ElasticSearch metadata response (json-like format)
- * into an ElasticSearchMetaData object.
- */
-final class JestElasticSearchMetaDataParser {
-
- /**
- * Parses the ElasticSearch meta data info into an ElasticSearchMetaData
- * object. This method makes much easier to create the ElasticSearch schema.
- *
- * @param metaDataInfo
- * ElasticSearch mapping metadata in Map format
- * @return An ElasticSearchMetaData object
- */
- public static ElasticSearchMetaData parse(JsonObject metaDataInfo) {
- final int columns = metaDataInfo.entrySet().size() + 1;
- final String[] fieldNames = new String[columns];
- final ColumnType[] columnTypes = new ColumnType[columns];
-
- // add the document ID field (fixed)
- fieldNames[0] = ElasticSearchRestDataContext.FIELD_ID;
- columnTypes[0] = ColumnType.STRING;
-
- int i = 1;
- for (Entry<String, JsonElement> metaDataField : metaDataInfo.entrySet()) {
- JsonElement fieldMetadata = metaDataField.getValue();
-
- fieldNames[i] = metaDataField.getKey();
- columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata);
- i++;
-
- }
- return new ElasticSearchMetaData(fieldNames, columnTypes);
- }
-
- private static ColumnType getColumnTypeFromMetadataField(JsonElement fieldMetadata) {
- final JsonElement typeElement = ((JsonObject) fieldMetadata).get("type");
- if (typeElement != null) {
- String metaDataFieldType = typeElement.getAsString();
-
- return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType);
- } else {
- return ColumnType.STRING;
- }
- }
-}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
deleted file mode 100644
index a61280a..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.List;
-
-import org.apache.metamodel.AbstractUpdateCallback;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.UpdateCallback;
-import org.apache.metamodel.create.TableCreationBuilder;
-import org.apache.metamodel.delete.RowDeletionBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.insert.RowInsertionBuilder;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.searchbox.action.Action;
-import io.searchbox.action.BulkableAction;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.Bulk;
-import io.searchbox.core.Bulk.Builder;
-import io.searchbox.core.BulkResult;
-import io.searchbox.core.BulkResult.BulkResultItem;
-import io.searchbox.indices.Refresh;
-
-/**
- * {@link UpdateCallback} implementation for
- * {@link ElasticSearchRestDataContext}.
- */
-final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
-
- private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchUpdateCallback.class);
-
- private static final int BULK_BUFFER_SIZE = 1000;
-
- private Bulk.Builder bulkBuilder;
- private int bulkActionCount = 0;
- private final boolean isBatch;
-
- public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext, boolean isBatch) {
- super(dataContext);
- this.isBatch = isBatch;
- }
-
- private boolean isBatch() {
- return isBatch;
- }
-
- @Override
- public ElasticSearchRestDataContext getDataContext() {
- return (ElasticSearchRestDataContext) super.getDataContext();
- }
-
- @Override
- public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
- IllegalStateException {
- return new JestElasticSearchCreateTableBuilder(this, schema, name);
- }
-
- @Override
- public boolean isDropTableSupported() {
- return true;
- }
-
- @Override
- public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
- UnsupportedOperationException {
- return new JestElasticSearchDropTableBuilder(this, table);
- }
-
- @Override
- public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
- UnsupportedOperationException {
- return new JestElasticSearchInsertBuilder(this, table);
- }
-
- @Override
- public boolean isDeleteSupported() {
- return true;
- }
-
- @Override
- public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
- UnsupportedOperationException {
- return new JestElasticSearchDeleteBuilder(this, table);
- }
-
- public void onExecuteUpdateFinished() {
- if (isBatch()) {
- flushBulkActions();
- }
-
- final String indexName = getDataContext().getIndexName();
- final Refresh refresh = new Refresh.Builder().addIndex(indexName).build();
-
- JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);
- }
-
- private void flushBulkActions() {
- if (bulkBuilder == null || bulkActionCount == 0) {
- // nothing to flush
- return;
- }
- final Bulk bulk = getBulkBuilder().build();
- logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName());
- executeBlocking(bulk);
-
- bulkActionCount = 0;
- bulkBuilder = null;
- }
-
- public void execute(Action<?> action) {
- if (isBatch() && action instanceof BulkableAction) {
- final Bulk.Builder bulkBuilder = getBulkBuilder();
- bulkBuilder.addAction((BulkableAction<?>) action);
- bulkActionCount++;
- if (bulkActionCount == BULK_BUFFER_SIZE) {
- flushBulkActions();
- }
- } else {
- executeBlocking(action);
- }
- }
-
- private void executeBlocking(Action<?> action) {
- final JestResult result = JestClientExecutor.execute(getDataContext().getElasticSearchClient(), action);
- if (!result.isSucceeded()) {
- if (result instanceof BulkResult) {
- final List<BulkResultItem> failedItems = ((BulkResult) result).getFailedItems();
- for (int i = 0; i < failedItems.size(); i++) {
- final BulkResultItem failedItem = failedItems.get(i);
- logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i+1, failedItems.size(), failedItem.id, failedItem.operation, failedItem.status, failedItem.error);
- }
- }
- throw new MetaModelException(result.getResponseCode() + " - " + result.getErrorMessage());
- }
- }
-
- private Builder getBulkBuilder() {
- if (bulkBuilder == null) {
- bulkBuilder = new Bulk.Builder();
- bulkBuilder.defaultIndex(getDataContext().getIndexName());
- }
- return bulkBuilder;
- }
-}
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
deleted file mode 100644
index 11a79b7..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.DefaultRow;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.util.NumberComparator;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-/**
- * Shared/common util functions for the ElasticSearch MetaModel module.
- */
-final class JestElasticSearchUtils {
- public static Row createRow(JsonObject source, String documentId, DataSetHeader header) {
- final Object[] values = new Object[header.size()];
- for (int i = 0; i < values.length; i++) {
- final SelectItem selectItem = header.getSelectItem(i);
- final Column column = selectItem.getColumn();
-
- assert column != null;
- assert !selectItem.hasFunction();
-
- if (column.isPrimaryKey()) {
- values[i] = documentId;
- } else {
- values[i] = getDataFromColumnType(source.get(column.getName()), column.getType());
- }
- }
-
- return new DefaultRow(header, values);
- }
-
- private static Object getDataFromColumnType(JsonElement field, ColumnType type) {
- if (field == null || field.isJsonNull()) {
- return null;
- }
-
- if (field.isJsonObject()) {
- return new Gson().fromJson(field, Map.class);
- }
- if (field.isJsonArray()) {
- return new Gson().fromJson(field, List.class);
- }
-
- if (type.isNumber()) {
- // Pretty terrible workaround to avoid LazilyParsedNumber
- // (which is happily output, but not recognized by Jest/GSON).
- return NumberComparator.toNumber(field.getAsString());
- } else if (type.isTimeBased()) {
- final Date valueToDate = ElasticSearchDateConverter.tryToConvert(field.getAsString());
- if (valueToDate == null) {
- return field.getAsString();
- } else {
- return valueToDate;
- }
- } else if (type.isBoolean()) {
- return field.getAsBoolean();
- } else {
- return field.getAsString();
- }
- }
-}
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java
new file mode 100644
index 0000000..4705585
--- /dev/null
+++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.HttpHost;
+import org.apache.metamodel.BatchUpdateScript;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.factory.DataContextFactory;
+import org.apache.metamodel.factory.DataContextPropertiesImpl;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RestClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticSearchRestDataContexFactoryIT {
+ private static final String INDEX_NAME = "myindex";
+
+ private static ElasticSearchRestClient externalClient;
+
+ private String dockerHostAddress;
+
+ private DataContextFactory factory;
+
+ @Before
+ public void setUp() throws Exception {
+ dockerHostAddress = ElasticSearchRestDataContextIT.determineHostName();
+
+ externalClient = new ElasticSearchRestClient(RestClient.builder(new HttpHost(dockerHostAddress, 9200)).build());
+
+ final Map<String, Object> source = new LinkedHashMap<>();
+ source.put("mytext", "dummy");
+
+ final IndexRequest indexRequest = new IndexRequest(INDEX_NAME, "text");
+ indexRequest.source(source);
+
+ externalClient.index(indexRequest);
+
+ factory = new ElasticSearchRestDataContextFactory();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ externalClient.delete(INDEX_NAME);
+ }
+
+ @Test
+ public void testAccepts() throws Exception {
+ final DataContextPropertiesImpl properties = new DataContextPropertiesImpl();
+ properties.setDataContextType("elasticsearch");
+ properties.put(DataContextPropertiesImpl.PROPERTY_URL, "http://" + dockerHostAddress + ":9200");
+ properties.put(DataContextPropertiesImpl.PROPERTY_DATABASE, INDEX_NAME);
+
+ assertTrue(factory.accepts(properties, null));
+ }
+
+ @Test
+ public void testCreateContextAndBulkScript() throws Exception {
+ final DataContextPropertiesImpl properties = new DataContextPropertiesImpl();
+ properties.setDataContextType("es-rest");
+ properties.put(DataContextPropertiesImpl.PROPERTY_URL, "http://" + dockerHostAddress + ":9200");
+ properties.put(DataContextPropertiesImpl.PROPERTY_DATABASE, INDEX_NAME);
+
+ assertTrue(factory.accepts(properties, null));
+
+ final ElasticSearchRestDataContext dataContext = (ElasticSearchRestDataContext) factory.create(properties,
+ null);
+
+ dataContext.executeUpdate(new BatchUpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.createTable(INDEX_NAME, "persons")
+ .withColumn("name").ofType(ColumnType.STRING)
+ .withColumn("age").ofType(ColumnType.INTEGER)
+ .execute();
+ }
+ });
+
+ dataContext.executeUpdate(new BatchUpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.insertInto("persons").value("name", "John Doe").value("age", 42).execute();
+ callback.insertInto("persons").value("name", "Jane Doe").value("age", 41).execute();
+ }
+ });
+
+ dataContext.refreshSchemas();
+
+ final DataSet persons = dataContext.executeQuery("SELECT name, age FROM persons");
+ final List<Row> personData = persons.toRows();
+
+ assertEquals(2, personData.size());
+
+ // Sort person data, so we can validate each row's values.
+ Column ageColumn = dataContext.getSchemaByName(INDEX_NAME).getTableByName("persons").getColumnByName("age");
+ personData.sort((row1, row2) -> ((Integer) row1.getValue(ageColumn)).compareTo(((Integer) row2.getValue(
+ ageColumn))));
+
+ assertThat(Arrays.asList(personData.get(0).getValues()), containsInAnyOrder("Jane Doe", 41));
+ assertThat(Arrays.asList(personData.get(1).getValues()), containsInAnyOrder("John Doe", 42));
+ }
+}
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
similarity index 65%
rename from elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java
rename to elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
index 53dbdf6..7d5eb12 100644
--- a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java
+++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
@@ -18,9 +18,22 @@
*/
package org.apache.metamodel.elasticsearch.rest;
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestClientFactory;
-import io.searchbox.client.config.HttpClientConfig;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.swing.table.TableModel;
+
+import org.apache.http.HttpHost;
import org.apache.metamodel.MetaModelHelper;
import org.apache.metamodel.UpdateCallback;
import org.apache.metamodel.UpdateScript;
@@ -31,8 +44,7 @@
import org.apache.metamodel.data.InMemoryDataSet;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.delete.DeleteFrom;
-import org.apache.metamodel.drop.DropTable;
-import org.apache.metamodel.elasticsearch.rest.utils.EmbeddedElasticsearchServer;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FunctionType;
import org.apache.metamodel.query.Query;
import org.apache.metamodel.query.SelectItem;
@@ -42,51 +54,43 @@
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.update.Update;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import javax.swing.table.TableModel;
-import java.io.IOException;
-import java.util.*;
-
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.*;
-
-public class JestElasticSearchDataContextTest {
+public class ElasticSearchRestDataContextIT {
+ private static final String DEFAULT_DOCKER_HOST_NAME = "localhost";
private static final String indexName = "twitter";
private static final String indexType1 = "tweet1";
private static final String indexType2 = "tweet2";
- private static final String indexName2 = "twitter2";
- private static final String indexType3 = "tweet3";
private static final String bulkIndexType = "bulktype";
private static final String peopleIndexType = "peopletype";
- private static final String mapping =
- "{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}";
- private static EmbeddedElasticsearchServer embeddedElasticsearchServer;
- private static JestClient client;
- private static UpdateableDataContext dataContext;
- @BeforeClass
- public static void beforeTests() throws Exception {
- embeddedElasticsearchServer = new EmbeddedElasticsearchServer();
- final int port = Integer.parseInt(embeddedElasticsearchServer.getClient().settings().get("http.port"));
- JestClientFactory factory = new JestClientFactory();
- factory.setHttpClientConfig(new HttpClientConfig
- .Builder("http://localhost:" + port)
- .multiThreaded(true)
- .build());
- client = factory.getObject();
+ private static ElasticSearchRestClient client;
+
+ private static UpdateableDataContext dataContext;
+
+ public static String determineHostName() throws URISyntaxException {
+ final String dockerHost = System.getenv("DOCKER_HOST");
+
+ if (dockerHost == null) {
+ // If no value is returned for the DOCKER_HOST environment variable fall back to a default.
+ return DEFAULT_DOCKER_HOST_NAME;
+ } else {
+ return (new URI(dockerHost)).getHost();
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ final String dockerHostAddress = determineHostName();
+
+ client = new ElasticSearchRestClient(RestClient.builder(new HttpHost(dockerHostAddress, 9200)).build());
indexTweeterDocument(indexType1, 1);
indexTweeterDocument(indexType2, 1);
@@ -94,13 +98,15 @@
insertPeopleDocuments();
indexTweeterDocument(indexType2, 1);
indexBulkDocuments(indexName, bulkIndexType, 10);
+
+ client.refresh(indexName);
- // The refresh API allows to explicitly refresh one or more index,
- // making all operations performed since the last refresh available for
- // search
dataContext = new ElasticSearchRestDataContext(client, indexName);
- Thread.sleep(1000);
- System.out.println("Embedded ElasticSearch server created!");
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ client.delete(indexName);
}
private static void insertPeopleDocuments() throws IOException {
@@ -115,12 +121,6 @@
indexOnePeopleDocument("male", 18, 4);
}
- @AfterClass
- public static void afterTests() {
- embeddedElasticsearchServer.shutdown();
- System.out.println("Embedded ElasticSearch server shut down!");
- }
-
@Test
public void testSimpleQuery() throws Exception {
assertEquals("[bulktype, peopletype, tweet1, tweet2]",
@@ -128,14 +128,14 @@
Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
- assertEquals("[_id, message, postDate, user]", Arrays.toString(table.getColumnNames().toArray()));
+ assertThat(table.getColumnNames(), containsInAnyOrder("_id", "message", "postDate", "user"));
assertEquals(ColumnType.STRING, table.getColumnByName("user").getType());
assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType());
assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType());
try (DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute()) {
- assertEquals(JestElasticSearchDataSet.class, ds.getClass());
+ assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
assertTrue(ds.next());
assertEquals("Row[values=[user1, 1]]", ds.getRow().toString());
@@ -162,8 +162,8 @@
try (DataSet ds = dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute()) {
assertTrue(ds.next());
- Object dateValue = ds.getRow().getValue(2);
- assertEquals("Row[values=[tweet_tweet2_1, 1, " + dateValue + ", user1]]", ds.getRow().toString());
+ Object dateValue = ds.getRow().getValue(1);
+ assertEquals("Row[values=[tweet_tweet2_1, " + dateValue + ", 1, user1]]", ds.getRow().toString());
assertFalse(ds.next());
@@ -200,7 +200,7 @@
}
@Test
- public void testCreateTableInsertQueryAndDrop() throws Exception {
+ public void testCreateTableAndInsertQuery() throws Exception {
final Schema schema = dataContext.getDefaultSchema();
final CreateTable createTable = new CreateTable(schema, "testCreateTable");
createTable.withColumn("foo").ofType(ColumnType.STRING);
@@ -209,7 +209,7 @@
final Table table = schema.getTableByName("testCreateTable");
assertNotNull(table);
- assertEquals("[" + ElasticSearchRestDataContext.FIELD_ID + ", foo, bar]", Arrays.toString(table.getColumnNames().toArray()));
+ assertEquals("[" + ElasticSearchUtils.FIELD_ID + ", foo, bar]", Arrays.toString(table.getColumnNames().toArray()));
final Column fooColumn = table.getColumnByName("foo");
final Column idColumn = table.getPrimaryKeys().get(0);
@@ -236,32 +236,6 @@
assertNotNull(ds.getRow().getValue(idColumn));
assertFalse(ds.next());
}
-
- dataContext.executeUpdate(new DropTable(table));
-
- dataContext.refreshSchemas();
-
- assertNull(dataContext.getTableByQualifiedLabel(table.getName()));
- }
-
- @Test
- public void testDetectOutsideChanges() throws Exception {
- // Create the type in ES
- final IndicesAdminClient indicesAdmin = embeddedElasticsearchServer.getClient().admin().indices();
- final String tableType = "outsideTable";
-
- Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" };
-
- new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties)
- .execute().actionGet();
-
- dataContext.refreshSchemas();
-
- assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType));
-
- new DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet();
- dataContext.refreshSchemas();
- assertNull(dataContext.getTableByQualifiedLabel(tableType));
}
@Test
@@ -287,8 +261,6 @@
Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
.toQuery());
assertEquals("Count is wrong", 0, ((Number) row.getValue(0)).intValue());
-
- dataContext.executeUpdate(new DropTable(table));
}
@Test
@@ -314,8 +286,6 @@
Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
dataContext.query().from(table).select("foo", "bar").toQuery());
assertEquals("Row[values=[world, 43]]", row.toString());
-
- dataContext.executeUpdate(new DropTable(table));
}
@Test
@@ -327,27 +297,22 @@
dataContext.executeUpdate(createTable);
final Table table = schema.getTableByName("testCreateTable");
- try {
- dataContext.executeUpdate(new UpdateScript() {
- @Override
- public void run(UpdateCallback callback) {
- callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
- callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
- }
- });
-
- // greater than is not yet supported
- try {
- dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
- fail("Exception expected");
- } catch (UnsupportedOperationException e) {
- assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
- e.getMessage());
+ dataContext.executeUpdate(new UpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+ callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
}
+ });
- } finally {
- dataContext.executeUpdate(new DropTable(table));
+ // greater than is not supported
+ try {
+ dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
+ fail("Exception expected");
+ } catch (UnsupportedOperationException e) {
+ assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]", e
+ .getMessage());
}
}
@@ -360,61 +325,31 @@
dataContext.executeUpdate(createTable);
final Table table = schema.getTableByName("testCreateTable");
- try {
- dataContext.executeUpdate(new UpdateScript() {
- @Override
- public void run(UpdateCallback callback) {
- callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
- callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
- }
- });
+ dataContext.executeUpdate(new UpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+ callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+ }
+ });
- dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
-
- DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
- assertTrue(dataSet.next());
- assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
- assertTrue(dataSet.next());
- assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
- assertFalse(dataSet.next());
- dataSet.close();
- } finally {
- dataContext.executeUpdate(new DropTable(table));
- }
- }
-
- @Test
- public void testDropTable() throws Exception {
- Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
-
- // assert that the table was there to begin with
- {
- DataSet ds = dataContext.query().from(table).selectCount().execute();
- ds.next();
- assertEquals("Count is wrong", 9, ((Number) ds.getRow().getValue(0)).intValue());
- ds.close();
- }
-
- dataContext.executeUpdate(new DropTable(table));
- try {
- DataSet ds = dataContext.query().from(table).selectCount().execute();
- ds.next();
- assertEquals("Count is wrong", 0, ((Number) ds.getRow().getValue(0)).intValue());
- ds.close();
- } finally {
- // restore the people documents for the next tests
- insertPeopleDocuments();
- embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet();
- dataContext = new ElasticSearchRestDataContext(client, indexName);
- }
+ dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
+
+ DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
+ assertTrue(dataSet.next());
+ assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
+ assertTrue(dataSet.next());
+ assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
+ assertFalse(dataSet.next());
+ dataSet.close();
}
@Test
public void testWhereColumnEqualsValues() throws Exception {
try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
.isEquals("user4").execute()) {
- assertEquals(JestElasticSearchDataSet.class, ds.getClass());
+ assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
assertTrue(ds.next());
assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
@@ -426,7 +361,7 @@
public void testWhereColumnIsNullValues() throws Exception {
try (DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate")
.isNull().execute()) {
- assertEquals(JestElasticSearchDataSet.class, ds.getClass());
+ assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
assertTrue(ds.next());
assertEquals("Row[values=[2]]", ds.getRow().toString());
@@ -438,7 +373,7 @@
public void testWhereColumnIsNotNullValues() throws Exception {
try (DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate")
.isNotNull().execute()) {
- assertEquals(JestElasticSearchDataSet.class, ds.getClass());
+ assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
assertTrue(ds.next());
assertEquals("Row[values=[1]]", ds.getRow().toString());
@@ -450,7 +385,7 @@
public void testWhereMultiColumnsEqualValues() throws Exception {
try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
.isEquals("user4").and("message").ne(5).execute()) {
- assertEquals(JestElasticSearchDataSet.class, ds.getClass());
+ assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
assertTrue(ds.next());
assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
@@ -546,54 +481,39 @@
dataContext.query().from(indexType1).select("nonExistingField").execute();
}
- @Test
- public void testNonDynamicMapingTableNames() throws Exception {
- createIndex();
-
- ElasticSearchRestDataContext dataContext2 = new ElasticSearchRestDataContext(client, indexName2);
-
- assertEquals("[tweet3]", Arrays.toString(dataContext2.getDefaultSchema().getTableNames().toArray()));
- }
-
- private static void createIndex() {
- CreateIndexRequest cir = new CreateIndexRequest(indexName2);
- CreateIndexResponse response =
- embeddedElasticsearchServer.getClient().admin().indices().create(cir).actionGet();
-
- System.out.println("create index: " + response.isAcknowledged());
-
- PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping);
-
- PutMappingResponse response2 =
- embeddedElasticsearchServer.getClient().admin().indices().putMapping(pmr).actionGet();
- System.out.println("put mapping: " + response2.isAcknowledged());
- }
-
- private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
- BulkRequestBuilder bulkRequest = embeddedElasticsearchServer.getClient().prepareBulk();
+ private static void indexBulkDocuments(final String indexName, final String indexType, final int numberOfDocuments)
+ throws IOException {
+ final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numberOfDocuments; i++) {
- bulkRequest.add(embeddedElasticsearchServer.getClient().prepareIndex(indexName, indexType,
- Integer.toString(i)).setSource(
- buildTweeterJson(i)));
+ final IndexRequest indexRequest = new IndexRequest(indexName, indexType, Integer.toString(i));
+ indexRequest.source(buildTweeterJson(i));
+
+ bulkRequest.add(indexRequest);
}
- bulkRequest.execute().actionGet();
+
+ client.bulk(bulkRequest);
}
- private static void indexTweeterDocument(String indexType, int id, Date date) {
- embeddedElasticsearchServer.getClient().prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date))
- .setId("tweet_" + indexType + "_" + id).execute().actionGet();
+ private static void indexTweeterDocument(final String indexType, final int id, final Date date) throws IOException {
+ final IndexRequest indexRequest = new IndexRequest(indexName, indexType, "tweet_" + indexType + "_" + id);
+ indexRequest.source(buildTweeterJson(id, date));
+
+ client.index(indexRequest);
}
- private static void indexTweeterDocument(String indexType, int id) {
- embeddedElasticsearchServer.getClient().prepareIndex(indexName, indexType).setSource(buildTweeterJson(id))
- .setId("tweet_" + indexType + "_" + id).execute().actionGet();
+ private static void indexTweeterDocument(String indexType, int id) throws IOException {
+ final IndexRequest indexRequest = new IndexRequest(indexName, indexType, "tweet_" + indexType + "_" + id);
+ indexRequest.source(buildTweeterJson(id));
+
+ client.index(indexRequest);
}
private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
- embeddedElasticsearchServer.getClient().prepareIndex(indexName, peopleIndexType)
- .setSource(buildPeopleJson(gender, age, id)).execute()
- .actionGet();
+ final IndexRequest indexRequest = new IndexRequest(indexName, peopleIndexType);
+ indexRequest.source(buildPeopleJson(gender, age, id));
+
+ client.index(indexRequest);
}
private static Map<String, Object> buildTweeterJson(int elementId) {
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java
deleted file mode 100644
index 6eeac6a..0000000
--- a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
-import org.apache.metamodel.schema.ColumnType;
-import org.elasticsearch.common.collect.MapBuilder;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-
-public class JestElasticSearchMetaDataParserTest extends TestCase {
-
- public void testParseMetadataInfo() throws Exception {
- Map<String, Object> metadata = new LinkedHashMap<>();
- metadata.put("message", MapBuilder.newMapBuilder().put("type", "long").immutableMap());
- metadata.put("postDate", MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap());
- metadata.put("anotherDate", MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap());
- metadata.put("user", MapBuilder.newMapBuilder().put("type", "string").immutableMap());
- metadata.put("critical", MapBuilder.newMapBuilder().put("type", "boolean").immutableMap());
- metadata.put("income", MapBuilder.newMapBuilder().put("type", "double").immutableMap());
- metadata.put("untypedthingie", MapBuilder.newMapBuilder().put("foo", "bar").immutableMap());
- final Gson gson = new Gson();
- ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser
- .parse((JsonObject) gson.toJsonTree(metadata));
- String[] columnNames = metaData.getColumnNames();
- ColumnType[] columnTypes = metaData.getColumnTypes();
-
- assertTrue(columnNames.length == 8);
- assertEquals(columnNames[0], "_id");
- assertEquals(columnNames[1], "message");
- assertEquals(columnNames[2], "postDate");
- assertEquals(columnNames[3], "anotherDate");
- assertEquals(columnNames[4], "user");
- assertEquals(columnNames[5], "critical");
- assertEquals(columnNames[6], "income");
- assertEquals(columnNames[7], "untypedthingie");
-
- assertTrue(columnTypes.length == 8);
- assertEquals(columnTypes[0], ColumnType.STRING);
- assertEquals(columnTypes[1], ColumnType.BIGINT);
- assertEquals(columnTypes[2], ColumnType.DATE);
- assertEquals(columnTypes[3], ColumnType.DATE);
- assertEquals(columnTypes[4], ColumnType.STRING);
- assertEquals(columnTypes[5], ColumnType.BOOLEAN);
- assertEquals(columnTypes[6], ColumnType.DOUBLE);
- assertEquals(columnTypes[7], ColumnType.STRING);
- }
-}
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java
deleted file mode 100644
index 4c8cca1..0000000
--- a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.Lists;
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.data.SimpleDataSetHeader;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.schema.MutableColumn;
-import org.junit.Test;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
-public class JestElasticSearchUtilsTest {
-
- @Test
- public void testAssignDocumentIdForPrimaryKeys() throws Exception {
- MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true);
- SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn);
- List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem);
- String documentId = "doc1";
- DataSetHeader header = new SimpleDataSetHeader(selectItems1);
- JsonObject values = new JsonObject();
-
- values.addProperty("value1", "theValue");
- Row row = JestElasticSearchUtils.createRow(values, documentId, header);
- String primaryKeyValue = (String) row.getValue(primaryKeyItem);
-
- assertEquals(primaryKeyValue, documentId);
- }
-
- @Test
- public void testCreateRowWithNullValues() throws Exception {
- final Column col1 = new MutableColumn("col1", ColumnType.STRING);
- final Column col2 = new MutableColumn("col2", ColumnType.STRING);
- final DataSetHeader header = new SimpleDataSetHeader(Lists.newArrayList(col1, col2).stream().map(SelectItem::new).collect(Collectors.toList()));
- final JsonObject source = new JsonObject();
- source.addProperty("col1", "foo");
- source.addProperty("col2", (String) null);
- final String documentId = "row1";
-
- final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
- assertEquals("Row[values=[foo, null]]", row.toString());
- }
-
- @Test
- public void testCreateRowWithNumberValueAndStringType() throws Exception {
- final Column col1 = new MutableColumn("col1", ColumnType.STRING);
- final DataSetHeader header = SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
- final JsonObject source = new JsonObject();
- source.addProperty("col1", 42);
- final String documentId = "row1";
-
- final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
- assertEquals("Row[values=[42]]", row.toString());
- }
-
- @Test
- public void testCreateRowWithStringValueAndNumberType() throws Exception {
- final Column col1 = new MutableColumn("col1", ColumnType.NUMBER);
- final DataSetHeader header = SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
- final JsonObject source = new JsonObject();
- source.addProperty("col1", "hello world");
- final String documentId = "row1";
-
- final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
-
- // whether or not 'null' should be returned (bad value, but preserves
- // type) or 'hello world' should be returned (correct value, breaks
- // type) can be debated. For now it is added here as an assertion to
- // keep track of any regressions.
- assertEquals("Row[values=[null]]", row.toString());
- }
-
- @Test
- public void testCreateRowWithJsonObject() throws Exception {
- final Column col1 = new MutableColumn("col1", ColumnType.MAP);
- final DataSetHeader header = SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
- final JsonObject source = new JsonObject();
- final JsonObject value = new JsonObject();
- value.addProperty("foo1", "bar");
- value.addProperty("foo2", 42);
- source.add("col1", value);
- final String documentId = "row1";
-
- final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
- assertEquals("Row[values=[{foo1=bar, foo2=42.0}]]", row.toString());
-
- final Map<?, ?> rowValue = (Map<?, ?>) row.getValue(col1);
- assertEquals("bar", rowValue.get("foo1"));
- }
-
- @Test
- public void testCreateRowWithJsonArray() throws Exception {
- final Column col1 = new MutableColumn("col1", ColumnType.LIST);
- final DataSetHeader header = SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
- final JsonObject source = new JsonObject();
- final JsonArray value = new JsonArray();
- value.add(new JsonPrimitive("foo"));
- value.add(new JsonPrimitive("bar"));
- source.add("col1", value);
- final String documentId = "row1";
-
- final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
- assertEquals("Row[values=[[foo, bar]]]", row.toString());
-
- final List<?> rowValue = (List<?>) row.getValue(col1);
- assertEquals("foo", rowValue.get(0));
- }
-
- @Test
- public void testCreateRowWithDeepNesting() throws Exception {
- final Column col1 = new MutableColumn("col1", ColumnType.LIST);
- final DataSetHeader header = SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
- final JsonObject source = new JsonObject();
-
- final JsonObject obj2 = new JsonObject();
- obj2.addProperty("foo", 43);
-
- final JsonArray arr1 = new JsonArray();
- arr1.add(new JsonPrimitive("foo"));
- arr1.add(new JsonPrimitive("bar"));
- arr1.add(obj2);
-
- final JsonObject obj1 = new JsonObject();
- obj1.addProperty("mybool", true);
- obj1.add("arr1", arr1);
- source.add("col1", obj1);
- final String documentId = "row1";
-
- final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
- assertEquals("Row[values=[{mybool=true, arr1=[foo, bar, {foo=43.0}]}]]", row.toString());
-
- final Map<?, ?> rowObj1 = (Map<?, ?>) row.getValue(col1);
- final List<?> rowList = (List<?>) rowObj1.get("arr1");
- final Map<?, ?> rowObj2 = (Map<?, ?>) rowList.get(2);
- assertEquals(43.0, rowObj2.get("foo"));
- }
-
- @Test
- public void testCreateRowWithParseableDates() throws Exception {
- SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING));
- SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE));
- List<SelectItem> selectItems1 = Arrays.asList(item1, item2);
- String documentId = "doc1";
- DataSetHeader header = new SimpleDataSetHeader(selectItems1);
- JsonObject values = new JsonObject();
- values.addProperty("value1", "theValue");
- values.addProperty("value2", "2013-01-04T15:55:51.217+01:00");
- Row row = JestElasticSearchUtils.createRow(values, documentId, header);
- Object stringValue = row.getValue(item1);
- Object dateValue = row.getValue(item2);
-
- assertTrue(stringValue instanceof String);
- assertTrue(dateValue instanceof Date);
- }
-}
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java
deleted file mode 100644
index 11e7eb5..0000000
--- a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest.utils;
-
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.node.Node;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
-public class EmbeddedElasticsearchServer {
-
- private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data";
-
- private final Node node;
- private final String dataDirectory;
-
- public EmbeddedElasticsearchServer() {
- this(DEFAULT_DATA_DIRECTORY);
- }
-
- public EmbeddedElasticsearchServer(String dataDirectory) {
- this.dataDirectory = dataDirectory;
-
- ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder()
- .put("http.enabled", "true")
- .put("path.data", dataDirectory)
- .put("http.port", 9292);
-
- node = nodeBuilder()
- .local(true)
- .settings(elasticsearchSettings.build())
- .node();
- }
-
- public Client getClient() {
- return node.client();
- }
-
- public void shutdown() {
- node.close();
- deleteDataDirectory();
- }
-
- private void deleteDataDirectory() {
- try {
- FileUtils.deleteDirectory(new File(dataDirectory));
- } catch (IOException e) {
- throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e);
- }
- }
-}
\ No newline at end of file
diff --git a/elasticsearch/rest/src/test/resources/Dockerfile b/elasticsearch/rest/src/test/resources/Dockerfile
new file mode 100644
index 0000000..6c10f8e
--- /dev/null
+++ b/elasticsearch/rest/src/test/resources/Dockerfile
@@ -0,0 +1,5 @@
+FROM docker.elastic.co/elasticsearch/elasticsearch:5.6.3
+ADD elasticsearch.yml /usr/share/elasticsearch/config/
+USER root
+RUN chown elasticsearch:elasticsearch config/elasticsearch.yml
+USER elasticsearch
\ No newline at end of file
diff --git a/elasticsearch/rest/src/test/resources/elasticsearch.yml b/elasticsearch/rest/src/test/resources/elasticsearch.yml
new file mode 100644
index 0000000..ba7c07f
--- /dev/null
+++ b/elasticsearch/rest/src/test/resources/elasticsearch.yml
@@ -0,0 +1,13 @@
+bootstrap.memory_lock: true
+cluster.name: docker-cluster
+http.port: 9200
+node.data: true
+node.ingest: true
+node.master: true
+node.max_local_storage_nodes: 1
+node.name: estest
+path.data: /usr/share/elasticsearch/data
+path.logs: /usr/share/elasticsearch/logs
+transport.tcp.port: 9300
+discovery.type: single-node
+network.host: 0.0.0.0
diff --git a/full/src/main/java/org/apache/metamodel/DataContextFactory.java b/full/src/main/java/org/apache/metamodel/DataContextFactory.java
index 427101e..eeb7df5 100644
--- a/full/src/main/java/org/apache/metamodel/DataContextFactory.java
+++ b/full/src/main/java/org/apache/metamodel/DataContextFactory.java
@@ -33,6 +33,7 @@
import org.apache.metamodel.csv.CsvConfiguration;
import org.apache.metamodel.csv.CsvDataContext;
import org.apache.metamodel.elasticsearch.nativeclient.ElasticSearchDataContext;
+import org.apache.metamodel.elasticsearch.rest.ElasticSearchRestClient;
import org.apache.metamodel.elasticsearch.rest.ElasticSearchRestDataContext;
import org.apache.metamodel.excel.ExcelConfiguration;
import org.apache.metamodel.excel.ExcelDataContext;
@@ -64,8 +65,6 @@
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoDatabase;
-import io.searchbox.client.JestClient;
-
/**
* A factory for DataContext objects. This class substantially easens the task
* of creating and initializing DataContext objects and/or their strategies for
@@ -681,7 +680,8 @@
* The ElasticSearch index name
* @return a DataContext object that matches the request
*/
- public static UpdateableDataContext createElasticSearchDataContext(JestClient client, String indexName) {
+ public static UpdateableDataContext createElasticSearchDataContext(final ElasticSearchRestClient client,
+ final String indexName) {
return new ElasticSearchRestDataContext(client, indexName);
}
diff --git a/hbase/pom.xml b/hbase/pom.xml
index 172e2f6..6659dad 100644
--- a/hbase/pom.xml
+++ b/hbase/pom.xml
@@ -111,6 +111,10 @@
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
diff --git a/pom.xml b/pom.xml
index 4caae5b..3df13d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,6 +32,7 @@
<spring.version>4.2.6.RELEASE</spring.version>
<httpcomponents.version>4.4.1</httpcomponents.version>
<checksum-maven-plugin.version>1.2</checksum-maven-plugin.version>
+ <docker-maven-plugin.version>0.23.0</docker-maven-plugin.version>
<skipTests>false</skipTests>
</properties>
<parent>