| # |
| # Copyright 2022 The Applied Research Laboratory for Intelligence and Security (ARLIS) |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
# Tests for distill's Segment object and segmentation utilities:
# create_segment, write_segment, generate_segments, detect_deadspace,
# generate_fixed_time_segments, generate_collapsing_window_segments,
# and segment set logic.

# Segment Testing
| import pytest |
| import json |
| import pandas as pd |
| import distill |
| import testing_utils |
| import datetime |
| import os |
| |
| ######################## |
| # SEGMENT OBJECT TESTS # |
| ######################## |
| |
def test_segment_constructor():
    """A default-constructed Segment exposes empty/zeroed fields via its getters."""
    empty = distill.Segment()
    assert empty.get_start_end_val() is None
    assert empty.get_segment_name() == ""
    assert empty.get_segment_uids() == []
    assert empty.get_num_logs() == 0
| |
def test_segment_string():
    """str(Segment) lists the core fields; extra attributes are appended at the end."""
    seg = distill.Segment()
    seg.segment_name = "segment_name"
    seg.start_end_val = (1, 2)
    seg.segment_type = distill.Segment_Type.CREATE

    base = ("Segment: segment_name=segment_name, start=1, end=2, num_logs=0, "
            "generate_field_name=None, generate_matched_values=None, segment_type=Segment_Type.CREATE")
    assert str(seg) == base

    # An attribute set after construction shows up appended to the string form.
    seg.test = "test_attribute"
    assert str(seg) == base + ", test=test_attribute"
| |
def test_getters():
    """Each Segment getter mirrors the attribute populated by create_segment."""
    sorted_data, sorted_dict = testing_utils.setup("./data/sample_data.json", "integer")

    # One two-log segment spanning the first two sorted entries.
    bounds = (sorted_data[0][1]['clientTime'], sorted_data[1][1]['clientTime'])
    created = distill.create_segment(sorted_dict, ["test_segment_1"], [bounds])
    seg = created.get_segment_list()[0]

    assert seg.get_segment_name() == "test_segment_1"
    assert seg.get_start_end_val() == bounds
    assert seg.get_num_logs() == 2
    assert seg.get_segment_uids() == [sorted_data[0][0], sorted_data[1][0]]
    assert seg.get_segment_type() == distill.Segment_Type.CREATE
    assert seg.get_generate_field_name() is None
    assert seg.get_generate_matched_values() is None
| |
| ######################## |
| # CREATE_SEGMENT TESTS # |
| ######################## |
def test_create_segment_integer():
    """create_segment with integer clientTimes yields correctly bounded segments."""
    sorted_data, sorted_dict = testing_utils.setup("./data/sample_data.json", "integer")

    # Bounds are (start, end) clientTime pairs taken from the sorted log list.
    bounds = [(sorted_data[a][1]['clientTime'], sorted_data[b][1]['clientTime'])
              for a, b in ((0, 18), (5, 6), (3, 9))]
    names = ["test_segment_all", "test_segment_same_client_time", "test_segment_extra_log"]

    result = distill.create_segment(sorted_dict, names, bounds).get_segment_name_dict()

    # name -> (expected log count, expected (start, end) values)
    expected = {
        "test_segment_all": (19, (1623691890656, 1623691909728)),
        "test_segment_same_client_time": (2, (1623691904488, 1623691904488)),
        "test_segment_extra_log": (8, (1623691904212, 1623691904923)),
    }
    for name, (count, start_end) in expected.items():
        seg = result[name]
        assert seg.segment_name == name
        assert seg.num_logs == count
        assert seg.start_end_val == start_end
        # Segments built via create_segment carry the CREATE type and no
        # generate_* metadata.
        assert seg.segment_type == distill.Segment_Type.CREATE
        assert seg.get_segment_type() == distill.Segment_Type.CREATE
        assert seg.generate_field_name is None
        assert seg.get_generate_field_name() is None
        assert seg.generate_matched_values is None
        assert seg.get_generate_matched_values() is None

    # Two logs share the same clientTime; both uids must be captured.
    assert result["test_segment_same_client_time"].uids == [
        "session_16236918905391623691904488rawclick",
        "session_16236918905391623691904488customclick"]
| |
def test_create_segment_datetime():
    """create_segment with datetime clientTimes yields correctly bounded segments."""
    sorted_data, sorted_dict = testing_utils.setup("./data/sample_data.json", "datetime")
    dt = testing_utils.to_datetime

    bounds = [(sorted_data[a][1]['clientTime'], sorted_data[b][1]['clientTime'])
              for a, b in ((0, 18), (5, 6), (3, 9))]
    names = ["test_segment_all", "test_segment_same_client_time", "test_segment_extra_log"]

    result = distill.create_segment(sorted_dict, names, bounds).get_segment_name_dict()

    # name -> (expected log count, expected (start, end) as datetimes)
    expected = {
        "test_segment_all": (19, (dt(1623691890656), dt(1623691909728))),
        "test_segment_same_client_time": (2, (dt(1623691904488), dt(1623691904488))),
        "test_segment_extra_log": (8, (dt(1623691904212), dt(1623691904923))),
    }
    for name, (count, start_end) in expected.items():
        seg = result[name]
        assert seg.segment_name == name
        assert seg.num_logs == count
        assert seg.start_end_val == start_end
        # Segments built via create_segment carry the CREATE type and no
        # generate_* metadata.
        assert seg.segment_type == distill.Segment_Type.CREATE
        assert seg.get_segment_type() == distill.Segment_Type.CREATE
        assert seg.generate_field_name is None
        assert seg.get_generate_field_name() is None
        assert seg.generate_matched_values is None
        assert seg.get_generate_matched_values() is None

    # Two logs share the same clientTime; both uids must be captured.
    assert result["test_segment_same_client_time"].uids == [
        "session_16236918905391623691904488rawclick",
        "session_16236918905391623691904488customclick"]
| |
def test_create_segment_error_1():
    """Datetime start/end values paired with integer log times must raise TypeError.

    The `pytest.raises` block is narrowed to the call under test so that an
    unexpected TypeError from the setup code cannot make the test pass falsely.
    """
    sorted_data, sorted_dict = testing_utils.setup("./data/sample_data.json", "integer")

    # Bounds intentionally converted to datetime while the logs keep integers.
    mismatched_bounds = [(testing_utils.to_datetime(sorted_data[0][1]['clientTime']),
                          testing_utils.to_datetime(sorted_data[18][1]['clientTime']))]

    with pytest.raises(TypeError):
        distill.create_segment(sorted_dict, ["test_segment_error"], mismatched_bounds)
| |
def test_create_segment_error_2():
    """String start/end values are unsupported and must raise TypeError.

    Fixes: the `pytest.raises` block previously wrapped the setup code as well
    (a setup TypeError would falsely pass), and `sorted_data` was an unused local.
    """
    sorted_dict = testing_utils.setup("./data/sample_data.json", "string")[1]

    with pytest.raises(TypeError):
        distill.create_segment(sorted_dict, ["test_segment_error"],
                               [("random_string_1", "random_string_2")])
| |
| ####################### |
| # WRITE_SEGMENT TESTS # |
| ####################### |
def test_write_segment_integer():
    """write_segment returns per-segment log dicts with integer clientTimes intact."""
    sorted_data, sorted_dict = testing_utils.setup("./data/sample_data.json", "integer")

    bounds = [(sorted_data[a][1]['clientTime'], sorted_data[b][1]['clientTime'])
              for a, b in ((0, 18), (5, 6), (3, 9))]
    names = ["test_segment_all", "test_segment_same_client_time", "test_segment_extra_log"]

    result = distill.write_segment(sorted_dict, names, bounds)

    expected_counts = {"test_segment_all": 19,
                       "test_segment_same_client_time": 2,
                       "test_segment_extra_log": 8}
    for name, count in expected_counts.items():
        assert len(result[name]) == count
        # Integer input must stay integer in the written-out logs.
        for log in result[name].values():
            assert isinstance(log['clientTime'], int)
| |
def test_write_segment_datetime():
    """write_segment keeps datetime clientTimes as pandas Timestamps."""
    sorted_data, sorted_dict = testing_utils.setup("./data/sample_data.json", "datetime")

    bounds = [(sorted_data[a][1]['clientTime'], sorted_data[b][1]['clientTime'])
              for a, b in ((0, 18), (5, 6), (3, 9))]
    names = ["test_segment_all", "test_segment_same_client_time", "test_segment_extra_log"]

    result = distill.write_segment(sorted_dict, names, bounds)

    expected_counts = {"test_segment_all": 19,
                       "test_segment_same_client_time": 2,
                       "test_segment_extra_log": 8}
    for name, count in expected_counts.items():
        assert len(result[name]) == count
        # pd.Timestamp subclasses datetime.datetime; assert both explicitly,
        # matching the original coverage.
        for log in result[name].values():
            assert isinstance(log['clientTime'], datetime.datetime)
            assert isinstance(log['clientTime'], pd.Timestamp)
| |
def test_write_segment_error_1():
    """Datetime bounds paired with integer log times must raise TypeError.

    Fixes: the `pytest.raises` block previously wrapped the setup code as well
    (a setup TypeError would falsely pass), and the return value was bound to
    an unused local `result`.
    """
    sorted_data, sorted_dict = testing_utils.setup("./data/sample_data.json", "integer")

    mismatched_bounds = [(testing_utils.to_datetime(sorted_data[0][1]['clientTime']),
                          testing_utils.to_datetime(sorted_data[18][1]['clientTime']))]

    with pytest.raises(TypeError):
        distill.write_segment(sorted_dict, ["test_segment_error"], mismatched_bounds)
| |
def test_write_segment_error_2():
    """write_segment on string-typed clientTimes must raise TypeError.

    Fixes: the `pytest.raises` block previously wrapped the setup code as well
    (a setup TypeError would falsely pass), and the return value was bound to
    an unused local `result`.
    """
    sorted_data, sorted_dict = testing_utils.setup("./data/sample_data.json", "string")

    mismatched_bounds = [(testing_utils.to_datetime(sorted_data[0][1]['clientTime']),
                          testing_utils.to_datetime(sorted_data[18][1]['clientTime']))]

    with pytest.raises(TypeError):
        distill.write_segment(sorted_dict, ["test_segment_error"], mismatched_bounds)
| |
| ########################### |
| # GENERATE_SEGMENTS TESTS # |
| ########################### |
def test_generate_segments_integer():
    """generate_segments with integer clientTimes: window bounds, counts, metadata.

    Consistency fix: the metadata loop for the 'load' result previously omitted
    the getter-method assertions that the 'click' and 'load/click' loops made;
    all three results are now checked identically via a shared helper.
    """
    sorted_dict = testing_utils.setup("./data/segment_generator_sample_data.json", "integer")[1]

    def assert_generate_metadata(result, matched_values):
        # Every generated segment carries the GENERATE type and records the
        # field/value filter it was built from, via attribute and getter alike.
        for seg in result.values():
            assert seg.segment_type == distill.Segment_Type.GENERATE
            assert seg.get_segment_type() == distill.Segment_Type.GENERATE
            assert seg.generate_field_name == 'type'
            assert seg.get_generate_field_name() == 'type'
            assert seg.generate_matched_values == matched_values
            assert seg.get_generate_matched_values() == matched_values

    def assert_windows(result, expected):
        # expected: name -> ((start, end), num_logs)
        assert len(result) == len(expected)
        for name, (start_end, count) in expected.items():
            assert result[name].start_end_val == start_end
            assert result[name].num_logs == count

    # With label="load", segment names are prefixed with the label.
    load_result = distill.generate_segments(
        sorted_dict, 'type', ['load'], 1, 1, label="load").get_segment_name_dict()
    assert_windows(load_result, {
        "load0": ((1623691889600, 1623691891600), 3),
        "load1": ((1623691906302, 1623691908302), 7),
    })
    assert_generate_metadata(load_result, ['load'])

    # Without a label, names are bare indices.
    click_result = distill.generate_segments(
        sorted_dict, 'type', ['click'], 1, 1).get_segment_name_dict()
    assert_windows(click_result, {
        "0": ((1623691903200, 1623691905200), 2),
        "1": ((1623691905200, 1623691906488), 7),
        "2": ((1623691906488, 1623691907955), 6),
        "3": ((1623691907955, 1623691909100), 1),
    })
    assert_generate_metadata(click_result, ['click'])

    # Matching on multiple values merges both event types into one result set.
    load_click_result = distill.generate_segments(
        sorted_dict, 'type', ['load', 'click'], 1, 1).get_segment_name_dict()
    assert_windows(load_click_result, {
        "0": ((1623691889600, 1623691891600), 3),
        "1": ((1623691903200, 1623691905200), 2),
        "2": ((1623691905200, 1623691906488), 7),
        "3": ((1623691906488, 1623691907955), 6),
        "4": ((1623691907955, 1623691909100), 1),
    })
    assert_generate_metadata(load_click_result, ['load', 'click'])
| |
def test_generate_segments_datetime():
    """generate_segments with datetime clientTimes: window bounds, counts, metadata."""
    sorted_dict = testing_utils.setup("./data/segment_generator_sample_data.json", "datetime")[1]
    dt = testing_utils.to_datetime

    def check(result, expected, matched_values):
        # expected: name -> (start_ms, end_ms, num_logs); bounds compared as datetimes.
        assert len(result) == len(expected)
        for name, (start_ms, end_ms, count) in expected.items():
            assert result[name].start_end_val == (dt(start_ms), dt(end_ms))
            assert result[name].num_logs == count
        # Every generated segment carries the GENERATE type and its filter.
        for seg in result.values():
            assert seg.segment_type == distill.Segment_Type.GENERATE
            assert seg.get_segment_type() == distill.Segment_Type.GENERATE
            assert seg.generate_field_name == 'type'
            assert seg.get_generate_field_name() == 'type'
            assert seg.generate_matched_values == matched_values
            assert seg.get_generate_matched_values() == matched_values

    load_result = distill.generate_segments(
        sorted_dict, 'type', ['load'], 1, 1).get_segment_name_dict()
    check(load_result,
          {"0": (1623691889600, 1623691891600, 3),
           "1": (1623691906302, 1623691908302, 7)},
          ['load'])

    # With the positional label "click", names are prefixed with the label.
    click_result = distill.generate_segments(
        sorted_dict, 'type', ['click'], 1, 1, "click").get_segment_name_dict()
    check(click_result,
          {"click0": (1623691903200, 1623691905200, 2),
           "click1": (1623691905200, 1623691906488, 7),
           "click2": (1623691906488, 1623691907955, 6),
           "click3": (1623691907955, 1623691909100, 1)},
          ['click'])

    # Matching on both event types merges them into one result set.
    load_click_result = distill.generate_segments(
        sorted_dict, 'type', ['load', 'click'], 1, 1).get_segment_name_dict()
    check(load_click_result,
          {"0": (1623691889600, 1623691891600, 3),
           "1": (1623691903200, 1623691905200, 2),
           "2": (1623691905200, 1623691906488, 7),
           "3": (1623691906488, 1623691907955, 6),
           "4": (1623691907955, 1623691909100, 1)},
          ['load', 'click'])
| |
def test_generate_segments_none():
    """No matching field values — or a missing field — yields zero segments."""
    sorted_dict = testing_utils.setup("./data/segment_generator_sample_data.json", "datetime")[1]

    # Existing field, value never present in the data.
    assert len(distill.generate_segments(sorted_dict, 'type', ['random'], 1, 1)) == 0
    # Field name that no log contains.
    assert len(distill.generate_segments(sorted_dict, 'random', ['random'], 1, 1)) == 0
| |
def test_generate_segments_error():
    """String clientTimes are unsupported and must raise TypeError.

    The `pytest.raises` block is narrowed to the call under test so that an
    unexpected TypeError from the setup code cannot make the test pass falsely.
    """
    sorted_dict = testing_utils.setup("./data/segment_generator_sample_data.json", "string")[1]

    with pytest.raises(TypeError):
        distill.generate_segments(sorted_dict, 'type', ['load'], 1, 1)
| |
| ############################# |
| # DETECT_DEADSPACE TESTS # |
| ############################# |
def test_deadspace_detection_integer():
    """detect_deadspace on integer clientTimes, with and without a name label."""
    sorted_dict = testing_utils.setup("./data/deadspace_detection_sample_data.json", "integer")[1]

    # Expected ((start, end), num_logs) per segment index; identical for the
    # labeled and unlabeled runs — only the name prefix differs.
    expected = [((1623691890459, 1623691994888), 7),
                ((1623691991900, 1623693994900), 15),
                ((1623693994550, 1623697997550), 3)]

    for label in (None, "deadspace"):
        if label is None:
            result = distill.detect_deadspace(sorted_dict, 5, 1, 2).get_segment_name_dict()
            prefix = ""
        else:
            result = distill.detect_deadspace(sorted_dict, 5, 1, 2, label).get_segment_name_dict()
            prefix = label

        assert len(result) == len(expected)
        for idx, (start_end, count) in enumerate(expected):
            seg = result[prefix + str(idx)]
            assert seg.start_end_val == start_end
            assert seg.num_logs == count

        # Deadspace segments carry the DEADSPACE type and no generate_* metadata.
        for seg in result.values():
            assert seg.segment_type == distill.Segment_Type.DEADSPACE
            assert seg.get_segment_type() == distill.Segment_Type.DEADSPACE
            assert seg.generate_field_name is None
            assert seg.get_generate_field_name() is None
            assert seg.generate_matched_values is None
            assert seg.get_generate_matched_values() is None
| |
| |
def test_deadspace_detection_datetime():
    """detect_deadspace on datetime clientTimes, with and without a name label."""
    sorted_dict = testing_utils.setup("./data/deadspace_detection_sample_data.json", "datetime")[1]
    dt = testing_utils.to_datetime

    # Expected ((start, end), num_logs) per segment index; identical for the
    # labeled and unlabeled runs — only the name prefix differs.
    expected = [((dt(1623691890459), dt(1623691994888)), 7),
                ((dt(1623691991900), dt(1623693994900)), 15),
                ((dt(1623693994550), dt(1623697997550)), 3)]

    for label in (None, "deadspace"):
        if label is None:
            result = distill.detect_deadspace(sorted_dict, 5, 1, 2).get_segment_name_dict()
            prefix = ""
        else:
            result = distill.detect_deadspace(sorted_dict, 5, 1, 2, label).get_segment_name_dict()
            prefix = label

        assert len(result) == len(expected)
        for idx, (start_end, count) in enumerate(expected):
            seg = result[prefix + str(idx)]
            assert seg.start_end_val == start_end
            assert seg.num_logs == count

        # Deadspace segments carry the DEADSPACE type and no generate_* metadata.
        for seg in result.values():
            assert seg.segment_type == distill.Segment_Type.DEADSPACE
            assert seg.get_segment_type() == distill.Segment_Type.DEADSPACE
            assert seg.generate_field_name is None
            assert seg.get_generate_field_name() is None
            assert seg.generate_matched_values is None
            assert seg.get_generate_matched_values() is None
| |
def test_deadspace_detection_error1():
    """detect_deadspace on string clientTimes must raise TypeError.

    The `pytest.raises` block is narrowed to the call under test so that an
    unexpected TypeError from the setup code cannot make the test pass falsely.
    """
    sorted_dict = testing_utils.setup("./data/deadspace_detection_sample_data.json", "string")[1]

    with pytest.raises(TypeError):
        distill.detect_deadspace(sorted_dict, 5, 1, 2)
| |
def test_deadspace_detection_error2():
    """A single datetime mixed into integer clientTimes must raise TypeError.

    The `pytest.raises` block is narrowed to the call under test so that an
    unexpected TypeError from the setup/mutation code cannot make the test
    pass falsely.
    """
    sorted_dict = testing_utils.setup("./data/deadspace_detection_sample_data.json", "integer")[1]

    # Corrupt exactly one log's clientTime with a datetime to force the mix.
    uid = "session_16236918905391623691891459rawscroll"
    sorted_dict[uid]['clientTime'] = testing_utils.to_datetime(sorted_dict[uid]['clientTime'])

    with pytest.raises(TypeError):
        distill.detect_deadspace(sorted_dict, 5, 1, 2)
| |
def test_fixed_time_segments_integer():
    """generate_fixed_time_segments splits integer logs into 5-second windows."""
    sorted_dict = testing_utils.setup("./data/sample_data.json", "integer")[1]

    unlabeled = distill.generate_fixed_time_segments(sorted_dict, 5).get_segment_name_dict()

    # Every window spans exactly 5 seconds (5000 ms with integer times).
    for seg in unlabeled.values():
        start, end = seg.start_end_val
        assert end - start == 5000

    expected = {"0": ((1623691890656, 1623691895656), 3),
                "1": ((1623691895656, 1623691900656), 0),
                "2": ((1623691900656, 1623691905656), 9),
                "3": ((1623691905656, 1623691910656), 7)}
    assert len(unlabeled) == len(expected)
    for name, (start_end, count) in expected.items():
        assert unlabeled[name].start_end_val == start_end
        assert unlabeled[name].num_logs == count

    # With trim=True the final partial window is dropped; label prefixes names.
    trimmed = distill.generate_fixed_time_segments(
        sorted_dict, 5, trim=True, label="trim").get_segment_name_dict()

    for seg in trimmed.values():
        start, end = seg.start_end_val
        assert end - start == 5000

    expected_trim = {"trim0": ((1623691890656, 1623691895656), 3),
                     "trim1": ((1623691895656, 1623691900656), 0),
                     "trim2": ((1623691900656, 1623691905656), 9)}
    assert len(trimmed) == len(expected_trim)
    for name, (start_end, count) in expected_trim.items():
        assert trimmed[name].start_end_val == start_end
        assert trimmed[name].num_logs == count
| |
def test_fixed_time_segments_datetime():
    """generate_fixed_time_segments splits datetime logs into 5-second windows."""
    sorted_dict = testing_utils.setup("./data/sample_data.json", "datetime")[1]
    dt = testing_utils.to_datetime
    five_seconds = datetime.timedelta(seconds=5)

    unlabeled = distill.generate_fixed_time_segments(sorted_dict, 5).get_segment_name_dict()

    # Every window spans exactly a 5-second timedelta.
    for seg in unlabeled.values():
        start, end = seg.start_end_val
        assert end - start == five_seconds

    expected = {"0": (1623691890656, 1623691895656, 3),
                "1": (1623691895656, 1623691900656, 0),
                "2": (1623691900656, 1623691905656, 9),
                "3": (1623691905656, 1623691910656, 7)}
    assert len(unlabeled) == len(expected)
    for name, (start_ms, end_ms, count) in expected.items():
        assert unlabeled[name].start_end_val == (dt(start_ms), dt(end_ms))
        assert unlabeled[name].num_logs == count

    # With trim=True the final partial window is dropped; label prefixes names.
    trimmed = distill.generate_fixed_time_segments(
        sorted_dict, 5, trim=True, label="trim").get_segment_name_dict()

    for seg in trimmed.values():
        start, end = seg.start_end_val
        assert end - start == five_seconds

    expected_trim = {"trim0": (1623691890656, 1623691895656, 3),
                     "trim1": (1623691895656, 1623691900656, 0),
                     "trim2": (1623691900656, 1623691905656, 9)}
    assert len(trimmed) == len(expected_trim)
    for name, (start_ms, end_ms, count) in expected_trim.items():
        assert trimmed[name].start_end_val == (dt(start_ms), dt(end_ms))
        assert trimmed[name].num_logs == count
| |
def test_fixed_time_segments_error():
    """generate_fixed_time_segments on string clientTimes must raise TypeError.

    The `pytest.raises` block is narrowed to the call under test so that an
    unexpected TypeError from the setup code cannot make the test pass falsely.
    """
    sorted_dict = testing_utils.setup("./data/deadspace_detection_sample_data.json", "string")[1]

    with pytest.raises(TypeError):
        distill.generate_fixed_time_segments(sorted_dict, 10)
| |
def test_generate_collapsing_windows_integer():
    """Collapsing-window segmentation groups matching integer-time logs."""
    sorted_dict = testing_utils.setup("./data/sample_data.json", "integer")[1]

    segments = distill.generate_collapsing_window_segments(
        sorted_dict, "path", ["button#test_button"])

    # All matches are contiguous, so a single window results.
    assert len(segments) == 1
    only = segments[0]
    assert only.segment_name == "0"
    assert only.num_logs == 8
    assert only.start_end_val == (1623691904212, 1623691904923)
| |
def test_generate_collapsing_windows_datetime():
    """Collapsing windows on datetime data split around a non-matching gap."""
    sorted_dict = testing_utils.setup("./data/sample_data.json", "datetime")[1]
    dt = testing_utils.to_datetime

    segments = distill.generate_collapsing_window_segments(sorted_dict, "path", ["Window"])

    assert len(segments) == 2
    first, second = segments[0], segments[1]

    assert first.segment_name == "0"
    assert first.num_logs == 16
    assert first.start_end_val == (dt(1623691891459), dt(1623691907136))

    # Lone trailing match collapses to a zero-length window.
    assert second.segment_name == "1"
    assert second.num_logs == 1
    assert second.start_end_val == (dt(1623691909728), dt(1623691909728))
| |
| |
def test_generate_collapsing_windows_datetime_all_logs():
    """When the match field covers every log (shared sessionID), one
    segment should contain all 19 logs."""
    sorted_dict = testing_utils.setup("./data/sample_data.json", "datetime")[1]

    segments = distill.generate_collapsing_window_segments(
        sorted_dict, "sessionID", ["session_1623691890539"])

    assert len(segments) == 1

    whole_session = segments[0]
    assert whole_session.segment_name == "0"
    assert whole_session.num_logs == 19
    assert whole_session.start_end_val == (testing_utils.to_datetime(1623691890656),
                                           testing_utils.to_datetime(1623691909728))
| |
| |
| ################### |
| # SET LOGIC TESTS # |
| ################### |
def test_union_integer():
    """Union of two adjacent integer-timestamp segments merges their logs
    and spans the combined time window."""
    data = testing_utils.setup("./data/sample_data.json", "integer")
    sorted_data = data[0]
    sorted_dict = data[1]

    def clock(i):
        # clientTime of the i-th sorted log entry.
        return sorted_data[i][1]['clientTime']

    windows = [(clock(0), clock(18)),
               (clock(5), clock(6)),
               (clock(6), clock(7)),
               (clock(3), clock(9))]
    names = ["test_segment_1", "test_segment_2", "test_segment_3", "test_segment_4"]

    segments = distill.create_segment(sorted_dict, names, windows).get_segment_name_dict()

    merged = distill.union("new_segment", segments["test_segment_2"], segments["test_segment_3"])

    assert merged.segment_name == "new_segment"
    assert merged.num_logs == 4
    assert merged.uids == [sorted_data[i][0] for i in (5, 6, 7, 8)]
    assert merged.start_end_val == (clock(5), clock(7))
    assert merged.segment_type == distill.Segment_Type.UNION
    assert merged.get_segment_type() == distill.Segment_Type.UNION
    assert merged.generate_field_name is None
    assert merged.get_generate_field_name() is None
    assert merged.generate_matched_values is None
    assert merged.get_generate_matched_values() is None
| |
def test_union_datetime():
    """Union of a sub-segment with the full-range datetime segment yields
    all 19 logs, ordered with the first operand's uids first."""
    data = testing_utils.setup("./data/sample_data.json", "datetime")
    sorted_data = data[0]
    sorted_dict = data[1]

    def clock(i):
        # clientTime of the i-th sorted log entry.
        return sorted_data[i][1]['clientTime']

    windows = [(clock(0), clock(18)),
               (clock(5), clock(6)),
               (clock(6), clock(7)),
               (clock(3), clock(9))]
    names = ["test_segment_1", "test_segment_2", "test_segment_3", "test_segment_4"]

    segments = distill.create_segment(sorted_dict, names, windows).get_segment_name_dict()

    merged = distill.union("new_segment", segments["test_segment_3"], segments["test_segment_1"])

    assert merged.segment_name == "new_segment"
    assert merged.num_logs == 19
    # First operand's uids (5-8) come first, then the remainder in order.
    expected_order = [5, 6, 7, 8, 0, 1, 2, 3, 4] + list(range(9, 19))
    assert merged.uids == [sorted_data[i][0] for i in expected_order]
    assert merged.start_end_val == (clock(0), clock(18))
    assert merged.segment_type == distill.Segment_Type.UNION
    assert merged.get_segment_type() == distill.Segment_Type.UNION
    assert merged.generate_field_name is None
    assert merged.get_generate_field_name() is None
    assert merged.generate_matched_values is None
    assert merged.get_generate_matched_values() is None
| |
def test_union_error():
    """union must raise TypeError when its operands mix integer and
    datetime timestamp segments."""
    data_integer = testing_utils.setup("./data/sample_data.json", "integer")
    sorted_data_integer = data_integer[0]
    sorted_dict_integer = data_integer[1]

    data_datetime = testing_utils.setup("./data/sample_data.json", "datetime")
    sorted_data_datetime = data_datetime[0]
    sorted_dict_datetime = data_datetime[1]

    segment_name_integer = ["test_segment_integer"]
    segment_name_datetime = ["test_segment_datetime"]

    start_end_integer = []
    start_end_integer.append((sorted_data_integer[0][1]['clientTime'], sorted_data_integer[18][1]['clientTime']))
    start_end_datetime = []
    start_end_datetime.append((sorted_data_datetime[3][1]['clientTime'], sorted_data_datetime[9][1]['clientTime']))

    int_segment = distill.create_segment(sorted_dict_integer, segment_name_integer,
                                         start_end_integer).get_segment_name_dict()
    datetime_segment = distill.create_segment(sorted_dict_datetime, segment_name_datetime,
                                              start_end_datetime).get_segment_name_dict()

    # Only the call under test goes inside pytest.raises; a TypeError from
    # setup would otherwise silently satisfy the assertion.
    with pytest.raises(TypeError):
        distill.union("new_segment", int_segment["test_segment_integer"], datetime_segment["test_segment_datetime"])
| |
def test_intersection_integer():
    """Intersection of two adjacent integer-timestamp segments keeps only
    the logs common to both; the window spans both operands."""
    data = testing_utils.setup("./data/sample_data.json", "integer")
    sorted_data = data[0]
    sorted_dict = data[1]

    def clock(i):
        # clientTime of the i-th sorted log entry.
        return sorted_data[i][1]['clientTime']

    windows = [(clock(0), clock(18)),
               (clock(5), clock(6)),
               (clock(6), clock(7)),
               (clock(3), clock(9))]
    names = ["test_segment_1", "test_segment_2", "test_segment_3", "test_segment_4"]

    segments = distill.create_segment(sorted_dict, names, windows).get_segment_name_dict()

    common = distill.intersection("new_segment", segments["test_segment_2"], segments["test_segment_3"])

    assert common.segment_name == "new_segment"
    assert common.num_logs == 2
    assert common.uids == [sorted_data[5][0], sorted_data[6][0]]
    assert common.start_end_val == (clock(5), clock(7))
    assert common.segment_type == distill.Segment_Type.INTERSECTION
    assert common.get_segment_type() == distill.Segment_Type.INTERSECTION
    assert common.generate_field_name is None
    assert common.get_generate_field_name() is None
    assert common.generate_matched_values is None
    assert common.get_generate_matched_values() is None
| |
def test_intersection_datetime():
    """Intersecting a sub-segment with the full-range datetime segment
    keeps the sub-segment's logs and the full combined window."""
    data = testing_utils.setup("./data/sample_data.json", "datetime")
    sorted_data = data[0]
    sorted_dict = data[1]

    def clock(i):
        # clientTime of the i-th sorted log entry.
        return sorted_data[i][1]['clientTime']

    windows = [(clock(0), clock(18)),
               (clock(5), clock(6)),
               (clock(6), clock(7)),
               (clock(3), clock(9))]
    names = ["test_segment_1", "test_segment_2", "test_segment_3", "test_segment_4"]

    segments = distill.create_segment(sorted_dict, names, windows).get_segment_name_dict()

    common = distill.intersection("new_segment", segments["test_segment_3"], segments["test_segment_1"])

    assert common.segment_name == "new_segment"
    assert common.num_logs == 4
    assert common.uids == [sorted_data[i][0] for i in (5, 6, 7, 8)]
    assert common.start_end_val == (clock(0), clock(18))
    assert common.segment_type == distill.Segment_Type.INTERSECTION
    assert common.get_segment_type() == distill.Segment_Type.INTERSECTION
    assert common.generate_field_name is None
    assert common.get_generate_field_name() is None
    assert common.generate_matched_values is None
    assert common.get_generate_matched_values() is None
| |
def test_intersection_error():
    """intersection must raise TypeError when its operands mix integer and
    datetime timestamp segments."""
    data_integer = testing_utils.setup("./data/sample_data.json", "integer")
    sorted_data_integer = data_integer[0]
    sorted_dict_integer = data_integer[1]

    data_datetime = testing_utils.setup("./data/sample_data.json", "datetime")
    sorted_data_datetime = data_datetime[0]
    sorted_dict_datetime = data_datetime[1]

    segment_name_integer = ["test_segment_integer"]
    segment_name_datetime = ["test_segment_datetime"]

    start_end_integer = []
    start_end_integer.append((sorted_data_integer[0][1]['clientTime'], sorted_data_integer[18][1]['clientTime']))
    start_end_datetime = []
    start_end_datetime.append((sorted_data_datetime[3][1]['clientTime'], sorted_data_datetime[9][1]['clientTime']))

    int_segment = distill.create_segment(sorted_dict_integer, segment_name_integer,
                                         start_end_integer).get_segment_name_dict()
    datetime_segment = distill.create_segment(sorted_dict_datetime, segment_name_datetime,
                                              start_end_datetime).get_segment_name_dict()

    # Only the call under test goes inside pytest.raises; a TypeError from
    # setup would otherwise silently satisfy the assertion.
    with pytest.raises(TypeError):
        distill.intersection("new_segment", int_segment["test_segment_integer"],
                             datetime_segment["test_segment_datetime"])
| |
def test_difference_integer():
    """Difference removes segment 4's logs from the full-range integer
    segment; the window stays that of the first operand."""
    data = testing_utils.setup("./data/sample_data.json", "integer")
    sorted_data = data[0]
    sorted_dict = data[1]

    def clock(i):
        # clientTime of the i-th sorted log entry.
        return sorted_data[i][1]['clientTime']

    windows = [(clock(0), clock(18)),
               (clock(5), clock(6)),
               (clock(6), clock(7)),
               (clock(3), clock(9))]
    names = ["test_segment_1", "test_segment_2", "test_segment_3", "test_segment_4"]

    segments = distill.create_segment(sorted_dict, names, windows).get_segment_name_dict()

    remainder = distill.difference("new_segment", segments["test_segment_1"], segments["test_segment_4"])

    assert remainder.segment_name == "new_segment"
    assert remainder.num_logs == 11
    # Logs 3-10 fell inside segment 4's window and are removed.
    assert remainder.uids == [sorted_data[i][0] for i in [0, 1, 2] + list(range(11, 19))]
    assert remainder.start_end_val == segments["test_segment_1"].start_end_val
    assert remainder.segment_type == distill.Segment_Type.DIFFERENCE
    assert remainder.get_segment_type() == distill.Segment_Type.DIFFERENCE
    assert remainder.generate_field_name is None
    assert remainder.get_generate_field_name() is None
    assert remainder.generate_matched_values is None
    assert remainder.get_generate_matched_values() is None
| |
def test_difference_datetime():
    """Difference removes segment 4's logs from the full-range datetime
    segment; the window stays that of the first operand."""
    data = testing_utils.setup("./data/sample_data.json", "datetime")
    sorted_data = data[0]
    sorted_dict = data[1]

    def clock(i):
        # clientTime of the i-th sorted log entry.
        return sorted_data[i][1]['clientTime']

    windows = [(clock(0), clock(18)),
               (clock(5), clock(6)),
               (clock(6), clock(7)),
               (clock(3), clock(9))]
    names = ["test_segment_1", "test_segment_2", "test_segment_3", "test_segment_4"]

    segments = distill.create_segment(sorted_dict, names, windows).get_segment_name_dict()

    remainder = distill.difference("new_segment", segments["test_segment_1"], segments["test_segment_4"])

    assert remainder.segment_name == "new_segment"
    assert remainder.num_logs == 11
    # Logs 3-10 fell inside segment 4's window and are removed.
    assert remainder.uids == [sorted_data[i][0] for i in [0, 1, 2] + list(range(11, 19))]
    assert remainder.start_end_val == segments["test_segment_1"].start_end_val
    assert remainder.segment_type == distill.Segment_Type.DIFFERENCE
    assert remainder.get_segment_type() == distill.Segment_Type.DIFFERENCE
    assert remainder.generate_field_name is None
    assert remainder.get_generate_field_name() is None
    assert remainder.generate_matched_values is None
    assert remainder.get_generate_matched_values() is None
| |
| ############################ |
| # EXPORTING SEGMENTS TESTS # |
| ############################ |
def test_export_segments():
    """export_segments writes one header line plus one CSV row per
    deadspace segment; verify the exact file contents, then clean up."""
    data = testing_utils.setup("./data/deadspace_detection_sample_data.json", "integer")
    sorted_dict = data[1]

    result = distill.detect_deadspace(sorted_dict, 5, 1, 2)
    distill.export_segments("./test.csv", result)

    try:
        # Context manager closes the handle before removal (the original
        # leaked the file object, which makes os.remove fail on Windows).
        with open("./test.csv", "r") as file:
            lines = file.readlines()

        assert len(lines) == 4
        assert lines[0] == 'Segment Name,Start Time,End Time,Number of Logs,Generate Field Name,' \
                           'Generate Matched Values,Segment Type\n'
        assert lines[1] == '0,1623691890459,1623691994888,7,,,' \
                           'Segment_Type.DEADSPACE\n'
        assert lines[2] == '1,1623691991900,1623693994900,15,,,' \
                           'Segment_Type.DEADSPACE\n'
        assert lines[3] == '2,1623693994550,1623697997550,3,,,' \
                           'Segment_Type.DEADSPACE\n'
    finally:
        # Always delete the temp CSV, even when an assertion fails, so
        # later test runs start clean.
        os.remove("./test.csv")
| |