blob: acd2102932be0b8b20e3a072b835fcbaa5bcf556 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.aries.tx.control.itests;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.generate;
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerClass;
import org.osgi.service.coordinator.Coordination;
import org.osgi.service.coordinator.Coordinator;
import org.osgi.service.coordinator.Participant;
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerClass.class)
public class CoordinatorOptimisationTest extends AbstractTransactionTest implements Participant {
protected Option testSpecificOptions() {
return mavenBundle("org.apache.felix", "org.apache.felix.coordinator").versionAsInProject();
}
@Inject
Coordinator coordinator;
@Test
public void compareWithAndWithoutCoord() {
String base = "Hello ";
AtomicInteger counter = new AtomicInteger(1);
List<String> messages = generate(() -> base + counter.getAndIncrement())
.limit(10000)
.collect(toList());
long noCoord;
long oneTran;
long withCoord;
long start = System.currentTimeMillis();
try {
messages.stream()
.forEach(this::persistMessage);
} finally {
noCoord = System.currentTimeMillis() - start;
}
txControl.required(() -> connection.createStatement().executeUpdate("DELETE FROM TEST_TABLE"));
txControl.required(() -> {
messages.stream()
.forEach(this::persistMessage);
return null;
});
oneTran = System.currentTimeMillis() - start;
txControl.required(() -> connection.createStatement().executeUpdate("DELETE FROM TEST_TABLE"));
coordinator.begin("foo", MINUTES.toMillis(5));
start = System.currentTimeMillis();
try {
messages.stream()
.forEach(this::persistMessage);
} finally {
coordinator.peek().end();
withCoord = System.currentTimeMillis() - start;
}
System.out.println("\n\n\n\nWithout Coord: " + noCoord + " One Tran: " + oneTran+
" With Coord: " + withCoord);
}
@SuppressWarnings("unchecked")
private void persistMessage(String message) {
if(coordinator.addParticipant(this)) {
((List<String>)coordinator.peek().getVariables()
.computeIfAbsent(getClass(), k -> new ArrayList<String>()))
.add(message);
} else {
txControl.required(() -> {
PreparedStatement ps = connection.prepareStatement(
"Insert into TEST_TABLE values ( ? )");
ps.setString(1, message);
return ps.executeUpdate();
});
}
}
@SuppressWarnings("unchecked")
@Override
public void ended(Coordination coord) throws Exception {
txControl.required(() -> {
List<String> l = (List<String>) coord.getVariables()
.get(getClass());
PreparedStatement ps = connection.prepareStatement(
"Insert into TEST_TABLE values ( ? )");
l.stream().forEach(s -> {
try {
ps.setString(1, s);
ps.addBatch();
} catch (SQLException sqle) {
throw new RuntimeException(sqle);
}
});
return ps.executeBatch();
});
}
@Override
public void failed(Coordination arg0) throws Exception { }
}