blob: 461af92338271ab9c62ecaaefde3802156de988e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cache.store.hibernate;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.transactions.Transaction;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.SharedSessionContract;
import org.hibernate.cfg.Configuration;
import org.hibernate.resource.transaction.spi.TransactionStatus;
import org.jetbrains.annotations.Nullable;
/**
* {@link CacheStore} implementation backed by Hibernate. This implementation
* stores objects in underlying database in {@code BLOB} format.
* <h2 class="header">Configuration</h2>
* Either {@link #setSessionFactory(SessionFactory)} or
* {@link #setHibernateConfigurationPath(String)} or
* {@link #setHibernateProperties(Properties)} should be set.
* <p>
* If a session factory is provided, it should contain the
* {@link CacheHibernateBlobStoreEntry} persistent class (via the provided
* mapping file {@code CacheHibernateBlobStoreEntry.hbm.xml} or by
* adding {@link CacheHibernateBlobStoreEntry} to the annotated classes
* of the session factory).
* <p>
* Path to the Hibernate configuration may be either a URL, a file path or
* a classpath resource. This configuration file should include the provided
* mapping {@code CacheHibernateBlobStoreEntry.hbm.xml} or include the annotated
* class {@link CacheHibernateBlobStoreEntry}.
* <p>
* If Hibernate properties are provided, the mapping
* {@code CacheHibernateBlobStoreEntry.hbm.xml} is included automatically.
* <p>
* Use {@link CacheHibernateBlobStoreFactory} factory to pass {@link CacheHibernateBlobStore} to {@link CacheConfiguration}.
*/
public class CacheHibernateBlobStore<K, V> extends CacheStoreAdapter<K, V> {
/**
* Default connection URL
* (value is <tt>jdbc:h2:mem:hibernateCacheStore;DB_CLOSE_DELAY=-1;DEFAULT_LOCK_TIMEOUT=5000</tt>).
*/
public static final String DFLT_CONN_URL = "jdbc:h2:mem:hibernateCacheStore;DB_CLOSE_DELAY=-1;" +
"DEFAULT_LOCK_TIMEOUT=5000";
/** Default <tt>hibernate.show_sql</tt> property value (value is <tt>true</tt>). */
public static final String DFLT_SHOW_SQL = "true";
/** Default <tt>hibernate.hbm2ddl.auto</tt> property value (value is <tt>update</tt>). */
public static final String DFLT_HBM2DDL_AUTO = "update";
/** Default <tt>hibernate.connection.pool_size</tt> property value (value is <tt>64</tt>). */
public static final String DFLT_CONN_POOL_SIZE = "64";
/** Key under which the Hibernate session is kept in the store session properties. */
private static final String ATTR_SES = "HIBERNATE_STORE_SESSION";
/** Name of the Hibernate mapping resource added when the store is configured from properties. */
private static final String MAPPING_RESOURCE =
"org/apache/ignite/cache/store/hibernate/CacheHibernateBlobStoreEntry.hbm.xml";
/** Marshaller used to convert keys and values to and from byte arrays. */
private static final Marshaller marsh = new JdkMarshaller();
/** Init guard: flipped by the first thread that enters initialization. */
@GridToStringExclude
private final AtomicBoolean initGuard = new AtomicBoolean();
/** Init latch: counted down once the initialization attempt has finished. */
@GridToStringExclude
private final CountDownLatch initLatch = new CountDownLatch(1);
/** Hibernate properties; populated with defaults during initialization when no configuration is supplied. */
@GridToStringExclude
private Properties hibernateProps;
/** Session factory; either supplied through the setter or built during initialization. */
@GridToStringExclude
private SessionFactory sesFactory;
/** Path to hibernate configuration file (URL, file path or classpath resource). */
private String hibernateCfgPath;
/** Log. */
@LoggerResource
private IgniteLogger log;
/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;
/** Ignite instance. */
@IgniteInstanceResource
private Ignite ignite;
/**
 * {@inheritDoc}
 * <p>
 * Looks up the blob entry by the marshalled key and returns its unmarshalled value,
 * or {@code null} when no entry is stored for the key.
 */
@SuppressWarnings({"unchecked", "RedundantTypeArguments"})
@Override public V load(K key) {
    init();

    final Transaction cacheTx = transaction();

    if (log.isDebugEnabled())
        log.debug("Store load [key=" + key + ", tx=" + cacheTx + ']');

    final Session hibSes = session(cacheTx);

    try {
        Object found = hibSes.get(CacheHibernateBlobStoreEntry.class, toBytes(key));

        if (found != null) {
            CacheHibernateBlobStoreEntry row = (CacheHibernateBlobStoreEntry)found;

            return fromBytes(row.getValue());
        }

        return null;
    }
    catch (IgniteCheckedException | HibernateException err) {
        rollback(hibSes, cacheTx);

        throw new CacheLoaderException("Failed to load value from cache store with key: " + key, err);
    }
    finally {
        // Commits and closes the session only when there is no cache transaction.
        end(hibSes, cacheTx);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Marshals the entry's key and value to byte arrays and saves or updates the corresponding
 * {@link CacheHibernateBlobStoreEntry}. An entry with a {@code null} value is handled as a
 * removal via {@link #delete(Object)}.
 *
 * @throws CacheWriterException If marshalling fails or Hibernate reports an error.
 */
@Override public void write(javax.cache.Cache.Entry<? extends K, ? extends V> entry) {
    init();

    Transaction tx = transaction();

    K key = entry.getKey();
    V val = entry.getValue();

    if (log.isDebugEnabled())
        log.debug("Store put [key=" + key + ", val=" + val + ", tx=" + tx + ']');

    // A null value means the mapping must disappear from the store.
    if (val == null) {
        delete(key);

        return;
    }

    Session ses = session(tx);

    try {
        CacheHibernateBlobStoreEntry entry0 = new CacheHibernateBlobStoreEntry(toBytes(key), toBytes(val));

        ses.saveOrUpdate(entry0);
    }
    catch (IgniteCheckedException | HibernateException e) {
        rollback(ses, tx);

        // The original message was missing '=' after "val".
        throw new CacheWriterException("Failed to put value to cache store [key=" + key + ", val=" + val + "]", e);
    }
    finally {
        // Commits and closes the session only when there is no cache transaction.
        end(ses, tx);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Fetches the blob entry stored under the marshalled key and deletes it if it exists.
 */
@SuppressWarnings({"JpaQueryApiInspection", "JpaQlInspection"})
@Override public void delete(Object key) {
    init();

    final Transaction cacheTx = transaction();

    if (log.isDebugEnabled())
        log.debug("Store remove [key=" + key + ", tx=" + cacheTx + ']');

    final Session hibSes = session(cacheTx);

    try {
        byte[] id = toBytes(key);

        Object existing = hibSes.get(CacheHibernateBlobStoreEntry.class, id);

        // Nothing stored under this key; the finally block still ends the session.
        if (existing == null)
            return;

        hibSes.delete(existing);
    }
    catch (IgniteCheckedException | HibernateException err) {
        rollback(hibSes, cacheTx);

        throw new CacheWriterException("Failed to remove value from cache store with key: " + key, err);
    }
    finally {
        end(hibSes, cacheTx);
    }
}
/**
 * Rolls back the Hibernate transaction of the given session, but only for
 * operations that run outside of a cache transaction.
 *
 * @param ses Hibernate session.
 * @param tx Cache ongoing transaction, or {@code null} if there is none.
 */
private void rollback(SharedSessionContract ses, Transaction tx) {
    // With a cache transaction in progress sessionEnd() does all required work.
    if (tx != null)
        return;

    org.hibernate.Transaction hibTx = ses.getTransaction();

    if (hibTx == null)
        return;

    if (hibTx.getStatus().canRollback())
        hibTx.rollback();
}
/**
 * Ends a Hibernate session that was opened for an operation outside of a cache transaction:
 * commits its transaction if it is still active and always closes the session.
 * <p>
 * When a cache transaction is present this method does nothing, because
 * {@link #sessionEnd(boolean)} completes and closes the shared session.
 *
 * @param ses Hibernate session.
 * @param tx Cache ongoing transaction, or {@code null} if there is none.
 */
private void end(Session ses, Transaction tx) {
    // Commit only if there is no cache transaction,
    // otherwise sessionEnd() will do all required work.
    if (tx != null)
        return;

    try {
        org.hibernate.Transaction hTx = ses.getTransaction();

        if (hTx != null && hTx.getStatus() == TransactionStatus.ACTIVE)
            hTx.commit();
    }
    finally {
        // Close even when the commit fails, otherwise the session is leaked.
        ses.close();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Removes the Hibernate session stored in the store session properties (if any), then either
 * flushes and commits or rolls back its transaction, and finally closes the session.
 *
 * @throws CacheWriterException If Hibernate fails to commit or roll back.
 */
@Override public void sessionEnd(boolean commit) {
    init();

    CacheStoreSession storeSes = session();

    // Without a store session there can be no Hibernate session stashed in its properties.
    if (storeSes == null)
        return;

    Transaction tx = storeSes.transaction();

    Map<String, Session> props = storeSes.properties();

    Session hibSes = props.remove(ATTR_SES);

    if (hibSes == null)
        return;

    // The cache transaction may be absent, so resolve the id once, null-safely.
    Object xid = tx != null ? tx.xid() : null;

    try {
        org.hibernate.Transaction hTx = hibSes.getTransaction();

        if (hTx != null) {
            if (commit) {
                hibSes.flush();

                hTx.commit();
            }
            else
                hTx.rollback();

            if (log.isDebugEnabled())
                log.debug("Transaction ended [xid=" + xid + ", commit=" + commit + ']');
        }
    }
    catch (HibernateException e) {
        throw new CacheWriterException("Failed to end transaction [xid=" + xid +
            ", commit=" + commit + ']', e);
    }
    finally {
        // Always release the session, including when no Hibernate transaction was obtained.
        hibSes.close();
    }
}
/**
 * Gets Hibernate session.
 * <p>
 * Within a cache transaction a single session is shared by all store operations: it is kept
 * in the store session properties under {@code ATTR_SES} and created on first use. Without a
 * cache transaction a new session is opened for every call.
 *
 * @param tx Cache transaction, or {@code null} if there is none.
 * @return Session with a Hibernate transaction already begun.
 */
Session session(@Nullable Transaction tx) {
    if (tx == null)
        return openSessionWithTx();

    Map<String, Session> props = session().properties();

    Session ses = props.get(ATTR_SES);

    if (ses == null) {
        ses = openSessionWithTx();

        // Store session in transaction metadata, so it can be accessed
        // for other operations on the same transaction.
        props.put(ATTR_SES, ses);

        if (log.isDebugEnabled())
            log.debug("Hibernate session open [ses=" + ses + ", tx=" + tx.xid() + "]");
    }

    return ses;
}

/**
 * Opens a new Hibernate session and begins a transaction on it, closing the session
 * again if the transaction cannot be started so that it is not leaked.
 *
 * @return Newly opened session with a begun transaction.
 */
private Session openSessionWithTx() {
    Session ses = sesFactory.openSession();

    try {
        ses.beginTransaction();
    }
    catch (RuntimeException e) {
        try {
            ses.close();
        }
        catch (HibernateException closeErr) {
            // Keep the original failure as the primary error.
            e.addSuppressed(closeErr);
        }

        throw e;
    }

    return ses;
}
/**
 * Supplies a ready-made Hibernate session factory. When one is set, initialization
 * uses it as is and does not build a factory from a configuration path or properties.
 *
 * @param sesFactory Session factory.
 */
public void setSessionFactory(SessionFactory sesFactory) {
    this.sesFactory = sesFactory;
}
/**
 * Sets the location of the Hibernate configuration XML file.
 * <p>
 * During initialization the value is tried as a URL first, then as a file path,
 * and finally as a classpath resource.
 *
 * @param hibernateCfgPath URL or file path or classpath resource
 *      pointing to hibernate configuration XML file.
 */
public void setHibernateConfigurationPath(String hibernateCfgPath) {
    this.hibernateCfgPath = hibernateCfgPath;
}
/**
 * Sets the Hibernate properties used to build the session factory when neither a
 * session factory nor a configuration path has been provided.
 *
 * @param hibernateProps Hibernate properties.
 */
public void setHibernateProperties(Properties hibernateProps) {
    this.hibernateProps = hibernateProps;
}
/**
 * Initializes store.
 * <p>
 * The first caller to flip {@code initGuard} performs the initialization; callers arriving
 * while it is still running wait on {@code initLatch}. The session factory is resolved in
 * this order:
 * <ol>
 * <li>a factory already set through {@link #setSessionFactory(SessionFactory)};</li>
 * <li>the configured path, tried as a URL, then as a file, then as a classpath resource;</li>
 * <li>Hibernate properties (defaults are filled in when none were provided), with the
 * blob entry mapping resource added.</li>
 * </ol>
 * If the initialization attempt fails, the guard stays set and the session factory stays
 * {@code null}, so later calls fail with "Cache store was not properly initialized.".
 *
 * @throws IgniteException If failed to initialize.
 */
private void init() throws IgniteException {
// Only the thread that wins this CAS runs the initialization branch.
if (initGuard.compareAndSet(false, true)) {
if (log.isDebugEnabled())
log.debug("Initializing cache store.");
try {
if (sesFactory != null)
// Session factory has been provided - nothing to do.
return;
if (!F.isEmpty(hibernateCfgPath)) {
try {
// A MalformedURLException here means the path is not a URL; fall through to the file check.
URL url = new URL(hibernateCfgPath);
sesFactory = new Configuration().configure(url).buildSessionFactory();
if (log.isDebugEnabled())
log.debug("Configured session factory using URL: " + url);
// Session factory has been successfully initialized.
return;
}
catch (MalformedURLException e) {
if (log.isDebugEnabled())
log.debug("Caught malformed URL exception: " + e.getMessage());
}
// Provided path is not a valid URL. File?
File cfgFile = new File(hibernateCfgPath);
if (cfgFile.exists()) {
sesFactory = new Configuration().configure(cfgFile).buildSessionFactory();
if (log.isDebugEnabled())
log.debug("Configured session factory using file: " + hibernateCfgPath);
// Session factory has been successfully initialized.
return;
}
// Provided path is not a file. Classpath resource?
sesFactory = new Configuration().configure(hibernateCfgPath).buildSessionFactory();
if (log.isDebugEnabled())
log.debug("Configured session factory using classpath resource: " + hibernateCfgPath);
}
else {
if (hibernateProps == null) {
U.warn(log, "No Hibernate configuration has been provided for store (will use default).");
// Fall back to the DFLT_* constants declared on this class.
hibernateProps = new Properties();
hibernateProps.setProperty("hibernate.connection.url", DFLT_CONN_URL);
hibernateProps.setProperty("hibernate.show_sql", DFLT_SHOW_SQL);
hibernateProps.setProperty("hibernate.hbm2ddl.auto", DFLT_HBM2DDL_AUTO);
hibernateProps.setProperty("hibernate.connection.pool_size", DFLT_CONN_POOL_SIZE);
}
Configuration cfg = new Configuration();
cfg.setProperties(hibernateProps);
// The availability check runs only when JVM assertions are enabled.
assert resourceAvailable(MAPPING_RESOURCE) : MAPPING_RESOURCE;
cfg.addResource(MAPPING_RESOURCE);
sesFactory = cfg.buildSessionFactory();
// NOTE(review): user-supplied properties may contain connection credentials, which
// would then appear in the debug log - confirm this is acceptable.
if (log.isDebugEnabled())
log.debug("Configured session factory using properties: " + hibernateProps);
}
}
catch (HibernateException e) {
throw new IgniteException("Failed to initialize store.", e);
}
finally {
// Release waiting threads whether initialization succeeded or failed.
initLatch.countDown();
}
}
// Another thread is initializing and has not finished yet - wait for it.
else if (initLatch.getCount() > 0) {
try {
U.await(initLatch);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteException(e);
}
}
// Reached by waiters and by the initializer paths that did not return early.
if (sesFactory == null)
throw new IgniteException("Cache store was not properly initialized.");
}
/**
 * Verifies that a classpath resource can be located through the context class loader
 * of the current thread and that reading from it does not fail.
 *
 * @param name Resource name.
 * @return {@code true} if the resource was found and one byte could be read without an
 *      I/O error, {@code false} otherwise.
 */
private boolean resourceAvailable(String name) {
    ClassLoader ctxLdr = Thread.currentThread().getContextClassLoader();

    InputStream in = ctxLdr.getResourceAsStream(name);

    if (in != null) {
        try {
            // Read a single byte to force actual content access by JVM.
            in.read();

            return true;
        }
        catch (IOException err) {
            log.error("Failed to read classpath resource: " + name, err);

            return false;
        }
        finally {
            U.close(in, log);
        }
    }

    log.error("Classpath resource not found: " + name);

    return false;
}
/**
 * {@inheritDoc}
 * <p>
 * The string form is produced by the {@code S.toString} helper for this class.
 */
@Override public String toString() {
    final String str = S.toString(CacheHibernateBlobStore.class, this);

    return str;
}
/**
 * Marshals the given object into a byte array with the store's marshaller.
 *
 * @param obj Object to convert to byte array.
 * @return Byte array.
 * @throws IgniteCheckedException If failed to convert.
 */
protected byte[] toBytes(Object obj) throws IgniteCheckedException {
    final byte[] marshalled = U.marshal(marsh, obj);

    return marshalled;
}
/**
 * Unmarshals an object from a byte array with the store's marshaller, using the
 * class loader of this store's class.
 *
 * @param bytes Bytes to deserialize.
 * @param <X> Result object type.
 * @return Deserialized object, or {@code null} if {@code bytes} is {@code null} or empty.
 * @throws IgniteCheckedException If failed.
 */
protected <X> X fromBytes(byte[] bytes) throws IgniteCheckedException {
    boolean hasData = bytes != null && bytes.length != 0;

    if (hasData)
        return U.unmarshal(marsh, bytes, getClass().getClassLoader());

    return null;
}
/**
 * @return Transaction of the current store session, or {@code null} if there is no
 *      store session or it has no transaction.
 */
@Nullable private Transaction transaction() {
    CacheStoreSession storeSes = session();

    if (storeSes == null)
        return null;

    return storeSes.transaction();
}
/**
 * @return The store session held in the {@code ses} field.
 */
private CacheStoreSession session() {
    return this.ses;
}
}