/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.sqoop.hook;

import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.sqoop.model.SqoopDataTypes;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.sqoop.SqoopJobDataPublisher;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Properties;
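
/**
 * Integration test for {@link SqoopHook}: publishes Sqoop job metadata and
 * verifies that the corresponding DB store, Sqoop process and Hive table
 * entities get registered in Atlas.
 */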
public class SqoopHookIT {
public static final Logger LOG = LoggerFactory.getLogger(SqoopHookIT.class);
private static final String CLUSTER_NAME = "primary";
public static final String DEFAULT_DB = "default";
private static final int MAX_WAIT_TIME = 80000;
private AtlasClient atlasClient;
@BeforeClass
public void setUp() throws Exception {
// Set up the Atlas client used to verify the entities published by the hook
Configuration configuration = ApplicationProperties.get();
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[]{"admin", "admin"});
} else {
atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
}
}
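
/**
 * Publishes metadata for a simulated Sqoop import into a Hive table and
 * verifies that the DB store, Sqoop process and Hive table are registered.
 */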
@Test
public void testSqoopImport() throws Exception {
SqoopJobDataPublisher.Data d = new SqoopJobDataPublisher.Data("import", "jdbc:mysql:///localhost/db",
"mysqluser", "mysql", "myTable", null, "default", "hiveTable", new Properties(),
System.currentTimeMillis() - 100, System.currentTimeMillis());
SqoopHook hook = new SqoopHook();
hook.publish(d);
Thread.sleep(1000);
String storeName = SqoopHook.getSqoopDBStoreName(d);
assertDBStoreIsRegistered(storeName);
String name = SqoopHook.getSqoopProcessName(d, CLUSTER_NAME);
assertSqoopProcessIsRegistered(name);
assertHiveTableIsRegistered(DEFAULT_DB, "hiveTable");
}
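
/**
 * Publishes metadata for a simulated Sqoop export and verifies that the
 * DB store, Sqoop process and Hive table are registered.
 */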
@Test
public void testSqoopExport() throws Exception {
SqoopJobDataPublisher.Data d = new SqoopJobDataPublisher.Data("export", "jdbc:mysql:///localhost/db",
"mysqluser", "mysql", "myTable", null, "default", "hiveTable", new Properties(),
System.currentTimeMillis() - 100, System.currentTimeMillis());
SqoopHook hook = new SqoopHook();
hook.publish(d);
Thread.sleep(1000);
String storeName = SqoopHook.getSqoopDBStoreName(d);
assertDBStoreIsRegistered(storeName);
String name = SqoopHook.getSqoopProcessName(d, CLUSTER_NAME);
assertSqoopProcessIsRegistered(name);
assertHiveTableIsRegistered(DEFAULT_DB, "hiveTable");
}
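
/** Asserts that a Sqoop DB data store entity with the given name exists in Atlas and returns its id. */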
private String assertDBStoreIsRegistered(String storeName) throws Exception {
LOG.debug("Searching for db store {}", storeName);
String query = String.format(
"%s as t where " + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME + " = '%s'" + " select t",
SqoopDataTypes.SQOOP_DBDATASTORE.getName(), storeName);
return assertEntityIsRegistered(query);
}
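
/** Asserts that the Hive table is registered in Atlas under the expected qualified name, database and cluster. */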
private String assertHiveTableIsRegistered(String dbName, String tableName) throws Exception {
LOG.debug("Searching for table {}.{}", dbName, tableName);
String query = String.format(
"%s as t where " + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME + " = '%s', db where " + AtlasClient.NAME + " = '%s' and clusterName = '%s'" + " select t",
HiveDataTypes.HIVE_TABLE.getName(), HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), dbName.toLowerCase(), CLUSTER_NAME);
return assertEntityIsRegistered(query);
}
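
/** Asserts that a Sqoop process entity with the given name exists in Atlas and returns its id. */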
private String assertSqoopProcessIsRegistered(String processName) throws Exception {
LOG.debug("Searching for sqoop process {}", processName);
String query = String.format(
"%s as t where %s = '%s' select t",
SqoopDataTypes.SQOOP_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processName);
return assertEntityIsRegistered(query);
}
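
/**
 * Polls Atlas with the given DSL query until it returns a result or the wait
 * times out, then returns the id of the first matching entity.
 */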
private String assertEntityIsRegistered(final String query) throws Exception {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = atlasClient.search(query, 10, 0);
return results.length() > 0;
}
});
JSONArray results = atlasClient.search(query, 10, 0);
JSONObject row = results.getJSONObject(0).getJSONObject("t");
return row.getString("id");
}
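
/**
 * Re-evaluates the predicate once a second until it passes or the timeout
 * (in milliseconds) elapses.
 */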
protected void waitFor(int timeout, Predicate predicate) throws Exception {
long mustEnd = System.currentTimeMillis() + timeout;
boolean eval;
while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) {
LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis());
Thread.sleep(1000);
}
if (!eval) {
throw new Exception("Waiting timed out after " + timeout + " msec");
}
}
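
/** Condition polled by {@link #waitFor(int, Predicate)}. */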
public interface Predicate {
/**
* Perform a predicate evaluation.
*
* @return the boolean result of the evaluation.
* @throws Exception if the predicate could not be evaluated.
*/
boolean evaluate() throws Exception;
}
}