blob: cbc87980b24a86bb78e66183cbe0073ef20bceee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.direct;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.network.direct.state.DirectMessageState;
import org.apache.ignite.internal.network.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.serialization.MessageWriter;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.jetbrains.annotations.Nullable;
/**
* Message writer implementation.
*/
public class DirectMessageWriter implements MessageWriter {
/** State. */
private final DirectMessageState<StateItem> state;
/**
* Constructor.
*
* @param serializationRegistry Serialization registry.
* @param protoVer Protocol version.
*/
public DirectMessageWriter(MessageSerializationRegistry serializationRegistry, byte protoVer) {
state = new DirectMessageState<>(StateItem.class, () -> new StateItem(createStream(serializationRegistry, protoVer)));
}
/** {@inheritDoc} */
@Override
public void setBuffer(ByteBuffer buf) {
state.item().stream.setBuffer(buf);
}
/** {@inheritDoc} */
@Override
public void setCurrentWriteClass(Class<? extends NetworkMessage> msgCls) {
// No-op.
}
/** {@inheritDoc} */
// TODO: compress the header https://issues.apache.org/jira/browse/IGNITE-14818
@Override
public boolean writeHeader(short groupType, short messageType, byte fieldCnt) {
DirectByteBufferStream stream = state.item().stream;
// first part of the header might have already been sent in a previous write attempt
if (!state.item().partialHdrWritten) {
stream.writeShort(groupType);
if (stream.lastFinished()) {
state.item().partialHdrWritten = true;
} else {
return false;
}
}
stream.writeShort(messageType);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeByte(String name, byte val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeByte(val);
return stream.lastFinished();
}
@Override
public boolean writeBoxedByte(String name, @Nullable Byte val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoxedByte(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeShort(String name, short val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeShort(val);
return stream.lastFinished();
}
@Override
public boolean writeBoxedShort(String name, @Nullable Short val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoxedShort(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeInt(String name, int val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeInt(val);
return stream.lastFinished();
}
@Override
public boolean writeBoxedInt(String name, @Nullable Integer val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoxedInt(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeLong(String name, long val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeLong(val);
return stream.lastFinished();
}
@Override
public boolean writeBoxedLong(String name, @Nullable Long val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoxedLong(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeFloat(String name, float val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeFloat(val);
return stream.lastFinished();
}
@Override
public boolean writeBoxedFloat(String name, @Nullable Float val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoxedFloat(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeDouble(String name, double val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeDouble(val);
return stream.lastFinished();
}
@Override
public boolean writeBoxedDouble(String name, @Nullable Double val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoxedDouble(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeChar(String name, char val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeChar(val);
return stream.lastFinished();
}
@Override
public boolean writeBoxedChar(String name, @Nullable Character val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoxedChar(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeBoolean(String name, boolean val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoolean(val);
return stream.lastFinished();
}
@Override
public boolean writeBoxedBoolean(String name, @Nullable Boolean val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoxedBoolean(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeByteArray(String name, @Nullable byte[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeByteArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeByteArray(String name, byte[] val, long off, int len) {
DirectByteBufferStream stream = state.item().stream;
stream.writeByteArray(val, off, len);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeShortArray(String name, @Nullable short[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeShortArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeIntArray(String name, @Nullable int[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeIntArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeLongArray(String name, @Nullable long[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeLongArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeLongArray(String name, long[] val, int len) {
DirectByteBufferStream stream = state.item().stream;
stream.writeLongArray(val, len);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeFloatArray(String name, @Nullable float[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeFloatArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeDoubleArray(String name, @Nullable double[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeDoubleArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeCharArray(String name, @Nullable char[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeCharArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeBooleanArray(String name, @Nullable boolean[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBooleanArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeString(String name, String val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeString(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeBitSet(String name, BitSet val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBitSet(val);
return stream.lastFinished();
}
@Override
public boolean writeByteBuffer(String name, ByteBuffer val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeByteBuffer(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeUuid(String name, UUID val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeUuid(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeIgniteUuid(String name, IgniteUuid val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeIgniteUuid(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean writeMessage(String name, @Nullable NetworkMessage msg) {
DirectByteBufferStream stream = state.item().stream;
stream.writeMessage(msg, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public <T> boolean writeObjectArray(String name, T[] arr, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
stream.writeObjectArray(arr, itemType, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
stream.writeCollection(col, itemType, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public <T> boolean writeList(String name, List<T> col, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
stream.writeCollection(col, itemType, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public <T> boolean writeSet(String name, Set<T> col, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
stream.writeSet(col, itemType, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public <K, V> boolean writeMap(String name, Map<K, V> map, MessageCollectionItemType keyType,
MessageCollectionItemType valType) {
DirectByteBufferStream stream = state.item().stream;
stream.writeMap(map, keyType, valType, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override
public boolean isHeaderWritten() {
return state.item().hdrWritten;
}
/** {@inheritDoc} */
@Override
public void onHeaderWritten() {
state.item().hdrWritten = true;
}
/** {@inheritDoc} */
@Override
public int state() {
return state.item().state;
}
/** {@inheritDoc} */
@Override
public void incrementState() {
state.item().state++;
}
/** {@inheritDoc} */
@Override
public void beforeInnerMessageWrite() {
state.forward();
}
/** {@inheritDoc} */
@Override
public void afterInnerMessageWrite(boolean finished) {
state.backward(finished);
}
/** {@inheritDoc} */
@Override
public void reset() {
state.reset();
}
/**
* Returns a stream to write message fields recursively.
*
* @param serializationRegistry Serialization registry.
* @param protoVer Protocol version.
*/
protected DirectByteBufferStream createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
switch (protoVer) {
case 1:
return new DirectByteBufferStreamImplV1(serializationRegistry);
default:
throw new IllegalStateException("Invalid protocol version: " + protoVer);
}
}
/**
* State item.
*/
private static class StateItem implements DirectMessageStateItem {
private final DirectByteBufferStream stream;
private int state;
/**
* Flag indicating that the first part of the message header has been written.
*/
private boolean partialHdrWritten;
/**
* Flag indicating that the whole message header has been written.
*/
private boolean hdrWritten;
/**
* Constructor.
*
* @param stream Direct byte buffer stream.
*/
StateItem(DirectByteBufferStream stream) {
this.stream = stream;
}
/** {@inheritDoc} */
@Override
public void reset() {
state = 0;
partialHdrWritten = false;
hdrWritten = false;
}
}
}