blob: 3e5daa5495f347ebee9d97d3d2cced4ddde6608b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.persistence.event;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import javax.persistence.EntityManager;
import org.apache.openjpa.event.RemoteCommitEvent;
import org.apache.openjpa.event.RemoteCommitListener;
import org.apache.openjpa.event.TCPRemoteCommitProvider;
import org.apache.openjpa.lib.conf.Configurations;
import org.apache.openjpa.persistence.OpenJPAEntityManagerFactory;
import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
import org.apache.openjpa.persistence.common.utils.AbstractTestCase;
import org.apache.openjpa.persistence.event.common.apps.Duration;
import org.apache.openjpa.persistence.event.common.apps.RuntimeTest1;
import org.apache.openjpa.persistence.test.AllowFailure;
@AllowFailure(message="surefire excluded")
public class TestTCPRemoteEventsDuration
extends AbstractTestCase {
public TestTCPRemoteEventsDuration(String s) {
super(s, "eventcactusapp");
}
@Override
public void setUp() {
deleteAll(RuntimeTest1.class);
}
public void tearDownTestClass()
throws Exception {
//super.tearDownTestClass();
}
// FYI For 1000 objects with MySQL -- 1005 transactions,
// doTransactions () takes 45 seconds
private static final int NUM_OBJECTS = 200;
private static final int NUM_TO_AVERAGE = 3;
private static final int NUM_CONCURRENT = 3;
public void testDurationDifference() {
// create a non-cluserted, single pmf
OpenJPAEntityManagerFactory factorySingle = createDistinctFactory(
null, "");
EntityManager pmSingle = factorySingle.createEntityManager();
// Create 2 pmfs that are using RCPTCP. run same benachmark of
// transactions
OpenJPAEntityManagerFactory senderFactory1TCP =
createDistinctFactory(TCPRemoteCommitProvider.class,
"MaxTotal=4, RecoveryTimeMillis=1000, Port=5636, " +
"Addresses=127.0.0.1:5636;127.0.0.1:6636");
OpenJPAEntityManagerFactory factory2TCP = createDistinctFactory(
TCPRemoteCommitProvider.class,
"Port=6636, Addresses=127.0.0.1:6636;127.0.0.1:5636");
// Register a RCEListener with the RCEM. The RCEListener
// will record the number of
// a,d, and u recevied when the sender PM performs commits.
RemoteCommitListenerTestImpl listenerAtReceiver
= new RemoteCommitListenerTestImpl();
((OpenJPAEntityManagerFactorySPI) factory2TCP).getConfiguration()
.getRemoteCommitEventManager().
addListener(listenerAtReceiver);
System.out.println("-------------------");
System.out.println("3 PMFs created, 1 as standalone, and 2 acting " +
"as a cluster using ports 5636 and 6636");
// This call is a "throw away" run to seed caches, etc.
doTransactions(pmSingle, NUM_OBJECTS);
double benchmarkSingle = 0.0;
for (int i = 0; i < NUM_TO_AVERAGE; i++) {
benchmarkSingle += doTransactions(pmSingle, NUM_OBJECTS);
}
benchmarkSingle /= NUM_TO_AVERAGE;
EntityManager pmOneThread =
senderFactory1TCP.createEntityManager();
double benchmarkClusterOneThread = doTransactions(pmOneThread,
NUM_OBJECTS);
pmOneThread.close();
Duration timeThreaded = new Duration("For " + NUM_CONCURRENT +
" concurrent threads performing Adds, removes, and dletes for " +
NUM_OBJECTS + " objects.");
Thread[] concurrentThreads = new Thread[NUM_CONCURRENT];
timeThreaded.start();
for (int i = 0; i < NUM_CONCURRENT; i++) {
Worker worker = new Worker
(senderFactory1TCP.createEntityManager());
concurrentThreads[i] = new Thread(worker);
concurrentThreads[i].start();
}
// Wait for all threads to finish
for (int i = 0; i < NUM_CONCURRENT; i++) {
try {
concurrentThreads[i].join();
} catch (Exception e) {
assertTrue(false);
break;
}
}
timeThreaded.stop();
double benchmarkCluster = timeThreaded.getDurationAsSeconds();
System.out.println("For " + NUM_OBJECTS + " objects, and " +
NUM_CONCURRENT + " concurrent threads, the receiving pmf of the " +
"cluster received :" +
listenerAtReceiver.totalAddedClasses + " claases adds, " +
listenerAtReceiver.totalDeleted + " deletes, " +
listenerAtReceiver.totalUpdated + " updates");
System.out.println(
"\nSingle pmf - " + benchmarkSingle +
"(s).\n Clustered pmfs (one worker thread) -"
+ benchmarkClusterOneThread +
"(s).\n Clustered pmfs (" + NUM_CONCURRENT
+ " threads - " + benchmarkCluster + "(s).\n");
Thread.currentThread();
Thread.yield();
try {
Thread.currentThread();
Thread.sleep((int) 500);
} catch (InterruptedException e) {
fail("unexecpted exception during pause");
}
assertEquals((NUM_CONCURRENT + 1) * NUM_OBJECTS,
listenerAtReceiver.totalAddedClasses);
assertEquals((NUM_CONCURRENT + 1) * NUM_OBJECTS,
listenerAtReceiver.totalDeleted);
assertEquals((NUM_CONCURRENT + 1) * NUM_OBJECTS,
listenerAtReceiver.totalUpdated);
// shutdown
pmSingle.close();
factorySingle.close();
factory2TCP.close();
senderFactory1TCP.close();
}
/*
* Worker thread that takes ownership of a PM to perform
* transactions. Once transactions are complete the tread
* will close the PM and end.
*/
private class Worker
implements Runnable {
private EntityManager _pm;
public Worker(EntityManager pm) {
_pm = pm;
}
@Override
public void run() {
doTransactions(_pm, NUM_OBJECTS);
endEm(_pm);
}
}
protected double doTransactions(EntityManager pm, int numObjects) {
// Perform a series of transactions that will trigger
// adds, deletes, and udpates
Duration timeToAMD = new Duration(
"Adds, removes, and dletes for " + numObjects + " objects.");
timeToAMD.start();
// create objects
RuntimeTest1[] persistables = new RuntimeTest1[numObjects];
for (int i = 0; i < persistables.length; i++) {
persistables[i] = new RuntimeTest1("foo #" + i, i);
}
// add them
// This will generate a larger number of transaction in a very
// short amount of time (old socket-per-transaction would
// exhaust jvm socket pool)
for (int i = 0; i < persistables.length; i++) {
startTx(pm);
pm.persist(persistables[i]);
endTx(pm);
}
// modify them
startTx(pm);
for (int i = 0; i < persistables.length; i++) {
persistables[i].setStringField("bazzed" + i);
}
endTx(pm);
// delete them
startTx(pm);
for (int i = 0; i < persistables.length; i++) {
pm.remove(persistables[i]);
}
endTx(pm);
timeToAMD.stop();
return timeToAMD.getDurationAsSeconds();
}
static int _fetchGroupSerial = 0;
protected OpenJPAEntityManagerFactory createDistinctFactory(
Class providerClass, String classProps1) {
String transmit = "TransmitPersistedObjectIds=true";
if (classProps1 == null || classProps1.length() == 0)
classProps1 = transmit;
else
classProps1 += "," + transmit;
Map propsMap;
if (providerClass != null) {
propsMap = new HashMap();
propsMap.put("openjpa.RemoteCommitProvider", Configurations.
getPlugin(providerClass.getName(), classProps1));
propsMap.put("openjpa.FetchGroups", "differentiatingFetchGroup" +
_fetchGroupSerial);
} else {
// No RCP
propsMap = new HashMap();
propsMap.put("openjpa.RemoteCommitProvider",
"sjvm(TransmitPersistedObjectIds=true)");
propsMap.put("openjpa.FetchGroups", "differentiatingFetchGroup" +
_fetchGroupSerial);
}
_fetchGroupSerial += 1;
return getEmf(propsMap);
}
protected static class RemoteCommitListenerTestImpl
implements RemoteCommitListener {
Collection addClasses;
Collection added;
Collection updated;
Collection deleted;
int totalAddedClasses;
int totalUpdated;
int totalDeleted;
@Override
public synchronized void afterCommit(RemoteCommitEvent event) {
this.addClasses = event.getPersistedTypeNames();
this.updated = event.getUpdatedObjectIds();
this.deleted = event.getDeletedObjectIds();
totalAddedClasses += addClasses.size();
totalUpdated += updated.size();
totalDeleted += deleted.size();
}
@Override
public void close() {
}
}
}