| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.cassandra.diff; |
| |
| import java.math.BigInteger; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.base.Throwables; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.datastax.driver.core.BatchStatement; |
| import com.datastax.driver.core.ConsistencyLevel; |
| import com.datastax.driver.core.PreparedStatement; |
| import com.datastax.driver.core.ResultSet; |
| import com.datastax.driver.core.Row; |
| import com.datastax.driver.core.Session; |
| import com.datastax.driver.core.SimpleStatement; |
| import com.datastax.driver.core.Statement; |
| import com.datastax.driver.core.utils.UUIDs; |
| import org.joda.time.DateTime; |
| import org.joda.time.DateTimeZone; |
| |
| public class JobMetadataDb { |
| private static final Logger logger = LoggerFactory.getLogger(JobMetadataDb.class); |
| |
| static class ProgressTracker { |
| |
| private final UUID jobId; |
| private final int bucket; |
| private final String startToken; |
| private final String endToken; |
| private final String metadataKeyspace; |
| private Session session; |
| |
| private static PreparedStatement updateStmt; |
| private static PreparedStatement mismatchStmt; |
| private static PreparedStatement errorSummaryStmt; |
| private static PreparedStatement errorDetailStmt; |
| private static PreparedStatement updateCompleteStmt; |
| |
| public ProgressTracker(UUID jobId, |
| int bucket, |
| BigInteger startToken, |
| BigInteger endToken, |
| String metadataKeyspace, |
| Session session) { |
| this.jobId = jobId; |
| this.bucket = bucket; |
| this.startToken = startToken.toString(); |
| this.endToken = endToken.toString(); |
| this.metadataKeyspace = metadataKeyspace; |
| this.session = session; |
| } |
| |
| /** |
| * Runs on each executor to prepare statements shared across all instances |
| */ |
| public static void initializeStatements(Session session, String metadataKeyspace) { |
| if (updateStmt == null) { |
| updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" + |
| " job_id," + |
| " bucket," + |
| " qualified_table_name," + |
| " start_token," + |
| " end_token," + |
| " matched_partitions," + |
| " mismatched_partitions," + |
| " partitions_only_in_source," + |
| " partitions_only_in_target," + |
| " matched_rows," + |
| " matched_values," + |
| " mismatched_values," + |
| " skipped_partitions," + |
| " last_token )" + |
| "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", |
| metadataKeyspace, Schema.TASK_STATUS)); |
| } |
| if (mismatchStmt == null) { |
| mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" + |
| " job_id," + |
| " bucket," + |
| " qualified_table_name," + |
| " mismatching_token," + |
| " mismatch_type )" + |
| "VALUES (?, ?, ?, ?, ?)", |
| metadataKeyspace, Schema.MISMATCHES)); |
| } |
| if (updateCompleteStmt == null) { |
| updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " + |
| " SET completed = completed + 1" + |
| " WHERE job_id = ? " + |
| " AND bucket = ? " + |
| " AND qualified_table_name = ? ", |
| metadataKeyspace, Schema.JOB_STATUS)) |
| .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); |
| } |
| if (errorSummaryStmt == null) { |
| errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" + |
| " job_id," + |
| " bucket," + |
| " qualified_table_name," + |
| " start_token," + |
| " end_token)" + |
| " VALUES (?, ?, ?, ?, ?)", |
| metadataKeyspace, Schema.ERROR_SUMMARY)); |
| } |
| if (errorDetailStmt == null) { |
| errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" + |
| " job_id," + |
| " bucket," + |
| " qualified_table_name," + |
| " start_token," + |
| " end_token," + |
| " error_token," + |
| " error_source)" + |
| " VALUES (?, ?, ?, ?, ?, ?, ?)", |
| metadataKeyspace, Schema.ERROR_DETAIL)); |
| } |
| |
| } |
| |
| public static void resetStatements() |
| { |
| updateStmt = null; |
| mismatchStmt = null; |
| errorSummaryStmt = null; |
| errorDetailStmt = null; |
| updateCompleteStmt = null; |
| } |
| |
| /** |
| * |
| * @param keyspaceTablePair |
| * @return |
| */ |
| public DiffJob.TaskStatus getLastStatus(KeyspaceTablePair keyspaceTablePair) { |
| ResultSet rs = session.execute(String.format("SELECT last_token, " + |
| " matched_partitions, " + |
| " mismatched_partitions, " + |
| " partitions_only_in_source, " + |
| " partitions_only_in_target, " + |
| " matched_rows," + |
| " matched_values," + |
| " mismatched_values," + |
| " skipped_partitions " + |
| " FROM %s.%s " + |
| " WHERE job_id = ? " + |
| " AND bucket = ? " + |
| " AND qualified_table_name = ? " + |
| " AND start_token = ? " + |
| " AND end_token = ?", |
| metadataKeyspace, Schema.TASK_STATUS), |
| jobId, bucket, keyspaceTablePair.toCqlValueString(), startToken, endToken); |
| Row row = rs.one(); |
| if (null == row) |
| return DiffJob.TaskStatus.EMPTY; |
| |
| RangeStats stats = RangeStats.withValues(getOrDefaultLong(row, "matched_partitions"), |
| getOrDefaultLong(row, "mismatched_partitions"), |
| 0L, // error counts are per-run and not persisted in the metadata db |
| getOrDefaultLong(row, "skipped_partitions"), |
| getOrDefaultLong(row, "partitions_only_in_source"), |
| getOrDefaultLong(row, "partitions_only_in_target"), |
| getOrDefaultLong(row, "matched_rows"), |
| getOrDefaultLong(row, "matched_values"), |
| getOrDefaultLong(row, "mismatched_values")); |
| |
| BigInteger lastToken = row.isNull("last_token") ? null : new BigInteger(row.getString("last_token")); |
| return new DiffJob.TaskStatus(lastToken, stats); |
| } |
| |
| /** |
| * |
| * @param table |
| * @param diffStats |
| * @param latestToken |
| */ |
| public void updateStatus(KeyspaceTablePair table, RangeStats diffStats, BigInteger latestToken) { |
| session.execute(bindUpdateStatement(table, diffStats, latestToken)); |
| } |
| |
| public void recordMismatch(KeyspaceTablePair table, MismatchType type, BigInteger token) { |
| logger.info("Detected mismatch in table {}; partition with token {} is {}", |
| table, token, type == MismatchType.PARTITION_MISMATCH |
| ? " different in source and target clusters" |
| : type == MismatchType.ONLY_IN_SOURCE ? "only present in source cluster" |
| : "only present in target cluster"); |
| session.execute(bindMismatchesStatement(table, token, type.name())); |
| } |
| |
| /** |
| * |
| * @param table |
| * @param token |
| * @param error |
| */ |
| public void recordError(KeyspaceTablePair table, BigInteger token, Throwable error) { |
| logger.error(String.format("Encountered error during partition comparison in table %s; " + |
| "error for partition with token %s", table, token), error); |
| BatchStatement batch = new BatchStatement(); |
| batch.add(bindErrorSummaryStatement(table)); |
| DiffCluster.Type exceptionSource = null; |
| int maxRetrace = 10; // In case there is a loop, we do not want to loop forever or throw. So just limit the number of retracing. |
| for (Throwable t = error; t.getCause() != null && maxRetrace > 0; t = t.getCause(), maxRetrace--) { |
| if (t instanceof ClusterSourcedException) { |
| exceptionSource = ((ClusterSourcedException) t).exceptionSource; |
| break; |
| } |
| } |
| batch.add(bindErrorDetailStatement(table, token, exceptionSource)); |
| batch.setIdempotent(true); |
| session.execute(batch); |
| } |
| |
| /** |
| * |
| * @param table |
| * @param stats |
| */ |
| public void finishTable(KeyspaceTablePair table, RangeStats stats, boolean updateCompletedCount) { |
| logger.info("Finishing range [{}, {}] for table {}", startToken, endToken, table); |
| // first flush out the last status. |
| session.execute(bindUpdateStatement(table, stats, endToken)); |
| // then update the count of completed tasks |
| if (updateCompletedCount) |
| session.execute(updateCompleteStmt.bind(jobId, bucket, table.toCqlValueString())); |
| } |
| |
| private Statement bindMismatchesStatement(KeyspaceTablePair table, BigInteger token, String type) { |
| return mismatchStmt.bind(jobId, bucket, table.toCqlValueString(), token.toString(), type) |
| .setIdempotent(true); |
| } |
| |
| private Statement bindErrorSummaryStatement(KeyspaceTablePair table) { |
| return errorSummaryStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken) |
| .setIdempotent(true); |
| } |
| |
| private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken, DiffCluster.Type exceptionSource) { |
| String errorSource = exceptionSource == null ? "" : exceptionSource.name(); |
| return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString(), errorSource) |
| .setIdempotent(true); |
| } |
| |
| private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, BigInteger token) { |
| return bindUpdateStatement(table, stats, token.toString()); |
| } |
| |
| private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, String token) { |
| // We don't persist the partition error count from RangeStats as errors |
| // are likely to be transient and not data related, so we don't want to |
| // accumulate them across runs. |
| return updateStmt.bind(jobId, |
| bucket, |
| table.toCqlValueString(), |
| startToken, |
| endToken, |
| stats.getMatchedPartitions(), |
| stats.getMismatchedPartitions(), |
| stats.getOnlyInSource(), |
| stats.getOnlyInTarget(), |
| stats.getMatchedRows(), |
| stats.getMatchedValues(), |
| stats.getMismatchedValues(), |
| stats.getSkippedPartitions(), |
| token) |
| .setIdempotent(true); |
| } |
| |
| private static long getOrDefaultLong(Row row, String column) { |
| return (null == row || row.isNull(column)) ? 0L : row.getLong(column); |
| } |
| } |
| |
| static class JobLifeCycle { |
| final Session session; |
| final String metadataKeyspace; |
| |
| public JobLifeCycle(Session session, String metadataKeyspace) { |
| this.session = session; |
| this.metadataKeyspace = metadataKeyspace; |
| } |
| |
| public DiffJob.Params getJobParams(UUID jobId) { |
| ResultSet rs = session.execute(String.format("SELECT qualified_table_names," + |
| " buckets," + |
| " total_tasks " + |
| "FROM %s.%s " + |
| "WHERE job_id = ?", |
| metadataKeyspace, Schema.JOB_SUMMARY), |
| jobId); |
| Row row = rs.one(); |
| if (null == row) |
| return null; |
| |
| // qualified_table_names is encoded as a List<String>. Decode it back to List<KeyspaceTablePair>. |
| List<KeyspaceTablePair> keyspaceTables = row.getList("qualified_table_names", String.class) |
| .stream() |
| .map(KeyspaceTablePair::new) |
| .collect(Collectors.toList());; |
| return new DiffJob.Params(jobId, |
| keyspaceTables, |
| row.getInt("buckets"), |
| row.getInt("total_tasks")); |
| } |
| |
| |
| // Runs on Driver to insert top level job info |
| public void initializeJob(DiffJob.Params params, |
| String sourceClusterName, |
| String sourceClusterDesc, |
| String targetClusterName, |
| String targetClusterDesc) { |
| |
| logger.info("Initializing job status"); |
| // The job was previously run, so this could be a re-run to |
| // mop up any failed splits so mark it in progress. |
| ResultSet rs = session.execute(String.format("INSERT INTO %s.%s (job_id) VALUES (?) IF NOT EXISTS", |
| metadataKeyspace, Schema.RUNNING_JOBS), |
| params.jobId); |
| if (!rs.one().getBool("[applied]")) { |
| logger.info("Aborting due to inability to mark job as running. " + |
| "Did a previous run of job id {} fail non-gracefully?", |
| params.jobId); |
| throw new RuntimeException("Unable to mark job running, aborting"); |
| } |
| |
| UUID timeUUID = UUIDs.timeBased(); |
| DateTime startDateTime = new DateTime(UUIDs.unixTimestamp(timeUUID), DateTimeZone.UTC); |
| |
| rs = session.execute(String.format("INSERT INTO %s.%s (" + |
| " job_id," + |
| " job_start_time," + |
| " buckets," + |
| " qualified_table_names," + |
| " source_cluster_name," + |
| " source_cluster_desc," + |
| " target_cluster_name," + |
| " target_cluster_desc," + |
| " total_tasks)" + |
| " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + |
| " IF NOT EXISTS", |
| metadataKeyspace, Schema.JOB_SUMMARY), |
| params.jobId, |
| timeUUID, |
| params.buckets, |
| params.keyspaceTables.stream().map(KeyspaceTablePair::toCqlValueString).collect(Collectors.toList()), |
| sourceClusterName, |
| sourceClusterDesc, |
| targetClusterName, |
| targetClusterDesc, |
| params.tasks); |
| |
| // This is a brand new job, index its details including start time |
| if (rs.one().getBool("[applied]")) { |
| BatchStatement batch = new BatchStatement(); |
| batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)", |
| metadataKeyspace, Schema.SOURCE_CLUSTER_INDEX), |
| sourceClusterName, params.jobId)); |
| batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)", |
| metadataKeyspace, Schema.TARGET_CLUSTER_INDEX), |
| targetClusterName, params.jobId)); |
| batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)", |
| metadataKeyspace, Schema.KEYSPACE_INDEX), |
| metadataKeyspace, params.jobId)); |
| batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " + |
| "VALUES ('%s', ?, ?, ?)", |
| metadataKeyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")), |
| startDateTime.getHourOfDay(), timeUUID, params.jobId)); |
| session.execute(batch); |
| } |
| } |
| |
| public void finalizeJob(UUID jobId, Map<KeyspaceTablePair, RangeStats> results) { |
| logger.info("Finalizing job status"); |
| |
| markNotRunning(jobId); |
| |
| BatchStatement batch = new BatchStatement(); |
| for (Map.Entry<KeyspaceTablePair, RangeStats> result : results.entrySet()) { |
| KeyspaceTablePair table = result.getKey(); |
| RangeStats stats = result.getValue(); |
| session.execute(String.format("INSERT INTO %s.%s (" + |
| " job_id," + |
| " qualified_table_name," + |
| " matched_partitions," + |
| " mismatched_partitions," + |
| " partitions_only_in_source," + |
| " partitions_only_in_target," + |
| " matched_rows," + |
| " matched_values," + |
| " mismatched_values," + |
| " skipped_partitions) " + |
| "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", |
| metadataKeyspace, Schema.JOB_RESULTS), |
| jobId, |
| table.toCqlValueString(), |
| stats.getMatchedPartitions(), |
| stats.getMismatchedPartitions(), |
| stats.getOnlyInSource(), |
| stats.getOnlyInTarget(), |
| stats.getMatchedRows(), |
| stats.getMatchedValues(), |
| stats.getMismatchedValues(), |
| stats.getSkippedPartitions()); |
| } |
| session.execute(batch); |
| } |
| |
| |
| public void markNotRunning(UUID jobId) { |
| try |
| { |
| logger.info("Marking job {} as not running", jobId); |
| |
| ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS", |
| metadataKeyspace, Schema.RUNNING_JOBS), |
| jobId); |
| if (!rs.one().getBool("[applied]")) |
| { |
| logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " + |
| "during initialization as there may be no entry for this job {} in the {} table", |
| jobId, Schema.RUNNING_JOBS); |
| } |
| } catch (Exception e) { |
| // Because this is called from another exception handler, we don't want to lose the original exception |
| // just because we may not have been able to mark the job as not running. Just log here |
| logger.error("Could not mark job {} as not running.", jobId, e); |
| } |
| } |
| } |
| |
| static class Schema { |
| |
| public static final String TASK_STATUS = "task_status"; |
| private static final String TASK_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " job_id uuid," + |
| " bucket int," + |
| " qualified_table_name text," + |
| " start_token varchar," + |
| " end_token varchar," + |
| " matched_partitions bigint," + |
| " mismatched_partitions bigint, " + |
| " partitions_only_in_source bigint," + |
| " partitions_only_in_target bigint," + |
| " matched_rows bigint," + |
| " matched_values bigint," + |
| " mismatched_values bigint," + |
| " skipped_partitions bigint," + |
| " last_token varchar," + |
| " PRIMARY KEY((job_id, bucket), qualified_table_name, start_token, end_token))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String JOB_SUMMARY = "job_summary"; |
| private static final String JOB_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " job_id uuid," + |
| " job_start_time timeuuid," + |
| " buckets int," + |
| " qualified_table_names frozen<list<text>>," + |
| " source_cluster_name text," + |
| " source_cluster_desc text," + |
| " target_cluster_name text," + |
| " target_cluster_desc text," + |
| " total_tasks int," + |
| " PRIMARY KEY(job_id))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String JOB_RESULTS = "job_results"; |
| private static final String JOB_RESULTS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " job_id uuid," + |
| " qualified_table_name text," + |
| " matched_partitions bigint," + |
| " mismatched_partitions bigint," + |
| " partitions_only_in_source bigint," + |
| " partitions_only_in_target bigint," + |
| " matched_rows bigint," + |
| " matched_values bigint," + |
| " mismatched_values bigint," + |
| " skipped_partitions bigint," + |
| " PRIMARY KEY(job_id, qualified_table_name))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String JOB_STATUS = "job_status"; |
| private static final String JOB_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " job_id uuid," + |
| " bucket int," + |
| " qualified_table_name text," + |
| " completed counter," + |
| " PRIMARY KEY ((job_id, bucket), qualified_table_name))"; |
| |
| public static final String MISMATCHES = "mismatches"; |
| private static final String MISMATCHES_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " job_id uuid," + |
| " bucket int," + |
| " qualified_table_name text, " + |
| " mismatching_token varchar, " + |
| " mismatch_type text, " + |
| " PRIMARY KEY ((job_id, bucket), qualified_table_name, mismatching_token))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String ERROR_SUMMARY = "task_errors"; |
| private static final String ERROR_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " job_id uuid," + |
| " bucket int," + |
| " qualified_table_name text," + |
| " start_token varchar," + |
| " end_token varchar," + |
| " PRIMARY KEY ((job_id, bucket), qualified_table_name, start_token, end_token))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String ERROR_DETAIL = "partition_errors"; |
| private static final String ERROR_DETAIL_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " job_id uuid," + |
| " bucket int," + |
| " qualified_table_name text," + |
| " start_token varchar," + |
| " end_token varchar," + |
| " error_token varchar," + |
| " error_source varchar," + |
| " PRIMARY KEY ((job_id, bucket, qualified_table_name, start_token, end_token), error_token))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String SOURCE_CLUSTER_INDEX = "source_cluster_index"; |
| private static final String SOURCE_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " source_cluster_name text," + |
| " job_id uuid," + |
| " PRIMARY KEY (source_cluster_name, job_id))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String TARGET_CLUSTER_INDEX = "target_cluster_index"; |
| private static final String TARGET_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " target_cluster_name text," + |
| " job_id uuid," + |
| " PRIMARY KEY (target_cluster_name, job_id))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String KEYSPACE_INDEX = "keyspace_index"; |
| private static final String KEYSPACE_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " keyspace_name text," + |
| " job_id uuid," + |
| " PRIMARY KEY(keyspace_name, job_id))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String JOB_START_INDEX = "job_start_index"; |
| private static final String JOB_START_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " job_start_date date," + |
| " job_start_hour int," + |
| " job_start_time timeuuid," + |
| " job_id uuid," + |
| " PRIMARY KEY ((job_start_date, job_start_hour), job_start_time))" + |
| " WITH default_time_to_live = %s"; |
| |
| public static final String RUNNING_JOBS = "running_jobs"; |
| private static final String RUNNING_JOBS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" + |
| " job_id uuid," + |
| " PRIMARY KEY (job_id))" + |
| " WITH default_time_to_live = %s"; |
| |
| private static final String KEYSPACE_SCHEMA = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s"; |
| |
| |
| public static void maybeInitialize(Session session, MetadataKeyspaceOptions options) { |
| if (!options.should_init) |
| return; |
| |
| logger.info("Initializing cassandradiff journal schema in \"{}\" keyspace", options.keyspace); |
| session.execute(String.format(KEYSPACE_SCHEMA, options.keyspace, options.replication)); |
| session.execute(String.format(JOB_SUMMARY_SCHEMA, options.keyspace, JOB_SUMMARY, options.ttl)); |
| session.execute(String.format(JOB_STATUS_SCHEMA, options.keyspace, JOB_STATUS)); |
| session.execute(String.format(JOB_RESULTS_SCHEMA, options.keyspace, JOB_RESULTS, options.ttl)); |
| session.execute(String.format(TASK_STATUS_SCHEMA, options.keyspace, TASK_STATUS, options.ttl)); |
| session.execute(String.format(MISMATCHES_SCHEMA, options.keyspace, MISMATCHES, options.ttl)); |
| session.execute(String.format(ERROR_SUMMARY_SCHEMA, options.keyspace, ERROR_SUMMARY, options.ttl)); |
| session.execute(String.format(ERROR_DETAIL_SCHEMA, options.keyspace, ERROR_DETAIL, options.ttl)); |
| session.execute(String.format(SOURCE_CLUSTER_INDEX_SCHEMA, options.keyspace, SOURCE_CLUSTER_INDEX, options.ttl)); |
| session.execute(String.format(TARGET_CLUSTER_INDEX_SCHEMA, options.keyspace, TARGET_CLUSTER_INDEX, options.ttl)); |
| session.execute(String.format(KEYSPACE_INDEX_SCHEMA, options.keyspace, KEYSPACE_INDEX, options.ttl)); |
| session.execute(String.format(JOB_START_INDEX_SCHEMA, options.keyspace, JOB_START_INDEX, options.ttl)); |
| session.execute(String.format(RUNNING_JOBS_SCHEMA, options.keyspace, RUNNING_JOBS, options.ttl)); |
| logger.info("Schema initialized"); |
| } |
| } |
| } |
| |