blob: b88b8ed0e5b9d59c9a78c70d1fdae65e01552484 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.ws.rs;
import org.apache.tajo.QueryId;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
import org.apache.tajo.ws.rs.resources.ClusterResource;
import org.apache.tajo.ws.rs.resources.DatabasesResource;
import org.apache.tajo.ws.rs.resources.FunctionsResource;
import org.apache.tajo.ws.rs.resources.QueryResource;
import org.apache.tajo.ws.rs.resources.SessionsResource;
import org.apache.tajo.ws.rs.resources.TablesResource;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import javax.ws.rs.core.Application;
import java.security.SecureRandom;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* It loads client classes for Tajo REST protocol.
*/
public class ClientApplication extends Application {
private final MasterContext masterContext;
private final ConcurrentMap<QueryId, Long> queryIdToResultSetCacheIdMap;
private final Cache<Long, NonForwardQueryResultScanner> queryResultScannerCache;
private final SecureRandom secureRandom;
public ClientApplication(MasterContext masterContext) {
this.masterContext = masterContext;
this.secureRandom = new SecureRandom();
this.queryIdToResultSetCacheIdMap = new ConcurrentHashMap<QueryId, Long>();
this.queryResultScannerCache = CacheBuilder.newBuilder()
.concurrencyLevel(4)
.maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES)
.build();
}
@Override
public Set<Class<?>> getClasses() {
Set<Class<?>> classes = new HashSet<Class<?>>();
classes.add(SessionsResource.class);
classes.add(DatabasesResource.class);
classes.add(TablesResource.class);
classes.add(FunctionsResource.class);
classes.add(ClusterResource.class);
classes.add(QueryResource.class);
return classes;
}
public MasterContext getMasterContext() {
return masterContext;
}
/**
* It returns generated 8-byte size integer.
*
* @return
*/
private long generateCacheId() {
byte[] generatedBytes = new byte[8];
long generatedId = 0;
secureRandom.nextBytes(generatedBytes);
for (byte generatedByte: generatedBytes) {
generatedId = (generatedId << 8) + (generatedByte & 0xff);
}
if (generatedId < 0) {
generatedId = generatedId * -1;
}
return generatedId;
}
/**
* If cannot find any cache id for supplied query id, it will generate a new cache id.
*
* @param queryId
* @return
*/
public long generateCacheIdIfAbsent(QueryId queryId) {
Long cacheId = this.queryIdToResultSetCacheIdMap.get(queryId);
long newCacheId = 0;
if (cacheId == null) {
boolean generated = false;
do {
newCacheId = generateCacheId();
if (queryResultScannerCache.getIfPresent(newCacheId) == null) {
generated = true;
}
} while (!generated);
cacheId = this.queryIdToResultSetCacheIdMap.putIfAbsent(queryId, newCacheId);
if (cacheId != null) {
newCacheId = cacheId.longValue();
}
} else {
newCacheId = cacheId.longValue();
}
return newCacheId;
}
/**
* get cached NonForwardResultScanner instance by query id and cache id.
*
* @param queryId
* @param cacheId
* @return
*/
public NonForwardQueryResultScanner getCachedNonForwardResultScanner(QueryId queryId, long cacheId) {
Long cachedCacheId = queryIdToResultSetCacheIdMap.get(queryId);
if (cachedCacheId == null) {
throw new RuntimeException("Supplied cache id " + cacheId + " was expired or invalid.");
}
if (cacheId != cachedCacheId.longValue()) {
throw new RuntimeException("Supplied cache id " + cacheId + " was expired or invalid. " +
"Please use the valid cache id.");
}
return queryResultScannerCache.getIfPresent(cachedCacheId);
}
/**
* Store NonForwardResultScanner instance to cached memory if not present.
*
* @param queryId
* @param cacheId
* @param resultScanner
*/
public NonForwardQueryResultScanner setCachedNonForwardResultScanner(QueryId queryId, long cacheId,
NonForwardQueryResultScanner resultScanner) {
NonForwardQueryResultScanner cachedScanner = null;
if (cacheId == 0) {
cacheId = generateCacheIdIfAbsent(queryId);
}
cachedScanner = getCachedNonForwardResultScanner(queryId, cacheId);
if (cachedScanner == null) {
cachedScanner = this.queryResultScannerCache.asMap().putIfAbsent(cacheId, resultScanner);
if (cachedScanner == null) {
cachedScanner = resultScanner;
}
}
return cachedScanner;
}
}