/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;

/**
 * An implementation of the abstract class {@link EditLogOutputStream},
 * which streams edits to a backup node.
 *
 * @see org.apache.hadoop.hdfs.server.protocol.JournalProtocol#journal
 * (org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration,
 * long, int, byte[])
 */
class EditLogBackupOutputStream extends EditLogOutputStream {
  static final int DEFAULT_BUFFER_SIZE = 256;
private JournalProtocol backupNode; // RPC proxy to backup node
private NamenodeRegistration bnRegistration; // backup node registration
private NamenodeRegistration nnRegistration; // active node registration
private ArrayList<JournalRecord> bufCurrent; // current buffer for writing
private ArrayList<JournalRecord> bufReady; // buffer ready for flushing
private DataOutputBuffer out; // serialized output sent to backup node

  /** A single buffered edit: opcode, transaction id, and arguments. */
  static class JournalRecord {
byte op;
long txid;
Writable[] args;
JournalRecord(byte op, long txid, Writable ... writables) {
this.op = op;
this.txid = txid;
this.args = writables;
}
void write(DataOutputBuffer out) throws IOException {
writeChecksummedOp(out, op, txid, args);
}
}
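
  /*
   * Illustrative sketch (not part of the original code): how a record is
   * buffered and later serialized. The opCode and args values below are
   * hypothetical, and writeChecksummedOp is assumed to be the static
   * helper on EditLogOutputStream that JournalRecord.write() calls above.
   *
   *   DataOutputBuffer buf = new DataOutputBuffer();
   *   JournalRecord rec = new JournalRecord(opCode, txid, args);
   *   rec.write(buf);    // appends the checksummed op bytes to buf
   */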
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node
throws IOException {
super();
this.bnRegistration = bnReg;
this.nnRegistration = nnReg;
InetSocketAddress bnAddress =
NetUtils.createSocketAddr(bnRegistration.getAddress());
Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
try {
this.backupNode =
RPC.getProxy(JournalProtocol.class,
JournalProtocol.versionID, bnAddress, new HdfsConfiguration());
} catch(IOException e) {
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;
}
this.bufCurrent = new ArrayList<JournalRecord>();
this.bufReady = new ArrayList<JournalRecord>();
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}

  @Override // JournalStream
public String getName() {
return bnRegistration.getAddress();
}

  @Override // JournalStream
public JournalType getType() {
return JournalType.BACKUP;
}

  @Override
  void write(byte[] data, int offset, int length) throws IOException {
    // Raw byte writes are unsupported: edits must come in through
    // write(byte, long, Writable...) so that they can be buffered as
    // JournalRecords and batched into journal() RPCs.
    throw new IOException("Not implemented");
  }

  @Override // EditLogOutputStream
  void write(byte op, long txid, Writable ... writables) throws IOException {
    // buffer the edit; it is shipped to the backup node on the next flush
    bufCurrent.add(new JournalRecord(op, txid, writables));
  }

  /**
* There is no persistent storage. Just clear the buffers.
*/
@Override // EditLogOutputStream
void create() throws IOException {
bufCurrent.clear();
assert bufReady.size() == 0 : "previous data is not flushed yet";
}

  @Override // EditLogOutputStream
  public void close() throws IOException {
    // close() may only be called after all pending transactions
    // have been flushed and synced.
int size = bufCurrent.size();
if (size != 0) {
throw new IOException("BackupEditStream has " + size +
" records still to be flushed and cannot be closed.");
}
RPC.stopProxy(backupNode); // stop the RPC threads
bufCurrent = bufReady = null;
}

  /**
   * Abort the stream: stop the RPC proxy and discard any buffered
   * records without flushing them to the backup node.
   */
  @Override
  public void abort() throws IOException {
    RPC.stopProxy(backupNode);
    bufCurrent = bufReady = null;
  }

  @Override // EditLogOutputStream
void setReadyToFlush() throws IOException {
assert bufReady.size() == 0 : "previous data is not flushed yet";
ArrayList<JournalRecord> tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
}
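
  /*
   * Double-buffering sketch (illustrative; the flush cycle is driven by
   * the owning edit log, not by this class): new edits keep landing in
   * bufCurrent while a previously swapped batch in bufReady is shipped.
   *
   *   stream.write(op, txid, args);   // record goes into bufCurrent
   *   stream.setReadyToFlush();       // swap bufCurrent <-> bufReady
   *   stream.flushAndSync();          // send bufReady in one journal() RPC
   */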
@Override // EditLogOutputStream
protected void flushAndSync() throws IOException {
assert out.size() == 0 : "Output buffer is not empty";
for (JournalRecord jRec : bufReady) {
jRec.write(out);
}
if (out.size() > 0) {
byte[] data = Arrays.copyOf(out.getData(), out.getLength());
backupNode.journal(nnRegistration,
bufReady.get(0).txid, bufReady.size(),
data);
}
bufReady.clear(); // erase all data in the buffer
out.reset(); // reset buffer to the start position
}
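
  // Note: the entire ready batch travels in a single journal() call.
  // The first argument after the registration is the txid of the first
  // buffered record and the next is the record count, which should let
  // the backup node verify it received a contiguous run of transactions
  // (that verification happens on the backup side, not here).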
  /**
   * There is no persistent storage, so the stream length is always 0.<p>
   * The length of an edit log is normally used to decide when it is
   * large enough to start a checkpoint; that criterion does not apply
   * to backup streams.
   */
@Override // EditLogOutputStream
long length() throws IOException {
return 0;
}

  /**
* Get backup node registration.
*/
NamenodeRegistration getRegistration() {
return bnRegistration;
}

  /**
   * Notify the backup node that a new log segment starting at txId
   * is about to be written.
   */
  void startLogSegment(long txId) throws IOException {
    backupNode.startLogSegment(nnRegistration, txId);
  }
}
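
/*
 * Usage sketch (hypothetical; bnReg, nnReg, firstTxId, op, txid and
 * writables stand in for values the caller already holds): how the
 * active name-node side might drive this stream end to end.
 *
 *   EditLogBackupOutputStream bStream =
 *       new EditLogBackupOutputStream(bnReg, nnReg);
 *   bStream.startLogSegment(firstTxId);   // open a segment on the backup
 *   bStream.write(op, txid, writables);   // buffer one edit
 *   bStream.setReadyToFlush();            // swap the write/ready buffers
 *   bStream.flushAndSync();               // ship the batch over RPC
 *   bStream.close();                      // legal only once buffers are empty
 */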