| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db.commitlog; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Map; |
| import javax.crypto.Cipher; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.io.FSWriteError; |
| import org.apache.cassandra.io.compress.BufferType; |
| import org.apache.cassandra.io.compress.ICompressor; |
| import org.apache.cassandra.security.EncryptionUtils; |
| import org.apache.cassandra.security.EncryptionContext; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.Hex; |
| import org.apache.cassandra.utils.SyncUtil; |
| |
| import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE; |
| |
| /** |
| * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data into |
| * the encryption algorithms. |
| * |
| * The format of the encrypted commit log is as follows: |
| * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)}) |
| * - a series of 'sync segments' that are written every time the commit log is sync()'ed |
| * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)} |
| * -- total plain text length for this section |
| * -- a series of encrypted data blocks, each of which contains: |
| * --- the length of the encrypted block (cipher text) |
| * --- the length of the unencrypted data (compressed text) |
| * --- the encrypted block, which contains: |
| * ---- the length of the plain text (raw) data |
| * ---- block of compressed data |
| * |
| * Notes: |
| * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding |
| * to the output buffer, and we need to ignore that padding when processing. |
| */ |
| public class EncryptedSegment extends FileDirectSegment |
| { |
| private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class); |
| |
| private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4; |
| |
| private final EncryptionContext encryptionContext; |
| private final Cipher cipher; |
| |
| public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose) |
| { |
| super(commitLog, manager, onClose); |
| this.encryptionContext = commitLog.configuration.getEncryptionContext(); |
| |
| try |
| { |
| cipher = encryptionContext.getEncryptor(); |
| } |
| catch (IOException e) |
| { |
| throw new FSWriteError(e, logFile); |
| } |
| logger.debug("created a new encrypted commit log segment: {}", logFile); |
| } |
| |
| protected Map<String, String> additionalHeaderParameters() |
| { |
| Map<String, String> map = encryptionContext.toHeaderParameters(); |
| map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV())); |
| return map; |
| } |
| |
| ByteBuffer createBuffer(CommitLog commitLog) |
| { |
| // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption, |
| // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs |
| return manager.getBufferPool().createBuffer(BufferType.ON_HEAP); |
| } |
| |
| void write(int startMarker, int nextMarker) |
| { |
| int contentStart = startMarker + SYNC_MARKER_SIZE; |
| final int length = nextMarker - contentStart; |
| // The length may be 0 when the segment is being closed. |
| assert length > 0 || length == 0 && !isStillAllocating(); |
| |
| final ICompressor compressor = encryptionContext.getCompressor(); |
| final int blockSize = encryptionContext.getChunkLength(); |
| try |
| { |
| ByteBuffer inputBuffer = buffer.duplicate(); |
| inputBuffer.limit(contentStart + length).position(contentStart); |
| ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(); |
| |
| // save space for the sync marker at the beginning of this section |
| final long syncMarkerPosition = lastWrittenPos; |
| channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE); |
| |
| // loop over the segment data in encryption buffer sized chunks |
| while (contentStart < nextMarker) |
| { |
| int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart; |
| ByteBuffer slice = inputBuffer.duplicate(); |
| slice.limit(contentStart + nextBlockSize).position(contentStart); |
| |
| buffer = EncryptionUtils.compress(slice, buffer, true, compressor); |
| |
| // reuse the same buffer for the input and output of the encryption operation |
| buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher); |
| |
| contentStart += nextBlockSize; |
| manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE); |
| } |
| |
| lastWrittenPos = channel.position(); |
| |
| // rewind to the beginning of the section and write out the sync marker, |
| // reusing the one of the existing buffers |
| buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true); |
| writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos); |
| buffer.putInt(SYNC_MARKER_SIZE, length); |
| buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE); |
| manager.addSize(buffer.limit()); |
| |
| channel.position(syncMarkerPosition); |
| channel.write(buffer); |
| |
| SyncUtil.force(channel, true); |
| |
| if (manager.getBufferPool().getThreadLocalReusableBuffer().capacity() < buffer.capacity()) |
| manager.getBufferPool().setThreadLocalReusableBuffer(buffer); |
| } |
| catch (Exception e) |
| { |
| throw new FSWriteError(e, getPath()); |
| } |
| } |
| |
| public long onDiskSize() |
| { |
| return lastWrittenPos; |
| } |
| } |