blob: 57362046527713078f30380b8b24fb0f8b13e845 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.spark.excel
import java.io.{BufferedOutputStream, OutputStream}
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.poi.ss.usermodel.{IndexedColors, _}
import org.apache.poi.xssf.streaming.SXSSFWorkbook
import org.apache.spark.sql.DataFrame
import scala.language.postfixOps
object ExcelFileSaver {
final val DEFAULT_SHEET_NAME = "Sheet1"
final val DEFAULT_DATE_FORMAT = "yyyy-MM-dd"
final val DEFAULT_TIMESTAMP_FORMAT = "yyyy-mm-dd hh:mm:ss.000"
final val DEFAULT_EXPORT_NULL_VALUE = "SHUFFLEOFF"
}
class ExcelFileSaver(fs: FileSystem) {
import ExcelFileSaver._
def save(
location: Path,
dataFrame: DataFrame,
sheetName: String = DEFAULT_SHEET_NAME,
useHeader: Boolean = true,
dateFormat: String = DEFAULT_DATE_FORMAT,
timestampFormat: String = DEFAULT_TIMESTAMP_FORMAT,
exportNullValue:String = DEFAULT_EXPORT_NULL_VALUE
): Unit = {
fs.setVerifyChecksum(false)
val headerRow = dataFrame.schema.map(_.name)
val dataRows = dataFrame.toLocalIterator()
val excelWriter = new ExcelWriter(sheetName, dateFormat, timestampFormat)
if (useHeader) excelWriter.writeHead(headerRow)
while (dataRows.hasNext) {
val line = dataRows.next().toSeq
excelWriter.writerRow(line,exportNullValue)
}
// if failed try to refresh nfs cache
val out: OutputStream =
try{
fs.create(location)
} catch {
case e:Throwable =>
fs.listFiles(location.getParent, false)
fs.create(location)
case _ =>
fs.listFiles(location.getParent, false)
fs.create(location)
}
excelWriter.close(new BufferedOutputStream(out))
}
def autoClose[A <: AutoCloseable, B](closeable: A)(fun: (A) => B): B = {
try {
fun(closeable)
} finally {
closeable.close()
}
}
}
class ExcelWriter(sheetName: String, dateFormat: String, timestampFormat: String) {
val wb = new SXSSFWorkbook(100)
val sheet = wb.createSheet(sheetName)
val df = new SimpleDateFormat(dateFormat)
val tf = new SimpleDateFormat(timestampFormat)
val createHelper = wb.getCreationHelper
val sdf = wb.createCellStyle()
sdf.setDataFormat(createHelper.createDataFormat().getFormat(dateFormat))
val stf = wb.createCellStyle()
stf.setDataFormat(createHelper.createDataFormat().getFormat(timestampFormat))
var rowNum = 0
var columnsLen = 0
def writeHead(headRow: Seq[String]): Unit = {
columnsLen = headRow.length
//设置header的格式
val headerFont = wb.createFont()
headerFont.setBold(true)
headerFont.setFontHeightInPoints(14)
headerFont.setColor(IndexedColors.RED.getIndex())
val headerCellStyle = wb.createCellStyle()
headerCellStyle.setFont(headerFont)
val row = sheet.createRow(rowNum)
for (i <- headRow.indices) {
createCell(row, i, headRow(i), headerCellStyle)
}
rowNum = rowNum + 1
}
def writerRow(line: Seq[Any],exportNullValue:String): Unit = {
val row = sheet.createRow(rowNum)
for (i <- line.indices) {
createCell(row, i, line(i),exportNullValue)
}
rowNum = rowNum + 1
}
def createCell(row: Row, col: Int, value: String, cellStyle: CellStyle): Unit = {
val cell = row.createCell(col)
cell.setCellValue(value)
cell.setCellStyle(cellStyle)
}
def setDateValue(cell:Cell, date: Date): Unit ={
cell.setCellStyle(sdf)
cell.setCellValue(date)
}
def setTimestampValue(cell:Cell, date: Date): Unit ={
cell.setCellStyle(stf)
cell.setCellValue(date)
}
def createCell(row: Row, col: Int, value: Any,exportNullValue:String): Unit = {
val cell = row.createCell(col)
value match {
case t: java.sql.Timestamp => setTimestampValue(cell, new Date(t.getTime))
case d: java.sql.Date => setDateValue(cell, new Date(d.getTime))
case s: String => {
if(("NULL".equals(s) || "".equals(s)) && !"SHUFFLEOFF".equals(exportNullValue))
cell.setCellValue(exportNullValue)
else
cell.setCellValue(s)
}
case f: Float => cell.setCellValue(f)
case d: Double => cell.setCellValue(d)
case b: Boolean => cell.setCellValue(b)
case b: Byte => cell.setCellValue(b)
case s: Short => cell.setCellValue(s)
case i: Int => cell.setCellValue(i)
case l: Long => cell.setCellValue(l)
case b: BigDecimal => cell.setCellValue(b.doubleValue())
case b: java.math.BigDecimal => cell.setCellValue(b.doubleValue())
case null => if("SHUFFLEOFF".equals(exportNullValue)) cell.setCellValue("NULL") else cell.setCellValue(exportNullValue)
case _ => cell.setCellValue(value.toString)
}
}
def close(outputStream: OutputStream): Unit = {
try {
sheet.trackAllColumnsForAutoSizing()
for (i <- 0 until columnsLen) {
sheet.autoSizeColumn(i)
}
wb.write(outputStream);
} catch {
case e: Throwable =>
throw e
} finally {
outputStream.close()
wb.close()
}
}
}