| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.batchlog; |
| |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.config.SchemaConstants; |
| import org.apache.cassandra.cql3.QueryProcessor; |
| import org.apache.cassandra.cql3.UntypedResultSet; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.marshal.UUIDType; |
| import org.apache.cassandra.db.partitions.PartitionUpdate; |
| import org.apache.cassandra.exceptions.WriteFailureException; |
| import org.apache.cassandra.exceptions.WriteTimeoutException; |
| import org.apache.cassandra.io.util.DataInputBuffer; |
| import org.apache.cassandra.io.util.DataOutputBuffer; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.service.AbstractWriteResponseHandler; |
| import org.apache.cassandra.service.WriteResponseHandler; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.UUIDGen; |
| |
| public final class LegacyBatchlogMigrator |
| { |
| private static final Logger logger = LoggerFactory.getLogger(LegacyBatchlogMigrator.class); |
| |
| private LegacyBatchlogMigrator() |
| { |
| // static class |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static void migrate() |
| { |
| ColumnFamilyStore store = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG); |
| |
| // nothing to migrate |
| if (store.isEmpty()) |
| return; |
| |
| logger.info("Migrating legacy batchlog to new storage"); |
| |
| int convertedBatches = 0; |
| String query = String.format("SELECT id, data, written_at, version FROM %s.%s", |
| SchemaConstants.SYSTEM_KEYSPACE_NAME, |
| SystemKeyspace.LEGACY_BATCHLOG); |
| |
| int pageSize = BatchlogManager.calculatePageSize(store); |
| |
| UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize); |
| for (UntypedResultSet.Row row : rows) |
| { |
| if (apply(row, convertedBatches)) |
| convertedBatches++; |
| } |
| |
| if (convertedBatches > 0) |
| Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking(); |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static boolean isLegacyBatchlogMutation(Mutation mutation) |
| { |
| return mutation.getKeyspaceName().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) |
| && mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId) != null; |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static void handleLegacyMutation(Mutation mutation) |
| { |
| PartitionUpdate update = mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId); |
| logger.trace("Applying legacy batchlog mutation {}", update); |
| update.forEach(row -> apply(UntypedResultSet.Row.fromInternalRow(update.metadata(), update.partitionKey(), row), -1)); |
| } |
| |
| private static boolean apply(UntypedResultSet.Row row, long counter) |
| { |
| UUID id = row.getUUID("id"); |
| long timestamp = id.version() == 1 ? UUIDGen.unixTimestamp(id) : row.getLong("written_at"); |
| int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; |
| |
| if (id.version() != 1) |
| id = UUIDGen.getTimeUUID(timestamp, counter); |
| |
| logger.trace("Converting mutation at {}", timestamp); |
| |
| try (DataInputBuffer in = new DataInputBuffer(row.getBytes("data"), false)) |
| { |
| int numMutations = in.readInt(); |
| List<Mutation> mutations = new ArrayList<>(numMutations); |
| for (int i = 0; i < numMutations; i++) |
| mutations.add(Mutation.serializer.deserialize(in, version)); |
| |
| BatchlogManager.store(Batch.createLocal(id, TimeUnit.MILLISECONDS.toMicros(timestamp), mutations)); |
| return true; |
| } |
| catch (Throwable t) |
| { |
| logger.error("Failed to convert mutation {} at timestamp {}", id, timestamp, t); |
| return false; |
| } |
| } |
| |
| public static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints) |
| throws WriteTimeoutException, WriteFailureException |
| { |
| for (InetAddress target : endpoints) |
| { |
| logger.trace("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, target, batch.size()); |
| |
| int targetVersion = MessagingService.instance().getVersion(target); |
| MessagingService.instance().sendRR(getStoreMutation(batch, targetVersion).createMessage(MessagingService.Verb.MUTATION), |
| target, |
| handler, |
| false); |
| } |
| } |
| |
| public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid, long queryStartNanoTime) |
| { |
| AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints, |
| Collections.<InetAddress>emptyList(), |
| ConsistencyLevel.ANY, |
| Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME), |
| null, |
| WriteType.SIMPLE, |
| queryStartNanoTime); |
| Mutation mutation = getRemoveMutation(uuid); |
| |
| for (InetAddress target : endpoints) |
| { |
| logger.trace("Sending legacy batchlog remove request {} to {}", uuid, target); |
| MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION), target, handler, false); |
| } |
| } |
| |
| static void store(Batch batch, int version) |
| { |
| getStoreMutation(batch, version).apply(); |
| } |
| |
| @SuppressWarnings("deprecation") |
| static Mutation getStoreMutation(Batch batch, int version) |
| { |
| PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(SystemKeyspace.LegacyBatchlog, batch.id); |
| builder.row() |
| .timestamp(batch.creationTime) |
| .add("written_at", new Date(batch.creationTime / 1000)) |
| .add("data", getSerializedMutations(version, batch.decodedMutations)) |
| .add("version", version); |
| return builder.buildAsMutation(); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private static Mutation getRemoveMutation(UUID uuid) |
| { |
| return new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyBatchlog, |
| UUIDType.instance.decompose(uuid), |
| FBUtilities.timestampMicros(), |
| FBUtilities.nowInSeconds())); |
| } |
| |
| private static ByteBuffer getSerializedMutations(int version, Collection<Mutation> mutations) |
| { |
| try (DataOutputBuffer buf = new DataOutputBuffer()) |
| { |
| buf.writeInt(mutations.size()); |
| for (Mutation mutation : mutations) |
| Mutation.serializer.serialize(mutation, buf, version); |
| return buf.buffer(); |
| } |
| catch (IOException e) |
| { |
| throw new RuntimeException(e); |
| } |
| } |
| } |