blob: 837b61addf9863fb38493baeec45a245f2b5cae7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
/**
* Hadoop data output context for direct communication.
*/
public class HadoopDirectDataOutputContext {
    /** Initial allocation size for GZIP output. We start with a very low value, but it will grow if needed. */
    private static final int GZIP_OUT_MIN_ALLOC_SIZE = 1024;

    /**
     * GZIP buffer size (8 KB). This is only the internal deflater buffer of {@link GZIPOutputStream},
     * so it affects performance, not the compressed bytes produced. We should remove it when we
     * implement efficient direct GZIP output.
     */
    private static final int GZIP_BUFFER_SIZE = 8192;

    /** Flush size: capacity of the plain output buffer; reaching it signals the caller to flush. */
    private final int flushSize;

    /** Whether to perform GZIP compression of the flushed data. */
    private final boolean gzip;

    /** Key serialization. */
    private final HadoopSerialization keySer;

    /** Value serialization. */
    private final HadoopSerialization valSer;

    /** Data output receiving serialized key-value pairs. */
    private HadoopDirectDataOutput out;

    /** Data output receiving GZIP-compressed data ({@code null} unless {@link #gzip} is set). */
    private HadoopDirectDataOutput gzipOut;

    /** Number of key-value pairs written since the last {@link #reset()}. */
    private int cnt;

    /**
     * Constructor.
     *
     * @param flushSize Flush size.
     * @param gzip Whether to perform GZIP.
     * @param taskCtx Task context.
     * @throws IgniteCheckedException If failed.
     */
    public HadoopDirectDataOutputContext(int flushSize, boolean gzip, HadoopTaskContext taskCtx)
        throws IgniteCheckedException {
        this.flushSize = flushSize;
        this.gzip = gzip;

        keySer = taskCtx.keySerialization();
        valSer = taskCtx.valueSerialization();

        out = new HadoopDirectDataOutput(flushSize);

        if (gzip) {
            // Compressed data is usually much smaller than the raw buffer; start with an optimistic
            // 1/8 of the flush size (but no less than the minimum) and let the buffer grow on demand.
            gzipOut = new HadoopDirectDataOutput(Math.max(flushSize / 8, GZIP_OUT_MIN_ALLOC_SIZE));
        }
    }

    /**
     * Write key-value pair.
     *
     * @param key Key.
     * @param val Value.
     * @return Whether flush is needed.
     * @throws IgniteCheckedException If failed.
     */
    public boolean write(Object key, Object val) throws IgniteCheckedException {
        keySer.write(out, key);
        valSer.write(out, val);

        cnt++;

        return out.readyForFlush();
    }

    /**
     * @return Key-value pairs count.
     */
    public int count() {
        return cnt;
    }

    /**
     * Snapshot the current buffer contents for sending.
     *
     * @return State holding the (possibly compressed) buffer, its length, and the uncompressed length.
     * @throws IgniteException If GZIP compression failed.
     */
    public HadoopDirectDataOutputState state() {
        if (gzip) {
            try {
                // NOTE: local renamed from "gzip" to avoid shadowing the boolean field of the same name.
                try (GZIPOutputStream gzipStream = new GZIPOutputStream(gzipOut, GZIP_BUFFER_SIZE)) {
                    gzipStream.write(out.buffer(), 0, out.position());
                }

                // Third argument is the uncompressed length, needed by the receiver to size its buffer.
                return new HadoopDirectDataOutputState(gzipOut.buffer(), gzipOut.position(), out.position());
            }
            catch (IOException e) {
                throw new IgniteException("Failed to compress.", e);
            }
        }
        else
            return new HadoopDirectDataOutputState(out.buffer(), out.position(), out.position());
    }

    /**
     * Reset buffers and pair counter for the next batch.
     */
    public void reset() {
        if (gzip) {
            // In GZIP mode we do not expose normal output to the outside. Hence, no need for reallocation, just reset.
            out.reset();

            // The compressed buffer was handed out in state(); allocate a fresh one of the same capacity.
            gzipOut = new HadoopDirectDataOutput(gzipOut.bufferLength());
        }
        else
            out = new HadoopDirectDataOutput(flushSize, out.bufferLength());

        cnt = 0;
    }
}