blob: 96f3779cb69337e36b05359befcdaed1a350570f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
public static final int LATEST_SUPPORTED_VERSION = 4;
static final int UNKNOWN = -1;
private final int usedVersion;
private final int latestSupportedVersion;
private UUID processId;
private Set<TaskId> prevTasks;
private Set<TaskId> standbyTasks;
private String userEndPoint;
// used for decoding; don't apply version checks
private SubscriptionInfo(final int version,
final int latestSupportedVersion) {
this.usedVersion = version;
this.latestSupportedVersion = latestSupportedVersion;
}
public SubscriptionInfo(final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final String userEndPoint) {
this(LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
}
public SubscriptionInfo(final int version,
final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final String userEndPoint) {
this(version, LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
+ "; was: " + version);
}
}
// for testing only; don't apply version checks
protected SubscriptionInfo(final int version,
final int latestSupportedVersion,
final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final String userEndPoint) {
this.usedVersion = version;
this.latestSupportedVersion = latestSupportedVersion;
this.processId = processId;
this.prevTasks = prevTasks;
this.standbyTasks = standbyTasks;
this.userEndPoint = userEndPoint;
}
public int version() {
return usedVersion;
}
public int latestSupportedVersion() {
return latestSupportedVersion;
}
public UUID processId() {
return processId;
}
public Set<TaskId> prevTasks() {
return prevTasks;
}
public Set<TaskId> standbyTasks() {
return standbyTasks;
}
public String userEndPoint() {
return userEndPoint;
}
/**
* @throws TaskAssignmentException if method fails to encode the data
*/
public ByteBuffer encode() {
final ByteBuffer buf;
switch (usedVersion) {
case 1:
buf = encodeVersionOne();
break;
case 2:
buf = encodeVersionTwo();
break;
case 3:
buf = encodeVersionThree();
break;
case 4:
buf = encodeVersionFour();
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + LATEST_SUPPORTED_VERSION);
}
buf.rewind();
return buf;
}
private ByteBuffer encodeVersionOne() {
final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
buf.putInt(1); // version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
return buf;
}
private int getVersionOneByteLength() {
return 4 + // version
16 + // client ID
4 + prevTasks.size() * 8 + // length + prev tasks
4 + standbyTasks.size() * 8; // length + standby tasks
}
protected void encodeClientUUID(final ByteBuffer buf) {
buf.putLong(processId.getMostSignificantBits());
buf.putLong(processId.getLeastSignificantBits());
}
protected void encodeTasks(final ByteBuffer buf,
final Collection<TaskId> taskIds) {
buf.putInt(taskIds.size());
for (final TaskId id : taskIds) {
id.writeTo(buf);
}
}
protected byte[] prepareUserEndPoint() {
if (userEndPoint == null) {
return new byte[0];
} else {
return userEndPoint.getBytes(StandardCharsets.UTF_8);
}
}
private ByteBuffer encodeVersionTwo() {
final byte[] endPointBytes = prepareUserEndPoint();
final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
buf.putInt(2); // version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
encodeUserEndPoint(buf, endPointBytes);
return buf;
}
private int getVersionTwoByteLength(final byte[] endPointBytes) {
return 4 + // version
16 + // client ID
4 + prevTasks.size() * 8 + // length + prev tasks
4 + standbyTasks.size() * 8 + // length + standby tasks
4 + endPointBytes.length; // length + userEndPoint
}
protected void encodeUserEndPoint(final ByteBuffer buf,
final byte[] endPointBytes) {
if (endPointBytes != null) {
buf.putInt(endPointBytes.length);
buf.put(endPointBytes);
}
}
private ByteBuffer encodeVersionThree() {
final byte[] endPointBytes = prepareUserEndPoint();
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
buf.putInt(3);
buf.putInt(latestSupportedVersion);
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
encodeUserEndPoint(buf, endPointBytes);
return buf;
}
private ByteBuffer encodeVersionFour() {
final byte[] endPointBytes = prepareUserEndPoint();
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
buf.putInt(4); // used version
buf.putInt(latestSupportedVersion); // supported version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
encodeUserEndPoint(buf, endPointBytes);
return buf;
}
protected int getVersionThreeAndFourByteLength(final byte[] endPointBytes) {
return 4 + // used version
4 + // latest supported version version
16 + // client ID
4 + prevTasks.size() * 8 + // length + prev tasks
4 + standbyTasks.size() * 8 + // length + standby tasks
4 + endPointBytes.length; // length + userEndPoint
}
/**
* @throws TaskAssignmentException if method fails to decode the data
*/
public static SubscriptionInfo decode(final ByteBuffer data) {
final SubscriptionInfo subscriptionInfo;
// ensure we are at the beginning of the ByteBuffer
data.rewind();
final int usedVersion = data.getInt();
final int latestSupportedVersion;
switch (usedVersion) {
case 1:
subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
decodeVersionOneData(subscriptionInfo, data);
break;
case 2:
subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
decodeVersionTwoData(subscriptionInfo, data);
break;
case 3:
case 4:
latestSupportedVersion = data.getInt();
subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
decodeVersionThreeData(subscriptionInfo, data);
break;
default:
latestSupportedVersion = data.getInt();
subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, latestSupportedVersion);
}
return subscriptionInfo;
}
private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
decodeClientUUID(subscriptionInfo, data);
decodeTasks(subscriptionInfo, data);
}
private static void decodeClientUUID(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
}
private static void decodeTasks(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
subscriptionInfo.prevTasks = new HashSet<>();
final int numPrevTasks = data.getInt();
for (int i = 0; i < numPrevTasks; i++) {
subscriptionInfo.prevTasks.add(TaskId.readFrom(data));
}
subscriptionInfo.standbyTasks = new HashSet<>();
final int numStandbyTasks = data.getInt();
for (int i = 0; i < numStandbyTasks; i++) {
subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
}
}
private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
decodeClientUUID(subscriptionInfo, data);
decodeTasks(subscriptionInfo, data);
decodeUserEndPoint(subscriptionInfo, data);
}
private static void decodeUserEndPoint(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
final int bytesLength = data.getInt();
if (bytesLength != 0) {
final byte[] bytes = new byte[bytesLength];
data.get(bytes);
subscriptionInfo.userEndPoint = new String(bytes, StandardCharsets.UTF_8);
}
}
private static void decodeVersionThreeData(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
decodeClientUUID(subscriptionInfo, data);
decodeTasks(subscriptionInfo, data);
decodeUserEndPoint(subscriptionInfo, data);
}
@Override
public int hashCode() {
final int hashCode = usedVersion ^ latestSupportedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
if (userEndPoint == null) {
return hashCode;
}
return hashCode ^ userEndPoint.hashCode();
}
@Override
public boolean equals(final Object o) {
if (o instanceof SubscriptionInfo) {
final SubscriptionInfo other = (SubscriptionInfo) o;
return this.usedVersion == other.usedVersion &&
this.latestSupportedVersion == other.latestSupportedVersion &&
this.processId.equals(other.processId) &&
this.prevTasks.equals(other.prevTasks) &&
this.standbyTasks.equals(other.standbyTasks) &&
this.userEndPoint != null ? this.userEndPoint.equals(other.userEndPoint) : other.userEndPoint == null;
} else {
return false;
}
}
@Override
public String toString() {
return "[version=" + usedVersion
+ ", supported version=" + latestSupportedVersion
+ ", process ID=" + processId
+ ", prev tasks=" + prevTasks
+ ", standby tasks=" + standbyTasks
+ ", user endpoint=" + userEndPoint + "]";
}
}