blob: f7c9b87aa048f082e689018ad43c6bbf3e4b2e4c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb.upgrade;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.bind.tuple.TupleBase;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.protocol.v0_8.AMQFrameDecodingException;
import org.apache.qpid.server.protocol.v0_8.FieldTableFactory;
import org.apache.qpid.server.protocol.v0_8.transport.AMQProtocolVersionException;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
public class UpgradeFrom4To5 extends AbstractStoreUpgrade
{
// Delivery record (queue entry) databases: source (v4) and target (v5) of the upgrade.
private static final String OLD_DELIVERY_DB = "deliveryDb_v4";
private static final String NEW_DELIVERY_DB = "deliveryDb_v5";
// Exchange database; this class only reads it, to find the topic exchanges.
private static final String EXCHANGE_DB_NAME = "exchangeDb_v4";
// Queue binding databases: source (v4) and target (v5).
private static final String OLD_BINDINGS_DB_NAME = "queueBindingsDb_v4";
private static final String NEW_BINDINGS_DB_NAME = "queueBindingsDb_v5";
// Queue databases: source (v4) and target (v5).
private static final String OLD_QUEUE_DB_NAME = "queueDb_v4";
private static final String NEW_QUEUE_DB_NAME = "queueDb_v5";
// Message meta data databases: source (v4) and target (v5).
private static final String OLD_METADATA_DB_NAME = "messageMetaDataDb_v4";
private static final String NEW_METADATA_DB_NAME = "messageMetaDataDb_v5";
// Message content databases: source (v4) and target (v5).
private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v4";
private static final String NEW_CONTENT_DB_NAME = "messageContentDb_v5";
// Byte looked for in queue names when searching for potential durable subscriptions.
private static final byte COLON = (byte) ':';
private static final Logger LOGGER = LoggerFactory.getLogger(UpgradeFrom4To5.class);
/**
 * Runs the complete version 4 to version 5 upgrade inside one transaction.
 * <p>
 * The steps are executed in a fixed order: potential durable subscriptions are looked up first,
 * then queues, queue bindings, delivery records, message content and message meta data are
 * upgraded, and finally every database still named {@code *_v4} is renamed. The transaction is
 * committed once all steps have completed.
 *
 * @param environment the BDB environment to upgrade
 * @param handler     used to ask for decisions during the upgrade
 * @param parent      not used by this upgrade step
 */
@Override
public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, ConfiguredObject<?> parent)
{
    reportStarting(environment, 4);

    final Transaction txn = environment.beginTransaction(null, null);

    // queues bound to a topic exchange whose name contains a colon
    final List<AMQShortString> durableSubCandidates = findPotentialDurableSubscriptions(environment, txn);

    final Set<String> knownQueues = upgradeQueues(environment, handler, durableSubCandidates, txn);
    upgradeQueueBindings(environment, handler, durableSubCandidates, txn);

    // message ids returned here are skipped when content and meta data are copied
    final Set<Long> discardedMessageIds = upgradeDelivery(environment, knownQueues, handler, txn);
    upgradeContent(environment, handler, discardedMessageIds, txn);
    upgradeMetaData(environment, handler, discardedMessageIds, txn);

    renameRemainingDatabases(environment, handler, txn);
    txn.commit();

    reportFinished(environment, 5);
}
/**
 * Copies the queue bindings from the version 4 database into the version 5 database.
 * <p>
 * Each binding is decoded from the entry <em>key</em> and written to the new database. For a
 * binding between the topic exchange and a queue listed in {@code potentialDurableSubs}, an
 * empty-string JMS selector argument is added when the binding has no selector argument. Once
 * all entries are copied the version 4 database is removed. Nothing is done when the version 4
 * bindings database does not exist.
 *
 * @param environment          the BDB environment holding the databases
 * @param handler              interaction handler (not used by this step)
 * @param potentialDurableSubs names of queues that may be durable subscriptions
 * @param transaction          transaction the copy and the database removal take part in
 */
private void upgradeQueueBindings(Environment environment, UpgradeInteractionHandler handler, final List<AMQShortString> potentialDurableSubs,
        Transaction transaction)
{
    if (environment.getDatabaseNames().contains(OLD_BINDINGS_DB_NAME))
    {
        LOGGER.info("Queue Bindings");
        final BindingTuple bindingTuple = new BindingTuple();
        // loop-invariant values, resolved once instead of once per binding
        final AMQShortString topicExchangeName = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
        final String selectorFilterKey = AMQPFilterTypes.JMS_SELECTOR.getValue();
        CursorOperation databaseOperation = new CursorOperation()
        {
            @Override
            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                    DatabaseEntry key, DatabaseEntry value)
            {
                // All the information required in binding entries is actually in the *key* not value.
                BindingRecord oldBindingRecord = bindingTuple.entryToObject(key);
                AMQShortString queueName = oldBindingRecord.getQueueName();
                AMQShortString exchangeName = oldBindingRecord.getExchangeName();
                AMQShortString routingKey = oldBindingRecord.getRoutingKey();
                FieldTable arguments = oldBindingRecord.getArguments();
                if (LOGGER.isDebugEnabled())
                {
                    LOGGER.debug(String.format(
                            "Processing binding for queue %s, exchange %s, routingKey %s arguments %s", queueName,
                            exchangeName, routingKey, arguments));
                }
                // if the queue name is in the gathered list then inspect its binding arguments
                // only topic exchange should have a JMS selector key in binding
                if (potentialDurableSubs.contains(queueName) && exchangeName.equals(topicExchangeName))
                {
                    Map<String, Object> argumentsMap = new HashMap<>();
                    if (arguments != null)
                    {
                        argumentsMap.putAll(FieldTable.convertToMap(arguments));
                    }
                    if (!argumentsMap.containsKey(selectorFilterKey))
                    {
                        // logged at debug so that the level matches the isDebugEnabled() guard
                        if (LOGGER.isDebugEnabled())
                        {
                            LOGGER.debug("adding the empty string (i.e. 'no selector') value for " + queueName
                                    + " and exchange " + exchangeName);
                        }
                        argumentsMap.put(selectorFilterKey, "");
                    }
                    arguments = FieldTable.convertToFieldTable(argumentsMap);
                }
                addBindingToDatabase(bindingTuple, targetDatabase, transaction, queueName, exchangeName, routingKey,
                        arguments);
            }
        };
        new DatabaseTemplate(environment, OLD_BINDINGS_DB_NAME, NEW_BINDINGS_DB_NAME, transaction)
                .run(databaseOperation);
        environment.removeDatabase(transaction, OLD_BINDINGS_DB_NAME);
        LOGGER.info(databaseOperation.getRowCount() + " Queue Binding entries");
    }
}
/**
 * Moves every queue record from the version 4 queue database into the version 5 one and removes
 * the version 4 database afterwards.
 * <p>
 * Records are decoded and re-encoded with a {@link QueueRecordBinding} built from
 * {@code potentialDurableSubs}. When the version 4 queue database is absent no record is
 * processed and an empty set is returned.
 *
 * @param environment          the BDB environment holding the databases
 * @param handler              interaction handler (not used by this step)
 * @param potentialDurableSubs names of queues that may be durable subscriptions
 * @param transaction          transaction the work takes part in
 * @return the names of all queues that were written to the version 5 database
 */
private Set<String> upgradeQueues(final Environment environment, final UpgradeInteractionHandler handler,
        List<AMQShortString> potentialDurableSubs, Transaction transaction)
{
    LOGGER.info("Queues");
    final Set<String> migratedQueueNames = new HashSet<String>();
    if (!environment.getDatabaseNames().contains(OLD_QUEUE_DB_NAME))
    {
        return migratedQueueNames;
    }

    final QueueRecordBinding recordBinding = new QueueRecordBinding(potentialDurableSubs);
    final CursorOperation moveQueues = new CursorOperation()
    {
        @Override
        public void processEntry(final Database sourceDatabase, final Database targetDatabase,
                final Transaction transaction, final DatabaseEntry key, final DatabaseEntry value)
        {
            final QueueRecord queue = recordBinding.entryToObject(value);

            // re-encode in the new format and store under the unchanged key
            final DatabaseEntry reEncoded = new DatabaseEntry();
            recordBinding.objectToEntry(queue, reEncoded);
            targetDatabase.put(transaction, key, reEncoded);

            migratedQueueNames.add(queue.getNameShortString().toString());
            sourceDatabase.delete(transaction, key);
        }
    };
    final DatabaseTemplate template =
            new DatabaseTemplate(environment, OLD_QUEUE_DB_NAME, NEW_QUEUE_DB_NAME, transaction);
    template.run(moveQueues);
    environment.removeDatabase(transaction, OLD_QUEUE_DB_NAME);
    LOGGER.info(moveQueues.getRowCount() + " Queue entries");
    return migratedQueueNames;
}
/**
 * Scans the version 4 bindings database for queues that might be durable subscriptions.
 * <p>
 * A queue qualifies when it is bound to one of the topic exchanges found by
 * {@link #findTopicExchanges(Environment)} and its name contains a colon. A queue name is added
 * once per matching binding, so the result may contain duplicates.
 *
 * @param environment the BDB environment holding the databases
 * @param transaction transaction used for reading the bindings database
 * @return the names of the qualifying queues
 */
private List<AMQShortString> findPotentialDurableSubscriptions(final Environment environment,
        Transaction transaction)
{
    final List<AMQShortString> topicExchangeNames = findTopicExchanges(environment);
    final List<AMQShortString> candidates = new ArrayList<AMQShortString>();
    final PartialBindingRecordBinding keyDecoder = new PartialBindingRecordBinding();

    final CursorOperation scanBindings = new CursorOperation()
    {
        @Override
        public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                DatabaseEntry key, DatabaseEntry value)
        {
            final PartialBindingRecord partial = keyDecoder.entryToObject(key);
            if (!topicExchangeNames.contains(partial.getExchangeName()))
            {
                return;
            }
            final AMQShortString boundQueue = partial.getQueueName();
            if (boundQueue.contains(COLON))
            {
                candidates.add(boundQueue);
            }
        }
    };
    new DatabaseTemplate(environment, OLD_BINDINGS_DB_NAME, transaction).run(scanBindings);
    return candidates;
}
/**
 * Copies the delivery records (queue entries) from the version 4 database into the version 5
 * database and removes the version 4 database.
 * <p>
 * For a record whose queue is not in {@code existingQueues} the handler is asked once per queue
 * whether the queue should be created. On YES the queue is created and added to
 * {@code existingQueues}; on NO all records of that queue are dropped; any other response aborts
 * the upgrade. After the copy, the version 5 database is scanned and every message id that is
 * still referenced by a copied record is taken out of the discard set.
 *
 * @param environment    the BDB environment holding the databases
 * @param existingQueues names of the known queues; extended when queues are created here
 * @param handler        used to ask what to do with records of unknown queues
 * @param transaction    transaction the work takes part in
 * @return ids of messages whose records were dropped and which no copied record references
 * @throws StoreException if the handler answers with anything other than YES or NO
 */
private Set<Long> upgradeDelivery(final Environment environment, final Set<String> existingQueues,
        final UpgradeInteractionHandler handler, Transaction transaction)
{
    final Set<Long> messagesToDiscard = new HashSet<Long>();
    final Set<String> queuesToDiscard = new HashSet<String>();
    final QueueEntryKeyBinding queueEntryKeyBinding = new QueueEntryKeyBinding();
    LOGGER.info("Delivery Records");
    final CursorOperation upgradeOperation = new CursorOperation()
    {
        @Override
        public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                DatabaseEntry key, DatabaseEntry value)
        {
            QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key);
            Long messageId = entryKey.getMessageId();
            final String queueName = entryKey.getQueueName().toString();
            // Decided per record: a message may sit on a discarded queue and on a kept queue at
            // the same time, and its record for the kept queue must still be copied.
            boolean discardEntry = false;
            if (!existingQueues.contains(queueName))
            {
                if (queuesToDiscard.contains(queueName))
                {
                    discardEntry = true;
                }
                else
                {
                    String lineSeparator = System.getProperty("line.separator");
                    // {0} is the only argument; doubled quotes yield literal quotes in MessageFormat
                    String question = MessageFormat.format("Found persistent messages for non-durable queue ''{0}''. "
                            + " Do you wish to create this queue and move all the messages into it?" + lineSeparator
                            + "NOTE: Answering No will result in these messages being discarded!", queueName);
                    UpgradeInteractionResponse response = handler.requireResponse(question,
                            UpgradeInteractionResponse.YES, UpgradeInteractionResponse.YES,
                            UpgradeInteractionResponse.NO, UpgradeInteractionResponse.ABORT);
                    if (response == UpgradeInteractionResponse.YES)
                    {
                        createQueue(environment, transaction, queueName);
                        existingQueues.add(queueName);
                    }
                    else if (response == UpgradeInteractionResponse.NO)
                    {
                        queuesToDiscard.add(queueName);
                        discardEntry = true;
                    }
                    else
                    {
                        throw new StoreException("Upgrade is aborted!");
                    }
                }
            }
            if (discardEntry)
            {
                messagesToDiscard.add(messageId);
            }
            else
            {
                DatabaseEntry newKey = new DatabaseEntry();
                queueEntryKeyBinding.objectToEntry(entryKey, newKey);
                targetDatabase.put(transaction, newKey, value);
            }
        }
    };
    new DatabaseTemplate(environment, OLD_DELIVERY_DB, NEW_DELIVERY_DB, transaction).run(upgradeOperation);
    if (!messagesToDiscard.isEmpty())
    {
        // keep every message that is still referenced by a record in the new database
        CursorOperation retainReferencedOperation = new CursorOperation()
        {
            @Override
            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                    DatabaseEntry key, DatabaseEntry value)
            {
                QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key);
                Long messageId = entryKey.getMessageId();
                messagesToDiscard.remove(messageId);
            }
        };
        new DatabaseTemplate(environment, NEW_DELIVERY_DB, transaction).run(retainReferencedOperation);
    }
    // report the number of version 4 records processed, not the size of the verification scan
    LOGGER.info(upgradeOperation.getRowCount() + " Delivery Records entries ");
    environment.removeDatabase(transaction, OLD_DELIVERY_DB);
    return messagesToDiscard;
}
/**
 * Creates a queue record in the version 5 queue database together with two bindings in the
 * version 5 bindings database.
 * <p>
 * The queue is written without owner and arguments and as non-exclusive. It is bound, with its
 * own name as routing key and empty arguments, to the direct exchange and to the default
 * exchange.
 *
 * @param environment the BDB environment holding the databases
 * @param transaction transaction the writes take part in
 * @param queueName   name of the queue to create
 */
protected void createQueue(final Environment environment, Transaction transaction, final String queueName)
{
    // only used for encoding here, hence no durable subscription names are supplied
    final QueueRecordBinding queueBinding = new QueueRecordBinding(null);
    final BindingTuple bindingTuple = new BindingTuple();

    final DatabaseRunnable createOperation = new DatabaseRunnable()
    {
        @Override
        public void run(Database newQueueDatabase, Database newBindingsDatabase, Transaction transaction)
        {
            final AMQShortString name = AMQShortString.createAMQShortString(queueName);
            final QueueRecord queue = new QueueRecord(name, null, false, null);

            // the queue database is keyed by the encoded queue name
            final TupleOutput keyOutput = new TupleOutput();
            AMQShortStringEncoding.writeShortString(queue.getNameShortString(), keyOutput);
            final DatabaseEntry queueKey = new DatabaseEntry();
            TupleBase.outputToEntry(keyOutput, queueKey);

            final DatabaseEntry queueValue = new DatabaseEntry();
            queueBinding.objectToEntry(queue, queueValue);
            newQueueDatabase.put(transaction, queueKey, queueValue);

            final FieldTable noArguments = FieldTableFactory.createFieldTable(Collections.emptyMap());
            final AMQShortString directExchange = AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
            addBindingToDatabase(bindingTuple, newBindingsDatabase, transaction, name, directExchange, name,
                    noArguments);

            // TODO QPID-3490 we should not persist a default exchange binding
            final AMQShortString defaultExchange = AMQShortString.valueOf(ExchangeDefaults.DEFAULT_EXCHANGE_NAME);
            addBindingToDatabase(bindingTuple, newBindingsDatabase, transaction, name, defaultExchange, name,
                    noArguments);
        }
    };
    new DatabaseTemplate(environment, NEW_QUEUE_DB_NAME, NEW_BINDINGS_DB_NAME, transaction).run(createOperation);
}
/**
 * Reads the exchange database and collects the names of all exchanges whose type equals the
 * topic exchange class. The database is read without a transaction.
 *
 * @param environment the BDB environment holding the exchange database
 * @return the names of the topic exchanges, in cursor order
 */
private List<AMQShortString> findTopicExchanges(final Environment environment)
{
    final List<AMQShortString> result = new ArrayList<AMQShortString>();
    final ExchangeRecordBinding exchangeBinding = new ExchangeRecordBinding();

    final CursorOperation collectTopicExchanges = new CursorOperation()
    {
        @Override
        public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                DatabaseEntry key, DatabaseEntry value)
        {
            final ExchangeRecord exchange = exchangeBinding.entryToObject(value);
            final AMQShortString topicType = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
            if (!topicType.equals(exchange.getType()))
            {
                return;
            }
            result.add(exchange.getName());
        }
    };
    final DatabaseTemplate template = new DatabaseTemplate(environment, EXCHANGE_DB_NAME, null);
    template.run(collectTopicExchanges);
    return result;
}
/**
 * Copies the message meta data from the version 4 database into the version 5 database,
 * re-encoding every entry, and removes the version 4 database afterwards.
 * <p>
 * Entries of messages listed in {@code messagesToDiscard} are not copied. Nothing is copied when
 * the version 4 meta data database does not exist.
 *
 * @param environment       the BDB environment holding the databases
 * @param handler           interaction handler (not used by this step)
 * @param messagesToDiscard ids of messages that must not be carried over
 * @param transaction       transaction the work takes part in
 * @throws StoreException if the meta data of a message to be kept cannot be decoded
 */
private void upgradeMetaData(final Environment environment, final UpgradeInteractionHandler handler,
        final Set<Long> messagesToDiscard, Transaction transaction)
{
    LOGGER.info("Message MetaData");
    if (environment.getDatabaseNames().contains(OLD_METADATA_DB_NAME))
    {
        final MessageMetaDataBinding binding = new MessageMetaDataBinding();
        CursorOperation databaseOperation = new CursorOperation()
        {
            @Override
            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                    DatabaseEntry key, DatabaseEntry value)
            {
                // get message id
                Long messageId = LongBinding.entryToLong(key);
                // ONLY copy data if message is delivered to existing queue;
                // checked before decoding so that discarded entries are not decoded needlessly
                if (messagesToDiscard.contains(messageId))
                {
                    return;
                }
                StorableMessageMetaData metaData = binding.entryToObject(value);
                if (metaData == null)
                {
                    // the binding logs the cause and returns null when decoding fails
                    throw new StoreException("Unable to decode meta data of message " + messageId);
                }
                DatabaseEntry newValue = new DatabaseEntry();
                binding.objectToEntry(metaData, newValue);
                targetDatabase.put(transaction, key, newValue);
                deleteCurrent();
            }
        };
        new DatabaseTemplate(environment, OLD_METADATA_DB_NAME, NEW_METADATA_DB_NAME, transaction)
                .run(databaseOperation);
        environment.removeDatabase(transaction, OLD_METADATA_DB_NAME);
        LOGGER.info(databaseOperation.getRowCount() + " Message MetaData entries");
    }
}
/**
 * Copies the message content from the version 4 database into the version 5 database and
 * removes the version 4 database afterwards.
 * <p>
 * Each copied chunk is stored under a new key made of the message id and the number of content
 * bytes of that message copied so far; the count restarts whenever the message id differs from
 * the previously copied one. Chunks of messages listed in {@code messagesToDiscard} are skipped.
 * Nothing is copied when the version 4 content database does not exist.
 *
 * @param environment       the BDB environment holding the databases
 * @param handler           interaction handler (not used by this step)
 * @param messagesToDiscard ids of messages that must not be carried over
 * @param transaction       transaction the work takes part in
 */
private void upgradeContent(final Environment environment, final UpgradeInteractionHandler handler,
        final Set<Long> messagesToDiscard, Transaction transaction)
{
    LOGGER.info("Message Contents");
    if (environment.getDatabaseNames().contains(OLD_CONTENT_DB_NAME))
    {
        final MessageContentKeyBinding keyBinding = new MessageContentKeyBinding();
        final ContentBinding contentBinding = new ContentBinding();
        CursorOperation cursorOperation = new CursorOperation()
        {
            private long _prevMsgId = -1;
            private int _bytesSeenSoFar;
            @Override
            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                    DatabaseEntry key, DatabaseEntry value)
            {
                // determine the msgId of the current entry
                MessageContentKey contentKey = keyBinding.entryToObject(key);
                long msgId = contentKey.getMessageId();
                // ONLY copy data if message is delivered to existing queue
                if (messagesToDiscard.contains(msgId))
                {
                    return;
                }
                // if this is a new message, restart the byte offset count.
                if (_prevMsgId != msgId)
                {
                    _bytesSeenSoFar = 0;
                }
                // determine the content size
                ByteBuffer content = contentBinding.entryToObject(value);
                int contentSize = content.limit();
                // create the new key: id + previously seen data count
                MessageContentKey newKey = new MessageContentKey(msgId, _bytesSeenSoFar);
                DatabaseEntry newKeyEntry = new DatabaseEntry();
                keyBinding.objectToEntry(newKey, newKeyEntry);
                DatabaseEntry newValueEntry = new DatabaseEntry();
                contentBinding.objectToEntry(content, newValueEntry);
                // write within the upgrade transaction, like every other upgrade step does
                targetDatabase.put(transaction, newKeyEntry, newValueEntry);
                _prevMsgId = msgId;
                _bytesSeenSoFar += contentSize;
            }
        };
        new DatabaseTemplate(environment, OLD_CONTENT_DB_NAME, NEW_CONTENT_DB_NAME, transaction).run(cursorOperation);
        environment.removeDatabase(transaction, OLD_CONTENT_DB_NAME);
        LOGGER.info(cursorOperation.getRowCount() + " Message Content entries");
    }
}
/**
 * Renames every database whose name still ends in {@code _v4} so that it ends in {@code _v5}
 * instead. This covers the databases that none of the other upgrade steps replaced.
 *
 * @param environment the BDB environment holding the databases
 * @param handler     interaction handler (not used by this step)
 * @param transaction transaction the renames take part in
 */
private void renameRemainingDatabases(final Environment environment, final UpgradeInteractionHandler handler,
        Transaction transaction)
{
    final String oldSuffix = "_v4";
    for (final String currentName : environment.getDatabaseNames())
    {
        if (!currentName.endsWith(oldSuffix))
        {
            continue;
        }
        final String stem = currentName.substring(0, currentName.length() - oldSuffix.length());
        final String renamed = stem + "_v5";
        LOGGER.info("Renaming " + currentName + " into " + renamed);
        environment.renameDatabase(transaction, currentName, renamed);
    }
}
/**
 * Stores one binding in the given database. The binding is encoded entirely into the entry key;
 * the entry value is a single zero byte.
 *
 * @param bindingTuple   tuple binding used to encode the key
 * @param targetDatabase database the binding is written to
 * @param transaction    transaction the write takes part in
 * @param queueName      name of the bound queue
 * @param exchangeName   name of the exchange the queue is bound to
 * @param routingKey     routing key of the binding
 * @param arguments      binding arguments
 */
private void addBindingToDatabase(final BindingTuple bindingTuple, Database targetDatabase, Transaction transaction,
        AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments)
{
    final BindingRecord binding = new BindingRecord(exchangeName, queueName, routingKey, arguments);

    final DatabaseEntry keyEntry = new DatabaseEntry();
    bindingTuple.objectToEntry(binding, keyEntry);

    final DatabaseEntry placeholderValue = new DatabaseEntry();
    ByteBinding.byteToEntry((byte) 0, placeholderValue);

    targetDatabase.put(transaction, keyEntry, placeholderValue);
}
/** Immutable holder for the name and type of an exchange as read from the exchange database. */
private static final class ExchangeRecord
{
    private final AMQShortString _name;
    private final AMQShortString _type;

    private ExchangeRecord(final AMQShortString exchangeName, final AMQShortString exchangeType)
    {
        this._name = exchangeName;
        this._type = exchangeType;
    }

    /** @return the exchange name */
    public AMQShortString getName()
    {
        return this._name;
    }

    /** @return the exchange type */
    public AMQShortString getType()
    {
        return this._type;
    }
}
/** Tuple binding for {@link ExchangeRecord}: two short strings, name first and type second. */
private static final class ExchangeRecordBinding extends TupleBinding<ExchangeRecord>
{
    /** Reads the name and then the type from the tuple; anything after them is not read. */
    @Override
    public ExchangeRecord entryToObject(final TupleInput input)
    {
        final AMQShortString exchangeName = AMQShortStringEncoding.readShortString(input);
        final AMQShortString exchangeType = AMQShortStringEncoding.readShortString(input);
        return new ExchangeRecord(exchangeName, exchangeType);
    }

    /** Writes the name, the type and a trailing boolean that is always {@code false}. */
    @Override
    public void objectToEntry(final ExchangeRecord exchange, final TupleOutput output)
    {
        final AMQShortString exchangeName = exchange.getName();
        final AMQShortString exchangeType = exchange.getType();
        AMQShortStringEncoding.writeShortString(exchangeName, output);
        AMQShortStringEncoding.writeShortString(exchangeType, output);
        output.writeBoolean(false);
    }
}
/** Immutable holder for the exchange name and queue name of a binding. */
private static final class PartialBindingRecord
{
    private final AMQShortString _exchangeName;
    private final AMQShortString _queueName;

    private PartialBindingRecord(final AMQShortString exchangeName, final AMQShortString queueName)
    {
        this._exchangeName = exchangeName;
        this._queueName = queueName;
    }

    /** @return the name of the exchange the queue is bound to */
    public AMQShortString getExchangeName()
    {
        return this._exchangeName;
    }

    /** @return the name of the bound queue */
    public AMQShortString getQueueName()
    {
        return this._queueName;
    }
}
/**
 * Read-only tuple binding that decodes only the first two short strings of a binding key:
 * the exchange name followed by the queue name.
 */
private static final class PartialBindingRecordBinding extends TupleBinding<PartialBindingRecord>
{
    @Override
    public PartialBindingRecord entryToObject(final TupleInput input)
    {
        final AMQShortString exchangeName = AMQShortStringEncoding.readShortString(input);
        final AMQShortString queueName = AMQShortStringEncoding.readShortString(input);
        return new PartialBindingRecord(exchangeName, queueName);
    }

    /** Encoding is not supported by this binding. */
    @Override
    public void objectToEntry(final PartialBindingRecord object, final TupleOutput output)
    {
        throw new UnsupportedOperationException();
    }
}
/** Immutable holder for the attributes of a stored queue: name, owner, exclusivity and arguments. */
static final class QueueRecord
{
    private final AMQShortString _queueName;
    private final AMQShortString _owner;
    private final FieldTable _arguments;
    private final boolean _exclusive;

    public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments)
    {
        this._queueName = queueName;
        this._owner = owner;
        this._arguments = arguments;
        this._exclusive = exclusive;
    }

    /** @return the queue name */
    public AMQShortString getNameShortString()
    {
        return this._queueName;
    }

    /** @return the queue owner as passed to the constructor */
    public AMQShortString getOwner()
    {
        return this._owner;
    }

    /** @return {@code true} if the queue is exclusive */
    public boolean isExclusive()
    {
        return this._exclusive;
    }

    /** @return the queue arguments as passed to the constructor */
    public FieldTable getArguments()
    {
        return this._arguments;
    }
}
/**
 * Tuple binding for {@link QueueRecord}.
 * <p>
 * Decoding reads name, owner, arguments and, if present, an exclusive flag. A queue whose name is
 * in the list of durable subscription names is always decoded as exclusive. Encoding writes all
 * four fields, including the exclusive flag.
 */
static final class QueueRecordBinding extends TupleBinding<QueueRecord>
{
    private final List<AMQShortString> _durableSubNames;

    /**
     * @param durableSubNames names of queues to be treated as exclusive when decoding; {@code null}
     *                        is accepted and treated as an empty list
     */
    QueueRecordBinding(final List<AMQShortString> durableSubNames)
    {
        // a null list is passed when the binding is only used for encoding; normalising it here
        // keeps entryToObject from failing with a NullPointerException should it be called
        _durableSubNames = durableSubNames == null
                ? Collections.<AMQShortString>emptyList()
                : durableSubNames;
    }

    @Override
    public QueueRecord entryToObject(final TupleInput input)
    {
        AMQShortString name = AMQShortStringEncoding.readShortString(input);
        AMQShortString owner = AMQShortStringEncoding.readShortString(input);
        FieldTable arguments = FieldTableEncoding.readFieldTable(input);
        // the exclusive flag is only read when there is data left in the tuple
        boolean exclusive = input.available() > 0 && input.readBoolean();
        exclusive = exclusive || _durableSubNames.contains(name);
        return new QueueRecord(name, owner, exclusive, arguments);
    }

    @Override
    public void objectToEntry(final QueueRecord record, final TupleOutput output)
    {
        AMQShortStringEncoding.writeShortString(record.getNameShortString(), output);
        AMQShortStringEncoding.writeShortString(record.getOwner(), output);
        FieldTableEncoding.writeFieldTable(record.getArguments(), output);
        output.writeBoolean(record.isExclusive());
    }
}
/**
 * Tuple binding for message meta data.
 * <p>
 * Decoding reads a publish info block followed by a length-prefixed content header body.
 * Encoding writes a length prefix followed by one type byte and the data produced by the meta
 * data object itself.
 */
static final class MessageMetaDataBinding extends TupleBinding<StorableMessageMetaData>
{
    /**
     * Decodes a meta data entry.
     *
     * @return the decoded meta data, or {@code null} if decoding failed (the failure is logged)
     */
    @Override
    public MessageMetaData entryToObject(final TupleInput input)
    {
        try
        {
            // publish info precedes the content header in the stored tuple
            final MessagePublishInfo publishInfo = readMessagePublishInfo(input);
            final ContentHeaderBody header = readContentHeaderBody(input);
            return new MessageMetaData(publishInfo, header);
        }
        catch (Exception e)
        {
            LOGGER.error("Error converting entry to object: " + e, e);
            // annoyingly just have to return null since we cannot throw
            return null;
        }
    }

    /** Reads exchange, routing key, mandatory flag and immediate flag, in that order. */
    private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
    {
        final AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
        final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
        final boolean mandatoryFlag = tupleInput.readBoolean();
        final boolean immediateFlag = tupleInput.readBoolean();
        return new MessagePublishInfo(exchangeName, immediateFlag, mandatoryFlag, routingKey);
    }

    /** Reads an int length followed by that many bytes and builds the content header from them. */
    private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException,
            AMQProtocolVersionException
    {
        final int length = tupleInput.readInt();
        final byte[] headerBytes = new byte[length];
        tupleInput.readFast(headerBytes);
        final QpidByteBuffer wrapped = QpidByteBuffer.wrap(headerBytes);
        return ContentHeaderBody.createFromBuffer(wrapped, length);
    }

    /** Writes the total size followed by the type ordinal byte and the meta data bytes. */
    @Override
    public void objectToEntry(final StorableMessageMetaData metaData, final TupleOutput output)
    {
        // one extra leading byte carries the meta data type
        final int totalSize = 1 + metaData.getStorableSize();
        final byte[] encoded = new byte[totalSize];
        encoded[0] = (byte) metaData.getType().ordinal();

        // the meta data writes itself into the region after the type byte
        QpidByteBuffer payload = QpidByteBuffer.wrap(encoded);
        payload.position(1);
        payload = payload.slice();
        metaData.writeToBuffer(payload);

        output.writeInt(totalSize);
        output.writeFast(encoded);
    }
}
/** Key of a message content entry: the message id plus an int identifying the chunk. */
static final class MessageContentKey
{
    private final long _messageId;
    private final int _chunk;

    public MessageContentKey(long messageId, int chunkNo)
    {
        this._messageId = messageId;
        this._chunk = chunkNo;
    }

    /** @return the chunk value given at construction */
    public int getChunk()
    {
        return this._chunk;
    }

    /** @return the id of the message the content belongs to */
    public long getMessageId()
    {
        return this._messageId;
    }
}
/** Tuple binding for {@link MessageContentKey}: a long message id followed by an int chunk. */
static final class MessageContentKeyBinding extends TupleBinding<MessageContentKey>
{
    @Override
    public MessageContentKey entryToObject(TupleInput tupleInput)
    {
        final long id = tupleInput.readLong();
        final int chunkValue = tupleInput.readInt();
        return new MessageContentKey(id, chunkValue);
    }

    @Override
    public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput)
    {
        // same field order as read by entryToObject
        tupleOutput.writeLong(object.getMessageId());
        tupleOutput.writeInt(object.getChunk());
    }
}
/** Tuple binding for message content: an int length followed by that many bytes. */
static final class ContentBinding extends TupleBinding<ByteBuffer>
{
    /** Reads the length-prefixed bytes and returns them wrapped in a heap buffer. */
    @Override
    public ByteBuffer entryToObject(TupleInput tupleInput)
    {
        final int length = tupleInput.readInt();
        final byte[] bytes = new byte[length];
        tupleInput.readFast(bytes);
        return ByteBuffer.wrap(bytes);
    }

    /** Writes the remaining bytes of {@code src}, length first, without moving its position. */
    @Override
    public void objectToEntry(ByteBuffer src, TupleOutput tupleOutput)
    {
        // a slice starts at position 0 and its limit equals the bytes remaining in src;
        // reading from it leaves the caller's buffer untouched
        final ByteBuffer view = src.slice();
        final byte[] bytes = new byte[view.limit()];
        view.get(bytes);
        tupleOutput.writeInt(bytes.length);
        tupleOutput.writeFast(bytes);
    }
}
/** Key of a delivery record: the queue name together with the id of the enqueued message. */
static final class QueueEntryKey
{
    private final AMQShortString _queueName;
    private final long _messageId;

    public QueueEntryKey(AMQShortString queueName, long messageId)
    {
        this._queueName = queueName;
        this._messageId = messageId;
    }

    /** @return the name of the queue holding the message */
    public AMQShortString getQueueName()
    {
        return this._queueName;
    }

    /** @return the id of the enqueued message */
    public long getMessageId()
    {
        return this._messageId;
    }
}
/** Tuple binding for {@link QueueEntryKey}: a short string queue name followed by a long id. */
static final class QueueEntryKeyBinding extends TupleBinding<QueueEntryKey>
{
    @Override
    public QueueEntryKey entryToObject(TupleInput tupleInput)
    {
        final AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
        final long id = tupleInput.readLong();
        return new QueueEntryKey(name, id);
    }

    @Override
    public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
    {
        // same field order as read by entryToObject
        final AMQShortString name = mk.getQueueName();
        AMQShortStringEncoding.writeShortString(name, tupleOutput);
        final long id = mk.getMessageId();
        tupleOutput.writeLong(id);
    }
}
/** Immutable holder for a queue binding: exchange name, queue name, routing key and arguments. */
static final class BindingRecord
{
    private final AMQShortString _exchangeName;
    private final AMQShortString _queueName;
    private final AMQShortString _routingKey;
    private final FieldTable _arguments;

    public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey,
            FieldTable arguments)
    {
        this._exchangeName = exchangeName;
        this._queueName = queueName;
        this._routingKey = routingKey;
        this._arguments = arguments;
    }

    /** @return the name of the exchange the queue is bound to */
    public AMQShortString getExchangeName()
    {
        return this._exchangeName;
    }

    /** @return the name of the bound queue */
    public AMQShortString getQueueName()
    {
        return this._queueName;
    }

    /** @return the routing key of the binding */
    public AMQShortString getRoutingKey()
    {
        return this._routingKey;
    }

    /** @return the binding arguments as passed to the constructor */
    public FieldTable getArguments()
    {
        return this._arguments;
    }
}
/**
 * Tuple binding for {@link BindingRecord}. The fields are stored in the order exchange name,
 * queue name, routing key, arguments.
 */
static final class BindingTuple extends TupleBinding<BindingRecord>
{
    @Override
    public BindingRecord entryToObject(TupleInput tupleInput)
    {
        final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
        final AMQShortString queue = AMQShortStringEncoding.readShortString(tupleInput);
        final AMQShortString key = AMQShortStringEncoding.readShortString(tupleInput);
        final FieldTable args = FieldTableEncoding.readFieldTable(tupleInput);
        return new BindingRecord(exchange, queue, key, args);
    }

    @Override
    public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput)
    {
        // same field order as read by entryToObject
        final AMQShortString exchange = binding.getExchangeName();
        final AMQShortString queue = binding.getQueueName();
        final AMQShortString key = binding.getRoutingKey();
        AMQShortStringEncoding.writeShortString(exchange, tupleOutput);
        AMQShortStringEncoding.writeShortString(queue, tupleOutput);
        AMQShortStringEncoding.writeShortString(key, tupleOutput);
        FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
    }
}
}