| = Using the Coordinator to optimise Transactions |
| |
The transaction control service provides a transactional scope around resource access, but sometimes it makes sense to delay this work until it can be batched together efficiently in a single transaction.
| |
| == Bulk resource access |
| |
The simplest approach is to perform each insert in a separate transaction:
| |
....
public void persistMessage(String message) {
    txControl.required(() -> {
        PreparedStatement ps = connection.prepareStatement(
            "Insert into TEST_TABLE values ( ? )");
        ps.setString(1, message);
        return ps.executeUpdate();
    });
}
....
| |
If this method is called a large number of times by an external service:
| |
| .... |
| List<String> messages = getMessages(); |
| |
| messages.stream() |
| .forEach(svc::persistMessage); |
| .... |
| |
Then this code becomes quite slow as the message list grows.
| |
| == The naive approach |
| |
| The obvious way to reduce overhead is to batch all of the inserts into a single transaction: |
| |
| .... |
| List<String> messages = getMessages(); |
| |
| txControl.required(() -> { |
| messages.stream() |
| .forEach(svc::persistMessage); |
| return null; |
| }); |
| .... |
| |
This reuses the same physical connection each time, and it avoids repeated commits, so it should be faster, right?
| |
In fact this approach can be slower for some databases.
Building up a very large transaction can actually slow down the rate at which data is inserted.
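A common mitigation, independent of the Coordinator, is to commit in fixed-size chunks rather than in one huge transaction. The following is a minimal sketch of the partitioning step only; the chunk size of 500 and the `persistAll` callback (which would wrap each chunk in a `txControl.required` scope) are illustrative assumptions, not part of the Transaction Control API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ChunkedInsert {

    // Split the messages into fixed-size chunks so that each chunk
    // can be persisted in its own reasonably sized transaction.
    static List<List<String>> chunk(List<String> messages, int chunkSize) {
        List<List<String>> chunks = new ArrayList<>();
        for (int i = 0; i < messages.size(); i += chunkSize) {
            chunks.add(messages.subList(i,
                Math.min(i + chunkSize, messages.size())));
        }
        return chunks;
    }

    // In real code persistAll would run one transaction per chunk,
    // e.g. txControl.required(() -> { ...batch insert the chunk... });
    static void persistInChunks(List<String> messages, int chunkSize,
            Consumer<List<String>> persistAll) {
        for (List<String> c : chunk(messages, chunkSize)) {
            persistAll.accept(c);
        }
    }
}
```

Choosing the chunk size is a trade-off: larger chunks amortise commit overhead, while smaller chunks keep the transaction's working set bounded.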
| |
| == Using the coordinator |
| |
| By adding in the Coordinator we can *dramatically* improve our performance: |
| |
| .... |
| public void persistMessage(String message) { |
| if(coordinator.addParticipant(this)) { |
| ((List<String>)coordinator.peek().getVariables() |
| .computeIfAbsent(getClass(), k -> new ArrayList<String>())) |
| .add(message); |
| } else { |
| txControl.required(() -> { |
| PreparedStatement ps = connection.prepareStatement( |
| "Insert into TEST_TABLE values ( ? )"); |
| ps.setString(1, message); |
| return ps.executeUpdate(); |
| }); |
| } |
| } |
| |
| @Override |
| public void ended(Coordination coord) throws Exception { |
| txControl.required(() -> { |
| List<String> l = (List<String>) coord.getVariables() |
| .get(getClass()); |
| |
| PreparedStatement ps = connection.prepareStatement( |
| "Insert into TEST_TABLE values ( ? )"); |
| |
| l.stream().forEach(s -> { |
| try { |
| ps.setString(1, s); |
| ps.addBatch(); |
| } catch (SQLException sqle) { |
| throw new RuntimeException(sqle); |
| } |
| }); |
| |
| return ps.executeBatch(); |
| }); |
| } |
| .... |
| |
The `ended` callback above comes from the OSGi `Participant` interface, so the service must also implement its `failed` callback to handle coordinations that terminate abnormally. Now, if we do our bulk add inside a coordination:
| |
....
coordinator.begin("foo", MINUTES.toMillis(5));
try {
    messages.stream()
        .forEach(this::persistMessage);
} finally {
    coordinator.peek().end();
}
....
| |
Then we find that it is *much* faster!
This is because we can make use of the more efficient JDBC batch API, and because we can batch up a suitable number of inserts in a single transaction.
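The collect-then-flush pattern used above can be sketched without any OSGi types. In this simplified stand-in, a plain `HashMap` plays the role of the Coordination's variable map, and a hypothetical `flush()` method plays the role of the `ended` callback (where the real code runs a single batched transaction); both names are illustrative, not part of the Coordinator API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CollectThenFlush {

    // Stand-in for Coordination.getVariables(): scratch storage that
    // lives for the duration of the "coordination".
    private final Map<Class<?>, Object> variables = new HashMap<>();

    // Stand-in for the database table.
    private final List<String> flushed = new ArrayList<>();

    // Equivalent of persistMessage() while a coordination is active:
    // remember the message instead of touching the database.
    @SuppressWarnings("unchecked")
    public void persistMessage(String message) {
        ((List<String>) variables
            .computeIfAbsent(getClass(), k -> new ArrayList<String>()))
            .add(message);
    }

    // Equivalent of ended(): drain everything collected so far in one
    // go, where the real participant would run one batched transaction.
    @SuppressWarnings("unchecked")
    public int flush() {
        List<String> l = (List<String>) variables.remove(getClass());
        if (l == null) {
            return 0;
        }
        flushed.addAll(l);
        return l.size();
    }

    public List<String> flushed() {
        return flushed;
    }
}
```

The key property this sketch shares with the Coordinator version is that callers keep using the same single-message `persistMessage` API, while the expensive work is deferred and amortised behind the scenes.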