/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog;

import java.io.ByteArrayInputStream;
import java.net.URI;
import java.util.Arrays;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import org.apache.distributedlog.exceptions.EndOfStreamException;
import com.twitter.util.Await;
import com.twitter.util.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.*;

public class TestAppendOnlyStreamReader extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestAppendOnlyStreamReader.class);

    @Rule
    public TestName testNames = new TestName();

    // Simple test subroutine writes some records, reads some back, skips ahead, skips back.
    public void skipForwardThenSkipBack(String name, DistributedLogConfiguration conf) throws Exception {
        DistributedLogManager dlmwrite = createNewDLM(conf, name);
        DistributedLogManager dlmreader = createNewDLM(conf, name);

        long txid = 1;
        AppendOnlyStreamWriter writer = dlmwrite.getAppendOnlyStreamWriter();
        writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
        writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
        writer.write(DLMTestUtil.repeatString("def", 5).getBytes());
        writer.write(DLMTestUtil.repeatString("def", 5).getBytes());
        writer.write(DLMTestUtil.repeatString("ghi", 5).getBytes());
        writer.write(DLMTestUtil.repeatString("ghi", 5).getBytes());
        writer.force(false);
        writer.close();

        AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
        byte[] bytesIn = new byte[30];

        byte[] bytes1 = DLMTestUtil.repeatString("abc", 10).getBytes();
        byte[] bytes2 = DLMTestUtil.repeatString("def", 10).getBytes();
        byte[] bytes3 = DLMTestUtil.repeatString("ghi", 10).getBytes();

        int read = reader.read(bytesIn, 0, 30);
        assertEquals(30, read);
        assertTrue(Arrays.equals(bytes1, bytesIn));

        reader.skipTo(60);
        read = reader.read(bytesIn, 0, 30);
        assertEquals(30, read);
        assertTrue(Arrays.equals(bytes3, bytesIn));

        reader.skipTo(30);
        read = reader.read(bytesIn, 0, 30);
        assertEquals(30, read);
        assertTrue(Arrays.equals(bytes2, bytesIn));
    }

    @Test(timeout = 60000)
    public void testSkipToSkipsBytesWithImmediateFlush() throws Exception {
        String name = testNames.getMethodName();

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);

        skipForwardThenSkipBack(name, confLocal);
    }

    @Test(timeout = 60000)
    public void testSkipToSkipsBytesWithLargerLogRecords() throws Exception {
        String name = testNames.getMethodName();

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(1024*100);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(1000*60);

        skipForwardThenSkipBack(name, confLocal);
    }

    @Test(timeout = 60000)
    public void testSkipToSkipsBytesUntilEndOfStream() throws Exception {
        String name = testNames.getMethodName();

        DistributedLogManager dlmwrite = createNewDLM(conf, name);
        DistributedLogManager dlmreader = createNewDLM(conf, name);

        long txid = 1;
        AppendOnlyStreamWriter writer = dlmwrite.getAppendOnlyStreamWriter();
        writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
        writer.markEndOfStream();
        writer.force(false);
        writer.close();

        AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
        byte[] bytesIn = new byte[9];

        int read = reader.read(bytesIn, 0, 9);
        assertEquals(9, read);
        assertTrue(Arrays.equals(DLMTestUtil.repeatString("abc", 3).getBytes(), bytesIn));

        assertTrue(reader.skipTo(15));

        try {
            read = reader.read(bytesIn, 0, 1);
            fail("Should have thrown");
        } catch (EndOfStreamException ex) {
        }

        assertTrue(reader.skipTo(0));

        try {
            reader.skipTo(16);
            fail("Should have thrown");
        } catch (EndOfStreamException ex) {
        }
    }

    @Test(timeout = 60000)
    public void testSkipToreturnsFalseIfPositionDoesNotExistYetForUnSealedStream() throws Exception {
        String name = testNames.getMethodName();

        DistributedLogManager dlmwrite = createNewDLM(conf, name);
        DistributedLogManager dlmreader = createNewDLM(conf, name);

        long txid = 1;
        AppendOnlyStreamWriter writer = dlmwrite.getAppendOnlyStreamWriter();
        writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
        writer.close();

        final AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
        byte[] bytesIn = new byte[9];

        int read = reader.read(bytesIn, 0, 9);
        assertEquals(9, read);
        assertTrue(Arrays.equals(DLMTestUtil.repeatString("abc", 3).getBytes(), bytesIn));

        assertFalse(reader.skipTo(16));
        assertFalse(reader.skipTo(16));

        AppendOnlyStreamWriter writer2 = dlmwrite.getAppendOnlyStreamWriter();
        writer2.write(DLMTestUtil.repeatString("abc", 5).getBytes());
        writer2.close();

        assertTrue(reader.skipTo(16));

        byte[] bytesIn2 = new byte[5];
        read = reader.read(bytesIn2, 0, 5);
        assertEquals(5, read);
        assertTrue(Arrays.equals("bcabc".getBytes(), bytesIn2));
    }

    @Test(timeout = 60000)
    public void testSkipToForNoPositionChange() throws Exception {
        String name = testNames.getMethodName();

        DistributedLogManager dlmwrite = createNewDLM(conf, name);
        DistributedLogManager dlmreader = createNewDLM(conf, name);

        long txid = 1;
        AppendOnlyStreamWriter writer = dlmwrite.getAppendOnlyStreamWriter();
        writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
        writer.close();

        final AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();

        assertTrue(reader.skipTo(0));

        byte[] bytesIn = new byte[4];
        int read = reader.read(bytesIn, 0, 4);
        assertEquals(4, read);
        assertEquals(new String("abca"), new String(bytesIn));

        assertTrue(reader.skipTo(reader.position()));

        assertTrue(reader.skipTo(1));

        read = reader.read(bytesIn, 0, 4);
        assertEquals(4, read);
        assertEquals(new String("bcab"), new String(bytesIn));
    }
}
