/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.handler;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.charset.Charset;
import java.util.List;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.processor.CdcrUpdateProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;

/**
 * The replication logic. Given a {@link org.apache.solr.handler.CdcrReplicatorState}, it reads all the new entries
 * in the update log and forward them to the target cluster. If an error occurs, the replication is stopped and
 * will be tried again later.
 * @deprecated since 8.6
 */
@Deprecated(since = "8.6")
public class CdcrReplicator implements Runnable {

  private final CdcrReplicatorState state;
  private final int batchSize;

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  public CdcrReplicator(CdcrReplicatorState state, int batchSize) {
    this.state = state;
    this.batchSize = batchSize;
  }

  @Override
  public void run() {
    CdcrUpdateLog.CdcrLogReader logReader = state.getLogReader();
    CdcrUpdateLog.CdcrLogReader subReader = null;
    if (logReader == null) {
      log.warn("Log reader for target {} is not initialised, it will be ignored.", state.getTargetCollection());
      return;
    }

    try {
      // create update request
      UpdateRequest req = new UpdateRequest();
      // Add the param to indicate the {@link CdcrUpdateProcessor} to keep the provided version number
      req.setParam(CdcrUpdateProcessor.CDCR_UPDATE, "");

      // Start the benchmark timer
      state.getBenchmarkTimer().start();

      long counter = 0;
      subReader = logReader.getSubReader();

      for (int i = 0; i < batchSize; i++) {
        Object o = subReader.next();
        if (o == null) break; // we have reached the end of the update logs, we should close the batch

        if (isTargetCluster(o)) {
          continue;
        }

        if (isDelete(o)) {

          /*
          * Deletes are sent one at a time.
          */

          // First send out current batch of SolrInputDocument, the non-deletes.
          List<SolrInputDocument> docs = req.getDocuments();

          if (docs != null && docs.size() > 0) {
            subReader.resetToLastPosition(); // Push back the delete for now.
            this.sendRequest(req); // Send the batch update request
            logReader.forwardSeek(subReader); // Advance the main reader to just before the delete.
            o = subReader.next(); // Read the delete again
            counter += docs.size();
            req.clear();
          }

          // Process Delete
          this.processUpdate(o, req);
          this.sendRequest(req);
          logReader.forwardSeek(subReader);
          counter++;
          req.clear();

        } else {

          this.processUpdate(o, req);

        }
      }

      //Send the final batch out.
      List<SolrInputDocument> docs = req.getDocuments();

      if ((docs != null && docs.size() > 0)) {
        this.sendRequest(req);
        counter += docs.size();
      }

      // we might have read a single commit operation and reached the end of the update logs
      logReader.forwardSeek(subReader);

      if (log.isInfoEnabled()) {
        log.info("Forwarded {} updates to target {}", counter, state.getTargetCollection());
      }
    } catch (Exception e) {
      // report error and update error stats
      this.handleException(e);
    } finally {
      // stop the benchmark timer
      state.getBenchmarkTimer().stop();
      // ensure that the subreader is closed and the associated pointer is removed
      if (subReader != null) subReader.close();
    }
  }

  private void sendRequest(UpdateRequest req) throws IOException, SolrServerException, CdcrReplicatorException {
    UpdateResponse rsp = req.process(state.getClient());
    if (rsp.getStatus() != 0) {
      throw new CdcrReplicatorException(req, rsp);
    }
    state.resetConsecutiveErrors();
  }

  /** check whether the update read from TLog is received from source
   *  or received via solr client
   */
  private boolean isTargetCluster(Object o) {
    @SuppressWarnings({"rawtypes"})
    List entry = (List) o;
    int operationAndFlags = (Integer) entry.get(0);
    int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
    Boolean isTarget = false;
    if (oper == UpdateLog.DELETE_BY_QUERY ||  oper == UpdateLog.DELETE) {
      if (entry.size() == 4) { //back-combat - skip for previous versions
        isTarget = (Boolean) entry.get(entry.size() - 1);
      }
    } else if (oper == UpdateLog.UPDATE_INPLACE) {
      if (entry.size() == 6) { //back-combat - skip for previous versions
        isTarget = (Boolean) entry.get(entry.size() - 2);
      }
    } else if (oper == UpdateLog.ADD) {
      if (entry.size() == 4) { //back-combat - skip for previous versions
        isTarget = (Boolean) entry.get(entry.size() - 2);
      }
    }
    return isTarget;
  }

  private boolean isDelete(Object o) {
    @SuppressWarnings({"rawtypes"})
    List entry = (List) o;
    int operationAndFlags = (Integer) entry.get(0);
    int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
    return oper == UpdateLog.DELETE_BY_QUERY || oper == UpdateLog.DELETE;
  }

  private void handleException(Exception e) {
    if (e instanceof CdcrReplicatorException) {
      UpdateRequest req = ((CdcrReplicatorException) e).req;
      UpdateResponse rsp = ((CdcrReplicatorException) e).rsp;
      log.warn("Failed to forward update request {} to target: {}. Got response {}", req, state.getTargetCollection(), rsp);
      state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
    } else if (e instanceof CloudSolrClient.RouteException) {
      log.warn("Failed to forward update request to target: {}", state.getTargetCollection(), e);
      state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
    } else {
      log.warn("Failed to forward update request to target: {}", state.getTargetCollection(), e);
      state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
    }
  }

  private UpdateRequest processUpdate(Object o, UpdateRequest req) {

    // should currently be a List<Oper,Ver,Doc/Id>
    @SuppressWarnings({"rawtypes"})
    List entry = (List) o;

    int operationAndFlags = (Integer) entry.get(0);
    int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
    long version = (Long) entry.get(1);

    // record the operation in the benchmark timer
    state.getBenchmarkTimer().incrementCounter(oper);

    switch (oper) {
      case UpdateLog.ADD: {
        // the version is already attached to the document
        SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
        req.add(sdoc);
        return req;
      }
      case UpdateLog.DELETE: {
        byte[] idBytes = (byte[]) entry.get(2);
        req.deleteById(new String(idBytes, Charset.forName("UTF-8")));
        req.setParam(VERSION_FIELD, Long.toString(version));
        return req;
      }

      case UpdateLog.DELETE_BY_QUERY: {
        String query = (String) entry.get(2);
        req.deleteByQuery(query);
        req.setParam(VERSION_FIELD, Long.toString(version));
        return req;
      }

      case UpdateLog.COMMIT: {
        return null;
      }

      default:
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
    }
  }

  /**
   * Exception to catch update request issues with the target cluster.
   */
  public static class CdcrReplicatorException extends Exception {

    private final UpdateRequest req;
    private final UpdateResponse rsp;

    public CdcrReplicatorException(UpdateRequest req, UpdateResponse rsp) {
      this.req = req;
      this.rsp = rsp;
    }

  }

}

