| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.io.StringReader; |
| import java.sql.Connection; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.Map; |
| import javax.cache.Cache; |
| import javax.cache.configuration.Factory; |
| import javax.cache.integration.CacheLoaderException; |
| import javax.cache.integration.CacheWriterException; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cache.store.CacheStore; |
| import org.apache.ignite.cache.store.CacheStoreAdapter; |
| import org.apache.ignite.cache.store.CacheStoreSession; |
| import org.apache.ignite.cache.store.CacheStoreSessionListener; |
| import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiInClosure; |
| import org.apache.ignite.resources.CacheStoreSessionResource; |
| import org.h2.jdbcx.JdbcConnectionPool; |
| import org.h2.tools.RunScript; |
| import org.h2.tools.Server; |
| |
| /** |
| * {@link TestCacheStoreStrategy} backed by H2 in-memory database. |
| */ |
| public class H2CacheStoreStrategy implements TestCacheStoreStrategy { |
| /** Pool to get {@link Connection}s from. */ |
| private final JdbcConnectionPool dataSrc; |
| |
| /** */ |
| private final int port; |
| |
| /** Script that creates CACHE table. */ |
| private static final String CREATE_CACHE_TABLE = |
| "create table if not exists CACHE(k binary not null, v binary not null, PRIMARY KEY(k));"; |
| |
| /** Script that creates STATS table. */ |
| private static final String CREATE_STATS_TABLES = |
| "create table if not exists READS(id bigint auto_increment);\n" + |
| "create table if not exists WRITES(id bigint auto_increment);\n" + |
| "create table if not exists REMOVES(id bigint auto_increment);"; |
| |
| /** Script that populates STATS table */ |
| private static final String POPULATE_STATS_TABLE = |
| "delete from READS;\n" + |
| "delete from WRITES;\n" + |
| "delete from REMOVES;"; |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| public H2CacheStoreStrategy() throws IgniteCheckedException { |
| Server srv = null; |
| |
| try { |
| srv = Server.createTcpServer().start(); |
| |
| port = srv.getPort(); |
| |
| dataSrc = H2CacheStoreSessionListenerFactory.createDataSource(port); |
| |
| try (Connection conn = connection()) { |
| RunScript.execute(conn, new StringReader(CREATE_CACHE_TABLE)); |
| RunScript.execute(conn, new StringReader(CREATE_STATS_TABLES)); |
| RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE)); |
| } |
| } |
| catch (SQLException e) { |
| throw new IgniteCheckedException("Failed to set up cache store strategy" + |
| (srv == null ? "" : ": " + srv.getStatus()), e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getReads() { |
| return queryStats("reads"); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getWrites() { |
| return queryStats("writes"); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getRemoves() { |
| return queryStats("removes"); |
| } |
| |
| /** |
| * @param tbl Table name. |
| * @return Update statistics. |
| */ |
| private int queryStats(String tbl) { |
| return querySingleInt("select count(*) from " + tbl, "Failed to query store stats [table=" + tbl + "]"); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getStoreSize() { |
| return querySingleInt("select count(*) from CACHE;", "Failed to query number of rows from CACHE table"); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetStore() { |
| try (Connection conn = connection()) { |
| RunScript.execute(conn, new StringReader("delete from CACHE;")); |
| RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE)); |
| } |
| catch (SQLException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void putToStore(Object key, Object val) { |
| Connection conn = null; |
| try { |
| conn = connection(); |
| H2CacheStore.putToDb(conn, key, val); |
| } |
| catch (SQLException e) { |
| throw new IgniteException(e); |
| } |
| finally { |
| U.closeQuiet(conn); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void putAllToStore(Map<?, ?> data) { |
| Connection conn = null; |
| PreparedStatement stmt = null; |
| try { |
| conn = connection(); |
| stmt = conn.prepareStatement(H2CacheStore.MERGE); |
| for (Map.Entry<?, ?> e : data.entrySet()) { |
| stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(e.getKey()))); |
| stmt.setBinaryStream(2, new ByteArrayInputStream(H2CacheStore.serialize(e.getValue()))); |
| stmt.addBatch(); |
| } |
| stmt.executeBatch(); |
| } |
| catch (SQLException e) { |
| throw new IgniteException(e); |
| } |
| finally { |
| U.closeQuiet(stmt); |
| U.closeQuiet(conn); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object getFromStore(Object key) { |
| Connection conn = null; |
| try { |
| conn = connection(); |
| return H2CacheStore.getFromDb(conn, key); |
| } |
| catch (SQLException e) { |
| throw new IgniteException(e); |
| } |
| finally { |
| U.closeQuiet(conn); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeFromStore(Object key) { |
| Connection conn = null; |
| try { |
| conn = connection(); |
| H2CacheStore.removeFromDb(conn, key); |
| } |
| catch (SQLException e) { |
| throw new IgniteException(e); |
| } |
| finally { |
| U.closeQuiet(conn); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isInStore(Object key) { |
| return getFromStore(key) != null; |
| } |
| |
| /** |
| * @return New {@link Connection} from {@link #dataSrc} |
| * @throws SQLException if failed |
| */ |
| private Connection connection() throws SQLException { |
| return dataSrc.getConnection(); |
| } |
| |
| /** |
| * Retrieves single int value from {@link ResultSet} returned by given query. |
| * |
| * @param qry Query string (fully populated, with params). |
| * @param errorMsg Message for {@link IgniteException} to bear in case of failure. |
| * @return Requested value |
| */ |
| private int querySingleInt(String qry, String errorMsg) { |
| Connection conn = null; |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| conn = connection(); |
| stmt = conn.prepareStatement(qry); |
| rs = stmt.executeQuery(); |
| if (rs.next()) |
| return rs.getInt(1); |
| else |
| throw new IgniteException(errorMsg); |
| } |
| catch (SQLException e) { |
| throw new IgniteException(e); |
| } |
| finally { |
| U.closeQuiet(rs); |
| U.closeQuiet(stmt); |
| U.closeQuiet(conn); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateCacheConfiguration(CacheConfiguration<Object, Object> cfg) { |
| cfg.setCacheStoreSessionListenerFactories(new H2CacheStoreSessionListenerFactory(port)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Factory<? extends CacheStore<Object, Object>> getStoreFactory() { |
| return new H2StoreFactory(); |
| } |
| |
| /** Serializable H2 backed cache store factory. */ |
| public static class H2StoreFactory implements Factory<CacheStore<Object, Object>> { |
| /** {@inheritDoc} */ |
| @Override public CacheStore<Object, Object> create() { |
| return new H2CacheStore(); |
| } |
| } |
| |
| /** Serializable {@link Factory} producing H2 backed {@link CacheStoreSessionListener}s. */ |
| public static class H2CacheStoreSessionListenerFactory implements Factory<CacheStoreSessionListener> { |
| /** */ |
| private int port; |
| |
| /** |
| * @param port Port. |
| */ |
| public H2CacheStoreSessionListenerFactory(int port) { |
| this.port = port; |
| } |
| |
| /** |
| * @return Connection pool |
| */ |
| static JdbcConnectionPool createDataSource(int port) { |
| JdbcConnectionPool pool = JdbcConnectionPool.create("jdbc:h2:tcp://localhost:" + port + |
| "/mem:TestDb;LOCK_MODE=0", "sa", ""); |
| |
| pool.setMaxConnections(Integer.getInteger("H2_JDBC_CONNECTIONS", 100)); |
| return pool; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheStoreSessionListener create() { |
| CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener(); |
| lsnr.setDataSource(createDataSource(port)); |
| return lsnr; |
| } |
| } |
| |
| /** H2 backed {@link CacheStoreAdapter} implementations */ |
| public static class H2CacheStore extends CacheStoreAdapter<Object, Object> { |
| /** Store session */ |
| @CacheStoreSessionResource |
| private CacheStoreSession ses; |
| |
| /** Template for an insert statement */ |
| private static final String MERGE = "merge into CACHE(k, v) values(?, ?);"; |
| |
| /** {@inheritDoc} */ |
| @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { |
| Connection conn = ses.attachment(); |
| assert conn != null; |
| |
| Statement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.createStatement(); |
| rs = stmt.executeQuery("select * from CACHE"); |
| while (rs.next()) |
| clo.apply(deserialize(rs.getBytes(1)), deserialize(rs.getBytes(2))); |
| } |
| catch (SQLException e) { |
| throw new IgniteException(e); |
| } |
| finally { |
| U.closeQuiet(rs); |
| U.closeQuiet(stmt); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object load(Object key) throws CacheLoaderException { |
| try { |
| Connection conn = ses.attachment(); |
| Object res = getFromDb(conn, key); |
| updateStats("reads"); |
| return res; |
| } |
| catch (SQLException e) { |
| throw new CacheLoaderException("Failed to load object [key=" + key + ']', e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException { |
| try { |
| Connection conn = ses.attachment(); |
| putToDb(conn, entry.getKey(), entry.getValue()); |
| updateStats("writes"); |
| } |
| catch (SQLException e) { |
| throw new CacheWriterException("Failed to write object [key=" + entry.getKey() + ", " + |
| "val=" + entry.getValue() + ']', e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void delete(Object key) throws CacheWriterException { |
| try { |
| Connection conn = ses.attachment(); |
| removeFromDb(conn, key); |
| updateStats("removes"); |
| } |
| catch (SQLException e) { |
| throw new CacheWriterException("Failed to delete object [key=" + key + ']', e); |
| } |
| } |
| |
| /** |
| * Selects from H2 and deserialize from bytes the value pointed by key. |
| * |
| * @param conn {@link Connection} to use. |
| * @param key Key to look for. |
| * @return Stored object or null if the key is missing from DB. |
| * @throws SQLException If failed. |
| */ |
| static Object getFromDb(Connection conn, Object key) throws SQLException { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.prepareStatement("select v from CACHE where k = ?"); |
| stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); |
| rs = stmt.executeQuery(); |
| return rs.next() ? H2CacheStore.deserialize(rs.getBytes(1)) : null; |
| } |
| finally { |
| U.closeQuiet(rs); |
| U.closeQuiet(stmt); |
| } |
| } |
| |
| /** |
| * Puts key-value pair to H2. |
| * |
| * @param conn {@link Connection} to use. |
| * @param key Key. |
| * @param val Value. |
| * @throws SQLException If failed. |
| */ |
| static void putToDb(Connection conn, Object key, Object val) throws SQLException { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(H2CacheStore.MERGE); |
| stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); |
| stmt.setBinaryStream(2, new ByteArrayInputStream(H2CacheStore.serialize(val))); |
| stmt.executeUpdate(); |
| } |
| finally { |
| U.closeQuiet(stmt); |
| } |
| } |
| |
| /** |
| * Removes given key and its value from H2. |
| * |
| * @param conn {@link Connection} to invoke query upon. |
| * @param key Key to remove. |
| * @throws SQLException if failed. |
| */ |
| static void removeFromDb(Connection conn, Object key) throws SQLException { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement("delete from CACHE where k = ?"); |
| stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); |
| stmt.executeUpdate(); |
| } |
| finally { |
| U.closeQuiet(stmt); |
| } |
| } |
| |
| /** |
| * Increments stored stats for given operation. |
| * |
| * @param tblName Table name |
| */ |
| private void updateStats(String tblName) { |
| Connection conn = ses.attachment(); |
| assert conn != null; |
| Statement stmt = null; |
| try { |
| stmt = conn.createStatement(); |
| stmt.executeUpdate("insert into " + tblName + " default values"); |
| } |
| catch (SQLException e) { |
| throw new IgniteException("Failed to update H2 store usage stats", e); |
| } |
| finally { |
| U.closeQuiet(stmt); |
| } |
| } |
| |
| /** |
| * Turns given arbitrary object to byte array. |
| * |
| * @param obj Object to serialize |
| * @return Bytes representation of given object. |
| */ |
| static byte[] serialize(Object obj) { |
| try (ByteArrayOutputStream b = new ByteArrayOutputStream()) { |
| try (ObjectOutputStream o = new ObjectOutputStream(b)) { |
| o.writeObject(obj); |
| } |
| return b.toByteArray(); |
| } |
| catch (Exception e) { |
| throw new IgniteException("Failed to serialize object to byte array [obj=" + obj, e); |
| } |
| } |
| |
| /** |
| * Deserializes an object from its byte array representation. |
| * |
| * @param bytes Byte array representation of the object. |
| * @return Deserialized object. |
| */ |
| public static Object deserialize(byte[] bytes) { |
| try (ByteArrayInputStream b = new ByteArrayInputStream(bytes)) { |
| try (ObjectInputStream o = new ObjectInputStream(b)) { |
| return o.readObject(); |
| } |
| } |
| catch (Exception e) { |
| throw new IgniteException("Failed to deserialize object from byte array", e); |
| } |
| } |
| } |
| } |