#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# An experiment consists of a
# - workload
# - data generator
# - query generator
# - oracle
# - sketches
#
# Running an experiment generates a CSV file containing the experimental results.
# This can be fed into a function/script/notebook to process and plot the results.
#
# For an existing problem and a new, experimental sketch, one only needs to create a sketch class that wraps the
# implementation in the simple API used by the tests (see the illustrative wrapper below).
#
# For a novel problem, one must also write a query generator and an oracle that computes the correct answer
# and calculates the error of a sketch's answer relative to it.
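#
# For illustration only: a minimal (hypothetical) wrapper for exact distinct counting. The required
# surface is inferred from executeJob() below: a constructor taking a seed plus keyword parameters,
# add(), query(), info(), and a writable .name attribute.
#
#   class ExactDistinctWrapper:
#       def __init__(self, seed=0):
#           self.items = set()      # exact "sketch": just remember every item
#           self.name = None        # filled in by the framework with the configured sketch name
#       def add(self, x):
#           self.items.add(x)
#       def query(self, query, parameters):
#           return len(self.items)  # estimated (here: exact) distinct count
#       def info(self):
#           return {"sketch_name": self.name}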
#
#
# Replicates in an experiment come from
# - reordering / generating new data sequences (runs)
# - randomization at the sketch level (repetitions)
# To make things parallelizable we
# - For each run, write a workload's data sequence to disk (unless it's just drawing from a distribution)
# - Compute the oracle's answers for that sequence and cache them on disk
# - Create a bunch of jobs to evaluate a sketch on the data sequence using the cached answers
#
# Since our jobs contain nested class instances, Python's default pickle doesn't work,
# so we use dill to pickle the instances ourselves and unpickle them when the job is run.
#############################################################################################################################
import logging
import time
import copy
from collections import deque
from concurrent.futures import ProcessPoolExecutor
from enum import Enum
import dill # needed to pickle classes/functions for use in multiprocessing
import numpy as np
import pandas as pd
from experiment_utils import makeDict
############
class ResultAccumulator:
def __init__(self, save_answers=False):
self.results = []
self.timings = []
self.save_answers = save_answers
def addResult(self, sketch, error, answer, truth, query, workload):
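        # raw answers can be bulky, so keep them only when explicitly requested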
if not self.save_answers:
answer = None
query_info = query.info()
sketch_info = sketch.info()
workload_info = workload.info()
result = makeDict(error=error, answer=answer, truth=truth,
**query_info, **workload_info, **sketch_info)
self.results.append(result)
def extend(self, results: list):
self.results.extend(results)
def merge(self, results):
self.results.extend(results.results)
self.timings.extend(results.timings)
def toDataFrame(self):
df = pd.DataFrame(self.results)
return df
def addTiming(self, run_time, sketch, workload):
sketch_info = sketch.info()
workload_info = workload.info()
time_result = makeDict(time=run_time, **workload_info, **sketch_info)
self.timings.append(time_result)
def timingsToDataFrame(self):
df = pd.DataFrame(self.timings)
return df
class ExperimentOptions:
class OracleMode(Enum):
PRECOMPUTE = 0 # standard case which can be serialized to a C++/Java test
        POSTCOMPUTE = 1 # for when the oracle needs a large data structure to evaluate the error
        JOINTLY_COMPUTE = 2 # likely to hurt both timing and memory usage
def __init__(
self,
ndatasets=1, # number of times to generate or reorder the dataset
nrepetitions=10, # number of times to repeat a given ordering. (so the true answers do not need to be recomputed)
oracle_mode=OracleMode.PRECOMPUTE,
nparallel=1, # number of parallel processes
maxtasks=1, # max number of concurrent workload instances
log_file=None,
save_answers=False,
):
self.ndatasets = ndatasets
self.nrepetitions = nrepetitions
self.oracle_mode = oracle_mode
self.nparallel = nparallel
self.maxtasks = maxtasks
self.log_file = log_file
self.save_answers = save_answers
####################################################################
#
# Classes used to run parallel processes
#
# Task = one data stream in the experiment + the oracle + all the sketches
# Job = one pass through the stream. Each Job has its sketch randomized with a different seed
#
# The basic architecture is that an experiment generates Tasks,
# and each Task generates a collection of Jobs.
# If the data streams are processed in parallel,
# each Job is pickled with dill,
# and each child process unpickles a Job and runs it.
#
# Note that some work may be required to ensure all relevant variables are pickled and that
# the enclosing environment for the unpickled Job matches the original environment.
#
####################################################################
class Job:
"""
    Given an instantiated workload, evaluates the specified sketch.
"""
# assumes workload and oracle are already appropriately initialized
def __init__(self, repetition, sketch_name, sketch_fun, sketch_kwargs, workload, oracle, options):
self.repetition = repetition
self.sketch_name = sketch_name
self.sketch_fun = sketch_fun
self.sketch_kwargs = sketch_kwargs
self.workload = workload
self.oracle = oracle
        self.results = ResultAccumulator(save_answers=True) # XXX unused here; executeJob builds its own accumulator
self.options = options
def executeJob(job):
job.workload.prepare()
job.oracle.prepare()
print(f"Running {job.workload.getID()}, repetition {job.repetition}, on sketch {job.sketch_name} {str(job.sketch_kwargs)} ")
query_iter = job.workload.genQueries() # XXX just workload
    q = next(query_iter, None) # None when the workload generates no queries
sketch = job.sketch_fun(seed=job.repetition, **job.sketch_kwargs)
sketch.name = job.sketch_name
results = ResultAccumulator(save_answers=job.options.save_answers)
start_time = time.perf_counter()
for i, x in enumerate(job.workload.genData()):
sketch.add(x)
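        # queries are assumed to be yielded in increasing data_idx order; answer every query registered at this position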
while q and i == q.data_idx:
answer = sketch.query(q.query, q.parameters)
error = job.oracle.eval_sketch_answer(q.qid, answer)
truth = job.oracle.getAnswer(q.qid)
results.addResult(
error=error,
answer=answer,
truth=truth,
query=q,
sketch=sketch,
workload=job.workload)
q = next(query_iter, None)
run_time = time.perf_counter() - start_time
results.addTiming(run_time=run_time, sketch=sketch, workload=job.workload)
return results
def executePickledJob(pickled_job):
job = dill.loads(pickled_job)
return executeJob(job)
class Task:
"""
    Controls multiple runs of a workload.
"""
def __init__(self, run, sketch_seed_start, experiment, executor=None):
self.run = run
self.workload = copy.deepcopy(experiment.workload)
self.oracle = copy.deepcopy(experiment.oracle)
self.experiment = experiment
self.executor = executor
self.sketch_seed_start = sketch_seed_start
# create jobs for each process
# Currently does not parallelize oracle preparation
def makeJobs(self):
print(f"Making jobs for run {self.run}")
self.workload.reset(seed=self.run)
self.oracle.reset(workload=self.workload)
self.oracle.prepare()
workload = self.workload
oracle = self.oracle
workload.prepareForPickle()
oracle.prepareForPickle()
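        # one Job per (repetition, sketch, parameter set) combination; all share the prepared workload and oracle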
for rep in range(self.experiment.options.nrepetitions):
for sketch_name, (sketch_fun, sketch_params) in self.experiment.sketches.items():
for p in sketch_params:
yield Job(self.sketch_seed_start + rep, sketch_name, sketch_fun, p, workload, oracle, options=self.experiment.options)
####################################################################################################
class SketchExperiment:
"""
    Run an experiment for a given workload over a collection of sketches.
    The workload can be randomized with a seed, or repeated so that only the sketches are randomized.
"""
name = "BaseExperiment"
target_results = None # iterable providing the corresponding results (or a way to compute the result)
sketches = None # dictionary of sketches
    MAX_SKETCHES = 100000 # seed stride between runs; assumes fewer than this many repetitions per run
def __init__(self, workload, oracle, options=None, result_file="exp_results.csv"):
# basic setup variables
self.workload = workload
self.oracle = oracle
self.options = options if options is not None else ExperimentOptions()
self.results = ResultAccumulator()
self.timer = {}
self.result_file=result_file
self.start_seed = 0
    def getNumSketches(self):
        # one sketch configuration per (sketch, parameter set) pair
        return sum(len(params) for _fun, params in self.sketches.values())
def addSketches(self, sketches):
self.sketches = sketches
def setOptions(self, options):
self.options = options
    def prepareOracle(self, size=None, **kwargs):
        raise NotImplementedError("prepareOracle must be implemented by subclasses")
def prepare(self, **kwargs):
        logging.info("preparing workload")
self.workload.prepare()
def execute(self, write_mode="w"):
        assert self.options.oracle_mode == ExperimentOptions.OracleMode.PRECOMPUTE, "only the PRECOMPUTE oracle mode is currently supported"
if self.options.nparallel > 1:
executor = ProcessPoolExecutor(max_workers=self.options.nparallel)
for run in range(self.options.ndatasets):
logging.info(f"Starting run {run}")
workload_seed = self.start_seed + run
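            # space sketch seeds MAX_SKETCHES apart so repetitions in different runs never reuse a seed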
sketch_seed_start = workload_seed * self.MAX_SKETCHES
task = Task(workload_seed, sketch_seed_start, experiment=self)
futures = deque()
job_sizes = []
for job in task.makeJobs():
if self.options.nparallel > 1:
pickled_job = dill.dumps(job)
job_sizes.append(len(pickled_job))
futures.append(executor.submit(executePickledJob, pickled_job))
else:
job_result = executeJob(job)
self.results.merge(job_result)
            if job_sizes:
                job_sizes = np.array(job_sizes)
                print(f"pickled job max size: {job_sizes.max()} avg size: {job_sizes.mean()}")
            if self.options.nparallel > 1:
                # wait for and merge the results of all jobs submitted for this run
                while futures:
                    f = futures.popleft()
                    self.results.merge(f.result())
print(f"writing output {self.result_file} mode: {write_mode}")
self.results.toDataFrame().to_csv(self.result_file, mode=write_mode, header=(write_mode == 'w'))
timing_df = self.results.timingsToDataFrame()
        agg_timing_df = timing_df.groupby(['sketch_name']).agg(runtime=('time', 'mean'))
print(agg_timing_df.to_string())
# TODO: clean up the temporary answer and pickle files
####################################################################################################
class SketchMetaExperiment:
"""
Run experiments on multiple workloads
"""
def __init__(self, workloads, oracle, options=None, result_file="exp_results.csv"):
self.workloads = workloads
self.oracle = oracle
self.options = options
self.result_file=result_file
        self.sketches = {}
def execute(self, default_write_mode='w'):
write_mode = default_write_mode
for w in self.workloads:
self.oracle.reset(workload=w)
e = SketchExperiment(workload=w,
oracle=self.oracle,
options=self.options,
result_file=self.result_file)
e.addSketches(self.sketches)
e.prepare()
e.execute(write_mode=write_mode)
            write_mode = "a" # append results for subsequent workloads
def addSketches(self, sketches):
self.sketches = sketches
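####################################################################################################
# Illustrative usage only: a self-contained toy experiment exercising the API this module expects.
# The Tiny* classes below are hypothetical stand-ins, not part of the framework; their method names
# mirror the calls made by Task and executeJob above (reset/prepare/prepareForPickle/genData/
# genQueries/getID/info on the workload, reset/prepare/prepareForPickle/getAnswer/eval_sketch_answer
# on the oracle, and add/query/info plus a writable .name on the sketch).
if __name__ == "__main__":
    class TinyQuery:
        def __init__(self, qid, data_idx):
            self.qid = qid
            self.data_idx = data_idx   # stream position at which to issue the query
            self.query = "count"       # query type consumed by TinySketch.query
            self.parameters = None
        def info(self):
            return {"qid": self.qid}

    class TinyWorkload:
        """Streams the integers 0..n-1 and queries the running count at the end."""
        def __init__(self, n=1000):
            self.n = n
            self.seed = 0
        def reset(self, seed):
            self.seed = seed
        def prepare(self):
            pass
        def prepareForPickle(self):
            pass
        def getID(self):
            return f"tiny-{self.n}-seed{self.seed}"
        def genData(self):
            return iter(range(self.n))
        def genQueries(self):
            yield TinyQuery(qid=0, data_idx=self.n - 1)
        def info(self):
            return {"workload": self.getID()}

    class TinyOracle:
        """Knows the exact stream length, so the true count is trivial to precompute."""
        def __init__(self):
            self.workload = None
        def reset(self, workload):
            self.workload = workload
        def prepare(self):
            pass
        def prepareForPickle(self):
            pass
        def getAnswer(self, qid):
            return self.workload.n
        def eval_sketch_answer(self, qid, answer):
            return abs(answer - self.getAnswer(qid))

    class TinySketch:
        """A 'sketch' that simply counts items exactly; the seed is unused here."""
        def __init__(self, seed=0):
            self.count = 0
            self.name = None           # set by the framework to the configured sketch name
        def add(self, x):
            self.count += 1
        def query(self, query, parameters):
            return self.count
        def info(self):
            return {"sketch_name": self.name}

    workload = TinyWorkload(n=1000)
    oracle = TinyOracle()
    oracle.reset(workload=workload)
    options = ExperimentOptions(ndatasets=1, nrepetitions=2, nparallel=1)
    exp = SketchExperiment(workload=workload, oracle=oracle, options=options,
                           result_file="tiny_results.csv")
    exp.addSketches({"tiny": (TinySketch, [{}])})
    exp.prepare()
    exp.execute()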