/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sling.discovery.base.connectors.ping;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.zip.GZIPOutputStream;

import javax.json.JsonException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.sling.discovery.ClusterView;
import org.apache.sling.discovery.base.commons.ClusterViewHelper;
import org.apache.sling.discovery.base.commons.ClusterViewService;
import org.apache.sling.discovery.base.commons.UndefinedClusterViewException;
import org.apache.sling.discovery.base.connectors.BaseConfig;
import org.apache.sling.discovery.base.connectors.announcement.Announcement;
import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
import org.apache.sling.discovery.base.connectors.ping.wl.SubnetWhitelistEntry;
import org.apache.sling.discovery.base.connectors.ping.wl.WhitelistEntry;
import org.apache.sling.discovery.base.connectors.ping.wl.WildcardWhitelistEntry;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.http.HttpService;
import org.osgi.service.http.NamespaceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Servlet which receives topology announcements at
 * /libs/sling/topology/connector*
 * without authorization (authorization is handled either via
 * hmac-signature with a shared key or via a flexible whitelist)
 */
@SuppressWarnings("serial")
@Component(immediate = true, service = TopologyConnectorServlet.class)
public class TopologyConnectorServlet extends HttpServlet {

    /**
     * prefix under which the topology connector servlet is registered -
     * the URL will consist of this prefix + "connector.slingId.json"
     */
    private static final String TOPOLOGY_CONNECTOR_PREFIX = "/libs/sling/topology";

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Reference
    private AnnouncementRegistry announcementRegistry;

    @Reference
    private ClusterViewService clusterViewService;

    @Reference
    private HttpService httpService;

    @Reference
    private BaseConfig config;

    /**
     * This list contains WhitelistEntry (ips/hostnames, cidr, wildcards),
     * each filtering some hostname/addresses that are allowed to connect to this servlet.
     **/
    private final List<WhitelistEntry> whitelist = new ArrayList<>();

    /**
     * Set of plaintext whitelist entries - for faster lookups
     **/
    private final Set<String> plaintextWhitelist = new HashSet<>();

    private TopologyRequestValidator requestValidator;

    @Activate
    protected void activate(final ComponentContext context) {
        whitelist.clear();
        if (!config.isHmacEnabled()) {
            String[] whitelistConfig = config.getTopologyConnectorWhitelist();
            initWhitelist(whitelistConfig);
        }
        requestValidator = new TopologyRequestValidator(config);

        try {
            httpService.registerServlet(TopologyConnectorServlet.TOPOLOGY_CONNECTOR_PREFIX, this, null, null);
            logger.info("activate: connector servlet registered at " + TopologyConnectorServlet.TOPOLOGY_CONNECTOR_PREFIX);
        } catch (ServletException e) {
            logger.error("activate: ServletException while registering topology connector servlet: " + e, e);
        } catch (NamespaceException e) {
            logger.error("activate: NamespaceException while registering topology connector servlet: " + e, e);
        }
    }

    @Deactivate
    protected void deactivate() {
        httpService.unregister(TOPOLOGY_CONNECTOR_PREFIX);
    }

    void initWhitelist(String[] whitelistConfig) {
        if (whitelistConfig == null) {
            return;
        }
        for (int i = 0; i < whitelistConfig.length; i++) {
            String aWhitelistEntry = whitelistConfig[i];

            WhitelistEntry whitelistEntry = null;
            if (aWhitelistEntry.contains(".") && aWhitelistEntry.contains("/")) {
                // then this is a CIDR notation
                try {
                    whitelistEntry = new SubnetWhitelistEntry(aWhitelistEntry);
                } catch (Exception e) {
                    logger.error("activate: wrongly formatted CIDR subnet definition. Expected eg '1.2.3.4/24'. ignoring: " + aWhitelistEntry);
                    continue;
                }
            } else if (aWhitelistEntry.contains(".") && aWhitelistEntry.contains(" ")) {
                // then this is a IP/subnet-mask notation
                try {
                    final StringTokenizer st = new StringTokenizer(aWhitelistEntry, " ");
                    final String ip = st.nextToken();
                    if (st.hasMoreTokens()) {
                        final String mask = st.nextToken();
                        if (st.hasMoreTokens()) {
                            logger.error("activate: wrongly formatted ip subnet definition. Expected '10.1.2.3 255.0.0.0'. Ignoring: " + aWhitelistEntry);
                            continue;
                        }
                        whitelistEntry = new SubnetWhitelistEntry(ip, mask);
                    }
                } catch (Exception e) {
                    logger.error("activate: wrongly formatted ip subnet definition. Expected '10.1.2.3 255.0.0.0'. Ignoring: " + aWhitelistEntry);
                    continue;
                }
            }
            if (whitelistEntry == null) {
                if (aWhitelistEntry.contains("*") || aWhitelistEntry.contains("?")) {
                    whitelistEntry = new WildcardWhitelistEntry(aWhitelistEntry);
                } else {
                    plaintextWhitelist.add(aWhitelistEntry);
                }
            }
            logger.info("activate: adding whitelist entry: " + aWhitelistEntry);
            if (whitelistEntry != null) {
                whitelist.add(whitelistEntry);
            }
        }
    }

    @Override
    protected void doDelete(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        if (!isWhitelisted(request)) {
            // in theory it would be 403==forbidden, but that would reveal that
            // a resource would exist there in the first place
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }

        final String[] pathInfo = request.getPathInfo().split("\\.");
        final String extension = pathInfo.length == 3 ? pathInfo[2] : "";
        if (!"json".equals(extension)) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }
        final String selector = pathInfo.length == 3 ? pathInfo[1] : "";

        announcementRegistry.unregisterAnnouncement(selector);
    }

    @Override
    protected void doPut(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        if (!isWhitelisted(request)) {
            // in theory it would be 403==forbidden, but that would reveal that
            // a resource would exist there in the first place
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }

        final String[] pathInfo = request.getPathInfo().split("\\.");
        final String extension = pathInfo.length == 3 ? pathInfo[2] : "";
        if (!"json".equals(extension)) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }

        final String selector = pathInfo.length == 3 ? pathInfo[1] : "";

        String topologyAnnouncementJSON = requestValidator.decodeMessage(request);

        if (logger.isDebugEnabled()) {
            // javasecurity:S5145: Replace pattern-breaking characters
            logger.debug("doPost: incoming topology announcement is: " + topologyAnnouncementJSON.replaceAll("[\n\r\t]", "_"));
        }
        final Announcement incomingTopologyAnnouncement;
        try {
            incomingTopologyAnnouncement = Announcement.fromJSON(topologyAnnouncementJSON);

            if (!incomingTopologyAnnouncement.getOwnerId().equals(selector)) {
                response.sendError(HttpServletResponse.SC_BAD_REQUEST);
                return;
            }

            String slingId = clusterViewService.getSlingId();
            if (slingId == null) {
                response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                logger.info("doPut: no slingId available. Service not ready as expected at the moment.");
                return;
            }
            incomingTopologyAnnouncement.removeInherited(slingId);

            final Announcement replyAnnouncement = new Announcement(slingId);

            long backoffInterval = -1;
            ClusterView clusterView = clusterViewService.getLocalClusterView();
            if (!incomingTopologyAnnouncement.isCorrectVersion()) {
                logger.warn("doPost: rejecting an announcement from an incompatible connector protocol version: "
                        + incomingTopologyAnnouncement);
                response.sendError(HttpServletResponse.SC_BAD_REQUEST);
                return;
            } else if (ClusterViewHelper.contains(clusterView, incomingTopologyAnnouncement
                    .getOwnerId())) {
                if (logger.isDebugEnabled()) {
                    logger.debug("doPost: rejecting an announcement from an instance that is part of my cluster: "
                            + incomingTopologyAnnouncement);
                }
                // marking as 'loop'
                replyAnnouncement.setLoop(true);
                backoffInterval = config.getBackoffStandbyInterval();
            } else if (ClusterViewHelper.containsAny(clusterView, incomingTopologyAnnouncement
                    .listInstances())) {
                if (logger.isDebugEnabled()) {
                    logger.debug("doPost: rejecting an announcement as it contains instance(s) that is/are part of my cluster: "
                            + incomingTopologyAnnouncement);
                }
                // marking as 'loop'
                replyAnnouncement.setLoop(true);
                backoffInterval = config.getBackoffStandbyInterval();
            } else {
                backoffInterval = announcementRegistry
                        .registerAnnouncement(incomingTopologyAnnouncement);
                if (logger.isDebugEnabled()) {
                    logger.debug("doPost: backoffInterval after registration: " + backoffInterval);
                }
                if (backoffInterval == -1) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("doPost: rejecting an announcement from an instance that I already see in my topology: "
                                + incomingTopologyAnnouncement);
                    }
                    // marking as 'loop'
                    replyAnnouncement.setLoop(true);
                    backoffInterval = config.getBackoffStandbyInterval();
                } else {
                    // normal, successful case: replying with the part of the topology which this instance sees
                    replyAnnouncement.setLocalCluster(clusterView);
                    announcementRegistry.addAllExcept(replyAnnouncement, clusterView,
                            (receivingSlingId, announcement) -> {
                                if (announcement.getPrimaryKey().equals(
                                        incomingTopologyAnnouncement.getPrimaryKey())) {
                                    return false;
                                }
                                return true;
                            });
                }
            }
            if (backoffInterval > 0) {
                replyAnnouncement.setBackoffInterval(backoffInterval);
                if (logger.isDebugEnabled()) {
                    logger.debug("doPost: backoffInterval for client set to " + replyAnnouncement.getBackoffInterval());
                }
            }
            final String p = requestValidator.encodeMessage(replyAnnouncement.asJSON());
            requestValidator.trustMessage(response, request, p);
            // gzip the response if the client accepts this
            final String acceptEncodingHeader = request.getHeader("Accept-Encoding");
            if (acceptEncodingHeader != null && acceptEncodingHeader.contains("gzip")) {
                // tell the client that the content is gzipped:
                response.setHeader("Content-Encoding", "gzip");

                // then gzip the body
                final GZIPOutputStream gzipOut = new GZIPOutputStream(response.getOutputStream());
                gzipOut.write(p.getBytes("UTF-8"));
                gzipOut.close();
            } else {
                // otherwise plaintext
                final PrintWriter pw = response.getWriter();
                pw.print(p);
                pw.flush();
            }
        } catch (JsonException e) {
            logger.error("doPost: Got a JSONException: " + e, e);
            response.sendError(500);
        } catch (UndefinedClusterViewException e) {
            logger.warn("doPost: no clusterView available at the moment - cannot handle connectors now: " + e);
            response.sendError(503); // "please retry, but atm I can't help since I'm isolated"
        }

    }

    /**
     * Checks if the provided request's remote server is whitelisted
     **/
    boolean isWhitelisted(final HttpServletRequest request) {
        if (config.isHmacEnabled()) {
            final boolean isTrusted = requestValidator.isTrusted(request);
            if (!isTrusted) {
                logger.info("isWhitelisted: rejecting distrusted " + request.getRemoteAddr()
                        + ", " + request.getRemoteHost());
            }
            return isTrusted;
        }

        if (plaintextWhitelist.contains(request.getRemoteHost()) ||
                plaintextWhitelist.contains(request.getRemoteAddr())) {
            return true;
        }

        for (Iterator<WhitelistEntry> it = whitelist.iterator(); it.hasNext(); ) {
            WhitelistEntry whitelistEntry = it.next();
            if (whitelistEntry.accepts(request)) {
                return true;
            }
        }
        logger.info("isWhitelisted: rejecting " + request.getRemoteAddr() + ", " + request.getRemoteHost());
        return false;
    }

}
