blob: 3918c84337ab1138f1a551e8a94ecd176ac3e95c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.s2jobs.task
import org.apache.s2graph.core.S2EdgeLike
import org.apache.s2graph.s2jobs.BaseSparkTest
import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._
class SinkTest extends BaseSparkTest {
override def beforeAll(): Unit = {
super.beforeAll()
initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
}
def toDataFrame(edges: Seq[String]): DataFrame = {
import spark.sqlContext.implicits._
val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
elements.map { e =>
(e.getTs(),
e.getOperation(),
"e",
e.srcVertex.innerIdVal.toString,
e.tgtVertex.innerIdVal.toString,
e.label(),
"{}",
e.getDirection())
}.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
}
test("S2graphSink writeBatchWithBulkload") {
val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
val df = toDataFrame(Seq(bulkEdgeString))
val conf = bulkloadOptions
val sink = new S2GraphSink("testQuery", conf)
sink.write(df)
val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
s2Edges.foreach { edge => println(edge) }
val filteredEdges = s2Edges.filter { edge =>
edge.srcVertex.innerIdVal.toString == "a" &&
edge.tgtVertex.innerIdVal.toString == "b" &&
edge.label() == "friends"
}
assert(filteredEdges.size == 1)
}
test("S2graphSink writeBatchWithMutate") {
val bulkEdgeString = "1416236400000\tinsert\tedge\tb\tc\tfriends\t{\"since\":1316236400000,\"score\":20}"
val df = toDataFrame(Seq(bulkEdgeString))
val args = Map("writeMethod" -> "mutate") ++
options.toCommand.grouped(2).map { kv => kv.head -> kv.last }.toMap
val conf = TaskConf("test", "sql", Seq("input"), args)
val sink = new S2GraphSink("testQuery", conf)
sink.write(df)
val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
s2Edges.foreach { edge => println(edge) }
val filteredEdges = s2Edges.filter { edge =>
edge.srcVertex.innerIdVal.toString == "b" &&
edge.tgtVertex.innerIdVal.toString == "c" &&
edge.getTs() == 1416236400000L &&
edge.label() == "friends"
}
assert(filteredEdges.size == 1)
}
}