/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.jmap.cassandra.upload;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static org.apache.james.jmap.cassandra.upload.UploadModule.BLOB_ID;
import static org.apache.james.jmap.cassandra.upload.UploadModule.CONTENT_TYPE;
import static org.apache.james.jmap.cassandra.upload.UploadModule.ID;
import static org.apache.james.jmap.cassandra.upload.UploadModule.SIZE;
import static org.apache.james.jmap.cassandra.upload.UploadModule.TABLE_NAME;
import static org.apache.james.jmap.cassandra.upload.UploadModule.UPLOAD_DATE;
import static org.apache.james.jmap.cassandra.upload.UploadModule.USER;

import java.time.Instant;
import java.util.Optional;
import java.util.function.Function;

import jakarta.inject.Inject;

import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.core.Username;
import org.apache.james.jmap.api.model.UploadId;
import org.apache.james.jmap.api.model.UploadMetaData;
import org.apache.james.mailbox.model.ContentType;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UploadDAO {
    public static class UploadRepresentation {
        private final UploadId id;
        private final BlobId blobId;
        private final ContentType contentType;
        private final long size;
        private final Username user;
        private final Instant uploadDate;

        public UploadRepresentation(UploadId id, BlobId blobId, ContentType contentType, long size, Username user, Instant uploadDate) {
            this.user = user;
            Preconditions.checkArgument(size >= 0, "Size must be strictly positive");
            this.id = id;
            this.blobId = blobId;
            this.contentType = contentType;
            this.size = size;
            this.uploadDate = uploadDate;
        }

        public UploadId getId() {
            return id;
        }

        public BlobId getBlobId() {
            return blobId;
        }

        public ContentType getContentType() {
            return contentType;
        }

        public long getSize() {
            return size;
        }

        public Username getUser() {
            return user;
        }

        public Instant getUploadDate() {
            return uploadDate;
        }

        public UploadMetaData toUploadMetaData() {
            return UploadMetaData.from(id, contentType, size, blobId, uploadDate);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof UploadRepresentation) {
                UploadRepresentation other = (UploadRepresentation) obj;
                return Objects.equal(id, other.id)
                    && Objects.equal(user, other.user)
                    && Objects.equal(blobId, other.blobId)
                    && Objects.equal(contentType, other.contentType)
                    && Objects.equal(uploadDate, other.uploadDate)
                    && Objects.equal(size, other.size);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(id, blobId, contentType, size, user, uploadDate);
        }

        @Override
        public String toString() {
            return MoreObjects
                .toStringHelper(this)
                .add("id", id)
                .add("blobId", blobId)
                .add("contentType", contentType)
                .add("user", user)
                .add("user", user)
                .add("uploadDate", uploadDate)
                .toString();
        }
    }

    public static final Instant UPLOAD_DATE_FALLBACK = Instant.EPOCH; // 1970-01-01T00:00:00Z
    private final CassandraAsyncExecutor executor;
    private final BlobId.Factory blobIdFactory;
    private final PreparedStatement insert;
    private final PreparedStatement selectOne;
    private final PreparedStatement delete;
    private final PreparedStatement list;

    private final PreparedStatement all;

    @Inject
    public UploadDAO(CqlSession session, BlobId.Factory blobIdFactory) {
        this.executor = new CassandraAsyncExecutor(session);
        this.blobIdFactory = blobIdFactory;
        this.insert = session.prepare(insertInto(TABLE_NAME)
            .value(ID, bindMarker(ID))
            .value(BLOB_ID, bindMarker(BLOB_ID))
            .value(SIZE, bindMarker(SIZE))
            .value(USER, bindMarker(USER))
            .value(CONTENT_TYPE, bindMarker(CONTENT_TYPE))
            .value(UPLOAD_DATE, bindMarker(UPLOAD_DATE))
            .build());

        this.list = session.prepare(selectFrom(TABLE_NAME)
            .all()
            .whereColumn(USER).isEqualTo(bindMarker(USER))
            .build());

        this.selectOne = session.prepare(selectFrom(TABLE_NAME)
            .all()
            .whereColumn(USER).isEqualTo(bindMarker(USER))
            .whereColumn(ID).isEqualTo(bindMarker(ID))
            .build());

        this.delete = session.prepare(deleteFrom(TABLE_NAME)
            .whereColumn(USER).isEqualTo(bindMarker(USER))
            .whereColumn(ID).isEqualTo(bindMarker(ID))
            .build());

        this.all = session.prepare(selectFrom(TABLE_NAME)
            .all()
            .allowFiltering()
            .build());
    }

    public Mono<Void> save(UploadRepresentation uploadRepresentation) {
        return executor.executeVoid(insert.bind()
            .setString(USER, uploadRepresentation.getUser().asString())
            .setUuid(ID, uploadRepresentation.getId().getId())
            .setString(BLOB_ID, uploadRepresentation.getBlobId().asString())
            .setLong(SIZE, uploadRepresentation.getSize())
            .setInstant(UPLOAD_DATE, uploadRepresentation.getUploadDate())
            .setString(CONTENT_TYPE, uploadRepresentation.getContentType().asString()));
    }

    public Mono<UploadRepresentation> retrieve(Username username, UploadId id) {
        return executor.executeSingleRow(selectOne.bind()
                .setString(USER, username.asString())
                .setUuid(ID, id.getId()))
            .map(rowToUploadRepresentation());
    }

    public Flux<UploadRepresentation> list(Username username) {
        return Flux.from(executor.executeRows(list.bind()
                .setString(USER, username.asString())))
            .map(rowToUploadRepresentation());
    }

    public Mono<Boolean> delete(Username username, UploadId uploadId) {
        return executor.executeVoid(delete.bind()
            .setString(USER, username.asString())
            .setUuid(ID, uploadId.getId()))
            .thenReturn(true);
    }

    public Flux<UploadRepresentation> all() {
        return Flux.from(executor.executeRows(all.bind()))
            .map(rowToUploadRepresentation());
    }

    private Function<Row, UploadRepresentation> rowToUploadRepresentation() {
        return row -> new UploadRepresentation(UploadId.from(row.getUuid(ID)),
            blobIdFactory.from(row.getString(BLOB_ID)),
            ContentType.of(row.getString(CONTENT_TYPE)),
            row.getLong(SIZE),
            Username.of(row.getString(USER)),
            Optional.ofNullable(row.getInstant(UPLOAD_DATE)).orElse(UPLOAD_DATE_FALLBACK));
    }
}
