[CARBONDATA-3460] Fixed EOFException in CarbonScanRDD
Problem: Delete delta information was not written properly in the OutputStream due the flag based writing.
Solution: Always write the delete delta info, the size of the array will be the deciding factor whether to read further or not.
This closes #3316
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index d97148d..a85423b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -177,7 +177,6 @@
DataOutputStream dos = new DataOutputStream(ebos);
inputSplit.setFilePath(null);
inputSplit.setBucketId(null);
- inputSplit.setWriteDeleteDelta(false);
if (inputSplit.isBlockCache()) {
inputSplit.updateFooteroffset();
inputSplit.updateBlockLength();
diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index da1bc2c..edbfcfe 100644
--- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.carbondata.hadoop;
import java.io.ByteArrayInputStream;
@@ -150,8 +151,6 @@
*/
private int rowCount;
- private boolean writeDeleteDelta = true;
-
public CarbonInputSplit() {
segment = null;
taskId = "0";
@@ -195,7 +194,13 @@
this.version = ColumnarFormatVersion.valueOf(in.readShort());
// will be removed after count(*) optmization in case of index server
this.rowCount = in.readInt();
- this.writeDeleteDelta = in.readBoolean();
+ if (in.readBoolean()) {
+ int numberOfDeleteDeltaFiles = in.readInt();
+ deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
+ for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
+ deleteDeltaFiles[i] = in.readUTF();
+ }
+ }
// after deseralizing required field get the start position of field which will be only used
// in executor
int leftoverPosition = underlineStream.getPosition();
@@ -359,7 +364,13 @@
this.length = in.readLong();
this.version = ColumnarFormatVersion.valueOf(in.readShort());
this.rowCount = in.readInt();
- this.writeDeleteDelta = in.readBoolean();
+ if (in.readBoolean()) {
+ int numberOfDeleteDeltaFiles = in.readInt();
+ deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
+ for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
+ deleteDeltaFiles[i] = in.readUTF();
+ }
+ }
this.bucketId = in.readUTF();
}
this.blockletId = in.readUTF();
@@ -379,13 +390,6 @@
validBlockletIds.add((int) in.readShort());
}
this.isLegacyStore = in.readBoolean();
- if (writeDeleteDelta) {
- int numberOfDeleteDeltaFiles = in.readInt();
- deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
- for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
- deleteDeltaFiles[i] = in.readUTF();
- }
- }
}
@Override public void write(DataOutput out) throws IOException {
@@ -397,11 +401,10 @@
out.writeLong(length);
out.writeShort(version.number());
out.writeInt(rowCount);
- out.writeBoolean(writeDeleteDelta);
+ writeDeleteDeltaFile(out);
out.writeUTF(bucketId);
out.writeUTF(blockletId);
out.write(serializeData, offset, actualLen);
- writeDeleteDeltaFile(out);
return;
}
// please refer writeDetailInfo doc
@@ -419,7 +422,7 @@
} else {
out.writeInt(0);
}
- out.writeBoolean(writeDeleteDelta);
+ writeDeleteDeltaFile(out);
if (null != bucketId) {
out.writeUTF(bucketId);
}
@@ -442,18 +445,19 @@
out.writeShort(blockletId);
}
out.writeBoolean(isLegacyStore);
- writeDeleteDeltaFile(out);
}
private void writeDeleteDeltaFile(DataOutput out) throws IOException {
- if (!writeDeleteDelta) {
- return;
- }
- out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0);
- if (null != deleteDeltaFiles) {
- for (int i = 0; i < deleteDeltaFiles.length; i++) {
- out.writeUTF(deleteDeltaFiles[i]);
+ if (deleteDeltaFiles != null) {
+ out.writeBoolean(true);
+ out.writeInt(deleteDeltaFiles.length);
+ if (null != deleteDeltaFiles) {
+ for (int i = 0; i < deleteDeltaFiles.length; i++) {
+ out.writeUTF(deleteDeltaFiles[i]);
+ }
}
+ } else {
+ out.writeBoolean(false);
}
}
@@ -586,7 +590,6 @@
}
public void setDeleteDeltaFiles(String[] deleteDeltaFiles) {
- this.writeDeleteDelta = true;
this.deleteDeltaFiles = deleteDeltaFiles;
}
@@ -879,7 +882,4 @@
this.bucketId = bucketId;
}
- public void setWriteDeleteDelta(boolean writeDeleteDelta) {
- this.writeDeleteDelta = writeDeleteDelta;
- }
}