/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cassandra.db.commitlog;

import java.nio.ByteBuffer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Digest;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.cdc.IPartitionUpdateWrapper;
import org.apache.cassandra.spark.reader.CdcScannerBuilder;
import org.apache.cassandra.spark.reader.Scannable;

import org.jetbrains.annotations.NotNull;
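
/**
 * Wraps a {@link PartitionUpdate} read from the commit log for CDC, together with a digest of its contents, the
 * maximum write timestamp in the update and an approximate serialized size. Instances compare by timestamp, while
 * equality and hashing are based on the digest so identical updates can be detected and de-duplicated.
 */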
public class PartitionUpdateWrapper implements IPartitionUpdateWrapper, Scannable, Comparable<PartitionUpdateWrapper>
{
    public final String keyspace;
    public final String table;
    private final TableMetadata tableMetadata;
    private final PartitionUpdate update;
    private final ByteBuffer digestBytes;
    private final long maxTimestampMicros;
    private final int dataSize;
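
    /**
     * Computes a digest over the update's unfiltered rows up front and records an approximate serialized size:
     * 18 bytes of fixed overhead (8 + 4 + 2 + 4, matching the fields written by {@link Serializer}) plus the digest
     * and the update's own data size.
     */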
    public PartitionUpdateWrapper(TableMetadata tableMetadata, PartitionUpdate update, long maxTimestampMicros)
    {
        this.tableMetadata = tableMetadata;
        this.update = update;
        this.keyspace = update.metadata().keyspace;
        this.table = update.metadata().name;
        this.maxTimestampMicros = maxTimestampMicros;
        Digest digest = Digest.forReadResponse();
        UnfilteredRowIterators.digest(update.unfilteredIterator(), digest, MessagingService.current_version);
        this.digestBytes = ByteBuffer.wrap(digest.digest());
        this.dataSize = 18 /* = 8 + 4 + 2 + 4 */ + digestBytes.remaining() + update.dataSize();
    }

    // For deserialization
    private PartitionUpdateWrapper(TableMetadata tableMetadata,
                                   PartitionUpdate update,
                                   String keyspace,
                                   String table,
                                   long maxTimestampMicros,
                                   ByteBuffer digestBytes,
                                   int dataSize)
    {
        this.tableMetadata = tableMetadata;
        this.update = update;
        this.keyspace = keyspace;
        this.table = table;
        this.maxTimestampMicros = maxTimestampMicros;
        this.digestBytes = digestBytes;
        this.dataSize = dataSize;
    }

    public DecoratedKey partitionKey()
    {
        return update.partitionKey();
    }

    @Override
    public long maxTimestampMicros()
    {
        return maxTimestampMicros;
    }

    @Override
    public int dataSize()
    {
        return dataSize;
    }

    public PartitionUpdate partitionUpdate()
    {
        return update;
    }

    public ByteBuffer digest()
    {
        return digestBytes;
    }
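
    /**
     * Exposes the wrapped update through the {@link ISSTableScanner} interface via {@link CdcScannerBuilder.CdcScanner}.
     */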
    @Override
    public ISSTableScanner scanner()
    {
        return new CdcScannerBuilder.CdcScanner(tableMetadata, update);
    }
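
    // Equality and hash code are based on the digest of the update contents, not on the PartitionUpdate object itself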
    // TODO: Add proper equals and hashCode impl for PartitionUpdate in OSS
    @Override
    public int hashCode()
    {
        return digestBytes.hashCode();
    }

    @Override
    public boolean equals(Object other)
    {
        if (this == other)
        {
            return true;
        }
        else if (other instanceof PartitionUpdateWrapper)
        {
            return this.digestBytes.equals(((PartitionUpdateWrapper) other).digestBytes);
        }
        else
        {
            return false;
        }
    }

    @Override
    public int compareTo(@NotNull PartitionUpdateWrapper that)
    {
        return Long.compare(this.maxTimestampMicros, that.maxTimestampMicros);
    }
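
    /**
     * Kryo serializer for {@link PartitionUpdateWrapper}. By default only the timestamp, size, digest, keyspace and
     * table name are written; set {@code includePartitionUpdate} to also carry the serialized {@link PartitionUpdate}.
     *
     * <p>A minimal usage sketch, assuming a {@code Kryo} instance, the table's {@link TableMetadata} and a
     * {@code wrapper} to serialize are already available (the variable names are illustrative only):
     *
     * <pre>{@code
     * Kryo kryo = new Kryo();
     * kryo.register(PartitionUpdateWrapper.class, new PartitionUpdateWrapper.Serializer(tableMetadata, true));
     *
     * Output output = new Output(1024, -1);
     * kryo.writeObject(output, wrapper);
     *
     * Input input = new Input(output.toBytes());
     * PartitionUpdateWrapper roundTripped = kryo.readObject(input, PartitionUpdateWrapper.class);
     * }</pre>
     */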
    public static class Serializer extends com.esotericsoftware.kryo.Serializer<PartitionUpdateWrapper>
    {
        final TableMetadata metadata;
        final boolean includePartitionUpdate;

        public Serializer(String keyspace, String table)
        {
            this(keyspace, table, false);
        }

        public Serializer(String keyspace, String table, boolean includePartitionUpdate)
        {
            this.metadata = Schema.instance.getTableMetadata(keyspace, table);
            this.includePartitionUpdate = includePartitionUpdate;
        }

        public Serializer(TableMetadata metadata)
        {
            this(metadata, false);
        }

        public Serializer(TableMetadata metadata, boolean includePartitionUpdate)
        {
            this.metadata = metadata;
            this.includePartitionUpdate = includePartitionUpdate;
        }
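
        /**
         * Reads fields in the order written by {@link #write}: max timestamp, data size, the short-length prefixed
         * digest, optionally the int-length prefixed partition update (only when {@code includePartitionUpdate} is
         * set), then the keyspace and table names. When the update is not included, the wrapper carries a null
         * {@link PartitionUpdate}.
         */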
        @Override
        public PartitionUpdateWrapper read(Kryo kryo, Input in, Class type)
        {
            long maxTimestampMicros = in.readLong();
            int size = in.readInt();

            // Read digest
            ByteBuffer digest = ByteBuffer.wrap(in.readBytes(in.readShort()));

            PartitionUpdate partitionUpdate = null;
            if (includePartitionUpdate)
            {
                // Read partition update
                partitionUpdate = PartitionUpdate.fromBytes(ByteBuffer.wrap(in.readBytes(in.readInt())), MessagingService.current_version);
            }

            String keyspace = in.readString();
            String table = in.readString();

            return new PartitionUpdateWrapper(metadata, partitionUpdate, keyspace, table, maxTimestampMicros, digest, size);
        }
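
        /**
         * Writes the wrapper as: max timestamp (8 bytes), data size (4 bytes), short-length prefixed digest,
         * optionally the int-length prefixed partition update bytes, then the keyspace and table names.
         */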
        @Override
        public void write(Kryo kryo, Output out, PartitionUpdateWrapper update)
        {
            out.writeLong(update.maxTimestampMicros); // 8 bytes
            out.writeInt(update.dataSize()); // 4 bytes

            // Write digest
            byte[] bytes = new byte[update.digestBytes.remaining()];
            update.digestBytes.get(bytes);
            out.writeShort(bytes.length); // 2 bytes
            out.writeBytes(bytes); // Variable bytes
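            // Reset position and limit so the digest buffer can still be read after the relative get() above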
            update.digestBytes.clear();

            // Write partition update
            if (includePartitionUpdate)
            {
                ByteBuffer buffer = PartitionUpdate.toBytes(update.update, MessagingService.current_version);
                byte[] array = new byte[buffer.remaining()];
                buffer.get(array);
                out.writeInt(array.length); // 4 bytes
                out.writeBytes(array); // Variable bytes
            }

            out.writeString(update.keyspace);
            out.writeString(update.table);
        }
    }
}