blob: 5973be8ce5fc7d61ff977fdc85e3b7781ccb69f6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.aerospike.store;
import com.aerospike.client.policy.GenerationPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jdom.JDOMException;
import javax.naming.ConfigurationException;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
/**
* Class to represent the Aerospike mapping builder
*/
public class AerospikeMappingBuilder {
private static final Logger LOG = LoggerFactory.getLogger(AerospikeMappingBuilder.class);
private final AerospikeMapping aerospikeMapping;
public AerospikeMappingBuilder() {
this.aerospikeMapping = new AerospikeMapping();
}
public AerospikeMapping getAerospikeMapping() {
return this.aerospikeMapping;
}
/**
* Reads the gora aerospike mapping file
*
* @param mappingFile mapping file path
* @param keyClass key class
* @param persistentClass persistent class
*/
public void readMappingFile(String mappingFile, Class<?> keyClass, Class<?> persistentClass) {
try {
SAXBuilder saxBuilder = new SAXBuilder();
InputStream inputStream = getClass().getClassLoader().getResourceAsStream(mappingFile);
if (inputStream == null) {
LOG.error("Mapping file '{}' could not be found!", mappingFile);
throw new IOException("Mapping file '" + mappingFile + "' could not be found!");
}
Document document = saxBuilder.build(inputStream);
if (document == null) {
LOG.error("Mapping file '{}' could not be found!", mappingFile);
throw new IOException("Mapping file '" + mappingFile + "' could not be found!");
}
Element root = document.getRootElement();
List<Element> policyElements = root.getChildren("policy");
for (Element policyElement : policyElements) {
String policy = policyElement.getAttributeValue("name");
if (policy != null) {
if (policy.equals("write")) {
WritePolicy writePolicy = new WritePolicy();
if (policyElement.getAttributeValue("gen") != null) {
writePolicy.generationPolicy = getGenerationPolicyMapping(
policyElement.getAttributeValue("gen").toUpperCase(Locale.getDefault()));
}
if (policyElement.getAttributeValue("exists") != null) {
writePolicy.recordExistsAction = getRecordExistsAction(
policyElement.getAttributeValue("exists").toUpperCase(Locale.getDefault()));
}
if (policyElement.getAttributeValue("key") != null) {
writePolicy.sendKey = getKeyUsagePolicy(
policyElement.getAttributeValue("key").toUpperCase(Locale.getDefault()));
}
if (policyElement.getAttributeValue("timeout") != null) {
writePolicy.timeoutDelay = getTimeoutValue(policyElement.getAttributeValue("timeout"));
}
aerospikeMapping.setWritePolicy(writePolicy);
} else if (policy.equals("read")) {
Policy readPolicy = new Policy();
if (policyElement.getAttributeValue("key") != null) {
readPolicy.sendKey = getKeyUsagePolicy(
policyElement.getAttributeValue("key").toUpperCase(Locale.getDefault()));
}
if (policyElement.getAttributeValue("timeout") != null) {
readPolicy.timeoutDelay = getTimeoutValue(policyElement.getAttributeValue("timeout"));
}
aerospikeMapping.setReadPolicy(readPolicy);
}
}
}
List<Element> classElements = root.getChildren("class");
boolean persistentClassAndKeyClassMatches = false;
for (Element classElement : classElements) {
String mappingKeyClass = classElement.getAttributeValue("keyClass");
String mappingClassName = classElement.getAttributeValue("name");
if (mappingKeyClass != null && mappingClassName != null && mappingKeyClass
.equals(keyClass.getCanonicalName()) && mappingClassName
.equals(persistentClass.getCanonicalName())) {
persistentClassAndKeyClassMatches = true;
List<Element> fields = classElement.getChildren("field");
Map<String, String> binMapping = new HashMap<>();
for (Element field : fields) {
String fieldName = field.getAttributeValue("name");
String binName = field.getAttributeValue("bin");
if (fieldName != null && binName != null) {
binMapping.put(fieldName, binName);
}
}
aerospikeMapping.setBinMapping(binMapping);
String nameSpace = classElement.getAttributeValue("namespace");
if (nameSpace == null || nameSpace.isEmpty()) {
LOG.error("Gora-aerospike-mapping does not include the relevant namespace for the "
+ "{} class", mappingClassName);
throw new ConfigurationException(
"Gora-aerospike-mapping does not include the relevant namespace for the "
+ mappingClassName + "class");
}
aerospikeMapping.setNamespace(nameSpace);
String set = classElement.getAttributeValue("set");
if (set != null && !set.isEmpty()) {
aerospikeMapping.setSet(set);
}
}
}
if (!persistentClassAndKeyClassMatches) {
LOG.error("Gora-aerospike-mapping does not include the name and keyClass specified in the "
+ "databean");
throw new ConfigurationException(
"Gora-aerospike-mapping does not include the name and keyClass specified in the "
+ "databean");
}
} catch (IOException | JDOMException | ConfigurationException e) {
throw new RuntimeException(e);
}
LOG.info("Gora Aerospike mapping file is read successfully.");
}
/**
* Returns the corresponding generation policy from the user specified generation policy name
*
* @param genPolicy generation policy name
* @return corresponding generation policy
*/
private GenerationPolicy getGenerationPolicyMapping(String genPolicy) {
if (genPolicy == null)
return GenerationPolicy.NONE;
GenerationPolicy generationPolicy;
switch (genPolicy) {
case "IGNORE":
generationPolicy = GenerationPolicy.NONE;
break;
case "EQ":
generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
break;
case "GT":
generationPolicy = GenerationPolicy.EXPECT_GEN_GT;
break;
default: {
LOG.warn("Invalid generation policy provided, using the default generation policy");
generationPolicy = GenerationPolicy.NONE;
}
}
return generationPolicy;
}
/**
* Returns the corresponding record exist action from the user specified exists policy name
*
* @param existsPolicy exists policy name
* @return corresponding record exist action
*/
private RecordExistsAction getRecordExistsAction(String existsPolicy) {
if (existsPolicy == null)
return RecordExistsAction.UPDATE;
RecordExistsAction recordExistsAction;
switch (existsPolicy) {
case "UPDATE":
recordExistsAction = RecordExistsAction.UPDATE;
break;
case "UPDATE_ONLY":
recordExistsAction = RecordExistsAction.UPDATE_ONLY;
break;
case "REPLACE":
recordExistsAction = RecordExistsAction.REPLACE;
break;
case "REPLACE_ONLY":
recordExistsAction = RecordExistsAction.REPLACE_ONLY;
break;
case "CREATE_ONLY":
recordExistsAction = RecordExistsAction.CREATE_ONLY;
break;
default: {
LOG.warn("Invalid record exists action provided, using the default record exists action");
recordExistsAction = RecordExistsAction.UPDATE;
}
}
return recordExistsAction;
}
/**
* Returns the corresponding key usage policy from the user specified key policy name
*
* @param keyPolicy key policy name
* @return corresponding key usage policy
*/
private boolean getKeyUsagePolicy(String keyPolicy) {
if (keyPolicy == null)
return false;
boolean sendKey;
switch (keyPolicy) {
case "DIGEST":
sendKey = false;
break;
case "SEND":
sendKey = true;
break;
default: {
LOG.warn("Invalid key action policy provided, using the default key action policy");
sendKey = false;
}
}
return sendKey;
}
/**
* Returns the timeout value from the user specified timeout value
*
* @param timeout user specified timeout value
* @return timeout value
*/
private int getTimeoutValue(String timeout) {
if (timeout == null) {
return 0;
}
int timeoutInt = 0;
try {
timeoutInt = Integer.valueOf(timeout);
} catch (NumberFormatException e) {
LOG.warn("Invalid timeout value provided, using the default timeout value");
}
return timeoutInt;
}
}