blob: 8ff2b832019e2586c8cdb2a7374e362405200260 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming;
import java.io.*;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.Header;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.WrappedRunnable;
import com.ning.compress.lzf.LZFOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileStreamTask extends WrappedRunnable
{
private static Logger logger = LoggerFactory.getLogger(FileStreamTask.class);
public static final int CHUNK_SIZE = 64 * 1024;
// around 10 minutes at the default rpctimeout
public static final int MAX_CONNECT_ATTEMPTS = 8;
protected final StreamHeader header;
protected final InetAddress to;
// communication socket
private Socket socket;
// socket's output/input stream
private OutputStream output;
private OutputStream compressedoutput;
private DataInputStream input;
// allocate buffer to use for transfers only once
private final byte[] transferBuffer = new byte[CHUNK_SIZE];
// outbound global throughput limiter
private final Throttle throttle;
private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
public FileStreamTask(StreamHeader header, InetAddress to)
{
this.header = header;
this.to = to;
this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
{
/** @return Instantaneous throughput target in bytes per millisecond. */
public int targetThroughput()
{
if (DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() < 1)
// throttling disabled
return 0;
// total throughput
int totalBytesPerMS = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * 1024 * 1024 / 8 / 1000;
// per stream throughput (target bytes per MS)
return totalBytesPerMS / Math.max(1, MessagingService.instance().getActiveStreamsOutbound());
}
});
}
public void runMayThrow() throws IOException
{
try
{
connectAttempt();
// successfully connected: stream.
// (at this point, if we fail, it is the receiver's job to re-request)
stream();
StreamOutSession session = StreamOutSession.get(to, header.sessionId);
if (session == null)
{
logger.info("Found no stream out session at end of file stream task - this is expected if the receiver went down");
}
else if (session.getFiles().size() == 0)
{
// we are the last of our kind, receive the final confirmation before closing
receiveReply();
logger.info("Finished streaming session to {}", to);
}
}
catch (IOException e)
{
StreamOutSession session = StreamOutSession.get(to, header.sessionId);
if (session != null)
session.close(false);
throw e;
}
finally
{
try
{
close();
}
catch (IOException e)
{
if (logger.isDebugEnabled())
logger.debug("error closing socket", e);
}
}
if (logger.isDebugEnabled())
logger.debug("Done streaming " + header.file);
}
/**
* Stream file by it's sections specified by this.header
* @throws IOException on any I/O error
*/
private void stream() throws IOException
{
ByteBuffer HeaderBuffer = MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to));
// write header (this should not be compressed for compatibility with other messages)
output.write(ByteBufferUtil.getArray(HeaderBuffer));
if (header.file == null)
return;
// TODO just use a raw RandomAccessFile since we're managing our own buffer here
RandomAccessReader file = (header.file.sstable.compression) // try to skip kernel page cache if possible
? CompressedRandomAccessReader.open(header.file.getFilename(), header.file.sstable.getCompressionMetadata(), true)
: RandomAccessReader.open(new File(header.file.getFilename()), true);
// setting up data compression stream
compressedoutput = new LZFOutputStream(output);
MessagingService.instance().incrementActiveStreamsOutbound();
try
{
// stream each of the required sections of the file
for (Pair<Long, Long> section : header.file.sections)
{
// seek to the beginning of the section
file.seek(section.left);
// length of the section to stream
long length = section.right - section.left;
// tracks write progress
long bytesTransferred = 0;
while (bytesTransferred < length)
{
long lastWrite = write(file, length, bytesTransferred);
bytesTransferred += lastWrite;
// store streaming progress
header.file.progress += lastWrite;
}
// make sure that current section is send
compressedoutput.flush();
if (logger.isDebugEnabled())
logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
}
// receive reply confirmation
receiveReply();
}
finally
{
MessagingService.instance().decrementActiveStreamsOutbound();
// no matter what happens close file
FileUtils.closeQuietly(file);
}
}
private void receiveReply() throws IOException
{
MessagingService.validateMagic(input.readInt());
int msheader = input.readInt();
assert MessagingService.getBits(msheader, 3, 1) == 0 : "Stream received before stream reply";
int version = MessagingService.getBits(msheader, 15, 8);
input.readInt(); // Read total size
String id = input.readUTF();
Header header = Header.serializer().deserialize(input, version);
int bodySize = input.readInt();
byte[] body = new byte[bodySize];
input.readFully(body);
Message message = new Message(header, body, version);
assert message.getVerb() == StorageService.Verb.STREAM_REPLY : "Non-reply message received on stream socket";
handler.doVerb(message, id);
}
/**
* Sequentially read bytes from the file and write them to the output stream
*
* @param reader The file reader to read from
* @param length The full length that should be transferred
* @param bytesTransferred Number of bytes remaining to transfer
*
* @return Number of bytes transferred
*
* @throws IOException on any I/O error
*/
protected long write(RandomAccessReader reader, long length, long bytesTransferred) throws IOException
{
int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
reader.readFully(transferBuffer, 0, toTransfer);
compressedoutput.write(transferBuffer, 0, toTransfer);
throttle.throttleDelta(toTransfer);
return toTransfer;
}
/**
* Connects to the destination, with backoff for failed attempts.
* TODO: all nodes on a cluster must currently use the same storage port
* @throws IOException If all attempts fail.
*/
private void connectAttempt() throws IOException
{
int attempts = 0;
while (true)
{
try
{
socket = MessagingService.instance().getConnectionPool(to).newSocket();
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
output = socket.getOutputStream();
input = new DataInputStream(socket.getInputStream());
break;
}
catch (IOException e)
{
if (++attempts >= MAX_CONNECT_ATTEMPTS)
throw e;
long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + header.file + ". Retrying in " + waitms + " ms. (" + e + ")");
try
{
Thread.sleep(waitms);
}
catch (InterruptedException wtf)
{
throw new RuntimeException(wtf);
}
}
}
}
protected void close() throws IOException
{
output.close();
}
public String toString()
{
return String.format("FileStreamTask(session=%s, to=%s)", header.sessionId, to);
}
}