Allow garbage collection of UStateForSuspension's and DirectOrBufferedDataOutputStream's
The UStateForSuspensions and DirectOrBufferedDataOutputStream classes
have members that effectively create linked lists. In each of these
cases, we unknowingly hold onto the head of these linked lists, which
prevents garbage collection of all UStateForSuspensions and
DirectOrBufferedDataOutputStream instances. This means we essentially
hold on to all unparse state, which quickly leads to out of memory
errors for large format that require many suspensions.
- The first issue is the "prior" member of UStateMain/UStateForSuspensions.
This member is set so that each UState points to the previous
UStateForSuspension that has been created, essentially creating a
linked list of all UStateForSuspensions, with the head in UStateMain.
This prevents all UStateForSuspensions from being garbage collected,
as well all the state they point to (it's a lot).
Fortunately, this member isn't used anywhere anymore. Presumably it
was once used for debugging suspensions, but is no longer used or
needed. So we can simply remove this member so these
UStateForSuspensions can be garbage collected once the Suspensions
that use them are finished and garbage collected.
- The second issue is related to the "following" member in
DirectOrBufferedDataOutputStream's. This member is used too keep track
of the buffered DOS that follows this DOS (and iteratively, all
following DOS's). As the Direct DOS is finished, we make the following
DOS direct update pointers correctly. However, we create the very
first direct DOS in the "unparse" function, which means it lives on
the stack and cannot be garbage collected until unparse finished. And
because this DOS iteratively points to all following DOS's via the
"following" member, it means we can never free any DOS's (and all the
buffered data associated with those DOS's) until the end of unparse.
The solution in this case is to not create the initial direct DOS in
the unparse function on the stack, but instead to create it as part of
the UState creation when we initialize the "dataOutputStream" var.
This way there is no pointer to the initial DOS except for those held
in UState or Suspensions. As the UState mutates or Suspensions
resolve, we will complete lose a reference to earlier DOS's and they
can be garbage collected.
Fixing these two issues allows unparsing very large infosets that
require buffering, without running into out of memory errors.
DAFFODIL-2468
diff --git a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataProcessor.scala b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataProcessor.scala
index 3f8c9f9..50c0692 100644
--- a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataProcessor.scala
+++ b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataProcessor.scala
@@ -56,7 +56,6 @@
import org.apache.daffodil.infoset.TeeInfosetOutputter
import org.apache.daffodil.infoset.XMLTextInfosetOutputter
import org.apache.daffodil.io.BitOrderChangeException
-import org.apache.daffodil.io.DirectOrBufferedDataOutputStream
import org.apache.daffodil.io.FileIOException
import org.apache.daffodil.io.InputSourceDataInputStream
import org.apache.daffodil.oolag.ErrorAlreadyHandled
@@ -553,18 +552,10 @@
}
def unparse(inputter: InfosetInputter, outStream: java.io.OutputStream) = {
- val out = DirectOrBufferedDataOutputStream(
- outStream,
- null, // null means no other stream created this one.
- isLayer = false,
- tunables.outputStreamChunkSizeInBytes,
- tunables.maxByteArrayOutputStreamBufferSizeInBytes,
- tunables.tempFilePath)
-
inputter.initialize(ssrd.elementRuntimeData, getTunables())
val unparserState =
UState.createInitialUState(
- out,
+ outStream,
this,
inputter,
areDebugging)
@@ -575,7 +566,7 @@
unparserState.notifyDebugging(true)
}
unparserState.dataProc.get.init(unparserState, ssrd.unparser)
- out.setPriorBitOrder(ssrd.elementRuntimeData.defaultBitOrder)
+ unparserState.dataOutputStream.setPriorBitOrder(ssrd.elementRuntimeData.defaultBitOrder)
doUnparse(unparserState)
unparserState.evalSuspensions(isFinal = true)
unparserState.unparseResult
diff --git a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
index 617f484..9604803 100644
--- a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
+++ b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
@@ -70,7 +70,6 @@
object ENoWarn { EqualitySuppressUnusedImportWarning() }
abstract class UState(
- dos: DirectOrBufferedDataOutputStream,
vbox: VariableBox,
diagnosticsArg: List[Diagnostic],
dataProcArg: Maybe[DataProcessor],
@@ -102,9 +101,8 @@
"UState(" + elt + " DOS=" + dataOutputStream.toString() + ")"
}
- var dataOutputStream: DirectOrBufferedDataOutputStream = dos
+ var dataOutputStream: DirectOrBufferedDataOutputStream
- def prior: UStateForSuspension
def currentInfosetNode: DINode
def currentInfosetNodeMaybe: Maybe[DINode]
def escapeSchemeEVCache: MStackOfMaybe[EscapeSchemeUnparserHelper]
@@ -366,16 +364,15 @@
*/
final class UStateForSuspension(
val mainUState: UStateMain,
- dos: DirectOrBufferedDataOutputStream,
+ override var dataOutputStream: DirectOrBufferedDataOutputStream,
vbox: VariableBox,
override val currentInfosetNode: DINode,
occursIndex: Long,
escapeSchemeEVCacheMaybe: Maybe[MStackOfMaybe[EscapeSchemeUnparserHelper]],
delimiterStackMaybe: Maybe[MStackOf[DelimiterStackUnparseNode]],
- override val prior: UStateForSuspension,
tunable: DaffodilTunables,
areDebugging: Boolean)
- extends UState(dos, vbox, mainUState.diagnostics, mainUState.dataProc, tunable, areDebugging) {
+ extends UState(vbox, mainUState.diagnostics, mainUState.dataProc, tunable, areDebugging) {
dState.setMode(UnparserBlocking)
dState.setCurrentNode(thisElement.asInstanceOf[DINode])
@@ -445,29 +442,37 @@
final class UStateMain private (
private val inputter: InfosetInputter,
+ outStream: java.io.OutputStream,
vbox: VariableBox,
diagnosticsArg: List[Diagnostic],
dataProcArg: DataProcessor,
- dos: DirectOrBufferedDataOutputStream,
tunable: DaffodilTunables,
areDebugging: Boolean)
- extends UState(dos, vbox, diagnosticsArg, One(dataProcArg), tunable, areDebugging) {
+ extends UState(vbox, diagnosticsArg, One(dataProcArg), tunable, areDebugging) {
dState.setMode(UnparserBlocking)
def this(
inputter: InfosetInputter,
+ outputStream: java.io.OutputStream,
vmap: VariableMap,
diagnosticsArg: List[Diagnostic],
dataProcArg: DataProcessor,
- dataOutputStream: DirectOrBufferedDataOutputStream,
tunable: DaffodilTunables,
areDebugging: Boolean) =
- this(inputter, new VariableBox(vmap), diagnosticsArg, dataProcArg,
- dataOutputStream, tunable, areDebugging)
+ this(inputter, outputStream, new VariableBox(vmap), diagnosticsArg, dataProcArg,
+ tunable, areDebugging)
- private var _prior: UStateForSuspension = null
- override def prior = _prior
+ override var dataOutputStream: DirectOrBufferedDataOutputStream = {
+ val out = DirectOrBufferedDataOutputStream(
+ outStream,
+ null, // null means no other stream created this one.
+ isLayer = false,
+ tunable.outputStreamChunkSizeInBytes,
+ tunable.maxByteArrayOutputStreamBufferSizeInBytes,
+ tunable.tempFilePath)
+ out
+ }
def cloneForSuspension(suspendedDOS: DirectOrBufferedDataOutputStream): UState = {
val es =
@@ -502,13 +507,11 @@
arrayIndexStack.top, // only need the top of the stack, not the whole thing
es,
ds,
- prior,
tunable,
areDebugging)
clone.setProcessor(processor)
- this._prior = clone
clone
}
@@ -654,7 +657,7 @@
object UState {
def createInitialUState(
- out: DirectOrBufferedDataOutputStream,
+ outStream: java.io.OutputStream,
dataProc: DFDL.DataProcessor,
inputter: InfosetInputter,
areDebugging: Boolean): UStateMain = {
@@ -669,10 +672,10 @@
val diagnostics = Nil
val newState = new UStateMain(
inputter,
+ outStream,
variables,
diagnostics,
dataProc.asInstanceOf[DataProcessor],
- out,
dataProc.getTunables(),
areDebugging)
newState