/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.jms;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * An unbounded source for JMS destinations (queues or topics).
 *
 * <h3>Reading from a JMS destination</h3>
 *
 * <p>JmsIO source returns unbounded collection of JMS records as {@code PCollection<JmsRecord>}. A
 * {@link JmsRecord} includes JMS headers and properties, along with the JMS {@link
 * javax.jms.TextMessage} payload.
 *
 * <p>To configure a JMS source, you have to provide a {@link javax.jms.ConnectionFactory} and the
 * destination (queue or topic) where to consume. The following example illustrates various options
 * for configuring the source:
 *
 * <pre>{@code
 * pipeline.apply(JmsIO.read()
 *    .withConnectionFactory(myConnectionFactory)
 *    .withQueue("my-queue")
 *    // above two are required configuration, returns PCollection<JmsRecord>
 *
 *    // rest of the settings are optional
 *
 * }</pre>
 *
 * <p>It is possible to read any type of JMS {@link javax.jms.Message} into a custom POJO using the
 * following configuration:
 *
 * <pre>{@code
 * pipeline.apply(JmsIO.<T>readMessage()
 *    .withConnectionFactory(myConnectionFactory)
 *    .withQueue("my-queue")
 *    .withMessageMapper((MessageMapper<T>) message -> {
 *      // code that maps message to T
 *    })
 *    .withCoder(
 *      // a coder for T
 *    )
 *
 * }</pre>
 *
 * <h3>Writing to a JMS destination</h3>
 *
 * <p>JmsIO sink supports writing text messages to a JMS destination on a broker. To configure a JMS
 * sink, you must specify a {@link javax.jms.ConnectionFactory} and a {@link javax.jms.Destination}
 * name. For instance:
 *
 * <pre>{@code
 * pipeline
 *   .apply(...) // returns PCollection<String>
 *   .apply(JmsIO.write()
 *       .withConnectionFactory(myConnectionFactory)
 *       .withQueue("my-queue")
 *
 * }</pre>
 */
@Experimental(Experimental.Kind.SOURCE_SINK)
public class JmsIO {

  public static Read<JmsRecord> read() {
    return new AutoValue_JmsIO_Read.Builder<JmsRecord>()
        .setMaxNumRecords(Long.MAX_VALUE)
        .setCoder(SerializableCoder.of(JmsRecord.class))
        .setMessageMapper(
            (MessageMapper<JmsRecord>)
                new MessageMapper<JmsRecord>() {

                  @Override
                  public JmsRecord mapMessage(Message message) throws Exception {
                    TextMessage textMessage = (TextMessage) message;
                    Map<String, Object> properties = new HashMap<>();
                    @SuppressWarnings("rawtypes")
                    Enumeration propertyNames = textMessage.getPropertyNames();
                    while (propertyNames.hasMoreElements()) {
                      String propertyName = (String) propertyNames.nextElement();
                      properties.put(propertyName, textMessage.getObjectProperty(propertyName));
                    }

                    return new JmsRecord(
                        textMessage.getJMSMessageID(),
                        textMessage.getJMSTimestamp(),
                        textMessage.getJMSCorrelationID(),
                        textMessage.getJMSReplyTo(),
                        textMessage.getJMSDestination(),
                        textMessage.getJMSDeliveryMode(),
                        textMessage.getJMSRedelivered(),
                        textMessage.getJMSType(),
                        textMessage.getJMSExpiration(),
                        textMessage.getJMSPriority(),
                        properties,
                        textMessage.getText());
                  }
                })
        .build();
  }

  public static <T> Read<T> readMessage() {
    return new AutoValue_JmsIO_Read.Builder<T>().setMaxNumRecords(Long.MAX_VALUE).build();
  }

  public static Write write() {
    return new AutoValue_JmsIO_Write.Builder().build();
  }

  /**
   * A {@link PTransform} to read from a JMS destination. See {@link JmsIO} for more information on
   * usage and configuration.
   */
  @AutoValue
  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {

    /**
     * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html "It
     * is expected that JMS providers will provide the tools an administrator needs to create and
     * configure administered objects in a JNDI namespace. JMS provider implementations of
     * administered objects should be both javax.jndi.Referenceable and java.io.Serializable so that
     * they can be stored in all JNDI naming contexts. In addition, it is recommended that these
     * implementations follow the JavaBeansTM design patterns."
     *
     * <p>So, a {@link ConnectionFactory} implementation is serializable.
     */
    @Nullable
    abstract ConnectionFactory getConnectionFactory();

    @Nullable
    abstract String getQueue();

    @Nullable
    abstract String getTopic();

    @Nullable
    abstract String getUsername();

    @Nullable
    abstract String getPassword();

    abstract long getMaxNumRecords();

    @Nullable
    abstract Duration getMaxReadTime();

    @Nullable
    abstract MessageMapper<T> getMessageMapper();

    @Nullable
    abstract Coder<T> getCoder();

    abstract Builder<T> builder();

    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setConnectionFactory(ConnectionFactory connectionFactory);

      abstract Builder<T> setQueue(String queue);

      abstract Builder<T> setTopic(String topic);

      abstract Builder<T> setUsername(String username);

      abstract Builder<T> setPassword(String password);

      abstract Builder<T> setMaxNumRecords(long maxNumRecords);

      abstract Builder<T> setMaxReadTime(Duration maxReadTime);

      abstract Builder<T> setMessageMapper(MessageMapper<T> mesageMapper);

      abstract Builder<T> setCoder(Coder<T> coder);

      abstract Read<T> build();
    }

    /**
     * Specify the JMS connection factory to connect to the JMS broker.
     *
     * <p>For instance:
     *
     * <pre>{@code
     * pipeline.apply(JmsIO.read().withConnectionFactory(myConnectionFactory)
     *
     * }</pre>
     *
     * @param connectionFactory The JMS {@link ConnectionFactory}.
     * @return The corresponding {@link JmsIO.Read}.
     */
    public Read<T> withConnectionFactory(ConnectionFactory connectionFactory) {
      checkArgument(connectionFactory != null, "connectionFactory can not be null");
      return builder().setConnectionFactory(connectionFactory).build();
    }

    /**
     * Specify the JMS queue destination name where to read messages from. The {@link JmsIO.Read}
     * acts as a consumer on the queue.
     *
     * <p>This method is exclusive with {@link JmsIO.Read#withTopic(String)}. The user has to
     * specify a destination: queue or topic.
     *
     * <p>For instance:
     *
     * <pre>{@code
     * pipeline.apply(JmsIO.read().withQueue("my-queue")
     *
     * }</pre>
     *
     * @param queue The JMS queue name where to read messages from.
     * @return The corresponding {@link JmsIO.Read}.
     */
    public Read<T> withQueue(String queue) {
      checkArgument(queue != null, "queue can not be null");
      return builder().setQueue(queue).build();
    }

    /**
     * Specify the JMS topic destination name where to receive messages from. The {@link JmsIO.Read}
     * acts as a subscriber on the topic.
     *
     * <p>This method is exclusive with {@link JmsIO.Read#withQueue(String)}. The user has to
     * specify a destination: queue or topic.
     *
     * <p>For instance:
     *
     * <pre>{@code
     * pipeline.apply(JmsIO.read().withTopic("my-topic")
     *
     * }</pre>
     *
     * @param topic The JMS topic name.
     * @return The corresponding {@link JmsIO.Read}.
     */
    public Read<T> withTopic(String topic) {
      checkArgument(topic != null, "topic can not be null");
      return builder().setTopic(topic).build();
    }

    /** Define the username to connect to the JMS broker (authenticated). */
    public Read<T> withUsername(String username) {
      checkArgument(username != null, "username can not be null");
      return builder().setUsername(username).build();
    }

    /** Define the password to connect to the JMS broker (authenticated). */
    public Read<T> withPassword(String password) {
      checkArgument(password != null, "password can not be null");
      return builder().setPassword(password).build();
    }

    /**
     * Define the max number of records that the source will read. Using a max number of records
     * different from {@code Long.MAX_VALUE} means the source will be {@code Bounded}, and will stop
     * once the max number of records read is reached.
     *
     * <p>For instance:
     *
     * <pre>{@code
     * pipeline.apply(JmsIO.read().withNumRecords(1000)
     *
     * }</pre>
     *
     * @param maxNumRecords The max number of records to read from the JMS destination.
     * @return The corresponding {@link JmsIO.Read}.
     */
    public Read<T> withMaxNumRecords(long maxNumRecords) {
      checkArgument(maxNumRecords >= 0, "maxNumRecords must be > 0, but was: %s", maxNumRecords);
      return builder().setMaxNumRecords(maxNumRecords).build();
    }

    /**
     * Define the max read time that the source will read. Using a non null max read time duration
     * means the source will be {@code Bounded}, and will stop once the max read time is reached.
     *
     * <p>For instance:
     *
     * <pre>{@code
     * pipeline.apply(JmsIO.read().withMaxReadTime(Duration.minutes(10))
     *
     * }</pre>
     *
     * @param maxReadTime The max read time duration.
     * @return The corresponding {@link JmsIO.Read}.
     */
    public Read<T> withMaxReadTime(Duration maxReadTime) {
      checkArgument(maxReadTime != null, "maxReadTime can not be null");
      return builder().setMaxReadTime(maxReadTime).build();
    }

    public Read<T> withMessageMapper(MessageMapper<T> messageMapper) {
      checkArgument(messageMapper != null, "messageMapper can not be null");
      return builder().setMessageMapper(messageMapper).build();
    }

    public Read<T> withCoder(Coder<T> coder) {
      checkArgument(coder != null, "coder can not be null");
      return builder().setCoder(coder).build();
    }

    @Override
    public PCollection<T> expand(PBegin input) {
      checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
      checkArgument(
          getQueue() != null || getTopic() != null,
          "Either withQueue() or withTopic() is required");
      checkArgument(
          getQueue() == null || getTopic() == null, "withQueue() and withTopic() are exclusive");
      checkArgument(getMessageMapper() != null, "withMessageMapper() is required");
      checkArgument(getCoder() != null, "withCoder() is required");

      // handles unbounded source to bounded conversion if maxNumRecords is set.
      Unbounded<T> unbounded = org.apache.beam.sdk.io.Read.from(createSource());

      PTransform<PBegin, PCollection<T>> transform = unbounded;

      if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
        transform =
            unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
      }

      return input.getPipeline().apply(transform);
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      builder.addIfNotNull(DisplayData.item("queue", getQueue()));
      builder.addIfNotNull(DisplayData.item("topic", getTopic()));
    }

    ///////////////////////////////////////////////////////////////////////////////////////

    /**
     * Creates an {@link UnboundedSource UnboundedSource&lt;JmsRecord, ?&gt;} with the configuration
     * in {@link Read}. Primary use case is unit tests, should not be used in an application.
     */
    @VisibleForTesting
    UnboundedSource<T, JmsCheckpointMark> createSource() {
      return new UnboundedJmsSource<T>(this);
    }
  }

  private JmsIO() {}

  /**
   * An interface used by {@link JmsIO.Read} for converting each jms {@link Message} into an element
   * of the resulting {@link PCollection}.
   */
  @FunctionalInterface
  public interface MessageMapper<T> extends Serializable {
    T mapMessage(Message message) throws Exception;
  }

  /** An unbounded JMS source. */
  @VisibleForTesting
  protected static class UnboundedJmsSource<T> extends UnboundedSource<T, JmsCheckpointMark> {

    private final Read<T> spec;

    public UnboundedJmsSource(Read<T> spec) {
      this.spec = spec;
    }

    @Override
    public List<UnboundedJmsSource<T>> split(int desiredNumSplits, PipelineOptions options)
        throws Exception {
      List<UnboundedJmsSource<T>> sources = new ArrayList<>();
      if (spec.getTopic() != null) {
        // in the case of a topic, we create a single source, so an unique subscriber, to avoid
        // element duplication
        sources.add(new UnboundedJmsSource<T>(spec));
      } else {
        // in the case of a queue, we allow concurrent consumers
        for (int i = 0; i < desiredNumSplits; i++) {
          sources.add(new UnboundedJmsSource<T>(spec));
        }
      }
      return sources;
    }

    @Override
    public UnboundedJmsReader<T> createReader(
        PipelineOptions options, JmsCheckpointMark checkpointMark) {
      return new UnboundedJmsReader<T>(this, checkpointMark);
    }

    @Override
    public Coder<JmsCheckpointMark> getCheckpointMarkCoder() {
      return AvroCoder.of(JmsCheckpointMark.class);
    }

    @Override
    public Coder<T> getOutputCoder() {
      return this.spec.getCoder();
    }
  }

  @VisibleForTesting
  static class UnboundedJmsReader<T> extends UnboundedReader<T> {

    private UnboundedJmsSource<T> source;
    private JmsCheckpointMark checkpointMark;
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    private T currentMessage;
    private Instant currentTimestamp;

    public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
      this.source = source;
      if (checkpointMark != null) {
        this.checkpointMark = checkpointMark;
      } else {
        this.checkpointMark = new JmsCheckpointMark();
      }
      this.currentMessage = null;
    }

    @Override
    public boolean start() throws IOException {
      Read<T> spec = source.spec;
      ConnectionFactory connectionFactory = spec.getConnectionFactory();
      try {
        Connection connection;
        if (spec.getUsername() != null) {
          connection = connectionFactory.createConnection(spec.getUsername(), spec.getPassword());
        } else {
          connection = connectionFactory.createConnection();
        }
        connection.start();
        this.connection = connection;
      } catch (Exception e) {
        throw new IOException("Error connecting to JMS", e);
      }

      try {
        this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
      } catch (Exception e) {
        throw new IOException("Error creating JMS session", e);
      }

      try {
        if (spec.getTopic() != null) {
          this.consumer = this.session.createConsumer(this.session.createTopic(spec.getTopic()));
        } else {
          this.consumer = this.session.createConsumer(this.session.createQueue(spec.getQueue()));
        }
      } catch (Exception e) {
        throw new IOException("Error creating JMS consumer", e);
      }

      return advance();
    }

    @Override
    public boolean advance() throws IOException {
      try {
        Message message = this.consumer.receiveNoWait();

        if (message == null) {
          currentMessage = null;
          return false;
        }

        checkpointMark.addMessage(message);

        currentMessage = this.source.spec.getMessageMapper().mapMessage(message);
        currentTimestamp = new Instant(message.getJMSTimestamp());

        return true;
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    @Override
    public T getCurrent() throws NoSuchElementException {
      if (currentMessage == null) {
        throw new NoSuchElementException();
      }
      return currentMessage;
    }

    @Override
    public Instant getWatermark() {
      return checkpointMark.getOldestPendingTimestamp();
    }

    @Override
    public Instant getCurrentTimestamp() {
      if (currentMessage == null) {
        throw new NoSuchElementException();
      }
      return currentTimestamp;
    }

    @Override
    public CheckpointMark getCheckpointMark() {
      return checkpointMark;
    }

    @Override
    public UnboundedSource<T, ?> getCurrentSource() {
      return source;
    }

    @Override
    public void close() throws IOException {
      try {
        if (consumer != null) {
          consumer.close();
          consumer = null;
        }
        if (session != null) {
          session.close();
          session = null;
        }
        if (connection != null) {
          connection.stop();
          connection.close();
          connection = null;
        }
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

  /**
   * A {@link PTransform} to write to a JMS queue. See {@link JmsIO} for more information on usage
   * and configuration.
   */
  @AutoValue
  public abstract static class Write extends PTransform<PCollection<String>, PDone> {

    @Nullable
    abstract ConnectionFactory getConnectionFactory();

    @Nullable
    abstract String getQueue();

    @Nullable
    abstract String getTopic();

    @Nullable
    abstract String getUsername();

    @Nullable
    abstract String getPassword();

    abstract Builder builder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);

      abstract Builder setQueue(String queue);

      abstract Builder setTopic(String topic);

      abstract Builder setUsername(String username);

      abstract Builder setPassword(String password);

      abstract Write build();
    }

    /**
     * Specify the JMS connection factory to connect to the JMS broker.
     *
     * <p>For instance:
     *
     * <pre>{@code
     * .apply(JmsIO.write().withConnectionFactory(myConnectionFactory)
     *
     * }</pre>
     *
     * @param connectionFactory The JMS {@link ConnectionFactory}.
     * @return The corresponding {@link JmsIO.Read}.
     */
    public Write withConnectionFactory(ConnectionFactory connectionFactory) {
      checkArgument(connectionFactory != null, "connectionFactory can not be null");
      return builder().setConnectionFactory(connectionFactory).build();
    }

    /**
     * Specify the JMS queue destination name where to send messages to. The {@link JmsIO.Write}
     * acts as a producer on the queue.
     *
     * <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}. The user has to
     * specify a destination: queue or topic.
     *
     * <p>For instance:
     *
     * <pre>{@code
     * .apply(JmsIO.write().withQueue("my-queue")
     *
     * }</pre>
     *
     * @param queue The JMS queue name where to send messages to.
     * @return The corresponding {@link JmsIO.Read}.
     */
    public Write withQueue(String queue) {
      checkArgument(queue != null, "queue can not be null");
      return builder().setQueue(queue).build();
    }

    /**
     * Specify the JMS topic destination name where to send messages to. The {@link JmsIO.Read} acts
     * as a publisher on the topic.
     *
     * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}. The user has to
     * specify a destination: queue or topic.
     *
     * <p>For instance:
     *
     * <pre>{@code
     * .apply(JmsIO.write().withTopic("my-topic")
     *
     * }</pre>
     *
     * @param topic The JMS topic name.
     * @return The corresponding {@link JmsIO.Read}.
     */
    public Write withTopic(String topic) {
      checkArgument(topic != null, "topic can not be null");
      return builder().setTopic(topic).build();
    }

    /** Define the username to connect to the JMS broker (authenticated). */
    public Write withUsername(String username) {
      checkArgument(username != null, "username can not be null");
      return builder().setUsername(username).build();
    }

    /** Define the password to connect to the JMS broker (authenticated). */
    public Write withPassword(String password) {
      checkArgument(password != null, "password can not be null");
      return builder().setPassword(password).build();
    }

    @Override
    public PDone expand(PCollection<String> input) {
      checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
      checkArgument(
          getQueue() != null || getTopic() != null,
          "Either withQueue(queue) or withTopic(topic) is required");
      checkArgument(
          getQueue() == null || getTopic() == null,
          "withQueue(queue) and withTopic(topic) are exclusive");

      input.apply(ParDo.of(new WriterFn(this)));
      return PDone.in(input.getPipeline());
    }

    private static class WriterFn extends DoFn<String, Void> {

      private Write spec;

      private Connection connection;
      private Session session;
      private MessageProducer producer;

      public WriterFn(Write spec) {
        this.spec = spec;
      }

      @Setup
      public void setup() throws Exception {
        if (producer == null) {
          if (spec.getUsername() != null) {
            this.connection =
                spec.getConnectionFactory()
                    .createConnection(spec.getUsername(), spec.getPassword());
          } else {
            this.connection = spec.getConnectionFactory().createConnection();
          }
          this.connection.start();
          // false means we don't use JMS transaction.
          this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Destination destination;
          if (spec.getQueue() != null) {
            destination = session.createQueue(spec.getQueue());
          } else {
            destination = session.createTopic(spec.getTopic());
          }
          this.producer = this.session.createProducer(destination);
        }
      }

      @ProcessElement
      public void processElement(ProcessContext ctx) throws Exception {
        String value = ctx.element();
        TextMessage message = session.createTextMessage(value);
        producer.send(message);
      }

      @Teardown
      public void teardown() throws Exception {
        producer.close();
        producer = null;
        session.close();
        session = null;
        connection.stop();
        connection.close();
        connection = null;
      }
    }
  }
}
