| package edu.illinois.ncsa.daffodil.processors |
| |
| /* Copyright (c) 2012-2013 Tresys Technology, LLC. All rights reserved. |
| * |
| * Developed by: Tresys Technology, LLC |
| * http://www.tresys.com |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy of |
| * this software and associated documentation files (the "Software"), to deal with |
| * the Software without restriction, including without limitation the rights to |
| * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies |
| * of the Software, and to permit persons to whom the Software is furnished to do |
| * so, subject to the following conditions: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimers. |
| * |
| * 2. Redistributions in binary form must reproduce the above copyright |
| * notice, this list of conditions and the following disclaimers in the |
| * documentation and/or other materials provided with the distribution. |
| * |
| * 3. Neither the names of Tresys Technology, nor the names of its contributors |
| * may be used to endorse or promote products derived from this Software |
| * without specific prior written permission. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS WITH THE |
| * SOFTWARE. |
| */ |
| |
| import java.io.InputStream |
| import java.nio.channels.ReadableByteChannel |
| import scala.collection.immutable.PagedSeq |
| import scala.collection.mutable.HashMap |
| import scala.util.parsing.input.OffsetPosition |
| import java.nio.charset.CodingErrorAction |
| import java.nio.ByteBuffer |
| import java.io.InputStreamReader |
| import sun.nio.cs.StreamDecoder |
| import java.nio.charset.Charset |
| import java.io.UnsupportedEncodingException |
| import java.nio.charset.IllegalCharsetNameException |
| import java.nio.charset.CharsetDecoder |
| import java.io.FileInputStream |
| import java.nio.channels.FileChannel |
| import java.io.IOException |
| import java.nio.CharBuffer |
| import scala.util.control.Breaks._ |
| import java.nio.charset.CoderResult |
| import sun.nio.cs.HistoricallyNamedCharset |
| import java.nio.channels.Channels |
| import edu.illinois.ncsa.daffodil.exceptions.Assert |
| import scala.util.parsing.input.Reader |
| import scala.util.parsing.input.CharSequenceReader |
| import edu.illinois.ncsa.daffodil.util._ |
| |
| // |
| // Convention: name index fields like bytePos or bitPos or charPos with suffixes to indicate |
| // zero based or 1 based. Suffixes are ...0b and ...1b respectively. |
| // |
| |
| /** |
| * Pure functional Reader[Byte] that gets its data from a DFDL.Input (aka a ReadableByteChannel) |
| * |
| * All reading of data ultimately comes to this layer which retrieves data on demand. |
| * |
| * This layer doesn't know anything about bits and bit positions like all the higher layers do. |
| */ |
| class DFDLByteReader private (psb: PagedSeq[Byte], val bytePos0b: Int = 0) |
| extends scala.util.parsing.input.Reader[Byte] with Logging { |
| |
| def this(in: ReadableByteChannel) = this(PagedSeq.fromIterator(new IterableReadableByteChannel(in)), 0) |
| |
| /** |
| * Note: calling this will force the entire input into memory. |
| */ |
| def lengthInBytes: Long = psb.length |
| |
| lazy val first: Byte = psb(bytePos0b) |
| |
| lazy val rest: DFDLByteReader = new DFDLByteReader(psb, bytePos0b + 1) |
| |
| // needed because of contract from Reader superclass. |
| lazy val pos: scala.util.parsing.input.Position = new DFDLBytePosition(bytePos0b) |
| |
| lazy val atEnd: Boolean = !psb.isDefinedAt(bytePos0b) |
| |
| def atPos(bytePosition0b: Int): DFDLByteReader = { |
| // note: do NOT slice. That copies the psb. |
| //new DFDLByteReader(psb.slice(bytePosition), 0) |
| if (bytePosition0b == bytePos0b) this // already at this position. |
| else new DFDLByteReader(psb, bytePosition0b) |
| } |
| |
| def getByte(bytePosition0b: Int): Byte = { |
| val res = psb(bytePosition0b) |
| res |
| } |
| |
| def getByteArray(bytePosition0b: Int, numBytes: Int): Array[Byte] = { |
| val arr = new Array[Byte](numBytes) |
| for (i <- 0 to (numBytes - 1)) { |
| arr(i) = getByte(bytePosition0b + i) |
| } |
| arr |
| } |
| |
| // Removed: These are a really bad idea. They will make a giant |
| // copy of the entire input. Maybe ok for unit testing, but just |
| // having them here is asking for trouble. |
| // |
| // lazy val byteArray: Array[Byte] = psb.toArray[Byte] |
| // lazy val bb: ByteBuffer = ByteBuffer.wrap(byteArray) |
| |
| /** |
| * Factory for a Reader[Char] that constructs characters by decoding them from this |
| * Reader[Byte] for a specific encoding starting at a particular bit position. |
| * |
| * Yes, I said "bit" position. Some characters are not a full byte wide (7-bit, 6-bit, and even 5-bit |
| * encodings exist) |
| * |
| * These are kept in the processor state for reuse. |
| */ |
| def newCharReader(charset: Charset, bitPos: Long, bitLimit: Long): DFDLCharReader = { |
| log(LogLevel.Debug, "DFDLByteReader.newCharReader for bytePos %s.", (bitPos >> 3)) |
| DFDLCharReader(psb, bitPos, bitLimit, charset) |
| } |
| |
| } |
| |
| object DFDLCharReader { |
| |
| // TODO: make a specialized DFDLSingleByteCharReader for known single-byte character sets. |
| // This can bypass the PagedSeq[Char] entirely. |
| def apply(thePsb: PagedSeq[Byte], bitPosition: Long, bitLimit: Long, charset: Charset): DFDLCharReader = { |
| |
| Assert.usage(bitPosition <= Int.MaxValue, "bit positions are limited to 32-bit signed integer by underlying libraries.") |
| val bitPos = bitPosition.toInt |
| |
| val bitOffset = bitPos & 0x7 |
| val bytePos = bitPos >> 3 |
| |
| val is = { |
| // |
| // Removed slice call: psb.slice makes a copy of the psb. |
| // now passes the psb and start/end to the IteratorInputStream |
| // which manages delivery of bytes one by one so as to not |
| // do any copying that isn't necessary. |
| // |
| val endBytePos = |
| if (bitLimit == -1) -1 |
| // Here we want to limit the PagedSeq[Byte] via bitLimit |
| // because we need to determine the ending byte position from |
| // the bit limit we must divide by 8.0 (must divide by double) |
| // in order to round to the appropriate byte position |
| else scala.math.ceil(bitLimit / 8.0).toInt |
| new IteratorInputStream(thePsb, bytePos, endBytePos) |
| } |
| |
| // TODO: Why is bitLimit not working for DFDLJavaIOInputStreamReader? |
| // it appears to not be implemented, why is it there at all? |
| val r = DFDLJavaIOInputStreamReader(is, charset, bitOffset, bitLimit) |
| // TRW - The following line was changed because the fromSource |
| // method was causing the readLine method of the BufferedReader class to be |
| // called. This resulted in the loss of \n, \r and \r\n characters from the data. |
| //val psc = PagedSeq.fromSource(scala.io.Source.fromInputStream(is)(codec)) |
| val psc = PagedSeq.fromReader(r) |
| val charOffset = 0 |
| // val rdr = new DFDLPagedSeqCharReader(charset, bitOffset, bitLimit, psc, charOffset, thePsb) |
| val rdr = new DFDLPagedSeqCharReader(charset, bitPos, bitLimit, psc, charOffset, thePsb) |
| rdr |
| } |
| |
| } |
| |
| /** |
| * Reader[Char] constructed from a specific point within a PagedSeq[Byte], for |
| * a particular character set encoding. Ends if there is any error trying to decode a |
| * character. |
| * |
| * This trait allows for multiple different implementations for performance |
| * reasons. |
| * |
| * Some implementations deal with the general issue of variable-width |
| * character encodings. |
| * |
| * Others are specialized for 1-to-1 single-byte character encodings |
| * like US-ASCII or ISO-8859-1, where the mapping to unicode characters |
| * is either trivial, or requires just a small lookup table. |
| */ |
| |
| trait DFDLCharReader |
| extends Reader[Char] { |
| def first: Char |
| def rest: DFDLCharReader |
| def atEnd: Boolean |
| def atCharPos(cp0b: Int): DFDLCharReader |
| def atBitPos(bp0b: Long): DFDLCharReader |
| def getCharsetName: String |
| def characterPos: Int |
| def charset: Charset |
| def bitLimit: Long |
| } |
| |
| /** |
| * This is for unit tests that want to feed data from a string |
| */ |
| class DFDLUTStringReader private (rdr: Reader[Char]) |
| extends DFDLCharReader { |
| override def source = rdr.source |
| override def offset = rdr.offset |
| def this(data: String) = this(new CharSequenceReader(data)) |
| def first = rdr.first |
| def rest = new DFDLUTStringReader(rdr.rest) |
| def atEnd = rdr.atEnd |
| def pos = rdr.pos |
| def atCharPos(cp0b: Int) = Assert.usageError("not to be used in test reader") |
| def atBitPos(bp0b: Long) = Assert.usageError("not to be used in test reader") |
| def getCharsetName = Assert.usageError("not to be used in test reader") |
| def characterPos = Assert.usageError("not to be used in test reader") |
| def charset = Assert.usageError("not to be used in test reader") |
| def bitLimit = -1 |
| } |
| |
| // TODO: make this global singleton go away! |
| // This state should be maintained in the DataProcessor object I think. |
| object DFDLCharCounter { |
| var count: Long = 0 |
| def incr(n: Long) { |
| count += n |
| } |
| def getAndResetCount = { |
| val c = count |
| count = 0 |
| c |
| } |
| } |
| /** |
| * This is for arbitrary character sets. Uses a PagedSeq[Char] as underlying cache. |
| */ |
| class DFDLPagedSeqCharReader(charsetArg: Charset, |
| val startingBitPos: Int, |
| bitLimitArg: Long, |
| psc: PagedSeq[Char], |
| override val offset: Int, |
| psb: PagedSeq[Byte]) |
| extends DFDLCharReader with Logging { |
| |
| Assert.usage(offset >= 0) |
| Assert.usage(startingBitPos >= 0) |
| |
| val charset = charsetArg |
| val bitLimit = bitLimitArg |
| |
| override lazy val source: CharSequence = psc |
| |
| def first: Char = { |
| val char = psc(offset) |
| char |
| } |
| |
| def rest: DFDLCharReader = |
| if (psc.isDefinedAt(offset)) new DFDLPagedSeqCharReader(charset, startingBitPos, bitLimit, psc, offset + 1, psb) |
| else this |
| |
| def atEnd: Boolean = !psc.isDefinedAt(offset) |
| |
| def pos: scala.util.parsing.input.Position = new OffsetPosition(source, offset) //new DFDLCharPosition(offset) |
| |
| override def drop(n: Int): DFDLCharReader = new DFDLPagedSeqCharReader(charset, startingBitPos, bitLimit, psc, offset + n, psb) |
| |
| def atCharPos(characterPos: Int): DFDLCharReader = { |
| if (characterPos == this.characterPos) this |
| else new DFDLPagedSeqCharReader(charset, startingBitPos, bitLimit, psc, characterPos, psb) |
| } |
| |
| // We really want to be able to ask for a CharReader starting at said bitPos |
| def atBitPos(bitPos: Long): DFDLCharReader = { |
| log(LogLevel.Debug, "creating new DFDLCharReader.atBytePos(%s)", (bitPos >> 3)) |
| new DFDLPagedSeqCharReader(charset, startingBitPos = bitPos.toInt, bitLimit, psc, characterPos, psb) |
| } |
| |
| def getCharsetName: String = charset.name() |
| |
| def characterPos: Int = offset |
| |
| // def isDefinedAt(charPos : Int) : Boolean = psc.isDefinedAt(charPos) |
| |
| def print: String = { |
| "DFDLCharReader - " + source.length() + ": " + source + "\nDFDLCharReader - " + characterPos + ": " + source.subSequence(characterPos, source.length()) |
| } |
| |
| override def toString = { |
| "DFDLCharReader starting at bitPos " + startingBitPos + " charPos " + characterPos + " bitLimit " + bitLimit |
| } |
| |
| } |
| |
| // Scala Reader stuff is not consistent about whether it is generic over the element type, |
| // or specific to Char. We want to have a Reader like abstraction that is over bytes, but |
| // be able to create real Reader[Char] from it at any byte position. |
| |
| object IterableReadableByteChannel { |
| var byteCount: Long = 0 |
| def getAndResetCalls = { |
| val res = byteCount |
| byteCount = 0 |
| res |
| } |
| } |
| |
| /** |
| * All this excess buffering layer for lack of a way to convert a ReadableByteChannel directly into |
| * a PagedSeq. We need an Iterator[Byte] first to construct a PagedSeq[Byte]. |
| */ |
| class IterableReadableByteChannel(rbc: ReadableByteChannel) |
| extends scala.collection.Iterator[Byte] { |
| |
| private final val bufferSize = 10000 |
| private var currentBuf: java.nio.ByteBuffer = _ |
| private var sz: Int = _ |
| |
| private def advanceToNextBuf() { |
| currentBuf = java.nio.ByteBuffer.allocate(bufferSize) |
| sz = rbc.read(currentBuf) |
| currentBuf.flip() |
| } |
| |
| advanceToNextBuf() |
| |
| def hasNext(): Boolean = { |
| if (sz == -1) return false |
| if (currentBuf.hasRemaining()) return true |
| advanceToNextBuf() |
| if (sz == -1) return false |
| if (currentBuf.hasRemaining()) return true |
| return false |
| } |
| |
| var pos: Int = 0 |
| |
| def next(): Byte = { |
| if (!hasNext()) throw new IndexOutOfBoundsException(pos.toString) |
| pos += 1 |
| IterableReadableByteChannel.byteCount += 1 |
| currentBuf.get() |
| } |
| } |
| |
| /** |
| * Scala's Position is document oriented in that it is 1-based indexing and assumes |
| * line numbers and column numbers. |
| * |
| */ |
| class DFDLBytePosition(i: Int) extends scala.util.parsing.input.Position { |
| def line = 1 |
| def column = i + 1 |
| // IDEA: could we assume a 'line' of bytes is 32 bytes because those print out nicely as |
| // as in HHHHHHHH HHHHHHHH ... etc. on a 72 character line? |
| // Could come in handy perhaps. |
| val lineContents = "" // unused. Maybe this should throw. NoSuchOperation, or something. |
| } |
| |
| /** |
| * Position in a character stream. |
| * |
| * We ignore line/column structure. It's all one "line" as far as we are concerned. |
| */ |
| class DFDLCharPosition(i: Int) extends scala.util.parsing.input.Position { |
| def line = 1 |
| def column = i + 1 |
| val lineContents = "" // unused |
| } |
| |
| /** |
| * Whole additional layer of byte-by-byte because there's no way to create |
| * a Source (of Char) from a Seq[Byte]. Instead we have to take our |
| * PagedSeq[Byte] to an Iterator, create an InputStream from the Iterator, |
| * and create a Source (of Char) from that. |
| * |
| * Convert an iterator of bytes into an InputStream |
| */ |
| |
| object IteratorInputStream { |
| var calls: Long = 0 // instrumentation for performance analysis. |
| def getAndResetCalls: Long = { |
| val c = calls |
| calls = 0 |
| c |
| } |
| } |
| |
| class IteratorInputStream(psb: PagedSeq[Byte], startBytePos0b: Int, endBytePos0b: Int) |
| extends InputStream with Logging { |
| |
| log(LogLevel.Debug, "Creating an IteratorInputStream. This should happen only once per DataProcessor.parse call") |
| var currentBytePos0b: Int = startBytePos0b |
| |
| def read(): Int = { |
| if (currentBytePos0b == endBytePos0b |
| || !psb.isDefinedAt(currentBytePos0b)) -1 |
| else { |
| IteratorInputStream.calls += 1 |
| val res = psb(currentBytePos0b) |
| currentBytePos0b += 1 |
| res |
| } |
| } |
| |
| } |