SQOOP-3326: Mainframe FTP listing for GDG should filter out non-GDG datasets in a heterogeneous listing

(Chris Teoh via Szabolcs Vasas)
diff --git a/build.xml b/build.xml
index cd2e9e2..f397531 100644
--- a/build.xml
+++ b/build.xml
@@ -262,7 +262,9 @@
   <property name="sqoop.test.mainframe.ftp.binary.dataset.seq" value="TSODIQ1.FOLDER.FOLDERTXT" />
   <property name="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="FOLDERTXT" />
   <property name="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="1591c0fcc718fda7e9c1f3561d232b2b" />
-
+  <property name="sqoop.test.mainframe.ftp.binary.dataset.mixed" value="TSODIQ1.MIXED" />
+  <property name="sqoop.test.mainframe.ftp.binary.dataset.mixed.filename" value="G0039V00" />
+  <property name="sqoop.test.mainframe.ftp.binary.dataset.mixed.md5" value="5e7f4ec7cbeae8e0e0b4d88346eb9349" />
   <property name="s3.bucket.url" value="" />
   <property name="s3.generator.command" value="" />
 
@@ -912,6 +914,9 @@
       <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq" value="${sqoop.test.mainframe.ftp.binary.dataset.seq}" />
       <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.filename}" />
       <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.md5}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.mixed" value="${sqoop.test.mainframe.ftp.binary.dataset.mixed}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.mixed.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.mixed.filename}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.mixed.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.mixed.md5}" />
       <!-- Location of Hive logs -->
       <!--<sysproperty key="hive.log.dir"
                    value="${test.build.data}/sqoop/logs"/> -->
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
index 9d6a2fe..9842daa 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
@@ -31,7 +31,7 @@
   public static final String MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED
 	= "p";
   public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape";
-
+  public static final String MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME = MainframeFTPFileGdgEntryParser.class.getName();
   public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser";
 
   public static final String MAINFRAME_FTP_TRANSFER_MODE = "mainframe.ftp.transfermode";
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java
new file mode 100644
index 0000000..8467afa
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.ftp.FTPClientConfig;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.parser.ConfigurableFTPFileEntryParserImpl;
+
+public class MainframeFTPFileGdgEntryParser extends ConfigurableFTPFileEntryParserImpl {
+/* Sample FTP listing
+Volume Unit    Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
+H19761 Tape                                             G0034V00
+H81751 Tape                                             G0035V00
+H73545 Tape                                             G0036V00
+G10987 Tape                                             G0037V00
+SHT331 3390   **NONE**    1   15  VB     114 27998  PS  DUMMY
+SHT337 3390   **NONE**    1   15  VB     114 27998  PS  G0035V00.COPY
+SHT33A 3390   **NONE**    1   15  VB     114 27998  PS  HELLO
+
+* And what we need to get back from parsing are the following entries:-
+H19761 Tape                                             G0034V00
+H81751 Tape                                             G0035V00
+H73545 Tape                                             G0036V00
+G10987 Tape                                             G0037V00
+*/
+
+  private static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm";
+  private static final String HEADER = "Volume Unit ";
+  private static String GDG_REGEX = "^\\S+\\s+.*?\\s+(G\\d{4}V\\d{2})$";
+  private static final Log LOG = LogFactory.getLog(MainframeFTPFileGdgEntryParser.class.getName());
+
+  public MainframeFTPFileGdgEntryParser() {
+    super(GDG_REGEX);
+    LOG.info("MainframeFTPFileGdgEntryParser default constructor");
+  }
+
+  @Override
+  public FTPFile parseFTPEntry(String entry) {
+    LOG.info("parseFTPEntry: "+entry);
+    if (isFtpListingHeader(entry)) {
+      return null;
+    }
+    if (matches(entry)) {
+      String dsName = group(1);
+      return createFtpFile(entry,dsName);
+    }
+    return null;
+  }
+
+  protected FTPFile createFtpFile(String entry, String dsName) {
+    FTPFile file = new FTPFile();
+    file.setRawListing(entry);
+    file.setName(dsName);
+    file.setType(FTPFile.FILE_TYPE);
+    return file;
+  }
+
+  protected Boolean isFtpListingHeader(String entry) {
+    return entry.startsWith(HEADER);
+  }
+
+  @Override
+  protected FTPClientConfig getDefaultConfiguration() {
+    return new FTPClientConfig(FTPClientConfig.SYST_MVS,
+        DEFAULT_DATE_FORMAT, null, null, null, null);
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
index 654721e..e7c48a6 100644
--- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
+++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
@@ -86,8 +86,14 @@
         ftp.changeWorkingDirectory("'" + pdsName + "'");
         FTPFile[] ftpFiles = null;
         if (!MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED.equals(dsType)) {
-          // excepting partitioned datasets, use the MainframeFTPFileEntryParser, default doesn't match larger datasets
-        	FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
+          FTPListParseEngine parser = null;
+          if (MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG.equals(dsType)) {
+            // use GDG specific parser to filter out non GDG datasets
+            parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME, "");
+          } else {
+            // excepting partitioned datasets, use the MainframeFTPFileEntryParser, default doesn't match larger datasets
+            parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
+          }
         	List<FTPFile> listing = new ArrayList<FTPFile>();
         	while(parser.hasNext()) {
         		FTPFile[] files = parser.getNext(25);
diff --git a/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml b/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml
index 4648f54..b4cf488 100644
--- a/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml
+++ b/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml
@@ -112,7 +112,7 @@
       timeout: 10s
       retries: 20
   mainframe:
-    image: cntroversycubed/sqoopgdg:afdf57b15d8e71eb77d24d606b77e185ef39ceb3
+    image: cntroversycubed/sqoopgdg:42e6c3a1229a6cdf346eb3976bd7298091ea11e2
     container_name: sqoop_mainframe_gdg_container
     ports:
       - 2121:2121
diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
index 3b8ed23..af5c754 100644
--- a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
+++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
@@ -149,6 +149,13 @@
     doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile", "--buffersize", "64000");
   }
 
+  @Test
+  public void testImportMixedBinaryWithBufferSize() throws IOException {
+    HashMap<String,String> files = new HashMap<String,String>();
+    files.put(MainframeTestUtil.MIXED_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_MIXED_BINARY_DATASET_MD5);
+    doImportAndVerify(MainframeTestUtil.MIXED_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile", "--buffersize", "64000");
+  }
+
   private String [] getArgv(String datasetName, String datasetType, String ... extraArgs) {
     ArrayList<String> args = new ArrayList<String>();
 
diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
index 9f86f6c..a65aea6 100644
--- a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
+++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
@@ -68,4 +68,13 @@
   public static final String EXPECTED_SEQ_BINARY_DATASET_MD5 = System.getProperty(
       "sqoop.test.mainframe.ftp.binary.dataset.seq.md5",
       "1591c0fcc718fda7e9c1f3561d232b2b");
+  public static final String MIXED_BINARY_DATASET_NAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.mixed",
+      "TSODIQ1.MIXED");
+  public static final String MIXED_BINARY_DATASET_FILENAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.mixed.filename",
+      "G0039V00");
+  public static final String EXPECTED_MIXED_BINARY_DATASET_MD5 = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.mixed.md5",
+      "5e7f4ec7cbeae8e0e0b4d88346eb9349");
 }
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java
new file mode 100644
index 0000000..521a042
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.net.ftp.FTPFile;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestMainframeFTPFileGdgEntryParser {
+  /* Sample FTP listing
+  Volume Unit    Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
+  H19761 Tape                                             G0034V00
+  H81751 Tape                                             G0035V00
+  H73545 Tape                                             G0036V00
+  G10987 Tape                                             G0037V00
+  SHT331 3390   **NONE**    1   15  VB     114 27998  PS  DUMMY
+  SHT337 3390   **NONE**    1   15  VB     114 27998  PS  G0035V00.COPY
+  SHT33A 3390   **NONE**    1   15  VB     114 27998  PS  HELLO
+
+  * And what we need to get back from parsing are the following entries:-
+  H19761 Tape                                             G0034V00
+  H81751 Tape                                             G0035V00
+  H73545 Tape                                             G0036V00
+  G10987 Tape                                             G0037V00
+  */
+  private final static String FTP_LIST_HEADER = "Volume Unit    Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname";
+  private final String DSNAME = "G0034V00";
+  private final String ENTRY = String.format("H19761 Tape                                             %s",DSNAME);
+  private List<String> listing;
+  private MainframeFTPFileGdgEntryParser parser;
+  @Before
+  public void setUpBefore() throws Exception {
+    parser = new MainframeFTPFileGdgEntryParser();
+    listing = new ArrayList<>();
+    listing.add("Volume Unit    Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname");
+    listing.add(ENTRY);
+    listing.add("H81751 Tape                                             G0035V00");
+    listing.add("H73545 Tape                                             G0036V00");
+    listing.add("G10987 Tape                                             G0037V00");
+    listing.add("SHT331 3390   **NONE**    1   15  VB     114 27998  PS  DUMMY");
+    listing.add("SHT337 3390   **NONE**    1   15  VB     114 27998  PS  G0035V00.COPY");
+    listing.add("SHT33A 3390   **NONE**    1   15  VB     114 27998  PS  HELLO");
+  }
+
+  @Test
+  public void testIsHeader() {
+    assertTrue(parser.isFtpListingHeader(FTP_LIST_HEADER));
+  }
+
+  @Test
+  public void testCreateFtpFile() {
+    FTPFile file = parser.createFtpFile(ENTRY, DSNAME);
+    assertEquals(ENTRY,file.getRawListing());
+    assertEquals(DSNAME,file.getName());
+  }
+
+  @Test
+  public void testParseFTPEntry() {
+    final int EXPECTED_RECORD_COUNT=4;
+    long i = listing.stream()
+        .map(parser::parseFTPEntry)
+        .filter(Objects::nonNull)
+        .count();
+    assertEquals(EXPECTED_RECORD_COUNT,i);
+  }
+}
diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
index 90a8519..0714bdc 100644
--- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
+++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
@@ -297,7 +297,7 @@
 	        FTPFile file2 = new FTPFile();
 	        file2.setName("G0101V00");
 	        file2.setType(FTPFile.FILE_TYPE);
-        when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+        when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
         when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
         when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
 	    } catch (IOException e) {