# blob: 784d52f446f1367b4a671066d20eaea1fa927b0a [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# @TODO add header with description of file
import datetime
import os
import pandas as pd
# Segment Testing
import pytest
import distill
from tests import testing_utils
from tests.data_config import DATA_DIR
########################
# SEGMENT OBJECT TESTS #
########################
def test_segment_constructor():
    """Check the getter values reported by a Segment built with no arguments."""
    empty_segment = distill.Segment()

    name = empty_segment.get_segment_name()
    assert name == ""

    log_count = empty_segment.get_num_logs()
    assert log_count == 0

    bounds = empty_segment.get_start_end_val()
    assert bounds is None

    uids = empty_segment.get_segment_uids()
    assert uids == []
def test_segment_string():
    """Check ``str(Segment)`` before and after attaching an extra attribute."""
    seg = distill.Segment()
    seg.segment_name = "segment_name"
    seg.start_end_val = (1, 2)
    seg.segment_type = distill.Segment_Type.CREATE

    expected_before = (
        "Segment: segment_name=segment_name, start=1, end=2, num_logs=0,"
        " generate_field_name=None, generate_matched_values=None,"
        " segment_type=Segment_Type.CREATE"
    )
    rendered_before = str(seg)
    assert rendered_before == expected_before

    # An ad-hoc attribute is expected to show up at the end of the string.
    seg.test = "test_attribute"
    expected_after = (
        "Segment: segment_name=segment_name, start=1, end=2, num_logs=0,"
        " generate_field_name=None, generate_matched_values=None,"
        " segment_type=Segment_Type.CREATE, test=test_attribute"
    )
    rendered_after = str(seg)
    assert rendered_after == expected_after
def test_getters():
    """Exercise every Segment getter on a segment built by ``create_segment``."""
    loaded = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "integer")
    sorted_data, sorted_dict = loaded[0], loaded[1]

    # Create Test Segment: bounded by the clientTime of the first two logs.
    first_uid, first_log = sorted_data[0][0], sorted_data[0][1]
    second_uid, second_log = sorted_data[1][0], sorted_data[1][1]
    window = (first_log["clientTime"], second_log["clientTime"])

    segments = distill.create_segment(sorted_dict, ["test_segment_1"], [window])
    seg = segments.get_segment_list()[0]

    assert seg.get_segment_name() == "test_segment_1"
    assert seg.get_start_end_val() == window
    assert seg.get_num_logs() == 2
    assert seg.get_segment_uids() == [first_uid, second_uid]
    assert seg.get_segment_type() == distill.Segment_Type.CREATE
    assert seg.get_generate_field_name() is None
    assert seg.get_generate_matched_values() is None
########################
# CREATE_SEGMENT TESTS #
########################
def test_create_segment_integer():
    """Create three segments from "integer"-mode logs and check their contents."""
    loaded = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "integer")
    sorted_data, sorted_dict = loaded[0], loaded[1]

    def client_time(position):
        # clientTime of the log at the given position in the sorted data.
        return sorted_data[position][1]["clientTime"]

    # Create Test Segment Tuples from (start index, end index) pairs.
    index_pairs = [(0, 18), (5, 6), (3, 9)]
    start_end_vals = [(client_time(lo), client_time(hi)) for lo, hi in index_pairs]
    segment_names = [
        "test_segment_all",
        "test_segment_same_client_time",
        "test_segment_extra_log",
    ]

    # Call create_segment
    segments = distill.create_segment(
        sorted_dict, segment_names, start_end_vals
    ).get_segment_name_dict()

    everything = segments["test_segment_all"]
    assert everything.num_logs == 19
    assert everything.segment_name == "test_segment_all"
    assert everything.start_end_val == (1623691890656, 1623691909728)

    same_time = segments["test_segment_same_client_time"]
    assert same_time.num_logs == 2
    assert same_time.segment_name == "test_segment_same_client_time"
    assert same_time.start_end_val == (1623691904488, 1623691904488)
    assert same_time.uids == [
        "session_16236918905391623691904488rawclick",
        "session_16236918905391623691904488customclick",
    ]

    extra = segments["test_segment_extra_log"]
    assert extra.num_logs == 8
    assert extra.segment_name == "test_segment_extra_log"
    assert extra.start_end_val == (1623691904212, 1623691904923)

    # Every segment is checked through both the attribute and its getter.
    for name in segments:
        seg = segments[name]
        assert seg.segment_type == distill.Segment_Type.CREATE
        assert seg.get_segment_type() == distill.Segment_Type.CREATE
        assert seg.generate_field_name is None
        assert seg.get_generate_field_name() is None
        assert seg.generate_matched_values is None
        assert seg.get_generate_matched_values() is None
def test_create_segment_datetime():
    """Create three segments from "datetime"-mode logs and check their contents."""
    loaded = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "datetime")
    sorted_data, sorted_dict = loaded[0], loaded[1]

    def client_time(position):
        # clientTime of the log at the given position in the sorted data.
        return sorted_data[position][1]["clientTime"]

    def as_window(start_ms, end_ms):
        # Expected (start, end) tuple, converted with the shared test helper.
        return (testing_utils.to_datetime(start_ms), testing_utils.to_datetime(end_ms))

    # Create Test Segment Tuples from (start index, end index) pairs.
    index_pairs = [(0, 18), (5, 6), (3, 9)]
    start_end_vals = [(client_time(lo), client_time(hi)) for lo, hi in index_pairs]
    segment_names = [
        "test_segment_all",
        "test_segment_same_client_time",
        "test_segment_extra_log",
    ]

    # Call create_segment
    created = distill.create_segment(sorted_dict, segment_names, start_end_vals)
    segments = created.get_segment_name_dict()

    everything = segments["test_segment_all"]
    assert everything.num_logs == 19
    assert everything.segment_name == "test_segment_all"
    assert everything.start_end_val == as_window(1623691890656, 1623691909728)

    same_time = segments["test_segment_same_client_time"]
    assert same_time.num_logs == 2
    assert same_time.segment_name == "test_segment_same_client_time"
    assert same_time.start_end_val == as_window(1623691904488, 1623691904488)
    assert same_time.uids == [
        "session_16236918905391623691904488rawclick",
        "session_16236918905391623691904488customclick",
    ]

    extra = segments["test_segment_extra_log"]
    assert extra.num_logs == 8
    assert extra.segment_name == "test_segment_extra_log"
    assert extra.start_end_val == as_window(1623691904212, 1623691904923)

    # Every segment is checked through both the attribute and its getter.
    for name in segments:
        seg = segments[name]
        assert seg.segment_type == distill.Segment_Type.CREATE
        assert seg.get_segment_type() == distill.Segment_Type.CREATE
        assert seg.generate_field_name is None
        assert seg.get_generate_field_name() is None
        assert seg.generate_matched_values is None
        assert seg.get_generate_matched_values() is None
def test_create_segment_error_1():
    """``create_segment`` must raise TypeError for the mismatched tuple below.

    Logs are loaded in "integer" mode while the start/end tuple is passed
    through ``testing_utils.to_datetime``, so the two are presumably of
    different types (confirm against ``testing_utils``).
    """
    # Fixture preparation stays outside ``pytest.raises`` so that a TypeError
    # raised while loading or converting data cannot make the test pass.
    data = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "integer")
    sorted_data = data[0]
    sorted_dict = data[1]

    # Create Test Segment Tuples
    start_end_vals = [
        (
            testing_utils.to_datetime(sorted_data[0][1]["clientTime"]),
            testing_utils.to_datetime(sorted_data[18][1]["clientTime"]),
        )
    ]
    segment_names = ["test_segment_error"]

    # Only the call under test is expected to raise.
    with pytest.raises(TypeError):
        distill.create_segment(sorted_dict, segment_names, start_end_vals)
def test_create_segment_error_2():
    """``create_segment`` must raise TypeError for "string"-mode logs and string bounds."""
    # Fixture preparation stays outside ``pytest.raises`` so that a TypeError
    # raised while loading data cannot make the test pass by accident.
    data = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "string")
    sorted_dict = data[1]

    # Create Test Segment Tuples
    start_end_vals = [("random_string_1", "random_string_2")]
    segment_names = ["test_segment_error"]

    # Only the call under test is expected to raise.
    with pytest.raises(TypeError):
        distill.create_segment(sorted_dict, segment_names, start_end_vals)
#######################
# WRITE_SEGMENT TESTS #
#######################
def test_write_segment_integer():
    """Write three segments from "integer"-mode logs; check sizes and int clientTime."""
    loaded = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "integer")
    sorted_data, sorted_dict = loaded[0], loaded[1]

    def client_time(position):
        # clientTime of the log at the given position in the sorted data.
        return sorted_data[position][1]["clientTime"]

    # Create Test Segment Tuples from (start index, end index) pairs.
    start_end_vals = [
        (client_time(lo), client_time(hi)) for lo, hi in [(0, 18), (5, 6), (3, 9)]
    ]
    expected_sizes = {
        "test_segment_all": 19,
        "test_segment_same_client_time": 2,
        "test_segment_extra_log": 8,
    }
    segment_names = list(expected_sizes)

    # Call write_segment
    written = distill.write_segment(sorted_dict, segment_names, start_end_vals)

    # Assert dictionary lengths
    for name, size in expected_sizes.items():
        assert len(written[name]) == size

    # Assert clientTime types
    for name in segment_names:
        logs = written[name]
        for uid in logs:
            assert isinstance(logs[uid]["clientTime"], int)
def test_write_segment_datetime():
    """Write three segments from "datetime"-mode logs; check sizes and clientTime types."""
    loaded = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "datetime")
    sorted_data, sorted_dict = loaded[0], loaded[1]

    def client_time(position):
        # clientTime of the log at the given position in the sorted data.
        return sorted_data[position][1]["clientTime"]

    # Create Test Segment Tuples from (start index, end index) pairs.
    start_end_vals = [
        (client_time(lo), client_time(hi)) for lo, hi in [(0, 18), (5, 6), (3, 9)]
    ]
    expected_sizes = {
        "test_segment_all": 19,
        "test_segment_same_client_time": 2,
        "test_segment_extra_log": 8,
    }
    segment_names = list(expected_sizes)

    # Call write_segment
    written = distill.write_segment(sorted_dict, segment_names, start_end_vals)

    for name, size in expected_sizes.items():
        assert len(written[name]) == size

    # Assert clientTime types: each value must satisfy both isinstance checks.
    for name in segment_names:
        logs = written[name]
        for uid in logs:
            stamp = logs[uid]["clientTime"]
            assert isinstance(stamp, datetime.datetime)
            assert isinstance(stamp, pd.Timestamp)
def test_write_segment_error_1():
    """``write_segment`` must raise TypeError for the mismatched tuple below.

    Logs are loaded in "integer" mode while the start/end tuple is passed
    through ``testing_utils.to_datetime``, so the two are presumably of
    different types (confirm against ``testing_utils``).
    """
    # Fixture preparation stays outside ``pytest.raises`` so that a TypeError
    # raised while loading or converting data cannot make the test pass.
    data = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "integer")
    sorted_data = data[0]
    sorted_dict = data[1]

    # Create Test Segment Tuples
    start_end_vals = [
        (
            testing_utils.to_datetime(sorted_data[0][1]["clientTime"]),
            testing_utils.to_datetime(sorted_data[18][1]["clientTime"]),
        )
    ]
    segment_names = ["test_segment_error"]

    # Should there be some assertions?
    # Only the call under test is expected to raise.
    with pytest.raises(TypeError):
        distill.write_segment(sorted_dict, segment_names, start_end_vals)
def test_write_segment_error_2():
    """A TypeError must be raised when writing a segment over "string"-mode logs."""
    # Loading the fixture stays outside ``pytest.raises`` so that a TypeError
    # raised by setup itself cannot make the test pass by accident.
    data = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "string")
    sorted_data = data[0]
    sorted_dict = data[1]

    with pytest.raises(TypeError):
        # NOTE(review): the conversion is kept inside the block because this
        # file does not show whether the TypeError comes from applying
        # ``to_datetime`` to "string"-mode clientTime values or from
        # ``write_segment`` itself -- confirm and narrow further if possible.
        # Create Test Segment Tuples
        start_end_vals = [
            (
                testing_utils.to_datetime(sorted_data[0][1]["clientTime"]),
                testing_utils.to_datetime(sorted_data[18][1]["clientTime"]),
            )
        ]
        segment_names = ["test_segment_error"]
        # Should there be some assertions here?
        distill.write_segment(sorted_dict, segment_names, start_end_vals)
###########################
# GENERATE_SEGMENTS TESTS #
###########################
def test_generate_segments_integer():
    """Generate segments on the "type" field from "integer"-mode logs.

    Three runs are checked: "load" matches with a label, "click" matches
    without one, and both values together without a label.
    """
    loaded = testing_utils.setup(
        os.path.join(DATA_DIR, "segment_generator_sample_data.json"), "integer"
    )
    sorted_dict = loaded[1]

    # --- "load" matches, labelled "load" ---
    load_segments = distill.generate_segments(
        sorted_dict, "type", ["load"], 1, 1, label="load"
    ).get_segment_name_dict()
    assert len(load_segments) == 2
    expected_load = {
        "load0": ((1623691889600, 1623691891600), 3),
        "load1": ((1623691906302, 1623691908302), 7),
    }
    for name, (window, count) in expected_load.items():
        assert load_segments[name].start_end_val == window
        assert load_segments[name].num_logs == count
    for name in load_segments:
        seg = load_segments[name]
        assert seg.segment_type == distill.Segment_Type.GENERATE
        assert seg.generate_field_name == "type"
        assert seg.generate_matched_values == ["load"]

    # --- "click" matches, unlabelled ---
    click_segments = distill.generate_segments(
        sorted_dict, "type", ["click"], 1, 1
    ).get_segment_name_dict()
    assert len(click_segments) == 4
    expected_click = {
        "0": ((1623691903200, 1623691905200), 2),
        "1": ((1623691905200, 1623691906488), 7),
        "2": ((1623691906488, 1623691907955), 6),
        "3": ((1623691907955, 1623691909100), 1),
    }
    for name, (window, count) in expected_click.items():
        assert click_segments[name].start_end_val == window
        assert click_segments[name].num_logs == count
    for name in click_segments:
        seg = click_segments[name]
        assert seg.segment_type == distill.Segment_Type.GENERATE
        assert seg.get_segment_type() == distill.Segment_Type.GENERATE
        assert seg.generate_field_name == "type"
        assert seg.get_generate_field_name() == "type"
        assert seg.generate_matched_values == ["click"]
        assert seg.get_generate_matched_values() == ["click"]

    # --- "load" and "click" matches together, unlabelled ---
    combined_segments = distill.generate_segments(
        sorted_dict, "type", ["load", "click"], 1, 1
    ).get_segment_name_dict()
    assert len(combined_segments) == 5
    expected_combined = {
        "0": ((1623691889600, 1623691891600), 3),
        "1": ((1623691903200, 1623691905200), 2),
        "2": ((1623691905200, 1623691906488), 7),
        "3": ((1623691906488, 1623691907955), 6),
        "4": ((1623691907955, 1623691909100), 1),
    }
    for name, (window, count) in expected_combined.items():
        assert combined_segments[name].start_end_val == window
        assert combined_segments[name].num_logs == count
    for name in combined_segments:
        seg = combined_segments[name]
        assert seg.segment_type == distill.Segment_Type.GENERATE
        assert seg.get_segment_type() == distill.Segment_Type.GENERATE
        assert seg.generate_field_name == "type"
        assert seg.get_generate_field_name() == "type"
        assert seg.generate_matched_values == ["load", "click"]
        assert seg.get_generate_matched_values() == ["load", "click"]
def test_generate_segments_datetime():
    """Generate segments on the "type" field from "datetime"-mode logs.

    Three runs are checked: "load" matches without a label, "click" matches
    with the positional label "click", and both values together unlabelled.
    """
    loaded = testing_utils.setup(
        os.path.join(DATA_DIR, "segment_generator_sample_data.json"), "datetime"
    )
    sorted_dict = loaded[1]

    def as_window(start_ms, end_ms):
        # Expected (start, end) tuple, converted with the shared test helper.
        return (testing_utils.to_datetime(start_ms), testing_utils.to_datetime(end_ms))

    # --- "load" matches, unlabelled ---
    load_segments = distill.generate_segments(
        sorted_dict, "type", ["load"], 1, 1
    ).get_segment_name_dict()
    assert len(load_segments) == 2
    expected_load = {
        "0": ((1623691889600, 1623691891600), 3),
        "1": ((1623691906302, 1623691908302), 7),
    }
    for name, (bounds_ms, count) in expected_load.items():
        assert load_segments[name].start_end_val == as_window(*bounds_ms)
        assert load_segments[name].num_logs == count
    for name in load_segments:
        seg = load_segments[name]
        assert seg.segment_type == distill.Segment_Type.GENERATE
        assert seg.get_segment_type() == distill.Segment_Type.GENERATE
        assert seg.generate_field_name == "type"
        assert seg.get_generate_field_name() == "type"
        assert seg.generate_matched_values == ["load"]
        assert seg.get_generate_matched_values() == ["load"]

    # --- "click" matches, labelled "click" (passed positionally) ---
    click_segments = distill.generate_segments(
        sorted_dict, "type", ["click"], 1, 1, "click"
    ).get_segment_name_dict()
    assert len(click_segments) == 4
    expected_click = {
        "click0": ((1623691903200, 1623691905200), 2),
        "click1": ((1623691905200, 1623691906488), 7),
        "click2": ((1623691906488, 1623691907955), 6),
        "click3": ((1623691907955, 1623691909100), 1),
    }
    for name, (bounds_ms, count) in expected_click.items():
        assert click_segments[name].start_end_val == as_window(*bounds_ms)
        assert click_segments[name].num_logs == count
    for name in click_segments:
        seg = click_segments[name]
        assert seg.segment_type == distill.Segment_Type.GENERATE
        assert seg.get_segment_type() == distill.Segment_Type.GENERATE
        assert seg.generate_field_name == "type"
        assert seg.get_generate_field_name() == "type"
        assert seg.generate_matched_values == ["click"]
        assert seg.get_generate_matched_values() == ["click"]

    # --- "load" and "click" matches together, unlabelled ---
    combined_segments = distill.generate_segments(
        sorted_dict, "type", ["load", "click"], 1, 1
    ).get_segment_name_dict()
    assert len(combined_segments) == 5
    expected_combined = {
        "0": ((1623691889600, 1623691891600), 3),
        "1": ((1623691903200, 1623691905200), 2),
        "2": ((1623691905200, 1623691906488), 7),
        "3": ((1623691906488, 1623691907955), 6),
        "4": ((1623691907955, 1623691909100), 1),
    }
    for name, (bounds_ms, count) in expected_combined.items():
        assert combined_segments[name].start_end_val == as_window(*bounds_ms)
        assert combined_segments[name].num_logs == count
    for name in combined_segments:
        seg = combined_segments[name]
        assert seg.segment_type == distill.Segment_Type.GENERATE
        assert seg.get_segment_type() == distill.Segment_Type.GENERATE
        assert seg.generate_field_name == "type"
        assert seg.get_generate_field_name() == "type"
        assert seg.generate_matched_values == ["load", "click"]
        assert seg.get_generate_matched_values() == ["load", "click"]
def test_generate_segments_none():
    """``generate_segments`` yields an empty result when nothing matches."""
    fixture_path = os.path.join(DATA_DIR, "segment_generator_sample_data.json")
    sorted_dict = testing_utils.setup(fixture_path, "datetime")[1]

    # Field "type" with the value "random".
    unmatched_value = distill.generate_segments(sorted_dict, "type", ["random"], 1, 1)
    assert len(unmatched_value) == 0

    # Field "random" with the value "random".
    unmatched_field = distill.generate_segments(
        sorted_dict, "random", ["random"], 1, 1
    )
    assert len(unmatched_field) == 0
def test_generate_segments_error():
    """``generate_segments`` must raise TypeError for "string"-mode logs."""
    # Loading the fixture stays outside ``pytest.raises`` so that a TypeError
    # raised by setup cannot make the test pass by accident.
    data = testing_utils.setup(
        os.path.join(DATA_DIR, "segment_generator_sample_data.json"), "string"
    )
    sorted_dict = data[1]

    # Only the call under test is expected to raise.
    with pytest.raises(TypeError):
        distill.generate_segments(sorted_dict, "type", ["load"], 1, 1)
#############################
# DETECT_DEADSPACE TESTS #
#############################
def test_deadspace_detection_integer():
    """Detect deadspace in "integer"-mode logs, with and without a label."""
    loaded = testing_utils.setup(
        os.path.join(DATA_DIR, "deadspace_detection_sample_data.json"), "integer"
    )
    sorted_dict = loaded[1]

    # Expected (start_end_val, num_logs) per segment, in segment-number order.
    expected = [
        ((1623691890459, 1623691994888), 7),
        ((1623691991900, 1623693994900), 15),
        ((1623693994550, 1623697997550), 3),
    ]

    def verify(segments, prefix):
        # Segment names are the prefix followed by the segment number.
        assert len(segments) == 3
        for number, (window, count) in enumerate(expected):
            seg = segments[f"{prefix}{number}"]
            assert seg.start_end_val == window
            assert seg.num_logs == count
        for name in segments:
            seg = segments[name]
            assert seg.segment_type == distill.Segment_Type.DEADSPACE
            assert seg.get_segment_type() == distill.Segment_Type.DEADSPACE
            assert seg.generate_field_name is None
            assert seg.get_generate_field_name() is None
            assert seg.generate_matched_values is None
            assert seg.get_generate_matched_values() is None

    unlabelled = distill.detect_deadspace(sorted_dict, 5, 1, 2).get_segment_name_dict()
    verify(unlabelled, "")

    labelled = distill.detect_deadspace(
        sorted_dict, 5, 1, 2, "deadspace"
    ).get_segment_name_dict()
    verify(labelled, "deadspace")
def test_deadspace_detection_datetime():
    """Detect deadspace in "datetime"-mode logs, with and without a label."""
    loaded = testing_utils.setup(
        os.path.join(DATA_DIR, "deadspace_detection_sample_data.json"), "datetime"
    )
    sorted_dict = loaded[1]

    # Expected ((start_ms, end_ms), num_logs) per segment, in segment-number order.
    expected = [
        ((1623691890459, 1623691994888), 7),
        ((1623691991900, 1623693994900), 15),
        ((1623693994550, 1623697997550), 3),
    ]

    def verify(segments, prefix):
        # Segment names are the prefix followed by the segment number.
        assert len(segments) == 3
        for number, ((start_ms, end_ms), count) in enumerate(expected):
            seg = segments[f"{prefix}{number}"]
            assert seg.start_end_val == (
                testing_utils.to_datetime(start_ms),
                testing_utils.to_datetime(end_ms),
            )
            assert seg.num_logs == count
        for name in segments:
            seg = segments[name]
            assert seg.segment_type == distill.Segment_Type.DEADSPACE
            assert seg.get_segment_type() == distill.Segment_Type.DEADSPACE
            assert seg.generate_field_name is None
            assert seg.get_generate_field_name() is None
            assert seg.generate_matched_values is None
            assert seg.get_generate_matched_values() is None

    unlabelled = distill.detect_deadspace(sorted_dict, 5, 1, 2).get_segment_name_dict()
    verify(unlabelled, "")

    labelled = distill.detect_deadspace(
        sorted_dict, 5, 1, 2, "deadspace"
    ).get_segment_name_dict()
    verify(labelled, "deadspace")
def test_deadspace_detection_error1():
    """``detect_deadspace`` must raise TypeError for "string"-mode logs."""
    # Loading the fixture stays outside ``pytest.raises`` so that a TypeError
    # raised by setup cannot make the test pass by accident.
    data = testing_utils.setup(
        os.path.join(DATA_DIR, "deadspace_detection_sample_data.json"), "string"
    )
    sorted_dict = data[1]

    # Only the call under test is expected to raise.
    with pytest.raises(TypeError):
        distill.detect_deadspace(sorted_dict, 5, 1, 2)
def test_deadspace_detection_error2():
    """``detect_deadspace`` must raise TypeError when one log's clientTime is converted.

    Logs are loaded in "integer" mode, then a single log's clientTime is
    replaced by its ``testing_utils.to_datetime`` conversion.
    """
    # Fixture preparation stays outside ``pytest.raises`` so that a TypeError
    # raised while loading or converting data cannot make the test pass.
    data = testing_utils.setup(
        os.path.join(DATA_DIR, "deadspace_detection_sample_data.json"), "integer"
    )
    sorted_dict = data[1]

    # Overwrite the clientTime of one specific log with its converted value.
    target_uid = "session_16236918905391623691891459rawscroll"
    sorted_dict[target_uid]["clientTime"] = testing_utils.to_datetime(
        sorted_dict[target_uid]["clientTime"]
    )

    # Only the call under test is expected to raise.
    with pytest.raises(TypeError):
        distill.detect_deadspace(sorted_dict, 5, 1, 2)
def test_fixed_time_segments_integer():
    """Generate 5-second fixed segments from "integer"-mode logs, untrimmed and trimmed."""
    loaded = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "integer")
    sorted_dict = loaded[1]

    # Consecutive segment boundaries; each adjacent pair is one expected window.
    edges = [
        1623691890656,
        1623691895656,
        1623691900656,
        1623691905656,
        1623691910656,
    ]
    windows = list(zip(edges, edges[1:]))
    counts = [3, 0, 9, 7]

    # --- default call: no label, no trim ---
    untrimmed = distill.generate_fixed_time_segments(
        sorted_dict, 5
    ).get_segment_name_dict()
    # Check that start and end times are 5 seconds apart
    for name in untrimmed:
        begin, finish = untrimmed[name].start_end_val[0], untrimmed[name].start_end_val[1]
        assert finish - begin == 5000
    assert len(untrimmed) == 4
    for number in range(4):
        seg = untrimmed[str(number)]
        assert seg.start_end_val == windows[number]
        assert seg.num_logs == counts[number]

    # --- trim=True with label "trim" ---
    trimmed = distill.generate_fixed_time_segments(
        sorted_dict, 5, trim=True, label="trim"
    ).get_segment_name_dict()
    # Check that start and end times are 5 seconds apart
    for name in trimmed:
        begin, finish = trimmed[name].start_end_val[0], trimmed[name].start_end_val[1]
        assert finish - begin == 5000
    assert len(trimmed) == 3
    for number in range(3):
        seg = trimmed["trim" + str(number)]
        assert seg.start_end_val == windows[number]
        assert seg.num_logs == counts[number]
def test_fixed_time_segments_datetime():
    """Generate 5-second fixed segments from "datetime"-mode logs, untrimmed and trimmed."""
    loaded = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "datetime")
    sorted_dict = loaded[1]

    # Consecutive segment boundaries; each adjacent pair is one expected window.
    edges = [
        testing_utils.to_datetime(stamp_ms)
        for stamp_ms in (
            1623691890656,
            1623691895656,
            1623691900656,
            1623691905656,
            1623691910656,
        )
    ]
    windows = list(zip(edges, edges[1:]))
    counts = [3, 0, 9, 7]
    five_seconds = datetime.timedelta(seconds=5)

    # --- default call: no label, no trim ---
    untrimmed = distill.generate_fixed_time_segments(
        sorted_dict, 5
    ).get_segment_name_dict()
    # Check that start and end times are 5 seconds apart
    for name in untrimmed:
        begin, finish = untrimmed[name].start_end_val[0], untrimmed[name].start_end_val[1]
        assert finish - begin == five_seconds
    assert len(untrimmed) == 4
    for number in range(4):
        seg = untrimmed[str(number)]
        assert seg.start_end_val == windows[number]
        assert seg.num_logs == counts[number]

    # --- trim=True with label "trim" ---
    trimmed = distill.generate_fixed_time_segments(
        sorted_dict, 5, trim=True, label="trim"
    ).get_segment_name_dict()
    # Check that start and end times are 5 seconds apart
    for name in trimmed:
        begin, finish = trimmed[name].start_end_val[0], trimmed[name].start_end_val[1]
        assert finish - begin == five_seconds
    assert len(trimmed) == 3
    for number in range(3):
        seg = trimmed["trim" + str(number)]
        assert seg.start_end_val == windows[number]
        assert seg.num_logs == counts[number]
def test_fixed_time_segments_error():
    """``generate_fixed_time_segments`` must raise TypeError for "string"-mode logs."""
    # Loading the fixture stays outside ``pytest.raises`` so that a TypeError
    # raised by setup cannot make the test pass by accident.
    data = testing_utils.setup(
        os.path.join(DATA_DIR, "deadspace_detection_sample_data.json"), "string"
    )
    sorted_dict = data[1]

    # Only the call under test is expected to raise.
    with pytest.raises(TypeError):
        distill.generate_fixed_time_segments(sorted_dict, 10)
def test_generate_collapsing_windows_integer():
    """Collapsing-window segments on "path" matching "button#test_button" ("integer" mode)."""
    data = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "integer")
    sorted_dict = data[1]
    result_no_label = distill.generate_collapsing_window_segments(
        sorted_dict, "path", ["button#test_button"]
    )

    # Check the segment count before indexing so that an unexpectedly short
    # result fails with this assertion rather than an IndexError.
    assert len(result_no_label) == 1

    segment = result_no_label[0]
    assert segment.num_logs == 8
    assert segment.segment_name == "0"
    assert segment.start_end_val == (1623691904212, 1623691904923)
def test_generate_collapsing_windows_datetime():
    """Collapsing-window segments on "path" matching "Window" ("datetime" mode)."""
    data = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "datetime")
    sorted_dict = data[1]
    result_no_label = distill.generate_collapsing_window_segments(
        sorted_dict, "path", ["Window"]
    )

    # Check the segment count before indexing so that an unexpectedly short
    # result fails with this assertion rather than an IndexError.
    assert len(result_no_label) == 2

    segment1 = result_no_label[0]
    segment2 = result_no_label[1]
    assert segment1.num_logs == 16
    assert segment2.num_logs == 1
    assert segment1.segment_name == "0"
    assert segment2.segment_name == "1"
    assert segment1.start_end_val == (
        testing_utils.to_datetime(1623691891459),
        testing_utils.to_datetime(1623691907136),
    )
    assert segment2.start_end_val == (
        testing_utils.to_datetime(1623691909728),
        testing_utils.to_datetime(1623691909728),
    )
def test_generate_collapsing_windows_datetime_all_logs():
    """Collapsing-window segments on "sessionID": one segment of 19 logs is expected."""
    data = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "datetime")
    sorted_dict = data[1]
    result_no_label = distill.generate_collapsing_window_segments(
        sorted_dict, "sessionID", ["session_1623691890539"]
    )

    # Check the segment count before indexing so that an unexpectedly short
    # result fails with this assertion rather than an IndexError.
    assert len(result_no_label) == 1

    segment1 = result_no_label[0]
    assert segment1.num_logs == 19
    assert segment1.segment_name == "0"
    assert segment1.start_end_val == (
        testing_utils.to_datetime(1623691890656),
        testing_utils.to_datetime(1623691909728),
    )
###################
# SET LOGIC TESTS #
###################
def test_union_integer():
    """Union "test_segment_2" with "test_segment_3" built from "integer"-mode logs."""
    loaded = testing_utils.setup(os.path.join(DATA_DIR, "sample_data.json"), "integer")
    sorted_data, sorted_dict = loaded[0], loaded[1]

    def client_time(position):
        # clientTime of the log at the given position in the sorted data.
        return sorted_data[position][1]["clientTime"]

    # Create Tuples from (start index, end index) pairs.
    index_pairs = [(0, 18), (5, 6), (6, 7), (3, 9)]
    start_end_vals = [(client_time(lo), client_time(hi)) for lo, hi in index_pairs]
    segment_names = [
        "test_segment_1",
        "test_segment_2",
        "test_segment_3",
        "test_segment_4",
    ]

    created = distill.create_segment(sorted_dict, segment_names, start_end_vals)
    by_name = created.get_segment_name_dict()

    merged = distill.union(
        "new_segment", by_name["test_segment_2"], by_name["test_segment_3"]
    )

    assert merged.segment_name == "new_segment"
    assert merged.num_logs == 4
    assert merged.uids == [sorted_data[position][0] for position in (5, 6, 7, 8)]
    assert merged.start_end_val == (client_time(5), client_time(7))

    # Type and generate-metadata, via both attribute and getter.
    assert merged.segment_type == distill.Segment_Type.UNION
    assert merged.get_segment_type() == distill.Segment_Type.UNION
    assert merged.generate_field_name is None
    assert merged.get_generate_field_name() is None
    assert merged.generate_matched_values is None
    assert merged.get_generate_matched_values() is None
def test_union_datetime():
    """Check ``distill.union`` on segments built from datetime client times.

    ``test_segment_3`` is unioned with ``test_segment_1``; the expected uid
    list starts with the uids of ``test_segment_3`` followed by the remaining
    uids of ``test_segment_1``.
    """
    setup_result = testing_utils.setup(
        os.path.join(DATA_DIR, "sample_data.json"), "datetime"
    )
    logs = setup_result[0]
    log_dict = setup_result[1]

    def client_time(position):
        # Entries of ``logs`` are indexed as (uid, log); read the log's time.
        return logs[position][1]["clientTime"]

    # Segment name -> (start position, end position) within ``logs``.
    spans = {
        "test_segment_1": (0, 18),
        "test_segment_2": (5, 6),
        "test_segment_3": (6, 7),
        "test_segment_4": (3, 9),
    }
    segment_names = list(spans)
    start_end_vals = [
        (client_time(first), client_time(last)) for first, last in spans.values()
    ]

    segments = distill.create_segment(log_dict, segment_names, start_end_vals)
    by_name = segments.get_segment_name_dict()
    merged = distill.union(
        "new_segment", by_name["test_segment_3"], by_name["test_segment_1"]
    )

    # Positions in ``logs`` of the expected uids, in the expected order.
    expected_positions = (5, 6, 7, 8, 0, 1, 2, 3, 4, *range(9, 19))

    assert merged.segment_name == "new_segment"
    assert merged.num_logs == 19
    assert merged.uids == [logs[position][0] for position in expected_positions]
    assert merged.start_end_val == (client_time(0), client_time(18))

    # Attribute access and the getter must agree for every metadata field.
    assert merged.segment_type == distill.Segment_Type.UNION
    assert merged.get_segment_type() == distill.Segment_Type.UNION
    assert merged.generate_field_name is None
    assert merged.get_generate_field_name() is None
    assert merged.generate_matched_values is None
    assert merged.get_generate_matched_values() is None
def test_union_error():
    """``distill.union`` must raise ``TypeError`` for mismatched time types.

    One segment is built from integer client times and one from datetime
    client times. All fixture setup happens outside ``pytest.raises`` so that
    only a ``TypeError`` raised by ``distill.union`` itself satisfies the
    test; a ``TypeError`` during setup is reported as an error instead of
    being mistaken for the expected failure.
    """
    data_integer = testing_utils.setup(
        os.path.join(DATA_DIR, "sample_data.json"), "integer"
    )
    sorted_data_integer = data_integer[0]
    sorted_dict_integer = data_integer[1]

    data_datetime = testing_utils.setup(
        os.path.join(DATA_DIR, "sample_data.json"), "datetime"
    )
    sorted_data_datetime = data_datetime[0]
    sorted_dict_datetime = data_datetime[1]

    segment_name_integer = ["test_segment_integer"]
    segment_name_datetime = ["test_segment_datetime"]

    start_end_integer = [
        (
            sorted_data_integer[0][1]["clientTime"],
            sorted_data_integer[18][1]["clientTime"],
        )
    ]
    start_end_datetime = [
        (
            sorted_data_datetime[3][1]["clientTime"],
            sorted_data_datetime[9][1]["clientTime"],
        )
    ]

    int_segment = distill.create_segment(
        sorted_dict_integer, segment_name_integer, start_end_integer
    ).get_segment_name_dict()
    datetime_segment = distill.create_segment(
        sorted_dict_datetime, segment_name_datetime, start_end_datetime
    ).get_segment_name_dict()

    # Only the call under test is allowed to raise the expected exception.
    with pytest.raises(TypeError):
        distill.union(
            "new_segment",
            int_segment["test_segment_integer"],
            datetime_segment["test_segment_datetime"],
        )
def test_intersection_integer():
    """Check ``distill.intersection`` on integer-timestamped segments.

    ``test_segment_2`` is intersected with ``test_segment_3``; the new
    segment's name, log count, uids, start/end values, type and generate_*
    fields are verified.
    """
    setup_result = testing_utils.setup(
        os.path.join(DATA_DIR, "sample_data.json"), "integer"
    )
    logs = setup_result[0]
    log_dict = setup_result[1]

    def client_time(position):
        # Entries of ``logs`` are indexed as (uid, log); read the log's time.
        return logs[position][1]["clientTime"]

    # Segment name -> (start position, end position) within ``logs``.
    spans = {
        "test_segment_1": (0, 18),
        "test_segment_2": (5, 6),
        "test_segment_3": (6, 7),
        "test_segment_4": (3, 9),
    }
    segment_names = list(spans)
    start_end_vals = [
        (client_time(first), client_time(last)) for first, last in spans.values()
    ]

    segments = distill.create_segment(log_dict, segment_names, start_end_vals)
    by_name = segments.get_segment_name_dict()
    common = distill.intersection(
        "new_segment", by_name["test_segment_2"], by_name["test_segment_3"]
    )

    assert common.segment_name == "new_segment"
    assert common.num_logs == 2
    assert common.uids == [logs[position][0] for position in (5, 6)]
    assert common.start_end_val == (client_time(5), client_time(7))

    # Attribute access and the getter must agree for every metadata field.
    assert common.segment_type == distill.Segment_Type.INTERSECTION
    assert common.get_segment_type() == distill.Segment_Type.INTERSECTION
    assert common.generate_field_name is None
    assert common.get_generate_field_name() is None
    assert common.generate_matched_values is None
    assert common.get_generate_matched_values() is None
def test_intersection_datetime():
    """Check ``distill.intersection`` on datetime-timestamped segments.

    ``test_segment_3`` is intersected with ``test_segment_1``; the new
    segment's name, log count, uids, start/end values, type and generate_*
    fields are verified.
    """
    setup_result = testing_utils.setup(
        os.path.join(DATA_DIR, "sample_data.json"), "datetime"
    )
    logs = setup_result[0]
    log_dict = setup_result[1]

    def client_time(position):
        # Entries of ``logs`` are indexed as (uid, log); read the log's time.
        return logs[position][1]["clientTime"]

    # Segment name -> (start position, end position) within ``logs``.
    spans = {
        "test_segment_1": (0, 18),
        "test_segment_2": (5, 6),
        "test_segment_3": (6, 7),
        "test_segment_4": (3, 9),
    }
    segment_names = list(spans)
    start_end_vals = [
        (client_time(first), client_time(last)) for first, last in spans.values()
    ]

    segments = distill.create_segment(log_dict, segment_names, start_end_vals)
    by_name = segments.get_segment_name_dict()
    common = distill.intersection(
        "new_segment", by_name["test_segment_3"], by_name["test_segment_1"]
    )

    assert common.segment_name == "new_segment"
    assert common.num_logs == 4
    assert common.uids == [logs[position][0] for position in (5, 6, 7, 8)]
    assert common.start_end_val == (client_time(0), client_time(18))

    # Attribute access and the getter must agree for every metadata field.
    assert common.segment_type == distill.Segment_Type.INTERSECTION
    assert common.get_segment_type() == distill.Segment_Type.INTERSECTION
    assert common.generate_field_name is None
    assert common.get_generate_field_name() is None
    assert common.generate_matched_values is None
    assert common.get_generate_matched_values() is None
def test_intersection_error():
    """``distill.intersection`` must raise ``TypeError`` for mismatched types.

    One segment is built from integer client times and one from datetime
    client times. All fixture setup happens outside ``pytest.raises`` so that
    only a ``TypeError`` raised by ``distill.intersection`` itself satisfies
    the test; a ``TypeError`` during setup is reported as an error instead of
    being mistaken for the expected failure.
    """
    data_integer = testing_utils.setup(
        os.path.join(DATA_DIR, "sample_data.json"), "integer"
    )
    sorted_data_integer = data_integer[0]
    sorted_dict_integer = data_integer[1]

    data_datetime = testing_utils.setup(
        os.path.join(DATA_DIR, "sample_data.json"), "datetime"
    )
    sorted_data_datetime = data_datetime[0]
    sorted_dict_datetime = data_datetime[1]

    segment_name_integer = ["test_segment_integer"]
    segment_name_datetime = ["test_segment_datetime"]

    start_end_integer = [
        (
            sorted_data_integer[0][1]["clientTime"],
            sorted_data_integer[18][1]["clientTime"],
        )
    ]
    start_end_datetime = [
        (
            sorted_data_datetime[3][1]["clientTime"],
            sorted_data_datetime[9][1]["clientTime"],
        )
    ]

    int_segment = distill.create_segment(
        sorted_dict_integer, segment_name_integer, start_end_integer
    ).get_segment_name_dict()
    datetime_segment = distill.create_segment(
        sorted_dict_datetime, segment_name_datetime, start_end_datetime
    ).get_segment_name_dict()

    # Only the call under test is allowed to raise the expected exception.
    with pytest.raises(TypeError):
        distill.intersection(
            "new_segment",
            int_segment["test_segment_integer"],
            datetime_segment["test_segment_datetime"],
        )
def test_difference_integer():
    """Check ``distill.difference`` on integer-timestamped segments.

    ``test_segment_4`` is subtracted from ``test_segment_1``; the new segment
    is expected to keep the start/end values of ``test_segment_1`` and to
    hold the eleven uids listed below.
    """
    setup_result = testing_utils.setup(
        os.path.join(DATA_DIR, "sample_data.json"), "integer"
    )
    logs = setup_result[0]
    log_dict = setup_result[1]

    def client_time(position):
        # Entries of ``logs`` are indexed as (uid, log); read the log's time.
        return logs[position][1]["clientTime"]

    # Segment name -> (start position, end position) within ``logs``.
    spans = {
        "test_segment_1": (0, 18),
        "test_segment_2": (5, 6),
        "test_segment_3": (6, 7),
        "test_segment_4": (3, 9),
    }
    segment_names = list(spans)
    start_end_vals = [
        (client_time(first), client_time(last)) for first, last in spans.values()
    ]

    segments = distill.create_segment(log_dict, segment_names, start_end_vals)
    by_name = segments.get_segment_name_dict()
    remainder = distill.difference(
        "new_segment", by_name["test_segment_1"], by_name["test_segment_4"]
    )

    # Positions in ``logs`` of the expected uids, in the expected order.
    expected_positions = (0, 1, 2, *range(11, 19))

    assert remainder.segment_name == "new_segment"
    assert remainder.num_logs == 11
    assert remainder.uids == [logs[position][0] for position in expected_positions]
    assert remainder.start_end_val == by_name["test_segment_1"].start_end_val

    # Attribute access and the getter must agree for every metadata field.
    assert remainder.segment_type == distill.Segment_Type.DIFFERENCE
    assert remainder.get_segment_type() == distill.Segment_Type.DIFFERENCE
    assert remainder.generate_field_name is None
    assert remainder.get_generate_field_name() is None
    assert remainder.generate_matched_values is None
    assert remainder.get_generate_matched_values() is None
def test_difference_datetime():
    """Check ``distill.difference`` on datetime-timestamped segments.

    ``test_segment_4`` is subtracted from ``test_segment_1``; the new segment
    is expected to keep the start/end values of ``test_segment_1`` and to
    hold the eleven uids listed below.
    """
    setup_result = testing_utils.setup(
        os.path.join(DATA_DIR, "sample_data.json"), "datetime"
    )
    logs = setup_result[0]
    log_dict = setup_result[1]

    def client_time(position):
        # Entries of ``logs`` are indexed as (uid, log); read the log's time.
        return logs[position][1]["clientTime"]

    # Segment name -> (start position, end position) within ``logs``.
    spans = {
        "test_segment_1": (0, 18),
        "test_segment_2": (5, 6),
        "test_segment_3": (6, 7),
        "test_segment_4": (3, 9),
    }
    segment_names = list(spans)
    start_end_vals = [
        (client_time(first), client_time(last)) for first, last in spans.values()
    ]

    segments = distill.create_segment(log_dict, segment_names, start_end_vals)
    by_name = segments.get_segment_name_dict()
    remainder = distill.difference(
        "new_segment", by_name["test_segment_1"], by_name["test_segment_4"]
    )

    # Positions in ``logs`` of the expected uids, in the expected order.
    expected_positions = (0, 1, 2, *range(11, 19))

    assert remainder.segment_name == "new_segment"
    assert remainder.num_logs == 11
    assert remainder.uids == [logs[position][0] for position in expected_positions]
    assert remainder.start_end_val == by_name["test_segment_1"].start_end_val

    # Attribute access and the getter must agree for every metadata field.
    assert remainder.segment_type == distill.Segment_Type.DIFFERENCE
    assert remainder.get_segment_type() == distill.Segment_Type.DIFFERENCE
    assert remainder.generate_field_name is None
    assert remainder.get_generate_field_name() is None
    assert remainder.generate_matched_values is None
    assert remainder.get_generate_matched_values() is None
############################
# EXPORTING SEGMENTS TESTS #
############################
def test_export_segments():
    """Export deadspace segments to CSV and verify the file line by line.

    Segments are produced by ``distill.detect_deadspace`` on the deadspace
    sample data, written with ``distill.export_segments`` and read back. The
    exported file is removed in a ``finally`` clause so that a failing
    assertion does not leave ``./test.csv`` behind in the working directory.
    """
    data = testing_utils.setup(
        os.path.join(DATA_DIR, "deadspace_detection_sample_data.json"), "integer"
    )
    sorted_dict = data[1]
    result = distill.detect_deadspace(sorted_dict, 5, 1, 2)

    export_path = "./test.csv"
    try:
        distill.export_segments(export_path, result)

        # Read from file
        with open(export_path, "r") as file:
            lines = file.readlines()

        # One header row followed by one row per exported segment.
        assert len(lines) == 4
        assert (
            lines[0] == "Segment Name,Start Time,End Time,Number of Logs,Generate Field"
            " Name,Generate Matched Values,Segment Type\n"
        )
        assert lines[1] == "0,1623691890459,1623691994888,7,,,Segment_Type.DEADSPACE\n"
        assert lines[2] == "1,1623691991900,1623693994900,15,,,Segment_Type.DEADSPACE\n"
        assert lines[3] == "2,1623693994550,1623697997550,3,,,Segment_Type.DEADSPACE\n"
    finally:
        # Guard the removal: if the export itself failed, the file may not
        # exist, and a FileNotFoundError here would hide the real failure.
        if os.path.exists(export_path):
            os.remove(export_path)