package org.apache.ignite.spring.sessions;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.expiry.TouchedExpiryPolicy;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridDirectTransient;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.session.DelegatingIndexResolver;
import org.springframework.session.FindByIndexNameSessionRepository;
import org.springframework.session.FlushMode;
import org.springframework.session.IndexResolver;
import org.springframework.session.MapSession;
import org.springframework.session.PrincipalNameIndexResolver;
import org.springframework.session.SaveMode;
import org.springframework.session.Session;
import org.springframework.session.events.AbstractSessionEvent;
import org.springframework.session.events.SessionCreatedEvent;
import org.springframework.session.events.SessionDeletedEvent;
import org.springframework.session.events.SessionExpiredEvent;
import org.springframework.util.Assert;

/**
 * A {@link org.springframework.session.SessionRepository} implementation that stores
 * sessions in Apache Ignite distributed {@link IgniteCache}.
 *
 * <p>
 * An example of how to create a new instance can be seen below:
 *
 * <pre class="code">
 * IgniteConfiguration config = new IgniteConfiguration();
 *
 * // ... configure Ignite ...
 *
 * Ignite ignite = Ignition.start(config);
 *
 * IgniteIndexedSessionRepository sessionRepository =
 *         new IgniteIndexedSessionRepository(ignite);
 * </pre>
 *
 * In order to support finding sessions by principal name using
 * {@link #findByIndexNameAndIndexValue(String, String)} method, custom configuration of
 * {@link IgniteCache} supplied to this implementation is required.
 *
 * This implementation listens for entry events on the Ignite cache that backs the
 * repository and translates them into the corresponding Spring Session events, which
 * are published through the configured {@link ApplicationEventPublisher}:
 *
 * <ul>
 * <li>entry created - {@link SessionCreatedEvent}</li>
 * <li>entry expired - {@link SessionExpiredEvent}</li>
 * <li>entry removed - {@link SessionDeletedEvent}</li>
 * </ul>
 *
 */
public class IgniteIndexedSessionRepository
        implements FindByIndexNameSessionRepository<IgniteIndexedSessionRepository.IgniteSession>,
        CacheEntryCreatedListener<String, IgniteIndexedSessionRepository.IgniteSession>,
        CacheEntryRemovedListener<String, IgniteIndexedSessionRepository.IgniteSession>,
        CacheEntryExpiredListener<String, IgniteIndexedSessionRepository.IgniteSession> {

    /**
     * The default name of map used by Spring Session to store sessions.
     */
    public static final String DEFAULT_SESSION_MAP_NAME = "spring:session:sessions";

    /**
     * The principal name custom attribute name.
     */
    public static final String PRINCIPAL_NAME_ATTRIBUTE = "principalName";

    /**
     * Name of the session attribute whose modification through
     * {@code IgniteSession#setAttribute} causes the principal index to be re-resolved.
     */
    private static final String SPRING_SECURITY_CONTEXT = "SPRING_SECURITY_CONTEXT";

    /** Logger used for debug-level tracing of cache entry events. */
    private static final Log logger = LogFactory.getLog(IgniteIndexedSessionRepository.class);

    /** Ignite instance from which the session cache is obtained in {@link #init()}. */
    private final Ignite ignite;

    /** Publisher for session events; the default is a no-op that discards every event. */
    private ApplicationEventPublisher eventPublisher = (event) -> {
    };

    /**
     * If non-null, this value is used to override
     * {@link MapSession#setMaxInactiveInterval(Duration)}. It is also read by
     * {@link #init()} to configure a cache-level expiry policy. The value is in seconds.
     */
    private Integer defaultMaxInactiveInterval;

    /** Resolver used to derive the principal name index when session attributes change. */
    private IndexResolver<Session> indexResolver = new DelegatingIndexResolver<>(new PrincipalNameIndexResolver<>());

    /** Name of the Ignite cache that holds the sessions; read once by {@link #init()}. */
    private String sessionMapName = DEFAULT_SESSION_MAP_NAME;

    /** Controls whether session mutations are written to the cache immediately or only on save. */
    private FlushMode flushMode = FlushMode.ON_SAVE;

    /** Controls which attribute accesses are recorded as changes of a session. */
    private SaveMode saveMode = SaveMode.ON_SET_ATTRIBUTE;

    /** The session cache; {@code null} until {@link #init()} has run. */
    private IgniteCache<String, IgniteSession> sessions;

    /** Listener configuration registered in {@link #init()} and deregistered in {@link #close()}. */
    private CacheEntryListenerConfiguration<String, IgniteSession> listenerConfiguration;

    /**
     * Create a new {@link IgniteIndexedSessionRepository} instance.
     *
     * <p>
     * The constructor only stores the {@link Ignite} reference. The session cache is
     * created, and the entry listener registered, later by {@link #init()}.
     * @param ignite the {@link Ignite} instance to use for managing sessions; must not be
     * {@code null}
     */
    public IgniteIndexedSessionRepository(Ignite ignite) {
        Assert.notNull(ignite, "Ignite must not be null");
        this.ignite = ignite;
    }

    /**
     * Creates (or obtains) the session cache and subscribes this repository to its entry
     * events.
     *
     * <p>
     * The cache is named after the configured session map name and indexes
     * {@code IgniteSession} values so that they can be queried with SQL. When a default
     * max inactive interval is configured it is also installed as the cache-wide
     * {@link TouchedExpiryPolicy}; a negative interval means "never time out" and is
     * mapped to an eternal duration, because {@code javax.cache.expiry.Duration} rejects
     * negative amounts.
     */
    @PostConstruct
    public void init() {
        final CacheConfiguration<String, IgniteSession> configuration = new CacheConfiguration<String, IgniteSession>(
                this.sessionMapName).setIndexedTypes(String.class, IgniteSession.class);

        if (this.defaultMaxInactiveInterval != null) {
            // A negative interval is documented as "session never times out"; building a
            // javax.cache Duration from it would throw IllegalArgumentException.
            final javax.cache.expiry.Duration expiry = (this.defaultMaxInactiveInterval < 0)
                    ? javax.cache.expiry.Duration.ETERNAL
                    : new javax.cache.expiry.Duration(TimeUnit.SECONDS, this.defaultMaxInactiveInterval);
            configuration.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(expiry));
        }

        this.sessions = this.ignite.getOrCreateCache(configuration);

        this.listenerConfiguration = new CacheEntryListenerConfiguration<String, IgniteSession>() {
            @Override
            public Factory<CacheEntryListener<? super String, ? super IgniteSession>> getCacheEntryListenerFactory() {
                // The repository itself is the listener for created/removed/expired entries.
                return (Factory<CacheEntryListener<? super String, ? super IgniteSession>>) () -> IgniteIndexedSessionRepository.this;
            }

            @Override
            public boolean isOldValueRequired() {
                // Removed/expired callbacks read the previous value to build the event.
                return true;
            }

            @Override
            public Factory<CacheEntryEventFilter<? super String, ? super IgniteSession>> getCacheEntryEventFilterFactory() {
                // No filtering: every entry event is delivered.
                return null;
            }

            @Override
            public boolean isSynchronous() {
                return false;
            }
        };
        this.sessions.registerCacheEntryListener(this.listenerConfiguration);
    }

    /**
     * Deregisters the cache entry listener that {@link #init()} registered.
     *
     * <p>
     * Does nothing when {@link #init()} was never run (the cache or the listener
     * configuration is still {@code null}), so closing an uninitialized repository does
     * not fail with a {@link NullPointerException}.
     */
    @PreDestroy
    public void close() {
        if (this.sessions != null && this.listenerConfiguration != null) {
            this.sessions.deregisterCacheEntryListener(this.listenerConfiguration);
        }
    }

    /**
     * Sets the {@link ApplicationEventPublisher} that is used to publish
     * {@link AbstractSessionEvent session events}. The default is to not publish session
     * events (a no-op publisher is installed until this method is called).
     * @param applicationEventPublisher the {@link ApplicationEventPublisher} that is used
     * to publish session events. Cannot be null.
     * @throws IllegalArgumentException if {@code applicationEventPublisher} is
     * {@code null}
     */
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        Assert.notNull(applicationEventPublisher, "ApplicationEventPublisher cannot be null");
        this.eventPublisher = applicationEventPublisher;
    }

    /**
     * Set the maximum inactive interval in seconds between requests before newly created
     * sessions will be invalidated. A negative time indicates that the session will never
     * timeout. The default is 1800 (30 minutes).
     *
     * <p>
     * When left {@code null}, {@link #createSession()} does not override the interval of
     * the new {@link MapSession}, and {@link #init()} configures no cache-level expiry
     * policy. Because {@link #init()} reads this value, it has to be set before
     * {@link #init()} runs to affect the cache configuration.
     * @param defaultMaxInactiveInterval the maximum inactive interval in seconds
     */
    public void setDefaultMaxInactiveInterval(Integer defaultMaxInactiveInterval) {
        this.defaultMaxInactiveInterval = defaultMaxInactiveInterval;
    }

    /**
     * Set the {@link IndexResolver} to use. The resolver is consulted by
     * {@code IgniteSession#setAttribute} to derive the principal name that is stored in
     * the indexed {@code principal} field of the session.
     * @param indexResolver the index resolver; must not be {@code null}
     * @throws IllegalArgumentException if {@code indexResolver} is {@code null}
     */
    public void setIndexResolver(IndexResolver<Session> indexResolver) {
        Assert.notNull(indexResolver, "indexResolver cannot be null");
        this.indexResolver = indexResolver;
    }

    /**
     * Set the name of map used to store sessions. The name is used by {@link #init()} as
     * the name of the Ignite cache, so it has to be set before {@link #init()} runs.
     * @param sessionMapName the session map name; must contain text
     * @throws IllegalArgumentException if {@code sessionMapName} is {@code null}, empty
     * or blank
     */
    public void setSessionMapName(String sessionMapName) {
        Assert.hasText(sessionMapName, "Map name must not be empty");
        this.sessionMapName = sessionMapName;
    }

    /**
     * Sets the flush mode. Default flush mode is {@link FlushMode#ON_SAVE}. With
     * {@link FlushMode#IMMEDIATE}, mutating calls on a session (attributes, last accessed
     * time, max inactive interval) and {@link #createSession()} invoke
     * {@link #save(IgniteSession)} right away.
     * @param flushMode the new flush mode; must not be {@code null}
     * @throws IllegalArgumentException if {@code flushMode} is {@code null}
     */
    public void setFlushMode(FlushMode flushMode) {
        Assert.notNull(flushMode, "flushMode cannot be null");
        this.flushMode = flushMode;
    }

    /**
     * Set the save mode. The default is {@link SaveMode#ON_SET_ATTRIBUTE}. With
     * {@link SaveMode#ALWAYS} every attribute of a session is recorded as changed when
     * the session object is constructed; with {@link SaveMode#ON_GET_ATTRIBUTE} reading a
     * non-null attribute also records it as changed.
     * @param saveMode the save mode; must not be {@code null}
     * @throws IllegalArgumentException if {@code saveMode} is {@code null}
     */
    public void setSaveMode(SaveMode saveMode) {
        Assert.notNull(saveMode, "saveMode must not be null");
        this.saveMode = saveMode;
    }

    /**
     * Creates a new, not yet persisted session backed by a fresh {@link MapSession}.
     *
     * <p>
     * The configured default max inactive interval, if any, is applied to the new
     * session. In {@link FlushMode#IMMEDIATE} the session is written to the cache before
     * it is returned.
     * @return the new session, flagged as new
     */
    @Override
    public IgniteSession createSession() {
        final MapSession delegate = new MapSession();
        final Integer intervalSeconds = this.defaultMaxInactiveInterval;
        if (intervalSeconds != null) {
            delegate.setMaxInactiveInterval(Duration.ofSeconds(intervalSeconds));
        }

        final IgniteSession created = new IgniteSession(delegate, true);
        created.flushImmediateIfNecessary();
        return created;
    }

    /**
     * Writes the given session to the cache according to its change flags, then clears
     * those flags.
     *
     * <ul>
     * <li>new session - stored under its id with an expiry policy derived from its max
     * inactive interval;</li>
     * <li>session id changed - the entry under the original id is removed and the session
     * is stored under the new id;</li>
     * <li>other changes - the existing entry is replaced, using a fresh expiry policy
     * only when the max inactive interval itself changed;</li>
     * <li>no changes - nothing is written.</li>
     * </ul>
     * @param session the session to persist
     */
    @Override
    public void save(IgniteSession session) {
        if (session.isNew || session.sessionIdChanged) {
            if (!session.isNew) {
                // Id rotation of an already stored session: drop the entry kept under the old id.
                this.sessions.remove(session.originalId);
                session.originalId = session.getId();
            }
            ttlSessions(session.getMaxInactiveInterval()).put(session.getId(), session);
        }
        else if (session.hasChanges()) {
            final IgniteCache<String, IgniteSession> target = session.maxInactiveIntervalChanged
                    ? ttlSessions(session.getMaxInactiveInterval()) : this.sessions;
            target.replace(session.getId(), session);
        }
        session.clearChangeFlags();
    }

    /**
     * Looks up a session by id.
     *
     * <p>
     * A stored session that reports itself as expired is deleted from the cache and
     * treated as absent. A live session is returned with its "new" flag cleared.
     * @param id the session id
     * @return the stored session, or {@code null} when there is none or it has expired
     */
    @Override
    public IgniteSession findById(String id) {
        final IgniteSession stored = this.sessions.get(id);
        if (stored != null && stored.isExpired()) {
            deleteById(stored.getId());
            return null;
        }
        if (stored != null) {
            stored.isNew = false;
        }
        return stored;
    }

    /**
     * Removes the cache entry stored under the given session id, if any.
     * @param id the id of the session to delete
     */
    @Override
    public void deleteById(String id) {
        this.sessions.remove(id);
    }

    /**
     * Finds all sessions whose indexed principal name equals the given value.
     *
     * <p>
     * Only the principal name index is supported; any other index name yields an empty
     * map. The value is passed to Ignite as a bound query argument rather than being
     * concatenated into the SQL text, so it cannot alter the query.
     * @param indexName the index name; only {@code PRINCIPAL_NAME_INDEX_NAME} is
     * supported
     * @param indexValue the principal name to look up
     * @return a map of session id to session, empty when nothing matches
     */
    @Override
    public Map<String, IgniteSession> findByIndexNameAndIndexValue(String indexName, String indexValue) {
        if (!PRINCIPAL_NAME_INDEX_NAME.equals(indexName)) {
            return Collections.emptyMap();
        }

        final SqlFieldsQuery query = new SqlFieldsQuery("SELECT * FROM IgniteSession WHERE principal = ?")
                .setArgs(indexValue);

        final List<List<?>> rows;
        // try-with-resources releases the cursor even if reading the results fails.
        try (FieldsQueryCursor<List<?>> cursor = this.sessions.query(query)) {
            if (cursor == null) {
                return Collections.emptyMap();
            }
            rows = cursor.getAll();
        }

        final Map<String, IgniteSession> sessionMap = new HashMap<>(rows.size());
        for (List<?> row : rows) {
            // Column 0 holds the delegate MapSession, column 1 the principal name.
            final MapSession session = (MapSession) row.get(0);
            final IgniteSession value = new IgniteSession(session, false);
            value.principal = (String) row.get(1);
            sessionMap.put(session.getId(), value);
        }

        return sessionMap;
    }

    /**
     * Publishes a {@link SessionCreatedEvent} for every created cache entry whose session
     * id still equals the original id of its delegate. Entries whose id differs from the
     * delegate's original id are skipped.
     * @param events the cache entry creation events
     * @throws CacheEntryListenerException declared by the listener contract
     */
    @Override
    public void onCreated(Iterable<CacheEntryEvent<? extends String, ? extends IgniteSession>> events)
            throws CacheEntryListenerException {
        for (CacheEntryEvent<? extends String, ? extends IgniteSession> event : events) {
            final IgniteSession created = event.getValue();
            if (!created.getId().equals(created.getDelegate().getOriginalId())) {
                continue;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Session created with id: " + created.getId());
            }
            this.eventPublisher.publishEvent(new SessionCreatedEvent(this, created));
        }
    }

    /**
     * Publishes a {@link SessionExpiredEvent} for every expired cache entry.
     *
     * <p>
     * The event is built from the entry's old value. Entries delivered without an old
     * value are skipped, mirroring {@code onRemoved}, instead of failing with a
     * {@link NullPointerException} and aborting the rest of the batch.
     * @param events the cache entry expiry events
     * @throws CacheEntryListenerException declared by the listener contract
     */
    @Override
    public void onExpired(Iterable<CacheEntryEvent<? extends String, ? extends IgniteSession>> events)
            throws CacheEntryListenerException {
        events.forEach((event) -> {
            IgniteSession session = event.getOldValue();
            if (session != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Session expired with id: " + session.getId());
                }
                this.eventPublisher.publishEvent(new SessionExpiredEvent(this, session));
            }
        });
    }

    /**
     * Publishes a {@link SessionDeletedEvent} for every removed cache entry that carries
     * an old value; events without an old value are ignored.
     * @param events the cache entry removal events
     * @throws CacheEntryListenerException declared by the listener contract
     */
    @Override
    public void onRemoved(Iterable<CacheEntryEvent<? extends String, ? extends IgniteSession>> events)
            throws CacheEntryListenerException {
        for (CacheEntryEvent<? extends String, ? extends IgniteSession> event : events) {
            final IgniteSession removed = event.getOldValue();
            if (removed == null) {
                continue;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Session deleted with id: " + removed.getId());
            }
            this.eventPublisher.publishEvent(new SessionDeletedEvent(this, removed));
        }
    }

    /**
     * Returns a view of the session cache whose operations use an expiry policy built
     * from the given duration.
     * @param duration expiry duration for IgniteSession.
     * @return cache with custom duration expiry policy.
     */
    private IgniteCache<String, IgniteSession> ttlSessions(Duration duration) {
        final TouchedExpiryPolicy policy = createPolicy(duration);
        return this.sessions.withExpiryPolicy(policy);
    }

    /**
     * Create expiry policy from {@link Duration}.
     *
     * <p>
     * A negative duration marks a session that never expires. It is translated to
     * {@code javax.cache.expiry.Duration.ETERNAL}, because the JCache duration
     * constructor rejects negative amounts with an {@link IllegalArgumentException}.
     * @param duration expiry duration; negative means "never expire".
     * @return expiry policy.
     */
    private static TouchedExpiryPolicy createPolicy(Duration duration) {
        if (duration.isNegative()) {
            return new TouchedExpiryPolicy(javax.cache.expiry.Duration.ETERNAL);
        }
        return new TouchedExpiryPolicy(new javax.cache.expiry.Duration(TimeUnit.SECONDS, duration.getSeconds()));
    }

    /**
     * A custom implementation of {@link Session} that uses a {@link MapSession} as the
     * basis for its mapping. It keeps track if changes have been made since last save.
     *
     * <p>
     * All session state is held by the delegate. This class adds change flags that
     * {@link IgniteIndexedSessionRepository#save(IgniteSession)} inspects, and an indexed
     * {@code principal} field that
     * {@link IgniteIndexedSessionRepository#findByIndexNameAndIndexValue(String, String)}
     * queries.
     */
    final class IgniteSession implements Session {

        /** The session state; exposed to SQL as a queryable field. */
        @QuerySqlField
        private final MapSession delegate;

        // NOTE(review): the @GridDirectTransient fields below look like save-time bookkeeping
        // that is not meant to be part of the stored value - confirm the annotation has that
        // effect for cache values.

        /** Whether the session has not been written to the cache yet. */
        @GridDirectTransient
        private boolean isNew;

        /** Whether the session id was changed since the last save. */
        @GridDirectTransient
        private boolean sessionIdChanged;

        /** Whether the last accessed time was changed since the last save. */
        @GridDirectTransient
        private boolean lastAccessedTimeChanged;

        /** Whether the max inactive interval was changed since the last save. */
        @GridDirectTransient
        private boolean maxInactiveIntervalChanged;

        /** The id under which the session is currently stored in the cache. */
        @GridDirectTransient
        private String originalId;

        /** Attributes recorded as changed since the last save. */
        @GridDirectTransient
        private Map<String, Object> delta = new HashMap<>();

        /** Principal name of the session; indexed so that it can be used in SQL lookups. */
        @QuerySqlField(index = true)
        private String principal;

        /**
         * Wraps the given {@link MapSession}.
         *
         * <p>
         * For a new session, or when the save mode is {@link SaveMode#ALWAYS}, every
         * attribute present on the delegate is recorded as changed.
         * @param cached the backing session
         * @param isNew whether the session has not been stored yet
         */
        IgniteSession(MapSession cached, boolean isNew) {
            this.delegate = cached;
            this.isNew = isNew;
            this.originalId = cached.getId();
            if (this.isNew || (IgniteIndexedSessionRepository.this.saveMode == SaveMode.ALWAYS)) {
                getAttributeNames()
                        .forEach((attributeName) -> this.delta.put(attributeName, cached.getAttribute(attributeName)));
            }
        }

        /**
         * Updates the last accessed time, marks it as changed and flushes in immediate
         * flush mode.
         */
        @Override
        public void setLastAccessedTime(Instant lastAccessedTime) {
            this.delegate.setLastAccessedTime(lastAccessedTime);
            this.lastAccessedTimeChanged = true;
            flushImmediateIfNecessary();
        }

        @Override
        public boolean isExpired() {
            return this.delegate.isExpired();
        }

        @Override
        public Instant getCreationTime() {
            return this.delegate.getCreationTime();
        }

        @Override
        public String getId() {
            return this.delegate.getId();
        }

        /**
         * Changes the session id on the delegate and marks the id as changed; the cache is
         * only updated by the next save.
         * @return the new session id
         */
        @Override
        public String changeSessionId() {
            String newSessionId = this.delegate.changeSessionId();
            this.sessionIdChanged = true;
            return newSessionId;
        }

        @Override
        public Instant getLastAccessedTime() {
            return this.delegate.getLastAccessedTime();
        }

        /**
         * Updates the max inactive interval, marks it as changed and flushes in immediate
         * flush mode.
         */
        @Override
        public void setMaxInactiveInterval(Duration interval) {
            this.delegate.setMaxInactiveInterval(interval);
            this.maxInactiveIntervalChanged = true;
            flushImmediateIfNecessary();
        }

        @Override
        public Duration getMaxInactiveInterval() {
            return this.delegate.getMaxInactiveInterval();
        }

        /**
         * Returns the attribute value. In {@link SaveMode#ON_GET_ATTRIBUTE} a non-null
         * value is also recorded as changed.
         */
        @Override
        public <T> T getAttribute(String attributeName) {
            T attributeValue = this.delegate.getAttribute(attributeName);
            if (attributeValue != null
                    && IgniteIndexedSessionRepository.this.saveMode.equals(SaveMode.ON_GET_ATTRIBUTE)) {
                this.delta.put(attributeName, attributeValue);
            }
            return attributeValue;
        }

        @Override
        public Set<String> getAttributeNames() {
            return this.delegate.getAttributeNames();
        }

        /**
         * Sets the attribute, records it as changed and keeps the indexed principal name
         * in sync.
         *
         * <p>
         * Writing the security context attribute re-resolves the principal name and
         * mirrors it into the {@code PRINCIPAL_NAME_INDEX_NAME} attribute. Writing the
         * {@code PRINCIPAL_NAME_INDEX_NAME} attribute directly also refreshes the indexed
         * {@code principal} field, so that sessions indexed this way can be found by
         * principal name. Flushes in immediate flush mode.
         */
        @Override
        public void setAttribute(String attributeName, Object attributeValue) {
            this.delegate.setAttribute(attributeName, attributeValue);
            this.delta.put(attributeName, attributeValue);
            if (SPRING_SECURITY_CONTEXT.equals(attributeName)) {
                Map<String, String> indexes = IgniteIndexedSessionRepository.this.indexResolver.resolveIndexesFor(this);
                String principal = (attributeValue != null) ? indexes.get(PRINCIPAL_NAME_INDEX_NAME) : null;
                this.delegate.setAttribute(PRINCIPAL_NAME_INDEX_NAME, principal);
                this.principal = principal;
            }
            else if (PRINCIPAL_NAME_INDEX_NAME.equals(attributeName)) {
                // The attribute itself is already stored above; only the SQL-indexed field
                // has to follow. The resolver decides the effective principal name.
                this.principal = IgniteIndexedSessionRepository.this.indexResolver.resolveIndexesFor(this)
                        .get(PRINCIPAL_NAME_INDEX_NAME);
            }
            flushImmediateIfNecessary();
        }

        /**
         * Removes the attribute by setting it to {@code null}, so the removal is recorded
         * as a change as well.
         */
        @Override
        public void removeAttribute(String attributeName) {
            setAttribute(attributeName, null);
        }

        /**
         * Returns the backing {@link MapSession}.
         * @return the delegate
         */
        MapSession getDelegate() {
            return this.delegate;
        }

        /**
         * Tells whether the last accessed time, the max inactive interval or any attribute
         * was recorded as changed. A changed session id is tracked separately.
         * @return {@code true} when there is something to write
         */
        boolean hasChanges() {
            return (this.lastAccessedTimeChanged || this.maxInactiveIntervalChanged || !this.delta.isEmpty());
        }

        /**
         * Resets all change tracking, including the "new" flag.
         */
        void clearChangeFlags() {
            this.isNew = false;
            this.lastAccessedTimeChanged = false;
            this.sessionIdChanged = false;
            this.maxInactiveIntervalChanged = false;
            this.delta.clear();
        }

        /**
         * Saves this session through the enclosing repository when the flush mode is
         * {@link FlushMode#IMMEDIATE}.
         */
        private void flushImmediateIfNecessary() {
            if (IgniteIndexedSessionRepository.this.flushMode == FlushMode.IMMEDIATE) {
                IgniteIndexedSessionRepository.this.save(this);
            }
        }

        /**
         * Two sessions are equal when their delegates are equal; change flags are ignored.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            IgniteSession session = (IgniteSession) o;
            return this.delegate.equals(session.delegate);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.delegate);
        }

    }

}
