[CARBONDATA-3460] Fixed EOFException in CarbonScanRDD

Problem: Delete delta information was not written properly in the OutputStream due the flag based writing.

Solution: Always write the delete delta info, the size of the array will be the deciding factor whether to read further or not.

This closes #3316
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index d97148d..a85423b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -177,7 +177,6 @@
       DataOutputStream dos = new DataOutputStream(ebos);
       inputSplit.setFilePath(null);
       inputSplit.setBucketId(null);
-      inputSplit.setWriteDeleteDelta(false);
       if (inputSplit.isBlockCache()) {
         inputSplit.updateFooteroffset();
         inputSplit.updateBlockLength();
diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index da1bc2c..edbfcfe 100644
--- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.carbondata.hadoop;
 
 import java.io.ByteArrayInputStream;
@@ -150,8 +151,6 @@
    */
   private int rowCount;
 
-  private boolean writeDeleteDelta = true;
-
   public CarbonInputSplit() {
     segment = null;
     taskId = "0";
@@ -195,7 +194,13 @@
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
     // will be removed after count(*) optmization in case of index server
     this.rowCount = in.readInt();
-    this.writeDeleteDelta = in.readBoolean();
+    if (in.readBoolean()) {
+      int numberOfDeleteDeltaFiles = in.readInt();
+      deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
+      for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
+        deleteDeltaFiles[i] = in.readUTF();
+      }
+    }
     // after deseralizing required field get the start position of field which will be only used
     // in executor
     int leftoverPosition = underlineStream.getPosition();
@@ -359,7 +364,13 @@
       this.length = in.readLong();
       this.version = ColumnarFormatVersion.valueOf(in.readShort());
       this.rowCount = in.readInt();
-      this.writeDeleteDelta = in.readBoolean();
+      if (in.readBoolean()) {
+        int numberOfDeleteDeltaFiles = in.readInt();
+        deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
+        for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
+          deleteDeltaFiles[i] = in.readUTF();
+        }
+      }
       this.bucketId = in.readUTF();
     }
     this.blockletId = in.readUTF();
@@ -379,13 +390,6 @@
       validBlockletIds.add((int) in.readShort());
     }
     this.isLegacyStore = in.readBoolean();
-    if (writeDeleteDelta) {
-      int numberOfDeleteDeltaFiles = in.readInt();
-      deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
-      for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
-        deleteDeltaFiles[i] = in.readUTF();
-      }
-    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
@@ -397,11 +401,10 @@
       out.writeLong(length);
       out.writeShort(version.number());
       out.writeInt(rowCount);
-      out.writeBoolean(writeDeleteDelta);
+      writeDeleteDeltaFile(out);
       out.writeUTF(bucketId);
       out.writeUTF(blockletId);
       out.write(serializeData, offset, actualLen);
-      writeDeleteDeltaFile(out);
       return;
     }
     // please refer writeDetailInfo doc
@@ -419,7 +422,7 @@
     } else {
       out.writeInt(0);
     }
-    out.writeBoolean(writeDeleteDelta);
+    writeDeleteDeltaFile(out);
     if (null != bucketId) {
       out.writeUTF(bucketId);
     }
@@ -442,18 +445,19 @@
       out.writeShort(blockletId);
     }
     out.writeBoolean(isLegacyStore);
-    writeDeleteDeltaFile(out);
   }
 
   private void writeDeleteDeltaFile(DataOutput out) throws IOException {
-    if (!writeDeleteDelta) {
-      return;
-    }
-    out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0);
-    if (null != deleteDeltaFiles) {
-      for (int i = 0; i < deleteDeltaFiles.length; i++) {
-        out.writeUTF(deleteDeltaFiles[i]);
+    if (deleteDeltaFiles != null) {
+      out.writeBoolean(true);
+      out.writeInt(deleteDeltaFiles.length);
+      if (null != deleteDeltaFiles) {
+        for (int i = 0; i < deleteDeltaFiles.length; i++) {
+          out.writeUTF(deleteDeltaFiles[i]);
+        }
       }
+    } else {
+      out.writeBoolean(false);
     }
   }
 
@@ -586,7 +590,6 @@
   }
 
   public void setDeleteDeltaFiles(String[] deleteDeltaFiles) {
-    this.writeDeleteDelta = true;
     this.deleteDeltaFiles = deleteDeltaFiles;
   }
 
@@ -879,7 +882,4 @@
     this.bucketId = bucketId;
   }
 
-  public void setWriteDeleteDelta(boolean writeDeleteDelta) {
-    this.writeDeleteDelta = writeDeleteDelta;
-  }
 }