blob: defb51b738610f1bb04644df48e1e95b2f1ccc2c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.etl;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
public class KafkaETLRequest {
public static long DEFAULT_OFFSET = -1;
public static String DELIM = "\t";
String _topic;
URI _uri;
int _partition;
long _offset = DEFAULT_OFFSET;
public KafkaETLRequest() {
}
public KafkaETLRequest(String input) throws IOException {
//System.out.println("Init request from " + input);
String[] pieces = input.trim().split(DELIM);
if (pieces.length != 4)
throw new IOException( input +
" : input must be in the form 'url" + DELIM +
"topic" + DELIM +"partition" + DELIM +"offset'");
try {
_uri = new URI (pieces[0]);
}catch (java.net.URISyntaxException e) {
throw new IOException (e);
}
_topic = pieces[1];
_partition = Integer.valueOf(pieces[2]);
_offset = Long.valueOf(pieces[3]);
}
public KafkaETLRequest(String node, String topic, String partition, String offset,
Map<String, String> nodes) throws IOException {
Integer nodeId = Integer.parseInt(node);
String uri = nodes.get(nodeId.toString());
if (uri == null) throw new IOException ("Cannot form node for id " + nodeId);
try {
_uri = new URI (uri);
}catch (java.net.URISyntaxException e) {
throw new IOException (e);
}
_topic = topic;
_partition = Integer.valueOf(partition);
_offset = Long.valueOf(offset);
}
public KafkaETLRequest(String topic, String uri, int partition) throws URISyntaxException {
_topic = topic;
_uri = new URI(uri);
_partition = partition;
}
public void setDefaultOffset() {
_offset = DEFAULT_OFFSET;
}
public void setOffset(long offset) {
_offset = offset;
}
public String getTopic() { return _topic;}
public URI getURI () { return _uri;}
public int getPartition() { return _partition;}
public long getOffset() { return _offset;}
public boolean isValidOffset() {
return _offset >= 0;
}
@Override
public boolean equals(Object o) {
if (! (o instanceof KafkaETLRequest))
return false;
KafkaETLRequest r = (KafkaETLRequest) o;
return this._topic.equals(r._topic) ||
this._uri.equals(r._uri) ||
this._partition == r._partition;
}
@Override
public int hashCode() {
return toString(0).hashCode();
}
@Override
public String toString() {
return toString(_offset);
}
public String toString (long offset) {
return
_uri + DELIM +
_topic + DELIM +
_partition + DELIM +
offset;
}
}