HBASE-24230 Support user-defined version timestamp when bulk loading data (#66)

Signed-off-by: Balazs Meszaros <meszibalu@apache.org>
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 890e67f..80c415c 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -599,6 +599,7 @@
    *                                       column family is written
    * @param compactionExclude              Compaction excluded for the HFiles
    * @param maxSize                        Max size for the HFiles before they roll
+   * @param nowTimeStamp                   Version timestamp applied to the cells written by this bulk load
    * @tparam T                             The Type of values in the original RDD
    */
   def bulkLoad[T](rdd:RDD[T],
@@ -609,7 +610,8 @@
                   util.Map[Array[Byte], FamilyHFileWriteOptions] =
                   new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                   compactionExclude: Boolean = false,
-                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
+                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE,
+                  nowTimeStamp:Long = System.currentTimeMillis()):
   Unit = {
     val stagingPath = new Path(stagingDir)
     val fs = stagingPath.getFileSystem(config)
@@ -627,7 +629,6 @@
         Compression.Algorithm.NONE.getName)
       val hfileCompression = HFileWriterImpl
         .compressionByName(defaultCompressionStr)
-      val nowTimeStamp = System.currentTimeMillis()
       val tableRawName = tableName.getName
 
       val familyHFileWriteOptionsMapInternal =
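
Usage note: with this change, callers can pin the version timestamp of every
cell written by the bulk load instead of taking System.currentTimeMillis() at
write time. A minimal sketch of the new parameter, assuming the surrounding
bulkLoad signature from HBaseContext (the SparkContext, RDD, table name, and
staging path below are hypothetical):

    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
    import org.apache.hadoop.hbase.util.Bytes

    val hbaseContext = new HBaseContext(sc, config)
    val fixedTs = 1588000000000L  // every cell gets this version timestamp

    hbaseContext.bulkLoad[(Array[Byte], Array[Byte])](
      rdd,                          // hypothetical RDD of (rowKey, value) pairs
      TableName.valueOf("mytable"),
      { case (rowKey, value) =>
          // map each record to a (KeyFamilyQualifier, value) pair
          Seq((new KeyFamilyQualifier(rowKey, Bytes.toBytes("f"), Bytes.toBytes("q")),
               value)).iterator
      },
      "/tmp/bulkload-staging",
      nowTimeStamp = fixedTs)       // new: user-defined version timestamp

Omitting nowTimeStamp preserves the previous behavior, since it defaults to
System.currentTimeMillis().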