blob: d8d08286e38562a4a952d561578b1fd7dbb38dba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.storage.relational.mapper;
import java.util.List;
import org.apache.gravitino.storage.relational.po.TopicPO;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
public interface TopicMetaMapper {
String TABLE_NAME = "topic_meta";
@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(topic_id, topic_name, metalake_id, catalog_id, schema_id,"
+ " comment, properties, audit_info, current_version, last_version,"
+ " deleted_at)"
+ " VALUES("
+ " #{topicMeta.topicId},"
+ " #{topicMeta.topicName},"
+ " #{topicMeta.metalakeId},"
+ " #{topicMeta.catalogId},"
+ " #{topicMeta.schemaId},"
+ " #{topicMeta.comment},"
+ " #{topicMeta.properties},"
+ " #{topicMeta.auditInfo},"
+ " #{topicMeta.currentVersion},"
+ " #{topicMeta.lastVersion},"
+ " #{topicMeta.deletedAt}"
+ " )")
void insertTopicMeta(@Param("topicMeta") TopicPO topicPO);
@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(topic_id, topic_name, metalake_id, catalog_id, schema_id,"
+ " comment, properties, audit_info, current_version, last_version,"
+ " deleted_at)"
+ " VALUES("
+ " #{topicMeta.topicId},"
+ " #{topicMeta.topicName},"
+ " #{topicMeta.metalakeId},"
+ " #{topicMeta.catalogId},"
+ " #{topicMeta.schemaId},"
+ " #{topicMeta.comment},"
+ " #{topicMeta.properties},"
+ " #{topicMeta.auditInfo},"
+ " #{topicMeta.currentVersion},"
+ " #{topicMeta.lastVersion},"
+ " #{topicMeta.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " topic_name = #{topicMeta.topicName},"
+ " metalake_id = #{topicMeta.metalakeId},"
+ " catalog_id = #{topicMeta.catalogId},"
+ " schema_id = #{topicMeta.schemaId},"
+ " comment = #{topicMeta.comment},"
+ " properties = #{topicMeta.properties},"
+ " audit_info = #{topicMeta.auditInfo},"
+ " current_version = #{topicMeta.currentVersion},"
+ " last_version = #{topicMeta.lastVersion},"
+ " deleted_at = #{topicMeta.deletedAt}")
void insertTopicMetaOnDuplicateKeyUpdate(@Param("topicMeta") TopicPO topicPO);
@Select(
"SELECT topic_id as topicId, topic_name as topicName, metalake_id as metalakeId,"
+ " catalog_id as catalogId, schema_id as schemaId,"
+ " comment as comment, properties as properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0")
List<TopicPO> listTopicPOsBySchemaId(@Param("schemaId") Long schemaId);
@Select(
"SELECT topic_id as topicId, topic_name as topicName,"
+ " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId,"
+ " comment as comment, properties as properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND topic_name = #{topicName} AND deleted_at = 0")
TopicPO selectTopicMetaBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("topicName") String topicName);
@Select(
"SELECT topic_id as topicId, topic_name as topicName,"
+ " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId,"
+ " comment as comment, properties as properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE topic_id = #{topicId} AND deleted_at = 0")
TopicPO selectTopicMetaById(@Param("topicId") Long topicId);
@Update(
"UPDATE "
+ TABLE_NAME
+ " SET topic_name = #{newTopicMeta.topicName},"
+ " metalake_id = #{newTopicMeta.metalakeId},"
+ " catalog_id = #{newTopicMeta.catalogId},"
+ " schema_id = #{newTopicMeta.schemaId},"
+ " comment = #{newTopicMeta.comment},"
+ " properties = #{newTopicMeta.properties},"
+ " audit_info = #{newTopicMeta.auditInfo},"
+ " current_version = #{newTopicMeta.currentVersion},"
+ " last_version = #{newTopicMeta.lastVersion},"
+ " deleted_at = #{newTopicMeta.deletedAt}"
+ " WHERE topic_id = #{oldTopicMeta.topicId}"
+ " AND topic_name = #{oldTopicMeta.topicName}"
+ " AND metalake_id = #{oldTopicMeta.metalakeId}"
+ " AND catalog_id = #{oldTopicMeta.catalogId}"
+ " AND schema_id = #{oldTopicMeta.schemaId}"
+ " AND comment = #{oldTopicMeta.comment}"
+ " AND properties = #{oldTopicMeta.properties}"
+ " AND audit_info = #{oldTopicMeta.auditInfo}"
+ " AND current_version = #{oldTopicMeta.currentVersion}"
+ " AND last_version = #{oldTopicMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateTopicMeta(
@Param("newTopicMeta") TopicPO newTopicPO, @Param("oldTopicMeta") TopicPO oldTopicPO);
@Select(
"SELECT topic_id as topicId FROM "
+ TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND topic_name = #{topicName}"
+ " AND deleted_at = 0")
Long selectTopicIdBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("topicName") String name);
@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE topic_id = #{topicId} AND deleted_at = 0")
Integer softDeleteTopicMetasByTopicId(@Param("topicId") Long topicId);
@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteTopicMetasByCatalogId(@Param("catalogId") Long catalogId);
@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteTopicMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0")
Integer softDeleteTopicMetasBySchemaId(@Param("schemaId") Long schemaId);
@Delete(
"DELETE FROM "
+ TABLE_NAME
+ " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}")
Integer deleteTopicMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
}