| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.direct; |
| |
| import java.nio.ByteBuffer; |
| import java.util.BitSet; |
| import java.util.Collection; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.function.Consumer; |
| import org.apache.ignite.internal.direct.state.DirectMessageState; |
| import org.apache.ignite.internal.direct.state.DirectMessageStateItem; |
| import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; |
| import org.apache.ignite.internal.managers.communication.CompressedMessage; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.util.GridLongList; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.lang.IgniteOutClosure; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.plugin.extensions.communication.MessageArrayType; |
| import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; |
| import org.apache.ignite.plugin.extensions.communication.MessageFactory; |
| import org.apache.ignite.plugin.extensions.communication.MessageMapType; |
| import org.apache.ignite.plugin.extensions.communication.MessageWriter; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_NETWORK_COMPRESSION; |
| |
| /** |
| * Message writer implementation. |
| */ |
| public class DirectMessageWriter implements MessageWriter { |
| /** Temporary buffer capacity. */ |
| private static final int TMP_BUF_CAPACITY = 10 * 1024; |
| |
| /** State. */ |
| @GridToStringInclude |
| private final DirectMessageState<StateItem> state; |
| |
| /** Message factory. */ |
| private final MessageFactory msgFactory; |
| |
| /** Compression level. Used only for {@link CompressedMessage}. */ |
| private final int compressionLvl; |
| |
| /** Buffer for writing. */ |
| private ByteBuffer buf; |
| |
| /** @param msgFactory Message factory. */ |
| public DirectMessageWriter(final MessageFactory msgFactory) { |
| this(msgFactory, DFLT_NETWORK_COMPRESSION); |
| } |
| |
| /** |
| * @param msgFactory Message factory. |
| * @param compressionLvl Compression level. |
| */ |
| public DirectMessageWriter(final MessageFactory msgFactory, final int compressionLvl) { |
| this.msgFactory = msgFactory; |
| this.compressionLvl = compressionLvl; |
| |
| state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() { |
| @Override public StateItem apply() { |
| return new StateItem(msgFactory); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setBuffer(ByteBuffer buf) { |
| this.buf = buf; |
| |
| state.item().stream.setBuffer(buf); |
| } |
| |
| /** |
| * Gets buffer to write to. |
| * |
| * @return Byte buffer. |
| */ |
| public ByteBuffer getBuffer() { |
| return buf; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeHeader(short type) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeShort(type); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeByte(byte val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeByte(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeShort(short val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeShort(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeInt(int val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeInt(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeLong(long val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeLong(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeFloat(float val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeFloat(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeDouble(double val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeDouble(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeChar(char val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeChar(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeBoolean(boolean val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeBoolean(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeByteArray(@Nullable byte[] val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeByteArray(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeByteArray(byte[] val, long off, int len) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeByteArray(val, off, len); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeShortArray(@Nullable short[] val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeShortArray(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeIntArray(@Nullable int[] val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeIntArray(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeLongArray(@Nullable long[] val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeLongArray(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeLongArray(long[] val, int len) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeLongArray(val, len); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeFloatArray(@Nullable float[] val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeFloatArray(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeDoubleArray(@Nullable double[] val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeDoubleArray(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeCharArray(@Nullable char[] val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeCharArray(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeBooleanArray(@Nullable boolean[] val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeBooleanArray(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeString(String val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeString(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeBitSet(BitSet val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeBitSet(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeUuid(UUID val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeUuid(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeIgniteUuid(IgniteUuid val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeIgniteUuid(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeAffinityTopologyVersion(AffinityTopologyVersion val) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeAffinityTopologyVersion(val); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeMessage(@Nullable Message msg, boolean compress) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| if (compress) |
| writeCompressedMessage( |
| tmpWriter -> tmpWriter.state.item().stream.writeMessage(msg, tmpWriter), |
| msg == null, |
| stream |
| ); |
| else |
| stream.writeMessage(msg, this); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeCacheObject(@Nullable CacheObject obj) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeCacheObject(obj); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeKeyCacheObject(KeyCacheObject obj) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeKeyCacheObject(obj); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeGridLongList(@Nullable GridLongList ll) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeGridLongList(ll); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> boolean writeObjectArray(T[] arr, MessageArrayType type) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeObjectArray(arr, type, this); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> boolean writeCollection(Collection<T> col, MessageCollectionType type) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| stream.writeCollection(col, type, this); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <K, V> boolean writeMap(Map<K, V> map, MessageMapType type, boolean compress) { |
| DirectByteBufferStream stream = state.item().stream; |
| |
| if (compress) |
| writeCompressedMessage( |
| tmpWriter -> tmpWriter.state.item().stream.writeMap(map, type, tmpWriter), |
| map == null, |
| stream |
| ); |
| else |
| stream.writeMap(map, type, this); |
| |
| return stream.lastFinished(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isHeaderWritten() { |
| return state.item().hdrWritten; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onHeaderWritten() { |
| state.item().hdrWritten = true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int state() { |
| return state.item().state; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void incrementState() { |
| state.item().state++; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void decrementState() { |
| state.item().state--; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void beforeNestedWrite() { |
| state.forward(); |
| |
| state.item().stream.setBuffer(buf); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void afterNestedWrite(boolean finished) { |
| state.backward(finished); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void reset() { |
| state.reset(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(DirectMessageWriter.class, this); |
| } |
| |
| /** |
| * @param consumer Consumer. |
| * @param isNull {@code True} if message is null. |
| * @param stream Byte buffer stream. |
| */ |
| private void writeCompressedMessage(Consumer<DirectMessageWriter> consumer, boolean isNull, DirectByteBufferStream stream) { |
| if (isNull) { |
| stream.writeShort(Short.MIN_VALUE); |
| |
| return; |
| } |
| |
| if (!stream.serializeFinished()) { |
| ByteBuffer tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); |
| |
| DirectMessageWriter tmpWriter = new DirectMessageWriter(msgFactory, compressionLvl); |
| |
| tmpWriter.setBuffer(tmpBuf); |
| |
| boolean finished; |
| |
| do { |
| if (tmpBuf.remaining() <= tmpBuf.capacity() / 10) { |
| byte[] bytes = new byte[tmpBuf.position()]; |
| |
| tmpBuf.flip(); |
| tmpBuf.get(bytes); |
| |
| tmpBuf = ByteBuffer.allocateDirect(tmpBuf.capacity() * 2); |
| |
| tmpBuf.put(bytes); |
| |
| tmpWriter.setBuffer(tmpBuf); |
| } |
| |
| consumer.accept(tmpWriter); |
| |
| finished = tmpWriter.state.item().stream.lastFinished(); |
| } |
| while (!finished); |
| |
| tmpBuf.flip(); |
| |
| stream.compressedMessage(new CompressedMessage(tmpBuf, compressionLvl)); |
| stream.serializeFinished(true); |
| } |
| |
| stream.writeMessage(stream.compressedMessage(), this); |
| |
| if (stream.lastFinished()) { |
| stream.compressedMessage(null); |
| stream.serializeFinished(false); |
| } |
| } |
| |
| /** |
| */ |
| private static class StateItem implements DirectMessageStateItem { |
| /** */ |
| private final DirectByteBufferStream stream; |
| |
| /** */ |
| private int state; |
| |
| /** */ |
| private boolean hdrWritten; |
| |
| /** */ |
| public StateItem(MessageFactory msgFactory) { |
| stream = new DirectByteBufferStream(msgFactory); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void reset() { |
| state = 0; |
| hdrWritten = false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(StateItem.class, this); |
| } |
| } |
| } |