| --- |
| title: Code Examples |
| --- |
| |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <a id="transaction-example"></a> |
| |
| An application can run a transaction directly or |
| invoke a function which contains a transaction. |
| This section illustrates these two use cases with code fragments |
| that demonstrate the proper way to program a transaction. |
| |
| An expected use case operates on two regions within a transaction. |
| For performance purposes the |
| <%=vars.product_name%> transaction implementation requires that region entries |
| of partitioned regions be colocated. |
| See [Custom-Partitioning and Colocating Data](../partitioned_regions/overview_custom_partitioning_and_data_colocation.html) for details on how to colocate region entries. |
| |
| |
| ## Transaction within an Application |
| |
| An application/client uses the `CacheTransactionManager` API. |
| This most basic code fragment shows the structure of a transaction, |
| with its `begin` to start the transaction, `commit` to end the transaction, |
| and handling of exceptions that these methods may throw. |
| |
| ``` pre |
| CacheTransactionManager txManager = |
| cache.getCacheTransactionManager(); |
| |
| try { |
| txManager.begin(); |
| // ... do transactional, region operations |
| txManager.commit(); |
| } catch (CommitConflictException conflict) { |
| // ... do necessary work for a transaction that failed on commit |
| } finally { |
| // All other exceptions will be handled by the caller. |
| // Examples of some exceptions: the data is not colocated, a rebalance |
| // interfered with the transaction, or the server is gone. |
| // Any exception thrown by a method other than commit() needs |
| // to do a rollback to avoid leaking the transaction state. |
| if(txManager.exists()) { |
| txManager.rollback(); |
| } |
| } |
| ``` |
| |
| More details of a transaction appear in this next application/client |
| code fragment example. |
| In this typical transaction, |
| the put operations must be atomic and two regions are involved. |
| |
| In this transaction, a customer's purchase is recorded. |
| The `cash` region contains each customer's cash balance |
| available for making trades. |
| The `trades` region records each customer's balance spent on trades. |
| |
| If there is a conflict upon commit of the transaction, |
| an exception is thrown, and this example tries again. |
| |
| ``` |
| // inputs needed for this transaction; shown as variables for simplicity |
| final String customer = "Customer1"; |
| final Integer purchase = 1000; |
| |
| // region set up shown to promote understanding |
| Cache cache = new CacheFactory().create(); |
| Pool pool = PoolManager.createFactory() |
| .addLocator("localhost", LOCATOR_PORT) |
| .create("pool-name"); |
| Region<String, Integer> cash = |
| cache.createClientRegionFactory(ClientRegionShortcut.PROXY) |
| .setPoolName(pool.getName()) |
| .create("cash"); |
| Region<String, Integer> trades = |
| cache.createClientRegionFactory(ClientRegionShortcut.PROXY) |
| .setPoolName(pool.getName()) |
| .create("trades"); |
| |
| // transaction code |
| CacheTransactionManager txManager = cache.getCacheTransactionManager(); |
| boolean retryTransaction = false; |
| do { |
| try { |
| txManager.begin(); |
| |
| // Subtract out the cost of the trade for this customer's balance |
| Integer cashBalance = cash.get(customer); |
| Integer newBalance = (cashBalance != null ? cashBalance : 0) - purchase; |
| cash.put(customer, newBalance); |
| |
| // Add in the cost of the trade for this customer |
| Integer tradeBalance = trades.get(customer); |
| newBalance = (tradeBalance != null ? tradeBalance : 0) + purchase; |
| trades.put(customer, newBalance); |
| |
| txManager.commit(); |
| retryTransaction = false; |
| } |
| catch (CommitConflictException conflict) { |
| // entry value changed causing a conflict for this customer, so try again |
| retryTransaction = true; |
| } finally { |
| // All other exceptions will be handled by the caller. |
| // Any exception thrown by a method other than commit() needs |
| // to do a rollback to avoid leaking the transaction state. |
| if(txManager.exists()) { |
| txManager.rollback(); |
| } |
| } |
| |
| } while (retryTransaction); |
| ``` |
| |
| Design transactions such that any get operations are within the transaction. |
| This causes those entries to be part of the transactional state, |
| which is desired such that intersecting transactions can be detected |
| and signal commit conficts. |
| |
| ## Transaction within a Function |
| |
| A transaction may be embedded in a function. |
| The application invokes the function, |
| and the function contains the transaction that does the `begin`, |
| the region operations, and the `commit` or `rollback`. |
| |
| This use of a function can have performance benefits. |
| The performance benefit results from both the function |
| and the region data residing on servers. |
| As the function invokes region operations, |
| those operations on region entries stay on the server, |
| so there is no network round trip time to do get or put |
| operations on region data. |
| |
| This function example accomplishes atomic updates on a single |
| region representing the quantity of products available in inventory. |
| Doing this in a transaction prevents double allocating inventory for |
| two orders placed simultaneously. |
| |
| |
| ``` pre |
| /** |
| * Atomically reduce inventory quantity |
| */ |
| public class TransactionalFunction extends Function { |
| |
| /** |
| * Returns true if the function had the requested quantity of |
| * inventory and successfully completed the transaction to |
| * record the reduced inventory that fulfills the order. |
| */ |
| @Override |
| public void execute(FunctionContext context) { |
| RegionFunctionContext rfc = (RegionFunctionContext) context; |
| Region<ProductId, Integer> inventoryRegion = rfc.getDataSet(); |
| |
| CacheTransactionManager |
| txManager = context.getCache().getCacheTransactionManager(); |
| |
| // single argument will be a ProductId and a quantity |
| ProductRequest request = (ProductRequest) rfc.getArguments(); |
| ProductId productRequested = request.getProductId(); |
| Integer qtyRequested = request.getQuantity(); |
| |
| boolean success = false; |
| |
| do { |
| boolean commitConflict = false; |
| try { |
| txManager.begin(); |
| |
| Integer qtyAvailable = inventoryRegion.get(productRequested); |
| if (qtyAvailable >= qtyRequested) { |
| // enough inventory is available, so process request |
| Integer remaining = qtyAvailable - qtyRequested; |
| inventoryRegion.put(productRequested, remaining); |
| txManager.commit(); |
| success = true; |
| } |
| |
| } catch (CommitConflictException conflict) { |
| // retry transaction, as another request on this same key succeeded, |
| // so this transaction attempt failed |
| commitConflict = true; |
| } finally { |
| // All other exceptions will be handled by the caller; however, |
| // any exception thrown by a method other than commit() needs |
| // to do a rollback to avoid leaking the transaction state. |
| if(txManager.exists()) { |
| txManager.rollback(); |
| } |
| } |
| |
| } while (commitConflict); |
| |
| context.getResultSender().lastResult(success); |
| } |
| |
| @Override |
| public String getId() { |
| return "TxFunction"; |
| } |
| |
| /** |
| * Returning true causes this function to execute on the server |
| * that holds the primary bucket for the given key. It can save a |
| * network hop from the secondary to the primary. |
| */ |
| @Override |
| public boolean optimizeForWrite() { |
| return true; |
| } |
| } |
| ``` |
| |
| The application-side details on function implementation are |
| not covered in this example. |
| The application sets up the function context and the argument. |
| See the section on [Function Execution](../function_exec/chapter_overview.html) for details on functions. |
| |
| The function implementation needs to catch the commit conflict exception |
| such that it can retry the entire transaction. |
| The exception only occurs if another request for the same product |
| intersected with this one, |
| and that other request's transaction committed first. |
| |
| The `optimizeForWrite` method is defined to cause the system to |
| execute the function on the server that holds the primary bucket |
| for the given key. |
| It can save a network hop from the secondary to the primary. |
| |
| Note that the variable `qtyAvailable` is a reference, |
| because the `Region.get` operation returns a reference |
| within this server-side code. |
| Read [Region Operations Return References](design_considerations.html#copy-on-read-transactions) |
| for details and how to work around the |
| implications of a reference as a return value when working with server code. |
| |