// blob: 137280e4926c32d453bac0e5d6fd5bfb647bf208 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**
 * Holder for a numbered, reference-counted segment whose backing segment can be atomically replaced.
 *
 * <p>A hydrant is created either around an {@link IncrementalIndex} (wrapped in an
 * {@link IncrementalIndexSegment}) or around an existing {@link Segment}. In both cases the segment is wrapped
 * with {@link ReferenceCountingSegment#wrapRootGenerationSegment} and stored in an {@link AtomicReference}.
 * {@link #swapSegment} replaces the stored segment via compare-and-set, closes the previous one and drops the
 * reference to the {@link IncrementalIndex}.
 *
 * <p>Swapping to {@code null} is allowed and leaves the hydrant without a segment. In that state
 * {@link #getSegmentId()} and {@link #getSegmentDataInterval()} throw {@link NullPointerException}, while the
 * segment-acquiring methods throw {@link ISE}.
 */
public class FireHydrant
{
  private final int count;
  // Current segment; null only after swapSegment(null).
  private final AtomicReference<ReferenceCountingSegment> adapter;
  // Non-null only for a hydrant created from an IncrementalIndex that has not been swapped yet.
  private volatile IncrementalIndex index;

  /**
   * Creates a hydrant backed by an in-memory index.
   *
   * @param index     index to expose as a segment
   * @param count     number identifying this hydrant, returned by {@link #getCount()}
   * @param segmentId id given to the {@link IncrementalIndexSegment} built around {@code index}
   */
  public FireHydrant(IncrementalIndex index, int count, SegmentId segmentId)
  {
    this.index = index;
    this.adapter = new AtomicReference<>(
        ReferenceCountingSegment.wrapRootGenerationSegment(new IncrementalIndexSegment(index, segmentId))
    );
    this.count = count;
  }

  /**
   * Creates a hydrant around an existing segment. Such a hydrant has no {@link IncrementalIndex}, so
   * {@link #hasSwapped()} is {@code true} from the start.
   *
   * @param adapter segment to wrap
   * @param count   number identifying this hydrant, returned by {@link #getCount()}
   */
  public FireHydrant(Segment adapter, int count)
  {
    this.index = null;
    this.adapter = new AtomicReference<>(ReferenceCountingSegment.wrapRootGenerationSegment(adapter));
    this.count = count;
  }

  /**
   * @return the in-memory index, or {@code null} if this hydrant was created from a {@link Segment} or has been
   * swapped
   */
  public IncrementalIndex getIndex()
  {
    return index;
  }

  /**
   * @return id of the current segment
   *
   * @throws NullPointerException if the segment has been swapped to {@code null}
   */
  public SegmentId getSegmentId()
  {
    return adapter.get().getId();
  }

  /**
   * @return data interval of the current segment
   *
   * @throws NullPointerException if the segment has been swapped to {@code null}
   */
  public Interval getSegmentDataInterval()
  {
    return adapter.get().getDataInterval();
  }

  /**
   * @return the number passed to the constructor
   */
  public int getCount()
  {
    return count;
  }

  /**
   * @return {@code true} when no {@link IncrementalIndex} is held, i.e. the hydrant was created from a
   * {@link Segment} or {@link #swapSegment} has replaced the segment
   */
  public boolean hasSwapped()
  {
    return index == null;
  }

  /**
   * Atomically replaces the current segment with {@code newSegment}, closes the previous segment and clears the
   * {@link IncrementalIndex} reference. Retries until the compare-and-set succeeds.
   *
   * @param newSegment replacement segment, or {@code null} to leave the hydrant without a segment
   *
   * @throws ISE if both segments are non-null and their ids differ, or if {@code newSegment} is the very object
   *             currently held
   */
  public void swapSegment(@Nullable Segment newSegment)
  {
    while (true) {
      ReferenceCountingSegment currentSegment = adapter.get();
      if (currentSegment == null && newSegment == null) {
        // Already without a segment; nothing to swap or close.
        return;
      }
      if (currentSegment != null && newSegment != null &&
          !newSegment.getId().equals(currentSegment.getId())) {
        // Sanity check: identifier should not change
        throw new ISE(
            "Cannot swap identifier[%s] -> [%s]",
            currentSegment.getId(),
            newSegment.getId()
        );
      }
      if (currentSegment == newSegment) {
        throw new ISE("Cannot swap to the same segment");
      }
      ReferenceCountingSegment newReferenceCountingSegment =
          newSegment != null ? ReferenceCountingSegment.wrapRootGenerationSegment(newSegment) : null;
      if (adapter.compareAndSet(currentSegment, newReferenceCountingSegment)) {
        // Close only after the new segment is published, so that readers which fail to acquire the old segment
        // can already see its replacement.
        if (currentSegment != null) {
          currentSegment.close();
        }
        index = null;
        return;
      }
    }
  }

  /**
   * Returns the current segment after successfully incrementing its reference count. If the segment is closed
   * by a concurrent {@link #swapSegment}, retries with the replacement.
   *
   * @return the segment whose reference count was incremented; the caller is responsible for decrementing it
   *
   * @throws ISE if the hydrant has no segment (swapped to {@code null}), or if the segment was closed without
   *             being replaced
   */
  public ReferenceCountingSegment getIncrementedSegment()
  {
    ReferenceCountingSegment segment = adapter.get();
    if (segment == null) {
      // Fail with the same exception type used in the retry loop instead of an opaque NullPointerException.
      throw new ISE("FireHydrant was 'closed' by swapping segment to null before acquiring a segment");
    }
    while (true) {
      if (segment.increment()) {
        return segment;
      }
      // segment.increment() returned false, means it is closed. Since close() in swapSegment() happens after segment
      // swap, the new segment should already be visible.
      ReferenceCountingSegment newSegment = adapter.get();
      if (segment == newSegment) {
        throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()");
      }
      if (newSegment == null) {
        throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment");
      }
      segment = newSegment;
      // Spin loop.
    }
  }

  /**
   * Like {@link #getIncrementedSegment()}, but pairs the segment with a {@link Closeable} that releases the
   * acquired reference.
   *
   * @return the segment and the closeable obtained from {@code decrementOnceCloseable()}
   *
   * @throws ISE under the same conditions as {@link #getIncrementedSegment()}
   */
  public Pair<ReferenceCountingSegment, Closeable> getAndIncrementSegment()
  {
    ReferenceCountingSegment segment = getIncrementedSegment();
    return new Pair<>(segment, segment.decrementOnceCloseable());
  }

  /**
   * This method is like a combined form of {@link #getIncrementedSegment} and {@link #getAndIncrementSegment} that
   * deals in {@link SegmentReference} instead of directly with {@link ReferenceCountingSegment} in order to acquire
   * reference count for both hydrant's segment and any tracked joinables taking part in the query.
   *
   * @param segmentMapFn function applied to the hydrant's segment to produce the segment that is queried
   *
   * @return the mapped segment together with a {@link Closeable} releasing the acquired references, or an empty
   * {@link Optional} if references could not be acquired although the hydrant's segment is neither closed nor
   * replaced
   *
   * @throws ISE if the hydrant has no segment (swapped to {@code null}), or if the segment was closed without
   *             being replaced
   */
  public Optional<Pair<SegmentReference, Closeable>> getSegmentForQuery(
      Function<SegmentReference, SegmentReference> segmentMapFn
  )
  {
    ReferenceCountingSegment sinkSegment = adapter.get();
    if (sinkSegment == null) {
      // Do not hand null to segmentMapFn; fail with the same exception type used in the retry loop.
      throw new ISE("FireHydrant was 'closed' by swapping segment to null before acquiring a segment");
    }
    SegmentReference segment = segmentMapFn.apply(sinkSegment);
    while (true) {
      Optional<Closeable> reference = segment.acquireReferences();
      if (reference.isPresent()) {
        return Optional.of(new Pair<>(segment, reference.get()));
      }
      // segment.acquireReferences() returned an empty Optional; the sink segment may have been closed. Since
      // close() in swapSegment() happens after segment swap, the new segment should already be visible.
      ReferenceCountingSegment newSinkSegment = adapter.get();
      if (newSinkSegment == null) {
        throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment");
      }
      if (sinkSegment == newSinkSegment) {
        if (newSinkSegment.isClosed()) {
          throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()");
        }
        // if segment is not closed, but is same segment it means we are having trouble getting references for joinables
        // of a HashJoinSegment created by segmentMapFn
        return Optional.empty();
      }
      // Track the segment that is actually being retried. Without this the identity check above would keep
      // comparing against the segment seen before the first swap, and could never return or throw again.
      sinkSegment = newSinkSegment;
      segment = segmentMapFn.apply(newSinkSegment);
      // Spin loop.
    }
  }

  /**
   * @return the currently held segment without touching its reference count; {@code null} if swapped to
   * {@code null}
   */
  @VisibleForTesting
  public ReferenceCountingSegment getHydrantSegment()
  {
    return adapter.get();
  }

  @Override
  public String toString()
  {
    // Do not include IncrementalIndex in toString as AbstractIndex.toString() actually prints
    // all the rows in the index
    // Read the reference once and tolerate null: the hydrant may have been swapped to null.
    final ReferenceCountingSegment segment = adapter.get();
    return "FireHydrant{" +
           "queryable=" + (segment == null ? null : segment.getId()) +
           ", count=" + count +
           '}';
  }
}