/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms.splittabledofn;

import static org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker.next;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.math.BigDecimal;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link ByteKeyRangeTrackerTest}. */
@RunWith(JUnit4.class)
public class ByteKeyRangeTrackerTest {
  @Rule public final ExpectedException expected = ExpectedException.none();

  @Test
  public void testTryClaim() throws Exception {
    ByteKeyRange range = ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0));
    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(range);
    assertEquals(range, tracker.currentRestriction());
    assertTrue(tracker.tryClaim(ByteKey.of(0x10)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x10, 0x00)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x10, 0x00, 0x00)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x99)));
    assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
  }

  @Test
  public void testCheckpointUnstarted() throws Exception {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));

    ByteKeyRange checkpoint = tracker.checkpoint();
    // We expect to get the original range back and that the current restriction
    // is effectively made empty.
    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), checkpoint);
    assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
  }

  @Test
  public void testCheckpointUnstartedForAllKeysRange() throws Exception {
    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);

    ByteKeyRange checkpoint = tracker.checkpoint();
    // We expect to get the original range back and that the current restriction
    // is effectively made empty.
    assertEquals(ByteKeyRange.ALL_KEYS, checkpoint);
    assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
  }

  @Test
  public void testCheckpointOnlyFailedClaim() throws Exception {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
    ByteKeyRange checkpoint = tracker.checkpoint();
    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
  }

  @Test
  public void testCheckpointJustStarted() throws Exception {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x10)));
    ByteKeyRange checkpoint = tracker.checkpoint();
    assertEquals(
        ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10, 0x00)), tracker.currentRestriction());
    assertEquals(ByteKeyRange.of(ByteKey.of(0x10, 0x00), ByteKey.of(0xc0)), checkpoint);
  }

  @Test
  public void testCheckpointRegular() throws Exception {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    ByteKeyRange checkpoint = tracker.checkpoint();
    assertEquals(
        ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x90, 0x00)), tracker.currentRestriction());
    assertEquals(ByteKeyRange.of(ByteKey.of(0x90, 0x00), ByteKey.of(0xc0)), checkpoint);
  }

  @Test
  public void testCheckpointAtLast() throws Exception {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
    ByteKeyRange checkpoint = tracker.checkpoint();
    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
  }

  @Test
  public void testCheckpointAtLastUsingAllKeysAndEmptyKey() throws Exception {
    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    assertFalse(tracker.tryClaim(ByteKey.EMPTY));
    ByteKeyRange checkpoint = tracker.checkpoint();
    assertEquals(ByteKeyRange.ALL_KEYS, tracker.currentRestriction());
    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
  }

  @Test
  public void testCheckpointAfterLast() throws Exception {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
    assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
    ByteKeyRange checkpoint = tracker.checkpoint();
    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
  }

  @Test
  public void testCheckpointAfterLastUsingEmptyKey() throws Exception {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
    assertFalse(tracker.tryClaim(ByteKey.EMPTY));
    ByteKeyRange checkpoint = tracker.checkpoint();
    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
  }

  @Test
  public void testNonMonotonicClaim() throws Exception {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    expected.expectMessage("Trying to claim key [70] while last attempted key was [90]");
    tracker.tryClaim(ByteKey.of(0x70));
  }

  @Test
  public void testClaimBeforeStartOfRange() throws Exception {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    expected.expectMessage(
        "Trying to claim key [05] before start of the range "
            + "ByteKeyRange{startKey=[10], endKey=[c0]}");
    tracker.tryClaim(ByteKey.of(0x05));
  }

  @Test
  public void testCheckDoneAfterTryClaimPastEndOfRange() {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
    tracker.checkDone();
  }

  @Test
  public void testCheckDoneAfterTryClaimAtEndOfRange() {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
    tracker.checkDone();
  }

  @Test
  public void testCheckDoneWhenClaimingEndOfRangeForEmptyKey() {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.EMPTY));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    assertFalse(tracker.tryClaim(ByteKey.EMPTY));
    tracker.checkDone();
  }

  @Test
  public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    assertTrue(tracker.tryClaim(ByteKey.of(0xbf)));
    expected.expectMessage(
        "Last attempted key was [bf] in range ByteKeyRange{startKey=[10], endKey=[c0]}, "
            + "claiming work in [[bf00], [c0]) was not attempted");
    tracker.checkDone();
  }

  @Test
  public void testCheckDoneForEmptyRange() {
    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
    tracker.checkDone();
  }

  @Test
  public void testCheckDoneWhenNotDone() {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
    assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
    expected.expectMessage(
        "Last attempted key was [90] in range ByteKeyRange{startKey=[10], endKey=[c0]}, "
            + "claiming work in [[9000], [c0]) was not attempted");
    tracker.checkDone();
  }

  @Test
  public void testCheckDoneUnstarted() {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    expected.expect(IllegalStateException.class);
    tracker.checkDone();
  }

  @Test
  public void testNextByteKey() {
    assertEquals(next(ByteKey.EMPTY), ByteKey.of(0x00));
    assertEquals(next(ByteKey.of(0x00)), ByteKey.of(0x00, 0x00));
    assertEquals(next(ByteKey.of(0x9f)), ByteKey.of(0x9f, 0x00));
    assertEquals(next(ByteKey.of(0xff)), ByteKey.of(0xff, 0x00));
    assertEquals(next(ByteKey.of(0x10, 0x10)), ByteKey.of(0x10, 0x10, 0x00));
    assertEquals(next(ByteKey.of(0x00, 0xff)), ByteKey.of(0x00, 0xff, 0x00));
    assertEquals(next(ByteKey.of(0xff, 0xff)), ByteKey.of(0xff, 0xff, 0x00));
  }

  @Test
  public void testBacklogUnstarted() {
    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
    assertEquals(BigDecimal.ONE, tracker.getBacklog().backlog());

    tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    assertEquals(BigDecimal.ONE, tracker.getBacklog().backlog());
  }

  @Test
  public void testBacklogFinished() {
    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
    tracker.tryClaim(ByteKey.EMPTY);
    assertEquals(BigDecimal.ZERO, tracker.getBacklog().backlog());

    tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    tracker.tryClaim(ByteKey.of(0xd0));
    assertEquals(BigDecimal.ZERO, tracker.getBacklog().backlog());
  }

  @Test
  public void testBacklogPartiallyCompleted() {
    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
    tracker.tryClaim(ByteKey.of(0xa0));
    assertThat(
        tracker.getBacklog().backlog(),
        allOf(greaterThan(BigDecimal.ZERO), lessThan(BigDecimal.ONE)));

    tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
    tracker.tryClaim(ByteKey.of(0xa0));
    assertThat(
        tracker.getBacklog().backlog(),
        allOf(greaterThan(BigDecimal.ZERO), lessThan(BigDecimal.ONE)));
  }
}
