// blob: 51cea174e9a1d3feee55cb975a825d2f08947094 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.direct;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
* Message writer implementation.
*/
public class DirectMessageWriter implements MessageWriter {
/** State. */
@GridToStringInclude
private final DirectMessageState<StateItem> state;
/**
* @param protoVer Protocol version.
*/
public DirectMessageWriter(final byte protoVer) {
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
@Override public StateItem apply() {
return new StateItem(protoVer);
}
});
}
/** {@inheritDoc} */
@Override public void setBuffer(ByteBuffer buf) {
state.item().stream.setBuffer(buf);
}
/** {@inheritDoc} */
@Override public void setCurrentWriteClass(Class<? extends Message> msgCls) {
// No-op.
}
/** {@inheritDoc} */
@Override public boolean writeHeader(short type, byte fieldCnt) {
DirectByteBufferStream stream = state.item().stream;
stream.writeShort(type);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeByte(String name, byte val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeByte(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeShort(String name, short val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeShort(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeInt(String name, int val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeInt(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeLong(String name, long val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeLong(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeFloat(String name, float val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeFloat(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeDouble(String name, double val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeDouble(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeChar(String name, char val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeChar(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeBoolean(String name, boolean val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBoolean(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeByteArray(String name, @Nullable byte[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeByteArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeByteArray(String name, byte[] val, long off, int len) {
DirectByteBufferStream stream = state.item().stream;
stream.writeByteArray(val, off, len);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeShortArray(String name, @Nullable short[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeShortArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeIntArray(String name, @Nullable int[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeIntArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeLongArray(String name, @Nullable long[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeLongArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeLongArray(String name, long[] val, int len) {
DirectByteBufferStream stream = state.item().stream;
stream.writeLongArray(val, len);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeFloatArray(String name, @Nullable float[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeFloatArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeDoubleArray(String name, @Nullable double[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeDoubleArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeCharArray(String name, @Nullable char[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeCharArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeBooleanArray(String name, @Nullable boolean[] val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBooleanArray(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeString(String name, String val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeString(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeBitSet(String name, BitSet val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeBitSet(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeUuid(String name, UUID val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeUuid(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeIgniteUuid(String name, IgniteUuid val) {
DirectByteBufferStream stream = state.item().stream;
stream.writeIgniteUuid(val);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean writeMessage(String name, @Nullable Message msg) {
DirectByteBufferStream stream = state.item().stream;
stream.writeMessage(msg, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public <T> boolean writeObjectArray(String name, T[] arr, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
stream.writeObjectArray(arr, itemType, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
stream.writeCollection(col, itemType, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public <K, V> boolean writeMap(String name, Map<K, V> map, MessageCollectionItemType keyType,
MessageCollectionItemType valType) {
DirectByteBufferStream stream = state.item().stream;
stream.writeMap(map, keyType, valType, this);
return stream.lastFinished();
}
/** {@inheritDoc} */
@Override public boolean isHeaderWritten() {
return state.item().hdrWritten;
}
/** {@inheritDoc} */
@Override public void onHeaderWritten() {
state.item().hdrWritten = true;
}
/** {@inheritDoc} */
@Override public int state() {
return state.item().state;
}
/** {@inheritDoc} */
@Override public void incrementState() {
state.item().state++;
}
/** {@inheritDoc} */
@Override public void beforeInnerMessageWrite() {
state.forward();
}
/** {@inheritDoc} */
@Override public void afterInnerMessageWrite(boolean finished) {
state.backward(finished);
}
/** {@inheritDoc} */
@Override public void reset() {
state.reset();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DirectMessageWriter.class, this);
}
/**
*/
private static class StateItem implements DirectMessageStateItem {
/** */
private final DirectByteBufferStream stream;
/** */
private int state;
/** */
private boolean hdrWritten;
/**
* @param protoVer Protocol version.
*/
public StateItem(byte protoVer) {
switch (protoVer) {
case 1:
stream = new DirectByteBufferStreamImplV1(null);
break;
case 2:
stream = new DirectByteBufferStreamImplV2(null);
break;
default:
throw new IllegalStateException("Invalid protocol version: " + protoVer);
}
}
/** {@inheritDoc} */
@Override public void reset() {
state = 0;
hdrWritten = false;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(StateItem.class, this);
}
}
}