/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.net;

import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Vector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.plugin.Extension;
import org.apache.nutch.plugin.ExtensionPoint;
import org.apache.nutch.plugin.PluginRepository;
import org.apache.nutch.plugin.PluginRuntimeException;
import org.apache.nutch.util.ObjectCache;

/**
 * This class uses a "chained filter" pattern to run defined normalizers.
 * Different lists of normalizers may be defined for different "scopes", or
 * contexts where they are used (note however that they need to be activated
 * first through <tt>plugin.include</tt> property).
 * 
 * <p>
 * There is one global scope defined by default, which consists of all active
 * normalizers. The order in which these normalizers are executed may be defined
 * in "urlnormalizer.order" property, which lists space-separated implementation
 * classes (if this property is missing normalizers will be run in random
 * order). If there are more normalizers activated than explicitly named on this
 * list, the remaining ones will be run in random order after the ones specified
 * on the list are executed.
 * </p>
 * <p>
 * You can define a set of contexts (or scopes) in which normalizers may be
 * called. Each scope can have its own list of normalizers (defined in
 * "urlnormalizer.scope.&lt;scope_name&gt;" property) and its own order (defined in
 * "urlnormalizer.order.&lt;scope_name&gt;" property). If any of these properties are
 * missing, default settings are used for the global scope.
 * </p>
 * <p>
 * In case no normalizers are required for any given scope, a
 * <code>org.apache.nutch.net.urlnormalizer.pass.PassURLNormalizer</code> should
 * be used.
 * </p>
 * <p>
 * Each normalizer may further select among many configurations, depending on
 * the scope in which it is called, because the scope name is passed as a
 * parameter to each normalizer. You can also use the same normalizer for many
 * scopes.
 * </p>
 * <p>
 * Several scopes have been defined, and various Nutch tools will attempt using
 * scope-specific normalizers first (and fall back to default config if
 * scope-specific configuration is missing).
 * </p>
 * <p>
 * Normalizers may be run several times, to ensure that modifications introduced
 * by normalizers at the end of the list can be further reduced by normalizers
 * executed at the beginning. By default this loop is executed just once - if
 * you want to ensure that all possible combinations have been applied you may
 * want to run this loop up to the number of activated normalizers. This loop
 * count can be configured through <tt>urlnormalizer.loop.count</tt> property.
 * As soon as the url is unchanged the loop will stop and return the result.
 * </p>
 * 
 * @author Andrzej Bialecki
 */
public final class URLNormalizers {

  /**
   * Default scope. If no scope properties are defined then the configuration
   * for this scope will be used.
   */
  public static final String SCOPE_DEFAULT = "default";
  /** Scope used by {@link org.apache.nutch.crawl.URLPartitioner}. */
  public static final String SCOPE_PARTITION = "partition";
  /** Scope used by {@link org.apache.nutch.crawl.Generator}. */
  public static final String SCOPE_GENERATE_HOST_COUNT = "generate_host_count";
  /**
   * Scope used by {@link org.apache.nutch.fetcher.Fetcher} when processing
   * redirect URLs.
   */
  public static final String SCOPE_FETCHER = "fetcher";
  /** Scope used when updating the CrawlDb with new URLs. */
  public static final String SCOPE_CRAWLDB = "crawldb";
  /** Scope used when updating the LinkDb with new URLs. */
  public static final String SCOPE_LINKDB = "linkdb";
  /** Scope used by {@link org.apache.nutch.crawl.Injector}. */
  public static final String SCOPE_INJECT = "inject";
  /**
   * Scope used when constructing new {@link org.apache.nutch.parse.Outlink}
   * instances.
   */
  public static final String SCOPE_OUTLINK = "outlink";
  /** Scope used when indexing URLs. */
  public static final String SCOPE_INDEXER = "indexer";

  private static final Logger LOG = LoggerFactory
      .getLogger(MethodHandles.lookup().lookupClass());

  /* Empty extension list for caching purposes. */
  private final List<Extension> EMPTY_EXTENSION_LIST = Collections
      .<Extension> emptyList();

  private final URLNormalizer[] EMPTY_NORMALIZERS = new URLNormalizer[0];

  private Configuration conf;

  private ExtensionPoint extensionPoint;

  private URLNormalizer[] normalizers;

  private int loopCount;

  public URLNormalizers(Configuration conf, String scope) {
    this.conf = conf;
    this.extensionPoint = PluginRepository.get(conf).getExtensionPoint(
        URLNormalizer.X_POINT_ID);
    ObjectCache objectCache = ObjectCache.get(conf);

    if (this.extensionPoint == null) {
      throw new RuntimeException("x point " + URLNormalizer.X_POINT_ID
          + " not found.");
    }

    normalizers = (URLNormalizer[]) objectCache
        .getObject(URLNormalizer.X_POINT_ID + "_" + scope);
    if (normalizers == null) {
      normalizers = getURLNormalizers(scope);
    }
    if (normalizers == EMPTY_NORMALIZERS) {
      normalizers = (URLNormalizer[]) objectCache
          .getObject(URLNormalizer.X_POINT_ID + "_" + SCOPE_DEFAULT);
      if (normalizers == null) {
        normalizers = getURLNormalizers(SCOPE_DEFAULT);
      }
    }

    loopCount = conf.getInt("urlnormalizer.loop.count", 1);
  }

  /**
   * Function returns an array of {@link URLNormalizer}s for a given scope, with
   * a specified order.
   * 
   * @param scope
   *          The scope to return the <code>Array</code> of
   *          {@link URLNormalizer}s for.
   * @return An <code>Array</code> of {@link URLNormalizer}s for the given
   *         scope.
   * @throws PluginRuntimeException
   */
  URLNormalizer[] getURLNormalizers(String scope) {
    List<Extension> extensions = getExtensions(scope);
    ObjectCache objectCache = ObjectCache.get(conf);

    if (extensions == EMPTY_EXTENSION_LIST) {
      return EMPTY_NORMALIZERS;
    }

    List<URLNormalizer> normalizers = new Vector<>(
        extensions.size());

    Iterator<Extension> it = extensions.iterator();
    while (it.hasNext()) {
      Extension ext = it.next();
      URLNormalizer normalizer = null;
      try {
        // check to see if we've cached this URLNormalizer instance yet
        normalizer = (URLNormalizer) objectCache.getObject(ext.getId());
        if (normalizer == null) {
          // go ahead and instantiate it and then cache it
          normalizer = (URLNormalizer) ext.getExtensionInstance();
          objectCache.setObject(ext.getId(), normalizer);
        }
        normalizers.add(normalizer);
      } catch (PluginRuntimeException e) {
        e.printStackTrace();
        LOG.warn("URLNormalizers:PluginRuntimeException when "
            + "initializing url normalizer plugin "
            + ext.getDescriptor().getPluginId()
            + " instance in getURLNormalizers "
            + "function: attempting to continue instantiating plugins");
      }
    }
    return normalizers.toArray(new URLNormalizer[normalizers.size()]);
  }

  /**
   * Finds the best-suited normalizer plugin for a given scope.
   * 
   * @param scope
   *          Scope for which we seek a normalizer plugin.
   * @return a list of extensions to be used for this scope. If none, returns
   *         empty list.
   * @throws PluginRuntimeException
   */
  @SuppressWarnings("unchecked")
  private List<Extension> getExtensions(String scope) {
    ObjectCache objectCache = ObjectCache.get(conf);
    List<Extension> extensions = (List<Extension>) objectCache
        .getObject(URLNormalizer.X_POINT_ID + "_x_" + scope);

    // Just compare the reference:
    // if this is the empty list, we know we will find no extension.
    if (extensions == EMPTY_EXTENSION_LIST) {
      return EMPTY_EXTENSION_LIST;
    }

    if (extensions == null) {
      extensions = findExtensions(scope);
      if (extensions != null) {
        objectCache.setObject(URLNormalizer.X_POINT_ID + "_x_" + scope,
            extensions);
      } else {
        // Put the empty extension list into cache
        // to remember we don't know any related extension.
        objectCache.setObject(URLNormalizer.X_POINT_ID + "_x_" + scope,
            EMPTY_EXTENSION_LIST);
        extensions = EMPTY_EXTENSION_LIST;
      }
    }
    return extensions;
  }

  /**
   * searches a list of suitable url normalizer plugins for the given scope.
   * 
   * @param scope
   *          Scope for which we seek a url normalizer plugin.
   * @return List - List of extensions to be used for this scope. If none,
   *         returns null.
   * @throws PluginRuntimeException
   */
  private List<Extension> findExtensions(String scope) {

    String[] orders = null;
    String orderlist = conf.get("urlnormalizer.order." + scope);
    if (orderlist == null)
      orderlist = conf.get("urlnormalizer.order");
    if (orderlist != null && !orderlist.trim().equals("")) {
      orders = orderlist.trim().split("\\s+");
    }
    String scopelist = conf.get("urlnormalizer.scope." + scope);
    Set<String> impls = null;
    if (scopelist != null && !scopelist.trim().equals("")) {
      String[] names = scopelist.split("\\s+");
      impls = new HashSet<>(Arrays.asList(names));
    }
    Extension[] extensions = this.extensionPoint.getExtensions();
    HashMap<String, Extension> normalizerExtensions = new HashMap<>();
    for (int i = 0; i < extensions.length; i++) {
      Extension extension = extensions[i];
      if (impls != null && !impls.contains(extension.getClazz()))
        continue;
      normalizerExtensions.put(extension.getClazz(), extension);
    }
    List<Extension> res = new ArrayList<>();
    if (orders == null) {
      res.addAll(normalizerExtensions.values());
    } else {
      // first add those explicitly named in correct order
      for (int i = 0; i < orders.length; i++) {
        Extension e = normalizerExtensions.get(orders[i]);
        if (e != null) {
          res.add(e);
          normalizerExtensions.remove(orders[i]);
        }
      }
      // then add all others in random order
      res.addAll(normalizerExtensions.values());
    }
    return res;
  }

  /**
   * Normalize
   * 
   * @param urlString
   *          The URL string to normalize.
   * @param scope
   *          The given scope.
   * @return A normalized String, using the given <code>scope</code>
   * @throws MalformedURLException
   *           If the given URL string is malformed.
   */
  public String normalize(String urlString, String scope)
      throws MalformedURLException {
    // optionally loop several times, and break if no further changes
    String initialString = urlString;
    for (int k = 0; k < loopCount; k++) {
      for (int i = 0; i < this.normalizers.length; i++) {
        if (urlString == null)
          return null;
        urlString = this.normalizers[i].normalize(urlString, scope);
      }
      if (initialString.equals(urlString))
        break;
      initialString = urlString;
    }
    return urlString;
  }
}
