/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog;

import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
import org.apache.distributedlog.util.DLUtils;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.data.Stat;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.List;

import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.*;

public class TestLogSegmentsZK extends TestDistributedLogBase {

    static Logger LOG = LoggerFactory.getLogger(TestLogSegmentsZK.class);

    private static MaxLogSegmentSequenceNo getMaxLogSegmentSequenceNo(ZooKeeperClient zkc, URI uri, String streamName,
                                                                      DistributedLogConfiguration conf) throws Exception {
        Stat stat = new Stat();
        String logSegmentsPath = LogMetadata.getLogSegmentsPath(
                uri, streamName, conf.getUnpartitionedStreamName());
        byte[] data = zkc.get().getData(logSegmentsPath, false, stat);
        Versioned<byte[]> maxLSSNData = new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()));
        return new MaxLogSegmentSequenceNo(maxLSSNData);
    }

    private static void updateMaxLogSegmentSequenceNo(ZooKeeperClient zkc, URI uri, String streamName,
                                                      DistributedLogConfiguration conf, byte[] data) throws Exception {
        String logSegmentsPath = LogMetadata.getLogSegmentsPath(
                uri, streamName, conf.getUnpartitionedStreamName());
        zkc.get().setData(logSegmentsPath, data, -1);
    }

    @Rule
    public TestName testName = new TestName();

    private URI createURI() throws Exception {
        return createDLMURI("/" + testName.getMethodName());
    }

    /**
     * Create Log Segment for an pre-create stream. No max ledger sequence number recorded.
     */
    @Test(timeout = 60000)
    public void testCreateLogSegmentOnPrecreatedStream() throws Exception {
        URI uri = createURI();
        String streamName = testName.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration()
                .setLockTimeout(99999)
                .setOutputBufferSize(0)
                .setImmediateFlushEnabled(true)
                .setEnableLedgerAllocatorPool(true)
                .setLedgerAllocatorPoolName("test");
        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();

        namespace.createLog(streamName);
        MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
        assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, max1.getSequenceNumber());
        DistributedLogManager dlm = namespace.openLog(streamName);
        final int numSegments = 3;
        for (int i = 0; i < numSegments; i++) {
            BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            out.write(DLMTestUtil.getLogRecordInstance(i));
            out.closeAndComplete();
        }
        MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
        assertEquals(3, max2.getSequenceNumber());
        dlm.close();
        namespace.close();
    }

    /**
     * Create Log Segment when no max sequence number recorded in /ledgers. e.g. old version.
     */
    @Test(timeout = 60000)
    public void testCreateLogSegmentMissingMaxSequenceNumber() throws Exception {
        URI uri = createURI();
        String streamName = testName.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration()
                .setLockTimeout(99999)
                .setOutputBufferSize(0)
                .setImmediateFlushEnabled(true)
                .setEnableLedgerAllocatorPool(true)
                .setLedgerAllocatorPoolName("test");
        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();

        namespace.createLog(streamName);
        MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
        assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, max1.getSequenceNumber());
        DistributedLogManager dlm = namespace.openLog(streamName);
        final int numSegments = 3;
        for (int i = 0; i < numSegments; i++) {
            BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            out.write(DLMTestUtil.getLogRecordInstance(i));
            out.closeAndComplete();
        }
        MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
        assertEquals(3, max2.getSequenceNumber());

        // nuke the max ledger sequence number
        updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, new byte[0]);
        DistributedLogManager dlm1 = namespace.openLog(streamName);
        try {
            dlm1.startLogSegmentNonPartitioned();
            fail("Should fail with unexpected exceptions");
        } catch (UnexpectedException ue) {
            // expected
        } finally {
            dlm1.close();
        }

        // invalid max ledger sequence number
        updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, "invalid-max".getBytes(UTF_8));
        DistributedLogManager dlm2 = namespace.openLog(streamName);
        try {
            dlm2.startLogSegmentNonPartitioned();
            fail("Should fail with unexpected exceptions");
        } catch (UnexpectedException ue) {
            // expected
        } finally {
            dlm2.close();
        }

        dlm.close();
        namespace.close();
    }

    /**
     * Create Log Segment while max sequence number isn't match with list of log segments.
     */
    @Test(timeout = 60000)
    public void testCreateLogSegmentUnmatchMaxSequenceNumber() throws Exception {
        URI uri = createURI();
        String streamName = testName.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration()
                .setLockTimeout(99999)
                .setOutputBufferSize(0)
                .setImmediateFlushEnabled(true)
                .setEnableLedgerAllocatorPool(true)
                .setLedgerAllocatorPoolName("test");
        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();

        namespace.createLog(streamName);
        MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
        assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, max1.getSequenceNumber());
        DistributedLogManager dlm = namespace.openLog(streamName);
        final int numSegments = 3;
        for (int i = 0; i < numSegments; i++) {
            BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            out.write(DLMTestUtil.getLogRecordInstance(i));
            out.closeAndComplete();
        }
        MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
        assertEquals(3, max2.getSequenceNumber());

        // update the max ledger sequence number
        updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf,
                DLUtils.serializeLogSegmentSequenceNumber(99));

        DistributedLogManager dlm1 = namespace.openLog(streamName);
        try {
            BKSyncLogWriter out1 = (BKSyncLogWriter) dlm1.startLogSegmentNonPartitioned();
            out1.write(DLMTestUtil.getLogRecordInstance(numSegments+1));
            out1.closeAndComplete();
            fail("Should fail creating new log segment when encountered unmatch max ledger sequence number");
        } catch (DLIllegalStateException lse) {
            // expected
        } finally {
            dlm1.close();
        }

        DistributedLogManager dlm2 = namespace.openLog(streamName);
        List<LogSegmentMetadata> segments = dlm2.getLogSegments();
        try {
            assertEquals(3, segments.size());
            assertEquals(1L, segments.get(0).getLogSegmentSequenceNumber());
            assertEquals(2L, segments.get(1).getLogSegmentSequenceNumber());
            assertEquals(3L, segments.get(2).getLogSegmentSequenceNumber());
        } finally {
            dlm2.close();
        }

        dlm.close();
        namespace.close();
    }

    @Test(timeout = 60000)
    public void testCompleteLogSegmentConflicts() throws Exception {
        URI uri = createURI();
        String streamName = testName.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration()
                .setLockTimeout(99999)
                .setOutputBufferSize(0)
                .setImmediateFlushEnabled(true)
                .setEnableLedgerAllocatorPool(true)
                .setLedgerAllocatorPoolName("test");
        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();

        namespace.createLog(streamName);
        DistributedLogManager dlm1 = namespace.openLog(streamName);
        DistributedLogManager dlm2 = namespace.openLog(streamName);

        // dlm1 is writing
        BKSyncLogWriter out1 = (BKSyncLogWriter) dlm1.startLogSegmentNonPartitioned();
        out1.write(DLMTestUtil.getLogRecordInstance(1));
        // before out1 complete, out2 is in on recovery
        // it completed the log segments which bump the version of /ledgers znode
        BKAsyncLogWriter out2 = (BKAsyncLogWriter) dlm2.startAsyncLogSegmentNonPartitioned();

        try {
            out1.closeAndComplete();
            fail("Should fail closeAndComplete since other people already completed it.");
        } catch (IOException ioe) {
        }
    }
}
