blob: dde2f103e9a2d1b8d03552de2f46b72dae3b7681 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engine.pipeline.executor
import java.io.OutputStream
import com.webank.wedatasphere.linkis.common.io.FsPath
import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext
import com.webank.wedatasphere.linkis.engine.pipeline.OutputStreamCache
import com.webank.wedatasphere.linkis.engine.pipeline.constant.PipeLineConstant._
import com.webank.wedatasphere.linkis.engine.pipeline.conversions.FsConvertions._
import com.webank.wedatasphere.linkis.engine.pipeline.exception.PipeLineErrorException
import com.webank.wedatasphere.linkis.scheduler.executer.ExecuteResponse
import com.webank.wedatasphere.linkis.storage.FSFactory
import com.webank.wedatasphere.linkis.storage.excel.ExcelFsWriter
import com.webank.wedatasphere.linkis.storage.source.FileSource
import org.apache.commons.io.IOUtils
/**
* Created by johnnwang on 2019/1/30.
*/
class ExcelExecutor extends PipeLineExecutor {
override def execute(sourcePath: String, destPath: String, engineExecutorContext: EngineExecutorContext): ExecuteResponse = {
if (!FileSource.isResultSet(sourcePath)) {
throw new PipeLineErrorException(70005, "不是结果集文件")
}
val sourceFsPath = new FsPath(sourcePath)
val destFsPath = new FsPath(s"$destPath.xlsx")
val sourceFs = FSFactory.getFs(sourceFsPath)
sourceFs.init(null)
val destFs = FSFactory.getFs(destFsPath)
destFs.init(null)
val fileSource = FileSource.create(sourceFsPath, sourceFs)
if (!FileSource.isTableResultSet(fileSource)) {
throw new PipeLineErrorException(70005, "只有table类型的结果集才能转为excel")
}
var nullValue = options.getOrDefault(PIPELINE_OUTPUT_SHUFFLE_NULL_TYPE, "NULL")
if (BLANK.equalsIgnoreCase(nullValue)) nullValue = ""
val outputStream: OutputStream = destFs.write(destFsPath, options.get(PIPELINE_OUTPUT_ISOVERWRITE).toBoolean)
val excelFsWriter = ExcelFsWriter.getExcelFsWriter(DEFAULTC_HARSET, DEFAULT_SHEETNAME, DEFAULT_DATEFORMATE, outputStream)
import scala.collection.JavaConversions._
OutputStreamCache.osCache += engineExecutorContext.getJobId.get -> outputStream
fileSource.addParams("nullValue", nullValue).write(excelFsWriter)
IOUtils.closeQuietly(excelFsWriter)
IOUtils.closeQuietly(fileSource)
IOUtils.closeQuietly(sourceFs)
IOUtils.closeQuietly(destFs)
super.execute(sourcePath, destPath, engineExecutorContext)
}
override def Kind: String = "excel"
}
object ExcelExecutor {
val excelExecutor = new ExcelExecutor
def getInstance: PipeLineExecutor = excelExecutor
}