blob: 5ad2ab939d0117021966177de24c0ae37d103d6b [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.jmap.cassandra.upload;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static org.apache.james.jmap.cassandra.upload.UploadModule.BLOB_ID;
import static org.apache.james.jmap.cassandra.upload.UploadModule.CONTENT_TYPE;
import static org.apache.james.jmap.cassandra.upload.UploadModule.ID;
import static org.apache.james.jmap.cassandra.upload.UploadModule.SIZE;
import static org.apache.james.jmap.cassandra.upload.UploadModule.TABLE_NAME;
import static org.apache.james.jmap.cassandra.upload.UploadModule.UPLOAD_DATE;
import static org.apache.james.jmap.cassandra.upload.UploadModule.USER;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Function;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.core.Username;
import org.apache.james.jmap.api.model.UploadId;
import org.apache.james.jmap.api.model.UploadMetaData;
import org.apache.james.mailbox.model.ContentType;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Data access object reading and writing upload metadata rows in the Cassandra
 * table designated by {@code TABLE_NAME}.
 *
 * <p>All statements are prepared once in the constructor and executed through a
 * {@link CassandraAsyncExecutor}; every operation returns a Reactor publisher.
 */
public class UploadDAO {

    /**
     * Immutable in-memory view of one row of the upload table.
     */
    public static class UploadRepresentation {
        private final UploadId id;
        private final BlobId blobId;
        private final ContentType contentType;
        private final long size;
        private final Username user;
        private final Instant uploadDate;

        /**
         * Creates a representation of an upload.
         *
         * @param id          identifier of the upload
         * @param blobId      identifier of the blob associated with the upload
         * @param contentType content type of the upload
         * @param size        size of the upload; zero is accepted, negative values are rejected
         * @param user        user the upload is stored under
         * @param uploadDate  date of the upload
         * @throws IllegalArgumentException if {@code size} is negative
         */
        public UploadRepresentation(UploadId id, BlobId blobId, ContentType contentType, long size, Username user, Instant uploadDate) {
            // The check accepts zero, so the message states "not negative" rather than "strictly positive".
            Preconditions.checkArgument(size >= 0, "Size must not be negative");
            this.id = id;
            this.blobId = blobId;
            this.contentType = contentType;
            this.size = size;
            this.user = user;
            this.uploadDate = uploadDate;
        }

        public UploadId getId() {
            return id;
        }

        public BlobId getBlobId() {
            return blobId;
        }

        public ContentType getContentType() {
            return contentType;
        }

        public long getSize() {
            return size;
        }

        public Username getUser() {
            return user;
        }

        public Instant getUploadDate() {
            return uploadDate;
        }

        /**
         * Converts this representation to an {@link UploadMetaData}. The user is not
         * passed to the resulting object.
         *
         * @return metadata built from the id, content type, size, blob id and upload date
         */
        public UploadMetaData toUploadMetaData() {
            return UploadMetaData.from(id, contentType, size, blobId, uploadDate);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof UploadRepresentation) {
                UploadRepresentation other = (UploadRepresentation) obj;
                // size is a primitive long: compare directly instead of boxing both sides.
                return size == other.size
                    && Objects.equal(id, other.id)
                    && Objects.equal(user, other.user)
                    && Objects.equal(blobId, other.blobId)
                    && Objects.equal(contentType, other.contentType)
                    && Objects.equal(uploadDate, other.uploadDate);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(id, blobId, contentType, size, user, uploadDate);
        }

        @Override
        public String toString() {
            // Every field taking part in equals/hashCode is printed exactly once.
            return MoreObjects
                .toStringHelper(this)
                .add("id", id)
                .add("blobId", blobId)
                .add("contentType", contentType)
                .add("size", size)
                .add("user", user)
                .add("uploadDate", uploadDate)
                .toString();
        }
    }

    /** Upload date used when a row carries no value in the upload date column. */
    public static final Instant UPLOAD_DATE_FALLBACK = Instant.EPOCH; // 1970-01-01T00:00:00Z

    private final CassandraAsyncExecutor executor;
    private final BlobId.Factory blobIdFactory;
    private final PreparedStatement insert;
    private final PreparedStatement selectOne;
    private final PreparedStatement delete;
    private final PreparedStatement list;
    private final PreparedStatement all;

    /**
     * Prepares all the statements used by this DAO against the given session.
     *
     * @param session       Cassandra session used to prepare and execute statements
     * @param blobIdFactory factory used to rebuild {@link BlobId}s from their stored string form
     */
    @Inject
    public UploadDAO(CqlSession session, BlobId.Factory blobIdFactory) {
        this.executor = new CassandraAsyncExecutor(session);
        this.blobIdFactory = blobIdFactory;

        this.insert = session.prepare(insertInto(TABLE_NAME)
            .value(ID, bindMarker(ID))
            .value(BLOB_ID, bindMarker(BLOB_ID))
            .value(SIZE, bindMarker(SIZE))
            .value(USER, bindMarker(USER))
            .value(CONTENT_TYPE, bindMarker(CONTENT_TYPE))
            .value(UPLOAD_DATE, bindMarker(UPLOAD_DATE))
            .build());

        this.list = session.prepare(selectFrom(TABLE_NAME)
            .all()
            .whereColumn(USER).isEqualTo(bindMarker(USER))
            .build());

        this.selectOne = session.prepare(selectFrom(TABLE_NAME)
            .all()
            .whereColumn(USER).isEqualTo(bindMarker(USER))
            .whereColumn(ID).isEqualTo(bindMarker(ID))
            .build());

        this.delete = session.prepare(deleteFrom(TABLE_NAME)
            .whereColumn(USER).isEqualTo(bindMarker(USER))
            .whereColumn(ID).isEqualTo(bindMarker(ID))
            .build());

        // Unrestricted select over the whole table, issued with ALLOW FILTERING.
        this.all = session.prepare(selectFrom(TABLE_NAME)
            .all()
            .allowFiltering()
            .build());
    }

    /**
     * Inserts a row holding the given upload.
     *
     * @param uploadRepresentation upload to store
     * @return a {@link Mono} for the execution of the insert statement
     */
    public Mono<Void> save(UploadRepresentation uploadRepresentation) {
        return executor.executeVoid(insert.bind()
            .setString(USER, uploadRepresentation.getUser().asString())
            .setUuid(ID, uploadRepresentation.getId().getId())
            .setString(BLOB_ID, uploadRepresentation.getBlobId().asString())
            .setLong(SIZE, uploadRepresentation.getSize())
            .setInstant(UPLOAD_DATE, uploadRepresentation.getUploadDate())
            .setString(CONTENT_TYPE, uploadRepresentation.getContentType().asString()));
    }

    /**
     * Reads the upload stored under the given user and id.
     *
     * @param username user the upload is stored under
     * @param id       identifier of the upload
     * @return the matching upload; presumably empty when no row matches
     *         (depends on {@code CassandraAsyncExecutor.executeSingleRow} - to be confirmed)
     */
    public Mono<UploadRepresentation> retrieve(Username username, UploadId id) {
        return executor.executeSingleRow(selectOne.bind()
                .setString(USER, username.asString())
                .setUuid(ID, id.getId()))
            .map(rowToUploadRepresentation());
    }

    /**
     * Lists the uploads stored under the given user.
     *
     * @param username user whose uploads are read
     * @return the uploads of that user
     */
    public Flux<UploadRepresentation> list(Username username) {
        return Flux.from(executor.executeRows(list.bind()
                .setString(USER, username.asString())))
            .map(rowToUploadRepresentation());
    }

    /**
     * Deletes the upload stored under the given user and id.
     *
     * @param username user the upload is stored under
     * @param uploadId identifier of the upload
     * @return {@code true} once the delete statement has been executed; the value is
     *         unconditional and does not tell whether a row was actually removed
     */
    public Mono<Boolean> delete(Username username, UploadId uploadId) {
        return executor.executeVoid(delete.bind()
                .setString(USER, username.asString())
                .setUuid(ID, uploadId.getId()))
            .thenReturn(true);
    }

    /**
     * Reads every upload of the table, across all users.
     *
     * @return all stored uploads
     */
    public Flux<UploadRepresentation> all() {
        return Flux.from(executor.executeRows(all.bind()))
            .map(rowToUploadRepresentation());
    }

    /**
     * Builds the mapper turning a result row into an {@link UploadRepresentation}.
     * A row without an upload date is given {@link #UPLOAD_DATE_FALLBACK}.
     */
    private Function<Row, UploadRepresentation> rowToUploadRepresentation() {
        return row -> new UploadRepresentation(UploadId.from(row.getUuid(ID)),
            blobIdFactory.from(row.getString(BLOB_ID)),
            ContentType.of(row.getString(CONTENT_TYPE)),
            row.getLong(SIZE),
            Username.of(row.getString(USER)),
            Optional.ofNullable(row.getInstant(UPLOAD_DATE)).orElse(UPLOAD_DATE_FALLBACK));
    }
}