blob: 0486e57ef7117b1f1f6e33885250d5d91e43352b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.common.record.ByteBufferInputStream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class AssignmentInfo {
private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
public final int version;
public final List<TaskId> activeTasks; // each element corresponds to a partition
public final Map<TaskId, Set<TopicPartition>> standbyTasks;
public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
this(1, activeTasks, standbyTasks);
}
protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
this.version = version;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
}
/**
* @throws TaskAssignmentException if method fails to encode the data, e.g., if there is an
* IO exception during encoding
*/
public ByteBuffer encode() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
try {
if (version == 1) {
// Encode version
out.writeInt(1);
// Encode active tasks
out.writeInt(activeTasks.size());
for (TaskId id : activeTasks) {
id.writeTo(out);
}
// Encode standby tasks
out.writeInt(standbyTasks.size());
for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
TaskId id = entry.getKey();
id.writeTo(out);
Set<TopicPartition> partitions = entry.getValue();
out.writeInt(partitions.size());
for (TopicPartition partition : partitions) {
out.writeUTF(partition.topic());
out.writeInt(partition.partition());
}
}
out.flush();
out.close();
return ByteBuffer.wrap(baos.toByteArray());
} else {
TaskAssignmentException ex = new TaskAssignmentException("Unable to encode assignment data: version=" + version);
log.error(ex.getMessage(), ex);
throw ex;
}
} catch (IOException ex) {
throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
}
}
/**
* @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
*/
public static AssignmentInfo decode(ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();
DataInputStream in = new DataInputStream(new ByteBufferInputStream(data));
try {
// Decode version
int version = in.readInt();
if (version == 1) {
// Decode active tasks
int count = in.readInt();
List<TaskId> activeTasks = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
activeTasks.add(TaskId.readFrom(in));
}
// Decode standby tasks
count = in.readInt();
Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
for (int i = 0; i < count; i++) {
TaskId id = TaskId.readFrom(in);
int numPartitions = in.readInt();
Set<TopicPartition> partitions = new HashSet<>(numPartitions);
for (int j = 0; j < numPartitions; j++) {
partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
}
standbyTasks.put(id, partitions);
}
return new AssignmentInfo(activeTasks, standbyTasks);
} else {
TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
log.error(ex.getMessage(), ex);
throw ex;
}
} catch (IOException ex) {
throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}
}
@Override
public int hashCode() {
return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof AssignmentInfo) {
AssignmentInfo other = (AssignmentInfo) o;
return this.version == other.version &&
this.activeTasks.equals(other.activeTasks) &&
this.standbyTasks.equals(other.standbyTasks);
} else {
return false;
}
}
@Override
public String toString() {
return "[version=" + version + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
}
}