| Title: Using the Coordinator to optimise Transactions |
| Notice: Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| . |
| http://www.apache.org/licenses/LICENSE-2.0 |
| . |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| |
| #Using the Coordinator to optimise Transactions |
| |
| The transaction control service provides a transactional scope around resource access, but sometimes it |
| makes sense to delay doing this work until it can be batched efficiently together in a single transaction: |
| |
| ##Bulk resource access |
| |
| In this case we simply do each insert in a separate transaction: |
| |
| public void persistMessage(String message) { |
| txControl.required(() -> { |
| PreparedStatement ps = connection.prepareStatement( |
| "Insert into TEST_TABLE values ( ? )"); |
| ps.setString(1, message); |
| return ps.executeUpdate(); |
| }); |
| } |
| |
| If called a large number of times from an external service: |
| |
| List<String> messages = getMessages(); |
| |
| messages.stream() |
| .forEach(svc::persistMessage); |
| |
| Then this code can be quite slow as the message list becomes large |
| |
| ## The naive approach |
| |
| The obvious way to reduce overhead is to batch all of the inserts into a single transaction: |
| |
| List<String> messages = getMessages(); |
| |
| txControl.required(() -> { |
| messages.stream() |
| .forEach(svc::persistMessage); |
| return null; |
| }); |
| |
| This reuses the same physical connection each time, and it avoids repeated commits, so it should be faster |
| right? |
| |
| Actually it turns out that this approach can be slower for some databases. By building up a very large |
| transaction it can actually slow down the rate at which data can be insterted. |
| |
| ## Using the coordinator |
| |
| By adding in the Coordinator we can **dramatically** improve our performance: |
| |
| public void persistMessage(String message) { |
| if(coordinator.addParticipant(this)) { |
| ((List<String>)coordinator.peek().getVariables() |
| .computeIfAbsent(getClass(), k -> new ArrayList<String>())) |
| .add(message); |
| } else { |
| txControl.required(() -> { |
| PreparedStatement ps = connection.prepareStatement( |
| "Insert into TEST_TABLE values ( ? )"); |
| ps.setString(1, message); |
| return ps.executeUpdate(); |
| }); |
| } |
| } |
| |
| @Override |
| public void ended(Coordination coord) throws Exception { |
| txControl.required(() -> { |
| List<String> l = (List<String>) coord.getVariables() |
| .get(getClass()); |
| |
| PreparedStatement ps = connection.prepareStatement( |
| "Insert into TEST_TABLE values ( ? )"); |
| |
| l.stream().forEach(s -> { |
| try { |
| ps.setString(1, s); |
| ps.addBatch(); |
| } catch (SQLException sqle) { |
| throw new RuntimeException(sqle); |
| } |
| }); |
| |
| return ps.executeBatch(); |
| }); |
| } |
| |
| Now, if we do our bulk add inside a coordination: |
| |
| coordinator.begin("foo", MINUTES.toMillis(5)); |
| try { |
| messages.stream() |
| .forEach(this::persistMessage); |
| } finally { |
| coordinator.peek().end(); |
| } |
| |
| Then we find that it is **much** faster! This is because we can make use of more efficient JDBC API, and |
| because we can batch up a suitable number of inserts in a single transaction. |