SQOOP-3225: Mainframe module FTP listing parser should cater for larger datasets on disk
(Chris Teoh via Boglarka Egyed)
diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
index f61b983..95bc0ec 100644
--- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
+++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
@@ -57,6 +57,12 @@
String dsName = pdsName;
String fileName = "";
MainframeDatasetPath p = null;
+ String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+ if (dsType == null) {
+ // default dataset type to partitioned dataset
+ conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED);
+ dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+ }
try {
p = new MainframeDatasetPath(dsName,conf);
} catch (Exception e) {
@@ -64,8 +70,6 @@
LOG.error("MainframeDatasetPath helper class incorrectly initialised");
e.printStackTrace();
}
- String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
- boolean isTape = Boolean.parseBoolean(conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE));
boolean isSequentialDs = false;
boolean isGDG = false;
if (dsType != null && p != null) {
@@ -80,7 +84,8 @@
if (ftp != null) {
ftp.changeWorkingDirectory("'" + pdsName + "'");
FTPFile[] ftpFiles = null;
- if (isTape) {
+ if (!MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED.equals(dsType)) {
+ // for every dataset type except partitioned, use MainframeFTPFileEntryParser; the default parser does not match listings of larger datasets
FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
List<FTPFile> listing = new ArrayList<FTPFile>();
while(parser.hasNext()) {
@@ -102,7 +107,11 @@
}
}
}
- else { ftpFiles = ftp.listFiles(); }
+ else {
+ // partitioned datasets have a different FTP listing structure
+ LOG.info("Dataset is a partitioned dataset, using default FTP list parsing");
+ ftpFiles = ftp.listFiles();
+ }
if (!isGDG) {
for (FTPFile f : ftpFiles) {
LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType()));
diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
index d87c75d..90a8519 100644
--- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
+++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
@@ -21,12 +21,14 @@
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.List;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPListParseEngine;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.JobBase;
@@ -42,12 +44,14 @@
private JobConf conf;
private FTPClient mockFTPClient;
+ private FTPListParseEngine mockFTPListParseEngine;
@Before
public void setUp() {
conf = new JobConf();
mockFTPClient = mock(FTPClient.class);
when(mockFTPClient.getReplyString()).thenReturn("");
+ mockFTPListParseEngine = mock(FTPListParseEngine.class);
MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
}
@@ -173,7 +177,9 @@
FTPFile file2 = new FTPFile();
file2.setName("blah2");
file2.setType(FTPFile.FILE_TYPE);
- when(mockFTPClient.listFiles()).thenReturn(new FTPFile[] {file1,file2});
+ when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+ when(mockFTPListParseEngine.hasNext()).thenReturn(true,false);
+ when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
} catch (IOException e) {
fail("No IOException should be thrown!");
}
@@ -210,7 +216,9 @@
FTPFile file2 = new FTPFile();
file2.setName("blah2");
file2.setType(FTPFile.FILE_TYPE);
- when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+ when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+ when(mockFTPListParseEngine.hasNext()).thenReturn(true,false);
+ when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
} catch (IOException e) {
fail("No IOException should be thrown!");
}
@@ -248,7 +256,9 @@
FTPFile file2 = new FTPFile();
file2.setName("blah2");
file2.setType(FTPFile.FILE_TYPE);
- when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+ when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+ when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
+ when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
} catch (IOException e) {
fail("No IOException should be thrown!");
}
@@ -287,7 +297,9 @@
FTPFile file2 = new FTPFile();
file2.setName("G0101V00");
file2.setType(FTPFile.FILE_TYPE);
- when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+ when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+ when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
+ when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
} catch (IOException e) {
fail("No IOException should be thrown!");
}
@@ -312,4 +324,42 @@
Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString);
}
}
+
+ // Partitioned datasets must be listed through FTPClient.listFiles() and never through the custom list parse engine.
+ @Test
+ public void testPartitionedDatasetsShouldReturnAllFiles() {
+ try {
+ when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+ when(mockFTPClient.logout()).thenReturn(true);
+ when(mockFTPClient.isConnected()).thenReturn(false);
+ when(mockFTPClient.getReplyCode()).thenReturn(200);
+ when(mockFTPClient.changeWorkingDirectory("a.b.c.blah1")).thenReturn(true);
+ FTPFile file1 = new FTPFile();
+ file1.setName("blah1");
+ file1.setType(FTPFile.FILE_TYPE);
+ FTPFile file2 = new FTPFile();
+ file2.setName("blah2");
+ file2.setType(FTPFile.FILE_TYPE);
+ // only the default listing is stubbed; initiateListParsing() is deliberately left unstubbed and is verified as never called below
+ when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+ } catch (IOException e) {
+ fail("No IOException should be thrown!");
+ }
+ conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+ conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
+ conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+ conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED);
+ conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
+ // set the password in the secure credentials object
+ conf.getCredentials().addSecretKey(new Text(DBConfiguration.PASSWORD_PROPERTY), "pssword".getBytes());
+ try {
+ // the same configured name is passed as the dataset argument, mirroring how the job would invoke the listing
+ List<String> files = MainframeFTPClientUtils.listSequentialDatasets(conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME), conf);
+ Assert.assertTrue("Expected exactly two datasets to be listed", files != null && files.size() == 2);
+ verify(mockFTPClient).listFiles();
+ verify(mockFTPClient, org.mockito.Mockito.never()).initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"");
+ } catch (IOException ioe) {
+ Assert.fail(ioe.getMessage());
+ }
+ }
}