blob: 53188839dc57c93ce8667027d6e896e324410a1b [file] [log] [blame]
package org.apache.ignite.spring.sessions;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.expiry.TouchedExpiryPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridDirectTransient;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.session.DelegatingIndexResolver;
import org.springframework.session.FindByIndexNameSessionRepository;
import org.springframework.session.FlushMode;
import org.springframework.session.IndexResolver;
import org.springframework.session.MapSession;
import org.springframework.session.PrincipalNameIndexResolver;
import org.springframework.session.SaveMode;
import org.springframework.session.Session;
import org.springframework.session.events.AbstractSessionEvent;
import org.springframework.session.events.SessionCreatedEvent;
import org.springframework.session.events.SessionDeletedEvent;
import org.springframework.session.events.SessionExpiredEvent;
import org.springframework.util.Assert;
/**
* A {@link org.springframework.session.SessionRepository} implementation that stores
* sessions in Apache Ignite distributed {@link IgniteCache}.
*
* <p>
* An example of how to create a new instance can be seen below:
*
* <pre class="code">
* IgniteConfiguration config = new IgniteConfiguration();
*
* // ... configure Ignite ...
*
* Ignite ignite = IgnitionEx.start(config);
*
* IgniteIndexedSessionRepository sessionRepository =
* new IgniteIndexedSessionRepository(ignite);
* </pre>
*
* In order to support finding sessions by principal name using
* {@link #findByIndexNameAndIndexValue(String, String)} method, custom configuration of
* {@link IgniteCache} supplied to this implementation is required.
*
* This implementation listens for events on the Ignite-backed SessionRepository and
* translates those events into the corresponding Spring Session events. Publish the
* Spring Session events with the given {@link ApplicationEventPublisher}.
*
* <ul>
* <li>entryAdded - {@link SessionCreatedEvent}</li>
* <li>entryEvicted - {@link SessionExpiredEvent}</li>
* <li>entryRemoved - {@link SessionDeletedEvent}</li>
* </ul>
*
*/
public class IgniteIndexedSessionRepository
implements FindByIndexNameSessionRepository<IgniteIndexedSessionRepository.IgniteSession>,
CacheEntryCreatedListener<String, IgniteIndexedSessionRepository.IgniteSession>,
CacheEntryRemovedListener<String, IgniteIndexedSessionRepository.IgniteSession>,
CacheEntryExpiredListener<String, IgniteIndexedSessionRepository.IgniteSession> {
/**
* The default name of map used by Spring Session to store sessions.
*/
public static final String DEFAULT_SESSION_MAP_NAME = "spring:session:sessions";
/**
*
*/
private static final String SPRING_SECURITY_CONTEXT = "SPRING_SECURITY_CONTEXT";
/**
*
*/
private static final Log logger = LogFactory.getLog(IgniteIndexedSessionRepository.class);
/**
*
*/
private final Ignite ignite;
/**
*
*/
private ApplicationEventPublisher eventPublisher = (event) -> {
};
/**
* If non-null, this value is used to override
* {@link MapSession#setMaxInactiveInterval(Duration)}.
*/
private Integer defaultMaxInactiveInterval;
/**
*
*/
private IndexResolver<Session> indexResolver = new DelegatingIndexResolver<>(new PrincipalNameIndexResolver<>());
/**
*
*/
private String sessionMapName = DEFAULT_SESSION_MAP_NAME;
/**
*
*/
private FlushMode flushMode = FlushMode.ON_SAVE;
/**
*
*/
private SaveMode saveMode = SaveMode.ON_SET_ATTRIBUTE;
/**
*
*/
private IgniteCache<String, IgniteSession> sessions;
/**
*
*/
private CacheEntryListenerConfiguration<String, IgniteSession> listenerConfiguration;
/**
* Create a new {@link IgniteIndexedSessionRepository} instance.
* @param ignite the {@link Ignite} instance to use for managing sessions
*/
public IgniteIndexedSessionRepository(Ignite ignite) {
Assert.notNull(ignite, "Ignite must not be null");
this.ignite = ignite;
}
/**
*
*/
@PostConstruct
public void init() {
final CacheConfiguration<String, IgniteSession> configuration = new CacheConfiguration<String, IgniteSession>(
this.sessionMapName).setIndexedTypes(String.class, IgniteSession.class);
if (this.defaultMaxInactiveInterval != null)
configuration.setExpiryPolicyFactory(TouchedExpiryPolicy
.factoryOf(new javax.cache.expiry.Duration(TimeUnit.SECONDS, this.defaultMaxInactiveInterval)));
this.sessions = this.ignite.getOrCreateCache(configuration);
this.listenerConfiguration = new CacheEntryListenerConfiguration<String, IgniteSession>() {
@Override public Factory<CacheEntryListener<? super String, ? super IgniteSession>> getCacheEntryListenerFactory() {
return (Factory<CacheEntryListener<? super String, ? super IgniteSession>>) () -> IgniteIndexedSessionRepository.this;
}
@Override public boolean isOldValueRequired() {
return true;
}
@Override public Factory<CacheEntryEventFilter<? super String, ? super IgniteSession>> getCacheEntryEventFilterFactory() {
return null;
}
@Override public boolean isSynchronous() {
return false;
}
};
this.sessions.registerCacheEntryListener(this.listenerConfiguration);
}
/**
*
*/
@PreDestroy
public void close() {
this.sessions.deregisterCacheEntryListener(this.listenerConfiguration);
}
/**
* Sets the {@link ApplicationEventPublisher} that is used to publish
* {@link AbstractSessionEvent session events}. The default is to not publish session
* events.
* @param applicationEventPublisher the {@link ApplicationEventPublisher} that is used
* to publish session events. Cannot be null.
*/
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
Assert.notNull(applicationEventPublisher, "ApplicationEventPublisher cannot be null");
this.eventPublisher = applicationEventPublisher;
}
/**
* Set the maximum inactive interval in seconds between requests before newly created
* sessions will be invalidated. A negative time indicates that the session will never
* timeout. The default is 1800 (30 minutes).
* @param defaultMaxInactiveInterval the maximum inactive interval in seconds
*/
public void setDefaultMaxInactiveInterval(Integer defaultMaxInactiveInterval) {
this.defaultMaxInactiveInterval = defaultMaxInactiveInterval;
}
/**
* Set the {@link IndexResolver} to use.
* @param indexResolver the index resolver
*/
public void setIndexResolver(IndexResolver<Session> indexResolver) {
Assert.notNull(indexResolver, "indexResolver cannot be null");
this.indexResolver = indexResolver;
}
/**
* Set the name of map used to store sessions.
* @param sessionMapName the session map name
*/
public void setSessionMapName(String sessionMapName) {
Assert.hasText(sessionMapName, "Map name must not be empty");
this.sessionMapName = sessionMapName;
}
/**
* Sets the flush mode. Default flush mode is {@link FlushMode#ON_SAVE}.
* @param flushMode the new flush mode
*/
public void setFlushMode(FlushMode flushMode) {
Assert.notNull(flushMode, "flushMode cannot be null");
this.flushMode = flushMode;
}
/**
* Set the save mode.
* @param saveMode the save mode
*/
public void setSaveMode(SaveMode saveMode) {
Assert.notNull(saveMode, "saveMode must not be null");
this.saveMode = saveMode;
}
/**
*
*/
@Override public IgniteSession createSession() {
MapSession cached = new MapSession();
if (this.defaultMaxInactiveInterval != null)
cached.setMaxInactiveInterval(Duration.ofSeconds(this.defaultMaxInactiveInterval));
IgniteSession session = new IgniteSession(cached, true);
session.flushImmediateIfNecessary();
return session;
}
/**
*
*/
@Override public void save(IgniteSession session) {
if (session.isNew)
ttlSessions(session.getMaxInactiveInterval()).put(session.getId(), session);
else if (session.sessionIdChanged) {
this.sessions.remove(session.originalId);
session.originalId = session.getId();
ttlSessions(session.getMaxInactiveInterval()).put(session.getId(), session);
}
else if (session.hasChanges()) {
if (session.maxInactiveIntervalChanged) {
ttlSessions(session.getMaxInactiveInterval()).replace(session.getId(), session);
}
else {
this.sessions.replace(session.getId(), session);
}
}
session.clearChangeFlags();
}
/**
*
*/
@Override public IgniteSession findById(String id) {
IgniteSession saved = this.sessions.get(id);
if (saved == null)
return null;
if (saved.isExpired()) {
deleteById(saved.getId());
return null;
}
saved.isNew = false;
return saved;
}
/**
*
*/
@Override public void deleteById(String id) {
this.sessions.remove(id);
}
/**
*
*/
@Override public Map<String, IgniteSession> findByIndexNameAndIndexValue(String indexName, String indexValue) {
if (!PRINCIPAL_NAME_INDEX_NAME.equals(indexName))
return Collections.emptyMap();
final FieldsQueryCursor<List<?>> cursor = this.sessions
.query(new SqlFieldsQuery("SELECT * FROM IgniteSession WHERE principal='" + indexValue + "'"));
if (cursor == null)
return Collections.emptyMap();
final List<List<?>> sessions = cursor.getAll();
Map<String, IgniteSession> sessionMap = new HashMap<>(sessions.size());
sessions.forEach((List<?> res) -> {
final MapSession session = (MapSession) res.get(0);
final IgniteSession value = new IgniteSession(session, false);
value.principal = (String) res.get(1);
sessionMap.put(session.getId(), value);
});
return sessionMap;
}
/**
*
*/
@Override public void onCreated(Iterable<CacheEntryEvent<? extends String, ? extends IgniteSession>> events)
throws CacheEntryListenerException {
events.forEach((event) -> {
IgniteSession session = event.getValue();
if (session.getId().equals(session.getDelegate().getOriginalId())) {
if (logger.isDebugEnabled())
logger.debug("Session created with id: " + session.getId());
this.eventPublisher.publishEvent(new SessionCreatedEvent(this, session));
}
});
}
/**
*
*/
@Override
public void onExpired(Iterable<CacheEntryEvent<? extends String, ? extends IgniteSession>> events)
throws CacheEntryListenerException {
events.forEach((event) -> {
if (logger.isDebugEnabled())
logger.debug("Session expired with id: " + event.getOldValue().getId());
this.eventPublisher.publishEvent(new SessionExpiredEvent(this, event.getOldValue()));
});
}
/**
*
*/
@Override public void onRemoved(Iterable<CacheEntryEvent<? extends String, ? extends IgniteSession>> events)
throws CacheEntryListenerException {
events.forEach((event) -> {
IgniteSession session = event.getOldValue();
if (session != null) {
if (logger.isDebugEnabled())
logger.debug("Session deleted with id: " + session.getId());
this.eventPublisher.publishEvent(new SessionDeletedEvent(this, session));
}
});
}
/**
* Get cache view with custom duration expiry policy.
* @param duration expiry duration for IgniteSession.
* @return cache with custom duration expiry policy.
*/
private IgniteCache<String, IgniteSession> ttlSessions(Duration duration) {
return this.sessions.withExpiryPolicy(createPolicy(duration));
}
/**
* Create expiry policy from {@link Duration}.
* @param duration expiry duration.
* @return expiry policy.
*/
private static TouchedExpiryPolicy createPolicy(Duration duration) {
return new TouchedExpiryPolicy(new javax.cache.expiry.Duration(TimeUnit.SECONDS, duration.getSeconds()));
}
/**
* A custom implementation of {@link Session} that uses a {@link MapSession} as the
* basis for its mapping. It keeps track if changes have been made since last save.
*/
final class IgniteSession implements Session {
/**
*
*/
@QuerySqlField
private final MapSession delegate;
/**
*
*/
@GridDirectTransient
private boolean isNew;
/**
*
*/
@GridDirectTransient
private boolean sessionIdChanged;
/**
*
*/
@GridDirectTransient
private boolean lastAccessedTimeChanged;
/**
*
*/
@GridDirectTransient
private boolean maxInactiveIntervalChanged;
/**
*
*/
@GridDirectTransient
private String originalId;
/**
*
*/
@GridDirectTransient
private Map<String, Object> delta = new HashMap<>();
/**
*
*/
@QuerySqlField(index = true)
private String principal;
IgniteSession(MapSession cached, boolean isNew) {
this.delegate = cached;
this.isNew = isNew;
this.originalId = cached.getId();
if (this.isNew || (IgniteIndexedSessionRepository.this.saveMode == SaveMode.ALWAYS))
getAttributeNames()
.forEach((attributeName) -> this.delta.put(attributeName, cached.getAttribute(attributeName)));
}
/**
*
*/
@Override public void setLastAccessedTime(Instant lastAccessedTime) {
this.delegate.setLastAccessedTime(lastAccessedTime);
this.lastAccessedTimeChanged = true;
flushImmediateIfNecessary();
}
/**
*
*/
@Override public boolean isExpired() {
return this.delegate.isExpired();
}
/**
*
*/
@Override public Instant getCreationTime() {
return this.delegate.getCreationTime();
}
/**
*
*/
@Override public String getId() {
return this.delegate.getId();
}
/**
*
*/
@Override public String changeSessionId() {
String newSessionId = this.delegate.changeSessionId();
this.sessionIdChanged = true;
return newSessionId;
}
/**
*
*/
@Override public Instant getLastAccessedTime() {
return this.delegate.getLastAccessedTime();
}
/**
*
*/
@Override public void setMaxInactiveInterval(Duration interval) {
this.delegate.setMaxInactiveInterval(interval);
this.maxInactiveIntervalChanged = true;
flushImmediateIfNecessary();
}
/**
*
*/
@Override public Duration getMaxInactiveInterval() {
return this.delegate.getMaxInactiveInterval();
}
/**
*
*/
@Override public <T> T getAttribute(String attributeName) {
T attributeValue = this.delegate.getAttribute(attributeName);
if (attributeValue != null
&& IgniteIndexedSessionRepository.this.saveMode.equals(SaveMode.ON_GET_ATTRIBUTE))
this.delta.put(attributeName, attributeValue);
return attributeValue;
}
/**
*
*/
@Override public Set<String> getAttributeNames() {
return this.delegate.getAttributeNames();
}
/**
*
*/
@Override public void setAttribute(String attributeName, Object attributeValue) {
this.delegate.setAttribute(attributeName, attributeValue);
this.delta.put(attributeName, attributeValue);
if (SPRING_SECURITY_CONTEXT.equals(attributeName)) {
Map<String, String> indexes = IgniteIndexedSessionRepository.this.indexResolver.resolveIndexesFor(this);
String principal = (attributeValue != null) ? indexes.get(PRINCIPAL_NAME_INDEX_NAME) : null;
this.delegate.setAttribute(PRINCIPAL_NAME_INDEX_NAME, principal);
this.principal = principal;
}
flushImmediateIfNecessary();
}
/**
*
*/
@Override public void removeAttribute(String attributeName) {
setAttribute(attributeName, null);
}
/**
*
*/
MapSession getDelegate() {
return this.delegate;
}
/**
*
*/
boolean hasChanges() {
return (this.lastAccessedTimeChanged || this.maxInactiveIntervalChanged || !this.delta.isEmpty());
}
/**
*
*/
void clearChangeFlags() {
this.isNew = false;
this.lastAccessedTimeChanged = false;
this.sessionIdChanged = false;
this.maxInactiveIntervalChanged = false;
this.delta.clear();
}
/**
*
*/
private void flushImmediateIfNecessary() {
if (IgniteIndexedSessionRepository.this.flushMode == FlushMode.IMMEDIATE)
IgniteIndexedSessionRepository.this.save(this);
}
/**
*
*/
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
IgniteSession session = (IgniteSession) o;
return this.delegate.equals(session.delegate);
}
/**
*
*/
@Override public int hashCode() {
return Objects.hash(this.delegate);
}
}
}