SQOOP-3237: Mainframe FTP transfer option to insert custom FTP commands prior to transfer

(Chris Teoh via Szabolcs Vasas)
diff --git a/src/docs/user/import-mainframe.txt b/src/docs/user/import-mainframe.txt
index 3ecfb7e..a994f8b 100644
--- a/src/docs/user/import-mainframe.txt
+++ b/src/docs/user/import-mainframe.txt
@@ -214,6 +214,20 @@
 will alter the number of records Sqoop reports to have imported. This is because it reads the
 binary dataset in chunks specified by buffersize. Larger buffer size means lower number of records.
 
+Use the +\--ftp-commands+ option with a comma separated list of commands to send custom FTP commands prior to
+file retrieval. This is useful for letting the mainframe know to embed data into the binary files
+like Record Descriptor Words for variable length records so downstream processes can separate each
+record. The mainframe will otherwise discard this metadata in the file transmission.
+
+NOTE: The mainframe's responses to these commands are only logged. It is up to the user to check
+the log for error responses from the mainframe.
+
+----
+$ sqoop import-mainframe -D hadoop.security.credential.provider.path=jceks://file/my/folder/mainframe.jceks \
+  --connect <host> --username user1 --password-alias alias1 --dataset SomeDS --tape true \
+  --as-binaryfile --datasettype g --ftp-commands "SITE RDW,SITE RDW READTAPEFORMAT=V"
+----
+
 Additional Import Configuration Properties
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 There are some additional properties which can be configured by modifying
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index f06872f..99eb8e6 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -369,6 +369,9 @@
   // Buffer size to use when using binary FTP transfer mode
   @StoredAsProperty("mainframe.ftp.buffersize")
   private Integer bufferSize;
+  // custom FTP commands to be sent to mainframe
+  @StoredAsProperty("mainframe.ftp.commands")
+  private String customFtpCommands;
   // Accumulo home directory
   private String accumuloHome; // not serialized to metastore.
   // Zookeeper home directory
@@ -2528,6 +2531,16 @@
     bufferSize = buf;
   }
 
+  // Stores the custom FTP commands string to be sent to the mainframe.
+  public void setFtpCommands(String ftpCmds) {
+    this.customFtpCommands = ftpCmds;
+  }
+
+  // Returns the stored custom FTP commands string.
+  public String getFtpCommands() {
+    return this.customFtpCommands;
+  }
+
   public static String getAccumuloHomeDefault() {
     // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
     String accumuloHome = System.getenv("ACCUMULO_HOME");
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
index 9842daa..2ea21f7 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
@@ -44,4 +44,6 @@
   public static final Integer MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE = 32760;
 
   public static final String MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE = "mainframe.ftp.buffersize";
+
+  public static final String MAINFRAME_FTP_CUSTOM_COMMANDS = "mainframe.ftp.commands";
 }
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
index 90dc2dd..622667d 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -74,6 +75,11 @@
     job.getConfiguration().set(
             MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE,
             options.getMainframeInputDatasetTape().toString());
+    if (!StringUtils.isBlank(options.getFtpCommands())) {
+      job.getConfiguration().set(
+      MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS,
+      options.getFtpCommands());
+    }
     if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) {
       job.getConfiguration().set(
         MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
diff --git a/src/java/org/apache/sqoop/tool/MainframeImportTool.java b/src/java/org/apache/sqoop/tool/MainframeImportTool.java
index fbc8c3d..cbaaf65 100644
--- a/src/java/org/apache/sqoop/tool/MainframeImportTool.java
+++ b/src/java/org/apache/sqoop/tool/MainframeImportTool.java
@@ -42,6 +42,7 @@
   public static final String DS_TYPE_ARG = "datasettype";
   public static final String DS_TAPE_ARG = "tape";
   public static final String BUFFERSIZE_ARG = "buffersize";
+  public static final String FTP_COMMANDS = "ftp-commands";
 
   public MainframeImportTool() {
     super("import-mainframe", false);
@@ -92,6 +93,10 @@
       .hasArg().withDescription("Sets buffer size for binary import in bytes (default=32kB)")
       .withLongOpt(BUFFERSIZE_ARG)
       .create());
+    // The arg name is only the short placeholder shown in the usage text; details go in the description.
+    importOpts.addOption(OptionBuilder.withArgName("cmds")
+      .hasArg().withDescription("Comma separated additional FTP commands issued before transfer")
+      .withLongOpt(FTP_COMMANDS).create());
     importOpts.addOption(OptionBuilder.withArgName("n")
         .hasArg().withDescription("Use 'n' map tasks to import in parallel")
         .withLongOpt(NUM_MAPPERS_ARG)
@@ -200,6 +205,9 @@
       // set the default buffer size to 32kB
       out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
     }
+    if (in.hasOption(FTP_COMMANDS)) {
+      out.setFtpCommands(in.getOptionValue(FTP_COMMANDS));
+    }
   }
 
   @Override
diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
index e7c48a6..a80aad9 100644
--- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
+++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
@@ -226,6 +227,7 @@
         LOG.info("Defaulting FTP transfer mode to ascii");
         ftp.setFileTransferMode(FTP.ASCII_FILE_TYPE);
       }
+      applyFtpCmds(ftp,conf);
       // Use passive mode as default.
       ftp.enterLocalPassiveMode();
       LOG.info("System type detected: " + ftp.getSystemType());
@@ -271,4 +273,28 @@
     mockFTPClient = FTPClient;
   }
 
+  // Sends every configured custom FTP command and returns the reply strings in order.
+  public static List<String> applyFtpCmds(FTPClient ftp, Configuration conf) throws IOException {
+    List<String> replies = new ArrayList<String>();
+    String configured = conf.get(MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS);
+    for (String command : parseFtpCommands(configured)) {
+      LOG.info("Issuing command: "+command);
+      int replyCode = ftp.sendCommand(command);
+      String replyText = ftp.getReplyString();
+      replies.add(replyText);
+      LOG.info("ReplyCode: "+replyCode + " ReplyString: "+replyText);
+    }
+    return replies;
+  }
+
+  // Splits the comma separated command list into trimmed, non-empty FTP commands.
+  public static String[] parseFtpCommands(String ftpCmds) {
+    // A null or blank value is treated as an empty list and yields no commands.
+    String commaSeparated = StringUtils.isBlank(ftpCmds) ? "" : ftpCmds;
+    return Arrays.stream(commaSeparated.split(","))
+      .map(String::trim)
+      // Drops the empty entries produced by leading, trailing or repeated commas.
+      .filter(command -> !command.isEmpty())
+      .toArray(String[]::new);
+  }
 }
diff --git a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
index 00e57bd..9c4ac48 100644
--- a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
+++ b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
@@ -228,6 +228,19 @@
     configureAndValidateOptions(args);
   }
 
+  // Verifies that the --ftp-commands value ends up unmodified in SqoopOptions.
+  @Test
+  public void testFtpTransferCommands() throws ParseException, InvalidOptionsException {
+    String expectedCmds = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V";
+    String[] args = new String[] { "--dataset", "mydatasetname", "--ftp-commands", expectedCmds};
+    ToolOptions toolOptions = new ToolOptions();
+    mfImportTool.configureOptions(toolOptions);
+    SqoopOptions sqoopOption = mfImportTool.parseArguments(args, null, new SqoopOptions(), false);
+    mfImportTool.validateImportOptions(sqoopOption);
+    // JUnit expects (expected, actual); this order keeps failure messages accurate.
+    assertEquals(expectedCmds, sqoopOption.getFtpCommands());
+  }
+
   private void configureAndValidateOptions(String[] args) throws ParseException, InvalidOptionsException {
     mfImportTool.configureOptions(toolOptions);
     sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
index fc6e56d..7a842ec 100644
--- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
+++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.util;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -26,6 +28,7 @@
 import java.io.IOException;
 
 import java.util.List;
+
 import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPFile;
 import org.apache.commons.net.ftp.FTPListParseEngine;
@@ -48,6 +51,8 @@
 
   private FTPClient mockFTPClient;
   private FTPListParseEngine mockFTPListParseEngine;
+  private static final String DEFAULT_FTP_USERNAME="user";
+  private static final String DEFAULT_FTP_PASSWORD="pssword";
 
   @Before
   public void setUp() {
@@ -119,9 +124,8 @@
     }
 
     FTPClient ftp = null;
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "invalidusername");
     // set the password in the secure credentials object
     Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
     conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
@@ -148,13 +152,8 @@
       fail("No IOException should be thrown!");
     }
 
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-        "pssword".getBytes());
+    setupDefaultConfiguration();
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "invalidusername");
 
     try {
       MainframeFTPClientUtils.listSequentialDatasets("pdsName", conf);
@@ -187,15 +186,9 @@
       fail("No IOException should be thrown!");
     }
 
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-            "pssword".getBytes());
 
     try {
       List<String> files = MainframeFTPClientUtils.listSequentialDatasets("a.b.c.blah1", conf);
@@ -226,15 +219,9 @@
       fail("No IOException should be thrown!");
     }
 
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-            "pssword".getBytes());
 
     try {
 		String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
@@ -266,15 +253,9 @@
       fail("No IOException should be thrown!");
     }
 
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1.");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-            "pssword".getBytes());
 
     try {
 		String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
@@ -307,15 +288,9 @@
 	      fail("No IOException should be thrown!");
 	    }
 
-	    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-	    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-	    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+	    setupDefaultConfiguration();
 	    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g");
 	    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d");
-	    // set the password in the secure credentials object
-	    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-	    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-	            "pssword".getBytes());
 
 	    try {
 			String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
@@ -347,14 +322,9 @@
     } catch (IOException e) {
       fail("No IOException should be thrown!");
     }
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"p");
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, "pssword".getBytes());
     try {
       String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
       List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
@@ -365,4 +335,79 @@
       Assert.fail(ioeString);
     }
   }
+
+  // Verifies that getFTPConnection sends each configured custom FTP command to the client.
+  @Test
+  public void testFtpCommandExecutes() throws IOException {
+    final String EXPECTED_RESPONSE = "200 OK";
+    final int EXPECTED_RESPONSE_CODE = 200;
+    // Stub the login with the same credentials that setupDefaultConfiguration() installs.
+    when(mockFTPClient.login(DEFAULT_FTP_USERNAME, DEFAULT_FTP_PASSWORD)).thenReturn(true);
+    when(mockFTPClient.logout()).thenReturn(true);
+    when(mockFTPClient.isConnected()).thenReturn(false);
+    when(mockFTPClient.getReplyCode()).thenReturn(EXPECTED_RESPONSE_CODE);
+    when(mockFTPClient.getReplyString()).thenReturn(EXPECTED_RESPONSE);
+    setupDefaultConfiguration();
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d");
+    conf.set(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
+    conf.set(MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS, "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V");
+    MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
+    MainframeFTPClientUtils.getFTPConnection(conf);
+    verify(mockFTPClient).sendCommand("quote SITE RDW");
+    verify(mockFTPClient).sendCommand("quote SITE RDW READTAPEFORMAT=V");
+  }
+
+  @Test
+  public void testFtpCommandsOneCommand() {
+    String inputString = "quote SITE RDW READTAPEFORMAT=V";
+    String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"};
+    // A single command without commas is returned as-is; JUnit expects (expected, actual).
+    assertArrayEquals(expected, MainframeFTPClientUtils.parseFtpCommands(inputString));
+  }
+
+  @Test
+  public void testFtpCommandsOneCommandWithComma() {
+    String inputString = ",quote SITE RDW READTAPEFORMAT=V";
+    String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"};
+    // A leading comma must not produce an empty command; JUnit expects (expected, actual).
+    assertArrayEquals(expected, MainframeFTPClientUtils.parseFtpCommands(inputString));
+  }
+
+  @Test
+  public void testFtpCommandsOneCommandWithCommas() {
+    String inputString = ",quote SITE RDW READTAPEFORMAT=V,";
+    String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"};
+    // Leading and trailing commas must not produce empty commands; JUnit expects (expected, actual).
+    assertArrayEquals(expected, MainframeFTPClientUtils.parseFtpCommands(inputString));
+  }
+
+  @Test
+  public void testFtpCommandsTwoCommandWithComma() {
+    String inputString = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V";
+    String [] expected = new String [] {"quote SITE RDW","quote SITE RDW READTAPEFORMAT=V"};
+    // Two comma separated commands are split in order; JUnit expects (expected, actual).
+    assertArrayEquals(expected, MainframeFTPClientUtils.parseFtpCommands(inputString));
+  }
+
+  @Test
+  public void testFtpCommandsNullCommand() {
+    // A null command string must parse to an empty command array.
+    String [] parsed = MainframeFTPClientUtils.parseFtpCommands(null);
+    assertEquals(0, parsed.length);
+  }
+
+  @Test
+  public void testFtpCommandsEmptyCommands() {
+    // A string made up of commas only must parse to an empty command array.
+    String [] parsed = MainframeFTPClientUtils.parseFtpCommands(",,,");
+    assertEquals(0, parsed.length);
+  }
+
+  // Fills conf with the default URL, user name and password secret shared by the tests.
+  private void setupDefaultConfiguration() {
+    Text passwordSecretKey = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(passwordSecretKey, DEFAULT_FTP_PASSWORD.getBytes());
+    conf.set(DBConfiguration.USERNAME_PROPERTY, DEFAULT_FTP_USERNAME);
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+  }
 }