// blob: b057fcbcba94dd1a16a485740f462408cee861be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import javax.persistence.TypedQuery;
/**
 * Tests query execution when multiple user threads enter the same
 * EntityManager and execute the same query instances concurrently.
 * <p>
 * Each query test opens one EntityManager, creates its queries on the test
 * thread, and then runs the same task on {@link #THREADS} pooled threads.
 * Assertion failures raised inside a worker are reported on the test thread
 * by {@link #waitForTermination()}.
 *
 * @author Pinaki Poddar
 *
 */
public class TestQueryMultiThreaded extends SliceTestCase {
    /** Number of PObject instances persisted by {@link #setUp()}. */
    private static final int POBJECT_COUNT = 25;
    /** Smallest value assigned to a PObject by {@link #create(int)}. */
    private static final int VALUE_MIN = 100;
    /** Largest value assigned when POBJECT_COUNT instances are created. */
    private static final int VALUE_MAX = VALUE_MIN + POBJECT_COUNT - 1;
    /** Number of worker threads that share a single EntityManager. */
    private static final int THREADS = 5;
    /** Maximum time, in seconds, to wait for each worker to complete. */
    private static final int MAX_TIMEOUT = 300;
    /** Number of raw threads started by {@link #testHeavyLoad()}. */
    private static final int HEAVY_LOAD_THREADS = 800;
    /** Number of instances each heavy-load thread persists. */
    private static final int HEAVY_LOAD_BATCH = 10;

    /** Pool that runs the concurrent query tasks; recreated for every test. */
    private ExecutorService group;
    /** One future per submitted task, consumed by waitForTermination(). */
    private Future<?>[] futures;

    @Override
    protected String getPersistenceUnitName() {
        return "ordering";
    }

    /**
     * Configures the persistence unit with {@code openjpa.Multithreaded=true},
     * populates the data if no PObject exists, and creates the worker pool.
     */
    @Override
    public void setUp() throws Exception {
        super.setUp(PObject.class, Person.class, Address.class, Country.class,
            CLEAR_TABLES, "openjpa.Multithreaded", "true");
        if (count(PObject.class) == 0) {
            create(POBJECT_COUNT);
        }
        // A SynchronousQueue never holds tasks, so each of the THREADS
        // submissions is handed directly to its own pool thread.
        group = new ThreadPoolExecutor(THREADS, THREADS,
            60, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread worker = new Thread(r);
                    // A worker stuck in a hung query must not keep the JVM
                    // alive after the test run has finished.
                    worker.setDaemon(true);
                    return worker;
                }
            });
        futures = new Future<?>[THREADS];
    }

    /**
     * Shuts the worker pool down and always delegates to the superclass, even
     * when setUp() failed before the pool was created.
     */
    @Override
    public void tearDown() throws Exception {
        try {
            if (group != null) {
                group.shutdown();
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Persists {@code N} PObject instances with consecutive values starting at
     * VALUE_MIN, plus two Person/Address pairs, in a single transaction.
     * Asserts the slice each new instance is assigned to: "Even" or "Odd" by
     * the parity of a PObject's value, and by name for the two Persons.
     *
     * @param N number of PObject instances to create
     */
    void create(int N) {
        EntityManager em = emf.createEntityManager();
        try {
            em.getTransaction().begin();
            for (int i = 0; i < N; i++) {
                PObject pc = new PObject();
                pc.setValue(VALUE_MIN + i);
                em.persist(pc);
                String slice = SlicePersistence.getSlice(pc);
                String expected = (pc.getValue() % 2 == 0) ? "Even" : "Odd";
                assertEquals(expected, slice);
            }
            Person p1 = new Person();
            Person p2 = new Person();
            Address a1 = new Address();
            Address a2 = new Address();
            p1.setName("Even");
            p2.setName("Odd");
            a1.setCity("San Francisco");
            a2.setCity("Rome");
            p1.setAddress(a1);
            p2.setAddress(a2);
            em.persist(p1);
            em.persist(p2);
            assertEquals("Even", SlicePersistence.getSlice(p1));
            assertEquals("Odd", SlicePersistence.getSlice(p2));
            em.getTransaction().commit();
        } finally {
            rollbackAndClose(em);
        }
    }

    /**
     * Verifies that every thread sees rows in ascending value order and that
     * the projected value matches the value of the projected instance.
     */
    public void testQueryResultIsOrderedAcrossSlice() {
        final EntityManager em = emf.createEntityManager();
        try {
            em.getTransaction().begin();
            final Query query = em.createQuery(
                "SELECT p.value,p FROM PObject p ORDER BY p.value ASC");
            executeInAllThreads(new Callable<Object>() {
                @Override
                public Object call() {
                    List<?> result = query.getResultList();
                    int previous = Integer.MIN_VALUE;
                    for (Object row : result) {
                        Object[] line = (Object[]) row;
                        int value = (Integer) line[0];
                        PObject pc = (PObject) line[1];
                        assertTrue(value >= previous);
                        previous = value;
                        assertEquals(value, pc.getValue());
                    }
                    return null;
                }
            });
        } finally {
            rollbackAndClose(em);
        }
    }

    /**
     * Verifies COUNT, MAX, MIN and SUM, and a combined MIN/MAX projection,
     * against the values implied by the data created in setUp().
     */
    public void testAggregateQuery() {
        final EntityManager em = emf.createEntityManager();
        try {
            em.getTransaction().begin();
            final Query countQ = em.createQuery("SELECT COUNT(p) FROM PObject p");
            final Query maxQ = em.createQuery("SELECT MAX(p.value) FROM PObject p");
            final Query minQ = em.createQuery("SELECT MIN(p.value) FROM PObject p");
            final Query sumQ = em.createQuery("SELECT SUM(p.value) FROM PObject p");
            final Query minmaxQ = em.createQuery(
                "SELECT MIN(p.value),MAX(p.value) FROM PObject p");
            executeInAllThreads(new Callable<Object>() {
                @Override
                public Object call() {
                    Object count = countQ.getSingleResult();
                    Object max = maxQ.getSingleResult();
                    Object min = minQ.getSingleResult();
                    Object sum = sumQ.getSingleResult();
                    Object minmax = minmaxQ.getSingleResult();
                    Object min1 = ((Object[]) minmax)[0];
                    Object max1 = ((Object[]) minmax)[1];
                    assertEquals(POBJECT_COUNT, ((Number) count).intValue());
                    assertEquals(VALUE_MAX, ((Number) max).intValue());
                    assertEquals(VALUE_MIN, ((Number) min).intValue());
                    // Arithmetic series: 2 * sum == (first + last) * count.
                    assertEquals((VALUE_MIN + VALUE_MAX) * POBJECT_COUNT,
                        2 * ((Number) sum).intValue());
                    assertEquals(min, min1);
                    assertEquals(max, max1);
                    return null;
                }
            });
        } finally {
            rollbackAndClose(em);
        }
    }

    /**
     * Verifies MAX over even values only. create() asserts that even-valued
     * instances are stored in the "Even" slice, so presumably the "Odd" slice
     * contributes no value to this aggregate.
     */
    public void testAggregateQueryWithMissingValueFromSlice() {
        final EntityManager em = emf.createEntityManager();
        try {
            em.getTransaction().begin();
            final Query maxQ = em.createQuery(
                "SELECT MAX(p.value) FROM PObject p WHERE MOD(p.value,2)=0");
            executeInAllThreads(new Callable<Object>() {
                @Override
                public Object call() {
                    Object max = maxQ.getSingleResult();
                    // VALUE_MAX (124) is itself even.
                    assertEquals(VALUE_MAX, ((Number) max).intValue());
                    return null;
                }
            });
        } finally {
            rollbackAndClose(em);
        }
    }

    /**
     * Verifies that setMaxResults() limits the result size for every thread
     * and that each returned row is internally consistent.
     */
    public void testSetMaxResult() {
        final EntityManager em = emf.createEntityManager();
        final int limit = 3;
        try {
            em.getTransaction().begin();
            final Query q = em.createQuery(
                "SELECT p.value,p FROM PObject p ORDER BY p.value ASC");
            executeInAllThreads(new Callable<Object>() {
                @Override
                public Object call() {
                    List<?> result = q.setMaxResults(limit).getResultList();
                    for (Object row : result) {
                        Object[] line = (Object[]) row;
                        int value = (Integer) line[0];
                        PObject pc = (PObject) line[1];
                        // Same row check as testQueryResultIsOrderedAcrossSlice.
                        assertEquals(value, pc.getValue());
                    }
                    assertEquals(limit, result.size());
                    return null;
                }
            });
        } finally {
            rollbackAndClose(em);
        }
    }

    /**
     * Starts HEAVY_LOAD_THREADS raw threads, each persisting HEAVY_LOAD_BATCH
     * instances through its own EntityManager and transaction, and fails if
     * any of those threads raised an exception.
     */
    public void testHeavyLoad() {
        // Guarded by its own monitor; written by the workers, read after join().
        final List<Throwable> failures = new ArrayList<>();
        Runnable work = new Runnable() {
            @Override
            public void run() {
                try {
                    EntityManager em = emf.createEntityManager();
                    try {
                        em.getTransaction().begin();
                        for (int j = 0; j < HEAVY_LOAD_BATCH; j++) {
                            PObject pc = new PObject();
                            // Take the remainder before narrowing to int so
                            // the value always lies in 0..9.
                            pc.setValue((int) (System.currentTimeMillis() % 10));
                            em.persist(pc);
                        }
                        em.getTransaction().commit();
                    } finally {
                        rollbackAndClose(em);
                    }
                } catch (Throwable t) {
                    // An exception escaping run() would be lost to the test
                    // thread; record it so the test can fail.
                    synchronized (failures) {
                        failures.add(t);
                    }
                }
            }
        };
        Thread[] threads = new Thread[HEAVY_LOAD_THREADS];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(work);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                fail("Interrupted while waiting for heavy-load threads"
                    + "\r\n" + getStackDump(e));
            }
        }
        synchronized (failures) {
            if (!failures.isEmpty()) {
                fail(failures.size() + " of " + HEAVY_LOAD_THREADS
                    + " heavy-load threads failed; first failure:\r\n"
                    + getStackDump(failures.get(0)));
            }
        }
    }

    /**
     * Verifies that a query carrying the target hint "Even" only returns
     * instances whose slice is "Even".
     */
    public void testHint() {
        final String target = "Even";
        final EntityManager em = emf.createEntityManager();
        try {
            em.getTransaction().begin();
            final Query query = em.createQuery("SELECT p FROM PObject p");
            executeInAllThreads(new Callable<Object>() {
                @Override
                public Object call() {
                    query.setHint(SlicePersistence.HINT_TARGET, target);
                    List<?> result = query.getResultList();
                    for (Object pc : result) {
                        assertEquals(target, SlicePersistence.getSlice(pc));
                    }
                    return null;
                }
            });
        } finally {
            rollbackAndClose(em);
        }
    }

    /**
     * Executes a query ordered by a field that is not part of the projection.
     * Only checks that execution completes without an exception.
     */
    public void testInMemoryOrderBy() {
        final EntityManager em = emf.createEntityManager();
        try {
            em.getTransaction().begin();
            final Query query =
                em.createQuery("SELECT p FROM PObject p ORDER BY p.value");
            executeInAllThreads(new Callable<Object>() {
                @Override
                public Object call() {
                    query.getResultList();
                    return null;
                }
            });
        } finally {
            rollbackAndClose(em);
        }
    }

    /**
     * Binds a named parameter and executes the query from every thread.
     * Only checks that execution completes without an exception.
     */
    public void testQueryParameter() {
        final EntityManager em = emf.createEntityManager();
        try {
            em.getTransaction().begin();
            final Query query =
                em.createQuery("SELECT p FROM PObject p WHERE p.value > :v");
            executeInAllThreads(new Callable<Object>() {
                @Override
                public Object call() {
                    query.setParameter("v", 200);
                    query.getResultList();
                    return null;
                }
            });
        } finally {
            rollbackAndClose(em);
        }
    }

    /**
     * Verifies that an entity returned by one query can be bound as a
     * parameter of another, and that both results come from the "Odd" slice.
     */
    public void testQueryParameterEntity() {
        final EntityManager em = emf.createEntityManager();
        try {
            em.getTransaction().begin();
            final TypedQuery<Address> addressQ = em.createQuery(
                "select a from Address a where a.city = :city", Address.class);
            final TypedQuery<Person> personQ = em.createQuery(
                "SELECT p FROM Person p WHERE p.address = :a", Person.class);
            executeInAllThreads(new Callable<Object>() {
                @Override
                public Object call() {
                    Address a = addressQ.setParameter("city", "Rome")
                        .getSingleResult();
                    assertNotNull(a);
                    assertEquals("Odd", SlicePersistence.getSlice(a));
                    List<Person> result =
                        personQ.setParameter("a", a).getResultList();
                    assertEquals(1, result.size());
                    Person p = result.get(0);
                    assertEquals("Odd", SlicePersistence.getSlice(p));
                    assertEquals("Rome", p.getAddress().getCity());
                    return null;
                }
            });
        } finally {
            rollbackAndClose(em);
        }
    }

    /**
     * Waits up to MAX_TIMEOUT seconds for each submitted task and fails the
     * test if a task timed out, threw, or if the wait was interrupted.
     */
    void waitForTermination() {
        for (Future<?> f : futures) {
            try {
                f.get(MAX_TIMEOUT, TimeUnit.SECONDS);
            } catch (TimeoutException te) {
                fail("Failed " + te + "\r\n" + getStackDump(te));
            } catch (ExecutionException e) {
                Throwable cause = (e.getCause() != null) ? e.getCause() : e;
                fail("Failed " + "\r\n" + getStackDump(cause));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail rather than letting an
                // interrupted test pass silently.
                Thread.currentThread().interrupt();
                fail("Interrupted while waiting for query threads"
                    + "\r\n" + getStackDump(e));
            }
        }
    }

    /**
     * Renders the stack trace of the given throwable as a string.
     *
     * @param t the throwable to print
     * @return the text written by {@code t.printStackTrace}
     */
    String getStackDump(Throwable t) {
        StringWriter writer = new StringWriter();
        PrintWriter out = new PrintWriter(writer);
        t.printStackTrace(out);
        out.flush();
        return writer.toString();
    }

    /**
     * Submits the same task once per worker thread and waits for all of them.
     * The task is shared, so it must not keep mutable state in its fields.
     */
    private void executeInAllThreads(Callable<Object> task) {
        for (int i = 0; i < THREADS; i++) {
            futures[i] = group.submit(task);
        }
        waitForTermination();
    }

    /**
     * Rolls back the transaction of the given EntityManager if it is still
     * active and then closes the EntityManager.
     */
    private static void rollbackAndClose(EntityManager em) {
        try {
            if (em.getTransaction().isActive()) {
                em.getTransaction().rollback();
            }
        } finally {
            em.close();
        }
    }
}