blob: d852846f7b9d9ee4de62033d78dad5a5dc7efb50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.ignite.store;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.rowset.CachedRowSet;
import javax.sql.rowset.RowSetFactory;
import javax.sql.rowset.RowSetProvider;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.gora.ignite.query.IgniteQuery;
import org.apache.gora.ignite.query.IgniteResult;
import org.apache.gora.ignite.utils.IgniteBackendConstants;
import org.apache.gora.ignite.utils.IgniteSQLBuilder;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.util.AvroUtils;
import org.apache.gora.util.GoraException;
import org.apache.gora.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of a Ignite data store to be used by gora.
*
* @param <K> class to be used for the key
* @param <T> class to be persisted within the store
*/
public class IgniteStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
private static final Logger LOG = LoggerFactory.getLogger(IgniteStore.class);
private static final String PARSE_MAPPING_FILE_KEY = "gora.ignite.mapping.file";
private static final String DEFAULT_MAPPING_FILE = "gora-ignite-mapping.xml";
private static final String XML_MAPPING_DEFINITION = "gora.mapping";
private IgniteParameters igniteParameters;
private IgniteMapping igniteMapping;
private Connection connection;
private static final ConcurrentHashMap<Schema, SpecificDatumReader<?>> readerMap = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>> writerMap = new ConcurrentHashMap<>();
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
try {
super.initialize(keyClass, persistentClass, properties);
IgniteMappingBuilder<K, T> builder = new IgniteMappingBuilder<K, T>(this);
InputStream mappingStream;
if (properties.containsKey(XML_MAPPING_DEFINITION)) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
}
mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null);
} else {
mappingStream = getClass().getClassLoader().getResourceAsStream(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
}
builder.readMappingFile(mappingStream);
igniteMapping = builder.getIgniteMapping();
igniteParameters = IgniteParameters.load(properties);
connection = acquireConnection(this.igniteParameters);
LOG.info("Ignite store was successfully initialized");
if (!schemaExists()) {
createSchema();
}
} catch (ClassNotFoundException | SQLException ex) {
LOG.error("Error while initializing Ignite store", ex);
throw new GoraException(ex);
}
}
public static Connection acquireConnection(IgniteParameters igniteParameters) throws ClassNotFoundException, SQLException {
Class.forName(IgniteBackendConstants.DRIVER_NAME);
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(IgniteBackendConstants.JDBC_PREFIX);
urlBuilder.append(igniteParameters.getHost());
if (igniteParameters.getPort() != null) {
urlBuilder.append(":" + igniteParameters.getPort());
}
if (igniteParameters.getSchema() != null) {
urlBuilder.append("/" + igniteParameters.getSchema());
}
if (igniteParameters.getUser() != null) {
urlBuilder.append(";" + igniteParameters.getUser());
}
if (igniteParameters.getPassword() != null) {
urlBuilder.append(";" + igniteParameters.getPassword());
}
if (igniteParameters.getAdditionalConfigurations() != null) {
urlBuilder.append(igniteParameters.getAdditionalConfigurations());
}
return DriverManager.getConnection(urlBuilder.toString());
}
@Override
public String getSchemaName() {
return igniteMapping.getTableName();
}
@Override
public String getSchemaName(final String mappingSchemaName,
final Class<?> persistentClass) {
return super.getSchemaName(mappingSchemaName, persistentClass);
}
@Override
public void createSchema() throws GoraException {
if (connection == null) {
throw new GoraException(
"Impossible to create the schema as no connection has been initiated.");
}
if (schemaExists()) {
return;
}
try (Statement stmt = connection.createStatement()) {
String createTableSQL = IgniteSQLBuilder.createTable(igniteMapping);
stmt.executeUpdate(createTableSQL);
LOG.info("Table {} has been created for Ignite instance.",
igniteMapping.getTableName());
} catch (SQLException ex) {
throw new GoraException(ex);
}
}
@Override
public void deleteSchema() throws GoraException {
if (connection == null) {
throw new GoraException(
"Impossible to delete the schema as no connection has been initiated.");
}
try (Statement stmt = connection.createStatement()) {
String dropTableSQL = IgniteSQLBuilder.dropTable(igniteMapping.getTableName());
stmt.executeUpdate(dropTableSQL);
LOG.info("Table {} has been dropped from Ignite instance.",
igniteMapping.getTableName());
} catch (SQLException ex) {
throw new GoraException(ex);
}
}
@Override
public boolean schemaExists() throws GoraException {
try (Statement stmt = connection.createStatement()) {
String tableExistsSQL = IgniteSQLBuilder.tableExists(igniteMapping.getTableName());
ResultSet executeQuery = stmt.executeQuery(tableExistsSQL);
executeQuery.close();
return true;
} catch (SQLException ex) {
if (ex.getSQLState() != null
&& ex.getSQLState().equals(IgniteBackendConstants.DEFAULT_IGNITE_TABLE_NOT_EXISTS_CODE)) {
return false;
} else {
throw new GoraException(ex);
}
}
}
@Override
public boolean exists(K key) throws GoraException {
Object[] keyl = null;
if (igniteMapping.getPrimaryKey().size() == 1) {
keyl = new Object[]{key};
} else {
//Composite key pending
}
String selectQuery = IgniteSQLBuilder.createSelectQueryExists(igniteMapping);
try (PreparedStatement stmt = connection.prepareStatement(selectQuery)) {
IgniteSQLBuilder.fillSelectQuery(stmt, igniteMapping, keyl);
ResultSet rs = stmt.executeQuery();
rs.next();
int resp = rs.getInt(1);
rs.close();
return resp != 0;
} catch (SQLException ex) {
throw new GoraException(ex);
}
}
@Override
public T get(K key, String[] fields) throws GoraException {
String[] avFields = getFieldsToQuery(fields);
Object[] keyl = null;
if (igniteMapping.getPrimaryKey().size() == 1) {
keyl = new Object[]{key};
} else {
//Composite key pending
}
//Avro fields to Ignite fields
List<String> dbFields = new ArrayList<>();
for (String af : avFields) {
dbFields.add(igniteMapping.getFields().get(af).getName());
}
String selectQuery = IgniteSQLBuilder.createSelectQueryGet(igniteMapping, dbFields);
try (PreparedStatement stmt = connection.prepareStatement(selectQuery)) {
IgniteSQLBuilder.fillSelectQuery(stmt, igniteMapping, keyl);
ResultSet rs = stmt.executeQuery();
boolean data = rs.next();
T resp = null;
if (data) {
resp = newInstance(rs, fields);
if (rs.next()) {
LOG.warn("Multiple results for primary key {} in the schema {}, ignoring additional rows.", keyl, igniteMapping.getTableName());
}
}
rs.close();
return resp;
} catch (SQLException | IOException ex) {
throw new GoraException(ex);
}
}
public T newInstance(ResultSet resultset, String[] fields) throws GoraException, SQLException, IOException {
fields = getFieldsToQuery(fields);
T persistent = newPersistent();
for (String f : fields) {
Schema.Field field = fieldMap.get(f);
Schema fieldSchema = field.schema();
String dbField = igniteMapping.getFields().get(f).getName();
Object fieldValue = resultset.getObject(dbField);
if (fieldValue == null) {
continue;
}
Object v = deserializeFieldValue(field, fieldSchema, fieldValue, persistent);
persistent.put(field.pos(), v);
persistent.setDirty(field.pos());
}
return persistent;
}
@SuppressWarnings("unchecked")
private Object deserializeFieldValue(Schema.Field field, Schema fieldSchema,
Object igniteValue, T persistent) throws IOException {
Object fieldValue = null;
switch (fieldSchema.getType()) {
case MAP:
case ARRAY:
case RECORD:
@SuppressWarnings("rawtypes") SpecificDatumReader reader = getDatumReader(fieldSchema);
fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
persistent.get(field.pos()));
break;
case ENUM:
fieldValue = AvroUtils.getEnumValue(fieldSchema, igniteValue.toString());
break;
case FIXED:
break;
case BYTES:
fieldValue = ByteBuffer.wrap((byte[]) igniteValue);
break;
case STRING:
fieldValue = new Utf8(igniteValue.toString());
break;
case UNION:
if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
int schemaPos = getUnionSchema(igniteValue, fieldSchema);
Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
fieldValue = deserializeFieldValue(field, unionSchema, igniteValue, persistent);
} else {
reader = getDatumReader(fieldSchema);
fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
persistent.get(field.pos()));
}
break;
default:
fieldValue = igniteValue;
}
return fieldValue;
}
@Override
public void put(K key, T obj) throws GoraException {
try {
if (obj.isDirty()) {
Schema schema = obj.getSchema();
List<Schema.Field> fields = schema.getFields();
Map<Column, Object> data = new HashMap<>();
if (igniteMapping.getPrimaryKey().size() == 1) {
Column getKey = igniteMapping.getPrimaryKey().get(0);
data.put(getKey, key);
} else {
//Composite keys pending..
}
for (Schema.Field field : fields) {
Column mappedColumn = igniteMapping.getFields().get(field.name());
Object fieldValue = obj.get(field.pos());
if (mappedColumn != null && fieldValue != null) {
Schema fieldSchema = field.schema();
Object serializedObj = serializeFieldValue(fieldSchema, fieldValue);
data.put(mappedColumn, serializedObj);
}
}
String baseInsertStatement = IgniteSQLBuilder.createInsertQuery(igniteMapping, data);
try (PreparedStatement stmt = connection.prepareStatement(baseInsertStatement)) {
IgniteSQLBuilder.fillInsertQuery(stmt, data);
stmt.executeUpdate();
} catch (SQLException ex) {
throw new GoraException(ex);
}
} else {
LOG.info("Ignored putting object {} in the store as it is neither "
+ "new, neither dirty.", new Object[]{obj});
}
} catch (Exception e) {
throw new GoraException(e);
}
}
@Override
public boolean delete(K key) throws GoraException {
String deleteQuery = null;
Object[] keyArray = null;
if (igniteMapping.getPrimaryKey().size() == 1) {
deleteQuery = IgniteSQLBuilder.createDeleteQuery(igniteMapping);
keyArray = new Object[]{key};
} else {
//Composite key pending
}
try (PreparedStatement stmt = connection.prepareStatement(deleteQuery)) {
IgniteSQLBuilder.fillDeleteQuery(stmt, igniteMapping, keyArray);
stmt.executeUpdate();
return true;
} catch (SQLException ex) {
throw new GoraException(ex);
}
}
@Override
public long deleteByQuery(Query<K, T> query) throws GoraException {
String deleteQuery;
if (query.getFields() != null && query.getFields().length < igniteMapping.getFields().size()) {
List<String> dbFields = new ArrayList<>();
for (String af : query.getFields()) {
dbFields.add(igniteMapping.getFields().get(af).getName());
}
deleteQuery = IgniteSQLBuilder.createDeleteQueryWithFields(igniteMapping, dbFields);
} else {
deleteQuery = IgniteSQLBuilder.createDeleteQueryMultipleRecords(igniteMapping);
}
String selectQueryWhere = IgniteSQLBuilder.createWhereClause(igniteMapping, query.getStartKey(), query.getEndKey(), query.getLimit());
try (PreparedStatement stmt = connection.prepareStatement(deleteQuery + selectQueryWhere)) {
IgniteSQLBuilder.fillWhereClause(stmt, query.getStartKey(), query.getEndKey());
stmt.executeUpdate();
return 0;
} catch (SQLException ex) {
throw new GoraException(ex);
}
}
@Override
public Result<K, T> execute(Query<K, T> query) throws GoraException {
String[] fields = getFieldsToQuery(query.getFields());
//Avro fields to Ignite fields
List<String> dbFields = new ArrayList<>();
for (String aField : fields) {
dbFields.add(igniteMapping.getFields().get(aField).getName());
}
String selectQuery = IgniteSQLBuilder.createSelectQuery(igniteMapping, dbFields);
String selectQueryWhere = IgniteSQLBuilder.createWhereClause(igniteMapping, query.getStartKey(), query.getEndKey(), query.getLimit());
try (PreparedStatement stmt = connection.prepareStatement(selectQuery + selectQueryWhere)) {
RowSetFactory factory = RowSetProvider.newFactory();
CachedRowSet rowset = factory.createCachedRowSet();
IgniteSQLBuilder.fillWhereClause(stmt, query.getStartKey(), query.getEndKey());
ResultSet executeQuery = stmt.executeQuery();
rowset.populate(executeQuery);
IgniteResult<K, T> igniteResult = new IgniteResult<>(this, query, rowset);
return igniteResult;
} catch (SQLException ex) {
throw new GoraException(ex);
}
}
@SuppressWarnings("unchecked")
public K extractKey(ResultSet r) throws SQLException {
assert igniteMapping.getPrimaryKey().size() == 1;
return (K) r.getObject(igniteMapping.getPrimaryKey().get(0).getName());
}
@Override
public Query<K, T> newQuery() {
IgniteQuery<K, T> query = new IgniteQuery<>(this);
query.setFields(getFieldsToQuery(null));
return query;
}
@Override
public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
List<PartitionQuery<K, T>> partitions = new ArrayList<>();
PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
query);
partitionQuery.setConf(getConf());
partitions.add(partitionQuery);
return partitions;
}
@Override
public void flush() throws GoraException {
//Auto-commit mode by default
}
@Override
public void close() {
try {
connection.close();
LOG.info("Ignite datastore destroyed successfully.");
} catch (Exception ex) {
LOG.error(ex.getMessage(), ex);
}
}
@SuppressWarnings("unchecked")
private Object serializeFieldValue(Schema fieldSchema, Object fieldValue) {
Object output = fieldValue;
switch (fieldSchema.getType()) {
case ARRAY:
case MAP:
case RECORD:
byte[] data = null;
try {
@SuppressWarnings("rawtypes")
SpecificDatumWriter writer = getDatumWriter(fieldSchema);
data = IOUtils.serialize(writer, fieldValue);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
output = data;
break;
case UNION:
if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
int schemaPos = getUnionSchema(fieldValue, fieldSchema);
Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
output = serializeFieldValue(unionSchema, fieldValue);
} else {
data = null;
try {
@SuppressWarnings("rawtypes")
SpecificDatumWriter writer = getDatumWriter(fieldSchema);
data = IOUtils.serialize(writer, fieldValue);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
output = data;
}
break;
case FIXED:
break;
case ENUM:
case STRING:
output = fieldValue.toString();
break;
case BYTES:
output = ((ByteBuffer) fieldValue).array();
break;
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case BOOLEAN:
output = fieldValue;
break;
case NULL:
break;
default:
throw new AssertionError(fieldSchema.getType().name());
}
return output;
}
private boolean isNullable(Schema unionSchema) {
for (Schema innerSchema : unionSchema.getTypes()) {
if (innerSchema.getType().equals(Schema.Type.NULL)) {
return true;
}
}
return false;
}
/**
* Method to retrieve the corresponding schema type index of a particular
* object having UNION schema. As UNION type can have one or more types and at
* a given instance, it holds an object of only one type of the defined types,
* this method is used to figure out the corresponding instance's schema type
* index.
*
* @param instanceValue value that the object holds
* @param unionSchema union schema containing all of the data types
* @return the unionSchemaPosition corresponding schema position
*/
private int getUnionSchema(Object instanceValue, Schema unionSchema) {
int unionSchemaPos = 0;
for (Schema currentSchema : unionSchema.getTypes()) {
Schema.Type schemaType = currentSchema.getType();
if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) {
return unionSchemaPos;
}
if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) {
return unionSchemaPos;
}
if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) {
return unionSchemaPos;
}
if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) {
return unionSchemaPos;
}
if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) {
return unionSchemaPos;
}
if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) {
return unionSchemaPos;
}
if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) {
return unionSchemaPos;
}
if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) {
return unionSchemaPos;
}
if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) {
return unionSchemaPos;
}
if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) {
return unionSchemaPos;
}
if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) {
return unionSchemaPos;
}
if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) {
return unionSchemaPos;
}
if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) {
return unionSchemaPos;
}
if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) {
return unionSchemaPos;
}
unionSchemaPos++;
}
return 0;
}
@SuppressWarnings("rawtypes")
private SpecificDatumReader getDatumReader(Schema fieldSchema) {
SpecificDatumReader<?> reader = readerMap.get(fieldSchema);
if (reader == null) {
reader = new SpecificDatumReader(fieldSchema);// ignore dirty bits
SpecificDatumReader localReader = null;
if ((localReader = readerMap.putIfAbsent(fieldSchema, reader)) != null) {
reader = localReader;
}
}
return reader;
}
@SuppressWarnings("rawtypes")
private SpecificDatumWriter getDatumWriter(Schema fieldSchema) {
SpecificDatumWriter writer = writerMap.get(fieldSchema);
if (writer == null) {
writer = new SpecificDatumWriter(fieldSchema);// ignore dirty bits
writerMap.put(fieldSchema, writer);
}
return writer;
}
}