// blob: acd5f6ba19181ee5b5767d531f061e141cd12f65 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step
import org.scalatest._
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.enums.BatchProcessType
import org.apache.griffin.measure.context.ContextId
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.step.transform.TransformStep
class TransformStepTest extends FlatSpec with Matchers with SparkSuiteBase with Loggable {

  /**
   * Test double for [[TransformStep]]. Instead of transforming any data, it logs a
   * "started" message, sleeps for `duration` seconds on the calling thread, logs a
   * "finished" message and reports success.
   *
   * @param name     step name; appears in both log messages
   * @param duration how long `doExecute` sleeps, in seconds
   * @param rule     defaults to the empty string; not used by `doExecute` here
   * @param details  defaults to an empty map; not used by `doExecute` here
   * @param cache    defaults to `false`; not used by `doExecute` here
   */
  case class DualTransformStep(
      name: String,
      duration: Int,
      rule: String = "",
      details: Map[String, Any] = Map(),
      cache: Boolean = false)
    extends TransformStep {

    /**
     * Simulates work by sleeping, logging the executing thread's name before and after.
     *
     * @param context the data-quality context; not read by this implementation
     * @return always `true`
     */
    def doExecute(context: DQContext): Boolean = {
      // Capture the thread name once so both log lines report the same value.
      val threadName = Thread.currentThread().getName
      // `duration` is in seconds; the Long literal keeps the multiplication in Long arithmetic.
      val sleepMillis = duration * 1000L

      info(s"Step $name started with $threadName")
      Thread.sleep(sleepMillis)
      info(s"Step $name finished with $threadName")
      true
    }
  }

  /**
   * Builds a batch-mode [[DQContext]] with two empty lists as its middle arguments,
   * identified by the current wall-clock time and bound to the suite's `spark`.
   *
   * @param name context name, `"test-context"` by default
   * @return a freshly created context
   */
  private def getDqContext(name: String = "test-context"): DQContext = {
    val contextId = ContextId(System.currentTimeMillis)
    DQContext(contextId, name, Nil, Nil, BatchProcessType)(spark)
  }

  /**
   * Executes a small graph of sleeping steps and expects the root step to succeed.
   *
   * Dependency graph (children listed under the step that waits for them):
   *
   * {{{
   * step5 (3s)
   *   |--- step2 (4s)
   *   |      |--- step1 (3s)
   *   |--- step3 (5s)
   *   |      |--- step1 (3s)
   *   |--- step4 (2s)
   * }}}
   *
   * The test only asserts the result of `step5.execute`; it does not measure elapsed
   * time. Whether parent steps actually overlap is decided by `TransformStep.execute`,
   * which is defined outside this file.
   */
  "transform step " should "be run steps in parallel" in {
    // Create every node first ...
    val step1 = DualTransformStep("step1", 3)
    val step2 = DualTransformStep("step2", 4)
    val step3 = DualTransformStep("step3", 5)
    val step4 = DualTransformStep("step4", 2)
    val step5 = DualTransformStep("step5", 3)

    // ... then wire the edges: step1 is a parent of both step2 and step3,
    Seq(step2, step3).foreach { child => child.parentSteps += step1 }
    // and step5 waits on step2, step3 and step4.
    Seq(step2, step3, step4).foreach { parent => step5.parentSteps += parent }

    val dqContext = getDqContext()
    step5.execute(dqContext) should be (true)
  }
}