/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Query handler for indexing tasks.
 *
 * <p>Serves queries against the {@link Sink}s registered in a {@link VersionedIntervalTimeline} for one
 * specific dataSource. For each requested segment a runner is built per {@link FireHydrant} of the matching
 * Sink, those runners are merged per Sink, and the per-Sink runners are merged again on the query executor
 * service.
 */
public class SinkQuerySegmentWalker implements QuerySegmentWalker
{
  private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
  // Query context key; when its value is true, hydrants that have not swapped yet are answered with a no-op runner.
  private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";

  private final String dataSource;

  private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
  private final ObjectMapper objectMapper;
  private final ServiceEmitter emitter;
  private final QueryRunnerFactoryConglomerate conglomerate;
  private final ExecutorService queryExecutorService;
  private final JoinableFactoryWrapper joinableFactoryWrapper;
  private final Cache cache;
  private final CacheConfig cacheConfig;
  private final CachePopulatorStats cachePopulatorStats;

  /**
   * Creates a walker for a single dataSource.
   *
   * @param dataSource           name of the only table dataSource this walker will answer queries for
   * @param sinkTimeline         timeline of Sinks that queries are resolved against
   * @param objectMapper         mapper handed to the caching runner and the cache populator
   * @param emitter              emitter used for per-Sink and whole-query metrics
   * @param conglomerate         used to find the {@link QueryRunnerFactory} for a query
   * @param queryExecutorService executor on which per-Sink runners are merged
   * @param joinableFactory      wrapped in a {@link JoinableFactoryWrapper} to build the segment mapping function
   *                             and the join cache key
   * @param cache                result cache; only used for swapped hydrants and only when it is local
   * @param cacheConfig          cache configuration handed to the caching runner
   * @param cachePopulatorStats  statistics sink for the foreground cache populator
   *
   * @throws NullPointerException if any argument validated with {@link Preconditions#checkNotNull} is null
   */
  public SinkQuerySegmentWalker(
      String dataSource,
      VersionedIntervalTimeline<String, Sink> sinkTimeline,
      ObjectMapper objectMapper,
      ServiceEmitter emitter,
      QueryRunnerFactoryConglomerate conglomerate,
      ExecutorService queryExecutorService,
      JoinableFactory joinableFactory,
      Cache cache,
      CacheConfig cacheConfig,
      CachePopulatorStats cachePopulatorStats
  )
  {
    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
    this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
    this.emitter = Preconditions.checkNotNull(emitter, "emitter");
    this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
    this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
    this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
    this.cache = Preconditions.checkNotNull(cache, "cache");
    this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");

    // A non-local cache is tolerated, but the per-hydrant caching branch is skipped for it at query time.
    if (!cache.isLocal()) {
      log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
    }
  }

  /**
   * Resolves the given intervals against the Sink timeline, builds one {@link SegmentDescriptor} per partition
   * chunk found, and delegates to {@link #getQueryRunnerForSegments}.
   *
   * @param query     the query to run
   * @param intervals intervals to look up in the Sink timeline
   *
   * @return a runner covering every chunk the timeline returned for the intervals
   */
  @Override
  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
  {
    final Iterable<SegmentDescriptor> specs = FunctionalIterable
        .create(intervals)
        .transformCat(sinkTimeline::lookup)
        .transformCat(
            holder -> FunctionalIterable
                .create(holder.getObject())
                .transform(
                    chunk -> new SegmentDescriptor(
                        holder.getInterval(),
                        holder.getVersion(),
                        chunk.getChunkNumber()
                    )
                )
        );

    return getQueryRunnerForSegments(query, specs);
  }

  /**
   * Builds a runner for the given segment descriptors.
   *
   * <p>Descriptors with no matching chunk in the timeline, and hydrants whose segment reference could not be
   * acquired, are answered with a {@link ReportTimelineMissingSegmentQueryRunner}. Hydrants that had already
   * swapped when inspected are wrapped in a {@link CachingQueryRunner} if the cache is local.
   *
   * @param query the query to run
   * @param specs descriptors of the segments to query
   *
   * @return a runner that merges and finalizes the results of all per-Sink runners
   *
   * @throws ISE if the query's base table is not this walker's dataSource, if no factory is known for the query
   *             type, or if the query type cannot perform the subquery it is based on
   */
  @Override
  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
  {
    // We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());

    // Sanity check: make sure the query is based on the table we're meant to handle.
    if (!analysis.getBaseTableDataSource().filter(ds -> dataSource.equals(ds.getName())).isPresent()) {
      throw new ISE("Cannot handle datasource: %s", analysis.getDataSource());
    }

    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
    if (factory == null) {
      throw new ISE("Unknown query type[%s].", query.getClass());
    }

    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
    final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
    // Shared by the segment mapping function, every per-Sink runner and the outermost runner.
    final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);

    // Make sure this query type can handle the subquery, if present.
    if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
      throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
    }

    // segmentMapFn maps each base Segment into a joined Segment if necessary.
    final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
        analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
        analysis.getPreJoinableClauses(),
        cpuTimeAccumulator,
        analysis.getBaseQuery().orElse(query)
    );

    // We compute the join cache key here itself so it doesn't need to be re-computed for every segment
    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()
                                            ? joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis)
                                            : Optional.of(StringUtils.EMPTY_BYTES);

    Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
        specs,
        descriptor -> {
          final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
              descriptor.getInterval(),
              descriptor.getVersion(),
              descriptor.getPartitionNumber()
          );

          // Nothing in the timeline for this descriptor: report it as missing instead of failing the query.
          if (chunk == null) {
            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
          }

          final Sink theSink = chunk.getObject();
          final SegmentId sinkSegmentId = theSink.getSegment().getId();

          Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
              Iterables.transform(
                  theSink,
                  hydrant -> {
                    // Hydrant might swap at any point, but if it's swapped at the start
                    // then we know it's *definitely* swapped.
                    final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();

                    if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
                      return new Pair<>(hydrant.getSegmentDataInterval(), new NoopQueryRunner<>());
                    }

                    // Prevent the underlying segment from swapping when its being iterated
                    final Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable =
                        hydrant.getSegmentForQuery(segmentMapFn);

                    // if optional isn't present, we failed to acquire reference to the segment or any joinables
                    if (!maybeSegmentAndCloseable.isPresent()) {
                      return new Pair<>(
                          hydrant.getSegmentDataInterval(),
                          new ReportTimelineMissingSegmentQueryRunner<>(descriptor)
                      );
                    }
                    final Pair<SegmentReference, Closeable> segmentAndCloseable = maybeSegmentAndCloseable.get();
                    try {

                      QueryRunner<T> runner = factory.createRunner(segmentAndCloseable.lhs);

                      // 1) Only use caching if data is immutable
                      // 2) Hydrants are not the same between replicas, make sure cache is local
                      if (hydrantDefinitelySwapped && cache.isLocal()) {
                        StorageAdapter storageAdapter = segmentAndCloseable.lhs.asStorageAdapter();
                        long segmentMinTime = storageAdapter.getMinTime().getMillis();
                        long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
                        // +1 so that the row at segmentMaxTime falls inside the interval built here.
                        Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
                        runner = new CachingQueryRunner<>(
                            makeHydrantCacheIdentifier(hydrant),
                            cacheKeyPrefix,
                            descriptor,
                            actualDataInterval,
                            objectMapper,
                            cache,
                            toolChest,
                            runner,
                            // Always populate in foreground regardless of config
                            new ForegroundCachePopulator(
                                objectMapper,
                                cachePopulatorStats,
                                cacheConfig.getMaxEntrySize()
                            ),
                            cacheConfig
                        );
                      }
                      // Make it always use Closeable to decrement()
                      runner = QueryRunnerHelper.makeClosingQueryRunner(
                          runner,
                          segmentAndCloseable.rhs
                      );
                      return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner);
                    }
                    catch (Throwable e) {
                      // The closing runner was never handed back, so nobody else will release the reference.
                      // Catch Throwable (not just RuntimeException) so that Errors do not leak it either. The
                      // try block throws no checked exceptions, so rethrowing 'e' needs no throws clause.
                      CloseQuietly.close(segmentAndCloseable.rhs);
                      throw e;
                    }
                  }
              )
          );
          // Hydrants of one Sink are merged on the calling thread; Sinks are merged on queryExecutorService below.
          return new SpecificSegmentQueryRunner<>(
              withPerSinkMetrics(
                  new BySegmentQueryRunner<>(
                      sinkSegmentId,
                      descriptor.getInterval().getStart(),
                      factory.mergeRunners(
                          Execs.directExecutor(),
                          perHydrantRunners
                      )
                  ),
                  toolChest,
                  sinkSegmentId,
                  cpuTimeAccumulator
              ),
              new SpecificSegmentSpec(descriptor)
          );
        }
    );
    final QueryRunner<T> mergedRunner =
        toolChest.mergeResults(
            factory.mergeRunners(
                queryExecutorService,
                perSegmentRunners
            )
        );

    // NOTE(review): the trailing boolean appears to control whether the accumulated CPU time is emitted by this
    // runner (true here, false for the per-Sink runners) - confirm against CPUTimeMetricQueryRunner.safeBuild.
    return CPUTimeMetricQueryRunner.safeBuild(
        new FinalizeResultsQueryRunner<>(mergedRunner, toolChest),
        toolChest,
        emitter,
        cpuTimeAccumulator,
        true
    );
  }

  /**
   * Returns the name of the dataSource this walker was constructed for.
   */
  @VisibleForTesting
  String getDataSource()
  {
    return dataSource;
  }

  /**
   * Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, query/wait/time once
   * each for the whole Sink. Also adds CPU time to cpuTimeAccumulator.
   *
   * @param sinkRunner         runner covering all hydrants of one Sink
   * @param queryToolChest     tool chest handed to the metrics-emitting and CPU-time runners
   * @param sinkSegmentId      id of the Sink's segment; its string form is set as the segment dimension
   * @param cpuTimeAccumulator accumulator that the returned runner adds CPU time to
   *
   * @return the decorated runner
   */
  private <T> QueryRunner<T> withPerSinkMetrics(
      final QueryRunner<T> sinkRunner,
      final QueryToolChest<T, ? extends Query<T>> queryToolChest,
      final SegmentId sinkSegmentId,
      final AtomicLong cpuTimeAccumulator
  )
  {
    // Note: reportSegmentAndCacheTime and reportSegmentTime are effectively the same here. They don't split apart
    // cache vs. non-cache due to the fact that Sinks may be partially cached and partially uncached. Making this
    // better would need to involve another accumulator like the cpuTimeAccumulator that we could share with the
    // sinkRunner.
    String sinkSegmentIdString = sinkSegmentId.toString();
    return CPUTimeMetricQueryRunner.safeBuild(
        new MetricsEmittingQueryRunner<>(
            emitter,
            queryToolChest,
            new MetricsEmittingQueryRunner<>(
                emitter,
                queryToolChest,
                sinkRunner,
                QueryMetrics::reportSegmentTime,
                queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
            ),
            QueryMetrics::reportSegmentAndCacheTime,
            queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
        ).withWaitMeasuredFromNow(),
        queryToolChest,
        emitter,
        cpuTimeAccumulator,
        false
    );
  }

  /**
   * Returns the Sink timeline this walker queries. This is the same instance that was passed to the
   * constructor, not a copy.
   */
  public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
  {
    return sinkTimeline;
  }

  /**
   * Builds the cache identifier used for a hydrant's {@link CachingQueryRunner}: the hydrant's segment id,
   * an underscore, and the hydrant's count.
   *
   * @param input the hydrant to identify
   *
   * @return {@code input.getSegmentId() + "_" + input.getCount()}
   */
  public static String makeHydrantCacheIdentifier(FireHydrant input)
  {
    return input.getSegmentId() + "_" + input.getCount();
  }
}
