// blob: f7a154b85c70b03e4d98e47cd18ab76d7cf9ae14
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.ci.db;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.ci.IgnitePersistentTeamcity;
import org.apache.ignite.ci.issue.Issue;
import org.apache.ignite.ci.issue.IssueKey;
import org.apache.ignite.ci.issue.IssuesStorage;
import org.apache.ignite.ci.observer.CompactBuildsInfo;
import org.apache.ignite.ci.tcmodel.result.Build;
import org.apache.ignite.ci.web.model.CompactContributionKey;
import org.apache.ignite.ci.web.model.CompactVisa;
import org.apache.ignite.ci.web.model.CompactVisaRequest;
import org.apache.ignite.ci.web.model.hist.VisasHistoryStorage;
import org.apache.ignite.configuration.CacheConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Migrations to be applied to the TC-related caches.
*/
public class DbMigrations {
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
public static final String BUILD_QUEUE = "buildQueue";
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
public static final String RUNNING_BUILDS = "runningBuilds";
/** Logger. */
private static final Logger logger = LoggerFactory.getLogger(DbMigrations.class);
/** Name (before {@code ignCacheNme} is applied) of the cache whose keys are the codes of already applied migrations. */
public static final String DONE_MIGRATIONS = "doneMigrations";
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
private static final String BUILD_STATISTICS = "buildStatistics";
/** Cache name destroyed by {@link #dataMigration}; used as is, without {@code ignCacheNme}. */
private static final String BUILD_CONDITIONS_CACHE_NAME = "buildConditions";
// V1 caches, 1024 parts.
// Within this class the names below are used only to migrate data out of, or to destroy, the corresponding caches.
/** Cache cleared and destroyed by the {@code "Remove-runStat"} migration. */
@Deprecated
public static final String RUN_STAT_CACHE = "runStat";
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
@Deprecated
public static final String TEST_OCCURRENCE_FULL = "testOccurrenceFull";
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
@Deprecated
private static final String PROBLEMS = "problems";
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
@Deprecated
private static final String STAT = "stat";
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
@Deprecated
private static final String FINISHED_BUILDS = "finishedBuilds";
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
@Deprecated
public static final String FINISHED_BUILDS_INCLUDE_FAILED = "finishedBuildsIncludeFailed";
/** Source cache of the issues migration in {@link #dataMigration}; the name is used as is. */
@Deprecated
public static final String ISSUES = "issues";
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
@Deprecated
private static final String BUILD_HIST_FINISHED = "buildHistFinished";
/** Cache name that {@link #dataMigration} passes to {@code applyDestroyIgnCacheMigration}. */
@Deprecated
private static final String BUILD_HIST_FINISHED_OR_FAILED = "buildHistFinishedOrFailed";
/** Cache name destroyed by {@link #dataMigration}; used as is, without {@code ignCacheNme}. */
@Deprecated
public static final String TEAMCITY_BUILD_CACHE_NAME_OLD = "teamcityBuild";
/** Source cache of the visas history migration in {@link #dataMigration}; destroyed by a later migration step. */
@Deprecated
public static final String COMPACT_VISAS_HISTORY_CACHE_NAME = "compactVisasHistoryCache";
/**
 * Names of old caches. Within this file every constant is referenced only from
 * {@code dataMigration}, which schedules removal of the corresponding cache.
 */
interface Old {
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String TEST_FULL = "testFull";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String BUILD_PROBLEMS = "buildProblems";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String CHANGES_LIST = "changesList";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String CHANGE_INFO_FULL = "changeInfoFull";
/** Passed to {@code applyRemoveCache}; the name is used as is. */
String CURRENT_PR_FAILURES = "currentPrFailures";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String CONFIGURATIONS = "configurations";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String TESTS_OCCURRENCES = "testOccurrences";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String TESTS = "tests";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String TEST_REFS = "testRefs";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String ISSUES_USAGES_LIST = "issuesUsagesList";
/** Test run history cache name; destroyed under this exact name. */
String TEST_HIST_CACHE_NAME = "testRunHistV0";
/** Second test run history cache name; destroyed under this exact name. */
String TEST_HIST_CACHE_NAME2 = "teamcityTestRunHistV0";
/** Build start time cache name; destroyed under this exact name. */
String BUILD_START_TIME_CACHE_NAME = "buildStartTimeV0";
/** Second build start time cache name; destroyed under this exact name. */
String BUILD_START_TIME_CACHE_NAME2 = "teamcityBuildStartTimeV0";
/** Suite run history cache name; destroyed under this exact name. */
String SUITE_HIST_CACHE_NAME = "teamcitySuiteRunHistV0";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String CALCULATED_STATISTIC = "calculatedStatistic";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String BUILDS = "builds";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String BUILD_RESULTS = "buildResults";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String BUILDS_FAILURE_RUN_STAT = "buildsFailureRunStat";
/** Passed to {@code applyDestroyIgnCacheMigration}. */
String TESTS_RUN_STAT = "testsRunStat";
}
/** Ignite instance used to look up, create and destroy caches. */
private final Ignite ignite;
/** Server ID; passed to {@code IgnitePersistentTeamcity.ignCacheNme} when composing cache names. */
private final String serverId;
/** Cache of applied migration codes; assigned in {@code dataMigration}, so it is {@code null} until that call. */
private IgniteCache<String, Object> doneMigrations;
/**
 * Stores the collaborators; no cache is touched until {@code dataMigration} is called.
 *
 * @param ignite Ignite instance used to access caches.
 * @param srvId Server ID used when composing cache names.
 */
public DbMigrations(Ignite ignite, String srvId) {
    serverId = srvId;
    this.ignite = ignite;
}
/**
 * Runs all known migrations in a fixed order. Each migration is identified by a code and is
 * skipped by {@code applyMigration} when that code is already present in the done-migrations cache.
 *
 * @param visasCache Destination cache that receives visa requests converted from the old
 *      {@link #COMPACT_VISAS_HISTORY_CACHE_NAME} cache.
 */
public void dataMigration(Cache<CompactContributionKey, List<CompactVisaRequest>> visasCache) {
    doneMigrations = doneMigrationsCache();

    applyMigration(COMPACT_VISAS_HISTORY_CACHE_NAME + "-to-" + VisasHistoryStorage.VISAS_CACHE_NAME,
        () -> migrateCompactVisasHistory(visasCache));

    applyMigration("InitialFillLatestRunsV3", () -> {
        // Intentionally empty: applyMigration only records this code as done.
    });

    applyMigration("Remove-" + RUN_STAT_CACHE, () -> {
        // Look the cache up rather than creating it: nothing to remove when it does not exist.
        IgniteCache<String, Build> oldBuilds = ignite.cache(ignCacheNme(RUN_STAT_CACHE));

        if (oldBuilds == null)
            return;

        oldBuilds.clear();
        oldBuilds.destroy();
    });

    applyRemoveCache(Old.CURRENT_PR_FAILURES);

    applyDestroyIgnCacheMigration(TEST_OCCURRENCE_FULL);
    applyDestroyIgnCacheMigration(PROBLEMS);
    applyDestroyIgnCacheMigration(FINISHED_BUILDS_INCLUDE_FAILED);

    Cache<IssueKey, Issue> issuesCache = IssuesStorage.botDetectedIssuesCache(ignite);

    applyMigration(ISSUES + "-to-" + issuesCache.getName() + "V2", () -> migrateIssues(issuesCache));

    applyDestroyIgnCacheMigration(RUNNING_BUILDS);
    applyDestroyIgnCacheMigration(BUILD_QUEUE);
    applyDestroyIgnCacheMigration(Old.TESTS);
    applyDestroyIgnCacheMigration(STAT);
    applyDestroyIgnCacheMigration(BUILD_STATISTICS);

    applyDestroyCacheMigration(BUILD_CONDITIONS_CACHE_NAME, BUILD_CONDITIONS_CACHE_NAME);
    applyDestroyCacheMigration(TEAMCITY_BUILD_CACHE_NAME_OLD, TEAMCITY_BUILD_CACHE_NAME_OLD);

    applyDestroyIgnCacheMigration(Old.CHANGE_INFO_FULL);
    applyDestroyIgnCacheMigration(Old.CHANGES_LIST);
    applyDestroyIgnCacheMigration(FINISHED_BUILDS);
    applyDestroyIgnCacheMigration(BUILD_HIST_FINISHED);
    applyDestroyIgnCacheMigration(BUILD_HIST_FINISHED_OR_FAILED);

    applyDestroyCacheMigration(COMPACT_VISAS_HISTORY_CACHE_NAME, COMPACT_VISAS_HISTORY_CACHE_NAME);

    applyDestroyIgnCacheMigration(Old.BUILD_PROBLEMS);
    applyDestroyIgnCacheMigration(Old.TEST_FULL);
    applyDestroyIgnCacheMigration(Old.CONFIGURATIONS);
    applyDestroyIgnCacheMigration(Old.TESTS_OCCURRENCES);
    applyDestroyIgnCacheMigration(Old.TEST_REFS);
    applyDestroyIgnCacheMigration(Old.ISSUES_USAGES_LIST);

    applyDestroyCacheMigration(Old.SUITE_HIST_CACHE_NAME);
    applyDestroyCacheMigration(Old.BUILD_START_TIME_CACHE_NAME);
    applyDestroyCacheMigration(Old.TEST_HIST_CACHE_NAME);
    applyDestroyCacheMigration(Old.TEST_HIST_CACHE_NAME2);
    applyDestroyCacheMigration(Old.BUILD_START_TIME_CACHE_NAME2);

    applyDestroyIgnCacheMigration(Old.CALCULATED_STATISTIC);
    applyDestroyIgnCacheMigration(Old.BUILDS);
    applyDestroyIgnCacheMigration(Old.BUILD_RESULTS);
    applyDestroyIgnCacheMigration(Old.BUILDS_FAILURE_RUN_STAT);
    applyDestroyIgnCacheMigration(Old.TESTS_RUN_STAT);
}

/**
 * Converts every entry of the old {@link #COMPACT_VISAS_HISTORY_CACHE_NAME} cache into a list of
 * {@link CompactVisaRequest} and puts it into {@code visasCache}. The old cache is read in binary form;
 * entries whose value is neither a {@link List} nor a {@link Map} are skipped.
 *
 * @param visasCache Destination cache.
 */
@SuppressWarnings("unchecked") // Element types of binary collections cannot be checked at runtime.
private void migrateCompactVisasHistory(Cache<CompactContributionKey, List<CompactVisaRequest>> visasCache) {
    IgniteCache<Object, Object> cache = ignite.cache(COMPACT_VISAS_HISTORY_CACHE_NAME);

    if (cache == null) {
        System.err.println("Cache not found " + COMPACT_VISAS_HISTORY_CACHE_NAME);

        return;
    }

    IgniteCache<Object, Object> oldVisasCache = cache.withKeepBinary();

    int size = oldVisasCache.size();
    int i = 0;

    for (Cache.Entry<Object, Object> entry : oldVisasCache) {
        System.out.println("Migrating entry " + i++ + " from " + size);

        Object val = entry.getValue();
        Collection<BinaryObject> binVisaReqs;

        if (val instanceof List)
            binVisaReqs = (Collection<BinaryObject>)val;
        else if (val instanceof Map)
            binVisaReqs = ((Map<?, BinaryObject>)val).values();
        else
            continue;

        List<CompactVisaRequest> compactVisaReqs = new ArrayList<>();

        CompactContributionKey compactKey = ((BinaryObject)entry.getKey()).deserialize();

        for (BinaryObject binVisaReq : binVisaReqs) {
            CompactBuildsInfo compactInfo = ((BinaryObject)binVisaReq.field("compactInfo")).deserialize();
            CompactVisa compactVisa = ((BinaryObject)binVisaReq.field("compactVisa")).deserialize();

            compactVisaReqs.add(new CompactVisaRequest(compactVisa, compactInfo, false));
        }

        visasCache.put(compactKey, compactVisaReqs);
    }
}

/**
 * Streams all entries of the old {@link #ISSUES} cache into {@code issuesCache} in batches of 300
 * and then destroys the old cache. Does nothing when the old cache is absent or empty.
 *
 * @param issuesCache Destination cache; only its name is used, to open a data streamer.
 */
private void migrateIssues(Cache<IssueKey, Issue> issuesCache) {
    String cacheName = ISSUES;

    // Look the cache up rather than creating it: an absent source cache means there is nothing to migrate.
    IgniteCache<IssueKey, Issue> issuesOldCache = ignite.cache(cacheName);

    if (issuesOldCache == null) {
        System.err.println("Cache not found " + cacheName);

        return;
    }

    int size = issuesOldCache.size();

    if (size <= 0)
        return;

    int i = 0;

    Map<IssueKey, Issue> batch = new HashMap<>();

    // Closing the streamer flushes buffered entries, so the data is in the new cache
    // before the old one is destroyed below.
    try (IgniteDataStreamer<IssueKey, Issue> streamer = ignite.dataStreamer(issuesCache.getName())) {
        for (Cache.Entry<IssueKey, Issue> entry : issuesOldCache) {
            batch.put(entry.getKey(), entry.getValue());

            i++;

            if (batch.size() >= 300)
                saveOneBatch(cacheName, size, i, batch, streamer);
        }

        if (!batch.isEmpty())
            saveOneBatch(cacheName, size, i, batch, streamer);
    }

    System.err.println("Removing data from old cache " + issuesOldCache.getName());

    issuesOldCache.destroy();
}
/**
 * Registers a destroy migration for a cache whose actual name is produced by {@code ignCacheNme};
 * the given name is used for the migration code.
 *
 * @param cacheName Cache name before {@code ignCacheNme} is applied.
 */
private void applyDestroyIgnCacheMigration(String cacheName) {
    applyDestroyCacheMigration(cacheName, ignCacheNme(cacheName));
}
/**
 * Registers a destroy migration for a cache whose exact name is also used in the migration code.
 *
 * @param cacheNme Exact cache name.
 */
private void applyDestroyCacheMigration(String cacheNme) {
    final String dispCacheName = cacheNme;

    applyDestroyCacheMigration(dispCacheName, cacheNme);
}
/**
 * Registers the migration {@code "destroy-" + dispCacheName} which destroys the given cache,
 * or reports on stderr that the cache was not found.
 *
 * @param dispCacheName Name used in the migration code.
 * @param cacheNme Exact name of the cache to destroy.
 */
private void applyDestroyCacheMigration(String dispCacheName, String cacheNme) {
    String migrationCode = "destroy-" + dispCacheName;

    applyMigration(migrationCode, () -> {
        IgniteCache<Object, Object> target = ignite.cache(cacheNme);

        if (target != null)
            target.destroy();
        else
            System.err.println("cache [" + cacheNme + "] not found");
    });
}
/**
 * Registers the migration {@code full + "-to-" + cache.getName() + "V2"} which moves the data
 * of a deprecated cache into the given cache.
 *
 * @param full Deprecated cache name before {@code ignCacheNme} is applied.
 * @param cache Destination cache.
 */
private <K, V> void applyV1toV2Migration(String full, Cache<K, V> cache) {
    String migrationCode = full + "-to-" + cache.getName() + "V2";
    Runnable migration = () -> v1tov2cacheMigrate(full, cache);

    applyMigration(migrationCode, migration);
}
/**
 * @param name Cache name before {@code ignCacheNme} is applied.
 * @return Existing or newly created cache with the name produced by {@code ignCacheNme}.
 */
private <K, V> IgniteCache<K, V> getOrCreateIgnCacheV1(String name) {
    String fullName = ignCacheNme(name);
    IgniteCache<K, V> v1Cache = ignite.getOrCreateCache(fullName);

    return v1Cache;
}
/**
 * Streams all entries of a deprecated cache into {@code newCache} in batches of 300 and then destroys
 * the deprecated cache. WAL of the destination cache is disabled for the duration of the copy and is
 * re-enabled afterwards, even on failure. Does nothing when the deprecated cache is absent or empty.
 *
 * @param deprecatedCache Deprecated cache name before {@code ignCacheNme} is applied.
 * @param newCache Destination cache; only its name is used.
 */
private <K, V> void v1tov2cacheMigrate(String deprecatedCache, Cache<K, V> newCache) {
    String cacheNme = ignCacheNme(deprecatedCache);

    IgniteCache<K, V> tests = ignite.cache(cacheNme);

    if (tests == null) {
        System.err.println("Cache not found: " + cacheNme);

        return;
    }

    int size = tests.size();

    if (size <= 0)
        return;

    ignite.cluster().disableWal(newCache.getName());

    try {
        int i = 0;

        Map<K, V> batch = new HashMap<>();

        // try-with-resources releases the streamer even when iteration fails;
        // closing it also flushes the remaining buffered entries.
        try (IgniteDataStreamer<K, V> streamer = ignite.dataStreamer(newCache.getName())) {
            for (Cache.Entry<K, V> entry : tests) {
                batch.put(entry.getKey(), entry.getValue());

                i++;

                if (batch.size() >= 300)
                    saveOneBatch(cacheNme, size, i, batch, streamer);
            }

            if (!batch.isEmpty())
                saveOneBatch(cacheNme, size, i, batch, streamer);
        }

        // Reached only after the streamer was closed (and therefore flushed) without an exception.
        System.err.println("Removing data from old cache " + tests.getName());

        tests.destroy();
    }
    finally {
        ignite.cluster().enableWal(newCache.getName());
    }
}
/**
 * Reports progress on stdout and in the log, hands the accumulated entries to the streamer
 * and empties the batch so that it can be refilled.
 *
 * @param cacheNme Name of the cache being migrated (used in the progress message only).
 * @param size Overall size of the source cache.
 * @param i Number of entries processed so far.
 * @param batch Batch to save; must be non-empty because its first key is read for the message.
 *      Cleared before returning.
 * @param streamer Streamer the batch is added to.
 */
private <K, V> void saveOneBatch(String cacheNme, int size,
    int i,
    Map<K, V> batch, IgniteDataStreamer<K, V> streamer) {
    K sampleKey = batch.keySet().iterator().next();

    StringBuilder progress = new StringBuilder();

    progress.append("Migrating ").append(cacheNme).append(" ").append(batch.size()).append(" entries.");
    progress.append(" Processed ").append(i).append(" from ").append(size);
    progress.append(": One entry key ").append(sampleKey);

    String report = progress.toString();

    System.out.println(report);
    logger.info(report);

    streamer.addData(batch);

    batch.clear();
}
/**
 * Registers the migration {@code "remove" + cacheNme} which clears and destroys the cache
 * when it is listed among the Ignite cache names.
 *
 * @param cacheNme Exact cache name.
 */
private void applyRemoveCache(String cacheNme) {
    String migrationCode = "remove" + cacheNme;

    applyMigration(migrationCode, () -> {
        if (!ignite.cacheNames().contains(cacheNme))
            return;

        IgniteCache<Object, Object> stale = ignite.cache(cacheNme);

        if (stale != null) {
            stale.clear();
            stale.destroy();
        }
    });
}
/**
 * @return Replicated transactional cache, obtained or created under the name produced by
 *      {@code ignCacheNme(DONE_MIGRATIONS)}, that holds the codes of applied migrations.
 */
private IgniteCache<String, Object> doneMigrationsCache() {
    CacheConfiguration<String, Object> cfg = new CacheConfiguration<>(ignCacheNme(DONE_MIGRATIONS));

    cfg.setCacheMode(CacheMode.REPLICATED);
    cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

    return ignite.getOrCreateCache(cfg);
}
/**
 * Runs the given procedure unless its code is already stored in the done-migrations cache and
 * stores the code once the procedure has returned. The check is repeated while holding the
 * {@code DbMigrations.class} monitor. If the procedure throws, the code is not stored.
 *
 * @param code Migration code, used as the key in the done-migrations cache.
 * @param runnable Migration procedure.
 */
private void applyMigration(String code, Runnable runnable) {
    if (!doneMigrations.containsKey(code)) {
        synchronized (DbMigrations.class) {
            if (!doneMigrations.containsKey(code)) {
                String startMsg = "Running migration procedure [" + code + "]";

                System.err.println(startMsg);
                logger.warn(startMsg);

                runnable.run();

                doneMigrations.put(code, true);

                String doneMsg = "Completed migration procedure [" + code + "]";

                System.err.println(doneMsg);
                logger.warn(doneMsg);
            }
        }
    }
}
/**
 * @param tests Cache name to transform.
 * @return Result of {@code IgnitePersistentTeamcity.ignCacheNme} for the given name and this instance's server ID.
 */
private String ignCacheNme(String tests) {
    final String fullName = IgnitePersistentTeamcity.ignCacheNme(tests, serverId);

    return fullName;
}
}