blob: 3b57d20e0976dd07ac08a1329e0c8e83e1ec3378 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.oss.sync;
import java.io.IOException;
import java.io.OutputStream;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Provides convenience data structures to help ensure that locks are closed
* when and only when the path is no longer in use. The basic AutoLock is simply
* an AutoCloseable that will release the lock after a try-with-resources block.
* LockedRemoteIterator will release the lock when the stream has been exhausted
* or in the event of any exception. LockedFSDataOutputStream will release the
* lock when the stream gets closed.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface AutoLock extends AutoCloseable {
public void close() throws IOException;
/**
* A wrapper for a RemoteIterator that releases a lock only when the
* underlying iterator has been exhausted.
*/
public static class LockedRemoteIterator<E> implements RemoteIterator<E> {
public LockedRemoteIterator(RemoteIterator<E> iterator, AutoLock lock)
{
this.iterator = iterator;
this.lock = lock;
}
private RemoteIterator<E> iterator;
private AutoLock lock;
private AtomicBoolean closed = new AtomicBoolean(false);
public void close() throws IOException {
if (!closed.getAndSet(true)) {
lock.close();
}
}
private void checkClosed() throws IOException {
if (closed.get()) {
throw new IOException(
"LockedRemoteIterator was accessed after releasing lock");
}
}
@Override
public boolean hasNext() throws IOException {
checkClosed();
try {
if (iterator.hasNext()) {
return true;
}
close();
return false;
} catch (Throwable e) {
close();
throw e;
}
}
/**
* Delegates to the wrapped iterator, but will close the lock in the event
* of a NoSuchElementException. Some applications do not call hasNext() and
* simply depend on the NoSuchElementException.
*/
@Override
public E next() throws IOException {
checkClosed();
try {
return iterator.next();
} catch (Throwable e) {
close();
throw e;
}
}
}
/**
* A wrapper for a FSDataOutputStream that releases a lock only when the
* underlying output stream is closed.
*/
public class LockedFSDataOutputStream extends FSDataOutputStream {
public LockedFSDataOutputStream(FSDataOutputStream stream, AutoLock lock) throws IOException {
// super() throws IOException, but this constructor can't catch it.
// Instantiators must catch the exception and close the lock.
super(stream, null);
this.stream = stream;
this.lock = lock;
}
private final FSDataOutputStream stream;
private AutoLock lock;
private AtomicBoolean closed = new AtomicBoolean(false);
private void checkClosed() throws IOException {
if (closed.get()) {
throw new IOException(
"LockedFSDataOutputStream was accessed after releasing lock");
}
}
@Override
/**
* Returns the position in the wrapped stream. This should not be accessed
* after the stream has been closed. Unlike most other functions in this
* class, this is not enforced because this function shouldn't throw
* IOExceptions.
*/
public long getPos() {
return stream.getPos();
}
@Override
public void close() throws IOException {
// Contract tests attempt to close the stream twice
if (!closed.getAndSet(true)) {
try {
stream.close();
} finally {
lock.close();
}
}
}
@Override
public String toString() {
return "LockedFSDataOutputStream:" + stream.toString();
}
@Override
/**
* Returns the wrapped stream. This should not be accessed after the stream
* has been closed. Unlike most other functions in this class, this is not
* enforced because this function shouldn't throw IOExceptions.
*/
public OutputStream getWrappedStream() {
return stream.getWrappedStream();
}
@Override
public void hflush() throws IOException {
checkClosed();
stream.hflush();
}
@Override
public void hsync() throws IOException {
checkClosed();
stream.hsync();
}
@Override
public void setDropBehind(Boolean dropBehind) throws IOException {
checkClosed();
stream.setDropBehind(dropBehind);
}
}
}