| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.shortcircuit; |
| |
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.security.SecureRandom;
import java.util.BitSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sun.misc.Unsafe;

import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.primitives.Ints;

import javax.annotation.Nonnull;
| |
| /** |
| * A shared memory segment used to implement short-circuit reads. |
| */ |
| public class ShortCircuitShm { |
| private static final Logger LOG = LoggerFactory.getLogger( |
| ShortCircuitShm.class); |
| |
| protected static final int BYTES_PER_SLOT = 64; |
| |
| private static final Unsafe unsafe = safetyDance(); |
| |
| private static Unsafe safetyDance() { |
| try { |
| Field f = Unsafe.class.getDeclaredField("theUnsafe"); |
| f.setAccessible(true); |
| return (Unsafe)f.get(null); |
| } catch (Throwable e) { |
| LOG.error("failed to load misc.Unsafe", e); |
| } |
| return null; |
| } |
| |
| /** |
| * Calculate the usable size of a shared memory segment. |
| * We round down to a multiple of the slot size and do some validation. |
| * |
| * @param stream The stream we're using. |
| * @return The usable size of the shared memory segment. |
| */ |
| private static int getUsableLength(FileInputStream stream) |
| throws IOException { |
| int intSize = Ints.checkedCast(stream.getChannel().size()); |
| int slots = intSize / BYTES_PER_SLOT; |
| if (slots == 0) { |
| throw new IOException("size of shared memory segment was " + |
| intSize + ", but that is not enough to hold even one slot."); |
| } |
| return slots * BYTES_PER_SLOT; |
| } |
| |
| /** |
| * Identifies a DfsClientShm. |
| */ |
| public static class ShmId implements Comparable<ShmId> { |
| private static final Random random = new Random(); |
| private final long hi; |
| private final long lo; |
| |
| /** |
| * Generate a random ShmId. |
| * |
| * We generate ShmIds randomly to prevent a malicious client from |
| * successfully guessing one and using that to interfere with another |
| * client. |
| */ |
| public static ShmId createRandom() { |
| return new ShmId(random.nextLong(), random.nextLong()); |
| } |
| |
| public ShmId(long hi, long lo) { |
| this.hi = hi; |
| this.lo = lo; |
| } |
| |
| public long getHi() { |
| return hi; |
| } |
| |
| public long getLo() { |
| return lo; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if ((o == null) || (o.getClass() != this.getClass())) { |
| return false; |
| } |
| ShmId other = (ShmId)o; |
| return new EqualsBuilder(). |
| append(hi, other.hi). |
| append(lo, other.lo). |
| isEquals(); |
| } |
| |
| @Override |
| public int hashCode() { |
| return new HashCodeBuilder(). |
| append(this.hi). |
| append(this.lo). |
| toHashCode(); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("%016x%016x", hi, lo); |
| } |
| |
| @Override |
| public int compareTo(@Nonnull ShmId other) { |
| return ComparisonChain.start(). |
| compare(hi, other.hi). |
| compare(lo, other.lo). |
| result(); |
| } |
| } |
| |
| /** |
| * Uniquely identifies a slot. |
| */ |
| public static class SlotId { |
| private final ShmId shmId; |
| private final int slotIdx; |
| |
| public SlotId(ShmId shmId, int slotIdx) { |
| this.shmId = shmId; |
| this.slotIdx = slotIdx; |
| } |
| |
| public ShmId getShmId() { |
| return shmId; |
| } |
| |
| public int getSlotIdx() { |
| return slotIdx; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if ((o == null) || (o.getClass() != this.getClass())) { |
| return false; |
| } |
| SlotId other = (SlotId)o; |
| return new EqualsBuilder(). |
| append(shmId, other.shmId). |
| append(slotIdx, other.slotIdx). |
| isEquals(); |
| } |
| |
| @Override |
| public int hashCode() { |
| return new HashCodeBuilder(). |
| append(this.shmId). |
| append(this.slotIdx). |
| toHashCode(); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("SlotId(%s:%d)", shmId.toString(), slotIdx); |
| } |
| } |
| |
| public class SlotIterator implements Iterator<Slot> { |
| int slotIdx = -1; |
| |
| @Override |
| public boolean hasNext() { |
| synchronized (ShortCircuitShm.this) { |
| return allocatedSlots.nextSetBit(slotIdx + 1) != -1; |
| } |
| } |
| |
| @Override |
| public Slot next() { |
| synchronized (ShortCircuitShm.this) { |
| int nextSlotIdx = allocatedSlots.nextSetBit(slotIdx + 1); |
| if (nextSlotIdx == -1) { |
| throw new NoSuchElementException(); |
| } |
| slotIdx = nextSlotIdx; |
| return slots[nextSlotIdx]; |
| } |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException("SlotIterator " + |
| "doesn't support removal"); |
| } |
| } |
| |
| /** |
| * A slot containing information about a replica. |
| * |
| * The format is: |
| * word 0 |
| * bit 0:32 Slot flags (see below). |
| * bit 33:63 Anchor count. |
| * word 1:7 |
| * Reserved for future use, such as statistics. |
| * Padding is also useful for avoiding false sharing. |
| * |
| * Little-endian versus big-endian is not relevant here since both the client |
| * and the server reside on the same computer and use the same orientation. |
| */ |
| public class Slot { |
| /** |
| * Flag indicating that the slot is valid. |
| * |
| * The DFSClient sets this flag when it allocates a new slot within one of |
| * its shared memory regions. |
| * |
| * The DataNode clears this flag when the replica associated with this slot |
| * is no longer valid. The client itself also clears this flag when it |
| * believes that the DataNode is no longer using this slot to communicate. |
| */ |
| private static final long VALID_FLAG = 1L<<63; |
| |
| /** |
| * Flag indicating that the slot can be anchored. |
| */ |
| private static final long ANCHORABLE_FLAG = 1L<<62; |
| |
| /** |
| * The slot address in memory. |
| */ |
| private final long slotAddress; |
| |
| /** |
| * BlockId of the block this slot is used for. |
| */ |
| private final ExtendedBlockId blockId; |
| |
| Slot(long slotAddress, ExtendedBlockId blockId) { |
| this.slotAddress = slotAddress; |
| this.blockId = blockId; |
| } |
| |
| /** |
| * Get the short-circuit memory segment associated with this Slot. |
| * |
| * @return The enclosing short-circuit memory segment. |
| */ |
| public ShortCircuitShm getShm() { |
| return ShortCircuitShm.this; |
| } |
| |
| /** |
| * Get the ExtendedBlockId associated with this slot. |
| * |
| * @return The ExtendedBlockId of this slot. |
| */ |
| public ExtendedBlockId getBlockId() { |
| return blockId; |
| } |
| |
| /** |
| * Get the SlotId of this slot, containing both shmId and slotIdx. |
| * |
| * @return The SlotId of this slot. |
| */ |
| public SlotId getSlotId() { |
| return new SlotId(getShmId(), getSlotIdx()); |
| } |
| |
| /** |
| * Get the Slot index. |
| * |
| * @return The index of this slot. |
| */ |
| public int getSlotIdx() { |
| return Ints.checkedCast( |
| (slotAddress - baseAddress) / BYTES_PER_SLOT); |
| } |
| |
| /** |
| * Clear the slot. |
| */ |
| void clear() { |
| unsafe.putLongVolatile(null, this.slotAddress, 0); |
| } |
| |
| private boolean isSet(long flag) { |
| long prev = unsafe.getLongVolatile(null, this.slotAddress); |
| return (prev & flag) != 0; |
| } |
| |
| private void setFlag(long flag) { |
| long prev; |
| do { |
| prev = unsafe.getLongVolatile(null, this.slotAddress); |
| if ((prev & flag) != 0) { |
| return; |
| } |
| } while (!unsafe.compareAndSwapLong(null, this.slotAddress, |
| prev, prev | flag)); |
| } |
| |
| private void clearFlag(long flag) { |
| long prev; |
| do { |
| prev = unsafe.getLongVolatile(null, this.slotAddress); |
| if ((prev & flag) == 0) { |
| return; |
| } |
| } while (!unsafe.compareAndSwapLong(null, this.slotAddress, |
| prev, prev & (~flag))); |
| } |
| |
| public boolean isValid() { |
| return isSet(VALID_FLAG); |
| } |
| |
| public void makeValid() { |
| setFlag(VALID_FLAG); |
| } |
| |
| public void makeInvalid() { |
| clearFlag(VALID_FLAG); |
| } |
| |
| public boolean isAnchorable() { |
| return isSet(ANCHORABLE_FLAG); |
| } |
| |
| public void makeAnchorable() { |
| setFlag(ANCHORABLE_FLAG); |
| } |
| |
| public void makeUnanchorable() { |
| clearFlag(ANCHORABLE_FLAG); |
| } |
| |
| public boolean isAnchored() { |
| long prev = unsafe.getLongVolatile(null, this.slotAddress); |
| // Slot is no longer valid. |
| return (prev & VALID_FLAG) != 0 && ((prev & 0x7fffffff) != 0); |
| } |
| |
| /** |
| * Try to add an anchor for a given slot. |
| * |
| * When a slot is anchored, we know that the block it refers to is resident |
| * in memory. |
| * |
| * @return True if the slot is anchored. |
| */ |
| public boolean addAnchor() { |
| long prev; |
| do { |
| prev = unsafe.getLongVolatile(null, this.slotAddress); |
| if ((prev & VALID_FLAG) == 0) { |
| // Slot is no longer valid. |
| return false; |
| } |
| if ((prev & ANCHORABLE_FLAG) == 0) { |
| // Slot can't be anchored right now. |
| return false; |
| } |
| if ((prev & 0x7fffffff) == 0x7fffffff) { |
| // Too many other threads have anchored the slot (2 billion?) |
| return false; |
| } |
| } while (!unsafe.compareAndSwapLong(null, this.slotAddress, |
| prev, prev + 1)); |
| return true; |
| } |
| |
| /** |
| * Remove an anchor for a given slot. |
| */ |
| public void removeAnchor() { |
| long prev; |
| do { |
| prev = unsafe.getLongVolatile(null, this.slotAddress); |
| Preconditions.checkState((prev & 0x7fffffff) != 0, |
| "Tried to remove anchor for slot " + slotAddress +", which was " + |
| "not anchored."); |
| } while (!unsafe.compareAndSwapLong(null, this.slotAddress, |
| prev, prev - 1)); |
| } |
| |
| @Override |
| public String toString() { |
| return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")"; |
| } |
| } |
| |
| /** |
| * ID for this SharedMemorySegment. |
| */ |
| private final ShmId shmId; |
| |
| /** |
| * The base address of the memory-mapped file. |
| */ |
| private final long baseAddress; |
| |
| /** |
| * The mmapped length of the shared memory segment |
| */ |
| private final int mmappedLength; |
| |
| /** |
| * The slots associated with this shared memory segment. |
| * slot[i] contains the slot at offset i * BYTES_PER_SLOT, |
| * or null if that slot is not allocated. |
| */ |
| private final Slot slots[]; |
| |
| /** |
| * A bitset where each bit represents a slot which is in use. |
| */ |
| private final BitSet allocatedSlots; |
| |
/**
 * Create the ShortCircuitShm.
 *
 * The usable part of the file behind the stream is memory-mapped
 * read/write, and an (initially empty) slot table sized to match it is
 * created.
 *
 * @param shmId       The ID to use.
 * @param stream      The stream that we're going to use to create this
 *                    shared memory segment.
 *
 *                    Although this is a FileInputStream, we are going to
 *                    assume that the underlying file descriptor is writable
 *                    as well as readable. It would be more appropriate to use
 *                    a RandomAccessFile here, but that class does not have
 *                    any public accessor which returns a FileDescriptor,
 *                    unlike FileInputStream.
 * @throws UnsupportedOperationException
 *                    If NativeIO is not available, if we are running on
 *                    Windows, or if sun.misc.Unsafe could not be loaded.
 * @throws IOException
 *                    If the segment is too small to hold a single slot, or
 *                    if sizing or mapping the file fails.
 */
public ShortCircuitShm(ShmId shmId, FileInputStream stream)
    throws IOException {
  if (!NativeIO.isAvailable()) {
    throw new UnsupportedOperationException("NativeIO is not available.");
  }
  if (Shell.WINDOWS) {
    throw new UnsupportedOperationException(
        "DfsClientShm is not yet implemented for Windows.");
  }
  if (unsafe == null) {
    throw new UnsupportedOperationException(
        "can't use DfsClientShm because we failed to " +
        "load misc.Unsafe.");
  }
  this.shmId = shmId;
  this.mmappedLength = getUsableLength(stream);
  this.baseAddress = POSIX.mmap(stream.getFD(),
      POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
  this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
  this.allocatedSlots = new BitSet(slots.length);
  // Guard the call: its arguments (String.format, getSimpleName) would
  // otherwise be computed even when trace logging is disabled.
  if (LOG.isTraceEnabled()) {
    LOG.trace("creating {}(shmId={}, mmappedLength={}, baseAddress={}, "
        + "slots.length={})", this.getClass().getSimpleName(), shmId,
        mmappedLength, String.format("%x", baseAddress), slots.length);
  }
}
| |
| public final ShmId getShmId() { |
| return shmId; |
| } |
| |
| /** |
| * Determine if this shared memory object is empty. |
| * |
| * @return True if the shared memory object is empty. |
| */ |
| synchronized final public boolean isEmpty() { |
| return allocatedSlots.nextSetBit(0) == -1; |
| } |
| |
| /** |
| * Determine if this shared memory object is full. |
| * |
| * @return True if the shared memory object is full. |
| */ |
| synchronized final public boolean isFull() { |
| return allocatedSlots.nextClearBit(0) >= slots.length; |
| } |
| |
| /** |
| * Calculate the base address of a slot. |
| * |
| * @param slotIdx Index of the slot. |
| * @return The base address of the slot. |
| */ |
| private long calculateSlotAddress(int slotIdx) { |
| long offset = slotIdx; |
| offset *= BYTES_PER_SLOT; |
| return this.baseAddress + offset; |
| } |
| |
| /** |
| * Allocate a new slot and register it. |
| * |
| * This function chooses an empty slot, initializes it, and then returns |
| * the relevant Slot object. |
| * |
| * @return The new slot. |
| */ |
| synchronized public final Slot allocAndRegisterSlot( |
| ExtendedBlockId blockId) { |
| int idx = allocatedSlots.nextClearBit(0); |
| if (idx >= slots.length) { |
| throw new RuntimeException(this + ": no more slots are available."); |
| } |
| allocatedSlots.set(idx, true); |
| Slot slot = new Slot(calculateSlotAddress(idx), blockId); |
| slot.clear(); |
| slot.makeValid(); |
| slots[idx] = slot; |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots + |
| StringUtils.getStackTrace(Thread.currentThread())); |
| } |
| return slot; |
| } |
| |
| synchronized public final Slot getSlot(int slotIdx) |
| throws InvalidRequestException { |
| if (!allocatedSlots.get(slotIdx)) { |
| throw new InvalidRequestException(this + ": slot " + slotIdx + |
| " does not exist."); |
| } |
| return slots[slotIdx]; |
| } |
| |
| /** |
| * Register a slot. |
| * |
| * This function looks at a slot which has already been initialized (by |
| * another process), and registers it with us. Then, it returns the |
| * relevant Slot object. |
| * |
| * @return The slot. |
| * |
| * @throws InvalidRequestException |
| * If the slot index we're trying to allocate has not been |
| * initialized, or is already in use. |
| */ |
| synchronized public final Slot registerSlot(int slotIdx, |
| ExtendedBlockId blockId) throws InvalidRequestException { |
| if (slotIdx < 0) { |
| throw new InvalidRequestException(this + ": invalid negative slot " + |
| "index " + slotIdx); |
| } |
| if (slotIdx >= slots.length) { |
| throw new InvalidRequestException(this + ": invalid slot " + |
| "index " + slotIdx); |
| } |
| if (allocatedSlots.get(slotIdx)) { |
| throw new InvalidRequestException(this + ": slot " + slotIdx + |
| " is already in use."); |
| } |
| Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId); |
| if (!slot.isValid()) { |
| throw new InvalidRequestException(this + ": slot " + slotIdx + |
| " is not marked as valid."); |
| } |
| slots[slotIdx] = slot; |
| allocatedSlots.set(slotIdx, true); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots + |
| StringUtils.getStackTrace(Thread.currentThread())); |
| } |
| return slot; |
| } |
| |
| /** |
| * Unregisters a slot. |
| * |
| * This doesn't alter the contents of the slot. It just means |
| * |
| * @param slotIdx Index of the slot to unregister. |
| */ |
| synchronized public final void unregisterSlot(int slotIdx) { |
| Preconditions.checkState(allocatedSlots.get(slotIdx), |
| "tried to unregister slot " + slotIdx + ", which was not registered."); |
| allocatedSlots.set(slotIdx, false); |
| slots[slotIdx] = null; |
| LOG.trace("{}: unregisterSlot {}", this, slotIdx); |
| } |
| |
| /** |
| * Iterate over all allocated slots. |
| * |
| * Note that this method isn't safe if |
| * |
| * @return The slot iterator. |
| */ |
| public SlotIterator slotIterator() { |
| return new SlotIterator(); |
| } |
| |
| public void free() { |
| try { |
| POSIX.munmap(baseAddress, mmappedLength); |
| } catch (IOException e) { |
| LOG.warn(this + ": failed to munmap", e); |
| } |
| LOG.trace(this + ": freed"); |
| } |
| |
| @Override |
| public String toString() { |
| return this.getClass().getSimpleName() + "(" + shmId + ")"; |
| } |
| } |