-- blob: 28b4eca4b6c164b6a1fee45a7a0abb3eb19e0e25 [file] [log] [blame]
-- SPDX-FileCopyrightText: 2023 LakeSoul Contributors
--
-- SPDX-License-Identifier: Apache-2.0
-- One row per namespace, keyed by the namespace name.
-- "domain" is filled with 'public' when the writer does not supply it.
CREATE TABLE IF NOT EXISTS namespace
(
    namespace  TEXT,
    properties JSON,
    comment    TEXT,
    domain     TEXT DEFAULT 'public',
    PRIMARY KEY (namespace)
);
-- Seed the 'default' namespace with empty properties and an empty comment.
-- ON CONFLICT DO NOTHING lets the script be re-run when the row already exists.
INSERT INTO namespace (namespace, properties, comment)
VALUES ('default', '{}', '')
ON CONFLICT DO NOTHING;
-- Per-table record keyed by table_id: name, path, schema text, JSON properties
-- and partition description text.
-- table_namespace defaults to 'default' and domain to 'public'.
CREATE TABLE IF NOT EXISTS table_info
(
    table_id        TEXT,
    table_namespace TEXT DEFAULT 'default',
    table_name      TEXT,
    table_path      TEXT,
    table_schema    TEXT,
    properties      JSON,
    partitions      TEXT,
    domain          TEXT DEFAULT 'public',
    PRIMARY KEY (table_id)
);
-- Rows are unique per (table_name, table_namespace) and carry a table_id;
-- presumably a name -> id lookup for table_info (confirm against the writers).
-- table_namespace defaults to 'default' and domain to 'public'.
CREATE TABLE IF NOT EXISTS table_name_id
(
    table_name      TEXT,
    table_id        TEXT,
    table_namespace TEXT DEFAULT 'default',
    domain          TEXT DEFAULT 'public',
    PRIMARY KEY (table_name, table_namespace)
);
-- Rows are unique per table_path and carry a table_id;
-- presumably a path -> id lookup for table_info (confirm against the writers).
-- table_namespace defaults to 'default' and domain to 'public'.
CREATE TABLE IF NOT EXISTS table_path_id
(
    table_path      TEXT,
    table_id        TEXT,
    table_namespace TEXT DEFAULT 'default',
    domain          TEXT DEFAULT 'public',
    PRIMARY KEY (table_path)
);
-- Create the composite type data_file_op (path, file_op, size, file_exist_cols)
-- if it is not already available.
-- CREATE TYPE has no IF NOT EXISTS form, so existence is tested first.
-- to_regtype() resolves the name through the current search_path (the same way
-- the data_file_op[] column definition does) and returns NULL instead of raising
-- when the type is missing. A bare pg_type.typname lookup would also match a
-- same-named type in a schema that is not on the search_path and wrongly skip
-- the creation.
DO
$$
BEGIN
    IF to_regtype('data_file_op') IS NULL THEN
        CREATE TYPE data_file_op AS
        (
            path            text,
            file_op         text,
            size            bigint,
            file_exist_cols text
        );
    END IF;
END
$$;
-- One row per (table_id, partition_desc, commit_id).
-- file_ops holds an array of data_file_op composite values, so that type must
-- exist before this statement runs.
-- committed defaults to false and domain to 'public'.
CREATE TABLE IF NOT EXISTS data_commit_info
(
    table_id       TEXT,
    partition_desc TEXT,
    commit_id      UUID,
    file_ops       data_file_op[],
    commit_op      TEXT,
    committed      BOOLEAN DEFAULT 'false',
    timestamp      BIGINT,
    domain         TEXT DEFAULT 'public',
    PRIMARY KEY (table_id, partition_desc, commit_id)
);
-- One row per (table_id, partition_desc, version).
-- timestamp defaults to the current epoch time multiplied by 1000 (milliseconds);
-- the expression is double precision and is converted to bigint on storage.
-- snapshot is an array of UUIDs (presumably commit ids from data_commit_info;
-- confirm against the writers). domain defaults to 'public'.
CREATE TABLE IF NOT EXISTS partition_info
(
    table_id       TEXT,
    partition_desc TEXT,
    version        INT,
    commit_op      TEXT,
    timestamp      BIGINT DEFAULT (date_part('epoch'::text, now()) * (1000)::double precision),
    snapshot       UUID[],
    expression     TEXT,
    domain         TEXT DEFAULT 'public',
    PRIMARY KEY (table_id, partition_desc, version)
);
-- Trigger function for row-level AFTER INSERT on partition_info.
--
-- For every inserted row whose commit_op is not 'CompactionCommit', decides
-- whether to publish a message on the 'lakesoul_compaction_notify' channel:
--   * if another row of the same (table_id, partition_desc) with
--     commit_op = 'CompactionCommit' exists, notify when the new version is at
--     least 3 ahead of the newest such row;
--   * otherwise notify when the new version is >= 2.
-- The payload is a JSON object with the keys table_path, table_partition_desc
-- and table_namespace; path and namespace are read from table_info.
--
-- Always returns NULL.
CREATE OR REPLACE FUNCTION partition_insert() RETURNS TRIGGER AS
$$
DECLARE
    rs_version         integer; -- newest CompactionCommit version, NULL when none exists
    rs_table_path      text;
    rs_table_namespace text;
    need_notify        boolean;
BEGIN
    -- A NULL commit_op makes this comparison NULL, so such rows are skipped as well.
    IF NEW.commit_op <> 'CompactionCommit' THEN
        SELECT version
        INTO rs_version
        FROM partition_info
        WHERE table_id = NEW.table_id
          AND partition_desc = NEW.partition_desc
          AND version != NEW.version
          AND commit_op = 'CompactionCommit'
        ORDER BY version DESC
        LIMIT 1;

        -- rs_version is NULL when no compaction row was found; NULL >= 0 is not
        -- true, so that case takes the ELSE branch.
        IF rs_version >= 0 THEN
            need_notify := (NEW.version - rs_version) >= 3;
        ELSE
            need_notify := NEW.version >= 2;
        END IF;

        IF need_notify THEN
            SELECT table_path, table_namespace
            INTO rs_table_path, rs_table_namespace
            FROM table_info
            WHERE table_id = NEW.table_id;

            -- to_json() quotes and escapes each value, so a '"' or '\' inside a
            -- path or partition description can no longer break the JSON payload.
            -- For values without such characters the payload text is unchanged.
            -- coalesce(..., '') keeps the earlier behaviour of emitting an empty
            -- string when a value is NULL (e.g. no matching table_info row).
            PERFORM pg_notify('lakesoul_compaction_notify',
                              concat('{"table_path":', to_json(coalesce(rs_table_path, ''))::text,
                                     ',"table_partition_desc":', to_json(coalesce(NEW.partition_desc, ''))::text,
                                     ',"table_namespace":', to_json(coalesce(rs_table_namespace, ''))::text,
                                     '}'));
        END IF;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Run partition_insert() once for each row inserted into partition_info,
-- after the insert has taken place.
CREATE OR REPLACE TRIGGER partition_table_change
    AFTER INSERT
    ON partition_info
    FOR EACH ROW
EXECUTE FUNCTION partition_insert();
-- Key/value text pairs; at most one row per key.
CREATE TABLE IF NOT EXISTS global_config
(
    key   TEXT,
    value TEXT,
    PRIMARY KEY (key)
);