| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.tools |
| |
| import java.io.BufferedReader |
| import java.io.FileReader |
| import joptsimple._ |
| import kafka.utils.{Logging, ZkUtils,ZKStringSerializer} |
| import org.I0Itec.zkclient.ZkClient |
| |
| |
| /** |
| * A utility that updates the offset of broker partitions in ZK. |
| * |
| * This utility accepts 2 command-line arguments: |
| * 1. --zkconnect: the ZooKeeper connect string (defaults to localhost:2181) |
| * 2. --input-file (required): a file containing partition offset data such as: |
| * (This data file can be obtained by running kafka.tools.ExportZkOffsets) |
| * |
| * /consumers/group1/offsets/topic1/3-0:285038193 |
| * /consumers/group1/offsets/topic1/1-0:286894308 |
| * |
| * To print debug message, add the following line to log4j.properties: |
| * log4j.logger.kafka.tools.ImportZkOffsets$=DEBUG |
| * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) |
| */ |
object ImportZkOffsets extends Logging {

  /**
   * Command-line entry point.
   *
   * Recognised options:
   *   --zkconnect   ZooKeeper connect string (defaults to localhost:2181)
   *   --input-file  file of "<zk path>:<offset>" lines (required)
   *   --help        print usage and exit with status 0
   *
   * Exits with status 1 (after printing usage to stderr) when --input-file
   * is missing.
   *
   * @param args raw command-line arguments
   */
  def main(args: Array[String]): Unit = {
    val parser = new OptionParser

    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
                            .withRequiredArg()
                            .defaultsTo("localhost:2181")
                            .ofType(classOf[String])
    val inFileOpt = parser.accepts("input-file", "Input file")
                            .withRequiredArg()
                            .ofType(classOf[String])
    parser.accepts("help", "Print this message.")

    val options = parser.parse(args : _*)

    if (options.has("help")) {
      parser.printHelpOn(System.out)
      System.exit(0)
    }

    for (opt <- List(inFileOpt)) {
      if (!options.has(opt)) {
        System.err.println("Missing required argument: %s".format(opt))
        parser.printHelpOn(System.err)
        System.exit(1)
      }
    }

    val zkConnect = options.valueOf(zkConnectOpt)
    val partitionOffsetFile = options.valueOf(inFileOpt)

    // Read the input file before connecting, so an unreadable or malformed
    // file fails fast without opening a ZooKeeper session.
    val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)

    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
    try {
      updateZkOffsets(zkClient, partitionOffsets)
    } finally {
      // Always release the ZooKeeper session, even if the update throws.
      zkClient.close()
    }
  }

  /**
   * Reads "<zk path>:<offset>" lines from the given file into a map of
   * path -> offset.
   *
   * Reading stops at end of file or at the first empty line, whichever comes
   * first. Only the first two ':'-separated fields of a line are used. If the
   * same path appears more than once, the last occurrence wins.
   *
   * @param filename path of the offsets file to read
   * @return map from ZooKeeper node path to the offset string to store there
   * @throws IllegalArgumentException if a line has no ':' separator or an
   *         empty offset
   */
  private def getPartitionOffsetsFromFile(filename: String): Map[String,String] = {
    val reader = new BufferedReader(new FileReader(filename))
    try {
      // foldLeft is strict, so every line is consumed before the reader is closed.
      Iterator.continually(reader.readLine())
        .takeWhile(line => line != null && line.length() >= 1)
        .foldLeft(Map[String,String]()) { (partOffsets, line) =>
          val tokens = line.split(":")
          if (tokens.length < 2)
            throw new IllegalArgumentException(
              "Malformed partition offset line in " + filename +
              " (expected <zk path>:<offset>): [" + line + "]")
          debug("adding node path [" + line + "]")
          partOffsets + (tokens(0) -> tokens(1))
        }
    } finally {
      // Closing the BufferedReader also closes the wrapped FileReader.
      reader.close()
    }
  }

  /**
   * Writes every (path, offset) pair to ZooKeeper through
   * ZkUtils.updatePersistentPath.
   *
   * A failure on one path is reported on stderr and does not stop the
   * remaining paths from being updated. Only Exceptions are handled this way;
   * JVM Errors (e.g. OutOfMemoryError) propagate to the caller.
   *
   * @param zkClient         connected ZooKeeper client used for the writes
   * @param partitionOffsets map from ZooKeeper node path to offset string
   */
  private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
    for ((partition, offset) <- partitionOffsets) {
      debug("updating [" + partition + "] with offset [" + offset + "]")

      try {
        ZkUtils.updatePersistentPath(zkClient, partition, offset)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
}