blob: 9412f643bd63a9c6af783ff1841a93b9631a9be1 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.entrance.log
import java.io.{IOException, InputStream}
import java.util
import com.webank.wedatasphere.linkis.common.io.{Fs, FsPath}
import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.storage.FSFactory
class CacheLogReader(logPath:String ,
charset:String,
sharedCache: Cache,
user: String)
extends LogReader(charset:String){
val lock:Object = new Object
def getCache:Cache = sharedCache
var inputStream:InputStream = _
var fileSystem:Fs = _
private def createInputStream: InputStream = {
if (fileSystem == null) this synchronized {
if (fileSystem == null){
fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user)
fileSystem.init(new util.HashMap[String, String]())
}
}
val inputStream:InputStream = fileSystem.read(new FsPath(logPath))
//val inputStream = new FileInputStream(logPath)
inputStream
}
override def getInputStream: InputStream = {
if (inputStream == null) lock.synchronized{
if (inputStream == null){
inputStream = createInputStream
}
}
inputStream
}
override protected def readLog(deal: String => Unit, fromLine: Int, size: Int): Int = {
if (!sharedCache.cachedLogs.nonEmpty) return super.readLog(deal, fromLine, size)
val min = sharedCache.cachedLogs.min
val max = sharedCache.cachedLogs.max
if(fromLine > max) return 0
var from = fromLine
val to = if(fromLine >= min) {
if(size >= 0 && max >= fromLine + size) fromLine + size else max + 1
} else {
//If you are getting it from a file, you don't need to read the cached data again. In this case, you can guarantee that the log will not be missing.
//如果是从文件中进行进行获取,就不需要对缓存的数据再进行读取,这样的话,可以保证日志是不会缺失的
val read = super.readLog(deal, fromLine, size)
return read
// from = min
// if(size < 0) max + 1
// else {
// if(read >= size) return size
// //如果是从
// //val left = size - read
// //deal("\n")
//
// //if(max >= min + left) min + left else max + 1
// }
}
(from until to) map sharedCache.cachedLogs.get foreach deal
to - fromLine
}
@throws[IOException]
override def close(): Unit = {
if (inputStream != null) {
Utils.tryQuietly(inputStream.close(), t => {
warn("Error encounters when closing inputStream.", t)
})
inputStream = null
}
if (fileSystem != null){
Utils.tryQuietly(fileSystem.close(), t => {
warn("Error encounters when closing fileSystem.", t)
})
fileSystem = null
}
}
}