blob: 0400ef54673a549def1f8ad84dcbc85f7c7609d1 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# for binary insert
from typing import List
import numpy as np
def binary_insert_get_rank(rank_list: list, new_item: List) -> int:
    """
    Place new_item into rank_list and report the index it landed on.

    The slot is chosen by search_position, which compares entries on their
    element at index 1. rank_list is modified in place.

    :param rank_list: list of entries laid out as 0: id, 1: score
    :param new_item: the entry to add, using the same layout
    :return: the index at which new_item was inserted
    """
    # Locate the slot first, then mutate the list.
    slot = search_position(rank_list, new_item)
    rank_list.insert(slot, new_item)
    return slot
# O(logN) search the position to insert into
def search_position(rank_list_m: list, new_item: List):
    """
    Binary-search rank_list_m for the index where new_item should be inserted.

    Entries are compared on their element at index 1. The result is only
    meaningful when rank_list_m is already ordered ascending on that element.
    An entry that ties with existing ones is placed after them.

    :param rank_list_m: list of entries whose element 1 is the sort key
    :param new_item: entry whose element 1 is compared against the list
    :return: insertion index in the range [0, len(rank_list_m)]
    """
    if not len(rank_list_m):
        return 0

    lo, hi = 0, len(rank_list_m) - 1
    target = new_item[1]

    # Narrow [lo, hi] until the two bounds are adjacent (or the same index).
    while hi - lo > 1:
        middle = (lo + hi) // 2
        if rank_list_m[middle][1] <= target:
            lo = middle
        else:
            hi = middle

    # Check the upper bound first so that ties land after existing entries.
    if rank_list_m[hi][1] <= target:
        return hi + 1
    if rank_list_m[lo][1] <= target:
        return lo + 1
    return lo
def generate_global_rank(ml_data_score_dic: dict, alg_name_list: List) -> dict:
    """
    Combine per-algorithm scores into a single rank-based score per model.

    ml_data_score_dic: { model_id: {alg: score1, alg2: score2} }
    return: { model_id: {alg1_alg2: rank_score} }

    For every algorithm in alg_name_list the models are ranked with
    binary_insert_get_rank. Each model's 1-based positions are summed over
    the algorithms, and the sum is divided by the number of entries ranked
    for the last algorithm in alg_name_list. Model ids are converted to str
    and scores to float before ranking.
    """
    # One ranked list of [model_id, score] per requested algorithm.
    per_alg_ranking = {name: [] for name in alg_name_list}

    for model_id, scores_by_alg in ml_data_score_dic.items():
        for name, value in scores_by_alg.items():
            # Scores of algorithms that were not requested are ignored.
            if name not in alg_name_list:
                continue
            binary_insert_get_rank(per_alg_ranking[name], [str(model_id), float(value)])

    # Sum each model's positions across algorithms.
    rank_totals = {}
    explored = 0
    for name in alg_name_list:
        ranking = per_alg_ranking[name]
        explored = len(ranking)
        # Positions start at 1 because a list index can be 0.
        for position, entry in enumerate(ranking, start=1):
            model_key = entry[0]
            rank_totals[model_key] = rank_totals.get(model_key, 0) + position

    # Replace each summed rank with {joined_algorithm_names: normalised_rank}.
    for model_key in rank_totals:
        rank_totals[model_key] = {
            "_".join(list(alg_name_list)): rank_totals[model_key] / explored
        }

    return rank_totals
def log_scale_x_array(num_points, max_minute, base=10) -> list:
    """
    Return a list of time points, in minutes, evenly spaced on a log scale.

    The points run from 1 second (1/60 minute) up to max_minute, whatever
    base is used.

    :param num_points: how many points to generate
    :param max_minute: last value of the range, in minutes
    :param base: base handed to np.logspace; must be positive and not 1
    :return: list of floats, in minutes
    """
    # Work in seconds so the range starts at a convenient 1.
    min_val = 1  # 1 second
    max_val = max_minute * 60  # max_minute converted to seconds

    # np.logspace computes base ** exponent, so the exponents have to be
    # logarithms in that same base. Dividing the log10 values by log10(base)
    # converts them; for the default base of 10 the divisor is exactly 1.
    base_scale = np.log10(base)
    log_vals = np.logspace(
        np.log10(min_val) / base_scale,
        np.log10(max_val) / base_scale,
        num=num_points,
        base=base,
    )

    # Convert the values from seconds back to minutes.
    return (log_vals / 60).tolist()
def sample_in_log_scale(lst: List, num_points: int) -> List:
    """
    Pick indices into lst that are spaced on a log scale.

    num_points + num_points // 2 candidates are generated between index 1
    and index len(lst) - 1, truncated to integers and de-duplicated, so the
    result may hold fewer entries than were generated. Only the length of
    lst is used.

    :param lst: the sequence to sample from
    :param num_points: controls how many candidate indices are generated
    :return: sorted list of unique integer indices
    """
    size = len(lst)
    if size < 2:
        # log10(size - 1) is undefined here and would turn into meaningless
        # integers; return every valid index instead ([] or [0]).
        return list(np.arange(size))

    indices = np.logspace(0, np.log10(size - 1), num_points + num_points // 2, dtype=int)
    # Truncation to int makes neighbouring candidates collide; drop duplicates.
    indices = np.unique(indices)
    return list(indices)
def sample_in_log_scale_new(lstM: List, num_points: int) -> List:
    """
    Pick indices of lstM whose values are closest to log-spaced targets.

    num_points targets are spread evenly in log10 space between the smallest
    and largest value of lstM. For each target the index of the nearest value
    is returned, so the same index can appear more than once.

    :param lstM: the values to sample from
    :param num_points: number of targets, and therefore of returned indices
    :return: list of indices into lstM, one per target
    """
    values = np.array(lstM)

    # Bounds of the value range, expressed as log10 exponents.
    log_low = np.log10(values.min())
    log_high = np.log10(values.max())

    # Evenly spaced exponents mapped back to the original scale.
    targets = 10 ** np.linspace(log_low, log_high, num_points)

    # Nearest-value lookup for every target.
    picked = []
    for target in targets:
        picked.append(np.abs(values - target).argmin())
    return picked
def sample_in_line_scale(lst: List, num_points: int) -> List:
    """
    Pick indices into lst that are evenly spaced on a linear scale.

    num_points candidates are generated between index 0 and index
    len(lst) - 1, converted to integers and de-duplicated, so the result may
    hold fewer than num_points entries. Only the length of lst is used.

    :param lst: the sequence to sample from
    :param num_points: number of candidate indices to generate
    :return: sorted list of unique integer indices; empty when lst is empty
    """
    if len(lst) == 0:
        # Without this guard the range would be 0..-1 and yield the invalid
        # indices -1 and 0.
        return []

    indices = np.linspace(0, len(lst) - 1, num_points, dtype=int)
    # The integer conversion can map several candidates to one index.
    return list(np.unique(indices))