blob: 14334e571a511f0f5491ded47553db6866e344e4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.redis.store;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.redis.query.RedisQuery;
import org.apache.gora.redis.query.RedisResult;
import org.apache.gora.redis.util.DatumHandler;
import static org.apache.gora.redis.util.RedisStoreConstants.PREFIX;
import static org.apache.gora.redis.util.RedisStoreConstants.END_TAG;
import static org.apache.gora.redis.util.RedisStoreConstants.FIELD_SEPARATOR;
import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_ADDRESS;
import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_MASTERNAME;
import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_MODE;
import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_READMODE;
import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_STORAGE;
import static org.apache.gora.redis.util.RedisStoreConstants.INDEX;
import static org.apache.gora.redis.util.RedisStoreConstants.START_TAG;
import static org.apache.gora.redis.util.RedisStoreConstants.WILDCARD;
import org.apache.gora.redis.util.ServerMode;
import org.apache.gora.redis.util.StorageMode;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.util.GoraException;
import org.redisson.Redisson;
import org.redisson.api.RBatch;
import org.redisson.api.RBucket;
import org.redisson.api.RBucketAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RLexSortedSet;
import org.redisson.api.RLexSortedSetAsync;
import org.redisson.api.RList;
import org.redisson.api.RListAsync;
import org.redisson.api.RMap;
import org.redisson.api.RMapAsync;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.api.RedissonClient;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.config.Config;
import org.redisson.config.ReadMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of a Redis data store to be used by gora.
*
* @param <K> class to be used for the key
* @param <T> class to be persisted within the store
*/
public class RedisStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
protected static final String PARSE_MAPPING_FILE_KEY = "gora.redis.mapping.file";
protected static final String DEFAULT_MAPPING_FILE = "gora-redis-mapping.xml";
protected static final String XML_MAPPING_DEFINITION = "gora.mapping";
private RedissonClient redisInstance;
private RedisMapping mapping;
public static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final DatumHandler handler = new DatumHandler();
private StorageMode mode;
/**
* Initialize the data store by reading the credentials, setting the client's
* properties up and reading the mapping file. Initialize is called when then
* the call to {@link org.apache.gora.store.DataStoreFactory#createDataStore}
* is made.
*
* @param keyClass Gora's key class
* @param persistentClass Persistent class
* @param properties Configurations for the data store
* @throws org.apache.gora.util.GoraException Unexpected exception during initialization
*/
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
try {
super.initialize(keyClass, persistentClass, properties);
InputStream mappingStream;
if (properties.containsKey(XML_MAPPING_DEFINITION)) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
}
mappingStream = IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null);
} else {
mappingStream = getClass().getClassLoader().getResourceAsStream(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
}
RedisMappingBuilder mappingBuilder = new RedisMappingBuilder(this);
mapping = mappingBuilder.readMapping(mappingStream);
Config config = new Config();
String storage = getConf().get(GORA_REDIS_STORAGE, properties.getProperty(GORA_REDIS_STORAGE));
mode = StorageMode.valueOf(storage);
String modeString = getConf().get(GORA_REDIS_MODE, properties.getProperty(GORA_REDIS_MODE));
ServerMode connectionMode = ServerMode.valueOf(modeString);
String name = getConf().get(GORA_REDIS_MASTERNAME, properties.getProperty(GORA_REDIS_MASTERNAME));
String readm = getConf().get(GORA_REDIS_READMODE, properties.getProperty(GORA_REDIS_READMODE));
//Override address in tests
String[] hosts = getConf().get(GORA_REDIS_ADDRESS, properties.getProperty(GORA_REDIS_ADDRESS)).split(",");
for (int indexHosts = 0; indexHosts < hosts.length; indexHosts++) {
hosts[indexHosts] = PREFIX + hosts[indexHosts];
}
switch (connectionMode) {
case SINGLE:
config.useSingleServer()
.setAddress(hosts[0])
.setDatabase(mapping.getDatabase());
break;
case CLUSTER:
config.useClusterServers()
.addNodeAddress(hosts);
break;
case REPLICATED:
config.useReplicatedServers()
.addNodeAddress(hosts)
.setDatabase(mapping.getDatabase());
break;
case SENTINEL:
config.useSentinelServers()
.setMasterName(name)
.setReadMode(ReadMode.valueOf(readm))
.addSentinelAddress(hosts);
break;
default:
throw new AssertionError(connectionMode.name());
}
redisInstance = Redisson.create(config);
if (autoCreateSchema && !schemaExists()) {
createSchema();
}
} catch (IOException ex) {
throw new GoraException(ex);
}
}
/**
* Redis, being a schemaless database does not support explicit schema
* creation. When the records are added to the database, the schema is created
* on the fly. Thus, schema operations are unavailable in gora-redis module.
*
* @return null
*/
@Override
public String getSchemaName() {
return null;
}
/**
* Redis, being a schemaless database does not support explicit schema
* creation. When the records are added to the database, the schema is created
* on the fly. Thus, schema operations are unavailable in gora-redis module.
*
* @throws org.apache.gora.util.GoraException Unexpected exception.
*/
@Override
public void createSchema() throws GoraException {
}
@Override
public void deleteSchema() throws GoraException {
redisInstance.getKeys().deleteByPattern(mapping.getPrefix() + FIELD_SEPARATOR + WILDCARD);
}
/**
* Redis, being a schemaless database does not support explicit schema
* creation. When the records are added to the database, the schema is created
* on the fly. Thus, schema operations are unavailable in gora-redis module.
*
* @return true
* @throws org.apache.gora.util.GoraException Unexpected exception.
*/
@Override
public boolean schemaExists() throws GoraException {
return true;
}
private String generateKeyHash(K baseKey) {
return mapping.getPrefix() + FIELD_SEPARATOR + START_TAG + baseKey + END_TAG;
}
private String generateKeyString(String field, K baseKey) {
return generateKeyStringBase(baseKey) + field;
}
private String generateKeyStringBase(K baseKey) {
return mapping.getPrefix() + FIELD_SEPARATOR + START_TAG + baseKey + END_TAG + FIELD_SEPARATOR;
}
private String generateIndexKey() {
return mapping.getPrefix() + FIELD_SEPARATOR + INDEX;
}
public T newInstanceFromString(K key, String[] fields) throws GoraException, IOException {
fields = getFieldsToQuery(fields);
T persistent = newPersistent();
int countRetrieved = 0;
for (String f : fields) {
Schema.Field field = fieldMap.get(f);
String redisField = mapping.getFields().get(field.name());
RedisType redisType = mapping.getTypes().get(field.name());
Object redisVal = null;
switch (redisType) {
case STRING:
RBucket<Object> bucket = redisInstance.getBucket(generateKeyString(redisField, key));
redisVal = bucket.isExists() ? handler.deserializeFieldValue(field, field.schema(), bucket.get(), persistent) : null;
break;
case LIST:
RList<Object> list = redisInstance.getList(generateKeyString(redisField, key));
redisVal = list.isExists() ? handler.deserializeFieldList(field, field.schema(), list, persistent) : null;
break;
case HASH:
RMap<Object, Object> map = redisInstance.getMap(generateKeyString(redisField, key));
redisVal = map.isExists() ? handler.deserializeFieldMap(field, field.schema(), map, persistent) : null;
break;
default:
throw new AssertionError(redisType.name());
}
if (redisVal == null) {
continue;
}
countRetrieved++;
persistent.put(field.pos(), redisVal);
persistent.setDirty(field.pos());
}
return countRetrieved > 0 ? persistent : null;
}
public T newInstanceFromHash(RMap<String, Object> map, String[] fields) throws GoraException, IOException {
fields = getFieldsToQuery(fields);
T persistent = newPersistent();
for (String fieldName : fields) {
Schema.Field field = fieldMap.get(fieldName);
Object fValue = map.get(mapping.getFields().get(field.name()));
if (fValue == null) {
continue;
}
Object fieldValue = handler.deserializeFieldValue(field, field.schema(), fValue, persistent);
persistent.put(field.pos(), fieldValue);
persistent.setDirty(field.pos());
}
return persistent;
}
@Override
public T get(K key, String[] fields) throws GoraException {
try {
if (mode == StorageMode.SINGLEKEY) {
RMap<String, Object> map = redisInstance.getMap(generateKeyHash(key));
if (!map.isEmpty()) {
return newInstanceFromHash(map, fields);
} else {
return null;
}
} else {
return newInstanceFromString(key, fields);
}
} catch (IOException ex) {
throw new GoraException(ex);
}
}
@Override
public void put(K key, T obj) throws GoraException {
try {
if (obj.isDirty()) {
Schema objectSchema = obj.getSchema();
List<Schema.Field> fields = objectSchema.getFields();
RBatch batchInstance = redisInstance.createBatch();
//update secundary index
if (isNumericKey()) {
RScoredSortedSetAsync<Object> secundaryIndex = batchInstance.getScoredSortedSet(generateIndexKey());
secundaryIndex.addAsync(obtainDoubleValue(key), key);
} else {
RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey());
secundaryIndex.addAsync(key.toString());
}
if (mode == StorageMode.SINGLEKEY) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
fields.forEach((field) -> {
Object fieldValue = handler.serializeFieldValue(field.schema(), obj.get(field.pos()));
if (fieldValue != null) {
map.fastPutAsync(mapping.getFields().get(field.name()), fieldValue);
} else {
map.fastRemoveAsync(mapping.getFields().get(field.name()));
}
});
} else {
for (Schema.Field field : fields) {
Object fieldValue = obj.get(field.pos());
String redisField = mapping.getFields().get(field.name());
RedisType redisType = mapping.getTypes().get(field.name());
switch (redisType) {
case STRING:
RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(redisField, key));
bucket.deleteAsync();
if (fieldValue != null) {
fieldValue = handler.serializeFieldValue(field.schema(), fieldValue);
bucket.setAsync(fieldValue);
}
break;
case LIST:
RListAsync<Object> rlist = batchInstance.getList(generateKeyString(redisField, key));
rlist.deleteAsync();
if (fieldValue != null) {
List<Object> list = handler.serializeFieldList(field.schema(), fieldValue);
rlist.addAllAsync(list);
}
break;
case HASH:
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyString(redisField, key));
map.deleteAsync();
if (fieldValue != null) {
Map<Object, Object> mp = handler.serializeFieldMap(field.schema(), fieldValue);
map.putAllAsync(mp);
}
break;
default:
throw new AssertionError(redisType.name());
}
}
}
batchInstance.execute();
} else {
LOG.info("Ignored putting object {} in the store as it is neither "
+ "new, neither dirty.", new Object[]{obj});
}
} catch (Exception e) {
throw new GoraException(e);
}
}
@Override
public boolean delete(K key) throws GoraException {
try {
RBatch batchInstance = redisInstance.createBatch();
//update secundary index
if (isNumericKey()) {
RScoredSortedSetAsync<Object> secundaryIndex = batchInstance.getScoredSortedSet(generateIndexKey());
secundaryIndex.removeAsync(key);
} else {
RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey());
secundaryIndex.removeAsync(key.toString());
}
if (mode == StorageMode.SINGLEKEY) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
RFuture<Boolean> deleteAsync = map.deleteAsync();
batchInstance.execute();
return deleteAsync.get();
} else {
batchInstance.execute();
return redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + WILDCARD) > 0;
}
} catch (Exception ex) {
throw new GoraException(ex);
}
}
@Override
public long deleteByQuery(Query<K, T> query) throws GoraException {
Collection<K> range = runQuery(query);
RBatch batchInstance = redisInstance.createBatch();
RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey());
if (query.getFields() != null && query.getFields().length < mapping.getFields().size()) {
List<String> dbFields = new ArrayList<>();
List<RedisType> dbTypes = new ArrayList<>();
for (String af : query.getFields()) {
dbFields.add(mapping.getFields().get(af));
dbTypes.add(mapping.getTypes().get(af));
}
for (K key : range) {
if (mode == StorageMode.SINGLEKEY) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
dbFields.forEach((field) -> {
map.removeAsync(field);
});
} else {
for (int indexField = 0; indexField < dbFields.size(); indexField++) {
String field = dbFields.get(indexField);
RedisType type = dbTypes.get(indexField);
switch (type) {
case STRING:
RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(field, key));
bucket.deleteAsync();
break;
case LIST:
RListAsync<Object> rlist = batchInstance.getList(generateKeyString(field, key));
rlist.deleteAsync();
break;
case HASH:
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyString(field, key));
map.deleteAsync();
break;
default:
throw new AssertionError(type.name());
}
}
}
}
} else {
range.stream().map((key) -> {
secundaryIndex.removeAsync(key);
return key;
}).forEachOrdered((key) -> {
if (mode == StorageMode.SINGLEKEY) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
map.deleteAsync();
} else {
redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + WILDCARD);
}
});
}
batchInstance.execute();
return range.size();
}
/**
* Execute the query and return the result.
*
* @param query Query sent to Redis
* @return Query result
* @throws org.apache.gora.util.GoraException Unexpected exception in querying process.
*/
@Override
public Result<K, T> execute(Query<K, T> query) throws GoraException {
return new RedisResult<>(this, query, runQuery(query));
}
@Override
public Query<K, T> newQuery() {
RedisQuery<K, T> query = new RedisQuery<>(this);
query.setFields(getFieldsToQuery(null));
return query;
}
@Override
public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws GoraException {
List<PartitionQuery<K, T>> partitions = new ArrayList<>();
PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
query);
partitionQuery.setConf(getConf());
partitions.add(partitionQuery);
return partitions;
}
@Override
public void flush() throws GoraException {
}
@Override
public void close() {
redisInstance.shutdown();
}
@Override
public boolean exists(K key) throws GoraException {
if (mode == StorageMode.SINGLEKEY) {
return redisInstance.getKeys().countExists(generateKeyHash(key)) != 0;
} else {
Iterator<String> respKeys = redisInstance.getKeys().getKeysByPattern(generateKeyStringBase(key) + WILDCARD, 1).iterator();
return respKeys.hasNext();
}
}
private Collection<K> runQuery(Query<K, T> query) {
Collection<K> range;
if (isNumericKey()) {
RScoredSortedSet<Object> index = redisInstance.getScoredSortedSet(generateIndexKey());
Collection<ScoredEntry<Object>> rangeResponse;
int limit = query.getLimit() > -1 ? (int) query.getLimit() : Integer.MAX_VALUE;
if (query.getStartKey() != null && query.getEndKey() != null) {
rangeResponse = index.entryRange(obtainDoubleValue(query.getStartKey()), true, obtainDoubleValue(query.getEndKey()), true, 0, limit);
} else if (query.getStartKey() != null && query.getEndKey() == null) {
rangeResponse = index.entryRange(obtainDoubleValue(query.getStartKey()), true, Double.MAX_VALUE, true, 0, limit);
} else if (query.getStartKey() == null && query.getEndKey() != null) {
rangeResponse = index.entryRange(Double.MIN_VALUE, true, obtainDoubleValue(query.getEndKey()), true, 0, limit);
} else {
rangeResponse = index.entryRange(Double.MIN_VALUE, true, Double.MAX_VALUE, true, 0, limit);
}
range = new ArrayList<>();
for (ScoredEntry<Object> indexVal : rangeResponse) {
range.add((K) indexVal.getValue());
}
} else {
RLexSortedSet index = redisInstance.getLexSortedSet(generateIndexKey());
Collection<String> rangeResponse;
int limit = query.getLimit() > -1 ? (int) query.getLimit() : Integer.MAX_VALUE;
if (query.getStartKey() != null && query.getEndKey() != null) {
rangeResponse = index.range(query.getStartKey().toString(), true, query.getEndKey().toString(), true, 0, limit);
} else if (query.getStartKey() != null && query.getEndKey() == null) {
rangeResponse = index.rangeTail(query.getStartKey().toString(), true, 0, limit);
} else if (query.getStartKey() == null && query.getEndKey() != null) {
rangeResponse = index.rangeHead(query.getEndKey().toString(), true, 0, limit);
} else {
rangeResponse = index.stream().limit(limit).collect(Collectors.toList());
}
range = new ArrayList<>();
for (String indexVal : rangeResponse) {
range.add((K) indexVal);
}
}
return range;
}
private boolean isNumericKey() {
return Number.class.isAssignableFrom(keyClass);
}
private double obtainDoubleValue(K key) {
return Double.parseDouble(key.toString());
}
}