blob: b43fa55b7a493766708c1b545b238b6cddf3cdf2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.scrunch.spark
import java.io.InputStream
import java.nio.ByteBuffer
/**
* Reads data from a ByteBuffer, and optionally cleans it up using BlockManager.dispose()
* at the end of the stream (e.g. to close a memory-mapped file).
*/
private[spark]
class ByteBufferInputStream(private var buffer: ByteBuffer)
extends InputStream {
override def read(): Int = {
if (buffer == null || buffer.remaining() == 0) {
cleanUp()
-1
} else {
buffer.get() & 0xFF
}
}
override def read(dest: Array[Byte]): Int = {
read(dest, 0, dest.length)
}
override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
if (buffer == null || buffer.remaining() == 0) {
cleanUp()
-1
} else {
val amountToGet = math.min(buffer.remaining(), length)
buffer.get(dest, offset, amountToGet)
amountToGet
}
}
override def skip(bytes: Long): Long = {
if (buffer != null) {
val amountToSkip = math.min(bytes, buffer.remaining).toInt
buffer.position(buffer.position() + amountToSkip)
if (buffer.remaining() == 0) {
cleanUp()
}
amountToSkip
} else {
0L
}
}
/**
* Clean up the buffer, and potentially dispose of it using BlockManager.dispose().
*/
private def cleanUp() {
if (buffer != null) {
buffer = null
}
}
}