Relay parse assertion failures to debug console.

As part of previously-closed #648.
diff --git a/debugger/src/main/scala/org.apache.daffodil.debugger.dap/Parse.scala b/debugger/src/main/scala/org.apache.daffodil.debugger.dap/Parse.scala
index 8ac32aa..8703f32 100644
--- a/debugger/src/main/scala/org.apache.daffodil.debugger.dap/Parse.scala
+++ b/debugger/src/main/scala/org.apache.daffodil.debugger.dap/Parse.scala
@@ -32,7 +32,6 @@
 import java.net.URI
 import java.nio.file._
 import org.apache.commons.io.FileUtils
-import org.apache.daffodil.api.Diagnostic
 import org.apache.daffodil.debugger.dap.{BuildInfo => DAPBuildInfo}
 import org.apache.daffodil.debugger.Debugger
 import org.apache.daffodil.exceptions.SchemaFileLocation
@@ -40,7 +39,7 @@
 import org.apache.daffodil.processors.dfa.DFADelimiter
 import org.apache.daffodil.processors.parsers._
 import org.apache.daffodil.processors._
-import org.apache.daffodil.sapi.ValidationMode
+import org.apache.daffodil.sapi.{Diagnostic, ValidationMode}
 import org.apache.daffodil.sapi.infoset.XMLTextInfosetOutputter
 import org.apache.daffodil.sapi.infoset.JsonInfosetOutputter
 import org.apache.daffodil.sapi.io.InputSourceDataInputStream
@@ -50,9 +49,6 @@
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 import scala.collection.JavaConverters._
 import scala.util.Try
-import cats.effect.kernel.Resource.ExitCase.Errored
-import cats.effect.kernel.Resource.ExitCase.Canceled
-import cats.effect.kernel.Resource.ExitCase.Succeeded
 
 trait Parse {
 
@@ -67,6 +63,13 @@
 object Parse {
   implicit val logger: Logger[IO] = Slf4jLogger.getLogger
 
+  case class Exception(diagnostics: List[Diagnostic])
+      extends RuntimeException(
+        diagnostics
+          .map(d => d.toString)
+          .mkString("\n")
+      )
+
   def apply(
       schema: Path,
       data: InputStream,
@@ -99,25 +102,17 @@
             }
 
             val parse =
-              IO.interruptibleMany {
-                val parse_res = dp.parse(
+              IO.interruptibleMany(
+                dp.parse(
                   new InputSourceDataInputStream(data),
                   infosetOutputter
                 )
-
-                parse_res.isError match {
-                  case true =>
-                    throw new Error(
-                      parse_res.getDiagnostics
-                        .map(d => d.toString)
-                        .mkString("\n")
-                    )
-                  case _ => parse_res
-                }
                 // WARNING: parse doesn't close the OutputStream, so closed below
-              }.void
+              ).ensureOr(res => new Parse.Exception(res.getDiagnostics.toList))(res => !res.isError)
+                .guarantee(IO(os.close) *> done.set(true))
+                .void
 
-            stopper &> parse.guarantee(IO(os.close) *> done.set(true))
+            stopper &> parse
           }
 
       def close(): IO[Unit] =
@@ -691,9 +686,10 @@
             .run()
             .through(text.utf8.decode)
             .foldMonoid
-            .evalTap(_ => Logger[IO].debug("done collecting infoset XML output"))
-            .map(infosetXML => Some(Events.OutputEvent.createConsoleOutput(infosetXML)))
-            .enqueueUnterminated(dapEvents) // later handling will terminate dapEvents
+            .evalTap(xml =>
+              Logger[IO].debug("done collecting infoset XML output") *>
+                dapEvents.offer(Some(Events.OutputEvent.createConsoleOutput(xml)))
+            )
         case Debugee.LaunchArgs.InfosetOutput.File(path) =>
           parse
             .run()
@@ -744,20 +740,21 @@
         .concurrently(
           Stream(
             Stream.eval(startup),
-            parsing.onFinalizeCase(ec =>
-              Logger[IO].debug(s"parsing: ${ec match {
-                  case Errored(_) => "Errored"
-                  case Canceled   => "Canceled"
-                  case Succeeded  => "Succeeded"
-                }}")
-            ),
+            // ensure dapEvents is terminated when the parse is terminated
+            parsing
+              .onFinalizeCase {
+                case Resource.ExitCase.Errored(e: Parse.Exception) =>
+                  // TODO: when Parse.Exception has source coordinates, include it into a more structured OutputEvent
+                  dapEvents.offer(Some(Events.OutputEvent.createConsoleOutput(e.getMessage()))) *>
+                    dapEvents.offer(None)
+                case _ => dapEvents.offer(None)
+              }
+              .onFinalizeCase(ec => Logger[IO].debug(s"parsing: $ec")),
             deliverParseData.onFinalizeCase {
               case ec @ Resource.ExitCase.Errored(e) =>
                 Logger[IO].warn(e)(s"deliverParseData: $ec")
               case ec => Logger[IO].debug(s"deliverParseData: $ec")
-            } ++ Stream.eval(
-              dapEvents.offer(None) // ensure dapEvents is terminated when the parse is terminated
-            ),
+            },
             infosetChanges
           ).parJoinUnbounded
         )
@@ -989,7 +986,7 @@
         schemaLocation: SchemaFileLocation,
         pointsOfUncertainty: List[PointOfUncertainty],
         delimiterStack: List[Delimiter],
-        diagnostics: List[Diagnostic]
+        diagnostics: List[org.apache.daffodil.api.Diagnostic]
     ) extends Event {
       // PState is mutable, so we copy all the information we might need downstream.
       def this(pstate: PState, isStepping: Boolean) =
@@ -1244,7 +1241,7 @@
           events.offer(None) *> // no more events after this
           state.offer(None) *> // no more states == the parse is terminated
           infoset.offer(None) *>
-          Logger[IO].debug("infoset queue completed")
+          Logger[IO].debug("Debugger fini event: completed parse")
       }
 
     override def startElement(pstate: PState, processor: Parser): Unit =