blob: 00d0b5a32de764b43a8e1f1729273bb8066c12da [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.metamodel.neo4j;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DocumentSource;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.MutableSchema;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.schema.builder.DocumentSourceProvider;
import org.apache.metamodel.util.SimpleTableDef;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * DataContext implementation for Neo4j, backed by the Neo4j REST API.
 *
 * <p>Table definitions are either supplied by the caller or detected at construction time by querying the server.
 * During detection, every label that has at least one node becomes a table. Its columns are, in order:
 * <ul>
 * <li>{@code _id};</li>
 * <li>the union of the node property names;</li>
 * <li>one {@code rel_<TYPE>} column per outgoing relationship type;</li>
 * <li>one {@code rel_<TYPE>#<property>} column per relationship property.</li>
 * </ul>
 *
 * <p>All REST calls are made relative to a service root, which defaults to {@value #DEFAULT_SERVICE_ROOT}.
 *
 * <p>NOTE(review): constructors that build their own {@link CloseableHttpClient} never close it, and this class
 * offers no way to release it.
 */
public class Neo4jDataContext extends QueryPostprocessDataContext implements DataContext, DocumentSourceProvider {

    public static final Logger logger = LoggerFactory.getLogger(Neo4jDataContext.class);

    public static final String SCHEMA_NAME = "neo4j";

    public static final int DEFAULT_PORT = 7474;

    /** Service root used when a constructor is not given one (or is given {@code null}). */
    public static final String DEFAULT_SERVICE_ROOT = "/db/data";

    public static final String RELATIONSHIP_PREFIX = "rel_";

    public static final String RELATIONSHIP_COLUMN_SEPARATOR = "#";

    private final SimpleTableDef[] _tableDefs;
    private final Neo4jRequestWrapper _requestWrapper;
    private final HttpHost _httpHost;
    private final String _serviceRoot;

    /**
     * Creates a context with explicit table definitions. It authenticates with the given credentials against the
     * default service root.
     */
    public Neo4jDataContext(String hostname, int port, String username, String password, SimpleTableDef... tableDefs) {
        this(hostname, port, username, password, DEFAULT_SERVICE_ROOT, tableDefs);
    }

    /**
     * Creates a context with explicit table definitions. It authenticates with the given credentials against a
     * custom service root.
     */
    public Neo4jDataContext(String hostname, int port, String username, String password, String serviceRoot,
            SimpleTableDef... tableDefs) {
        super(false);
        _httpHost = new HttpHost(hostname, port);
        // Resolve the service root BEFORE building the request wrapper. Otherwise the wrapper would be bound to
        // the default root instead of the requested one.
        _serviceRoot = resolveServiceRoot(serviceRoot);
        _requestWrapper = new Neo4jRequestWrapper(HttpClientBuilder.create().build(), _httpHost, username, password,
                _serviceRoot);
        _tableDefs = tableDefs;
    }

    /**
     * Creates a context that detects its tables from the server. It authenticates with the given credentials
     * against the default service root.
     */
    public Neo4jDataContext(String hostname, int port, String username, String password) {
        this(hostname, port, username, password, DEFAULT_SERVICE_ROOT);
    }

    /**
     * Creates a context that detects its tables from the server. It authenticates with the given credentials
     * against a custom service root.
     */
    public Neo4jDataContext(String hostname, int port, String username, String password, String serviceRoot) {
        super(false);
        _httpHost = new HttpHost(hostname, port);
        // Both the wrapper and detectTableDefs() must see the requested root, so it is assigned first.
        _serviceRoot = resolveServiceRoot(serviceRoot);
        _requestWrapper = new Neo4jRequestWrapper(HttpClientBuilder.create().build(), _httpHost, username, password,
                _serviceRoot);
        _tableDefs = detectTableDefs();
    }

    /**
     * Creates a context that detects its tables from the server. It uses the supplied HTTP client and the default
     * service root.
     */
    public Neo4jDataContext(String hostname, int port, CloseableHttpClient httpClient) {
        this(hostname, port, httpClient, DEFAULT_SERVICE_ROOT);
    }

    /**
     * Creates a context that detects its tables from the server. It uses the supplied HTTP client and a custom
     * service root.
     */
    public Neo4jDataContext(String hostname, int port, CloseableHttpClient httpClient, String serviceRoot) {
        super(false);
        _httpHost = new HttpHost(hostname, port);
        _serviceRoot = resolveServiceRoot(serviceRoot);
        _requestWrapper = new Neo4jRequestWrapper(httpClient, _httpHost, _serviceRoot);
        _tableDefs = detectTableDefs();
    }

    /**
     * Creates a context with explicit table definitions. It uses the supplied HTTP client and the default service
     * root.
     */
    public Neo4jDataContext(String hostname, int port, CloseableHttpClient httpClient, SimpleTableDef... tableDefs) {
        this(hostname, port, httpClient, DEFAULT_SERVICE_ROOT, tableDefs);
    }

    /**
     * Creates a context with explicit table definitions. It uses the supplied HTTP client and a custom service
     * root.
     */
    public Neo4jDataContext(String hostname, int port, CloseableHttpClient httpClient, String serviceRoot,
            SimpleTableDef... tableDefs) {
        super(false);
        _httpHost = new HttpHost(hostname, port);
        _serviceRoot = resolveServiceRoot(serviceRoot);
        _requestWrapper = new Neo4jRequestWrapper(httpClient, _httpHost, _serviceRoot);
        _tableDefs = tableDefs;
    }

    /** Returns {@code serviceRoot}, or {@link #DEFAULT_SERVICE_ROOT} when it is {@code null}. */
    private static String resolveServiceRoot(String serviceRoot) {
        return serviceRoot == null ? DEFAULT_SERVICE_ROOT : serviceRoot;
    }

    @Override
    protected String getDefaultSchemaName() throws MetaModelException {
        return SCHEMA_NAME;
    }

    /** Builds the single schema of this context, with one table per configured or detected table definition. */
    @Override
    protected Schema getMainSchema() throws MetaModelException {
        final MutableSchema schema = new MutableSchema(getMainSchemaName());
        for (SimpleTableDef tableDef : _tableDefs) {
            final MutableTable table = tableDef.toTable().setSchema(schema);
            schema.addTable(table);
        }
        return schema;
    }

    @Override
    protected String getMainSchemaName() throws MetaModelException {
        return SCHEMA_NAME;
    }

    /**
     * Detects table definitions by inspecting the labels, nodes, node properties and outgoing relationships
     * exposed by the REST API under the service root.
     *
     * @return one table definition per label that has at least one node
     * @throws IllegalStateException if a REST response cannot be parsed as JSON
     */
    public SimpleTableDef[] detectTableDefs() {
        final String labelsJsonString = _requestWrapper.executeRestRequest(new HttpGet(_serviceRoot + "/labels"));
        try {
            final JSONArray labelsJsonArray = new JSONArray(labelsJsonString);
            final List<SimpleTableDef> tableDefs = new ArrayList<>();
            for (int i = 0; i < labelsJsonArray.length(); i++) {
                final SimpleTableDef tableDef = detectTableDef(labelsJsonArray.getString(i));
                if (tableDef != null) {
                    tableDefs.add(tableDef);
                }
            }
            return tableDefs.toArray(new SimpleTableDef[tableDefs.size()]);
        } catch (JSONException e) {
            logger.error("Error occured in parsing JSON while detecting the schema: ", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Builds the table definition for a single label.
     *
     * @return the table definition, or {@code null} when the label has no nodes (empty tables are considered
     *         non-existent)
     */
    private SimpleTableDef detectTableDef(String label) throws JSONException {
        final List<JSONObject> nodesPerLabel = getAllNodesPerLabel(label);
        if (nodesPerLabel.isEmpty()) {
            return null;
        }
        // Node property columns come first (including "_id"), followed by relationship columns. A LinkedHashSet
        // keeps first-seen order and guarantees unique column names in O(1) per insertion.
        final Set<String> columnNames = new LinkedHashSet<>();
        for (JSONObject node : nodesPerLabel) {
            columnNames.addAll(getAllPropertiesPerNode(node));
        }
        for (JSONObject node : nodesPerLabel) {
            // getLong rather than an (Integer) cast, so large node ids do not cause a ClassCastException.
            final long nodeId = node.getJSONObject("metadata").getLong("id");
            for (JSONObject relationship : getOutgoingRelationshipsPerNode(nodeId)) {
                // One column for the relationship itself...
                columnNames.add(RELATIONSHIP_PREFIX + relationship.getString("type"));
                // ...plus one column per relationship property.
                columnNames.addAll(getAllPropertiesPerRelationship(relationship));
            }
        }
        return new SimpleTableDef(label, columnNames.toArray(new String[columnNames.size()]));
    }

    /**
     * Returns the column names for the properties of a relationship, formatted as
     * {@code rel_<TYPE>#<property>}.
     *
     * @throws IllegalStateException if the relationship JSON is malformed
     */
    private List<String> getAllPropertiesPerRelationship(JSONObject relationship) {
        final List<String> propertyNames = new ArrayList<>();
        try {
            // Reads the same top-level "type" field as detectTableDef, so both kinds of relationship column agree.
            final String relationshipName = RELATIONSHIP_PREFIX + relationship.getString("type");
            // names() returns null for an object without properties.
            final JSONArray relationshipPropertyNames = relationship.getJSONObject("data").names();
            if (relationshipPropertyNames != null) {
                for (int i = 0; i < relationshipPropertyNames.length(); i++) {
                    propertyNames.add(relationshipName + RELATIONSHIP_COLUMN_SEPARATOR
                            + relationshipPropertyNames.getString(i));
                }
            }
            return propertyNames;
        } catch (JSONException e) {
            logger.error("Error occured in parsing JSON while getting relationship properties: ", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Fetches the outgoing relationships of a node.
     *
     * @throws IllegalStateException if the response cannot be parsed as JSON
     */
    private List<JSONObject> getOutgoingRelationshipsPerNode(long nodeId) {
        final String outgoingRelationshipsPerNodeJsonString = _requestWrapper.executeRestRequest(new HttpGet(
                _serviceRoot + "/node/" + nodeId + "/relationships/out"));
        try {
            final JSONArray relationshipsJsonArray = new JSONArray(outgoingRelationshipsPerNodeJsonString);
            final List<JSONObject> outgoingRelationships = new ArrayList<>(relationshipsJsonArray.length());
            for (int i = 0; i < relationshipsJsonArray.length(); i++) {
                outgoingRelationships.add(relationshipsJsonArray.getJSONObject(i));
            }
            return outgoingRelationships;
        } catch (JSONException e) {
            logger.error("Error occured in parsing JSON while detecting outgoing relationships for node: " + nodeId, e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Fetches all nodes carrying the given label.
     *
     * @throws IllegalStateException if the response cannot be parsed as JSON
     */
    private List<JSONObject> getAllNodesPerLabel(String label) {
        final String allNodesForLabelJsonString = _requestWrapper.executeRestRequest(new HttpGet(_serviceRoot
                + "/label/" + label + "/nodes"));
        try {
            final JSONArray nodesJsonArray = new JSONArray(allNodesForLabelJsonString);
            final List<JSONObject> allNodesPerLabel = new ArrayList<>(nodesJsonArray.length());
            for (int i = 0; i < nodesJsonArray.length(); i++) {
                allNodesPerLabel.add(nodesJsonArray.getJSONObject(i));
            }
            return allNodesPerLabel;
        } catch (JSONException e) {
            logger.error("Error occured in parsing JSON while detecting the nodes for a label: " + label, e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Returns {@code "_id"} followed by the property names of a node. The names are fetched from the URL in the
     * node's {@code properties} field.
     *
     * @throws IllegalStateException if the node or the response JSON is malformed
     */
    private List<String> getAllPropertiesPerNode(JSONObject node) {
        final List<String> properties = new ArrayList<>();
        properties.add("_id");
        try {
            final String propertiesEndpoint = node.getString("properties");
            final String allPropertiesPerNodeJsonString = _requestWrapper.executeRestRequest(new HttpGet(
                    propertiesEndpoint));
            // names() returns null for an object without properties. Each name is added exactly once; previously
            // an extra outer loop re-added every name once per property.
            final JSONArray propertyNames = new JSONObject(allPropertiesPerNodeJsonString).names();
            if (propertyNames != null) {
                for (int i = 0; i < propertyNames.length(); i++) {
                    properties.add(propertyNames.getString(i));
                }
            }
            return properties;
        } catch (JSONException e) {
            logger.error("Error occured in parsing JSON while detecting the properties of a node: " + node, e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Runs a Cypher select for the given columns and wraps the JSON response in a {@link Neo4jDataSet}.
     *
     * @throws IllegalArgumentException if {@code columns} is {@code null} or empty
     * @throws IllegalStateException if the response cannot be parsed as JSON
     */
    @Override
    protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int firstRow, int maxRows) {
        if (columns == null || columns.isEmpty()) {
            logger.error("Encountered null or empty columns array for materializing main schema table.");
            throw new IllegalArgumentException("Columns cannot be null or empty array");
        }
        try {
            final String selectQuery = Neo4jCypherQueryBuilder.buildSelectQuery(table, columns, firstRow, maxRows);
            final String responseJSONString = _requestWrapper.executeCypherQuery(selectQuery);
            final JSONObject resultJSONObject = new JSONObject(responseJSONString);
            final List<SelectItem> selectItems = columns.stream().map(SelectItem::new).collect(Collectors.toList());
            return new Neo4jDataSet(selectItems, resultJSONObject);
        } catch (JSONException e) {
            logger.error("Error occured in parsing JSON while materializing the schema: ", e);
            throw new IllegalStateException(e);
        }
    }

    /** Materializes from the first row (row numbers passed to the four-argument overload start at 1). */
    @Override
    protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
        return materializeMainSchemaTable(table, columns, 1, maxRows);
    }

    /**
     * Executes a Cypher count query. Reads the value at {@code results[0].data[0].row[0]} of the JSON response.
     *
     * @return the count, or {@code null} if the response has an unexpected shape
     */
    @Override
    protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
        final String countQuery = Neo4jCypherQueryBuilder.buildCountQuery(table.getName(), whereItems);
        final String jsonResponse = _requestWrapper.executeCypherQuery(countQuery);
        try {
            // Typed getters throw JSONException (handled below) instead of ClassCastException on unexpected shapes.
            final JSONObject rowJSONObject = new JSONObject(jsonResponse).getJSONArray("results").getJSONObject(0)
                    .getJSONArray("data").getJSONObject(0);
            final Object value = rowJSONObject.getJSONArray("row").get(0);
            if (value instanceof Number) {
                return (Number) value;
            }
            logger.warn("Unexpected non-numeric count value in JSON response: {}", value);
            return null;
        } catch (JSONException e) {
            logger.error("Error occured in parsing JSON response: ", e);
            // Do not throw an exception here. Returning null makes MetaModel count records manually, and therefore
            // recover from the error.
            return null;
        }
    }

    /** Document sources are not supported by this implementation; always returns {@code null}. */
    @Override
    public DocumentSource getMixedDocumentSourceForSampling() {
        return null;
    }

    /** Document sources are not supported by this implementation; always returns {@code null}. */
    @Override
    public DocumentSource getDocumentSourceForTable(String sourceCollectionName) {
        return null;
    }
}