Title: Using the Coordinator to optimise Transactions
Notice: Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 . Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
= Using the Coordinator to optimise Transactions
The transaction control service provides a transactional scope around resource access, but sometimes it makes sense to delay that work until it can be batched together efficiently in a single transaction.
== Bulk resource access
The simplest approach is to do each insert in a separate transaction:
....
public void persistMessage(String message) {
    txControl.required(() -> {
        PreparedStatement ps = connection.prepareStatement(
            "Insert into TEST_TABLE values ( ? )");
        ps.setString(1, message);
        return ps.executeUpdate();
    });
}
....
If called a large number of times from an external service:
....
List<String> messages = getMessages();
messages.stream()
    .forEach(svc::persistMessage);
....
Then this code can become quite slow as the message list grows, since every message pays the cost of beginning and committing its own transaction.
== The naive approach
The obvious way to reduce overhead is to batch all of the inserts into a single transaction:
....
List<String> messages = getMessages();
txControl.required(() -> {
    messages.stream()
        .forEach(svc::persistMessage);
    return null;
});
....
This reuses the same physical connection for every insert, and it avoids repeated commits, so it should be faster, right?
Actually, it turns out that this approach can be slower for some databases: building up a very large transaction can reduce the rate at which data is inserted, for example because the database must hold locks and retain a growing amount of undo information until the final commit.
== Using the coordinator
The OSGi Coordinator service lets collaborating parties register a `Participant` with an active coordination and defer work until that coordination ends. By adding in the Coordinator we can *dramatically* improve our performance:
....
public void persistMessage(String message) {
    if(coordinator.addParticipant(this)) {
        // An active coordination exists - stash the message in the
        // coordination's variable map so it can be flushed in ended()
        ((List<String>)coordinator.peek().getVariables()
                .computeIfAbsent(getClass(), k -> new ArrayList<String>()))
                .add(message);
    } else {
        // No active coordination - persist immediately in its own transaction
        txControl.required(() -> {
            PreparedStatement ps = connection.prepareStatement(
                "Insert into TEST_TABLE values ( ? )");
            ps.setString(1, message);
            return ps.executeUpdate();
        });
    }
}

@Override
public void ended(Coordination coord) throws Exception {
    // Called once when the coordination ends - flush all of the
    // collected messages in a single transaction using a JDBC batch
    txControl.required(() -> {
        List<String> l = (List<String>) coord.getVariables()
                .get(getClass());

        PreparedStatement ps = connection.prepareStatement(
            "Insert into TEST_TABLE values ( ? )");

        l.stream().forEach(s -> {
            try {
                ps.setString(1, s);
                ps.addBatch();
            } catch (SQLException sqle) {
                throw new RuntimeException(sqle);
            }
        });
        return ps.executeBatch();
    });
}
....
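For completeness, the snippets above assume a Declarative Services component that implements `Participant` and has the `Coordinator` and `TransactionControl` services injected. The following is a minimal sketch of that wiring (the component name, field names and the use of a `JDBCConnectionProvider` are illustrative assumptions, not part of the original example). Note that `Participant` also requires a `failed` method, which is called instead of `ended` if the coordination fails, and which gives us a natural place to discard the batched messages:

....
import java.sql.Connection;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.coordinator.Coordination;
import org.osgi.service.coordinator.Coordinator;
import org.osgi.service.coordinator.Participant;
import org.osgi.service.transaction.control.TransactionControl;
import org.osgi.service.transaction.control.jdbc.JDBCConnectionProvider;

@Component
public class MessageStore implements Participant {

    @Reference
    Coordinator coordinator;

    @Reference
    TransactionControl txControl;

    @Reference
    JDBCConnectionProvider provider;

    Connection connection;

    @Activate
    void start() {
        // A connection whose enlistment is managed by the transaction control service
        connection = provider.getResource(txControl);
    }

    // persistMessage(String) and ended(Coordination) as shown above

    @Override
    public void failed(Coordination coord) throws Exception {
        // The coordination failed, so we simply drop the collected messages -
        // nothing has been written to the database yet
    }
}
....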
Now, if we do our bulk add inside a coordination:
....
coordinator.begin("foo", MINUTES.toMillis(5));
try {
    messages.stream()
        .forEach(this::persistMessage);
} finally {
    coordinator.peek().end();
}
....
Then we find that it is *much* faster!
This is because we can make use of the more efficient JDBC batch API, and because we can batch a suitable number of inserts into a single transaction.
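That "suitable number" matters: if a single coordination collects an extremely large number of messages, the `ended` method above would recreate the oversized transaction of the naive approach. A simple refinement (a sketch on our part - the `BATCH_SIZE` value is illustrative, not from the original example) is to flush the collected messages in fixed-size chunks, one transaction and one JDBC batch per chunk. This would replace the `ended` method of the component above:

....
private static final int BATCH_SIZE = 10_000; // illustrative chunk size

@Override
public void ended(Coordination coord) throws Exception {
    @SuppressWarnings("unchecked")
    List<String> l = (List<String>) coord.getVariables().get(getClass());

    for (int from = 0; from < l.size(); from += BATCH_SIZE) {
        // A sub-view containing at most BATCH_SIZE messages
        List<String> chunk = l.subList(from, Math.min(from + BATCH_SIZE, l.size()));

        // One transaction per chunk keeps the transaction size bounded
        // no matter how many messages were collected
        txControl.required(() -> {
            PreparedStatement ps = connection.prepareStatement(
                "Insert into TEST_TABLE values ( ? )");
            for (String s : chunk) {
                ps.setString(1, s);
                ps.addBatch();
            }
            return ps.executeBatch();
        });
    }
}
....

A plain for loop also avoids the checked-exception wrapping that the stream-based version needed, because `TransactionControl.required` accepts a `Callable` whose body may throw. The trade-off of chunking is that a failure part-way through leaves earlier chunks committed.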