/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engine.spark.common
import java.io.InputStream
import java.util.concurrent.locks.ReentrantLock
import com.webank.wedatasphere.linkis.common.conf.CommonVars
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engine.configuration.SparkConfiguration
import org.apache.commons.io.IOUtils
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
/**
 * Buffers lines read from an InputStream (typically a child process's stdout) into a
 * bounded LogContainer, and exposes them either as a snapshot via `lines` or as a
 * blocking iterator that waits for new lines until the stream is closed.
 *
 * Created by allenlliu on 2018/11/19.
 */
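// Usage sketch (illustrative only; `process` stands for any java.lang.Process whose stdout is tailed):
//   val stdout = new LineBufferedStream(process.getInputStream)
//   stdout.iterator.foreach(println) // blocks in hasNext until the stream is closed
//   // ...or, to simply drain the output: stdout.waitForComplete()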
class LineBufferedStream(inputStream: InputStream) extends Logging {
private implicit val executor: ExecutionContext = LineBufferedStream.executor
private[this] val _lock = new ReentrantLock()
private[this] val _condition = _lock.newCondition()
private[this] var _finished = false
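  // Maximum number of log lines kept in memory, configurable via the query.session.log.hold property (default 500).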
val logSize = CommonVars("query.session.log.hold", 500).getValue
private[this] val logs = new LogContainer(logSize)
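  // Asynchronously drain the input stream line by line until it is exhausted or close() is called,
  // waking up any blocked iterators after each line.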
private val future = Future {
val iterator = IOUtils.lineIterator(inputStream, "utf-8")
while (iterator.hasNext && !_finished) {
logs.putLog(iterator.next())
notifyTo()
}
    close() // close the stream once all output has been consumed
}
  /** Wakes up any iterator blocked in hasNext, either because new lines arrived or the stream was closed. */
  private def notifyTo(): Unit = {
    _lock.lock()
    Utils.tryFinally(_condition.signalAll()) { _lock.unlock() }
  }
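  /** Returns a snapshot of the log lines currently buffered. */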
def lines: List[String] = logs.getLogs
  /** Returns a blocking iterator over the buffered lines; its hasNext waits for new lines until the stream is closed. */
  def iterator: Iterator[String] = new LinesIterator
  /** Stops the reading future, wakes up any blocked iterators and closes the underlying stream. */
  def close(): Unit = {
    _finished = true
    notifyTo() // need to notify all blocked iterators so they can observe _finished.
    IOUtils.closeQuietly(inputStream)
  }
  /** Waits at most `atMost` for the stream to be fully consumed, then closes it. */
  def waitForClose(atMost: Duration): Unit = {
    Utils.tryQuietly(Utils.waitUntil(() => future.isCompleted, atMost))
    close()
  }
  /** Blocks until the stream has been fully consumed, then closes it. */
  def waitForComplete(): Unit = {
    Utils.tryQuietly(Await.result(future, Duration.Inf))
    close()
  }
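  /** Iterator over the buffered lines that blocks in hasNext until new lines arrive or the stream is closed. */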
  private class LinesIterator extends Iterator[String] {
    private[this] var index = 0
    private[this] var _lines: List[String] = logs.getLogs
    override def hasNext: Boolean = {
      // Refresh the local snapshot from the LogContainer and report whether it holds any lines.
      def getAndNext: Boolean = {
        _lines = logs.getLogs
        index = 0
        _lines.nonEmpty
      }
      if (_lines != null && index < _lines.length) {
        true
      } else if (getAndNext) {
        true
      } else {
        // Nothing buffered right now; block until the reader signals new lines or the stream is closed.
        _lock.lock()
        try {
          if (_finished) {
            false
          } else {
            _condition.await()
            !_finished && getAndNext
          }
        } finally {
          _lock.unlock()
        }
      }
    }
    override def next(): String = {
      val line = _lines(index)
      index += 1
      line
    }
  }
}
object LineBufferedStream {
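  /** Shared cached thread pool on which the stream-reading futures are scheduled. */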
val executor = Utils.newCachedExecutionContext(SparkConfiguration.PROCESS_MAX_THREADS.getValue, "Shell-Command-stdout-")
}