blob: 0085cb12ead52102fe38e57ac8cfcafabcb53c17 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
public class FireHydrantTest extends InitializedNullHandlingTest
{
private IncrementalIndexSegment incrementalIndexSegment;
private QueryableIndexSegment queryableIndexSegment;
private FireHydrant hydrant;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup()
{
incrementalIndexSegment = new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), SegmentId.dummy("test"));
queryableIndexSegment = new QueryableIndexSegment(TestIndex.getMMappedTestIndex(), SegmentId.dummy("test"));
// hydrant starts out with incremental segment loaded
hydrant = new FireHydrant(incrementalIndexSegment, 0);
}
@Test
public void testGetIncrementedSegmentNotSwapped()
{
Assert.assertEquals(0, hydrant.getHydrantSegment().getNumReferences());
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
Assert.assertNotNull(segment);
Assert.assertTrue(segment.getBaseSegment() == incrementalIndexSegment);
Assert.assertEquals(1, segment.getNumReferences());
}
@Test
public void testGetIncrementedSegmentSwapped()
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
hydrant.swapSegment(queryableIndexSegment);
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
Assert.assertNotNull(segment);
Assert.assertTrue(segment.getBaseSegment() == queryableIndexSegment);
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Assert.assertEquals(1, segment.getNumReferences());
}
@Test
public void testGetIncrementedSegmentClosed()
{
expectedException.expect(ISE.class);
expectedException.expectMessage("segment.close() is called somewhere outside FireHydrant.swapSegment()");
hydrant.getHydrantSegment().close();
Assert.assertEquals(0, hydrant.getHydrantSegment().getNumReferences());
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
}
@Test
public void testGetAndIncrementSegment() throws IOException
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = hydrant.getAndIncrementSegment();
Assert.assertEquals(1, segmentAndCloseable.lhs.getNumReferences());
segmentAndCloseable.rhs.close();
Assert.assertEquals(0, segmentAndCloseable.lhs.getNumReferences());
}
@Test
public void testGetSegmentForQuery() throws IOException
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable = hydrant.getSegmentForQuery(
Function.identity()
);
Assert.assertTrue(maybeSegmentAndCloseable.isPresent());
Assert.assertEquals(1, incrementalSegmentReference.getNumReferences());
Pair<SegmentReference, Closeable> segmentAndCloseable = maybeSegmentAndCloseable.get();
segmentAndCloseable.rhs.close();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
}
@Test
public void testGetSegmentForQuerySwapped() throws IOException
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
hydrant.swapSegment(queryableIndexSegment);
ReferenceCountingSegment queryableSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Assert.assertEquals(0, queryableSegmentReference.getNumReferences());
Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable = hydrant.getSegmentForQuery(
Function.identity()
);
Assert.assertTrue(maybeSegmentAndCloseable.isPresent());
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Assert.assertEquals(1, queryableSegmentReference.getNumReferences());
Pair<SegmentReference, Closeable> segmentAndCloseable = maybeSegmentAndCloseable.get();
segmentAndCloseable.rhs.close();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Assert.assertEquals(0, queryableSegmentReference.getNumReferences());
}
@Test
public void testGetSegmentForQueryButNotAbleToAcquireReferences()
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable = hydrant.getSegmentForQuery(
segmentReference -> new SegmentReference()
{
@Override
public Optional<Closeable> acquireReferences()
{
return Optional.empty();
}
@Override
public SegmentId getId()
{
return incrementalIndexSegment.getId();
}
@Override
public Interval getDataInterval()
{
return incrementalIndexSegment.getDataInterval();
}
@Nullable
@Override
public QueryableIndex asQueryableIndex()
{
return incrementalIndexSegment.asQueryableIndex();
}
@Override
public StorageAdapter asStorageAdapter()
{
return incrementalIndexSegment.asStorageAdapter();
}
@Override
public void close()
{
incrementalIndexSegment.close();
}
}
);
Assert.assertFalse(maybeSegmentAndCloseable.isPresent());
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
}
@Test
public void testGetSegmentForQueryButNotAbleToAcquireReferencesSegmentClosed()
{
expectedException.expect(ISE.class);
expectedException.expectMessage("segment.close() is called somewhere outside FireHydrant.swapSegment()");
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
incrementalSegmentReference.close();
Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable = hydrant.getSegmentForQuery(
Function.identity()
);
}
@Test
@SuppressWarnings("ReturnValueIgnored")
public void testToStringWhenSwappedWithNull()
{
hydrant.swapSegment(null);
hydrant.toString();
}
}