blob: b2450659f94bd6775efff3cf9f5480d108ed12dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.openjpa.jdbc.kernel.JDBCStore;
import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
import org.apache.openjpa.kernel.ExpressionStoreQuery;
import org.apache.openjpa.kernel.FetchConfiguration;
import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider;
import org.apache.openjpa.kernel.QueryContext;
import org.apache.openjpa.kernel.StoreQuery;
import org.apache.openjpa.kernel.exps.ExpressionParser;
import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
import org.apache.openjpa.lib.rop.RangeResultObjectProvider;
import org.apache.openjpa.lib.rop.ResultObjectProvider;
import org.apache.openjpa.meta.ClassMetaData;
import org.apache.openjpa.util.StoreException;
/**
* A query for distributed databases.
*
* @author Pinaki Poddar
*
*/
@SuppressWarnings("serial")
class DistributedStoreQuery extends JDBCStoreQuery {
private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
private ExpressionParser _parser;
public DistributedStoreQuery(JDBCStore store, ExpressionParser parser) {
super(store, parser);
_parser = parser;
}
void add(StoreQuery q) {
_queries.add(q);
}
public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser,
ctx.getCompilation());
FetchConfiguration fetch = getContext().getFetchConfiguration();
DistributedStoreManager store = (DistributedStoreManager)getContext()
.getStoreContext().getStoreManager().getInnermostDelegate();
List<SliceStoreManager> targets = store.getTargets(fetch);
for (StoreQuery q:_queries) {
if (targets.contains(((JDBCStoreQuery)q).getStore()))
ex.addExecutor(q.newDataStoreExecutor(meta, subs));
}
return ex;
}
public void setContext(QueryContext ctx) {
super.setContext(ctx);
for (StoreQuery q:_queries)
q.setContext(ctx);
}
public ExecutorService getExecutorServiceInstance() {
DistributedJDBCConfiguration conf =
((DistributedJDBCConfiguration)getStore().getConfiguration());
return conf.getExecutorServiceInstance();
}
/**
* Executes queries on multiple databases.
*
* @author Pinaki Poddar
*
*/
public static class ParallelExecutor extends
ExpressionStoreQuery.DataStoreExecutor {
private List<Executor> executors = new ArrayList<Executor>();
private DistributedStoreQuery owner = null;
private ExecutorService threadPool = null;
public void addExecutor(Executor ex) {
executors.add(ex);
}
public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta,
boolean subclasses, ExpressionParser parser, Object parsed) {
super(dsq, meta, subclasses, parser, parsed);
owner = dsq;
threadPool = dsq.getExecutorServiceInstance();
}
/**
* Each child query must be executed with slice context and not the
* given query context.
*/
public ResultObjectProvider executeQuery(StoreQuery q,
final Object[] params, final Range range) {
ResultObjectProvider[] tmp = new ResultObjectProvider[executors.size()];
final Iterator<StoreQuery> qs = owner._queries.iterator();
final List<Future<ResultObjectProvider>> futures =
new ArrayList<Future<ResultObjectProvider>>();
int i = 0;
for (Executor ex:executors) {
QueryExecutor call = new QueryExecutor();
call.executor = ex;
call.query = qs.next();
call.params = params;
call.range = range;
futures.add(threadPool.submit(call));
}
for (Future<ResultObjectProvider> future:futures) {
try {
tmp[i++] = future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new StoreException(e.getCause());
}
}
boolean[] ascending = getAscending(q);
boolean isAscending = ascending.length > 0;
boolean isUnique = q.getContext().isUnique();
boolean hasRange = q.getContext().getEndRange() != Long.MAX_VALUE;
ResultObjectProvider result = null;
if (isUnique) {
result = new UniqueResultObjectProvider(tmp, q,
getQueryExpressions());
} else if (isAscending) {
result = new OrderingMergedResultObjectProvider(tmp, ascending,
(Executor[])executors.toArray(new Executor[executors.size()]),
q, params);
} else {
result = new MergedResultObjectProvider(tmp);
}
if (hasRange)
result = new RangeResultObjectProvider(result,
q.getContext().getStartRange(),
q.getContext().getEndRange());
return result;
}
public Number executeDelete(StoreQuery q, Object[] params) {
Iterator<StoreQuery> qs = owner._queries.iterator();
final List<Future<Number>> futures = new ArrayList<Future<Number>>();
for (Executor ex:executors) {
DeleteExecutor call = new DeleteExecutor();
call.executor = ex;
call.query = qs.next();
call.params = params;
futures.add(threadPool.submit(call));
}
int N = 0;
for (Future<Number> future:futures) {
try {
Number n = future.get();
if (n != null)
N += n.intValue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new StoreException(e.getCause());
}
}
return new Integer(N);
}
public Number executeUpdate(StoreQuery q, Object[] params) {
Iterator<StoreQuery> qs = owner._queries.iterator();
final List<Future<Number>> futures = new ArrayList<Future<Number>>();
for (Executor ex:executors) {
UpdateExecutor call = new UpdateExecutor();
call.executor = ex;
call.query = qs.next();
call.params = params;
futures.add(threadPool.submit(call));
}
int N = 0;
for (Future<Number> future:futures) {
try {
Number n = future.get();
if (n != null)
N += n.intValue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new StoreException(e.getCause());
}
}
return new Integer(N);
}
}
static class QueryExecutor implements Callable<ResultObjectProvider> {
StoreQuery query;
Executor executor;
Object[] params;
Range range;
public ResultObjectProvider call() throws Exception {
return executor.executeQuery(query, params, range);
}
}
static class DeleteExecutor implements Callable<Number> {
StoreQuery query;
Executor executor;
Object[] params;
public Number call() throws Exception {
return executor.executeDelete(query, params);
}
}
static class UpdateExecutor implements Callable<Number> {
StoreQuery query;
Executor executor;
Object[] params;
public Number call() throws Exception {
return executor.executeDelete(query, params);
}
}
}