/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.server

import java.io._
import java.nio.file.{FileSystems, Paths}
import java.util.regex.Pattern

import kafka.common._
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils

import scala.collection._
object OffsetCheckpoint {
  // checkpoint lines are split on runs of whitespace
  private val WhiteSpacesPattern = Pattern.compile("\\s+")
  // format version written as the first line of every checkpoint file
  private val CurrentVersion = 0
}
/**
 * Persists a map of (topic, partition) => offset to a plain-text file.
 * Writes go to a temporary file that is then atomically moved over the
 * existing checkpoint, so readers never observe a partially written file.
 */
class OffsetCheckpoint(val file: File) extends Logging {
  import OffsetCheckpoint._

  private val path = file.toPath.toAbsolutePath
  private val tempPath = Paths.get(path.toString + ".tmp")
  private val lock = new Object() // serializes concurrent reads and writes
  file.createNewFile() // create the checkpoint file in case it doesn't exist
  def write(offsets: Map[TopicPartition, Long]): Unit = {
    lock synchronized {
      // write to a temp file and then swap it in for the existing file
      val fileOutputStream = new FileOutputStream(tempPath.toFile)
      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
      try {
        // header: the format version followed by the number of entries
        writer.write(CurrentVersion.toString)
        writer.newLine()
        writer.write(offsets.size.toString)
        writer.newLine()

        // body: one "topic partition offset" line per entry
        offsets.foreach { case (topicPart, offset) =>
          writer.write(s"${topicPart.topic} ${topicPart.partition} $offset")
          writer.newLine()
        }

        // flush and fsync so the data is durable before the rename below
        writer.flush()
        fileOutputStream.getFD().sync()
      } catch {
        case e: FileNotFoundException =>
          if (FileSystems.getDefault.isReadOnly) {
            fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible: ", e)
            Runtime.getRuntime.halt(1)
          }
          throw e
      } finally {
        writer.close()
      }

      Utils.atomicMoveWithFallback(tempPath, path)
    }
  }
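
  // For illustration only (this comment is not part of the original source):
  // a checkpoint written with
  //   Map(new TopicPartition("my-topic", 0) -> 42L,
  //       new TopicPartition("my-topic", 1) -> 7L)
  // produces a file whose contents are:
  //
  //   0                <- CurrentVersion
  //   2                <- number of entries
  //   my-topic 0 42
  //   my-topic 1 7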
  def read(): Map[TopicPartition, Long] = {
    def malformedLineException(line: String) =
      new IOException(s"Malformed line in offset checkpoint file: '$line'")

    lock synchronized {
      val reader = new BufferedReader(new FileReader(file))
      var line: String = null
      try {
        line = reader.readLine()
        if (line == null)
          return Map.empty
        val version = line.toInt
        version match {
          case CurrentVersion =>
            // second header line holds the expected number of entries
            line = reader.readLine()
            if (line == null)
              return Map.empty
            val expectedSize = line.toInt
            val offsets = mutable.Map[TopicPartition, Long]()
            line = reader.readLine()
            while (line != null) {
              WhiteSpacesPattern.split(line) match {
                case Array(topic, partition, offset) =>
                  offsets += new TopicPartition(topic, partition.toInt) -> offset.toLong
                  line = reader.readLine()
                case _ => throw malformedLineException(line)
              }
            }
            if (offsets.size != expectedSize)
              throw new IOException(s"Expected $expectedSize entries but found only ${offsets.size}")
            offsets
          case _ =>
            throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
        }
      } catch {
        case _: NumberFormatException => throw malformedLineException(line)
      } finally {
        reader.close()
      }
    }
  }
}
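
// A minimal usage sketch (illustrative only; the path and offsets below are
// made up, not taken from Kafka). It exercises a write/read round trip with
// the class above: read() should return exactly the map passed to write().
object OffsetCheckpointExample extends App {
  val checkpoint = new OffsetCheckpoint(new File("/tmp/replication-offset-checkpoint"))
  checkpoint.write(Map(
    new TopicPartition("my-topic", 0) -> 42L,
    new TopicPartition("my-topic", 1) -> 7L))
  // expected output: Map(my-topic-0 -> 42, my-topic-1 -> 7)
  println(checkpoint.read())
}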