[UIMA-4685] Adding back COMPRESSED_FILTERED_TSI, refactoring to allow staying with a single header in the binary stream, added unit test back


git-svn-id: https://svn.apache.org/repos/asf/uima/uimaj/trunk@1755934 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/uimaj-core/src/main/java/org/apache/uima/cas/SerialFormat.java b/uimaj-core/src/main/java/org/apache/uima/cas/SerialFormat.java
index d9c357b..87ac3c7 100644
--- a/uimaj-core/src/main/java/org/apache/uima/cas/SerialFormat.java
+++ b/uimaj-core/src/main/java/org/apache/uima/cas/SerialFormat.java
@@ -69,12 +69,11 @@
    */
   SERIALIZED_TSI("scas"),
 
-//  /**
-//   * Binary compressed CAS with embedded Java-serialized type system
-//   * with reachability and type and feature filtering (form 6)
-//   */
-//  COMPRESSED_FILTERED_TS("bcas");
-  ;
+  /**
+   * Binary compressed CAS with embedded Java-serialized type system
+   * with reachability and type and feature filtering (form 6)
+   */
+  COMPRESSED_FILTERED_TSI("bcas");
   
   private String defaultFileExtension;
 
diff --git a/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java b/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
index ffefa53..6ef0564 100644
--- a/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
+++ b/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
@@ -58,6 +58,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -70,6 +71,7 @@
 
 import org.apache.uima.cas.AbstractCas;
 import org.apache.uima.cas.CASRuntimeException;
+import org.apache.uima.cas.admin.CASMgr;
 import org.apache.uima.cas.impl.CommonSerDes.Header;
 import org.apache.uima.cas.impl.FSsTobeAddedback.FSsTobeAddedbackSingle;
 import org.apache.uima.cas.impl.SlotKinds.SlotKind;
@@ -342,6 +344,8 @@
 
   final private TypeSystemImpl tgtTs;
 
+  private boolean isTsiIncluded;
+  
   private TypeInfo typeInfo; // type info for the current type being serialized/deserialized
                              // always the "src" typeInfo I think, except for compareCas use
   final private CasTypeSystemMapper typeMapper;
@@ -470,6 +474,7 @@
       AbstractCas aCas,
       MarkerImpl mark,
       TypeSystemImpl tgtTs,
+      boolean storeTSI,      
       ReuseInfo rfs,
       boolean doMeasurements,
       CompressLevel compressLevel, 
@@ -489,6 +494,7 @@
     isDelta = isSerializingDelta = (mark != null);
     typeMapperCmn = typeMapper = ts.getTypeSystemMapper(tgtTs);
     isTypeMappingCmn = isTypeMapping = (null != typeMapper);
+    isTsiIncluded = storeTSI;
     
     heap = cas.getHeap().heap;
     heapEnd = cas.getHeap().getCellsUsed();
@@ -523,7 +529,7 @@
    * @throws ResourceInitializationException never thrown 
    */
   public BinaryCasSerDes6(AbstractCas cas) throws ResourceInitializationException {
-    this(cas, null, null, null, false, CompressLevel.Default, CompressStrat.Default);
+    this(cas, null, null, false, null, false, CompressLevel.Default, CompressStrat.Default);
   }
   
   /**
@@ -533,7 +539,7 @@
    * @throws ResourceInitializationException if the target type system is incompatible with the source type system
    */
   public BinaryCasSerDes6(AbstractCas cas, TypeSystemImpl tgtTs) throws ResourceInitializationException {
-    this(cas, null, tgtTs, null, false, CompressLevel.Default, CompressStrat.Default);
+    this(cas, null, tgtTs, false, null, false, CompressLevel.Default, CompressStrat.Default);
   }
 
   /**
@@ -545,7 +551,7 @@
    * @throws ResourceInitializationException if the target type system is incompatible with the source type system
    */
   public BinaryCasSerDes6(AbstractCas cas, MarkerImpl mark, TypeSystemImpl tgtTs, ReuseInfo rfs) throws ResourceInitializationException {
-    this(cas, mark, tgtTs, rfs, false, CompressLevel.Default, CompressStrat.Default);
+    this(cas, mark, tgtTs, false, rfs, false, CompressLevel.Default, CompressStrat.Default);
   }
   
   /**
@@ -558,7 +564,7 @@
    * @throws ResourceInitializationException if the target type system is incompatible with the source type system
    */
   public BinaryCasSerDes6(AbstractCas cas, MarkerImpl mark, TypeSystemImpl tgtTs, ReuseInfo rfs, boolean doMeasurements) throws ResourceInitializationException {
-    this(cas, mark, tgtTs, rfs, doMeasurements, CompressLevel.Default, CompressStrat.Default);
+    this(cas, mark, tgtTs, false, rfs, doMeasurements, CompressLevel.Default, CompressStrat.Default);
   }
 
   /**
@@ -568,7 +574,18 @@
    * @throws ResourceInitializationException never thrown
    */
   public BinaryCasSerDes6(AbstractCas cas, ReuseInfo rfs) throws ResourceInitializationException {
-    this(cas, null, null, rfs, false, CompressLevel.Default, CompressStrat.Default);
+    this(cas, null, null, false, rfs, false, CompressLevel.Default, CompressStrat.Default);
+  }
+
+  /**
+   * Setup to serialize (not delta) or deserialize (maybe delta) using binary compression, no type mapping, optionally storing TSI, and only processing reachable Feature Structures
+   * @param cas -
+   * @param rfs -
+   * @param storeTSI true to embed the Java-serialized type system and index definitions right after the header
+   * @throws ResourceInitializationException never thrown
+   */
+  public BinaryCasSerDes6(AbstractCas cas, ReuseInfo rfs, boolean storeTSI) throws ResourceInitializationException {
+    this(cas, null, null, storeTSI, rfs, false, CompressLevel.Default, CompressStrat.Default);
   }
 
   /*********************************************************************************************
@@ -589,6 +606,10 @@
       throw new UnsupportedOperationException("Can't do Delta Serialization with different target TS");
     }
 
+    if (isTsiIncluded && (tgtTs != null)) {
+      throw new UnsupportedOperationException("Can't store a different target TS in the serialized form");
+    }
+    
     if (fsStartIndexes == null) {
       if (isSerializingDelta) {
         throw new UnsupportedOperationException("Serializing a delta requires valid ReuseInfo for Cas being serialized," +
@@ -612,9 +633,15 @@
     CommonSerDes.createHeader()
     .form6()
     .delta(isSerializingDelta)
-    .seqVer(0)     
+    .seqVer(0)
+    .typeSystemIncluded(isTsiIncluded)
     .write(serializedOut);
-    
+ 
+    if (isTsiIncluded) {
+      ObjectOutputStream tsiOS = new ObjectOutputStream(serializedOut);
+      tsiOS.writeObject(Serialization.serializeCASMgr((CASMgr) cas));
+      tsiOS.flush();
+    }
  
     os = new OptimizeStrings(doMeasurements);
  
diff --git a/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java b/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
index 69141ac..59550b3 100644
--- a/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
+++ b/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
@@ -1376,7 +1376,6 @@
    * @return -
    * @throws CASRuntimeException wraps IOException
    */
-
   public SerialFormat reinit(InputStream istream) throws CASRuntimeException {
     if (this != this.svd.baseCAS) {
       return this.svd.baseCAS.reinit(istream);
@@ -1384,11 +1383,42 @@
    
     final DataInputStream dis = CommonSerDes.maybeWrapToDataInputStream(istream);
 
+    try {
+      Header h = CommonSerDes.readHeader(dis);
+      return reinit(h, istream);
+    } catch (IOException e) {
+      String msg = e.getMessage();
+      if (msg == null) {
+        msg = e.toString();
+      }
+      CASRuntimeException exception = new CASRuntimeException(
+          CASRuntimeException.BLOB_DESERIALIZATION, new String[] { msg });
+      throw exception;
+    }
+  }
+  
+  /**
+   * --------------------------------------------------------------------- see
+   * Blob Format in CASSerializer
+   * 
+   * This reads in and deserializes CAS data from a stream. Byte swapping may be
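+   * needed if the blob is from C++ -- C++ blob serialization writes data in native byte order.
+   * 
+   * @param h the header that has already been read from the stream
+   * @param istream the stream to read the remaining CAS data from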
+   * @return -
+   * @throws CASRuntimeException wraps IOException
+   */
+  public SerialFormat reinit(Header h, InputStream istream) throws CASRuntimeException {
+    if (this != this.svd.baseCAS) {
+      return this.svd.baseCAS.reinit(h, istream);
+    }
+   
+    final DataInputStream dis = CommonSerDes.maybeWrapToDataInputStream(istream);
+
     final BinDeserSupport bds = new BinDeserSupport();
     
     try {
-      Header h = CommonSerDes.readHeader(dis);
-
       final boolean delta = h.isDelta;
       
       if (!delta) {
@@ -1403,7 +1433,8 @@
       if (h.form6) { 
         try {
           (new BinaryCasSerDes6(this)).deserializeAfterVersion(dis, delta, AllowPreexistingFS.allow);
-          return SerialFormat.COMPRESSED_FILTERED;
+          return h.typeSystemIncluded ? SerialFormat.COMPRESSED_FILTERED_TSI
+                  : SerialFormat.COMPRESSED_FILTERED;
         } catch (ResourceInitializationException e) {
           throw new CASRuntimeException(CASRuntimeException.DESERIALIZING_COMPRESSED_BINARY_UNSUPPORTED, null, e);
         }
diff --git a/uimaj-core/src/main/java/org/apache/uima/cas/impl/CommonSerDes.java b/uimaj-core/src/main/java/org/apache/uima/cas/impl/CommonSerDes.java
index 468c87b..ce4fb71 100644
--- a/uimaj-core/src/main/java/org/apache/uima/cas/impl/CommonSerDes.java
+++ b/uimaj-core/src/main/java/org/apache/uima/cas/impl/CommonSerDes.java
@@ -85,7 +85,7 @@
     public Header delta(boolean v2) {isDelta = v2;  return this; }

     public Header form4() {isCompressed = form4 = true; form6 = false; return this; }

     public Header form6() {isCompressed = form6 = true; form4 = false; return this; }

-    public Header typeSystemIncluded() {typeSystemIncluded = true; return this; }

+    public Header typeSystemIncluded(boolean f) {typeSystemIncluded = f; return this; }

     public Header seqVer(int v2) { assert (v2 >= 0 && v2 < 256); seqVersionNbr = (byte)v2; return this; }

     public Header v3() {isV3 = true; return this; }

     

diff --git a/uimaj-core/src/main/java/org/apache/uima/cas/impl/Serialization.java b/uimaj-core/src/main/java/org/apache/uima/cas/impl/Serialization.java
index 1097581..1913408 100644
--- a/uimaj-core/src/main/java/org/apache/uima/cas/impl/Serialization.java
+++ b/uimaj-core/src/main/java/org/apache/uima/cas/impl/Serialization.java
@@ -228,6 +228,25 @@
    * @throws IOException if IO exception
    * @throws ResourceInitializationException if target type system is incompatible with this CAS's type system
    */  
+  public static ReuseInfo serializeWithCompression(CAS cas, Object out, boolean includeTSI) throws IOException, ResourceInitializationException {
+    BinaryCasSerDes6 bcs = new BinaryCasSerDes6(cas, null, includeTSI);
+    bcs.serialize(out);
+    return bcs.getReuseInfo();
+  }
+  
+  /**
+   * Serialize in compressed binary with type filtering
+   * This method can use type filtering to omit sending those types and/or features not present in the target type system.
+   *   - To omit type filtering, use null for the target type system
+   * It also only sends those feature structures which are reachable either from an index or references from other reachable feature structures.
+   * 
+   * @param cas the CAS to serialize
+   * @param out an OutputStream, a DataOutputStream, or a File
+   * @param tgtTypeSystem null or a target TypeSystem, which must be mergable with this CAS's type system
+   * @return information to be used on subsequent serializations (to save time) or deserializations (for receiving delta CASs), or reserializations (if sending delta CASs)
+   * @throws IOException if IO exception
+   * @throws ResourceInitializationException if target type system is incompatible with this CAS's type system
+   */  
   public static ReuseInfo serializeWithCompression(CAS cas, Object out, TypeSystem tgtTypeSystem) throws IOException, ResourceInitializationException {
     BinaryCasSerDes6 bcs = new BinaryCasSerDes6(cas, (TypeSystemImpl) tgtTypeSystem);
     bcs.serialize(out);
diff --git a/uimaj-core/src/main/java/org/apache/uima/util/CasIOUtils.java b/uimaj-core/src/main/java/org/apache/uima/util/CasIOUtils.java
index 4854cf3..dc57a79 100644
--- a/uimaj-core/src/main/java/org/apache/uima/util/CasIOUtils.java
+++ b/uimaj-core/src/main/java/org/apache/uima/util/CasIOUtils.java
@@ -42,6 +42,7 @@
 import org.apache.uima.cas.impl.CASMgrSerializer;
 import org.apache.uima.cas.impl.CASSerializer;
 import org.apache.uima.cas.impl.CommonSerDes;
+import org.apache.uima.cas.impl.CommonSerDes.Header;
 import org.apache.uima.cas.impl.Serialization;
 import org.apache.uima.cas.impl.XCASDeserializer;
 import org.apache.uima.cas.impl.XCASSerializer;
@@ -191,7 +192,7 @@
    *           - Problem loading from given InputStream
    */
   public static SerialFormat load(InputStream casInputStream, CAS aCAS) throws IOException {
-    return load(casInputStream, null, aCAS, false);
+    return load(casInputStream, (CASMgrSerializer) null, aCAS, false);
   }
 
   /**
@@ -204,9 +205,10 @@
    * pass null for the tsiInputStream.
    * 
    * @param casInputStream
-   *          The input stream containing the CAS. Caller should buffer this appropriately.
+   *          The input stream containing the CAS.
    * @param tsiInputStream
-   *          The optional input stream containing the type system. Caller should buffer this appropriately.
+   *          The optional input stream containing the type system. This is only used if the
+   *          casInputStream does not already come with an embedded CAS configuration.
    * @param aCAS
    *          The CAS that should be filled
    * @param leniently
@@ -220,17 +222,63 @@
    */
   public static SerialFormat load(InputStream casInputStream, InputStream tsiInputStream, CAS aCAS,
           boolean leniently) throws IOException {
+    CASMgrSerializer casMgrSerializer = null; 
+
+    // If there is a TSI specified, load it - we will see later if we actually use it.
+    if (tsiInputStream != null) {
+      if (!tsiInputStream.markSupported()) {
+        tsiInputStream = new BufferedInputStream(tsiInputStream);
+      }
+      
+      try {
+        ObjectInputStream is = new ObjectInputStream(tsiInputStream);
+        casMgrSerializer = (CASMgrSerializer) is.readObject();
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }    
+    }
+
+    return load(casInputStream, casMgrSerializer, aCAS, leniently);
+  }
+
+  /**
+   * Loads a CAS from an input stream. The format is determined from the content. For formats that
+   * embed the type system and index definitions (SERIALIZED_TSI, COMPRESSED_FILTERED_TSI), these
+   * are read from the casInputStream; the value of casMgr is ignored. For other formats, if
+   * casMgr is not null, type system and index definitions are taken from it.
+   * 
+   * To specify lenient loading, without specifying an additional type system and index definition
+   * source, pass null for casMgr.
+   * 
+   * This method avoids the repeated loading of the typesystem and index definitions
+   * from a stream when loading many CASes in a row.
+   * 
+   * @param casInputStream
+   *          The input stream containing the CAS.
+   * @param casMgr
+   *          The optional CAS configuration including type system definition and index definition
+   *          in form of a {@link CASMgrSerializer}. This is only used if the
+   *          casInputStream does not already come with an embedded CAS configuration.
+   * @param aCAS
+   *          The CAS that should be filled
+   * @param leniently
+   *          for XCAS and XMI formats, ignore feature structures and features of non-existing types
+   *          and/or features. ignored for other formats.
+   * @return the SerialFormat of the loaded CAS
+   * @throws IOException
+   *           - Problem loading from given InputStream
+   * @throws IllegalArgumentException
+   *           - when trying to load XCAS
+   */
+  public static SerialFormat load(InputStream casInputStream, CASMgrSerializer casMgr, CAS aCAS,
+          boolean leniently) throws IOException {
 
     if (!casInputStream.markSupported()) {
       casInputStream = new BufferedInputStream(casInputStream);
     }
     
-    if (tsiInputStream != null && !tsiInputStream.markSupported()) {
-      tsiInputStream = new BufferedInputStream(tsiInputStream);
-    }
-    
     CASImpl casImpl = (CASImpl) aCAS;
-    /** scan the first part of the file for known formats */
+    // scan the first part of the file for known formats
     casInputStream.mark(6);
     byte[] firstPartOfFile = new byte[6];
     int bytesReadCount = casInputStream.read(firstPartOfFile);
@@ -247,15 +295,31 @@
     
     DataInputStream deserIn = CommonSerDes.maybeWrapToDataInputStream(casInputStream);
     if (CommonSerDes.isBinaryHeader(deserIn)) {   
-      return casImpl.reinit(casInputStream);
+      Header h = CommonSerDes.readHeader(deserIn);
+      if (h.isTypeSystemIncluded()) { // Load TSI from CAS stream
+        try {
+          ObjectInputStream ois = new ObjectInputStream(deserIn);
+          CASMgrSerializer casMgrSerializer = (CASMgrSerializer) ois.readObject();
+          casImpl.setupCasFromCasMgrSerializer(casImpl, casMgrSerializer);  
+        } catch (ClassNotFoundException e) {
+          /**Unrecognized serialized CAS format*/
+          throw new CASRuntimeException(CASRuntimeException.UNRECOGNIZED_SERIALIZED_CAS_FORMAT);
+        }       
+      }
+      else if (casMgr != null) { // if TSI not in file, maybe set it from parameter
+        casImpl.setupCasFromCasMgrSerializer(casImpl, casMgr);  
+      }
+      return casImpl.reinit(h, casInputStream);
     } else {
       // is a Java Object serialization, with or without a type system
       ObjectInputStream ois = new ObjectInputStream(casInputStream);
       try {
         Object o = ois.readObject();
         if (o instanceof CASSerializer) {
-          casImpl = readCasManager(casImpl, tsiInputStream);  // maybe install type system and index def   
-          casImpl.reinit((CASSerializer) o);                  // deserialize from object
+          if (casMgr != null) { // maybe install type system and index def  
+            casImpl.setupCasFromCasMgrSerializer(casImpl, casMgr);  
+          }
+          casImpl.reinit((CASSerializer) o); // deserialize from object
           return SerialFormat.SERIALIZED;
         } else if (o instanceof CASCompleteSerializer) {
           // with a type system use that, ignore any supplied via tsiInputStream
@@ -331,7 +395,13 @@
           serializeWithCompression(aCas, docOS);
           break;
         case COMPRESSED_FILTERED: // Binary compressed CAS (form 6)
-          serializeWithCompression(aCas, docOS, aCas.getTypeSystem());
+          serializeWithCompression(aCas, docOS, false);
+          break;
+        case COMPRESSED_FILTERED_TSI:
+          // Binary compressed CAS (form 6)
+          // ... with embedded Java-serialized type system
+          serializeWithCompression(aCas, docOS, true);
+          typeSystemWritten = true; // Embedded type system
           break;
         default:
           throw new IllegalArgumentException("Unknown format [" + format.name()
@@ -343,37 +413,13 @@
       throw new IOException(e);
     }
 
+    // Write type system to the separate stream only if it has not already been embedded into the
+    // main stream
     if (tsiOS != null && !typeSystemWritten) {
       writeTypeSystem(aCas, tsiOS);
     }
   }
 
-  /**
-   * Takes a serialized version of the type system and index definitions as represented by
-   * the Java object serialization of the class CASMgrSerializer, and reads it, and uses it 
-   * to reset the provided CAS to this new definition.
-   * 
-   * @param cas the CAS to reset
-   * @param aIs the stream having the serialized CASMgrSerializer
-   * @return the initial view of the new cas with the type and index definitions installed and set up.
-   * @throws IOException
-   */
-  private static CASImpl readCasManager(CAS cas, InputStream aIs) throws IOException {
-    if (null == aIs) {
-      return (CASImpl) cas;
-    }
-    CASMgrSerializer casMgrSerializer;
-    CASImpl casImpl = (CASImpl) cas;
-
-    try {
-      ObjectInputStream is = new ObjectInputStream(aIs);
-      casMgrSerializer = (CASMgrSerializer) is.readObject();
-      return casImpl.setupCasFromCasMgrSerializer(casImpl, casMgrSerializer);
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }    
-  }
-
   private static void writeJavaObject(Object o, OutputStream aOS) throws IOException {
     ObjectOutputStream tsiOS = new ObjectOutputStream(aOS);
     tsiOS.writeObject(o);
diff --git a/uimaj-core/src/test/java/org/apache/uima/util/CasIOUtilsTest.java b/uimaj-core/src/test/java/org/apache/uima/util/CasIOUtilsTest.java
index 4eb1fbe..f072349 100644
--- a/uimaj-core/src/test/java/org/apache/uima/util/CasIOUtilsTest.java
+++ b/uimaj-core/src/test/java/org/apache/uima/util/CasIOUtilsTest.java
@@ -23,7 +23,6 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 
@@ -100,6 +99,11 @@
     testFormat(SerialFormat.SERIALIZED_TSI, "binsp");
   }
   
+  
+  public void testS6p() throws Exception {
+    testFormat(SerialFormat.COMPRESSED_FILTERED_TSI, "bins6p");
+  }
+  
   public void testS0() throws Exception {
     testFormat(SerialFormat.BINARY, "bins0");
   }