| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.stram.client; |
| |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.file.Files; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.jar.Attributes; |
| import java.util.jar.JarEntry; |
| import java.util.jar.JarFile; |
| import java.util.jar.JarInputStream; |
| import java.util.jar.Manifest; |
| |
| import org.codehaus.jackson.JsonGenerator; |
| import org.codehaus.jackson.JsonProcessingException; |
| import org.codehaus.jackson.map.JsonSerializer; |
| import org.codehaus.jackson.map.SerializerProvider; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang.exception.ExceptionUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| |
| import com.datatorrent.stram.client.StramAppLauncher.AppFactory; |
| import com.datatorrent.stram.plan.logical.LogicalPlan; |
| |
| import net.lingala.zip4j.core.ZipFile; |
| import net.lingala.zip4j.exception.ZipException; |
| import net.lingala.zip4j.model.ZipParameters; |
| |
| |
| /** |
| * <p> |
| * AppPackage class.</p> |
| * |
| * @since 1.0.3 |
| */ |
| public class AppPackage implements Closeable |
| { |
| public static final String ATTRIBUTE_DT_ENGINE_VERSION = "DT-Engine-Version"; |
| public static final String ATTRIBUTE_DT_APP_PACKAGE_NAME = "DT-App-Package-Name"; |
| public static final String ATTRIBUTE_DT_APP_PACKAGE_VERSION = "DT-App-Package-Version"; |
| public static final String ATTRIBUTE_DT_APP_PACKAGE_GROUP_ID = "DT-App-Package-Group-Id"; |
| public static final String ATTRIBUTE_CLASS_PATH = "Class-Path"; |
| public static final String ATTRIBUTE_DT_APP_PACKAGE_DISPLAY_NAME = "DT-App-Package-Display-Name"; |
| public static final String ATTRIBUTE_DT_APP_PACKAGE_DESCRIPTION = "DT-App-Package-Description"; |
| |
| private final String appPackageName; |
| private final String appPackageVersion; |
| private final String appPackageGroupId; |
| private final String dtEngineVersion; |
| private final String appPackageDescription; |
| private final String appPackageDisplayName; |
| private final ArrayList<String> classPath = new ArrayList<>(); |
| private final File directory; |
| |
| private final List<AppInfo> applications = new ArrayList<>(); |
| private final List<String> appJars = new ArrayList<>(); |
| private final List<String> appJsonFiles = new ArrayList<>(); |
| private final List<String> appPropertiesFiles = new ArrayList<>(); |
| |
| private final Set<String> requiredProperties = new TreeSet<>(); |
| private final Map<String, PropertyInfo> defaultProperties = new TreeMap<>(); |
| private final Set<String> configs = new TreeSet<>(); |
| |
| private final File resourcesDirectory; |
| private final boolean cleanOnClose; |
| |
| public static class AppInfo |
| { |
| public final String name; |
| public final String file; |
| public final String type; |
| public String displayName; |
| public LogicalPlan dag; |
| public String error; |
| public String errorStackTrace; |
| |
| public Set<String> requiredProperties = new TreeSet<>(); |
| public Map<String, PropertyInfo> defaultProperties = new TreeMap<>(); |
| |
| public AppInfo(String name, String file, String type) |
| { |
| this.name = name; |
| this.file = file; |
| this.type = type; |
| } |
| |
| } |
| |
| public static class PropertyInfo |
| { |
| private final String value; |
| private final String description; |
| |
| public PropertyInfo(final String value, final String description) |
| { |
| this.value = value; |
| this.description = description; |
| } |
| |
| public String getValue() |
| { |
| return value; |
| } |
| |
| public String getDescription() |
| { |
| return description; |
| } |
| } |
| |
| public static class PropertyInfoSerializer extends JsonSerializer<PropertyInfo> |
| { |
| private boolean provideDescription; |
| |
| public PropertyInfoSerializer(boolean provideDescription) |
| { |
| this.provideDescription = provideDescription; |
| } |
| |
| @Override |
| public void serialize( |
| PropertyInfo propertyInfo, JsonGenerator jgen, SerializerProvider provider) |
| throws IOException, JsonProcessingException |
| { |
| if (provideDescription) { |
| jgen.writeStartObject(); |
| jgen.writeStringField("value", propertyInfo.value); |
| jgen.writeStringField("description", propertyInfo.description); |
| jgen.writeEndObject(); |
| } else { |
| jgen.writeString(propertyInfo.value); |
| } |
| } |
| } |
| |
| public AppPackage(File file) throws IOException |
| { |
| this(new FileInputStream(file)); |
| } |
| |
| public AppPackage(InputStream input) throws IOException |
| { |
| this(input, false); |
| } |
| |
| /** |
| * Creates an App Package object. |
| * |
| * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived |
| * applications |
| * |
| * If contentFolder is not null, it will try to create the contentFolder, file will be retained on disk after App Package is closed |
| * If contentFolder is null, temp folder will be created and will be cleaned on close() |
| * |
| * @param file |
| * @param contentFolder the folder that the app package will be extracted to |
| * @param processAppDirectory |
| * @throws java.io.IOException |
| */ |
| public AppPackage(File file, File contentFolder, boolean processAppDirectory) throws IOException |
| { |
| this(new FileInputStream(file), contentFolder, processAppDirectory); |
| } |
| |
| /** |
| * Creates an App Package object. |
| * |
| * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived |
| * applications |
| * |
| * If contentFolder is not null, it will try to create the contentFolder, file will be retained on disk after App Package is closed |
| * If contentFolder is null, temp folder will be created and will be cleaned on close() |
| * |
| * @param input |
| * @param contentFolder the folder that the app package will be extracted to |
| * @param processAppDirectory |
| * @throws java.io.IOException |
| */ |
| public AppPackage(InputStream input, File contentFolder, boolean processAppDirectory) throws IOException |
| { |
| final JarInputStream jarInputStream = new JarInputStream(input); |
| |
| if (contentFolder != null) { |
| FileUtils.forceMkdir(contentFolder); |
| cleanOnClose = false; |
| } else { |
| cleanOnClose = true; |
| contentFolder = Files.createTempDirectory("dt-appPackage-").toFile(); |
| } |
| directory = contentFolder; |
| |
| Manifest manifest = jarInputStream.getManifest(); |
| if (manifest == null) { |
| throw new IOException("Not a valid app package. MANIFEST.MF is not present."); |
| } |
| Attributes attr = manifest.getMainAttributes(); |
| appPackageName = attr.getValue(ATTRIBUTE_DT_APP_PACKAGE_NAME); |
| appPackageVersion = attr.getValue(ATTRIBUTE_DT_APP_PACKAGE_VERSION); |
| appPackageGroupId = attr.getValue(ATTRIBUTE_DT_APP_PACKAGE_GROUP_ID); |
| dtEngineVersion = attr.getValue(ATTRIBUTE_DT_ENGINE_VERSION); |
| appPackageDisplayName = attr.getValue(ATTRIBUTE_DT_APP_PACKAGE_DISPLAY_NAME); |
| appPackageDescription = attr.getValue(ATTRIBUTE_DT_APP_PACKAGE_DESCRIPTION); |
| String classPathString = attr.getValue(ATTRIBUTE_CLASS_PATH); |
| if (appPackageName == null || appPackageVersion == null || classPathString == null) { |
| throw new IOException("Not a valid app package. App Package Name or Version or Class-Path is missing from MANIFEST.MF"); |
| } |
| classPath.addAll(Arrays.asList(StringUtils.split(classPathString, " "))); |
| extractToDirectory(directory, jarInputStream); |
| |
| File confDirectory = new File(directory, "conf"); |
| if (confDirectory.exists()) { |
| processConfDirectory(confDirectory); |
| } |
| resourcesDirectory = new File(directory, "resources"); |
| |
| File propertiesXml = new File(directory, "META-INF/properties.xml"); |
| if (propertiesXml.exists()) { |
| processPropertiesXml(propertiesXml, null); |
| } |
| |
| if (processAppDirectory) { |
| processAppDirectory(false); |
| } |
| } |
| |
| private void processAppProperties() |
| { |
| for (AppInfo app : applications) { |
| app.requiredProperties.addAll(requiredProperties); |
| app.defaultProperties.putAll(defaultProperties); |
| File appPropertiesXml = new File(directory, "META-INF/properties-" + app.name + ".xml"); |
| if (appPropertiesXml.exists()) { |
| processPropertiesXml(appPropertiesXml, app); |
| } |
| } |
| } |
| |
| /** |
| * Creates an App Package object. |
| * |
| * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived |
| * applications |
| * |
| * Files in app package will be extracted to tmp folder and will be cleaned on close() |
| * The close() method could be explicitly called or implicitly called by GC finalize() |
| * |
| * @param file |
| * @param processAppDirectory |
| * @throws java.io.IOException |
| */ |
| public AppPackage(File file, boolean processAppDirectory) throws IOException |
| { |
| this(new FileInputStream(file), processAppDirectory); |
| } |
| |
| /** |
| * Creates an App Package object. |
| * |
| * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived |
| * applications |
| * |
| * Files in app package will be extracted to tmp folder and will be cleaned on close() |
| * The close() method could be explicitly called or implicitly called by GC finalize() |
| * |
| * @param input |
| * @param processAppDirectory |
| * @throws java.io.IOException |
| */ |
| public AppPackage(InputStream input, boolean processAppDirectory) throws IOException |
| { |
| this(input, null, processAppDirectory); |
| } |
| |
| public static void extractToDirectory(File directory, File appPackageFile) throws IOException |
| { |
| extractToDirectory(directory, new JarInputStream(new FileInputStream(appPackageFile))); |
| } |
| |
| private static void extractToDirectory(File directory, JarInputStream input) throws IOException |
| { |
| File manifestFile = new File(directory, JarFile.MANIFEST_NAME); |
| manifestFile.getParentFile().mkdirs(); |
| try (FileOutputStream output = new FileOutputStream(manifestFile)) { |
| input.getManifest().write(output); |
| } |
| |
| JarEntry entry = input.getNextJarEntry(); |
| while (entry != null) { |
| File newFile = new File(directory, entry.getName()); |
| if (entry.isDirectory()) { |
| newFile.mkdirs(); |
| } else { |
| try (FileOutputStream output = new FileOutputStream(newFile)) { |
| IOUtils.copy(input, output); |
| } |
| } |
| input.closeEntry(); |
| entry = input.getNextJarEntry(); |
| } |
| } |
| |
| public static void createAppPackageFile(File fileToBeCreated, File directory) throws ZipException |
| { |
| ZipFile zipFile = new ZipFile(fileToBeCreated); |
| ZipParameters params = new ZipParameters(); |
| params.setIncludeRootFolder(false); |
| zipFile.addFolder(directory, params); |
| } |
| |
| public File tempDirectory() |
| { |
| return directory; |
| } |
| |
| @Override |
| public void close() throws IOException |
| { |
| if (cleanOnClose) { |
| cleanContent(); |
| } |
| } |
| |
| public void cleanContent() throws IOException |
| { |
| FileUtils.deleteDirectory(directory); |
| LOG.debug("App Package {}-{} folder {} is removed", appPackageName, appPackageVersion, directory.getAbsolutePath()); |
| } |
| |
| public String getAppPackageName() |
| { |
| return appPackageName; |
| } |
| |
| public String getAppPackageVersion() |
| { |
| return appPackageVersion; |
| } |
| |
| public String getAppPackageGroupId() |
| { |
| return appPackageGroupId; |
| } |
| |
| public String getAppPackageDescription() |
| { |
| return appPackageDescription; |
| } |
| |
| public String getAppPackageDisplayName() |
| { |
| return appPackageDisplayName; |
| } |
| |
| public String getDtEngineVersion() |
| { |
| return dtEngineVersion; |
| } |
| |
| public List<String> getClassPath() |
| { |
| return Collections.unmodifiableList(classPath); |
| } |
| |
| public Collection<String> getConfigs() |
| { |
| return Collections.unmodifiableCollection(configs); |
| } |
| |
| public File resourcesDirectory() |
| { |
| return resourcesDirectory; |
| } |
| |
| public List<AppInfo> getApplications() |
| { |
| return Collections.unmodifiableList(applications); |
| } |
| |
| public List<String> getAppJars() |
| { |
| return Collections.unmodifiableList(appJars); |
| } |
| |
| public List<String> getAppJsonFiles() |
| { |
| return Collections.unmodifiableList(appJsonFiles); |
| } |
| |
| public List<String> getAppPropertiesFiles() |
| { |
| return Collections.unmodifiableList(appPropertiesFiles); |
| } |
| |
| public Set<String> getRequiredProperties() |
| { |
| return Collections.unmodifiableSet(requiredProperties); |
| } |
| |
| public Map<String, PropertyInfo> getDefaultProperties() |
| { |
| return Collections.unmodifiableMap(defaultProperties); |
| } |
| |
| public void processAppDirectory(boolean skipJars) |
| { |
| File dir = new File(directory, "app"); |
| applications.clear(); |
| |
| Configuration config = new Configuration(); |
| |
| for (Map.Entry<String, PropertyInfo> entry : defaultProperties.entrySet()) { |
| config.set(entry.getKey(), entry.getValue().getValue()); |
| } |
| |
| List<String> absClassPath = new ArrayList<>(classPath); |
| for (int i = 0; i < absClassPath.size(); i++) { |
| String path = absClassPath.get(i); |
| if (!path.startsWith("/")) { |
| absClassPath.set(i, directory + "/" + path); |
| } |
| } |
| config.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, StringUtils.join(absClassPath, ',')); |
| File[] files = dir.listFiles(); |
| for (File entry : files) { |
| |
| if (entry.getName().endsWith(".jar") && !skipJars) { |
| appJars.add(entry.getName()); |
| try { |
| StramAppLauncher stramAppLauncher = new StramAppLauncher(entry, config); |
| stramAppLauncher.loadDependencies(); |
| List<AppFactory> appFactories = stramAppLauncher.getBundledTopologies(); |
| for (AppFactory appFactory : appFactories) { |
| String appName = stramAppLauncher.getLogicalPlanConfiguration().getAppAlias(appFactory.getName()); |
| if (appName == null) { |
| appName = appFactory.getName(); |
| } |
| AppInfo appInfo = new AppInfo(appName, entry.getName(), "class"); |
| appInfo.displayName = appFactory.getDisplayName(); |
| try { |
| appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration()); |
| } catch (Throwable ex) { |
| appInfo.error = ex.getMessage(); |
| appInfo.errorStackTrace = ExceptionUtils.getStackTrace(ex); |
| } |
| applications.add(appInfo); |
| } |
| } catch (Exception ex) { |
| LOG.error("Caught exception trying to process {}", entry.getName(), ex); |
| } |
| } |
| } |
| |
| // this is for the properties and json files to be able to depend on the app jars, |
| // since it's possible for users to implement the operators as part of the app package |
| for (String appJar : appJars) { |
| absClassPath.add(new File(dir, appJar).getAbsolutePath()); |
| } |
| config.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, StringUtils.join(absClassPath, ',')); |
| files = dir.listFiles(); |
| for (File entry : files) { |
| if (entry.getName().endsWith(".json")) { |
| appJsonFiles.add(entry.getName()); |
| AppInfo appInfo = StramClientUtils.jsonFileToAppInfo(entry, config); |
| |
| if (appInfo != null) { |
| applications.add(appInfo); |
| } |
| |
| } else if (entry.getName().endsWith(".properties")) { |
| appPropertiesFiles.add(entry.getName()); |
| try { |
| AppFactory appFactory = new StramAppLauncher.PropertyFileAppFactory(entry); |
| StramAppLauncher stramAppLauncher = new StramAppLauncher(entry.getName(), config); |
| stramAppLauncher.loadDependencies(); |
| AppInfo appInfo = new AppInfo(appFactory.getName(), entry.getName(), "properties"); |
| appInfo.displayName = appFactory.getDisplayName(); |
| try { |
| appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration()); |
| } catch (Throwable t) { |
| appInfo.error = t.getMessage(); |
| appInfo.errorStackTrace = ExceptionUtils.getStackTrace(t); |
| } |
| applications.add(appInfo); |
| } catch (Exception ex) { |
| LOG.error("Caught exceptions trying to process {}", entry.getName(), ex); |
| } |
| } else if (!entry.getName().endsWith(".jar")) { |
| LOG.warn("Ignoring file {} with unknown extension in app directory", entry.getName()); |
| } |
| } |
| |
| processAppProperties(); |
| } |
| |
| private void processConfDirectory(File dir) |
| { |
| File[] files = dir.listFiles(); |
| for (File entry : files) { |
| if (entry.getName().endsWith(".xml")) { |
| configs.add(entry.getName()); |
| } |
| } |
| } |
| |
| private void processPropertiesXml(File file, AppInfo app) |
| { |
| DTConfiguration config = new DTConfiguration(); |
| try { |
| config.loadFile(file); |
| for (Map.Entry<String, String> entry : config) { |
| String key = entry.getKey(); |
| String value = entry.getValue(); |
| if (value == null) { |
| if (app == null) { |
| requiredProperties.add(key); |
| } else { |
| app.requiredProperties.add(key); |
| } |
| } else { |
| //Attribute are platform specific, ignoring description provided in properties file |
| String description = key.contains(".attr.") ? null : config.getDescription(key); |
| PropertyInfo propertyInfo = new PropertyInfo(value, description); |
| if (app == null) { |
| defaultProperties.put(key, propertyInfo); |
| } else { |
| app.requiredProperties.remove(key); |
| app.defaultProperties.put(key, propertyInfo); |
| } |
| } |
| } |
| } catch (Exception ex) { |
| LOG.warn("Ignoring META_INF/properties.xml because of error", ex); |
| } |
| } |
| |
| private static final Logger LOG = LoggerFactory.getLogger(AppPackage.class); |
| |
| } |