blob: 23487ab87ef54d25b3ef166375c83ab3ed5d359c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.coord;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.dependency.FSURIHandler;
import org.apache.oozie.dependency.HCatURIHandler;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.test.XHCatTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.workflow.lite.StartNodeDef;
import org.junit.Test;
public class TestHCatELFunctions extends XHCatTestCase {
// EL evaluator used by the coordinator EL tests; (re)created by init() for the requested evaluator tag.
ELEvaluator eval = null;
// Coordinator action built by init() and handed to CoordELFunctions.configureEvaluator().
SyncCoordAction appInst = null;
// Dataset built by init() and handed to CoordELFunctions.configureEvaluator().
SyncCoordDataset ds = null;
// Services instance created and initialized in setUp(), destroyed in tearDown().
private Services services;
/**
 * Runs the base-class setup and then initializes a {@link Services} instance whose
 * configuration lists both the FS and the HCat URI handlers and which has the
 * {@link HCatAccessorService} set.
 *
 * @throws Exception if the base setup or the services initialization fails
 */
@Override
protected void setUp() throws Exception {
    super.setUp();
    // comma separated list of URI handler class names
    final String uriHandlers = FSURIHandler.class.getName() + "," + HCatURIHandler.class.getName();
    services = new Services();
    services.getConf().set(URIHandlerService.URI_HANDLERS, uriHandlers);
    services.setService(HCatAccessorService.class);
    services.init();
}
/**
 * Destroys the services created in {@link #setUp()} and then runs the base-class
 * teardown. The base teardown is called from a finally block so that it is not
 * skipped when destroying the services throws.
 *
 * @throws Exception if destroying the services or the base teardown fails
 */
@Override
protected void tearDown() throws Exception {
    try {
        services.destroy();
    }
    finally {
        super.tearDown();
    }
}
/**
 * Verifies that the hcat:exists() workflow EL function evaluates to true for a
 * partition that was added to the table and to false for a partition that was not.
 * The table and database are dropped in a finally block so that they are removed
 * even when an assertion fails.
 *
 * @throws Exception if the HCat fixture or the EL evaluation fails
 */
@Test
public void testHCatPartitionExists() throws Exception {
    // reset: drop the table and database before recreating them
    dropTable("db1", "table1", true);
    dropDatabase("db1", true);
    try {
        createDatabase("db1");
        createTable("db1", "table1", "year,month,dt,country");
        addPartition("db1", "table1", "year=2012;month=12;dt=02;country=us");

        Configuration protoConf = new Configuration();
        protoConf.set(OozieClient.USER_NAME, getTestUser());
        protoConf.set("hadoop.job.ugi", getTestUser() + "," + "group");

        Configuration conf = new XConfiguration();
        conf.set(OozieClient.APP_PATH, "appPath");
        conf.set(OozieClient.USER_NAME, getTestUser());
        conf.set("test.dir", getTestCaseDir());
        // partition1 matches the partition added above (dt=02); partition2 (dt=05) was never added
        conf.set("partition1", getHCatURI("db1", "table1", "dt=02").toString());
        conf.set("partition2", getHCatURI("db1", "table1", "dt=05").toString());

        LiteWorkflowApp def = new LiteWorkflowApp("name", "<workflow-app/>", new StartNodeDef(
                LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).addNode(new EndNodeDef("end",
                LiteWorkflowStoreService.LiteControlNodeHandler.class));
        LiteWorkflowInstance job = new LiteWorkflowInstance(def, conf, "wfId");

        WorkflowJobBean wf = new WorkflowJobBean();
        wf.setId(job.getId());
        wf.setAppName("name");
        wf.setAppPath("appPath");
        wf.setUser(getTestUser());
        wf.setGroup("group");
        wf.setWorkflowInstance(job);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        protoConf.writeXml(baos);
        wf.setProtoActionConf(baos.toString(StandardCharsets.UTF_8.name()));

        WorkflowActionBean action = new WorkflowActionBean();
        action.setId("actionId");
        action.setName("actionName");

        // local evaluator; named differently from the 'eval' field to avoid shadowing it
        ELEvaluator wfEval = Services.get().get(ELService.class).createEvaluator("workflow");
        DagELFunctions.configureEvaluator(wfEval, wf, action);

        assertTrue((boolean) wfEval.evaluate("${hcat:exists(wf:conf('partition1'))}", Boolean.class));
        assertFalse((boolean) wfEval.evaluate("${hcat:exists(wf:conf('partition2'))}", Boolean.class));
    }
    finally {
        dropTable("db1", "table1", true);
        dropDatabase("db1", true);
    }
}
/**
 * Verifies that the hcat:exists() workflow EL function evaluates to true for a
 * table that was created and to false for a table that was not. The table and
 * database are dropped in a finally block so that they are removed even when an
 * assertion fails.
 *
 * @throws Exception if the HCat fixture or the EL evaluation fails
 */
@Test
public void testHCatTableExists() throws Exception {
    // reset: drop the table and database before recreating them
    dropTable("db1", "table1", true);
    dropDatabase("db1", true);
    try {
        createDatabase("db1");
        createTable("db1", "table1");

        Configuration protoConf = new Configuration();
        protoConf.set(OozieClient.USER_NAME, getTestUser());
        protoConf.set("hadoop.job.ugi", getTestUser() + "," + "group");

        Configuration conf = new XConfiguration();
        conf.set(OozieClient.APP_PATH, "appPath");
        conf.set(OozieClient.USER_NAME, getTestUser());
        conf.set("test.dir", getTestCaseDir());
        // table1 was created above; table2 was never created
        conf.set("table1", getHCatURI("db1", "table1").toString());
        conf.set("table2", getHCatURI("db1", "table2").toString());

        LiteWorkflowApp def = new LiteWorkflowApp("name", "<workflow-app/>", new StartNodeDef(
                LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).addNode(new EndNodeDef("end",
                LiteWorkflowStoreService.LiteControlNodeHandler.class));
        LiteWorkflowInstance job = new LiteWorkflowInstance(def, conf, "wfId");

        WorkflowJobBean wf = new WorkflowJobBean();
        wf.setId(job.getId());
        wf.setAppName("name");
        wf.setAppPath("appPath");
        wf.setUser(getTestUser());
        wf.setGroup("group");
        wf.setWorkflowInstance(job);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        protoConf.writeXml(baos);
        wf.setProtoActionConf(baos.toString(StandardCharsets.UTF_8.name()));

        WorkflowActionBean action = new WorkflowActionBean();
        action.setId("actionId");
        action.setName("actionName");

        // local evaluator; named differently from the 'eval' field to avoid shadowing it
        ELEvaluator wfEval = Services.get().get(ELService.class).createEvaluator("workflow");
        DagELFunctions.configureEvaluator(wfEval, wf, action);

        assertTrue((boolean) wfEval.evaluate("${hcat:exists(wf:conf('table1'))}", Boolean.class));
        assertFalse((boolean) wfEval.evaluate("${hcat:exists(wf:conf('table2'))}", Boolean.class));
    }
    finally {
        dropTable("db1", "table1", true);
        dropDatabase("db1", true);
    }
}
/**
 * Phase 1 check of the coord:databaseIn and coord:databaseOut EL functions. The
 * test expects the expression to be echoed back unchanged when the data name has
 * been registered, and expects evaluation to throw when it has not.
 *
 * @throws Exception if evaluator setup or an unexpected evaluation fails
 */
@Test
public void testDatabasePh1() throws Exception {
    init("coord-job-submit-data");

    // databaseIn, +ve test: ABC is registered as data-in
    final String dbInAbc = "${coord:databaseIn('ABC')}";
    eval.setVariable("oozie.dataname.ABC", "data-in");
    assertEquals(dbInAbc, CoordELFunctions.evalAndWrap(eval, dbInAbc));

    // databaseIn, -ve test: ABCD has not been registered
    final String dbInAbcd = "${coord:databaseIn('ABCD')}";
    try {
        assertEquals(dbInAbcd, CoordELFunctions.evalAndWrap(eval, dbInAbcd));
        fail("should throw exception because Data-in ABCD is not defiend");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }

    // databaseOut, +ve test: ABC is re-registered as data-out
    final String dbOutAbc = "${coord:databaseOut('ABC')}";
    eval.setVariable("oozie.dataname.ABC", "data-out");
    assertEquals(dbOutAbc, CoordELFunctions.evalAndWrap(eval, dbOutAbc));

    // databaseOut, -ve test: ABCD has still not been registered
    final String dbOutAbcd = "${coord:databaseOut('ABCD')}";
    try {
        assertEquals(dbOutAbcd, CoordELFunctions.evalAndWrap(eval, dbOutAbcd));
        fail("should throw exception because Data-out ABCD is not defiend");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }

    // same databaseOut expression against a fresh "coord-sla-submit" evaluator, ABCD now registered
    init("coord-sla-submit");
    eval.setVariable("oozie.dataname.ABCD", "data-out");
    assertEquals(dbOutAbcd, CoordELFunctions.evalAndWrap(eval, dbOutAbcd));
}
/**
 * Phase 1 check of the coord:tableIn and coord:tableOut EL functions. The test
 * expects the expression to be echoed back unchanged when the data name has been
 * registered, and expects evaluation to throw when it has not.
 *
 * @throws Exception if evaluator setup or an unexpected evaluation fails
 */
@Test
public void testTablePh1() throws Exception {
    init("coord-job-submit-data");

    // tableIn, +ve test: ABC is registered as data-in
    final String tableInAbc = "${coord:tableIn('ABC')}";
    eval.setVariable("oozie.dataname.ABC", "data-in");
    assertEquals(tableInAbc, CoordELFunctions.evalAndWrap(eval, tableInAbc));

    // tableIn, -ve test: ABCD has not been registered
    final String tableInAbcd = "${coord:tableIn('ABCD')}";
    try {
        assertEquals(tableInAbcd, CoordELFunctions.evalAndWrap(eval, tableInAbcd));
        fail("should throw exception because Data-in ABCD is not defiend");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }

    // tableOut, +ve test: ABC is re-registered as data-out
    final String tableOutAbc = "${coord:tableOut('ABC')}";
    eval.setVariable("oozie.dataname.ABC", "data-out");
    assertEquals(tableOutAbc, CoordELFunctions.evalAndWrap(eval, tableOutAbc));

    // tableOut, -ve test: ABCD has still not been registered
    final String tableOutAbcd = "${coord:tableOut('ABCD')}";
    try {
        assertEquals(tableOutAbcd, CoordELFunctions.evalAndWrap(eval, tableOutAbcd));
        fail("should throw exception because Data-out ABCD is not defiend");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }

    // tableOut('ABC') against a fresh "coord-sla-submit" evaluator
    init("coord-sla-submit");
    eval.setVariable("oozie.dataname.ABC", "data-out");
    assertEquals(tableOutAbc, CoordELFunctions.evalAndWrap(eval, tableOutAbc));
}
/**
 * Phase 1 check of the coord:dataInPartitionFilter EL function. The test expects
 * the two-argument expression to be echoed back unchanged, and expects the
 * one-argument form to throw.
 *
 * @throws Exception if evaluator setup or an unexpected evaluation fails
 */
@Test
public void testdataInPartitionFilterPh1() throws Exception {
    init("coord-job-submit-data");

    // +ve test: data name and filter type are both supplied
    final String twoArgExpr = "${coord:dataInPartitionFilter('ABC', 'pig')}";
    eval.setVariable("oozie.dataname.ABC", "data-in");
    assertEquals(twoArgExpr, CoordELFunctions.evalAndWrap(eval, twoArgExpr));

    // -ve test: the data name is registered but the second argument is missing
    final String oneArgExpr = "${coord:dataInPartitionFilter('ABCD')}";
    eval.setVariable("oozie.dataname.ABCD", "data-in");
    try {
        assertEquals(oneArgExpr, CoordELFunctions.evalAndWrap(eval, oneArgExpr));
        fail("should throw exception because dataInPartitionFilter() requires 2 parameters");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }
}
/**
 * Phase 1 check of the coord:dataInPartitionMin EL function. The test expects the
 * two-argument expression to be echoed back unchanged, and expects the
 * one-argument form to throw.
 *
 * @throws Exception if evaluator setup or an unexpected evaluation fails
 */
@Test
public void testDataInPartitionMinPh1() throws Exception {
    init("coord-job-submit-data");

    // +ve test: data name and partition name are both supplied
    final String twoArgExpr = "${coord:dataInPartitionMin('ABC', 'mypartition')}";
    eval.setVariable("oozie.dataname.ABC", "data-in");
    assertEquals(twoArgExpr, CoordELFunctions.evalAndWrap(eval, twoArgExpr));

    // -ve test: the data name is registered but the second argument is missing
    final String oneArgExpr = "${coord:dataInPartitionMin('ABCD')}";
    eval.setVariable("oozie.dataname.ABCD", "data-in");
    try {
        assertEquals(oneArgExpr, CoordELFunctions.evalAndWrap(eval, oneArgExpr));
        fail("should throw exception because EL function requires 2 parameters");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }
}
/**
 * Phase 1 check of the coord:dataInPartitionMax EL function. The test expects the
 * two-argument expression to be echoed back unchanged, and expects the
 * one-argument form to throw.
 *
 * @throws Exception if evaluator setup or an unexpected evaluation fails
 */
@Test
public void testDataInPartitionMaxPh1() throws Exception {
    init("coord-job-submit-data");

    // +ve test: data name and partition name are both supplied
    final String twoArgExpr = "${coord:dataInPartitionMax('ABC', 'mypartition')}";
    eval.setVariable("oozie.dataname.ABC", "data-in");
    assertEquals(twoArgExpr, CoordELFunctions.evalAndWrap(eval, twoArgExpr));

    // -ve test: the data name is registered but the second argument is missing
    final String oneArgExpr = "${coord:dataInPartitionMax('ABCD')}";
    eval.setVariable("oozie.dataname.ABCD", "data-in");
    try {
        assertEquals(oneArgExpr, CoordELFunctions.evalAndWrap(eval, oneArgExpr));
        fail("should throw exception because EL function requires 2 parameters");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }
}
/**
 * Phase 1 check of the coord:dataInPartitions EL function. The test expects the
 * two-argument expression to be echoed back unchanged for a registered data name,
 * and expects evaluation to throw both for an unregistered data name and for the
 * one-argument form.
 *
 * @throws Exception if evaluator setup or an unexpected evaluation fails
 */
@Test
public void testDataInPartitionsPh1() throws Exception {
    init("coord-job-submit-data");

    // +ve test: ABC is registered as data-in
    final String registeredExpr = "${coord:dataInPartitions('ABC', 'hive-export')}";
    eval.setVariable("oozie.dataname.ABC", "data-in");
    assertEquals(registeredExpr, CoordELFunctions.evalAndWrap(eval, registeredExpr));

    // -ve test: ABCD has not been registered
    final String unregisteredExpr = "${coord:dataInPartitions('ABCD', 'hive-export')}";
    try {
        CoordELFunctions.evalAndWrap(eval, unregisteredExpr);
        fail("should throw exception because Data-in is not defined");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }

    // -ve test: ABCD is now registered, but the second argument is missing
    final String oneArgExpr = "${coord:dataInPartitions('ABCD')}";
    eval.setVariable("oozie.dataname.ABCD", "data-in");
    try {
        CoordELFunctions.evalAndWrap(eval, oneArgExpr);
        fail("should throw exception because EL function requires 2 parameters");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }
}
/**
 * Phase 1 check of the coord:dataOutPartitions EL function. The test expects the
 * expression to be echoed back unchanged when the data name has been registered
 * as data-out, and expects evaluation to throw when it has not.
 *
 * @throws Exception if evaluator setup or an unexpected evaluation fails
 */
@Test
public void testDataOutPartitionsPh1() throws Exception {
    init("coord-job-submit-data");
    String expr = "${coord:dataOutPartitions('ABC')}";
    // +ve test
    eval.setVariable("oozie.dataname.ABC", "data-out");
    assertEquals("${coord:dataOutPartitions('ABC')}", CoordELFunctions.evalAndWrap(eval, expr));
    // -ve test
    expr = "${coord:dataOutPartitions('ABCD')}";
    try {
        assertEquals("${coord:dataOutPartitions('ABCD')}", CoordELFunctions.evalAndWrap(eval, expr));
        // message names the data-out that is actually missing (it previously said "Data-in")
        fail("should throw exception because Data-out ABCD is not defined");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }
    // same expression against a fresh "coord-sla-submit" evaluator, ABCD now registered
    init("coord-sla-submit");
    eval.setVariable("oozie.dataname.ABCD", "data-out");
    assertEquals("${coord:dataOutPartitions('ABCD')}", CoordELFunctions.evalAndWrap(eval, expr));
}
/**
 * Phase 1 check of the coord:dataOutPartitionValue EL function. The test expects
 * the two-argument expression to be echoed back unchanged, and expects the
 * one-argument form to throw.
 *
 * @throws Exception if evaluator setup or an unexpected evaluation fails
 */
@Test
public void testDataOutPartitionValuePh1() throws Exception {
    init("coord-job-submit-data");

    // +ve test: data name and partition name are both supplied
    final String twoArgExpr = "${coord:dataOutPartitionValue('ABC', 'mypartition')}";
    eval.setVariable("oozie.dataname.ABC", "data-out");
    assertEquals(twoArgExpr, CoordELFunctions.evalAndWrap(eval, twoArgExpr));

    // -ve test: the data name is registered but the second argument is missing
    final String oneArgExpr = "${coord:dataOutPartitionValue('ABCD')}";
    eval.setVariable("oozie.dataname.ABCD", "data-out");
    try {
        assertEquals(oneArgExpr, CoordELFunctions.evalAndWrap(eval, oneArgExpr));
        fail("should throw exception because EL function requires 2 parameters");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }
}
/**
 * Phase 3 check of the coord:databaseIn and coord:databaseOut EL functions. For an
 * HCat URI with a partition spec and for one without, the test expects both
 * functions to evaluate to the database segment of the URI ("mydb").
 *
 * @throws Exception if evaluator setup or evaluation fails
 */
@Test
public void testDatabase() throws Exception {
    final String[] uris = {
        "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us",
        "hcat://hcat.server.com:5080/mydb/clicks"
    };
    // run the identical in/out scenario once per URI, each time on a fresh evaluator
    for (String uri : uris) {
        init("coord-action-start", uri);

        eval.setVariable(".datain.ABC", uri);
        eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
        assertEquals("mydb", CoordELFunctions.evalAndWrap(eval, "${coord:databaseIn('ABC')}"));

        eval.setVariable(".dataout.ABC", uri);
        eval.setVariable(".dataout.ABC.unresolved", Boolean.FALSE);
        assertEquals("mydb", CoordELFunctions.evalAndWrap(eval, "${coord:databaseOut('ABC')}"));
    }
}
/**
 * Phase 3 check of the coord:tableIn and coord:tableOut EL functions. For an HCat
 * URI with a partition spec and for one without, the test expects both functions
 * to evaluate to the table segment of the URI ("clicks"). coord:tableOut is also
 * checked against a "coord-sla-create" evaluator.
 *
 * @throws Exception if evaluator setup or evaluation fails
 */
@Test
public void testTable() throws Exception {
    final String partitionUri = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us";
    final String tableUri = "hcat://hcat.server.com:5080/mydb/clicks";
    final String tableInExpr = "${coord:tableIn('ABC')}";
    final String tableOutExpr = "${coord:tableOut('ABC')}";

    // run the identical in/out scenario once per URI, each time on a fresh evaluator
    for (String uri : new String[] { partitionUri, tableUri }) {
        init("coord-action-start", uri);

        eval.setVariable(".datain.ABC", uri);
        eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
        assertEquals("clicks", CoordELFunctions.evalAndWrap(eval, tableInExpr));

        eval.setVariable(".dataout.ABC", uri);
        eval.setVariable(".dataout.ABC.unresolved", Boolean.FALSE);
        assertEquals("clicks", CoordELFunctions.evalAndWrap(eval, tableOutExpr));
    }

    // tableOut only, against the "coord-sla-create" evaluator
    init("coord-sla-create", partitionUri);
    eval.setVariable(".dataout.ABC", partitionUri);
    eval.setVariable(".dataout.ABC.unresolved", Boolean.FALSE);
    assertEquals("clicks", CoordELFunctions.evalAndWrap(eval, tableOutExpr));
}
/**
 * Phase 3 check of the coord:dataInPartitionFilter EL function for the 'pig' and
 * 'java' filter types. The assertions accept the partition keys of each URI in
 * either order.
 *
 * @throws Exception if evaluator setup or evaluation fails
 */
@Test
public void testdataInPartitionFilter() throws Exception {
    init("coord-action-start");
    final String firstUri = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us";
    final String secondUri = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=13;region=us";
    final String pigExpr = "${coord:dataInPartitionFilter('ABC', 'pig')}";
    final String javaExpr = "${coord:dataInPartitionFilter('ABC', 'java')}";

    // type=pig, single input URI
    eval.setVariable(".datain.ABC", firstUri);
    eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
    String filter = CoordELFunctions.evalAndWrap(eval, pigExpr);
    assertTrue(filter.equals("(datastamp=='12' AND region=='us')")
            || filter.equals("(region=='us' AND datastamp=='12')"));

    // type=pig, two comma separated input URIs
    eval.setVariable(".datain.ABC", firstUri + "," + secondUri);
    eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
    filter = CoordELFunctions.evalAndWrap(eval, pigExpr);
    assertTrue(filter.equals("(datastamp=='12' AND region=='us') OR (datastamp=='13' AND region=='us')")
            || filter.equals("(datastamp=='12' AND region=='us') OR (region=='us' AND datastamp=='13')")
            || filter.equals("(region=='us' AND datastamp=='12') OR (datastamp=='13' AND region=='us')")
            || filter.equals("(region=='us' AND datastamp=='12') OR (region=='us' AND datastamp=='13')"));

    // type=java, single input URI
    eval.setVariable(".datain.ABC", firstUri);
    eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
    filter = CoordELFunctions.evalAndWrap(eval, javaExpr);
    assertTrue(filter.equals("(datastamp='12' AND region='us')")
            || filter.equals("(region='us' AND datastamp='12')"));
}
/**
 * Phase 3 check of the coord:dataOutPartitions EL function, against both a
 * "coord-action-start" and a "coord-sla-create" evaluator. The expected value is
 * the quoted, comma separated partition spec of the data-out URI set on the
 * evaluator.
 *
 * @throws Exception if evaluator setup or evaluation fails
 */
@Test
public void testDataOutPartitions() throws Exception {
    init("coord-action-start");
    String expr = "${coord:dataOutPartitions('ABC')}";
    eval.setVariable(".dataout.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us");
    eval.setVariable(".dataout.ABC.unresolved", Boolean.FALSE);
    String res = CoordELFunctions.evalAndWrap(eval, expr);
    assertEquals("'datastamp=20120230,region=us'", res);

    init("coord-sla-create");
    eval.setVariable(".dataout.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20130230;region=euro");
    eval.setVariable(".dataout.ABC.unresolved", Boolean.FALSE);
    res = CoordELFunctions.evalAndWrap(eval, expr);
    // the URI above says region=euro, so only that value is accepted; the previously
    // tolerated 'region=us' alternative could not come from this input
    assertEquals("'datastamp=20130230,region=euro'", res);
}
/**
 * Phase 3 check of the coord:dataOutPartitionValue EL function, against both a
 * "coord-action-start" and a "coord-sla-create" evaluator. For each evaluator the
 * test expects the function to return the value of the named partition key from
 * the data-out URI.
 *
 * @throws Exception if evaluator setup or evaluation fails
 */
@Test
public void testDataOutPartitionValue() throws Exception {
    final String dataOutUri = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=US";
    // the same scenario is run once per evaluator tag
    for (String tag : new String[] { "coord-action-start", "coord-sla-create" }) {
        init(tag);
        eval.setVariable(".dataout.ABC", dataOutUri);
        eval.setVariable(".dataout.ABC.unresolved", Boolean.FALSE);

        // assertEquals reports the actual value on failure and does not NPE on a null result
        assertEquals("20120230",
                CoordELFunctions.evalAndWrap(eval, "${coord:dataOutPartitionValue('ABC','datastamp')}"));
        assertEquals("US",
                CoordELFunctions.evalAndWrap(eval, "${coord:dataOutPartitionValue('ABC','region')}"));
    }
}
/**
 * Phase 3 check of the coord:dataInPartitionMin EL function. Three data-in URIs
 * with datastamp values 12, 13 and 10 are set, and the test expects "10" for the
 * 'datastamp' partition key.
 *
 * @throws Exception if evaluator setup or evaluation fails
 */
@Test
public void testDataInPartitionMin() throws Exception {
    init("coord-action-start");
    eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us,"
            + "hcat://hcat.server.com:5080/mydb/clicks/datastamp=13;region=us,"
            + "hcat://hcat.server.com:5080/mydb/clicks/datastamp=10;region=us");
    eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
    String expr = "${coord:dataInPartitionMin('ABC','datastamp')}";
    String res = CoordELFunctions.evalAndWrap(eval, expr);
    // assertEquals reports the actual value on failure and does not NPE on a null result
    assertEquals("10", res);
}
/**
 * Phase 3 check of the coord:dataInPartitionMax EL function. Four data-in URIs
 * with datastamp values 20, 12, 13 and 10 are set, and the test expects "20" for
 * the 'datastamp' partition key.
 *
 * @throws Exception if evaluator setup or evaluation fails
 */
@Test
public void testDataInPartitionMax() throws Exception {
    init("coord-action-start");
    eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20;region=us,"
            + "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us,"
            + "hcat://hcat.server.com:5080/mydb/clicks/datastamp=13;region=us,"
            + "hcat://hcat.server.com:5080/mydb/clicks/datastamp=10;region=us");
    eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
    String expr = "${coord:dataInPartitionMax('ABC','datastamp')}";
    String res = CoordELFunctions.evalAndWrap(eval, expr);
    // assertEquals reports the actual value on failure and does not NPE on a null result
    assertEquals("20", res);
}
/**
 * Phase 3 check of the coord:dataInPartitions EL function. With type 'hive-export'
 * the test expects the key='value' pairs of the single data-in partition, accepted
 * in either order; with any other type it expects evaluation to throw.
 *
 * @throws Exception if evaluator setup or an unexpected evaluation fails
 */
@Test
public void testDataInPartitions() throws Exception {
    init("coord-action-start");
    final String dataInUri = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us";

    // +ve test: type 'hive-export'
    eval.setVariable(".datain.ABC", dataInUri);
    eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
    final String partitions = CoordELFunctions.evalAndWrap(eval, "${coord:dataInPartitions('ABC', 'hive-export')}");
    final boolean datastampFirst = partitions.equals("datastamp='20120230',region='us'");
    final boolean regionFirst = partitions.equals("region='us',datastamp='20120230'");
    assertTrue(datastampFirst || regionFirst);

    // -ve test: any type other than hive-export
    try {
        eval.setVariable(".datain.ABC", dataInUri);
        eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
        CoordELFunctions.evalAndWrap(eval, "${coord:dataInPartitions('ABC', 'invalid-type')}");
        fail("EL function should throw exception because of invalid type");
    }
    catch (Exception expected) {
        // the evaluation is expected to throw
    }
}
/**
 * Initializes the evaluator for the given tag using an HDFS URI template under the
 * test user's directory.
 *
 * @param tag evaluator tag passed on to {@link #init(String, String)}
 * @throws Exception if the evaluator cannot be created or configured
 */
private void init(String tag) throws Exception {
    final String defaultUriTemplate = "hdfs://localhost:9000/user/" + getTestUser() + "/US/${YEAR}/${MONTH}/${DAY}";
    init(tag, defaultUriTemplate);
}
/**
 * Creates the EL evaluator for the given tag, sets the test user and group on it,
 * builds a fresh dataset and coordinator action, and configures the evaluator with
 * both through {@link CoordELFunctions#configureEvaluator}. The results are stored
 * in the {@code eval}, {@code ds} and {@code appInst} fields.
 *
 * @param tag evaluator tag passed to the EL service
 * @param uriTemplate URI template set on the dataset
 * @throws Exception if the evaluator cannot be created or configured
 */
private void init(String tag, String uriTemplate) throws Exception {
    final ELService elService = Services.get().get(ELService.class);
    eval = elService.createEvaluator(tag);
    eval.setVariable(OozieClient.USER_NAME, getTestUser());
    eval.setVariable(OozieClient.GROUP_NAME, getTestGroup());

    final SyncCoordAction action = new SyncCoordAction();
    appInst = action;
    final SyncCoordDataset dataset = new SyncCoordDataset();
    ds = dataset;

    // dataset definition
    dataset.setFrequency(1);
    dataset.setInitInstance(DateUtils.parseDateOozieTZ("2009-09-01T23:59Z"));
    dataset.setTimeUnit(TimeUnit.DAY);
    dataset.setTimeZone(DateUtils.getTimeZone("America/Los_Angeles"));
    dataset.setName("test");
    dataset.setUriTemplate(uriTemplate);
    dataset.setType("SYNC");
    dataset.setDoneFlag("");

    // coordinator action definition
    action.setActualTime(DateUtils.parseDateOozieTZ("2009-09-10T23:59Z"));
    action.setNominalTime(DateUtils.parseDateOozieTZ("2009-09-09T23:59Z"));
    action.setTimeZone(DateUtils.getTimeZone("America/Los_Angeles"));
    action.setActionId("00000-oozie-C@1");
    action.setName("mycoordinator-app");

    CoordELFunctions.configureEvaluator(eval, dataset, action);
}
}