blob: af0e1c7a37708eb879d39df25d94ceb4a193833e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* ReferenceCountingCloseableObject implements something like automatic reference count-based resource management,
* backed by a {@link Phaser}.
*
* ReferenceCountingCloseableObject allows consumers to call {@link #close()} before some other "users", which called
* {@link #increment()} or {@link #incrementReferenceAndDecrementOnceCloseable()}, but have not called
* {@link #decrement()} yet or the closer for {@link #incrementReferenceAndDecrementOnceCloseable()}, and the wrapped
* object won't be actually closed until that all references are released.
*/
public abstract class ReferenceCountingCloseableObject<BaseObject extends Closeable> implements Closeable
{
private static final Logger log = new Logger(ReferenceCountingCloseableObject.class);
private final AtomicBoolean closed = new AtomicBoolean(false);
protected final Phaser referents = new Phaser(1)
{
@Override
protected boolean onAdvance(int phase, int registeredParties)
{
// Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen
if (registeredParties != 0) {
log.error("registeredParties[%s] is not 0", registeredParties);
}
try {
baseObject.close();
}
catch (Exception e) {
try {
log.error(e, "Exception while closing reference counted object[%s]", baseObject);
}
catch (Exception e2) {
// ignore
}
}
// Always terminate.
return true;
}
};
protected final BaseObject baseObject;
public ReferenceCountingCloseableObject(BaseObject object)
{
this.baseObject = object;
}
public int getNumReferences()
{
return Math.max(referents.getRegisteredParties() - 1, 0);
}
public boolean isClosed()
{
return referents.isTerminated();
}
/**
* Increment the reference count by one.
*/
public boolean increment()
{
// Negative return from referents.register() means the Phaser is terminated.
return referents.register() >= 0;
}
/**
* Decrement the reference count by one.
*/
public void decrement()
{
referents.arriveAndDeregister();
}
/**
* Returns an {@link Optional} of a {@link Closeable} from {@link #decrementOnceCloseable}, if it is able to
* successfully {@link #increment}, else nothing indicating that the reference could not be acquired.
*/
public Optional<Closeable> incrementReferenceAndDecrementOnceCloseable()
{
final Closer closer;
if (increment()) {
closer = Closer.create();
closer.register(decrementOnceCloseable());
} else {
closer = null;
}
return Optional.ofNullable(closer);
}
/**
* Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the
* returned Closeable object for the second time, it won't call {@link #decrement()} again.
*/
public Closeable decrementOnceCloseable()
{
AtomicBoolean decremented = new AtomicBoolean(false);
return () -> {
if (decremented.compareAndSet(false, true)) {
decrement();
} else {
log.warn("close() is called more than once on ReferenceCountingCloseableObject.decrementOnceCloseable()");
}
};
}
@Override
public void close()
{
if (closed.compareAndSet(false, true)) {
referents.arriveAndDeregister();
} else {
log.warn("close() is called more than once on ReferenceCountingCloseableObject");
}
}
}