blob: f0c166cdbeb5e7201a1fa54a05637942934498ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.commons.compress.utils;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* Read-Only Implementation of {@link SeekableByteChannel} that
* concatenates a collection of other {@link SeekableByteChannel}s.
*
* <p>This is a lose port of <a
* href="https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">MultiReadOnlySeekableByteChannel</a>
* by Tim Underwood.</p>
*
* @since 1.19
*/
public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {
private final List<SeekableByteChannel> channels;
private long globalPosition;
private int currentChannelIdx;
/**
* Concatenates the given channels.
*
* @param channels the channels to concatenate
* @throws NullPointerException if channels is null
*/
public MultiReadOnlySeekableByteChannel(final List<SeekableByteChannel> channels) {
this.channels = Collections.unmodifiableList(new ArrayList<>(
Objects.requireNonNull(channels, "channels must not be null")));
}
@Override
public synchronized int read(final ByteBuffer dst) throws IOException {
if (!isOpen()) {
throw new ClosedChannelException();
}
if (!dst.hasRemaining()) {
return 0;
}
int totalBytesRead = 0;
while (dst.hasRemaining() && currentChannelIdx < channels.size()) {
final SeekableByteChannel currentChannel = channels.get(currentChannelIdx);
final int newBytesRead = currentChannel.read(dst);
if (newBytesRead == -1) {
// EOF for this channel -- advance to next channel idx
currentChannelIdx += 1;
continue;
}
if (currentChannel.position() >= currentChannel.size()) {
// we are at the end of the current channel
currentChannelIdx++;
}
totalBytesRead += newBytesRead;
}
if (totalBytesRead > 0) {
globalPosition += totalBytesRead;
return totalBytesRead;
}
return -1;
}
@Override
public void close() throws IOException {
IOException first = null;
for (final SeekableByteChannel ch : channels) {
try {
ch.close();
} catch (final IOException ex) {
if (first == null) {
first = ex;
}
}
}
if (first != null) {
throw new IOException("failed to close wrapped channel", first);
}
}
@Override
public boolean isOpen() {
for (final SeekableByteChannel ch : channels) {
if (!ch.isOpen()) {
return false;
}
}
return true;
}
/**
* Returns this channel's position.
*
* <p>This method violates the contract of {@link SeekableByteChannel#position()} as it will not throw any exception
* when invoked on a closed channel. Instead it will return the position the channel had when close has been
* called.</p>
*/
@Override
public long position() {
return globalPosition;
}
/**
* set the position based on the given channel number and relative offset
*
* @param channelNumber the channel number
* @param relativeOffset the relative offset in the corresponding channel
* @return global position of all channels as if they are a single channel
* @throws IOException if positioning fails
*/
public synchronized SeekableByteChannel position(final long channelNumber, final long relativeOffset) throws IOException {
if (!isOpen()) {
throw new ClosedChannelException();
}
long globalPosition = relativeOffset;
for (int i = 0; i < channelNumber; i++) {
globalPosition += channels.get(i).size();
}
return position(globalPosition);
}
@Override
public long size() throws IOException {
if (!isOpen()) {
throw new ClosedChannelException();
}
long acc = 0;
for (final SeekableByteChannel ch : channels) {
acc += ch.size();
}
return acc;
}
/**
* @throws NonWritableChannelException since this implementation is read-only.
*/
@Override
public SeekableByteChannel truncate(final long size) {
throw new NonWritableChannelException();
}
/**
* @throws NonWritableChannelException since this implementation is read-only.
*/
@Override
public int write(final ByteBuffer src) {
throw new NonWritableChannelException();
}
@Override
public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
if (newPosition < 0) {
throw new IllegalArgumentException("Negative position: " + newPosition);
}
if (!isOpen()) {
throw new ClosedChannelException();
}
globalPosition = newPosition;
long pos = newPosition;
for (int i = 0; i < channels.size(); i++) {
final SeekableByteChannel currentChannel = channels.get(i);
final long size = currentChannel.size();
final long newChannelPos;
if (pos == -1L) {
// Position is already set for the correct channel,
// the rest of the channels get reset to 0
newChannelPos = 0;
} else if (pos <= size) {
// This channel is where we want to be
currentChannelIdx = i;
final long tmp = pos;
pos = -1L; // Mark pos as already being set
newChannelPos = tmp;
} else {
// newPosition is past this channel. Set channel
// position to the end and substract channel size from
// pos
pos -= size;
newChannelPos = size;
}
currentChannel.position(newChannelPos);
}
return this;
}
/**
* Concatenates the given channels.
*
* @param channels the channels to concatenate
* @throws NullPointerException if channels is null
* @return SeekableByteChannel that concatenates all provided channels
*/
public static SeekableByteChannel forSeekableByteChannels(final SeekableByteChannel... channels) {
if (Objects.requireNonNull(channels, "channels must not be null").length == 1) {
return channels[0];
}
return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
}
/**
* Concatenates the given files.
*
* @param files the files to concatenate
* @throws NullPointerException if files is null
* @throws IOException if opening a channel for one of the files fails
* @return SeekableByteChannel that concatenates all provided files
*/
public static SeekableByteChannel forFiles(final File... files) throws IOException {
final List<SeekableByteChannel> channels = new ArrayList<>();
for (final File f : Objects.requireNonNull(files, "files must not be null")) {
channels.add(Files.newByteChannel(f.toPath(), StandardOpenOption.READ));
}
if (channels.size() == 1) {
return channels.get(0);
}
return new MultiReadOnlySeekableByteChannel(channels);
}
}