/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.vinci.transport;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

import org.apache.vinci.debug.Debug;

/**
 * Class for conjuring a Vinci service by host/port (that is, without interaction with the naming
 * service). Usually you want to use VinciClient, which extends this class to invoke a service by
 * (qualified) name.
 * 
 * Provides generic "send/recieve/sendAndReceive" for communicating arbitrary (transportable)
 * document models, and also specific "rpc" methods for more convenient support of the VinciFrame
 * document model.
 */
public class BaseClient {

  static public final int DEFAULT_SOCKET_TIMEOUT = 120000;

  static public final int DEFAULT_CONNECT_TIMEOUT = 30000;

  private String host = null;

  private int port = 0;

  private TransportableFactory factory;

  private Socket socket = null;

  private InputStream is = null;

  private OutputStream os = null;

  private KeyValuePair header = null;

  private int socketTimeout = DEFAULT_SOCKET_TIMEOUT;

  private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;

  private boolean retry = true;

  /**
   * Open up the service at the specified host and port, using a VinciFrame factory.
   * 
   * @param h
   *          The hostname/ip address of the machine running the service.
   * @param p
   *          The port on which the service runs.
   * @throws IOException
   *           if the underlying socket fails to connect
   * 
   * @pre h != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   */
  public BaseClient(String h, int p) throws IOException {
    this(h, p, VinciFrame.getVinciFrameFactory());
  }

  /**
   * Open up the service at the specified host and port, using a VinciFrame factory.
   * 
   * @param h
   *          The hostname/ip address of the machine running the service.
   * @param p
   *          The port on which the service runs.
   * @param connect_timeout
   *          The number of milliseconds that will elapse before a connect attempt fails.
   * 
   * @throws IOException
   *           if the underlying socket fails to connect
   * 
   * @pre h != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   */
  public BaseClient(String h, int p, int connect_timeout) throws IOException {
    this(h, p, VinciFrame.getVinciFrameFactory(), connect_timeout);
  }

  /**
   * Open up the service at the specified host and port.
   * 
   * @param h
   *          The hostname/ip address of the machine running the service.
   * @param p
   *          The port on which the service runs.
   * @param f
   *          A factory for creating documents of the desired type.
   * @throws IOException
   *           if the underlying socket fails to connect.
   * 
   * @pre h != null
   * @pre f != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   */
  public BaseClient(String h, int p, TransportableFactory f) throws IOException {
    this.factory = f;
    this.host = h;
    this.port = p;
    open(host, port);
  }

  /**
   * Open up the service at the specified host and port, using the specified connect timeout.
   * 
   * @param h
   *          The hostname/ip address of the machine running the service.
   * @param p
   *          The port on which the service runs.
   * @param f
   *          A factory for creating documents of the desired type.
   * @param timeout
   *          The number of milliseconds that will elapse before a connect attempt fails.
   * @throws IOException
   *           if the underlying socket fails to connect.
   * 
   * @pre h != null
   * @pre f != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   * @pre timeout &gt; 0
   */
  public BaseClient(String h, int p, TransportableFactory f, int timeout) throws IOException {
    this.factory = f;
    this.host = h;
    this.port = p;
    connectTimeout = timeout;
    open(host, port);
  }

  /**
   * Create a base client without establishing a connection. This is useful for client classes which
   * extend this class and which to perform their own connection establishment. Uses a VinciFrame
   * factory.
   */
  public BaseClient() {
    this(VinciFrame.getVinciFrameFactory());
  }

  /**
   * Create a base client without establishing a connection. This is useful for client classes which
   * extend this class and which to perform their own connection establishment.
   * 
   * @param f
   *          A factory for creating documents of the desired type.
   * 
   * @pre f != null
   */
  public BaseClient(TransportableFactory f) {
    this.factory = f;
  }

  /**
   * Create a base client without establishing a connection. This is useful for client classes which
   * extend this class and which to perform their own connection establishment.
   * 
   * @param f
   *          A factory for creating documents of the desired type.
   * @param timeout
   *          The number of milliseconds that will elapse before a connect attempt fails.
   * 
   * @pre f != null
   */
  public BaseClient(TransportableFactory f, int timeout) {
    this.factory = f;
    this.connectTimeout = timeout;
  }

  /**
   * Get the hostname/ip address to which this client is connected.
   * @return -
   */
  public String getHost() {
    return host;
  }

  public int getPort() {
    return port;
  }

  /**
   * Get the socket used for the connection.
   * @return -
   */
  protected Socket getSocket() {
    return socket;
  }

  /**
   * Get the default timeout value for the socket (0 indicates never timeout, which is the default,
   * but generally NOT a good setting).
   * @return -
   */
  public int getSocketTimeout() {
    return socketTimeout;
  }

  /**
   * Set the timeout value used for connect timeouts. Note that if you use one of the
   * connection-inducing constructors, then this method has no effect unless a subsequent connection
   * attempt is made.
   * 
   * @param timeout
   *          The number of milliseconds that will elapse before a connect attempt fails.
   */
  public void setConnectTimeout(int timeout) {
    connectTimeout = timeout;
  }

  /**
   * Set the transportable factory used by this client.
   * @param f -
   */
  public void setTransportableFactory(TransportableFactory f) {
    factory = f;
  }

  /**
   * Takes a transportable object, sends it across the connection, then retrieves the response and
   * returns it.
   * 
   * @param in
   *          The query frame.
   * @return A transportable representing the result, its specific type determined by the factory
   *         provided through BaseClient's constructor.
   * @exception IOException
   *              thrown by the underlying socket IO (e.g. due to connection timeout).
   * @exception ServiceException
   *              thrown if the server threw ServiceException or returned an ErrorFrame.
   * 
   * @pre in != null
   * @pre getHost() != null
   */
  public Transportable sendAndReceive(Transportable in) throws IOException, ServiceException {
    try {
      try {
        if (!isOpen()) {
          open();
        }
        return sendAndReceiveWork(in, factory);
      } catch (SocketTimeoutException e) {
        // Don't retry on socket timeout - service might be overloaded
        throw e;
      } catch (IOException e) {
        if (retry) {
          reopen(e);
          return sendAndReceiveWork(in, factory);
        } else {
          throw e;
        }
      }
    } catch (SocketTimeoutException e) {
      // Close on timeout to make sure buffers are cleared if client reused
      close();
      throw e;
    }
  }

  /**
   * Takes a transportable object, sends it across the connection, then retrieves the response and
   * returns it.
   * 
   * @param in
   *          The query frame.
   * @param timeout
   *          The timeout value to use in place of this client's default timeout setting.
   * @return A transportable representing the result, its specific type determined by the factory
   *         provided through BaseClient's constructor.
   * @throws IOException
   *           thrown by the underlying socket IO (e.g. due to connection timeout).
   * @throws ServiceException
   *           thrown if the server threw ServiceException or returned an ErrorFrame.
   * 
   * @pre getHost() != null
   */
  public Transportable sendAndReceive(Transportable in, int timeout) throws IOException,
          ServiceException {
    try {
      try {
        if (!isOpen()) {
          open();
        }
        return sendAndReceiveWork(in, factory, timeout);
      } catch (SocketTimeoutException e) {
        // Don't retry on socket timeout - service might be overloaded
        throw e;
      } catch (IOException e) {
        if (retry) {
          reopen(e);
          return sendAndReceiveWork(in, factory, timeout);
        } else {
          throw e;
        }
      }
    } catch (SocketTimeoutException e) {
      // Close on timeout to make sure buffers are cleared if client reused
      close();
      throw e;
    }
  }

  /**
   * Same as sendAndReceive(Transportable) except the provided factory is used to create the return
   * document in place of the default factory.
   * 
   * 
   * @pre f != null
   * @pre in != null
   * 
   * @pre getHost() != null
   * @param in -
   * @param f
   *          The factory to used to create the return document.
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  public Transportable sendAndReceive(Transportable in, TransportableFactory f) throws IOException,
          ServiceException {
    try {
      try {
        if (!isOpen()) {
          open();
        }
        return sendAndReceiveWork(in, f);
      } catch (SocketTimeoutException e) {
        // Don't retry on socket timeout - service might be overloaded
        throw e;
      } catch (IOException e) {
        if (retry) {
          reopen(e);
          return sendAndReceiveWork(in, f);
        } else {
          throw e;
        }
      }
    } catch (SocketTimeoutException e) {
      // Close on timeout to make sure buffers are cleared if client reused
      close();
      throw e;
    }
  }

  /**
   * Same as sendAndReceive(Transportable, timeout) except the provided factory is used to create
   * the return document in place of the default factory.
   * 
   * @param in -
   * @param f  The factory to used to create the return document.
   * @param timeout -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   * @pre in != null
   * @pre f != null
   * @pre getHost() != null
   */
  public Transportable sendAndReceive(Transportable in, TransportableFactory f, int timeout)
          throws IOException, ServiceException {
    try {
      try {
        if (!isOpen()) {
          open();
        }
        return sendAndReceiveWork(in, f, timeout);
      } catch (SocketTimeoutException e) {
        // Don't retry on socket timeout - service might be overloaded
        throw e;
      } catch (IOException e) {
        if (retry) {
          reopen(e);
          return sendAndReceiveWork(in, f, timeout);
        } else {
          throw e;
        }
      }
    } catch (SocketTimeoutException e) {
      // Close on timeout to make sure buffers are cleared if client reused
      close();
      throw e;
    }
  }

  /**
   * Same as sendAndReceive(Transportable) except for return type. Syntactic sugar method for the
   * case where return result is known to be VinciFrame (eliminates the need for casting in the
   * typical usage case).
   * 
   * @return A VinciFrame representing the service result.
   * 
   * @pre query != null
   * @pre getHost() != null
   * @param query -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  public VinciFrame rpc(Transportable query) throws IOException, ServiceException {
    return (VinciFrame) sendAndReceive(query);
  }

  /**
   * Same as sendAndReceive(Transportable, timeout) except for return type. Syntactic sugar method
   * for the case where return result is known to be VinciFrame (eliminates the need for casting in
   * the typical usage case).
   * @param query -
   * @param timeout - 
   * @return A VinciFrame representing the service result.
   * @throws IOException -
   * @throws ServiceException -
   * 
   * @pre query != null
   * @pre getHost() != null
   */
  public VinciFrame rpc(Transportable query, int timeout) throws IOException, ServiceException {
    return (VinciFrame) sendAndReceive(query, timeout);
  }

  /**
   * Support for 1/2 transaction RPC. This allows interaction with an asynchronous ("receive only")
   * service, or for the sender to simply do something else before coming back and receiving the
   * result (though at the risk of timeouts!).
   * 
   * @param in
   *          The Transportable to send.
   * @throws IOException
   *           Thrown by the underlying transport layer.
   * 
   * @pre in != null
   * @pre getHost() != null
   * @pre os != null
   */
  public void send(Transportable in) throws IOException {
    try {
      try {
        if (!isOpen()) {
          open();
        }
        in.toStream(os);
        os.flush();
      } catch (SocketTimeoutException e) {
        // Don't retry on socket timeout - service might be overloaded
        throw e;
      } catch (IOException e) {
        if (retry) {
          reopen(e);
          in.toStream(os);
          os.flush();
        } else {
          throw e;
        }
      }
    } catch (SocketTimeoutException e) {
      // Close on timeout to make sure buffers are cleared if client reused
      close();
      throw e;
    }
  }

  /**
   * The other 1/2 of the split RPC. This allows for interaction with an asynchronous "publish only"
   * service, or simply picks up a result queried for earlier via send().
   * 
   * @return The Transportable requested.
   * @throws IOException
   *           Thrown by the underlying transport layer, or the socket is closed.
   * @throws ServiceException
   *           Thrown if the remote server responded with an error frame.
   * 
   * @pre is != null
   */
  public Transportable receive() throws IOException, ServiceException {
    if (!isOpen()) {
      throw new IOException("Socket not open");
    }
    Transportable out = factory.makeTransportable();
    header = out.fromStream(is);
    if (header != null && header.key.equals(TransportConstants.ERROR_KEY)) {
      throw new ServiceException(header.getValueAsString(), out);
    }
    return out;
  }

  /**
   * Close the connection. Using the Client object after this will throw an exception.
   */
  public void close() {
    if (isOpen()) {
      try {
        socket.close();
        is = null;
        os = null;
        socket = null;
      } catch (IOException e) {
        Debug.reportException(e, "Could not close connection.");
      }
    }
  }

  public boolean isOpen() {
    return socket != null && os != null && is != null;
  }

  /**
   * Set connection restablishment on IOException to on/off, default is ON. This way, by default,
   * BaseClient attempts to reopen a connection at most once if it receives an IOException which can
   * happen, for example, from the connection timing out.
   * @param to -
   */
  public void setRetry(boolean to) {
    retry = to;
  }

  /**
   * Fetch the header of the last Transportable received.
   * @return -
   */
  public KeyValuePair getHeader() {
    return header;
  }

  /**
   * Convenience method for "one-shot" or "single-query" connections.
   * 
   * @pre host_name != null
   * @pre f != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   * @param in -
   * @param host_name -
   * @param p -
   * @param f -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  static public Transportable sendAndReceive(Transportable in, String host_name, int p,
          TransportableFactory f) throws IOException, ServiceException {
    BaseClient tempClient = new BaseClient(host_name, p, f);
    tempClient.setRetry(false);
    try {
      return tempClient.sendAndReceive(in);
    } finally {
      tempClient.close();
    }
  }

  /**
   * Convenience method for "one-shot" or "single-query" connections with socket timeout support.
   * 
   * @pre in != null
   * @pre host_name != null
   * @pre f != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   * @param in -
   * @param host_name -
   * @param p -
   * @param f -
   * @param socket_timeout -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  static public Transportable sendAndReceive(Transportable in, String host_name, int p,
          TransportableFactory f, int socket_timeout) throws IOException, ServiceException {
    BaseClient tempClient = new BaseClient(host_name, p, f);
    tempClient.setSocketTimeout(socket_timeout);
    tempClient.setRetry(false);
    try {
      return tempClient.sendAndReceive(in);
    } finally {
      tempClient.close();
    }
  }

  /**
   * Convenience method for "one-shot" or "single-query" connections with socket timeout support 
   * &amp; connect timeout support.
   * 
   * @pre in != null
   * @pre host_name != null
   * @pre f != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   * @param in -
   * @param host_name -
   * @param p -
   * @param f -
   * @param socket_timeout -
   * @param connect_timeout -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  static public Transportable sendAndReceive(Transportable in, String host_name, int p,
          TransportableFactory f, int socket_timeout, int connect_timeout) throws IOException,
          ServiceException {
    BaseClient tempClient = new BaseClient(host_name, p, f, connect_timeout);
    tempClient.setSocketTimeout(socket_timeout);
    tempClient.setRetry(false);
    try {
      return tempClient.sendAndReceive(in);
    } finally {
      tempClient.close();
    }
  }

  /**
   * Convenience method for "one-shot" or "single-query" connections. Same as sendAndReceive except
   * uses VinciFrame factory so return type is known to be VinciFrame.
   * 
   * @pre in != null
   * @pre host_name != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   * @param in -
   * @param host_name -
   * @param p -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  static public VinciFrame rpc(Transportable in, String host_name, int p) throws IOException,
          ServiceException {
    return (VinciFrame) sendAndReceive(in, host_name, p, VinciFrame.getVinciFrameFactory());
  }

  /**
   * Convenience method for "one-shot" or "single-query" connections. Same as sendAndReceive except
   * uses VinciFrame factory so return type is known to be VinciFrame.
   * 
   * @pre host_name != null
   * @pre in != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   * @param in -
   * @param host_name -
   * @param p -
   * @param socket_timeout -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  static public VinciFrame rpc(Transportable in, String host_name, int p, int socket_timeout)
          throws IOException, ServiceException {
    return (VinciFrame) sendAndReceive(in, host_name, p, VinciFrame.getVinciFrameFactory(),
            socket_timeout);
  }

  /**
   * Convenience method for "one-shot" or "single-query" connections. Same as sendAndReceive except
   * uses VinciFrame factory so return type is known to be VinciFrame.
   * 
   * @pre host_name != null
   * @pre in != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   * @param in -
   * @param host_name -
   * @param p -
   * @param socket_timeout -
   * @param connect_timeout -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  static public VinciFrame rpc(Transportable in, String host_name, int p, int socket_timeout,
          int connect_timeout) throws IOException, ServiceException {
    return (VinciFrame) sendAndReceive(in, host_name, p, VinciFrame.getVinciFrameFactory(),
            socket_timeout, connect_timeout);
  }

  /**
   * @pre in != null
   * @pre f != null
   * @pre getHost() != null
   * @pre os != null
   * @pre is != null
   * @param in -
   * @param f -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  protected Transportable sendAndReceiveWork(Transportable in, TransportableFactory f)
          throws IOException, ServiceException {
    in.toStream(os);
    os.flush();
    Transportable out = f.makeTransportable();
    header = out.fromStream(is);
    if (header != null && header.key.equals(TransportConstants.ERROR_KEY)) {
      throw new ServiceException(header.getValueAsString(), out);
    }
    return out;
  }

  /**
   * @pre in != null
   * @pre f != null
   * @pre socket != null
   * @param in -
   * @param f -
   * @param timeout -
   * @return -
   * @throws IOException -
   * @throws ServiceException -
   */
  protected Transportable sendAndReceiveWork(Transportable in, TransportableFactory f, int timeout)
          throws IOException, ServiceException {
    socket.setSoTimeout(timeout);
    try {
      return sendAndReceiveWork(in, f);
    } finally {
      // Restore default timeout setting.
      socket.setSoTimeout(socketTimeout);
    }
  }

  /**
   * @pre e != null
   * @pre getHost() != null
   * @param e -
   * @throws IOException -
   */
  protected void reopen(Exception e) throws IOException {
    Debug.p("Trying to reopen connection due to exception: " + e.getMessage());
    // Make sure connection is closed.
    close();
    open();
  }

  /**
   * Connects the client to the specified host and port.
   * 
   * @param h
   *          The hostname/ip address of the server to connect to.
   * @param p
   *          The port to connect to.
   * 
   * @throws IOException
   *           Thrown by underlying Socket open() call.
   * 
   * @pre h != null
   * @pre p &ge; 0
   * @pre p &lt; 65536
   */
  protected final void open(String h, int p) throws IOException {
    this.host = h;
    this.port = p;
    open();
  }

  /**
   * (Re)connects the client to a previously specified host and port. Should only be called if this
   * client has been previously closed via a call to "close".
   * 
   * @throws IOException
   *           Thrown by underlying Socket open() call.
   * 
   * @pre getHost() != null
   */
  public final void open() throws IOException {
    socket = new Socket();
    InetAddress addr = InetAddress.getByName(host);
    InetSocketAddress socketAddress = new InetSocketAddress(addr, port);
    socket.connect(socketAddress, connectTimeout);
    socket.setTcpNoDelay(true); // needed to avoid delays with Linux/loopback
    socket.setSoTimeout(socketTimeout);
    socket.setKeepAlive(isSocketKeepAliveEnabled());    
    is = new BufferedInputStream(socket.getInputStream());
    os = new BufferedOutputStream(socket.getOutputStream());
  }

  /**
   * Make this client use an already established socket connection. If you use this open method,
   * then setRetry is set to false. Resetting it to true will cause problems since the client does
   * not know how to reopen the connection.
   * 
   * @param use_me
   *          The socket to use.
   * 
   * @exception IOException
   *              Thrown by underlying Socket open() call.
   * 
   * @pre use_me != null
   */
  public void open(Socket use_me) throws IOException {
    setRetry(false);
    socket = use_me;
    socket.setSoTimeout(socketTimeout);
    socket.setKeepAlive(isSocketKeepAliveEnabled());    
    is = new BufferedInputStream(socket.getInputStream());
    os = new BufferedOutputStream(socket.getOutputStream());
  }

  /**
   * Set the timeout value used by the underlying socket. Default is 2 minutes.
   * @param millis -
   * @throws IOException -
   */
  public void setSocketTimeout(int millis) throws IOException {
    if (socket != null) {
      socket.setSoTimeout(millis);
    }
    socketTimeout = millis;
  }
  
  /**
   * Gets whether keepAlive should be turned on for client sockets.
   * Always returns true for BaseClient.  Can be overridden in
   * subclasses.
   *
   *@return whether socket keepAlive should be turned on
   */
  protected boolean isSocketKeepAliveEnabled() {
    return true;
  }

} // end class BaseClient
