blob: 5b0a640ba1e2758814515c1ec932a75235b7611e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
from typing import Set, List
from src.eva_engine import coordinator
from src.eva_engine.phase1.run_phase1 import RunPhase1, p1_evaluate_query
from torch.utils.data import DataLoader
from src.eva_engine.phase2.run_sh import BudgetAwareControllerSH
from src.eva_engine.phase2.run_sr import BudgetAwareControllerSR
from src.eva_engine.phase2.run_uniform import UniformAllocation
from src.logger import logger
from src.search_space.init_search_space import init_search_space
from src.query_api.interface import profile_NK_trade_off
from src.common.constant import Config
class RunModelSelection:
def __init__(self, search_space_name: str, args, is_simulate: bool = False):
self.args = args
self.eta = 3
self.is_simulate = is_simulate
# basic
self.search_space_name = search_space_name
self.dataset = self.args.dataset
# p2 evaluator
self.sh = None
# instance of the search space.
self.search_space_ins = init_search_space(self.args)
def select_model_simulate(self, budget: float, run_id: int = 0, only_phase1: bool = False, run_workers: int = 1):
"""
This is for image data only
"""
# 0. profiling dataset and search space, get t1 and t2
score_time_per_model, train_time_per_epoch, N_K_ratio = self.search_space_ins.profiling(self.dataset)
self.sh = BudgetAwareControllerSH(
search_space_ins=self.search_space_ins,
dataset_name=self.dataset,
eta=self.eta,
time_per_epoch=train_time_per_epoch,
args=self.args,
is_simulate=self.is_simulate)
# 1. run coordinator to schedule
K, U, N, B1_planed_time, B2_planed_time, B2_all_epoch = coordinator.schedule(self.dataset, self.sh, budget,
score_time_per_model,
train_time_per_epoch,
run_workers,
self.search_space_ins,
N_K_ratio,
only_phase1)
print(f"Budget = {budget}, N={N}, K={K}")
# 2. run phase 1 to score N models
k_models, B1_actual_time_use = p1_evaluate_query(self.search_space_name, self.dataset, run_id, N, K)
# 3. run phase-2 to determine the final model
best_arch, best_arch_performance, B2_actual_epoch_use, _ = self.sh.run_phase2(U, k_models)
# print("best model returned from Phase2 = ", k_models)
return best_arch, B1_actual_time_use + B2_actual_epoch_use * train_time_per_epoch, \
B1_planed_time + B2_planed_time, B2_all_epoch
def select_model_online_clean(self, budget: float, data_loader: List[DataLoader],
only_phase1: bool = False, run_workers: int = 1):
"""
Select model online for structured data.
:param budget: time budget
:param data_loader: time budget
:param only_phase1:
:param run_workers:
:return:
"""
begin_time = time.time()
logger.info("1. profiling....")
score_time_per_model = self.profile_filtering(data_loader)
train_time_per_epoch = self.profile_refinement(data_loader)
logger.info("2. coordination....")
K, U, N = self.coordination(budget, score_time_per_model, train_time_per_epoch, only_phase1)
logger.info("3. filtering phase....")
k_models, all_models, p1_trace_highest_score, p1_trace_highest_scored_models_id = self.filtering_phase(
N, K, train_loader=data_loader[0])
logger.info("4. refinement phase....")
best_arch, best_arch_performance, _, _ = self.refinement_phase(
U, k_models, train_loader=data_loader[0], valid_loader=data_loader[1])
end_time = time.time()
real_time_usage = end_time - begin_time
return best_arch, best_arch_performance, real_time_usage, all_models, \
p1_trace_highest_score, p1_trace_highest_scored_models_id
def select_model_online(self, budget: float, data_loader: List[DataLoader],
only_phase1: bool = False, run_workers: int = 1):
"""
Select model online for structured data.
:param budget: time budget
:param data_loader: time budget
:param only_phase1:
:param run_workers:
:return:
"""
train_loader, valid_loader, test_loader = data_loader
logger.info(f"0. [trails] Begin model selection, is_simulate={self.is_simulate} ... ")
begin_time = time.time()
logger.info("1. [trails] Begin profiling.")
# 0. profiling dataset and search space, get t1 and t2
score_time_per_model, train_time_per_epoch, N_K_ratio = self.search_space_ins.profiling(
self.dataset,
train_loader,
valid_loader,
self.args,
is_simulate=self.is_simulate)
self.sh = BudgetAwareControllerSH(
search_space_ins=self.search_space_ins,
dataset_name=self.dataset,
eta=self.eta,
time_per_epoch=train_time_per_epoch,
is_simulate=self.is_simulate,
train_loader=train_loader,
val_loader=valid_loader,
args=self.args)
# 1. run coordinator to schedule
logger.info("2. [trails] Begin scheduling...")
K, U, N, B1_planed_time, B2_planed_time, B2_all_epoch = coordinator.schedule(self.dataset, self.sh, budget,
score_time_per_model,
train_time_per_epoch,
run_workers,
self.search_space_ins,
N_K_ratio,
only_phase1)
print(f"Budget = {budget}, N={N}, K={K}")
# 2. run phase 1 to score N models
logger.info("3. [trails] Begin to run phase1: filter phase")
# lazy loading the search space if needed.
# run phase-1 to get the K models.
p1_runner = RunPhase1(
args=self.args,
K=K, N=N,
search_space_ins=self.search_space_ins,
train_loader=train_loader,
is_simulate=self.is_simulate)
k_models, all_models, p1_trace_highest_score, p1_trace_highest_scored_models_id \
= p1_runner.run_phase1()
logger.info("4. [trails] Begin to run phase2: refinement phase")
# 3. run phase-2 to determine the final model
best_arch, best_arch_performance, B2_actual_epoch_use, _ = self.sh.run_phase2(U, k_models)
# print("best model returned from Phase2 = ", k_models)
end_time = time.time()
real_time_usage = end_time - begin_time
planned_time_usage = B1_planed_time + B2_planed_time
logger.info("5. [trails] Real time Usage = " + str(real_time_usage)
+ ", Final selected model = " + str(best_arch)
+ ", planned time usage = " + str(planned_time_usage)
)
# best arch returned,
# time usage, epoch trained,
# p1 ea trace
return best_arch, best_arch_performance, \
real_time_usage, planned_time_usage, B2_all_epoch, \
all_models, p1_trace_highest_score, p1_trace_highest_scored_models_id
def schedule_only(self, budget: float, data_loader: List[DataLoader],
only_phase1: bool = False, run_workers: int = 1):
"""
Select model online
:param budget: time budget
:param data_loader: time budget
:param only_phase1:
:param run_workers:
:return:
"""
train_loader, valid_loader, test_loader = data_loader
logger.info("0. [trails] Begin model selection ... ")
logger.info("1. [trails] Begin profiling.")
# 0. profiling dataset and search space, get t1 and t2
score_time_per_model, train_time_per_epoch, N_K_ratio = self.search_space_ins.profiling(
self.dataset,
train_loader,
valid_loader,
self.args,
is_simulate=self.is_simulate)
self.sh = BudgetAwareControllerSH(
search_space_ins=self.search_space_ins,
dataset_name=self.dataset,
eta=self.eta,
time_per_epoch=train_time_per_epoch,
is_simulate=self.is_simulate,
train_loader=train_loader,
val_loader=valid_loader,
args=self.args)
# 1. run coordinator to schedule
logger.info("2. [trails] Begin scheduling...")
K, U, N, B1_planed_time, B2_planed_time, B2_all_epoch = coordinator.schedule(self.dataset, self.sh, budget,
score_time_per_model,
train_time_per_epoch,
run_workers,
self.search_space_ins,
N_K_ratio,
only_phase1)
return K, U, N, B1_planed_time, B2_planed_time, B2_all_epoch
#############################################
# to support in-database model selection
#############################################
def profile_filtering(self, data_loader: List[DataLoader] = [None, None, None]):
logger.info("0. [trails] Begin profile_filtering...")
begin_time = time.time()
train_loader, valid_loader, test_loader = data_loader
score_time_per_model = self.search_space_ins.profiling_score_time(
self.dataset,
train_loader,
valid_loader,
self.args,
is_simulate=self.is_simulate)
logger.info(f"0. [trails] profile_filtering Done, time_usage = {time.time() - begin_time}")
return score_time_per_model
def profile_refinement(self, data_loader: List[DataLoader] = [None, None, None]):
logger.info("0. [trails] Begin profile_refinement...")
begin_time = time.time()
train_loader, valid_loader, test_loader = data_loader
train_time_per_epoch = self.search_space_ins.profiling_train_time(
self.dataset,
train_loader,
valid_loader,
self.args,
is_simulate=self.is_simulate)
logger.info(f"0. [trails] profile_refinement Done, time_usage = {time.time() - begin_time}")
return train_time_per_epoch
def coordination(self, budget: float, score_time_per_model: float, train_time_per_epoch: float, only_phase1: bool):
logger.info("1. [trails] Begin coordination...")
begin_time = time.time()
sh = BudgetAwareControllerSH(
search_space_ins=self.search_space_ins,
dataset_name=self.dataset,
eta=self.eta,
time_per_epoch=train_time_per_epoch,
is_simulate=self.is_simulate,
train_loader=None,
val_loader=None,
args=self.args)
n_k_ratio = profile_NK_trade_off(self.dataset)
K, U, N, B1_planed_time, B2_planed_time, B2_all_epoch = coordinator.schedule(
self.dataset, sh, budget,
score_time_per_model,
train_time_per_epoch,
1,
self.search_space_ins,
n_k_ratio,
only_phase1)
logger.info(f"1. [trails] Coordination Done, time_usage = {time.time() - begin_time}")
return K, U, N
def filtering_phase(self, N, K, train_loader=None):
logger.info("2. [trails] Begin filtering_phase...")
begin_time = time.time()
p1_runner = RunPhase1(
args=self.args,
K=K, N=N,
search_space_ins=self.search_space_ins,
train_loader=train_loader,
is_simulate=self.is_simulate)
k_models, all_models, p1_trace_highest_score, p1_trace_highest_scored_models_id \
= p1_runner.run_phase1()
logger.info(f"2. [trails] filtering_phase Done, time_usage = {time.time() - begin_time}")
print(f"2. [trails] filtering_phase Done, time_usage = {time.time() - begin_time}")
return k_models, all_models, p1_trace_highest_score, p1_trace_highest_scored_models_id
def refinement_phase(self, U, k_models, alg_name: str = Config.SUCCHALF, train_loader=None, valid_loader=None,
train_time_per_epoch=None):
logger.info("3. [trails] Begin refinement...")
begin_time = time.time()
if alg_name == Config.SUCCHALF:
self.sh = BudgetAwareControllerSH(
search_space_ins=self.search_space_ins,
dataset_name=self.dataset,
eta=self.eta,
time_per_epoch=train_time_per_epoch,
is_simulate=self.is_simulate,
train_loader=train_loader,
val_loader=valid_loader,
args=self.args)
elif alg_name == Config.SUCCREJCT:
self.sh = BudgetAwareControllerSR(
search_space_ins=self.search_space_ins,
dataset_name=self.dataset,
eta=self.eta,
time_per_epoch=train_time_per_epoch,
args=self.args)
elif alg_name == Config.UNIFORM:
self.sh = UniformAllocation(
search_space_ins=self.search_space_ins,
dataset_name=self.dataset,
eta=self.eta,
time_per_epoch=train_time_per_epoch,
args=self.args)
else:
raise NotImplementedError
best_arch, best_arch_performance, B2_actual_epoch_use, total_time_usage = self.sh.run_phase2(U, k_models)
logger.info(
f"3. [trails] refinement phase Done, time_usage = {time.time() - begin_time}, "
f"epoches_used = {B2_actual_epoch_use}")
return best_arch, best_arch_performance, B2_actual_epoch_use, total_time_usage