blob: ea2d93512fd4ba248cfd8c5fae0ef7bf8194b9a7 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time
import unittest
import uuid
import pytest
# The benchmark needs Apache Beam's test/benchmark utilities and the CloudML
# benchmark pipeline. If any of them cannot be imported, skip this whole
# module (unittest.SkipTest raised at import time) instead of erroring.
try:
  import apache_beam.testing.benchmarks.cloudml.cloudml_benchmark_constants_lib as lib
  from apache_beam.testing.benchmarks.cloudml.pipelines import workflow
  from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
  from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader
  from apache_beam.testing.test_pipeline import TestPipeline
except ImportError:
  raise unittest.SkipTest('Dependencies are not installed')
# GCS directory containing the Criteo datasets; each benchmark joins the
# specific dataset file name onto this root to form its `input` option.
_INPUT_GCS_BUCKET_ROOT = 'gs://apache-beam-ml/datasets/cloudml/criteo'

# GCS directory under which benchmark runs write their output. Each run
# appends its own random (uuid4) sub-directory name to this root.
_OUTPUT_GCS_BUCKET_ROOT = 'gs://temp-storage-for-end-to-end-tests/tft/'
def _publish_metrics(pipeline, metric_value, metrics_table, metric_name):
  """Publishes one named measurement to BigQuery and InfluxDB.

  Args:
    pipeline: The test pipeline whose options supply the GCP project, the
      BigQuery metrics dataset, and the InfluxDB database and host names.
    metric_value: The measured value to record.
    metrics_table: BigQuery table receiving the value. It is also passed as
      the first argument of ``InfluxDBMetricsPublisherOptions``.
    metric_name: Name under which ``metric_value`` is recorded.
  """
  read_option = pipeline.get_option
  influx_db_name = read_option('influx_db_name')
  influx_hostname = read_option('influx_hostname')
  # InfluxDB credentials come from the environment, not from pipeline
  # options; os.getenv yields None for variables that are not set.
  influx_user = os.getenv('INFLUXDB_USER')
  influx_password = os.getenv('INFLUXDB_USER_PASSWORD')
  influxdb_options = InfluxDBMetricsPublisherOptions(
      metrics_table,
      influx_db_name,
      influx_hostname,
      influx_user,
      influx_password,
  )

  reader = MetricsReader(
      project_name=read_option('project'),
      bq_table=metrics_table,
      bq_dataset=read_option('metrics_dataset'),
      publish_to_bq=True,
      influxdb_options=influxdb_options,
  )
  measurement = (metric_name, metric_value)
  reader.publish_values([measurement])
@pytest.mark.uses_tft
class CloudMLTFTBenchmarkTest(unittest.TestCase):
  """Times end-to-end runs of the CloudML TFT workflow on Criteo inputs.

  Each test runs ``workflow.run`` once against a Criteo dataset under
  ``_INPUT_GCS_BUCKET_ROOT`` and writes to a fresh directory under
  ``_OUTPUT_GCS_BUCKET_ROOT``. It then publishes the elapsed wall time, in
  seconds, as the ``runtime_sec`` metric to a test-specific metrics table.
  """

  def _run_benchmark(self, input_name, metrics_table, **extra_overrides):
    """Runs one TFT benchmark and publishes its runtime.

    Args:
      input_name: Dataset file name, relative to ``_INPUT_GCS_BUCKET_ROOT``.
      metrics_table: Metrics table that receives the ``runtime_sec`` value.
      **extra_overrides: Additional pipeline options (for example
        ``shuffle=False``). They are merged after the shared options, so new
        keys are appended at the end of the option list.
    """
    test_pipeline = TestPipeline(is_integration_test=True)
    extra_opts = {
        'input': os.path.join(_INPUT_GCS_BUCKET_ROOT, input_name),
        'benchmark_type': 'tft',
        'classifier': 'criteo',
        'frequency_threshold': 0,
        # A random sub-directory per run, so repeated runs do not write
        # into each other's output.
        'output': os.path.join(_OUTPUT_GCS_BUCKET_ROOT, uuid.uuid4().hex),
    }
    extra_opts.update(extra_overrides)
    # NOTE(review): TestPipeline.get_full_options_as_args may drop falsy
    # values (frequency_threshold=0, shuffle=False) rather than forwarding
    # them. Confirm these options actually reach the workflow.

    # time.monotonic() cannot jump backwards or forwards on system clock
    # adjustments (NTP, DST), unlike time.time(), so it is the correct
    # clock for measuring an elapsed interval.
    start_time = time.monotonic()
    workflow.run(test_pipeline.get_full_options_as_args(**extra_opts))
    elapsed_sec = time.monotonic() - start_time

    _publish_metrics(
        pipeline=test_pipeline,
        metric_value=elapsed_sec,
        metrics_table=metrics_table,
        metric_name='runtime_sec')

  def test_cloudml_benchmark_criteo_small(self):
    """Benchmarks the small Criteo dataset."""
    self._run_benchmark(
        lib.INPUT_CRITEO_SMALL,
        metrics_table='cloudml_benchmark_criteo_small')

  # NOTE: "cirteo" is a typo for "criteo" in both the test name and the
  # metrics table. Both are kept unchanged: renaming the table would start a
  # new metrics series, detaching it from the existing history, and renaming
  # the test could break anything that selects it by name.
  def test_cloudml_benchmark_cirteo_no_shuffle_10GB(self):
    """Benchmarks the 10GB Criteo dataset with shuffle=False requested."""
    self._run_benchmark(
        lib.INPUT_CRITEO_10GB,
        metrics_table='cloudml_benchmark_cirteo_no_shuffle_10GB',
        shuffle=False)

  def test_cloudml_benchmark_criteo_10GB(self):
    """Benchmarks the 10GB Criteo dataset with default options."""
    self._run_benchmark(
        lib.INPUT_CRITEO_10GB,
        metrics_table='cloudml_benchmark_criteo_10GB')
if __name__ == '__main__':
  # Lets the module be run directly (``python <this file>``) with the
  # standard unittest runner.
  unittest.main()