/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.fluo.integration.impl;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.observer.Observer.NotificationType;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import static org.apache.fluo.integration.BankUtil.BALANCE;

public class FluoIT extends ITBaseImpl {
  /**
   * JUnit rule that applies a timeout, in seconds as returned by {@code getTestTimeout()}, to the
   * test methods of this class.
   */
  @Rule
  public Timeout globalTimeout = Timeout.seconds(getTestTimeout());

  /**
   * Observer provider used by this test class. It registers a single strong observer on the
   * {@code BALANCE} column whose only action is to call {@link Assert#fail()}.
   */
  public static class FluoITObserverProvider implements ObserverProvider {
    /**
     * Registers the always-failing observer for {@code BALANCE}.
     *
     * @param or registry the observer is added to
     * @param ctx provider context; not used here
     */
    @Override
    public void provide(Registry or, Context ctx) {
      // NOTE(review): presumably the tests in this class never expect this observer to run, so
      // any invocation is reported as a failure -- confirm against the base class setup
      or.forColumn(BALANCE, NotificationType.STRONG)
          .useObserver((tx, row, col) -> Assert.fail());
    }
  }

  /**
   * Returns {@link FluoITObserverProvider} as the observer provider class for this test.
   */
  @Override
  protected Class<? extends ObserverProvider> getObserverProviderClass() {
    return FluoITObserverProvider.class;
  }

  /**
   * Smoke test for {@link FluoFactory}: an admin, a client, a loader executor and a snapshot can
   * all be created from the test configuration, and a read against the snapshot completes without
   * throwing. Every resource that is opened is also closed.
   */
  @Test
  public void testFluoFactory() throws Exception {
    try (FluoAdmin admin = FluoFactory.newAdmin(config)) {
      Assert.assertNotNull(admin);
    }

    try (FluoClient client = FluoFactory.newClient(config)) {
      Assert.assertNotNull(client);

      // close the loader executor instead of abandoning it, so whatever it holds is released
      try (LoaderExecutor loaderExecutor = client.newLoaderExecutor()) {
        Assert.assertNotNull(loaderExecutor);
      }

      try (Snapshot s = client.newSnapshot()) {
        Assert.assertNotNull(s);
        // the returned value is not checked; this only verifies that the read does not throw
        s.get(Bytes.of("test"), new Column(Bytes.of("cf"), Bytes.of("cq")));
      }
    }
  }

  @Test
  public void testOverlap1() throws Exception {
    // Two transactions read and update the same row ("bob"); the one that commits second fails:
    //   first  starts, reads and writes
    //   second starts, reads and writes
    //   second commits -- succeeds
    //   first  commits -- fails

    final TestTransaction setup = new TestTransaction(env);
    setup.set("bob", BALANCE, "10");
    setup.set("joe", BALANCE, "20");
    setup.set("jill", BALANCE, "60");
    setup.done();

    // first: move 5 from bob to joe, but do not commit yet
    final TestTransaction first = new TestTransaction(env);
    Assert.assertEquals("10", first.gets("bob", BALANCE));
    Assert.assertEquals("20", first.gets("joe", BALANCE));
    TestUtil.increment(first, "bob", BALANCE, -5);
    TestUtil.increment(first, "joe", BALANCE, 5);

    // second: move 5 from bob to jill
    final TestTransaction second = new TestTransaction(env);
    Assert.assertEquals("10", second.gets("bob", BALANCE));
    Assert.assertEquals("60", second.gets("jill", BALANCE));
    TestUtil.increment(second, "bob", BALANCE, -5);
    TestUtil.increment(second, "jill", BALANCE, 5);

    second.done();
    assertCommitFails(first);

    // only the changes made by the second transaction are visible
    final TestTransaction verify = new TestTransaction(env);
    Assert.assertEquals("5", verify.gets("bob", BALANCE));
    Assert.assertEquals("20", verify.gets("joe", BALANCE));
    Assert.assertEquals("65", verify.gets("jill", BALANCE));
    verify.done();
  }

  /**
   * Calls {@code done()} on the given transaction and requires that it throws a
   * {@link CommitException}; the test fails if {@code done()} returns normally.
   *
   * @param tx transaction whose commit is expected to be rejected
   */
  private void assertCommitFails(TestTransaction tx) {
    boolean committed = true;
    try {
      tx.done();
    } catch (CommitException expected) {
      committed = false;
    }

    if (committed) {
      Assert.fail();
    }
  }

  /**
   * Calls {@code done()} on the given transaction and requires that it throws an
   * {@link AlreadyAcknowledgedException}; the test fails if {@code done()} returns normally.
   *
   * @param tx transaction whose commit is expected to be rejected as already acknowledged
   */
  private void assertAAck(TestTransaction tx) {
    boolean committed = true;
    try {
      tx.done();
    } catch (AlreadyAcknowledgedException expected) {
      committed = false;
    }

    if (committed) {
      Assert.fail();
    }
  }

  @Test
  public void testSnapshots() throws Exception {
    // Scenario under test:
    //   older starts
    //   newer starts, reads/writes and commits
    //   older reads -- must not see anything committed after it started

    final TestTransaction setup = new TestTransaction(env);
    setup.set("bob", BALANCE, "10");
    setup.set("joe", BALANCE, "20");
    setup.set("jill", BALANCE, "60");
    setup.set("jane", BALANCE, "0");
    setup.done();

    final TestTransaction older = new TestTransaction(env);

    final TestTransaction newer = new TestTransaction(env);
    TestUtil.increment(newer, "bob", BALANCE, -5);
    TestUtil.increment(newer, "joe", BALANCE, -5);
    TestUtil.increment(newer, "jill", BALANCE, 10);

    // read before the newer transaction commits
    Assert.assertEquals("10", older.gets("bob", BALANCE));

    newer.done();

    final TestTransaction remover = new TestTransaction(env);
    remover.delete("jane", BALANCE);
    remover.done();

    // reads after the commits above still return the values from when older started
    Assert.assertEquals("20", older.gets("joe", BALANCE));
    Assert.assertEquals("60", older.gets("jill", BALANCE));
    Assert.assertEquals("0", older.gets("jane", BALANCE));

    // writing rows that were changed by the newer transaction makes the commit fail
    older.set("bob", BALANCE, "5");
    older.set("joe", BALANCE, "25");
    assertCommitFails(older);

    // reader starts before jane is re-created, so it sees the delete but not the new value
    final TestTransaction reader = new TestTransaction(env);

    final TestTransaction janeWriter = new TestTransaction(env);
    janeWriter.set("jane", BALANCE, "3");
    janeWriter.done();

    Assert.assertEquals("5", reader.gets("bob", BALANCE));
    Assert.assertEquals("15", reader.gets("joe", BALANCE));
    Assert.assertEquals("70", reader.gets("jill", BALANCE));
    Assert.assertNull(reader.gets("jane", BALANCE));
    reader.done();

    // a transaction started afterwards sees jane's new balance
    final TestTransaction lastReader = new TestTransaction(env);
    Assert.assertEquals("5", lastReader.gets("bob", BALANCE));
    Assert.assertEquals("15", lastReader.gets("joe", BALANCE));
    Assert.assertEquals("70", lastReader.gets("jill", BALANCE));
    Assert.assertEquals("3", lastReader.gets("jane", BALANCE));
    lastReader.done();
  }

  @Test
  public void testAck() throws Exception {
    // When several transactions are triggered by the same observed column, only one of them may
    // commit; the others must fail with AlreadyAcknowledgedException.

    final TestTransaction setup = new TestTransaction(env);
    setup.set("bob", BALANCE, "10");
    setup.set("joe", BALANCE, "20");
    setup.set("jill", BALANCE, "60");
    setup.done();

    // round 1: both transactions are triggered by joe's balance
    final TestTransaction winner = new TestTransaction(env, "joe", BALANCE);
    winner.gets("joe", BALANCE);
    winner.set("jill", BALANCE, "61");

    final TestTransaction loser = new TestTransaction(env, "joe", BALANCE);
    loser.gets("joe", BALANCE);
    loser.set("bob", BALANCE, "11");

    winner.done();
    assertAAck(loser);

    final TestTransaction check1 = new TestTransaction(env);
    Assert.assertEquals("10", check1.gets("bob", BALANCE));
    Assert.assertEquals("20", check1.gets("joe", BALANCE));
    Assert.assertEquals("61", check1.gets("jill", BALANCE));

    // update joe, so it can be acknowledged again
    check1.set("joe", BALANCE, "21");
    check1.done();

    // round 2: three transactions are triggered by joe's balance
    final TestTransaction startedFirst = new TestTransaction(env, "joe", BALANCE);
    startedFirst.gets("joe", BALANCE);
    startedFirst.set("jill", BALANCE, "62");

    final TestTransaction startedSecond = new TestTransaction(env, "joe", BALANCE);
    startedSecond.gets("joe", BALANCE);
    startedSecond.set("bob", BALANCE, "11");

    final TestTransaction straggler = new TestTransaction(env, "joe", BALANCE);

    // commit the transaction that started second before the one that started first
    startedSecond.done();
    assertAAck(startedFirst);

    final TestTransaction check2 = new TestTransaction(env);
    Assert.assertEquals("11", check2.gets("bob", BALANCE));
    Assert.assertEquals("21", check2.gets("joe", BALANCE));
    Assert.assertEquals("61", check2.gets("jill", BALANCE));
    check2.done();

    // the straggler only does its work after the notification was already acknowledged
    straggler.gets("joe", BALANCE);
    straggler.set("bob", BALANCE, "15");
    straggler.set("jill", BALANCE, "60");
    assertAAck(straggler);

    // nothing written by the straggler is visible
    final TestTransaction check3 = new TestTransaction(env);
    Assert.assertEquals("11", check3.gets("bob", BALANCE));
    Assert.assertEquals("21", check3.gets("joe", BALANCE));
    Assert.assertEquals("61", check3.gets("jill", BALANCE));
    check3.done();
  }

  /**
   * Overlaps the commits of transactions that share the same trigger (bob's balance) by driving
   * the commit of tx1 step by step: tx2 tries to commit while tx1 is between its pre-commit and
   * primary-column commit steps, and tx3 tries to commit after tx1 has finished.
   */
  @Test
  public void testAck2() throws Exception {
    final TestTransaction tx = new TestTransaction(env);

    final Column addrCol = new Column("account", "addr");

    tx.set("bob", BALANCE, "10");
    tx.set("joe", BALANCE, "20");
    tx.set("jill", BALANCE, "60");

    tx.done();

    // all three transactions are created with the same trigger: bob's balance
    final TestTransaction tx1 = new TestTransaction(env, "bob", BALANCE);
    final TestTransaction tx2 = new TestTransaction(env, "bob", BALANCE);
    final TestTransaction tx3 = new TestTransaction(env, "bob", BALANCE);

    tx1.gets("bob", BALANCE);
    tx2.gets("bob", BALANCE);

    tx1.gets("bob", addrCol);
    tx2.gets("bob", addrCol);

    tx1.set("bob", addrCol, "1 loop pl");
    tx2.set("bob", addrCol, "1 loop pl");

    // this test overlaps the commits of two transactions w/ the same trigger

    // run only the pre-commit step of tx1, leaving its commit unfinished
    CommitData cd = tx1.createCommitData();
    Assert.assertTrue(tx1.preCommit(cd));

    // tx2 attempts a full commit while tx1 is still mid-commit; a CommitException is expected
    assertCommitFails(tx2);

    // finish the remaining commit steps of tx1 using a fresh stamp from the oracle
    Stamp commitTs = env.getSharedResources().getOracleClient().getStamp();
    Assert.assertTrue(tx1.commitPrimaryColumn(cd, commitTs));
    tx1.finishCommit(cd, commitTs);
    tx1.close();

    // tx3 was created before tx1 committed; its commit is expected to be rejected as already
    // acknowledged
    tx3.set("bob", addrCol, "2 loop pl");
    assertAAck(tx3);
  }

  @Test
  public void testAck3() throws Exception {
    final TestTransaction setup = new TestTransaction(env);
    setup.set("bob", BALANCE, "10");
    setup.set("joe", BALANCE, "20");
    setup.set("jill", BALANCE, "60");
    setup.done();

    final long firstNotifyTs = TestTransaction.getNotificationTS(env, "bob", BALANCE);

    // this transaction should create a second notification
    final TestTransaction update = new TestTransaction(env);
    update.set("bob", BALANCE, "11");
    update.done();

    final long secondNotifyTs = TestTransaction.getNotificationTS(env, "bob", BALANCE);

    Assert.assertTrue(secondNotifyTs > firstNotifyTs);

    // Even though there were two notifications and the transactions below use the first
    // notification's timestamp, only the first transaction to commit should execute. The Google
    // paper calls this message collapsing.

    final TestTransaction loser = new TestTransaction(env, "bob", BALANCE, firstNotifyTs);

    final TestTransaction winner = new TestTransaction(env, "bob", BALANCE, firstNotifyTs);
    Assert.assertEquals("11", winner.gets("bob", BALANCE));
    winner.done();

    Assert.assertEquals("11", loser.gets("bob", BALANCE));
    assertAAck(loser);

    // a transaction using the second notification's timestamp is rejected as well
    final TestTransaction late = new TestTransaction(env, "bob", BALANCE, secondNotifyTs);
    Assert.assertEquals("11", late.gets("bob", BALANCE));
    assertAAck(late);
  }

  @Test
  public void testWriteObserved() throws Exception {
    // Writing to an observed column that has already been acknowledged must not change its
    // acknowledged status.

    final TestTransaction setup = new TestTransaction(env);
    setup.set("bob", BALANCE, "10");
    setup.set("joe", BALANCE, "20");
    setup.set("jill", BALANCE, "60");
    setup.done();

    // triggered by joe's balance, and also writes joe's balance
    final TestTransaction triggerWriter = new TestTransaction(env, "joe", BALANCE);
    triggerWriter.gets("joe", BALANCE);
    triggerWriter.set("joe", BALANCE, "21");
    triggerWriter.set("bob", BALANCE, "11");

    // triggered by joe's balance as well, but writes a different row
    final TestTransaction other = new TestTransaction(env, "joe", BALANCE);
    other.gets("joe", BALANCE);
    other.set("jill", BALANCE, "61");

    other.done();
    assertAAck(triggerWriter);

    // only the write made by the committed transaction is visible
    final TestTransaction verify = new TestTransaction(env);
    Assert.assertEquals("10", verify.gets("bob", BALANCE));
    Assert.assertEquals("20", verify.gets("joe", BALANCE));
    Assert.assertEquals("61", verify.gets("jill", BALANCE));
    verify.done();
  }

  /**
   * Writes balances under the column visibility {@code A|B} and checks that an environment
   * holding authorization {@code B} can read them, while an environment holding only {@code C}
   * reads {@code null} for every row.
   */
  @Test
  public void testVisibility() throws Exception {

    aClient.securityOperations().changeUserAuthorizations(USER, new Authorizations("A", "B", "C"));

    env.setAuthorizations(new Authorizations("A", "B", "C"));

    Column balanceCol = new Column("account", "balance", "A|B");

    final TestTransaction tx = new TestTransaction(env);

    tx.set("bob", balanceCol, "10");
    tx.set("joe", balanceCol, "20");
    tx.set("jill", balanceCol, "60");

    tx.done();

    FluoConfiguration fc = new FluoConfiguration(config);

    // try-with-resources closes each extra environment even when an assertion below fails
    try (Environment env2 = new Environment(fc)) {
      env2.setAuthorizations(new Authorizations("B"));

      // "B" satisfies the visibility expression A|B, so every value is readable
      final TestTransaction tx2 = new TestTransaction(env2);
      Assert.assertEquals("10", tx2.gets("bob", balanceCol));
      Assert.assertEquals("20", tx2.gets("joe", balanceCol));
      Assert.assertEquals("60", tx2.gets("jill", balanceCol));
      tx2.done();
    }

    try (Environment env3 = new Environment(fc)) {
      env3.setAuthorizations(new Authorizations("C"));

      // "C" does not satisfy A|B, so the values must not be returned
      final TestTransaction tx3 = new TestTransaction(env3);
      Assert.assertNull(tx3.gets("bob", balanceCol));
      Assert.assertNull(tx3.gets("joe", balanceCol));
      Assert.assertNull(tx3.gets("jill", balanceCol));
      tx3.done();
    }
  }

  /**
   * Checks that a scanner restricted to one row and one column family reads from its
   * transaction's snapshot: a transaction started before a concurrent update commits still sees
   * the old set of {@code outlink} columns, while a transaction started afterwards sees the
   * updated set.
   */
  @Test
  public void testRange() throws Exception {
    final TestTransaction tx = new TestTransaction(env);
    tx.set("d00001", new Column("data", "content"),
        "blah blah, blah http://a.com. Blah blah http://b.com.  Blah http://c.com");
    tx.set("d00001", new Column("outlink", "http://a.com"), "");
    tx.set("d00001", new Column("outlink", "http://b.com"), "");
    tx.set("d00001", new Column("outlink", "http://c.com"), "");

    tx.set("d00002", new Column("data", "content"),
        "blah blah, blah http://d.com. Blah blah http://e.com.  Blah http://c.com");
    tx.set("d00002", new Column("outlink", "http://d.com"), "");
    tx.set("d00002", new Column("outlink", "http://e.com"), "");
    tx.set("d00002", new Column("outlink", "http://c.com"), "");

    tx.done();

    // tx2 starts before tx3 commits, so its scan must not reflect the changes made by tx3
    final TestTransaction tx2 = new TestTransaction(env);

    final TestTransaction tx3 = new TestTransaction(env);

    tx3.set("d00001", new Column("data", "content"),
        "blah blah, blah http://a.com. Blah http://c.com .  Blah http://z.com");
    tx3.set("d00001", new Column("outlink", "http://a.com"), "");
    tx3.delete("d00001", new Column("outlink", "http://b.com"));
    tx3.set("d00001", new Column("outlink", "http://c.com"), "");
    tx3.set("d00001", new Column("outlink", "http://z.com"), "");

    tx3.done();

    HashSet<Column> columns = scanOutlinkColumns(tx2, "d00001");

    tx2.done();

    HashSet<Column> expected = new HashSet<>();
    expected.add(new Column("outlink", "http://a.com"));
    expected.add(new Column("outlink", "http://b.com"));
    expected.add(new Column("outlink", "http://c.com"));

    Assert.assertEquals(expected, columns);

    // tx4 starts after tx3 committed: b.com is gone and z.com has been added
    final TestTransaction tx4 = new TestTransaction(env);
    columns = scanOutlinkColumns(tx4, "d00001");

    expected.add(new Column("outlink", "http://z.com"));
    expected.remove(new Column("outlink", "http://b.com"));
    Assert.assertEquals(expected, columns);
    tx4.done();
  }

  /**
   * Scans the {@code outlink} column family of a single row and collects the columns returned.
   *
   * @param tx transaction to scan with
   * @param row row to scan
   * @return the set of columns the scanner returned for that row
   */
  private static HashSet<Column> scanOutlinkColumns(TestTransaction tx, String row)
      throws Exception {
    HashSet<Column> columns = new HashSet<>();
    CellScanner cellScanner = tx.scanner().over(row).fetch(new Column("outlink")).build();
    for (RowColumnValue rcv : cellScanner) {
      columns.add(rcv.getColumn());
    }
    return columns;
  }

  @Test
  public void testStringMethods() {
    // exercises the String-based set/gets/delete convenience methods of the transaction

    final Column contentCol = new Column("doc", "content");
    final Column timeCol = new Column("doc", "time");

    final TestTransaction writer = new TestTransaction(env);
    writer.set("d:0001", contentCol, "abc def");
    writer.set("d:0001", timeCol, "45");
    writer.set("d:0002", contentCol, "neb feg");
    writer.set("d:0002", timeCol, "97");
    writer.set("d:0003", contentCol, "xyz abc");
    writer.set("d:0003", timeCol, "42");
    writer.done();

    final TestTransaction reader = new TestTransaction(env);

    // several rows, one column
    Map<String, ImmutableMap<Column, String>> expectedByRow =
        ImmutableMap.of("d:0001", ImmutableMap.of(contentCol, "abc def"), "d:0002",
            ImmutableMap.of(contentCol, "neb feg"));
    Map<String, Map<Column, String>> actualByRow =
        reader.gets(Arrays.asList("d:0001", "d:0002"), Collections.singleton(contentCol));
    Assert.assertEquals(expectedByRow, actualByRow);

    // one row, one column
    Assert.assertEquals("45", reader.gets("d:0001", timeCol));
    Assert.assertEquals("xyz abc", reader.gets("d:0003", contentCol));

    // one row, several columns
    Map<Column, String> actualByColumn = reader.gets("d:0002", ImmutableSet.of(contentCol, timeCol));
    Map<Column, String> expectedByColumn = ImmutableMap.of(contentCol, "neb feg", timeCol, "97");
    Assert.assertEquals(expectedByColumn, actualByColumn);

    reader.delete("d:0003", contentCol);
    reader.delete("d:0003", timeCol);
    reader.done();

    // the deleted cells are no longer returned
    final TestTransaction verify = new TestTransaction(env);
    Assert.assertNull(verify.gets("d:0003", contentCol));
    Assert.assertNull(verify.gets("d:0003", timeCol));
    verify.done();
  }
}
