/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.atlas.hive;

import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.hive.bridge.ColumnLineageUtils;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.hook.HiveHookIT;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
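
/**
 * Base class for the Hive hook integration tests: sets up a local Hive session,
 * Atlas clients (v1 and v2) and a {@link HiveMetaStoreBridge}, and provides
 * helpers to run Hive commands and assert that the corresponding entities get
 * registered in Atlas.
 */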
public class HiveITBase {
private static final Logger LOG = LoggerFactory.getLogger(HiveITBase.class);
public static final String DEFAULT_DB = "default";
    public static final String SEP = ":";
    public static final String IO_SEP = "->";
protected static final String DGI_URL = "http://localhost:21000/";
protected static final String CLUSTER_NAME = "primary";
protected static final String PART_FILE = "2015-01-01";
    protected static final String INPUTS = "inputs";
protected static final String OUTPUTS = "outputs";
protected Driver driver;
protected AtlasClient atlasClient;
protected AtlasClientV2 atlasClientV2;
protected HiveMetaStoreBridge hiveMetaStoreBridge;
protected SessionState ss;
protected HiveConf conf;
protected Driver driverWithoutContext;
private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
private static final String ATTR_NAME = "name";
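    /**
     * Builds two drivers: the default one (with whatever post-execution hooks are
     * configured, i.e. the Atlas hook) and one with hooks disabled, and creates the
     * Atlas clients against the configured endpoint, falling back to {@link #DGI_URL}.
     */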
@BeforeClass
public void setUp() throws Exception {
//Set-up hive session
conf = new HiveConf();
conf.setClassLoader(Thread.currentThread().getContextClassLoader());
driver = new Driver(conf);
ss = new SessionState(conf);
ss = SessionState.start(ss);
SessionState.setCurrentSessionState(ss);
Configuration configuration = ApplicationProperties.get();
String[] atlasEndPoint = configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT);
if (atlasEndPoint == null || atlasEndPoint.length == 0) {
atlasEndPoint = new String[] { DGI_URL };
}
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
atlasClientV2 = new AtlasClientV2(atlasEndPoint, new String[]{"admin", "admin"});
atlasClient = new AtlasClient(atlasEndPoint, new String[]{"admin", "admin"});
} else {
atlasClientV2 = new AtlasClientV2(atlasEndPoint);
atlasClient = new AtlasClient(atlasEndPoint);
}
hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, atlasClientV2);
        // second driver backed by a clean HiveConf (no post-execution hooks),
        // so commands can run without invoking the Atlas hook
        HiveConf confWithoutHooks = new HiveConf();
        confWithoutHooks.set("hive.exec.post.hooks", "");
        SessionState ssWithoutHooks = new SessionState(confWithoutHooks);
        ssWithoutHooks = SessionState.start(ssWithoutHooks);
        SessionState.setCurrentSessionState(ssWithoutHooks);
        driverWithoutContext = new Driver(confWithoutHooks);
}
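    /**
     * Runs a Hive command through the default driver and asserts that it succeeds
     * (response code 0). The *WithDelay variants sleep afterwards to give the hook
     * time to publish its notifications.
     */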
protected void runCommand(String cmd) throws Exception {
runCommandWithDelay(cmd, 0);
}
protected void runCommand(Driver driver, String cmd) throws Exception {
runCommandWithDelay(driver, cmd, 0);
}
protected void runCommandWithDelay(String cmd, int sleepMs) throws Exception {
runCommandWithDelay(driver, cmd, sleepMs);
}
protected void runCommandWithDelay(Driver driver, String cmd, int sleepMs) throws Exception {
LOG.debug("Running command '{}'", cmd);
CommandProcessorResponse response = driver.run(cmd);
assertEquals(response.getResponseCode(), 0);
if (sleepMs != 0) {
Thread.sleep(sleepMs);
}
}
protected String createTestDFSPath(String path) throws Exception {
return "pfile://" + mkdir(path);
}
protected String file(String tag) throws Exception {
String filename = System.getProperty("user.dir") + "/target/" + tag + "-data-" + random();
File file = new File(filename);
file.createNewFile();
return file.getAbsolutePath();
}
protected String mkdir(String tag) throws Exception {
String filename = "./target/" + tag + "-data-" + random();
File file = new File(filename);
file.mkdirs();
return file.getAbsolutePath();
}
public static String lower(String str) {
if (StringUtils.isEmpty(str)) {
return null;
}
return str.toLowerCase().trim();
}
protected String random() {
return RandomStringUtils.randomAlphanumeric(10);
}
protected String tableName() {
return "table" + random();
}
protected String assertTableIsRegistered(String dbName, String tableName) throws Exception {
return assertTableIsRegistered(dbName, tableName, null, false);
}
protected String assertTableIsRegistered(String dbName, String tableName, HiveHookIT.AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
LOG.debug("Searching for table {}.{}", dbName, tableName);
String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, isTemporary);
return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
assertPredicate);
}
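    /**
     * Waits up to 80 seconds for an entity of the given type whose attribute matches
     * the given value to appear in Atlas, optionally applying extra assertions to it,
     * and returns its GUID.
     */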
protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
final HiveHookIT.AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new HiveHookIT.Predicate() {
@Override
public void evaluate() throws Exception {
                AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
assertNotNull(entity);
if (assertPredicate != null) {
assertPredicate.assertOnEntity(entity);
}
}
});
        AtlasEntity entity = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value)).getEntity();
        return entity.getGuid();
}
    protected AtlasEntity assertEntityIsRegistedViaEntity(final String typeName, final String property, final String value,
                                                          final HiveHookIT.AssertPredicate assertPredicate) throws Exception {
        waitFor(80000, new HiveHookIT.Predicate() {
            @Override
            public void evaluate() throws Exception {
                AtlasEntity entity = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value)).getEntity();
                assertNotNull(entity);
                if (assertPredicate != null) {
                    assertPredicate.assertOnEntity(entity);
                }
            }
        });
        return atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value)).getEntity();
    }
public interface AssertPredicate {
void assertOnEntity(AtlasEntity entity) throws Exception;
}
    public interface Predicate {
        /**
         * Perform a predicate evaluation.
         *
         * @throws Exception thrown if the evaluation fails or could not be performed.
         */
        void evaluate() throws Exception;
    }
/**
* Wait for a condition, expressed via a {@link Predicate} to become true.
*
* @param timeout maximum time in milliseconds to wait for the predicate to become true.
     * @param predicate predicate to wait on.
*/
protected void waitFor(int timeout, Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");
long mustEnd = System.currentTimeMillis() + timeout;
while (true) {
try {
predicate.evaluate();
return;
} catch(Error | Exception e) {
if (System.currentTimeMillis() >= mustEnd) {
fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
}
LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
Thread.sleep(5000);
}
}
}
protected String getTableProcessQualifiedName(String dbName, String tableName) throws Exception {
return HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME,
hiveMetaStoreBridge.getHiveClient().getTable(dbName, tableName));
}
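    /**
     * Asserts that every test path is registered in Atlas as an HDFS path entity and
     * that its GUID matches the corresponding object id referenced by the given
     * attribute of the process entity.
     */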
protected void validateHDFSPaths(AtlasEntity processEntity, String attributeName, String... testPaths) throws Exception {
List<AtlasObjectId> hdfsPathIds = toAtlasObjectIdList(processEntity.getAttribute(attributeName));
        for (int i = 0; i < testPaths.length; i++) {
            Path path = new Path(testPaths[i]);
            String testPathNormed = lower(path.toString());
            String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
            // compare positionally: the attribute is expected to reference the
            // registered paths in the same order as testPaths
            Assert.assertEquals(hdfsPathIds.get(i).getGuid(), hdfsPathId);
}
}
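    /**
     * Asserts that the given path is registered as an HDFS path entity. For hdfs://
     * paths the cluster name is appended to the qualifiedName (ATLAS-2444: HDFS
     * name node federation adds the cluster name).
     */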
protected String assertHDFSPathIsRegistered(String path) throws Exception {
LOG.debug("Searching for hdfs path {}", path);
// ATLAS-2444 HDFS name node federation adds the cluster name to the qualifiedName
if (path.startsWith("hdfs://")) {
String pathWithCluster = path + "@" + CLUSTER_NAME;
return assertEntityIsRegistered(HiveMetaStoreBridge.HDFS_PATH, REFERENCEABLE_ATTRIBUTE_NAME, pathWithCluster, null);
} else {
return assertEntityIsRegistered(HiveMetaStoreBridge.HDFS_PATH, REFERENCEABLE_ATTRIBUTE_NAME, path, null);
}
}
protected String assertDatabaseIsRegistered(String dbName) throws Exception {
return assertDatabaseIsRegistered(dbName, null);
}
protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
LOG.debug("Searching for database {}", dbName);
String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), REFERENCEABLE_ATTRIBUTE_NAME,
dbQualifiedName, assertPredicate);
}
protected AtlasEntity getAtlasEntityByType(String type, String id) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfoForProcess = atlasClientV2.getEntityByAttribute(type,
                Collections.singletonMap(AtlasClient.GUID, id));
        return atlasEntityWithExtInfoForProcess.getEntity();
}
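    /**
     * Captures the state of a single Hive hook invocation: inputs, outputs, user,
     * operation, query metadata, column lineage and the notification messages
     * generated for it.
     */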
public static class HiveEventContext {
private Set<ReadEntity> inputs;
private Set<WriteEntity> outputs;
private String user;
private UserGroupInformation ugi;
private HiveOperation operation;
private HookContext.HookType hookType;
private JSONObject jsonPlan;
private String queryId;
private String queryStr;
private Long queryStartTime;
public Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo;
private List<HookNotificationMessage> messages = new ArrayList<>();
public void setInputs(Set<ReadEntity> inputs) {
this.inputs = inputs;
}
public void setOutputs(Set<WriteEntity> outputs) {
this.outputs = outputs;
}
public void setUser(String user) {
this.user = user;
}
public void setUgi(UserGroupInformation ugi) {
this.ugi = ugi;
}
public void setOperation(HiveOperation operation) {
this.operation = operation;
}
public void setHookType(HookContext.HookType hookType) {
this.hookType = hookType;
}
public void setQueryId(String queryId) {
this.queryId = queryId;
}
public void setQueryStr(String queryStr) {
this.queryStr = queryStr;
}
public void setQueryStartTime(Long queryStartTime) {
this.queryStartTime = queryStartTime;
}
        public void setLineageInfo(LineageInfo lineageInfo) {
            try {
                this.lineageInfo = ColumnLineageUtils.buildLineageMap(lineageInfo);
                LOG.debug("Column lineage map => {}", this.lineageInfo.entrySet());
            } catch (Throwable e) {
                LOG.warn("Column lineage map build failed", e);
            }
        }
public Set<ReadEntity> getInputs() {
return inputs;
}
public Set<WriteEntity> getOutputs() {
return outputs;
}
public String getUser() {
return user;
}
public UserGroupInformation getUgi() {
return ugi;
}
public HiveOperation getOperation() {
return operation;
}
public HookContext.HookType getHookType() {
return hookType;
}
public String getQueryId() {
return queryId;
}
public String getQueryStr() {
return queryStr;
}
public Long getQueryStartTime() {
return queryStartTime;
}
public void addMessage(HookNotificationMessage message) {
messages.add(message);
}
public List<HookNotificationMessage> getMessages() {
return messages;
}
}
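    /**
     * Builds the qualifiedName of a process entity. For create operations it is the
     * output table's process qualified name; otherwise it is the operation name
     * followed by the {@code :}-separated input datasets, {@code ->}, and the output
     * datasets, with the write type and table create time appended where applicable.
     * As an illustration (names and timestamps are made up), an INSERT from
     * default.t1 into default.t2 would produce something like
     * {@code QUERY:default.t1@primary:1500000000000->:INSERT:default.t2@primary:1500000000000}.
     */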
@VisibleForTesting
protected static String getProcessQualifiedName(HiveMetaStoreBridge dgiBridge, HiveEventContext eventContext,
final SortedSet<ReadEntity> sortedHiveInputs,
final SortedSet<WriteEntity> sortedHiveOutputs,
SortedMap<ReadEntity, AtlasEntity> hiveInputsMap,
SortedMap<WriteEntity, AtlasEntity> hiveOutputsMap) throws HiveException {
HiveOperation op = eventContext.getOperation();
if (isCreateOp(eventContext)) {
Entity entity = getEntityByType(sortedHiveOutputs, Entity.Type.TABLE);
if (entity != null) {
Table outTable = entity.getTable();
//refresh table
outTable = dgiBridge.getHiveClient().getTable(outTable.getDbName(), outTable.getTableName());
return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getClusterName(), outTable);
}
}
StringBuilder buffer = new StringBuilder(op.getOperationName());
boolean ignoreHDFSPathsinQFName = ignoreHDFSPathsinQFName(op, sortedHiveInputs, sortedHiveOutputs);
        if (ignoreHDFSPathsinQFName && LOG.isDebugEnabled()) {
LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", op, eventContext.getQueryStr());
}
addInputs(dgiBridge, op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName);
buffer.append(IO_SEP);
addOutputs(dgiBridge, op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName);
LOG.info("Setting process qualified name to {}", buffer);
return buffer.toString();
}
protected static Entity getEntityByType(Set<? extends Entity> entities, Entity.Type entityType) {
for (Entity entity : entities) {
if (entity.getType() == entityType) {
return entity;
}
}
return null;
}
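    /**
     * Returns whether HDFS paths should be left out of the process qualifiedName:
     * always for QUERY operations, and for LOAD/IMPORT/EXPORT when the query is
     * partition-based.
     */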
protected static boolean ignoreHDFSPathsinQFName(final HiveOperation op, final Set<ReadEntity> inputs, final Set<WriteEntity> outputs) {
switch (op) {
case LOAD:
case IMPORT:
return isPartitionBasedQuery(outputs);
case EXPORT:
return isPartitionBasedQuery(inputs);
case QUERY:
return true;
}
return false;
}
protected static boolean isPartitionBasedQuery(Set<? extends Entity> entities) {
for (Entity entity : entities) {
if (Entity.Type.PARTITION.equals(entity.getType())) {
return true;
}
}
return false;
}
protected static boolean isCreateOp(HiveEventContext hiveEvent) {
return HiveOperation.CREATETABLE.equals(hiveEvent.getOperation())
|| HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation())
|| HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation())
|| HiveOperation.ALTERTABLE_LOCATION.equals(hiveEvent.getOperation())
|| HiveOperation.CREATETABLE_AS_SELECT.equals(hiveEvent.getOperation());
}
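    /**
     * Appends each distinct input dataset to the qualifiedName buffer, skipping
     * DFS/local directory inputs when HDFS paths are ignored and appending the
     * table create time for table and partition inputs.
     */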
protected static void addInputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, AtlasEntity> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException {
if (refs != null) {
if (sortedInputs != null) {
Set<String> dataSetsProcessed = new LinkedHashSet<>();
for (Entity input : sortedInputs) {
if (!dataSetsProcessed.contains(input.getName().toLowerCase())) {
//HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
if (ignoreHDFSPathsInQFName &&
(Entity.Type.DFS_DIR.equals(input.getType()) || Entity.Type.LOCAL_DIR.equals(input.getType()))) {
LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName());
} else if (refs.containsKey(input)) {
                        if (input.getType() == Entity.Type.PARTITION || input.getType() == Entity.Type.TABLE) {
Table inputTable = refreshTable(hiveBridge, input.getTable().getDbName(), input.getTable().getTableName());
if (inputTable != null) {
addDataset(buffer, refs.get(input), HiveMetaStoreBridge.getTableCreatedTime(inputTable));
}
} else {
addDataset(buffer, refs.get(input));
}
}
dataSetsProcessed.add(input.getName().toLowerCase());
}
}
}
}
}
protected static void addDataset(StringBuilder buffer, AtlasEntity ref, final long createTime) {
addDataset(buffer, ref);
buffer.append(SEP);
buffer.append(createTime);
}
protected static void addDataset(StringBuilder buffer, AtlasEntity ref) {
buffer.append(SEP);
String dataSetQlfdName = (String) ref.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
// '/' breaks query parsing on ATLAS
buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
}
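    /**
     * Appends each distinct output dataset to the qualifiedName buffer, mirroring
     * {@link #addInputs}, and additionally prefixes the write type (INSERT,
     * INSERT_OVERWRITE, ...) for QUERY operations.
     */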
protected static void addOutputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, AtlasEntity> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException {
if (refs != null) {
Set<String> dataSetsProcessed = new LinkedHashSet<>();
if (sortedOutputs != null) {
                for (WriteEntity output : sortedOutputs) {
                    if (!dataSetsProcessed.contains(output.getName().toLowerCase())) {
if (ignoreHDFSPathsInQFName &&
(Entity.Type.DFS_DIR.equals(output.getType()) || Entity.Type.LOCAL_DIR.equals(output.getType()))) {
LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName());
} else if (refs.containsKey(output)) {
//HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
                            if (addQueryType(op, output)) {
                                buffer.append(SEP);
                                buffer.append(output.getWriteType().name());
}
if (output.getType() == Entity.Type.PARTITION || output.getType() == Entity.Type.TABLE) {
Table outputTable = refreshTable(hiveBridge, output.getTable().getDbName(), output.getTable().getTableName());
if (outputTable != null) {
addDataset(buffer, refs.get(output), HiveMetaStoreBridge.getTableCreatedTime(outputTable));
}
} else {
addDataset(buffer, refs.get(output));
}
}
dataSetsProcessed.add(output.getName().toLowerCase());
}
}
}
}
}
protected static Table refreshTable(HiveMetaStoreBridge dgiBridge, String dbName, String tableName) {
try {
return dgiBridge.getHiveClient().getTable(dbName, tableName);
} catch (HiveException excp) { // this might be the case for temp tables
LOG.warn("failed to get details for table {}.{}. Ignoring. {}: {}", dbName, tableName, excp.getClass().getCanonicalName(), excp.getMessage());
}
return null;
}
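    /**
     * Returns whether the write type should be included in the qualifiedName: only
     * for QUERY operations whose write type is INSERT, INSERT_OVERWRITE, UPDATE,
     * DELETE, or PATH_WRITE to a non-local directory.
     */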
protected static boolean addQueryType(HiveOperation op, WriteEntity entity) {
if (entity.getWriteType() != null && HiveOperation.QUERY.equals(op)) {
switch (entity.getWriteType()) {
case INSERT:
case INSERT_OVERWRITE:
case UPDATE:
case DELETE:
return true;
case PATH_WRITE:
//Add query type only for DFS paths and ignore local paths since they are not added as outputs
                    if (!Entity.Type.LOCAL_DIR.equals(entity.getType())) {
return true;
}
break;
                default:
                    break;
}
}
return false;
}
@VisibleForTesting
protected static final class EntityComparator implements Comparator<Entity> {
@Override
public int compare(Entity o1, Entity o2) {
String s1 = o1.getName();
String s2 = o2.getName();
            if (s1 == null || s2 == null) {
s1 = o1.getD().toString();
s2 = o2.getD().toString();
}
return s1.toLowerCase().compareTo(s2.toLowerCase());
}
}
@VisibleForTesting
protected static final Comparator<Entity> entityComparator = new EntityComparator();
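    /**
     * Coerces an attribute value into an {@link AtlasObjectId}: passed through if it
     * already is one, built from a map of attributes, or treated as a GUID string.
     */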
protected AtlasObjectId toAtlasObjectId(Object obj) {
final AtlasObjectId ret;
if (obj instanceof AtlasObjectId) {
ret = (AtlasObjectId) obj;
} else if (obj instanceof Map) {
ret = new AtlasObjectId((Map) obj);
} else if (obj != null) {
ret = new AtlasObjectId(obj.toString()); // guid
} else {
ret = null;
}
return ret;
}
protected List<AtlasObjectId> toAtlasObjectIdList(Object obj) {
final List<AtlasObjectId> ret;
if (obj instanceof Collection) {
Collection coll = (Collection) obj;
ret = new ArrayList<>(coll.size());
for (Object item : coll) {
AtlasObjectId objId = toAtlasObjectId(item);
if (objId != null) {
ret.add(objId);
}
}
} else {
AtlasObjectId objId = toAtlasObjectId(obj);
if (objId != null) {
ret = new ArrayList<>(1);
ret.add(objId);
} else {
ret = null;
}
}
return ret;
}
protected AtlasStruct toAtlasStruct(Object obj) {
final AtlasStruct ret;
if (obj instanceof AtlasStruct) {
ret = (AtlasStruct) obj;
} else if (obj instanceof Map) {
Map map = (Map) obj;
Object typeName = map.get("typeName");
Map attributes = (map.get("attributes") instanceof Map) ? (Map) map.get("attributes") : map;
ret = new AtlasStruct(typeName == null ? "" : typeName.toString(), attributes);
} else {
ret = null;
}
return ret;
}
protected List<AtlasStruct> toAtlasStructList(Object obj) {
final List<AtlasStruct> ret;
if (obj instanceof Collection) {
Collection coll = (Collection) obj;
ret = new ArrayList<>(coll.size());
for (Object item : coll) {
AtlasStruct struct = toAtlasStruct(item);
if (struct != null) {
ret.add(struct);
}
}
} else {
AtlasStruct struct = toAtlasStruct(obj);
if (struct != null) {
ret = new ArrayList<>(1);
ret.add(struct);
} else {
ret = null;
}
}
return ret;
    }
}