/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.cassandra.resource.provider;
import org.apache.commons.codec.binary.Base64;
import me.prettyprint.cassandra.model.CqlQuery;
import me.prettyprint.cassandra.model.CqlRows;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.ComparatorType;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import org.apache.felix.scr.annotations.*;
import org.apache.sling.api.resource.*;
import org.apache.sling.cassandra.resource.provider.mapper.CassandraMapper;
import org.apache.sling.cassandra.resource.provider.mapper.DefaultCassandraMapperImpl;
import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Arrays;
import java.util.Iterator;
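/**
 * A Sling {@link ResourceProvider} and {@link ModifyingResourceProvider} backed by Apache
 * Cassandra, accessed through the Hector client's CQL API. Resources live below the
 * configured provider root (by default {@code /content/cassandra}); creates and deletes are
 * staged in transient maps and written to Cassandra on {@link #commit}.
 * <p>
 * Illustrative consumer usage through the standard Sling API, assuming the provider is
 * registered and a {@code ResourceResolver} is available (paths refer to the seeded test data):
 * <pre>
 * Resource r = resolver.getResource("/content/cassandra/A/1");
 * Resource created = resolver.create(parent, "2", properties);
 * resolver.commit();
 * </pre>
 */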
@Component(immediate = true, metatype = true)
@Service(ResourceProvider.class)
@Properties({
@Property(name = "provider.roots", value = {"/content/cassandra"})
})
public class CassandraResourceProvider implements ResourceProvider, ModifyingResourceProvider {
private Cluster cluster;
private Keyspace keyspace;
private String cf = "pictures";
private int replicationFactor = 1;
private final static String KEYSPACE = "sling_cassandra";
private String CASSANDRA_EP = "localhost:9160";
private ConcurrentHashMap<String, CassandraMapper> cassandraMapperMap
= new ConcurrentHashMap<String, CassandraMapper>();
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraResourceProvider.class);
private static final Map<String, ValueMap> CASSANDRA_MAP = new HashMap<String, ValueMap>();
private static final Map<String, Map> TRANSIENT_CREATE_MAP = new HashMap<String, Map>();
private static final Map<String, Map> TRANSIENT_DELETE_MAP = new HashMap<String, Map>();
public CassandraResourceProvider() {
init();
}
public Keyspace getKeyspace() {
return keyspace;
}
public void setColumnFamily(String cf) {
this.cf = cf;
}
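/**
 * Obtains (or creates) the Hector cluster connection, makes sure the
 * {@code sling_cassandra} keyspace exists, and creates the keyspace handle.
 */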
private synchronized void init() {
LOGGER.debug("Initializing Cassandra resource provider");
cluster = HFactory.getOrCreateCluster(CassandraConstants.CASSANDRA_DEFAULT_CLUSTER_NAME, CASSANDRA_EP);
KeyspaceDefinition keyspaceDef = cluster.describeKeyspace(KEYSPACE);
if (keyspaceDef == null) {
try {
createSchema(cluster);
} catch (Exception ignore) {
LOGGER.debug("Ignoring exception while creating an already existing schema: " + ignore.getMessage(), ignore);
}
}
this.keyspace = HFactory.createKeyspace(KEYSPACE, cluster);
LOGGER.debug("Cassandra resource provider initialized");
}
public Resource getResource(ResourceResolver resourceResolver, javax.servlet.http.HttpServletRequest httpServletRequest, String s) {
return getResource(resourceResolver, s);
}
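/**
 * Resolves a path below the provider root to a {@link CassandraResource}. Paths up to the
 * column-family level are answered synthetically; deeper paths are looked up in the
 * in-memory resource map and associated with a {@link CassandraMapper} for their column family.
 */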
public Resource getResource(ResourceResolver resourceResolver, String s) {
// Intermediate paths up to the column-family level are answered with a synthetic resource.
if(s.startsWith("/") && !s.equals("/content/cassandra") && s.split("/").length <= 4){
return new CassandraResource(this, resourceResolver, s, new CassandraResource.CassandraValueMap(s),new HashMap<String, Object>());
}
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%"+s);
if (CASSANDRA_MAP.get(s) == null) {
return null;
}
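// Register a default mapper for the path's column family if none is present yet.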
if (CassandraResourceProviderUtil.getColumnFamilySector(s) != null && !getCassandraMapperMap().containsKey(CassandraResourceProviderUtil.getColumnFamilySector(s))) {
getCassandraMapperMap().put(CassandraResourceProviderUtil.getColumnFamilySector(s), new DefaultCassandraMapperImpl());
}
try {
return new CassandraResource(this, resourceResolver, s, CASSANDRA_MAP.get(s));
// return CassandraResourceProviderUtil.isResourceExists(this,keyspace,s) ? new CassandraResource(this,resourceResolver, s,CASSANDRA_MAP.get(s)) : null;
} catch (Exception e) {
System.out.println("Error at Provider " + e.getMessage());
LOGGER.error("Error at Provider " + e.getMessage());
LOGGER.debug(e.getMessage());
}
return null;
}
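/**
 * Lists children by delegating to the resource's own {@code listChildren()} implementation.
 */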
public Iterator<Resource> listChildren(Resource resource) {
return resource.listChildren();
}
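/**
 * Creates the {@code sling_cassandra} keyspace containing the default column family,
 * using the configured replication factor, blocking until the schema change completes.
 */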
private void createSchema(Cluster myCluster) {
ColumnFamilyDefinition cfDef = HFactory.createColumnFamilyDefinition(KEYSPACE, cf, ComparatorType.BYTESTYPE);
KeyspaceDefinition newKeyspace = HFactory.createKeyspaceDefinition(
KEYSPACE,
ThriftKsDef.DEF_STRATEGY_CLASS,
replicationFactor,
Arrays.asList(cfDef));
myCluster.addKeyspace(newKeyspace, true);
}
public ConcurrentHashMap<String, CassandraMapper> getCassandraMapperMap() {
return cassandraMapperMap;
}
static {
defineCassandra("/content/cassandra/p1/c1");
defineCassandra("/content/cassandra/p1/c1/c2");
defineCassandra("/content/cassandra/nK/1");
loadMapWithTestData();
}
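/**
 * Seeds the in-memory resource map with sample paths (roughly 100 even- and 100 odd-numbered
 * children per node) so the provider can be exercised without a populated Cassandra instance.
 */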
private static void loadMapWithTestData() {
String[] nodes = new String[]{"A", "B", "C", "D", "E", "F"};
for (String node : nodes) {
int exerciseCount = 0;
int testCount = 0;
for (int i = 0; i < 1000; i++) {
String path = "/content/cassandra/" + node + "/" + i;
if (i % 2 == 0) {
if (exerciseCount <= 100) {
defineCassandra(path);
exerciseCount++;
}
} else {
if (testCount <= 100) {
defineCassandra(path);
testCount++;
}
}
}
}
}
private static ValueMap defineCassandra(String path) {
final ValueMap valueMap = new CassandraResource.CassandraValueMap(path);
CASSANDRA_MAP.put(path, valueMap);
return valueMap;
}
/**
* Modification methods for Cassandra Resource Provider
*/
@Override
public Resource create(ResourceResolver resourceResolver, String s, Map<String, Object> stringObjectMap) throws PersistenceException {
System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@AA3 CREATE");
TRANSIENT_CREATE_MAP.put(s, stringObjectMap);
commit(resourceResolver);
// return getResource(resourceResolver,s);
return new CassandraResource(this, resourceResolver, s, new CassandraResource.CassandraValueMap(s),stringObjectMap);
}
@Override
public void delete(ResourceResolver resourceResolver, String s) throws PersistenceException {
if (TRANSIENT_CREATE_MAP.get(s) != null) {
TRANSIENT_DELETE_MAP.put(s, TRANSIENT_CREATE_MAP.get(s));
TRANSIENT_CREATE_MAP.remove(s);
}
if (CASSANDRA_MAP.get(s) != null) {
TRANSIENT_DELETE_MAP.put(s, CASSANDRA_MAP.get(s));
}
}
@Override
public void revert(ResourceResolver resourceResolver) {
TRANSIENT_CREATE_MAP.clear();
TRANSIENT_DELETE_MAP.clear();
}
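/**
 * Writes the staged creates and deletes to Cassandra and updates the in-memory
 * resource map for every entry that was persisted successfully.
 */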
@Override
public void commit(ResourceResolver resourceResolver) throws PersistenceException {
for (Map.Entry<String, Map> entry : TRANSIENT_CREATE_MAP.entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
if(insertResource(entry.getKey(), entry.getValue())){
defineCassandra(entry.getKey());
}
}
}
for (Map.Entry<String, Map> entry : TRANSIENT_DELETE_MAP.entrySet()) {
if(deleteResource(resourceResolver, entry.getKey())){
CASSANDRA_MAP.remove(entry.getKey());
}
}
// Changes have been persisted; clear the pending maps so hasChanges() reports a clean state.
TRANSIENT_CREATE_MAP.clear();
TRANSIENT_DELETE_MAP.clear();
}
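/**
 * Deletes the row backing the given path from its column family. Returns {@code false}
 * when no row key can be derived from the path, {@code true} otherwise.
 */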
private boolean deleteResource(ResourceResolver resourceResolver, String path) throws PersistenceException {
try {
String key = getrowID(path);
if(key == null){
return false;
}
String _cf = CassandraResourceProviderUtil.getColumnFamilySector(path);
createColumnFamily(_cf, this.getKeyspace(), new StringSerializer());
StringSerializer se = new StringSerializer();
CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
String query = "delete FROM " + _cf + " where KEY = '" + key + "';";
cqlQuery.setQuery(query);
QueryResult<CqlRows<String, String, String>> result = cqlQuery.execute();
} catch (NoSuchAlgorithmException e) {
throw new PersistenceException(e.getMessage(), e);
} catch (UnsupportedEncodingException e) {
throw new PersistenceException(e.getMessage(), e);
}
return true;
}
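/**
 * Inserts a row for the given path into its column family, falling back to default
 * metadata, resourceType and resourceSuperType values when the supplied map does not
 * provide them. Returns {@code false} when no row key can be derived from the path.
 */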
private boolean insertResource(String path, Map map) throws PersistenceException {
try {
String r = getrowID(path);
if(r == null){
return false;
}
String _cf = CassandraResourceProviderUtil.getColumnFamilySector(path);
createColumnFamily(_cf, this.getKeyspace(), new StringSerializer());
String metadata = map.get("metadata") == null ? "resolutionPathInfo=json" : (String) map.get("metadata");
String resourceType = map.get("resourceType") == null ? "nt:cassandra" : (String) map.get("resourceType");
String resourceSuperType = map.get("resourceSuperType") == null ? "nt:superCassandra" : (String) map.get("resourceSuperType");
addData(this.getKeyspace(), _cf, new StringSerializer(),
new String[]{
"'" + r + "','" + path + "','" + resourceType + "','" + resourceSuperType + "','" + metadata + "'",
});
} catch (NoSuchAlgorithmException e) {
throw new PersistenceException(e.getMessage(), e);
} catch (UnsupportedEncodingException e) {
throw new PersistenceException(e.getMessage(), e);
}
return true;
}
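/**
 * Executes one CQL insert per entry in {@code dataArray}. Each entry is a pre-quoted value
 * list (key, path, resourceType, resourceSuperType, metadata) that is spliced verbatim into
 * an "insert into ... values (...)" statement against the given column family.
 */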
private void addData(Keyspace keyspace, String cf, StringSerializer se, String[] dataArray) {
for (String data : dataArray) {
CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
String query = "insert into " + cf + " (KEY,path,resourceType,resourceSuperType,metadata) values (" + data + ");";
cqlQuery.setQuery(query);
QueryResult<CqlRows<String, String, String>> result = cqlQuery.execute();
LOGGER.info("Added " + result.toString());
}
}
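/**
 * Issues a CREATE COLUMNFAMILY statement for the given column family; if it already
 * exists, the resulting {@link HInvalidRequestException} is logged and ignored.
 */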
private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
cqlQuery.setQuery(createCF);
try{
QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
LOGGER.info(result1.get().getList().size() + " Finished.!");
}catch(HInvalidRequestException ignore){
LOGGER.debug("Column Family already exists " + ignore.getMessage());
}
}
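/**
 * Derives the row key for a path as the Base64-encoded SHA-1 digest of the remaining path
 * (see {@link CassandraResourceProviderUtil#getRemainingPath}), or {@code null} when no
 * remaining path can be determined.
 */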
private String getrowID(String path) throws NoSuchAlgorithmException, UnsupportedEncodingException {
MessageDigest md = MessageDigest.getInstance("SHA-1");
String remPath = CassandraResourceProviderUtil.getRemainingPath(path);
if(remPath == null ) {
return null;
} else {
String rowID = new String(Base64.encodeBase64(md.digest(remPath.getBytes("UTF-8"))));
return rowID;
}
}
@Override
public boolean hasChanges(ResourceResolver resourceResolver) {
return !(TRANSIENT_CREATE_MAP.isEmpty() && TRANSIENT_DELETE_MAP.isEmpty());
}
}