blob: 910aaafb615a698256f80505117eb6ae7f733c8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hop.imports.kettle;
import org.apache.commons.lang3.StringUtils;
import org.apache.hop.core.database.*;
import org.apache.hop.core.exception.HopPluginException;
import org.apache.hop.core.plugins.IPlugin;
import org.apache.hop.core.util.StringUtil;
import org.apache.hop.imports.HopImport;
import org.apache.hop.imports.IHopImport;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.dom.DOMSource;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class KettleImport extends HopImport implements IHopImport {
public int kjbCounter, ktrCounter, otherCounter = 0;
public KettleImport(){
super();
}
public KettleImport(String inputFolderName, String outputFolderName) {
super(inputFolderName, outputFolderName);
}
@Override
public void importHopFolder(){
FilenameFilter kettleFilter = (dir, name) -> name.endsWith(".ktr") | name.endsWith("*.kjb");
String[] kettleFileNames = inputFolder.list(kettleFilter);
try {
// Walk over all ktr and kjb files we received, migrate to hpl and hwf
Stream<Path> kettleWalk = Files.walk(Paths.get(inputFolder.getAbsolutePath()));
List<String> result = kettleWalk.map(x -> x.toString()).filter(f -> f.endsWith(".ktr") || f.endsWith(".kjb")).collect(Collectors.toList());
result.forEach(kettleFilename -> {
File kettleFile = new File(kettleFilename);
importHopFile(kettleFile);
});
kettleWalk = Files.walk(Paths.get(inputFolder.getAbsolutePath()));
// TODO: add a proper way to exclude folders instead of hard coded .git exclude.
List<String> otherFilesList = kettleWalk.map(x -> x.toString()).filter(f -> !f.endsWith(".ktr") && !f.endsWith(".kjb") && !f.contains(".git/")).collect(Collectors.toList());
otherFilesList.forEach(otherFilename -> {
File otherFile = new File(otherFilename);
if(!otherFile.isDirectory()){
migratedFilesMap.put(otherFilename, null);
otherCounter++;
}
});
} catch (IOException e) {
e.printStackTrace();
}
log.logBasic("We found " + kettleFileNames.length + " kettle files. ");
}
@Override
public void importHopFile(File kettleFile){
Document doc = getDocFromFile(kettleFile);
// import connections first
importDbConnections(doc, kettleFile);
// move to processNode?
if(kettleFile.getName().endsWith(".ktr")){
ktrCounter++;
renameNode(doc, doc.getDocumentElement(), "pipeline");
}else if(kettleFile.getName().endsWith(".kjb")){
kjbCounter++;
renameNode(doc, doc.getDocumentElement(), "workflow");
}
processNode(doc, doc.getDocumentElement());
DOMSource domSource = new DOMSource(doc);
String outFilename = kettleFile.getAbsolutePath().replaceAll(inputFolder.getAbsolutePath(), outputFolder.getAbsolutePath()).replaceAll(".ktr", ".hpl").replaceAll(".kjb", ".hwf");
migratedFilesMap.put(outFilename, domSource);
}
@Override
public void importXmlDbConn(String dbConnPath){
Document doc = getDocFromFile(new File(dbConnPath));
importDbConnections(doc, null);
}
@Override
public void importPropertiesDbConn(String dbConnPath){
try{
Properties properties = new Properties();
File varFile = new File(dbConnPath);
InputStream inputStream = new FileInputStream(varFile);
properties.load(inputStream);
List<String> connNamesList = new ArrayList<String>();
Set connKeys = properties.keySet();
connKeys.forEach(connKey -> {
String connName = ((String)connKey).split("/")[0];
if(!connNamesList.contains(connName)){
connNamesList.add(connName);
};
});
connNamesList.forEach(connName -> {
GenericDatabaseMeta database = new GenericDatabaseMeta();
database.setDriverClass((String) properties.get(connName + "/driver"));
database.setManualUrl((String) properties.get(connName + "/url"));
database.setUsername((String) properties.get(connName + "/user"));
database.setPassword((String) properties.get(connName + "/password"));
IDatabase db = (IDatabase) database;
DatabaseMeta databaseMeta = new DatabaseMeta();
databaseMeta.setName(connName);
db.setPluginId(databaseMeta.getPluginName());
databaseMeta.setIDatabase(db);
addDatabaseMeta(varFile.getAbsolutePath(), databaseMeta);
});
}catch(FileNotFoundException e){
e.printStackTrace();
}catch(IOException e){
e.printStackTrace();
}
}
private void importDbConnections(Document doc, File kettleFile){
NodeList connectionList = doc.getElementsByTagName("connection");
for(int i = 0; i < connectionList.getLength(); i++){
if(connectionList.item(i).getParentNode().equals(doc.getDocumentElement())){
Element connElement = (Element)connectionList.item(i);
String databaseType = connElement.getElementsByTagName("type").item(0).getTextContent();
List<IPlugin> databasePluginTypes = registry.getPlugins(DatabasePluginType.class);
IPlugin databasePlugin = registry.findPluginWithId(DatabasePluginType.class, connElement.getElementsByTagName("type").item(0).getTextContent());
try {
DatabaseMeta databaseMeta = new DatabaseMeta();
IDatabase iDatabase = (BaseDatabaseMeta) registry.loadClass(databasePlugin);
if(connElement.getElementsByTagName("name").getLength() > 0){
databaseMeta.setName(getTextContent(connElement, "name", 0));
}
if(connElement.getElementsByTagName("server").getLength() > 0){
iDatabase.setHostname(getTextContent(connElement, "server", 0));
}
if(connElement.getElementsByTagName("access").getLength() > 0){
iDatabase.setAccessType(DatabaseMeta.getAccessType(getTextContent(connElement, "access", 0)));
}
if(connElement.getElementsByTagName("database").getLength() > 0){
iDatabase.setDatabaseName(getTextContent(connElement, "database", 0));
}
if(connElement.getElementsByTagName("port").getLength() > 0){
iDatabase.setPort(getTextContent(connElement, "port", 0));
}
if(connElement.getElementsByTagName("username").getLength() > 0) {
iDatabase.setUsername(getTextContent(connElement, "username", 0));
}
if(connElement.getElementsByTagName("password").getLength() > 0){
iDatabase.setPassword(getTextContent(connElement, "password",0 ));
}
if(connElement.getElementsByTagName("servername").getLength() > 0){
iDatabase.setServername(getTextContent(connElement, "servername", 0));
}
if(connElement.getElementsByTagName("tablespace").getLength() > 0){
iDatabase.setDataTablespace(getTextContent(connElement, "tablespace", 0));
}
if(connElement.getElementsByTagName("data_tablespace").getLength() > 0){
iDatabase.setDataTablespace(getTextContent(connElement, "data_tablespace", 0));
}
if(connElement.getElementsByTagName("index_tablespace").getLength() > 0){
iDatabase.setIndexTablespace(getTextContent(connElement, "index_tablespace", 0));
}
Map<String, String> attributesMap = new HashMap<String, String>();
NodeList connNodeList = connElement.getElementsByTagName("attributes");
for(int j=0; j < connNodeList.getLength() ; j++){
if(connNodeList.item(j).getNodeName().equals("attributes")){
Node attributesNode = connNodeList.item(j);
for(int k=0; k < attributesNode.getChildNodes().getLength(); k++){
Node attributeNode = attributesNode.getChildNodes().item(k);
for(int l=0; l < attributeNode.getChildNodes().getLength(); l++){
String code = "";
String attribute = "";
if(attributeNode.getChildNodes().item(l).getNodeName().equals("code")){
code = attributeNode.getChildNodes().item(l).getTextContent();
}
if(attributeNode.getChildNodes().item(l).getNodeName().equals("attribute")){
attribute = attributeNode.getChildNodes().item(l).getTextContent();
}
attributesMap.put(code, attribute);
}
}
}
}
iDatabase.setAttributes(attributesMap);
databaseMeta.setIDatabase(iDatabase);
iDatabase.setPluginId(databaseMeta.getPluginName());
databaseMeta.setDatabaseType(connElement.getElementsByTagName("type").item(0).getTextContent());
addDatabaseMeta(kettleFile.getAbsolutePath(), databaseMeta);
}catch (HopPluginException e) {
e.printStackTrace();
}catch(NullPointerException e){
log.logError("Failed to parse connection of type '" + databaseType + "' for file '" + kettleFile.getAbsolutePath() + "'");
log.logError("Exception processing connection: " + e.getMessage());
e.printStackTrace();
}
}
}
}
private void renameNode(Document doc, Element element, String newElementName){
doc.renameNode(element, null, newElementName);
}
private void processNode(Document doc, Node node){
Node nodeToProcess = node;
NodeList nodeList = nodeToProcess.getChildNodes();
// do a first pass to remove repository definitions
for (int i = 0; i < nodeList.getLength(); i++) {
Node repositoryNode = nodeList.item(i);
if (repositoryNode.getNodeType() == Node.ELEMENT_NODE) {
if (KettleConst.repositoryTypes.contains(repositoryNode.getTextContent())) {
for(int j=0; j < node.getChildNodes().getLength(); j++){
Node childNode = node.getChildNodes().item(j);
if(childNode.getNodeName().equals("jobname") || childNode.getNodeName().equals("transname")){
if(!StringUtil.isEmpty(childNode.getTextContent())){
nodeToProcess = processRepositoryNode(node);
}
}
}
nodeList = nodeToProcess.getChildNodes();
}
}
}
for (int i = 0; i < nodeList.getLength(); i++) {
Node currentNode = nodeList.item(i);
if (currentNode.getNodeType() == Node.ELEMENT_NODE) {
// remove superfluous elements
if(KettleConst.kettleElementsToRemove.containsKey(currentNode.getNodeName())){
if(!StringUtils.isEmpty(KettleConst.kettleElementsToRemove.get(currentNode.getNodeName()))){
if(currentNode.getParentNode().getNodeName().equals(KettleConst.kettleElementsToRemove.get(currentNode.getNodeName()))){
currentNode.getParentNode().removeChild(currentNode);
}
}else{
currentNode.getParentNode().removeChild(currentNode);
}
}
// rename Kettle elements to Hop elements
if(KettleConst.kettleElementReplacements.containsKey(currentNode.getNodeName())){
renameNode(doc, (Element)currentNode, KettleConst.kettleElementReplacements.get(currentNode.getNodeName()));
}
// replace element contents with Hop equivalent
if(KettleConst.kettleReplaceContent.containsKey(currentNode.getTextContent())){
currentNode.setTextContent(KettleConst.kettleReplaceContent.get(currentNode.getTextContent()));
}
processNode(doc, currentNode);
}
// partial node content replacement
if(currentNode.getNodeType() == Node.TEXT_NODE && !StringUtils.isEmpty(currentNode.getTextContent())){
for(Map.Entry<String, String> entry : KettleConst.kettleReplaceInContent.entrySet()){
if(currentNode.getTextContent().contains(entry.getKey())){
currentNode.setTextContent(currentNode.getTextContent().replace(entry.getKey(), entry.getValue()));
}
}
}
}
}
private Node processRepositoryNode(Node repositoryNode){
String filename = "";
String directory = "${PROJECT_HOME}";
String type = "";
Node filenameNode = null;
for(int i=0; i < repositoryNode.getChildNodes().getLength(); i++){
Node childNode = repositoryNode.getChildNodes().item(i);
if(childNode.getNodeName().equals("directory")){
if(childNode.getTextContent().startsWith(System.getProperty("file.separator"))){
directory += childNode.getTextContent();
}else{
directory += System.getProperty("file.separator") + childNode.getTextContent();
}
repositoryNode.removeChild(childNode);
}
if(childNode.getNodeName().equals("type")){
if(KettleConst.jobTypes.contains(childNode.getTextContent())){
type = ".hwf";
}
if(KettleConst.transTypes.contains(childNode.getTextContent())){
type = ".hpl";
}
}
if(childNode.getNodeName().equals("filename")){
filename = childNode.getTextContent().replaceAll(".ktr", "").replaceAll(".kjb", "");
childNode.setTextContent(filename + type);
filenameNode = childNode;
}
// hard coded local run configuration for now
if(childNode.getNodeName().equals("run_configuration")){
childNode.setTextContent("local");
}
if(childNode.getNodeName().equals("jobname") || childNode.getNodeName().equals("transname")){
filename = childNode.getTextContent();
repositoryNode.removeChild(childNode);
}
}
filenameNode.setTextContent(directory + "/" + filename + type);
return repositoryNode;
}
private Document getDocFromFile(File kettleFile){
try{
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
dbFactory.setIgnoringElementContentWhitespace(true);
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
Document doc = dBuilder.parse(kettleFile);
return doc;
} catch (SAXException e) {
e.printStackTrace();
} catch (ParserConfigurationException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
private String getTextContent(Element element, String tagName, Integer itemIndex){
return element.getElementsByTagName(tagName).item(itemIndex).getTextContent();
}
}