blob: 0005f772f08ce6ce3cec1444a6e2c6d013c0e6e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This chore is responsible to create replication marker rows with special WALEdit with family as
* {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY} and column qualifier as
* {@link WALEdit#REPLICATION_MARKER} and empty value. If config key
* {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, then we will create 1 marker row every
* {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms
* {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate
* the Replication Marker edit with region_server_name, wal_name and wal_offset encoded in
* {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor}
* object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the
* REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster,
* {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the
* ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR table.
*/
@InterfaceAudience.Private
public class ReplicationMarkerChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class);
private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();
public static final RegionInfo REGION_INFO =
RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build();
private static final String DELIMITER = "_";
private final Configuration conf;
private final RegionServerServices rsServices;
private WAL wal;
public static final String REPLICATION_MARKER_ENABLED_KEY =
"hbase.regionserver.replication.marker.enabled";
public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false;
public static final String REPLICATION_MARKER_CHORE_DURATION_KEY =
"hbase.regionserver.replication.marker.chore.duration";
public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds
public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices,
int period, Configuration conf) {
super("ReplicationTrackerChore", stopper, period);
this.conf = conf;
this.rsServices = rsServices;
}
@Override
protected void chore() {
if (wal == null) {
try {
// TODO: We need to add support for multi WAL implementation.
wal = rsServices.getWAL(null);
} catch (IOException ioe) {
LOG.warn("Unable to get WAL ", ioe);
// Shouldn't happen. Ignore and wait for the next chore run.
return;
}
}
String serverName = rsServices.getServerName().getServerName();
long timeStamp = EnvironmentEdgeManager.currentTime();
// We only have timestamp in ReplicationMarkerDescriptor and the remaining properties walname,
// regionserver name and wal offset at ReplicationSourceWALReaderThread.
byte[] rowKey = getRowKey(serverName, timeStamp);
if (LOG.isTraceEnabled()) {
LOG.trace("Creating replication marker edit.");
}
// This creates a new ArrayList of all the online regions for every call.
List<HRegion> regions = rsServices.getRegions();
if (regions.isEmpty()) {
LOG.info("There are no online regions for this server, so skipping adding replication marker"
+ " rows for this regionserver");
return;
}
HRegion region = regions.get(ThreadLocalRandom.current().nextInt(regions.size()));
try {
WALUtil.writeReplicationMarkerAndSync(wal, MVCC, region.getRegionInfo(), rowKey, timeStamp);
} catch (IOException ioe) {
LOG.error("Exception while sync'ing replication tracker edit", ioe);
// TODO: Should we stop region server or add a metric and keep going.
}
}
/**
* Creates a rowkey with region server name and timestamp.
* @param serverName region server name
* @param timestamp timestamp
*/
public static byte[] getRowKey(String serverName, long timestamp) {
// converting to string since this will help seeing the timestamp in string format using
// hbase shell commands.
String timestampStr = String.valueOf(timestamp);
final String rowKeyStr = serverName + DELIMITER + timestampStr;
return Bytes.toBytes(rowKeyStr);
}
}