| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ignite.ci.teamcity.ignited.fatbuild; |
| |
| import com.google.common.base.Strings; |
| import com.google.common.base.Throwables; |
| import java.util.stream.Stream; |
| import org.apache.ignite.ci.di.AutoProfiling; |
| import org.apache.ignite.ci.di.MonitoredTask; |
| import org.apache.ignite.ci.di.scheduler.IScheduler; |
| import org.apache.ignite.ci.tcmodel.changes.ChangesList; |
| import org.apache.ignite.ci.tcmodel.result.Build; |
| import org.apache.ignite.ci.tcmodel.result.problems.ProblemOccurrence; |
| import org.apache.ignite.ci.tcmodel.result.stat.Statistics; |
| import org.apache.ignite.ci.tcmodel.result.tests.TestOccurrencesFull; |
| import org.apache.ignite.ci.teamcity.ignited.BuildRefCompacted; |
| import org.apache.ignite.ci.teamcity.ignited.buildref.BuildRefDao; |
| import org.apache.ignite.ci.teamcity.ignited.IStringCompactor; |
| import org.apache.ignite.ci.teamcity.ignited.ITeamcityIgnited; |
| import org.apache.ignite.ci.teamcity.ignited.SyncMode; |
| import org.apache.ignite.ci.teamcity.ignited.change.ChangeSync; |
| import org.apache.ignite.ci.teamcity.ignited.runhist.RunHistSync; |
| import org.apache.ignite.ci.teamcity.pure.ITeamcityConn; |
| import org.apache.ignite.ci.util.ExceptionUtil; |
| import org.apache.ignite.internal.util.GridConcurrentHashSet; |
| import org.jetbrains.annotations.NotNull; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.GuardedBy; |
| import javax.inject.Inject; |
| import java.io.FileNotFoundException; |
| import java.util.*; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| public class ProactiveFatBuildSync { |
| public static final int FAT_BUILD_PROACTIVE_TASKS = 5; |
| |
| /** Logger. */ |
| private static final Logger logger = LoggerFactory.getLogger(ProactiveFatBuildSync.class); |
| |
| /** Build reference DAO. */ |
| @Inject |
| private BuildRefDao buildRefDao; |
| |
| /** Build DAO. */ |
| @Inject private FatBuildDao fatBuildDao; |
| |
| /** Scheduler. */ |
| @Inject private IScheduler scheduler; |
| |
| @Inject private IStringCompactor compactor; |
| |
| /** Change sync. */ |
| @Inject private ChangeSync changeSync; |
| |
| /** Run history sync. */ |
| @Inject private RunHistSync runHistSync; |
| |
| @GuardedBy("this") |
| private Map<String, SyncTask> buildToLoad = new HashMap<>(); |
| |
| public void doLoadBuilds(int i, String srvNme, ITeamcityConn conn, Set<Integer> paginateUntil) { |
| doLoadBuilds(i, srvNme, conn, paginateUntil, getSyncTask(conn).loadingBuilds); |
| } |
| |
| /** |
| * Scope of work: builds to be loaded from a connection. |
| */ |
| private static class SyncTask { |
| ITeamcityConn conn; |
| Set<Integer> ids = new HashSet<>(); |
| |
| GridConcurrentHashSet<Integer> loadingBuilds = new GridConcurrentHashSet<>(); |
| } |
| |
| /** |
| * Invoke load fat builds later, re-load provided builds. |
| * @param conn |
| * @param buildsToAskFromTc Builds to ask from tc. |
| */ |
| public void scheduleBuildsLoad(ITeamcityConn conn, Collection<Integer> buildsToAskFromTc) { |
| if (buildsToAskFromTc.isEmpty()) |
| return; |
| |
| synchronized (this) { |
| final SyncTask syncTask = getSyncTask(conn); |
| |
| buildsToAskFromTc.stream() |
| .filter(id -> !syncTask.loadingBuilds.contains(id)) |
| .forEach(syncTask.ids::add); |
| } |
| |
| int ldrToActivate = ThreadLocalRandom.current().nextInt(FAT_BUILD_PROACTIVE_TASKS); |
| |
| scheduler.sheduleNamed(taskName("loadFatBuilds" + ldrToActivate, conn.serverCode()), |
| () -> loadFatBuilds(ldrToActivate, conn.serverCode()), 2, TimeUnit.MINUTES); |
| |
| } |
| |
| @NotNull |
| public synchronized SyncTask getSyncTask(ITeamcityConn conn) { |
| final SyncTask syncTask = buildToLoad.computeIfAbsent(conn.serverCode(), s -> new SyncTask()); |
| |
| syncTask.conn = conn; |
| |
| return syncTask; |
| } |
| |
| @SuppressWarnings({"WeakerAccess", "UnusedReturnValue"}) |
| @MonitoredTask(name = "Find missing builds", nameExtArgsIndexes = {0}) |
| @AutoProfiling |
| protected String findMissingBuildsFromBuildRef(String srvId, ITeamcityConn conn) { |
| int srvIdMaskHigh = ITeamcityIgnited.serverIdToInt(srvId); |
| |
| Stream<BuildRefCompacted> buildRefs = buildRefDao.compactedBuildsForServer(srvIdMaskHigh); |
| |
| List<Integer> buildsIdsToLoad = new ArrayList<>(); |
| AtomicInteger totalAskedToLoad = new AtomicInteger(); |
| |
| buildRefs.forEach(buildRef -> { |
| Integer buildId = buildRef.getId(); |
| if (buildId == null) |
| return; |
| |
| if (buildRef.isRunning(compactor) |
| || buildRef.isQueued(compactor) |
| || !fatBuildDao.containsKey(srvIdMaskHigh, buildId)) |
| buildsIdsToLoad.add(buildId); |
| |
| if (buildsIdsToLoad.size() >= 100) { |
| totalAskedToLoad.addAndGet(buildsIdsToLoad.size()); |
| scheduleBuildsLoad(conn, buildsIdsToLoad); |
| buildsIdsToLoad.clear(); |
| } |
| }); |
| |
| if (!buildsIdsToLoad.isEmpty()) { |
| totalAskedToLoad.addAndGet(buildsIdsToLoad.size()); |
| scheduleBuildsLoad(conn, buildsIdsToLoad); |
| } |
| |
| return "Invoked later load for " + totalAskedToLoad.get() + " builds from " + srvId; |
| } |
| |
| /** */ |
| private void loadFatBuilds(int ldrNo, String srvId) { |
| Set<Integer> load; |
| ITeamcityConn conn; |
| final GridConcurrentHashSet<Integer> loadingBuilds; |
| |
| synchronized (this) { |
| final SyncTask syncTask = buildToLoad.get(srvId); |
| if (syncTask == null) |
| return; |
| |
| if (syncTask.ids.isEmpty()) { |
| syncTask.conn = null; |
| return; |
| } |
| |
| if (syncTask.conn == null) |
| return; |
| |
| load = syncTask.ids; |
| //marking that builds are in progress |
| |
| loadingBuilds = syncTask.loadingBuilds; |
| loadingBuilds.addAll(load); |
| |
| syncTask.ids = new HashSet<>(); |
| |
| conn = syncTask.conn; |
| syncTask.conn = null; |
| } |
| |
| doLoadBuilds(ldrNo, srvId, conn, load, loadingBuilds); |
| } |
| |
| @SuppressWarnings({"WeakerAccess", "UnusedReturnValue"}) |
| @MonitoredTask(name = "Proactive Builds Loading (srv,agent)", nameExtArgsIndexes = {1, 0}) |
| @AutoProfiling |
| public String doLoadBuilds(int ldrNo, String srvId, ITeamcityConn conn, Set<Integer> load, |
| GridConcurrentHashSet<Integer> loadingBuilds) { |
| if(load.isEmpty()) |
| return "Nothing to load"; |
| |
| final int srvIdMaskHigh = ITeamcityIgnited.serverIdToInt(srvId); |
| |
| AtomicInteger err = new AtomicInteger(); |
| AtomicInteger ld = new AtomicInteger(); |
| |
| Map<Long, FatBuildCompacted> builds = fatBuildDao.getAllFatBuilds(srvIdMaskHigh, load); |
| |
| load.forEach( |
| buildId -> { |
| try { |
| FatBuildCompacted existingBuild = builds.get(FatBuildDao.buildIdToCacheKey(srvIdMaskHigh, buildId)); |
| |
| FatBuildCompacted savedVer = loadBuild(conn, buildId, existingBuild, SyncMode.RELOAD_QUEUED); |
| |
| if (savedVer != null) |
| ld.incrementAndGet(); |
| |
| loadingBuilds.remove(buildId); |
| } |
| catch (Exception e) { |
| logger.error("", e); |
| err.incrementAndGet(); |
| } |
| } |
| ); |
| |
| return "Builds updated " + ld.get() + " from " + load.size() + " requested, errors: " + err; |
| } |
| |
| @NotNull |
| private String taskName(String taskName, String srvName) { |
| return ProactiveFatBuildSync.class.getSimpleName() +"." + taskName + "." + srvName; |
| } |
| |
| /** |
| * Schedule missing builds into Fat builds cache. Sync is based by BuildRefs cache. |
| * @param srvName Server name. |
| * @param conn Connection. |
| */ |
| public void ensureActualizationRequested(String srvName, ITeamcityConn conn) { |
| scheduler.sheduleNamed(taskName("findMissingBuildsFromBuildRef", srvName), |
| () -> findMissingBuildsFromBuildRef(srvName, conn), 360, TimeUnit.MINUTES); |
| |
| /* |
| scheduler.sheduleNamed(taskName("migrateBuildsToV6", srvName), |
| () -> migrateBuildsToV6(srvName, conn), 8, TimeUnit.HOURS); |
| */ |
| } |
| |
| /** |
| * @param srvName Server name. |
| * @param conn Connection. |
| */ |
| @MonitoredTask(name = "Migrate Builds to V6", nameExtArgsIndexes = {0}) |
| public String migrateBuildsToV6(String srvName, ITeamcityConn conn) { |
| int srvId = ITeamcityIgnited.serverIdToInt(srvName); |
| |
| AtomicInteger cnt = new AtomicInteger(); |
| AtomicInteger divergedIds = new AtomicInteger(); |
| fatBuildDao.outdatedVersionEntries(srvId).forEach(entry -> { |
| cnt.incrementAndGet(); |
| int buildId = BuildRefDao.cacheKeyToBuildId(entry.getKey()); |
| FatBuildCompacted transformed = transformV5Build( |
| srvId, |
| buildId, |
| entry.getValue()); |
| |
| if (transformed != null) |
| divergedIds.incrementAndGet(); |
| }); |
| |
| return "Found: " + cnt.get() + " builds found having version < 6 and " |
| + divergedIds.get() + " with ID divergence."; |
| } |
| |
| /** |
| * |
| * @param conn TC connection to load data |
| * @param buildId build ID (TC identification). |
| * @param existingBuild build from DB. |
| * @return null if nothing was saved, use existing build. Non null value indicates that |
| * new build if it was updated. |
| */ |
| @Nullable |
| public FatBuildCompacted loadBuild(ITeamcityConn conn, int buildId, |
| @Nullable FatBuildCompacted existingBuild, |
| SyncMode mode) { |
| if (existingBuild != null && !existingBuild.isOutdatedEntityVersion()) { |
| boolean finished = |
| existingBuild.state(compactor) != null // don't count old fake builds as finished |
| && !existingBuild.isRunning(compactor) |
| && !existingBuild.isQueued(compactor); |
| |
| if (finished || mode != SyncMode.RELOAD_QUEUED) |
| return null; |
| } |
| |
| FatBuildCompacted savedVer = reloadBuild(conn, buildId, existingBuild); |
| |
| if (savedVer == null) |
| return null; |
| |
| BuildRefCompacted refCompacted = new BuildRefCompacted(savedVer); |
| if (savedVer.isFakeStub()) |
| refCompacted.setId(buildId); //to provide possiblity to save the build |
| |
| final String srvCode = conn.serverCode(); |
| final int srvIdMask = ITeamcityIgnited.serverIdToInt(srvCode); |
| |
| buildRefDao.save(srvIdMask, refCompacted); |
| |
| runHistSync.saveToHistoryLater(srvCode, savedVer); |
| |
| return savedVer; |
| } |
| |
| /** |
| * |
| * @param conn |
| * @param buildId |
| * @param existingBuild |
| * @return new build if it was updated or null if no updates detected |
| */ |
| @SuppressWarnings({"WeakerAccess"}) |
| @AutoProfiling |
| @Nullable public FatBuildCompacted reloadBuild(ITeamcityConn conn, int buildId, @Nullable FatBuildCompacted existingBuild) { |
| //todo some sort of locking to avoid double requests |
| |
| final String srvName = conn.serverCode(); |
| final int srvIdMask = ITeamcityIgnited.serverIdToInt(srvName); |
| |
| if (existingBuild != null && existingBuild.isOutdatedEntityVersion()) { |
| if (existingBuild.version() == FatBuildCompacted.VER_FULL_DATA_BUT_ID_CONFLICTS_POSSIBLE) |
| return transformV5Build(srvIdMask, buildId, existingBuild); |
| } |
| |
| Build build; |
| List<TestOccurrencesFull> tests = new ArrayList<>(); |
| List<ProblemOccurrence> problems = null; |
| Statistics statistics = null; |
| ChangesList changesList = null; |
| try { |
| build = conn.getBuild(buildId); |
| |
| if (build.isFakeStub()) |
| build.setCancelled(); // probably now it will not happen because of direct connection to TC. |
| else { |
| if(!Objects.equals(build.getId(), buildId)) |
| throw new FileNotFoundException( |
| "Build IDs are not consistent: returned " + build.getId() + " queued is " + buildId); |
| } |
| |
| if (build.testOccurrences != null && !build.isComposite()) { // don't query tests for compoite |
| String nextHref = null; |
| do { |
| TestOccurrencesFull page = conn.getTestsPage(buildId, nextHref, true); |
| nextHref = page.nextHref(); |
| |
| tests.add(page); |
| } |
| while (!Strings.isNullOrEmpty(nextHref)); |
| } |
| |
| if (build.problemOccurrences != null) |
| problems = conn.getProblems(buildId).getProblemsNonNull(); |
| |
| if (build.statisticsRef != null) |
| statistics = conn.getStatistics(buildId); |
| |
| if (build.changesRef != null) { |
| changesList = conn.getChangesList(buildId); |
| |
| for (int changeId : FatBuildDao.extractChangeIds(changesList)) { |
| // consult change sync for provided changes data |
| |
| changeSync.change(srvIdMask, changeId, conn); |
| } |
| } |
| } |
| catch (Exception e) { |
| if (Throwables.getRootCause(e) instanceof FileNotFoundException) { |
| logger.info("Loading build [" + buildId + "] for server [" + srvName + "] failed:" + e.getMessage(), e); |
| |
| if (existingBuild != null) { |
| build = existingBuild.toBuild(compactor); |
| |
| if(build.isRunning() || build.isQueued()) |
| build.setCancelled(); |
| |
| if (build.isFakeStub()) |
| build.setCancelled(); |
| |
| tests = Collections.singletonList(existingBuild.getTestOcurrences(compactor)); |
| |
| problems = existingBuild.problems(compactor); |
| |
| //todo extract new parameters or save fat build without XML convertions |
| // - existingBuild.statistics(); |
| // - int[] changes = existingBuild.changes(); |
| } |
| else { |
| build = Build.createFakeStub(); |
| |
| build.setCancelled(); |
| } |
| } else { |
| logger.error("Loading build [" + buildId + "] for server [" + srvName + "] failed:" + e.getMessage(), e); |
| |
| e.printStackTrace(); |
| |
| throw ExceptionUtil.propagateException(e); |
| } |
| } |
| |
| //if we are here because of some sort of outdated version of build, |
| // new save will be performed with new entity version for compacted build |
| return fatBuildDao.saveBuild(srvIdMask, buildId, build, tests, problems, statistics, changesList, existingBuild); |
| } |
| |
| @Nullable |
| public FatBuildCompacted transformV5Build(int srvIdMask, int buildId, @NotNull FatBuildCompacted existingBuild) { |
| if (Objects.equals(buildId, existingBuild.id())) { |
| existingBuild.setVersion(FatBuildCompacted.LATEST_VERSION); |
| |
| fatBuildDao.putFatBuild(srvIdMask, buildId, existingBuild); |
| |
| return null; |
| } |
| else { |
| logger.warn("Build inconsistency found in the DB, removing build " + existingBuild.getId()); |
| |
| FatBuildCompacted buildCompacted = new FatBuildCompacted() |
| .setFakeStub(true) |
| .setCancelled(compactor); |
| |
| fatBuildDao.putFatBuild(srvIdMask, buildId, buildCompacted); |
| |
| return buildCompacted; |
| } |
| } |
| } |