/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.entries;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

import org.apache.geode.DataSerializer;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.cache.query.internal.index.IndexProtocol;
import org.apache.geode.cache.util.GatewayConflictResolver;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionClearedException;
import org.apache.geode.internal.cache.RegionEntryContext;
import org.apache.geode.internal.cache.TimestampedEntryEventImpl;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.offheap.MemoryAllocatorImpl;
import org.apache.geode.internal.offheap.OffHeapMemoryStats;
import org.apache.geode.internal.offheap.OutOfOffHeapMemoryListener;
import org.apache.geode.internal.offheap.SlabImpl;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;

public class AbstractRegionEntryTest {

  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  @Test
  public void whenMakeTombstoneHasSetValueThatThrowsExceptionDoesNotChangeValueToTombstone()
      throws RegionClearedException {
    LocalRegion lr = mock(LocalRegion.class);
    RegionVersionVector<?> rvv = mock(RegionVersionVector.class);
    when(lr.getVersionVector()).thenReturn(rvv);
    VersionTag<?> vt = mock(VersionTag.class);
    Object value = "value";
    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
    assertEquals(value, re.getValueField());
    Assertions.assertThatThrownBy(() -> re.makeTombstone(lr, vt))
        .isInstanceOf(RuntimeException.class).hasMessage("throw exception on setValue(TOMBSTONE)");
    Assert.assertEquals(Token.REMOVED_PHASE2, re.getValueField());
  }


  @Test
  public void whenRegionEntryIsTombstoneThenIndexShouldNotBeUpdated() throws QueryException {
    InternalRegion region = mock(InternalRegion.class);
    IndexManager indexManager = mock(IndexManager.class);
    when(region.getIndexManager()).thenReturn(indexManager);
    AbstractRegionEntry abstractRegionEntry = mock(AbstractRegionEntry.class);
    when(abstractRegionEntry.isTombstone()).thenReturn(true);
    abstractRegionEntry.updateIndexOnDestroyOperation(region);
    verify(indexManager, never()).updateIndexes(any(AbstractRegionEntry.class),
        eq(IndexManager.REMOVE_ENTRY), eq(IndexProtocol.OTHER_OP));

  }


  @Test
  public void whenPrepareValueForCacheCalledWithOffHeapEntryHasNewCachedSerializedValue()
      throws RegionClearedException, IOException, ClassNotFoundException {
    LocalRegion lr = mock(LocalRegion.class);
    RegionEntryContext regionEntryContext = mock(RegionEntryContext.class);
    OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
    OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);
    SlabImpl slab = new SlabImpl(1024); // 1k
    MemoryAllocatorImpl ma =
        MemoryAllocatorImpl.createForUnitTest(ooohml, stats, new SlabImpl[] {slab});
    try {
      when(regionEntryContext.getOffHeap()).thenReturn(true);
      String value = "value";
      AbstractRegionEntry re = new TestableRegionEntry(lr, value);
      assertEquals(value, re.getValueField());
      EntryEventImpl entryEvent = new EntryEventImpl();
      StoredObject valueForCache =
          (StoredObject) re.prepareValueForCache(regionEntryContext, value, entryEvent, true);
      final byte[] cachedSerializedNewValue = entryEvent.getCachedSerializedNewValue();
      assertNotNull(cachedSerializedNewValue);
      valueForCache.checkDataEquals(cachedSerializedNewValue);
      DataInputStream dataInputStream =
          new DataInputStream(new ByteArrayInputStream(cachedSerializedNewValue));
      Object o = DataSerializer.readObject(dataInputStream);
      assertEquals(o, value);
    } finally {
      MemoryAllocatorImpl.freeOffHeapMemory();
    }
  }

  @Test
  public void gatewayEventsFromSameDSShouldThrowCMEIfMisordered() {
    // create 2 gateway events with the same dsid, but different timestamp
    // apply them in misorder, it should throw CME before calling resolver
    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
    LocalRegion lr = mock(LocalRegion.class);
    String value = "value";
    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
    InternalDistributedMember member1 = mock(InternalDistributedMember.class);

    EntryEventImpl entryEvent1 = new EntryEventImpl();
    entryEvent1.setRegion(lr);
    when(lr.getCache()).thenReturn(cache);
    GatewayConflictResolver resolver = mock(GatewayConflictResolver.class);
    when(cache.getGatewayConflictResolver()).thenReturn(resolver);

    VersionTag tag1 = VersionTag.create(member1);
    tag1.setVersionTimeStamp(1);
    tag1.setDistributedSystemId(3);
    tag1.setIsGatewayTag(true);

    VersionTag tag2 = VersionTag.create(member1);
    tag2.setVersionTimeStamp(2);
    tag2.setDistributedSystemId(3);
    tag2.setIsGatewayTag(true);

    ((TestableRegionEntry) re).setVersions(tag2);
    assertEquals(tag2.getVersionTimeStamp(),
        re.getVersionStamp().asVersionTag().getVersionTimeStamp());
    assertEquals(3, ((TestableRegionEntry) re).getDistributedSystemId());

    // apply tag1 with smaller timestamp should throw CME
    entryEvent1.setVersionTag(tag1);
    expectedException.expect(ConcurrentCacheModificationException.class);
    expectedException.expectMessage("conflicting WAN event detected");
    re.processVersionTag(entryEvent1);
    verify(resolver, never()).onEvent(any(), any());
  }

  @Test
  public void gatewayEventsFromSameDSInCorrectOrderOfTimestampShouldPass() {
    // create 2 gateway events with the same dsid, but different timestamp
    // apply them in correct order, it should pass
    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
    LocalRegion lr = mock(LocalRegion.class);
    String value = "value";
    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
    InternalDistributedMember member1 = mock(InternalDistributedMember.class);

    EntryEventImpl entryEvent1 = new EntryEventImpl();
    entryEvent1.setRegion(lr);
    when(lr.getCache()).thenReturn(cache);
    when(cache.getGatewayConflictResolver()).thenReturn(null);

    VersionTag tag1 = VersionTag.create(member1);
    tag1.setVersionTimeStamp(1);
    tag1.setDistributedSystemId(3);
    tag1.setIsGatewayTag(true);

    VersionTag tag2 = VersionTag.create(member1);
    tag2.setVersionTimeStamp(2);
    tag2.setDistributedSystemId(3);
    tag2.setIsGatewayTag(true);

    ((TestableRegionEntry) re).setVersions(tag1);
    assertEquals(tag1.getVersionTimeStamp(),
        re.getVersionStamp().asVersionTag().getVersionTimeStamp());
    assertEquals(3, ((TestableRegionEntry) re).getDistributedSystemId());

    // apply tag2 should be accepted
    entryEvent1.setVersionTag(tag2);
    re.processVersionTag(entryEvent1);
  }

  @Test
  public void stampWithoutDSIDShouldAcceptAnyTag() {
    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
    LocalRegion lr = mock(LocalRegion.class);
    String value = "value";
    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
    InternalDistributedMember member1 = mock(InternalDistributedMember.class);

    EntryEventImpl entryEvent1 = new EntryEventImpl();
    entryEvent1.setRegion(lr);
    when(lr.getCache()).thenReturn(cache);
    when(cache.getGatewayConflictResolver()).thenReturn(null);

    VersionTag tag1 = VersionTag.create(member1);
    tag1.setVersionTimeStamp(1);
    tag1.setDistributedSystemId(-1);
    tag1.setIsGatewayTag(true);

    VersionTag tag2 = VersionTag.create(member1);
    tag2.setVersionTimeStamp(2);
    tag2.setDistributedSystemId(2);
    tag2.setIsGatewayTag(true);

    ((TestableRegionEntry) re).setVersions(tag1);
    assertEquals(tag1.getVersionTimeStamp(),
        re.getVersionStamp().asVersionTag().getVersionTimeStamp());
    assertEquals(-1, ((TestableRegionEntry) re).getDistributedSystemId());

    // apply tag2 should be accepted
    entryEvent1.setVersionTag(tag2);
    re.processVersionTag(entryEvent1);
  }

  @Test
  public void applyingGatewayEventsFromDifferentDSShouldAcceptBiggerTimestamp() {
    // create 2 gateway events:
    // tag1 with smaller distributed system ids (DSIDs) and bigger timestamp
    // tag2 with bigger DSID and smaller timestamp
    // set tag2 into stamp. Apply event with tag1 should pass
    // i.e. We compare timestamp first, then DSID
    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
    LocalRegion lr = mock(LocalRegion.class);
    String value = "value";
    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
    InternalDistributedMember member1 = mock(InternalDistributedMember.class);

    EntryEventImpl entryEvent1 = new EntryEventImpl();
    entryEvent1.setRegion(lr);
    when(lr.getCache()).thenReturn(cache);
    when(cache.getGatewayConflictResolver()).thenReturn(null);

    VersionTag tag1 = VersionTag.create(member1);
    tag1.setVersionTimeStamp(2);
    tag1.setDistributedSystemId(1);
    tag1.setIsGatewayTag(true);

    VersionTag tag2 = VersionTag.create(member1);
    tag2.setVersionTimeStamp(1);
    tag2.setDistributedSystemId(2);
    tag2.setIsGatewayTag(true);

    ((TestableRegionEntry) re).setVersions(tag2);
    assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());

    // apply tag1 with bigger timestamp should pass
    entryEvent1.setVersionTag(tag1);
    re.processVersionTag(entryEvent1);
  }

  @Test
  public void applyingGatewayEventsFromSmallerDSWithSameTimestampShouldThrowCMEIfNoResolver() {
    // create 2 gateway events with different distributed system ids (DSIDs), with same timestamp
    // set the one with bigger DSID into stamp.
    // Apply the one with smaller DSID show throw CME
    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
    LocalRegion lr = mock(LocalRegion.class);
    String value = "value";
    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
    InternalDistributedMember member1 = mock(InternalDistributedMember.class);

    EntryEventImpl entryEvent1 = new EntryEventImpl();
    entryEvent1.setRegion(lr);
    when(lr.getCache()).thenReturn(cache);
    when(cache.getGatewayConflictResolver()).thenReturn(null);

    VersionTag tag1 = VersionTag.create(member1);
    tag1.setVersionTimeStamp(1);
    tag1.setDistributedSystemId(1);
    tag1.setIsGatewayTag(true);

    VersionTag tag2 = VersionTag.create(member1);
    tag2.setVersionTimeStamp(1);
    tag2.setDistributedSystemId(2);
    tag2.setIsGatewayTag(true);

    ((TestableRegionEntry) re).setVersions(tag2);
    assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());

    // apply tag1 with smaller timestamp should throw CME
    entryEvent1.setVersionTag(tag1);
    expectedException.expect(ConcurrentCacheModificationException.class);
    expectedException.expectMessage("conflicting WAN event detected");
    re.processVersionTag(entryEvent1);
  }

  @Test
  public void resolverShouldHandleConflictEventsFromDifferentDS() {
    // create 2 gateway events with different distributed system ids (DSIDs), with same timestamp
    // set the one with bigger DSID into stamp.
    // Usually, apply the one with smaller DSID should throw CME, but since there's resolver
    // resolver will accept the event
    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
    LocalRegion lr = mock(LocalRegion.class);
    String value = "value";
    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
    InternalDistributedMember member1 = mock(InternalDistributedMember.class);

    EntryEventImpl entryEvent1 = new EntryEventImpl();
    entryEvent1 = Mockito.spy(entryEvent1);
    entryEvent1.setRegion(lr);
    when(lr.getCache()).thenReturn(cache);
    GatewayConflictResolver resolver = mock(GatewayConflictResolver.class);
    when(cache.getGatewayConflictResolver()).thenReturn(resolver);
    doNothing().when(resolver).onEvent(any(), any());
    TimestampedEntryEventImpl timestampedEvent = mock(TimestampedEntryEventImpl.class);
    // when(entryEvent1.getTimestampedEvent(anyInt(), anyInt(), anyLong(), anyLong()));
    doReturn(timestampedEvent).when(entryEvent1).getTimestampedEvent(anyInt(), anyInt(), anyLong(),
        anyLong());
    when(timestampedEvent.hasOldValue()).thenReturn(true);

    VersionTag tag1 = VersionTag.create(member1);
    tag1.setVersionTimeStamp(1);
    tag1.setDistributedSystemId(1);
    tag1.setIsGatewayTag(true);

    VersionTag tag2 = VersionTag.create(member1);
    tag2.setVersionTimeStamp(1);
    tag2.setDistributedSystemId(2);
    tag2.setIsGatewayTag(true);

    ((TestableRegionEntry) re).setVersions(tag2);
    assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());

    entryEvent1.setVersionTag(tag1);
    re.processVersionTag(entryEvent1);
    verify(resolver, Mockito.times(1)).onEvent(any(), any());
  }

  public static class TestableRegionEntry extends AbstractRegionEntry
      implements OffHeapRegionEntry, VersionStamp {

    private Object value;
    private VersionTag tag;
    private long timeStamp = 0;
    private int dsId;

    protected TestableRegionEntry(RegionEntryContext context, Object value) {
      super(context, value);
    }

    @Override
    public void setVersionTimeStamp(long timeStamp) {
      this.timeStamp = timeStamp;
    }

    @Override
    public void setVersions(VersionTag tag) {
      this.tag = tag;
      timeStamp = tag.getVersionTimeStamp();
      dsId = tag.getDistributedSystemId();
    }

    @Override
    public void setMemberID(VersionSource memberID) {

    }

    @Override
    public VersionTag asVersionTag() {
      return tag;
    }

    @Override
    public void processVersionTag(InternalRegion region, VersionTag tag, boolean isTombstoneFromGII,
        boolean hasDelta, VersionSource versionSource,
        InternalDistributedMember sender, boolean checkConflicts) {

    }

    @Override
    public VersionStamp getVersionStamp() {
      return this;
    }

    @Override
    protected Object getValueField() {
      return value;
    }

    @Override
    protected void setValueField(Object v) {
      value = v;
    }

    @Override
    public void setValue(RegionEntryContext context, @Unretained Object value)
        throws RegionClearedException {
      super.setValue(context, value);
      if (value == Token.TOMBSTONE) {
        throw new RuntimeException("throw exception on setValue(TOMBSTONE)");
      }
    }

    @Override
    public int getEntryHash() {
      return 0;
    }

    @Override
    public HashEntry<Object, Object> getNextEntry() {
      return null;
    }

    @Override
    public void setNextEntry(HashEntry<Object, Object> n) {}

    @Override
    public Object getKey() {
      return null;
    }

    @Override
    protected long getLastModifiedField() {
      return 0;
    }

    @Override
    protected boolean compareAndSetLastModifiedField(long expectedValue, long newValue) {
      return false;
    }

    @Override
    protected void setEntryHash(int v) {}

    @Override
    public void release() {

    }

    @Override
    public long getAddress() {
      return 0;
    }

    @Override
    public boolean setAddress(long expectedAddr, long newAddr) {
      return false;
    }

    @Override
    public int getEntryVersion() {
      return 0;
    }

    @Override
    public long getRegionVersion() {
      return 0;
    }

    @Override
    public long getVersionTimeStamp() {
      return timeStamp;
    }

    @Override
    public VersionSource getMemberID() {
      return null;
    }

    @Override
    public int getDistributedSystemId() {
      return dsId;
    }

    @Override
    public short getRegionVersionHighBytes() {
      return 0;
    }

    @Override
    public int getRegionVersionLowBytes() {
      return 0;
    }
  }
}
