blob: ec8b02f51a78a16e9a6aa830842d7ebe94a45cc6 [file] [log] [blame]
/** Copyright 2015 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.data.storage.examples
import io.prediction.data.storage.Event
import io.prediction.data.storage.StorageClientConfig
import io.prediction.data.storage.hbase.HBPEvents
import io.prediction.data.storage.hbase.StorageClient
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
object HBPEventsTest {
def main(args: Array[String]) {
val fromAppId = args(0).toInt
val toAppId = args(1).toInt
val sparkConf = new SparkConf().setAppName("HBaseTest")
val sc = new SparkContext(sparkConf)
val eventClient = new HBPEvents(
new StorageClient(new StorageClientConfig(Seq(), Seq(), true)).client,
"predictionio_eventdata")
val e1: RDD[Event] = eventClient.find(fromAppId)(sc).cache()
eventClient.write(e1, toAppId)(sc)
println("Finished writing.")
// read back from toAppId
val e2: RDD[Event] = eventClient.find(toAppId)(sc)
val c1 = e1.count()
val c2 = e2.count()
// check data
val a = e1.map { event => (event.eventId.getOrElse(""), event) }
val b = e2.map { event => (event.eventId.getOrElse(""), event) }
val notMatched1 = a.leftOuterJoin(b).map { case (k, (v, wOpt)) =>
((Some(v) == wOpt), (Some(v), wOpt))
}.filter{ case (m, d) => (!m) }
.cache()
val notMatched2 = b.leftOuterJoin(a).map { case (k, (v, wOpt)) =>
((Some(v) == wOpt), (Some(v), wOpt))
}.filter{ case (m, d) => (!m) }
.cache()
val notMatchedCount1 = notMatched1.count()
if (notMatchedCount1 != 0) {
notMatched1.take(2).foreach(println(_))
}
val notMatchedCount2 = notMatched2.count()
if (notMatchedCount2 != 0) {
notMatched2.take(2).foreach(println(_))
}
println(s"count1: ${c1}, count2: ${c2}")
println(s"count match = ${c1 == c2}")
println(s"data match1 = ${notMatchedCount1 == 0}")
println(s"data match2 = ${notMatchedCount2 == 0}")
}
}