/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.client.api.impl;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.map.ObjectMapper;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import com.sun.jersey.core.util.MultivaluedMapImpl;

@Private
@Evolving
public class TimelineClientImpl extends TimelineClient {

  private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
  private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
  private static final Joiner JOINER = Joiner.on("");
  public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute

  private static Options opts;
  private static final String ENTITY_DATA_TYPE = "entity";
  private static final String DOMAIN_DATA_TYPE = "domain";

  static {
    opts = new Options();
    opts.addOption("put", true, "Put the timeline entities/domain in a JSON file");
    opts.getOption("put").setArgName("Path to the JSON file");
    opts.addOption(ENTITY_DATA_TYPE, false, "Specify the JSON file contains the entities");
    opts.addOption(DOMAIN_DATA_TYPE, false, "Specify the JSON file contains the domain");
    opts.addOption("help", false, "Print usage");
  }

  private Client client;
  private ConnectionConfigurator connConfigurator;
  private DelegationTokenAuthenticator authenticator;
  private DelegationTokenAuthenticatedURL.Token token;
  private UserGroupInformation authUgi;
  private String doAsUser;
  private Configuration configuration;
  private float timelineServiceVersion;
  private TimelineWriter timelineWriter;

  private volatile String timelineServiceAddress;

  // Retry parameters for identifying new timeline service
  // TODO consider to merge with connection retry
  private int maxServiceRetries;
  private long serviceRetryInterval;
  private boolean timelineServiceV2 = false;

  @Private
  @VisibleForTesting
  TimelineClientConnectionRetry connectionRetry;

  private TimelineEntityDispatcher entityDispatcher;

  // Abstract class for an operation that should be retried by timeline client
  @Private
  @VisibleForTesting
  public static abstract class TimelineClientRetryOp {
    // The operation that should be retried
    public abstract Object run() throws IOException;
    // The method to indicate if we should retry given the incoming exception
    public abstract boolean shouldRetryOn(Exception e);
  }

  // Class to handle retry
  // Outside this class, only visible to tests
  @Private
  @VisibleForTesting
  static class TimelineClientConnectionRetry {

    // maxRetries < 0 means keep trying
    @Private
    @VisibleForTesting
    public int maxRetries;

    @Private
    @VisibleForTesting
    public long retryInterval;

    // Indicates if retries happened last time. Only tests should read it.
    // In unit tests, retryOn() calls should _not_ be concurrent.
    private boolean retried = false;

    @Private
    @VisibleForTesting
    boolean getRetired() {
      return retried;
    }

    // Constructor with default retry settings
    public TimelineClientConnectionRetry(Configuration conf) {
      Preconditions.checkArgument(conf.getInt(
          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) >= -1,
          "%s property value should be greater than or equal to -1",
          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
      Preconditions
          .checkArgument(
              conf.getLong(
                  YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
                  YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
              "%s property value should be greater than zero",
              YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
      maxRetries = conf.getInt(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
      retryInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
    }

    public Object retryOn(TimelineClientRetryOp op)
        throws RuntimeException, IOException {
      int leftRetries = maxRetries;
      retried = false;

      // keep trying
      while (true) {
        try {
          // try perform the op, if fail, keep retrying
          return op.run();
        } catch (IOException | RuntimeException e) {
          // break if there's no retries left
          if (leftRetries == 0) {
            break;
          }
          if (op.shouldRetryOn(e)) {
            logException(e, leftRetries);
          } else {
            throw e;
          }
        }
        if (leftRetries > 0) {
          leftRetries--;
        }
        retried = true;
        try {
          // sleep for the given time interval
          Thread.sleep(retryInterval);
        } catch (InterruptedException ie) {
          LOG.warn("Client retry sleep interrupted! ");
        }
      }
      throw new RuntimeException("Failed to connect to timeline server. "
          + "Connection retries limit exceeded. "
          + "The posted timeline event may be missing");
    };

    private void logException(Exception e, int leftRetries) {
      if (leftRetries > 0) {
        LOG.info("Exception caught by TimelineClientConnectionRetry,"
              + " will try " + leftRetries + " more time(s).\nMessage: "
              + e.getMessage());
      } else {
        // note that maxRetries may be -1 at the very beginning
        LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
            + " will keep retrying.\nMessage: "
            + e.getMessage());
      }
    }
  }

  private class TimelineJerseyRetryFilter extends ClientFilter {
    @Override
    public ClientResponse handle(final ClientRequest cr)
        throws ClientHandlerException {
      // Set up the retry operation
      TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
        @Override
        public Object run() {
          // Try pass the request, if fail, keep retrying
          return getNext().handle(cr);
        }

        @Override
        public boolean shouldRetryOn(Exception e) {
          // Only retry on connection exceptions
          return (e instanceof ClientHandlerException)
              && (e.getCause() instanceof ConnectException);
        }
      };
      try {
        return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
      } catch (IOException e) {
        throw new ClientHandlerException("Jersey retry failed!\nMessage: "
              + e.getMessage());
      }
    }
  }

  public TimelineClientImpl() {
    super(TimelineClientImpl.class.getName(), null);
  }

  public TimelineClientImpl(ApplicationId applicationId) {
    super(TimelineClientImpl.class.getName(), applicationId);
    this.timelineServiceV2 = true;
  }

  protected void serviceInit(Configuration conf) throws Exception {
    this.configuration = conf;
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    UserGroupInformation realUgi = ugi.getRealUser();
    if (realUgi != null) {
      authUgi = realUgi;
      doAsUser = ugi.getShortUserName();
    } else {
      authUgi = ugi;
      doAsUser = null;
    }
    ClientConfig cc = new DefaultClientConfig();
    cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
    connConfigurator = newConnConfigurator(conf);
    if (UserGroupInformation.isSecurityEnabled()) {
      authenticator = new KerberosDelegationTokenAuthenticator();
    } else {
      authenticator = new PseudoDelegationTokenAuthenticator();
    }
    authenticator.setConnectionConfigurator(connConfigurator);
    token = new DelegationTokenAuthenticatedURL.Token();

    connectionRetry = new TimelineClientConnectionRetry(conf);
    client = new Client(new URLConnectionClientHandler(
        new TimelineURLConnectionFactory()), cc);
    TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
    // TODO need to cleanup filter retry later.
    if (!timelineServiceV2) {
      client.addFilter(retryFilter);
    }

    // old version timeline service need to get address from configuration
    // while new version need to auto discovery (with retry).
    if (timelineServiceV2) {
      maxServiceRetries = conf.getInt(
          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
      serviceRetryInterval = conf.getLong(
          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
      entityDispatcher = new TimelineEntityDispatcher(conf);
    } else {
      if (YarnConfiguration.useHttps(conf)) {
        setTimelineServiceAddress(conf.get(
            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
      } else {
        setTimelineServiceAddress(conf.get(
            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
      }
      timelineServiceVersion =
          conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
      LOG.info("Timeline service address: " + getTimelineServiceAddress());
    }
    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    if (timelineServiceV2) {
      entityDispatcher.start();
    } else {
      timelineWriter = createTimelineWriter(configuration, authUgi, client,
          constructResURI(getConfig(), timelineServiceAddress, false));
    }
  }

  protected TimelineWriter createTimelineWriter(Configuration conf,
      UserGroupInformation ugi, Client webClient, URI uri)
      throws IOException {
    if (Float.compare(this.timelineServiceVersion, 1.5f) == 0) {
      return new FileSystemTimelineWriter(
          conf, ugi, webClient, uri);
    } else {
      return new DirectTimelineWriter(ugi, webClient, uri);
    }
  }

  @Override
  protected void serviceStop() throws Exception {
    if (this.timelineWriter != null) {
      this.timelineWriter.close();
    }
    if (timelineServiceV2) {
      entityDispatcher.stop();
    }
    super.serviceStop();
  }

  @Override
  public void flush() throws IOException {
    if (timelineWriter != null) {
      timelineWriter.flush();
    }
  }

  @Override
  public TimelinePutResponse putEntities(
      TimelineEntity... entities) throws IOException, YarnException {
    return timelineWriter.putEntities(entities);
  }

  @Override
  public void putEntities(
      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
          throws IOException, YarnException {
    if (!timelineServiceV2) {
      throw new YarnException("v.2 method is invoked on a v.1.x client");
    }
    entityDispatcher.dispatchEntities(true, entities);
  }

  @Override
  public void putEntitiesAsync(
      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
      throws IOException, YarnException {
    if (!timelineServiceV2) {
      throw new YarnException("v.2 method is invoked on a v.1.x client");
    }
    entityDispatcher.dispatchEntities(false, entities);
  }

  @Override
  public void putDomain(TimelineDomain domain) throws IOException,
      YarnException {
    timelineWriter.putDomain(domain);
  }

  // Used for new timeline service only
  @Private
  protected void putObjects(String path, MultivaluedMap<String, String> params,
      Object obj) throws IOException, YarnException {

    int retries = verifyRestEndPointAvailable();

    // timelineServiceAddress could be stale, add retry logic here.
    boolean needRetry = true;
    while (needRetry) {
      try {
        URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
        putObjects(uri, path, params, obj);
        needRetry = false;
      } catch (IOException e) {
        // handle exception for timelineServiceAddress being updated.
        checkRetryWithSleep(retries, e);
        retries--;
      }
    }
  }

  private int verifyRestEndPointAvailable() throws YarnException {
    // timelineServiceAddress could haven't be initialized yet
    // or stale (only for new timeline service)
    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
    if (timelineServiceAddress == null) {
      String errMessage = "TimelineClient has reached to max retry times : "
          + this.maxServiceRetries
          + ", but failed to fetch timeline service address. Please verify"
          + " Timeline Auxillary Service is configured in all the NMs";
      LOG.error(errMessage);
      throw new YarnException(errMessage);
    }
    return retries;
  }

  /**
   * Check if reaching to maximum of retries.
   * @param retries
   * @param e
   */
  private void checkRetryWithSleep(int retries, IOException e)
      throws YarnException, IOException {
    if (retries > 0) {
      try {
        Thread.sleep(this.serviceRetryInterval);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new YarnException("Interrupted while retrying to connect to ATS");
      }
    } else {
      StringBuilder msg =
          new StringBuilder("TimelineClient has reached to max retry times : ");
      msg.append(this.maxServiceRetries);
      msg.append(" for service address: ");
      msg.append(timelineServiceAddress);
      LOG.error(msg.toString());
      throw new IOException(msg.toString(), e);
    }
  }

  protected void putObjects(
      URI base, String path, MultivaluedMap<String, String> params, Object obj)
          throws IOException, YarnException {
    ClientResponse resp;
    try {
      resp = client.resource(base).path(path).queryParams(params)
          .accept(MediaType.APPLICATION_JSON)
          .type(MediaType.APPLICATION_JSON)
          .put(ClientResponse.class, obj);
    } catch (RuntimeException re) {
      // runtime exception is expected if the client cannot connect the server
      String msg =
          "Failed to get the response from the timeline server.";
      LOG.error(msg, re);
      throw new IOException(re);
    }
    if (resp == null ||
        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
      String msg = "Response from the timeline server is " +
          ((resp == null) ? "null":
          "not successful," + " HTTP error code: " + resp.getStatus()
          + ", Server response:\n" + resp.getEntity(String.class));
      LOG.error(msg);
      throw new YarnException(msg);
    }
  }

  @Override
  public void setTimelineServiceAddress(String address) {
    this.timelineServiceAddress = address;
  }

  private String getTimelineServiceAddress() {
    return this.timelineServiceAddress;
  }

  @SuppressWarnings("unchecked")
  @Override
  public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
      final String renewer) throws IOException, YarnException {
    PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>
        getDTAction =
        new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {

          @Override
          public Token<TimelineDelegationTokenIdentifier> run()
              throws Exception {
            DelegationTokenAuthenticatedURL authUrl =
                new DelegationTokenAuthenticatedURL(authenticator,
                    connConfigurator);
            // TODO we should add retry logic here if timelineServiceAddress is
            // not available immediately.
            return (Token) authUrl.getDelegationToken(
                constructResURI(getConfig(), getTimelineServiceAddress(), false).toURL(),
                token, renewer, doAsUser);
          }
        };
    return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
  }

  @SuppressWarnings("unchecked")
  @Override
  public long renewDelegationToken(
      final Token<TimelineDelegationTokenIdentifier> timelineDT)
          throws IOException, YarnException {
    final boolean isTokenServiceAddrEmpty =
        timelineDT.getService().toString().isEmpty();
    final String scheme = isTokenServiceAddrEmpty ? null
        : (YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http");
    final InetSocketAddress address = isTokenServiceAddrEmpty ? null
        : SecurityUtil.getTokenServiceAddr(timelineDT);
    PrivilegedExceptionAction<Long> renewDTAction =
        new PrivilegedExceptionAction<Long>() {

          @Override
          public Long run() throws Exception {
            // If the timeline DT to renew is different than cached, replace it.
            // Token to set every time for retry, because when exception
            // happens, DelegationTokenAuthenticatedURL will reset it to null;
            if (!timelineDT.equals(token.getDelegationToken())) {
              token.setDelegationToken((Token) timelineDT);
            }
            DelegationTokenAuthenticatedURL authUrl =
                new DelegationTokenAuthenticatedURL(authenticator,
                    connConfigurator);
            // If the token service address is not available, fall back to use
            // the configured service address.
            final URI serviceURI = isTokenServiceAddrEmpty ?
                constructResURI(getConfig(), getTimelineServiceAddress(), false)
                : new URI(scheme, null, address.getHostName(),
                address.getPort(), RESOURCE_URI_STR_V1, null, null);
            return authUrl
                .renewDelegationToken(serviceURI.toURL(), token, doAsUser);
          }
        };
    return (Long) operateDelegationToken(renewDTAction);
  }

  @SuppressWarnings("unchecked")
  @Override
  public void cancelDelegationToken(
      final Token<TimelineDelegationTokenIdentifier> timelineDT)
          throws IOException, YarnException {
    final boolean isTokenServiceAddrEmpty =
        timelineDT.getService().toString().isEmpty();
    final String scheme = isTokenServiceAddrEmpty ? null
        : (YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http");
    final InetSocketAddress address = isTokenServiceAddrEmpty ? null
        : SecurityUtil.getTokenServiceAddr(timelineDT);
    PrivilegedExceptionAction<Void> cancelDTAction =
        new PrivilegedExceptionAction<Void>() {

          @Override
          public Void run() throws Exception {
            // If the timeline DT to cancel is different than cached, replace
            // it.
            // Token to set every time for retry, because when exception
            // happens, DelegationTokenAuthenticatedURL will reset it to null;
            if (!timelineDT.equals(token.getDelegationToken())) {
              token.setDelegationToken((Token) timelineDT);
            }
            DelegationTokenAuthenticatedURL authUrl =
                new DelegationTokenAuthenticatedURL(authenticator,
                    connConfigurator);
            // If the token service address is not available, fall back to use
            // the configured service address.
            final URI serviceURI = isTokenServiceAddrEmpty ?
                constructResURI(getConfig(), getTimelineServiceAddress(), false)
                : new URI(scheme, null, address.getHostName(),
                address.getPort(), RESOURCE_URI_STR_V1, null, null);
            authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
            return null;
          }
        };
    operateDelegationToken(cancelDTAction);
  }

  @Override
  public String toString() {
    return super.toString() + " with timeline server "
        + constructResURI(getConfig(), getTimelineServiceAddress(), false)
        + " and writer " + timelineWriter;
  }

  private Object operateDelegationToken(
      final PrivilegedExceptionAction<?> action)
      throws IOException, YarnException {
    // Set up the retry operation
    TimelineClientRetryOp tokenRetryOp =
        createTimelineClientRetryOpForOperateDelegationToken(action);

    return connectionRetry.retryOn(tokenRetryOp);
  }

  /**
   * Poll TimelineServiceAddress for maximum of retries times if it is null.
   *
   * @param retries
   * @return the left retry times
   * @throws IOException
   */
  private int pollTimelineServiceAddress(int retries) throws YarnException {
    while (timelineServiceAddress == null && retries > 0) {
      try {
        Thread.sleep(this.serviceRetryInterval);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new YarnException("Interrupted while trying to connect ATS");
      }
      retries--;
    }
    return retries;
  }

  private class TimelineURLConnectionFactory
      implements HttpURLConnectionFactory {

    @Override
    public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
      authUgi.checkTGTAndReloginFromKeytab();
      try {
        return new DelegationTokenAuthenticatedURL(
            authenticator, connConfigurator).openConnection(url, token,
              doAsUser);
      } catch (UndeclaredThrowableException e) {
        throw new IOException(e.getCause());
      } catch (AuthenticationException ae) {
        throw new IOException(ae);
      }
    }

  }

  private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
    try {
      return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
    } catch (Exception e) {
      LOG.debug("Cannot load customized ssl related configuration. " +
          "Fallback to system-generic settings.", e);
      return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
    }
  }

  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
      new ConnectionConfigurator() {
    @Override
    public HttpURLConnection configure(HttpURLConnection conn)
        throws IOException {
      setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
      return conn;
    }
  };

  private static ConnectionConfigurator newSslConnConfigurator(final int timeout,
      Configuration conf) throws IOException, GeneralSecurityException {
    final SSLFactory factory;
    final SSLSocketFactory sf;
    final HostnameVerifier hv;

    factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
    factory.init();
    sf = factory.createSSLSocketFactory();
    hv = factory.getHostnameVerifier();

    return new ConnectionConfigurator() {
      @Override
      public HttpURLConnection configure(HttpURLConnection conn)
          throws IOException {
        if (conn instanceof HttpsURLConnection) {
          HttpsURLConnection c = (HttpsURLConnection) conn;
          c.setSSLSocketFactory(sf);
          c.setHostnameVerifier(hv);
        }
        setTimeouts(conn, timeout);
        return conn;
      }
    };
  }

  private static void setTimeouts(URLConnection connection, int socketTimeout) {
    connection.setConnectTimeout(socketTimeout);
    connection.setReadTimeout(socketTimeout);
  }

  private static URI constructResURI(
      Configuration conf, String address, boolean v2) {
    return URI.create(
        JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
            address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
  }

  public static void main(String[] argv) throws Exception {
    CommandLine cliParser = new GnuParser().parse(opts, argv);
    if (cliParser.hasOption("put")) {
      String path = cliParser.getOptionValue("put");
      if (path != null && path.length() > 0) {
        if (cliParser.hasOption(ENTITY_DATA_TYPE)) {
          putTimelineDataInJSONFile(path, ENTITY_DATA_TYPE);
          return;
        } else if (cliParser.hasOption(DOMAIN_DATA_TYPE)) {
          putTimelineDataInJSONFile(path, DOMAIN_DATA_TYPE);
          return;
        }
      }
    }
    printUsage();
  }

  /**
   * Put timeline data in a JSON file via command line.
   * 
   * @param path
   *          path to the timeline data JSON file
   * @param type
   *          the type of the timeline data in the JSON file
   */
  private static void putTimelineDataInJSONFile(String path, String type) {
    File jsonFile = new File(path);
    if (!jsonFile.exists()) {
      LOG.error("File [" + jsonFile.getAbsolutePath() + "] doesn't exist");
      return;
    }
    ObjectMapper mapper = new ObjectMapper();
    YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
    TimelineEntities entities = null;
    TimelineDomains domains = null;
    try {
      if (type.equals(ENTITY_DATA_TYPE)) {
        entities = mapper.readValue(jsonFile, TimelineEntities.class);
      } else if (type.equals(DOMAIN_DATA_TYPE)){
        domains = mapper.readValue(jsonFile, TimelineDomains.class);
      }
    } catch (Exception e) {
      LOG.error("Error when reading  " + e.getMessage());
      e.printStackTrace(System.err);
      return;
    }
    Configuration conf = new YarnConfiguration();
    TimelineClient client = TimelineClient.createTimelineClient();
    client.init(conf);
    client.start();
    try {
      if (UserGroupInformation.isSecurityEnabled()
          && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
        Token<TimelineDelegationTokenIdentifier> token =
            client.getDelegationToken(
                UserGroupInformation.getCurrentUser().getUserName());
        UserGroupInformation.getCurrentUser().addToken(token);
      }
      if (type.equals(ENTITY_DATA_TYPE)) {
        TimelinePutResponse response = client.putEntities(
            entities.getEntities().toArray(
                new TimelineEntity[entities.getEntities().size()]));
        if (response.getErrors().size() == 0) {
          LOG.info("Timeline entities are successfully put");
        } else {
          for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
            LOG.error("TimelineEntity [" + error.getEntityType() + ":" +
                error.getEntityId() + "] is not successfully put. Error code: " +
                error.getErrorCode());
          }
        }
      } else if (type.equals(DOMAIN_DATA_TYPE)) {
        boolean hasError = false;
        for (TimelineDomain domain : domains.getDomains()) {
          try {
            client.putDomain(domain);
          } catch (Exception e) {
            LOG.error("Error when putting domain " + domain.getId(), e);
            hasError = true;
          }
        }
        if (!hasError) {
          LOG.info("Timeline domains are successfully put");
        }
      }
    } catch(RuntimeException e) {
      LOG.error("Error when putting the timeline data", e);
    } catch (Exception e) {
      LOG.error("Error when putting the timeline data", e);
    } finally {
      client.stop();
    }
  }

  /**
   * Helper function to print out usage
   */
  private static void printUsage() {
    new HelpFormatter().printHelp("TimelineClient", opts);
  }

  @VisibleForTesting
  @Private
  public UserGroupInformation getUgi() {
    return authUgi;
  }

  @Override
  public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
      TimelineEntityGroupId groupId, TimelineEntity... entities)
      throws IOException, YarnException {
    if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
      throw new YarnException(
        "This API is not supported under current Timeline Service Version: "
            + timelineServiceVersion);
    }

    return timelineWriter.putEntities(appAttemptId, groupId, entities);
  }

  @Override
  public void putDomain(ApplicationAttemptId appAttemptId,
      TimelineDomain domain) throws IOException, YarnException {
    if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
      throw new YarnException(
        "This API is not supported under current Timeline Service Version: "
            + timelineServiceVersion);
    }
    timelineWriter.putDomain(appAttemptId, domain);
  }

  @Private
  @VisibleForTesting
  public void setTimelineWriter(TimelineWriter writer) {
    this.timelineWriter = writer;
  }

  @Private
  @VisibleForTesting
  public TimelineClientRetryOp
      createTimelineClientRetryOpForOperateDelegationToken(
          final PrivilegedExceptionAction<?> action) throws IOException {
    return new TimelineClientRetryOpForOperateDelegationToken(
        this.authUgi, action);
  }

  @Private
  @VisibleForTesting
  public class TimelineClientRetryOpForOperateDelegationToken
      extends TimelineClientRetryOp {

    private final UserGroupInformation authUgi;
    private final PrivilegedExceptionAction<?> action;

    public TimelineClientRetryOpForOperateDelegationToken(
        UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
      this.authUgi = authUgi;
      this.action = action;
    }

    @Override
    public Object run() throws IOException {
      // Try pass the request, if fail, keep retrying
      authUgi.checkTGTAndReloginFromKeytab();
      try {
        return authUgi.doAs(action);
      } catch (UndeclaredThrowableException e) {
        throw new IOException(e.getCause());
      } catch (InterruptedException e) {
        throw new IOException(e);
      }
    }

    @Override
    public boolean shouldRetryOn(Exception e) {
      // retry on connection exceptions
      // and SocketTimeoutException
      return (e instanceof ConnectException
          || e instanceof SocketTimeoutException);
    }
  }

  private final class EntitiesHolder extends FutureTask<Void> {
    private final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities;
    private final boolean isSync;

    EntitiesHolder(
        final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities,
        final boolean isSync) {
      super(new Callable<Void>() {
        // publishEntities()
        public Void call() throws Exception {
          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
          params.add("appid", contextAppId.toString());
          params.add("async", Boolean.toString(!isSync));
          putObjects("entities", params, entities);
          return null;
        }
      });
      this.entities = entities;
      this.isSync = isSync;
    }

    public boolean isSync() {
      return isSync;
    }

    public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities getEntities() {
      return entities;
    }
  }

  /**
   * This class is responsible for collecting the timeline entities and
   * publishing them in async.
   */
  private class TimelineEntityDispatcher {
    /**
     * Time period for which the timelineclient will wait for draining after
     * stop
     */
    private static final long DRAIN_TIME_PERIOD = 2000L;

    private int numberOfAsyncsToMerge;
    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
    private ExecutorService executor;

    TimelineEntityDispatcher(Configuration conf) {
      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
      numberOfAsyncsToMerge =
          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
    }

    Runnable createRunnable() {
      return new Runnable() {
        @Override
        public void run() {
          try {
            EntitiesHolder entitiesHolder;
            while (!Thread.currentThread().isInterrupted()) {
              // Merge all the async calls and make one push, but if its sync
              // call push immediately
              try {
                entitiesHolder = timelineEntityQueue.take();
              } catch (InterruptedException ie) {
                LOG.info("Timeline dispatcher thread was interrupted ");
                Thread.currentThread().interrupt();
                return;
              }
              if (entitiesHolder != null) {
                publishWithoutBlockingOnQueue(entitiesHolder);
              }
            }
          } finally {
            if (!timelineEntityQueue.isEmpty()) {
              LOG.info("Yet to publish " + timelineEntityQueue.size()
                  + " timelineEntities, draining them now. ");
            }
            // Try to drain the remaining entities to be published @ the max for
            // 2 seconds
            long timeTillweDrain =
                System.currentTimeMillis() + DRAIN_TIME_PERIOD;
            while (!timelineEntityQueue.isEmpty()) {
              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
              if (System.currentTimeMillis() > timeTillweDrain) {
                // time elapsed stop publishing further....
                if (!timelineEntityQueue.isEmpty()) {
                  LOG.warn("Time to drain elapsed! Remaining "
                      + timelineEntityQueue.size() + "timelineEntities will not"
                      + " be published");
                  // if some entities were not drained then we need interrupt
                  // the threads which had put sync EntityHolders to the queue.
                  EntitiesHolder nextEntityInTheQueue = null;
                  while ((nextEntityInTheQueue =
                      timelineEntityQueue.poll()) != null) {
                    nextEntityInTheQueue.cancel(true);
                  }
                }
                break;
              }
            }
          }
        }

        /**
         * Publishes the given EntitiesHolder and return immediately if sync
         * call, else tries to fetch the EntitiesHolder from the queue in non
         * blocking fashion and collate the Entities if possible before
         * publishing through REST.
         *
         * @param entitiesHolder
         */
        private void publishWithoutBlockingOnQueue(
            EntitiesHolder entitiesHolder) {
          if (entitiesHolder.isSync()) {
            entitiesHolder.run();
            return;
          }
          int count = 1;
          while (true) {
            // loop till we find a sync put Entities or there is nothing
            // to take
            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
            if (nextEntityInTheQueue == null) {
              // Nothing in the queue just publish and get back to the
              // blocked wait state
              entitiesHolder.run();
              break;
            } else if (nextEntityInTheQueue.isSync()) {
              // flush all the prev async entities first
              entitiesHolder.run();
              // and then flush the sync entity
              nextEntityInTheQueue.run();
              break;
            } else {
              // append all async entities together and then flush
              entitiesHolder.getEntities().addEntities(
                  nextEntityInTheQueue.getEntities().getEntities());
              count++;
              if (count == numberOfAsyncsToMerge) {
                // Flush the entities if the number of the async
                // putEntites merged reaches the desired limit. To avoid
                // collecting multiple entities and delaying for a long
                // time.
                entitiesHolder.run();
                break;
              }
            }
          }
        }
      };
    }

    public void dispatchEntities(boolean sync,
        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] entitiesTobePublished)
            throws YarnException {
      if (executor.isShutdown()) {
        throw new YarnException("Timeline client is in the process of stopping,"
            + " not accepting any more TimelineEntities");
      }

      // wrap all TimelineEntity into TimelineEntities object
      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities =
          new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
      for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entitiesTobePublished) {
        entities.addEntity(entity);
      }

      // created a holder and place it in queue
      EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
      try {
        timelineEntityQueue.put(entitiesHolder);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new YarnException(
            "Failed while adding entity to the queue for publishing", e);
      }

      if (sync) {
        // In sync call we need to wait till its published and if any error then
        // throw it back
        try {
          entitiesHolder.get();
        } catch (ExecutionException e) {
          throw new YarnException("Failed while publishing entity",
              e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new YarnException("Interrupted while publishing entity", e);
        }
      }
    }

    public void start() {
      executor = Executors.newSingleThreadExecutor();
      executor.execute(createRunnable());
    }

    public void stop() {
      LOG.info("Stopping TimelineClient.");
      executor.shutdownNow();
      try {
        executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
      }
    }
  }
}
