Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.core.Username;
import org.apache.james.events.Event;
import org.apache.james.events.EventBus;
Expand Down Expand Up @@ -239,8 +240,8 @@ private Mono<Void> handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessage

private Mono<MessageRepresentation> deleteMessageBlobs(MessageRepresentation message) {
return Flux.merge(
blobStore.delete(blobStore.getDefaultBucketName(), message.getHeaderId()),
blobStore.delete(blobStore.getDefaultBucketName(), message.getBodyId()))
blobStore.delete(BucketName.DEFAULT, message.getHeaderId()),
blobStore.delete(BucketName.DEFAULT, message.getBodyId()))
.then()
.thenReturn(message);
}
Expand All @@ -253,7 +254,7 @@ private Mono<Void> deleteUnreferencedAttachments(MessageRepresentation message)
return Flux.fromIterable(message.getAttachments())
.concatMap(attachment -> attachmentDAO.getAttachment(attachment.getAttachmentId())
.map(CassandraAttachmentDAOV2.DAOAttachment::getBlobId)
.flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId)))
.flatMap(blobId -> Mono.from(blobStore.delete(BucketName.DEFAULT, blobId)))
.then(attachmentDAO.delete(attachment.getAttachmentId())))
.then();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import jakarta.inject.Inject;

import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2.DAOAttachment;
import org.apache.james.mailbox.exception.AttachmentNotFoundException;
import org.apache.james.mailbox.model.AttachmentId;
Expand Down Expand Up @@ -92,15 +93,15 @@ public List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachme
@Override
public InputStream loadAttachmentContent(AttachmentId attachmentId) throws AttachmentNotFoundException {
return attachmentDAOV2.getAttachment(attachmentId)
.map(daoAttachment -> blobStore.read(blobStore.getDefaultBucketName(), daoAttachment.getBlobId(), LOW_COST))
.map(daoAttachment -> blobStore.read(BucketName.DEFAULT, daoAttachment.getBlobId(), LOW_COST))
.blockOptional()
.orElseThrow(() -> new AttachmentNotFoundException(attachmentId.toString()));
}

@Override
public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) {
return attachmentDAOV2.getAttachment(attachmentId)
.flatMap(daoAttachment -> Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), daoAttachment.getBlobId(), LOW_COST)))
.flatMap(daoAttachment -> Mono.from(blobStore.readReactive(BucketName.DEFAULT, daoAttachment.getBlobId(), LOW_COST)))
.switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.toString())));
}

Expand Down Expand Up @@ -133,7 +134,7 @@ private Mono<MessageAttachmentMetadata> storeAttachmentAsync(ParsedAttachment pa
AttachmentId attachmentId = attachmentIdAssignationStrategy.assign(parsedAttachment, ownerMessageId);
ByteSource content = parsedAttachment.getContent();
long size = content.size();
return Mono.from(blobStore.save(blobStore.getDefaultBucketName(), content, LOW_COST))
return Mono.from(blobStore.save(BucketName.DEFAULT, content, LOW_COST))
.map(blobId -> new DAOAttachment(ownerMessageId, attachmentId, blobId, parsedAttachment.getContentType(), size))
.flatMap(this::storeAttachmentWithIndex)
.thenReturn(parsedAttachment.asMessageAttachment(attachmentId, size, ownerMessageId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Attachments;
import org.apache.james.mailbox.model.ByteContent;
Expand Down Expand Up @@ -234,8 +235,8 @@ public long size() {
}
};

Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED));
Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST));
Mono<BlobId> headerFuture = Mono.from(blobStore.save(BucketName.DEFAULT, headerContent, SIZE_BASED));
Mono<BlobId> bodyFuture = Mono.from(blobStore.save(BucketName.DEFAULT, bodyByteSource, LOW_COST));

return headerFuture.zipWith(bodyFuture);
});
Expand Down Expand Up @@ -413,7 +414,7 @@ private Mono<Content> getFullContent(BlobId headerId, BlobId bodyId) {
}

private Mono<byte[]> getContent(BlobId blobId, BlobStore.StoragePolicy storagePolicy) {
return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId, storagePolicy));
return Mono.from(blobStore.readBytes(BucketName.DEFAULT, blobId, storagePolicy));
}

private BlobId retrieveBlobId(CqlIdentifier field, Row row) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
Expand Down Expand Up @@ -135,7 +136,7 @@ private Mono<MailboxMessage> toMailboxMessage(CassandraMessageMetadata metadata,
return Mono.just(metadata.asMailboxMessage(EMPTY_BYTE_ARRAY));
}
if (fetchType == FetchType.HEADERS && metadata.isComplete()) {
return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), metadata.getHeaderContent().get(), SIZE_BASED))
return Mono.from(blobStore.readBytes(BucketName.DEFAULT, metadata.getHeaderContent().get(), SIZE_BASED))
.map(metadata::asMailboxMessage);
}
return messageDAOV3.retrieveMessage(metadata.getComposedMessageId(), fetchType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.mailbox.ApplicableFlagBuilder;
import org.apache.james.mailbox.FlagsBuilder;
import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
Expand Down Expand Up @@ -267,7 +268,7 @@ private Mono<MailboxMessage> toMailboxMessage(CassandraMessageMetadata metadata,
return Mono.just(metadata.asMailboxMessage(EMPTY_BYTE_ARRAY));
}
if (fetchType == FetchType.HEADERS && metadata.isComplete()) {
return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), metadata.getHeaderContent().get(), SIZE_BASED))
return Mono.from(blobStore.readBytes(BucketName.DEFAULT, metadata.getHeaderContent().get(), SIZE_BASED))
.map(metadata::asMailboxMessage);
}
return messageDAOV3.retrieveMessage(metadata.getComposedMessageId(), fetchType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.core.MailAddress;
import org.apache.james.core.MaybeSender;
import org.apache.james.events.Event;
Expand Down Expand Up @@ -114,15 +115,15 @@ public Mono<Void> forMessage(MessageContentDeletionEvent messageContentDeletionE
.subject(mimeMessage.map(Message::getSubject))
.build();

return Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), blobIdFactory.parse(messageContentDeletionEvent.bodyBlobId()), BlobStore.StoragePolicy.LOW_COST))
return Mono.from(blobStore.readReactive(BucketName.DEFAULT, blobIdFactory.parse(messageContentDeletionEvent.bodyBlobId()), BlobStore.StoragePolicy.LOW_COST))
.map(bodyStream -> new SequenceInputStream(new ByteArrayInputStream(bytes), bodyStream))
.flatMap(bodyStream -> Mono.from(deletedMessageVault.append(deletedMessage, bodyStream)));
});
}

private Mono<byte[]> fetchMessageHeaderBytes(MessageContentDeletionEvent messageContentDeletionEvent) {
return Mono.justOrEmpty(messageContentDeletionEvent.headerBlobId())
.flatMap(headerBlobId -> Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.parse(headerBlobId), BlobStore.StoragePolicy.LOW_COST)))
.flatMap(headerBlobId -> Mono.from(blobStore.readBytes(BucketName.DEFAULT, blobIdFactory.parse(headerBlobId), BlobStore.StoragePolicy.LOW_COST)))
.switchIfEmpty(Mono.justOrEmpty(messageContentDeletionEvent.headerContent())
.map(headerContent -> headerContent.getBytes(StandardCharsets.UTF_8))
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("No header content nor header blob id provided"))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ void setUp() throws Exception {
BlobStoreFactory.builder()
.blobStoreDAO(blobStoreDAO)
.blobIdFactory(new PlainBlobId.Factory())
.defaultBucketName()
.passthrough(), blobStoreDAO, new BucketNameGenerator(clock), clock,
VaultConfiguration.ENABLED_DEFAULT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ void setUp() {
BlobStoreFactory.builder()
.blobStoreDAO(blobStoreDAO)
.blobIdFactory(new PlainBlobId.Factory())
.defaultBucketName()
.passthrough(),
blobStoreDAO, new BucketNameGenerator(clock), clock, VaultConfiguration.ENABLED_DEFAULT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.commons.io.IOUtils;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.core.Username;
import org.apache.james.events.Event;
import org.apache.james.events.EventBus;
Expand Down Expand Up @@ -169,7 +170,7 @@ private MailboxEvents.MessageContentDeletionEvent messageContentDeletionEvent(Ma

private Mono<Void> deleteBodyBlob(PostgresMessageId id, PostgresMessageDAO postgresMessageDAO) {
return postgresMessageDAO.getBodyBlobId(id)
.flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId))
.flatMap(blobId -> Mono.from(blobStore.delete(BucketName.DEFAULT, blobId))
.then());
}

Expand All @@ -185,7 +186,7 @@ private Mono<Void> deleteAttachment(PostgresMessageId messageId, PostgresAttachm

private Mono<Void> deleteAttachmentBlobs(PostgresMessageId messageId, PostgresAttachmentDAO attachmentDAO) {
return attachmentDAO.listBlobsByMessageId(messageId)
.flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId)), ReactorUtils.DEFAULT_CONCURRENCY)
.flatMap(blobId -> Mono.from(blobStore.delete(BucketName.DEFAULT, blobId)), ReactorUtils.DEFAULT_CONCURRENCY)
.then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.mailbox.exception.AttachmentNotFoundException;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.AttachmentMetadata;
Expand Down Expand Up @@ -64,7 +65,7 @@ public InputStream loadAttachmentContent(AttachmentId attachmentId) {
@Override
public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) {
return postgresAttachmentDAO.getAttachment(attachmentId)
.flatMap(pair -> Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), pair.getRight(), LOW_COST)))
.flatMap(pair -> Mono.from(blobStore.readReactive(BucketName.DEFAULT, pair.getRight(), LOW_COST)))
.switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.toString())));
}

Expand Down Expand Up @@ -112,7 +113,7 @@ public Mono<List<MessageAttachmentMetadata>> storeAttachmentsReactive(Collection

private Mono<MessageAttachmentMetadata> storeAttachmentAsync(ParsedAttachment parsedAttachment, MessageId ownerMessageId) {
return Mono.fromCallable(parsedAttachment::getContent)
.flatMap(content -> Mono.from(blobStore.save(blobStore.getDefaultBucketName(), parsedAttachment.getContent(), BlobStore.StoragePolicy.LOW_COST))
.flatMap(content -> Mono.from(blobStore.save(BucketName.DEFAULT, parsedAttachment.getContent(), BlobStore.StoragePolicy.LOW_COST))
.flatMap(blobId -> {
AttachmentId attachmentId = attachmentIdAssignationStrategy.assign(parsedAttachment, ownerMessageId);
return postgresAttachmentDAO.storeAttachment(AttachmentMetadata.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.james.backends.postgres.utils.PostgresUtils;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
Expand Down Expand Up @@ -252,6 +253,6 @@ private boolean identicalFlags(ComposedMessageIdWithMetaData oldComposedId, Flag

private Mono<BlobId> saveBodyContent(MailboxMessage message) {
return Mono.fromCallable(() -> MESSAGE_BODY_CONTENT_LOADER.apply(message))
.flatMap(bodyByteSource -> Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST)));
.flatMap(bodyByteSource -> Mono.from(blobStore.save(BucketName.DEFAULT, bodyByteSource, LOW_COST)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.james.backends.postgres.utils.PostgresUtils;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.mailbox.ApplicableFlagBuilder;
import org.apache.james.mailbox.FlagsBuilder;
import org.apache.james.mailbox.MessageUid;
Expand Down Expand Up @@ -266,7 +267,7 @@ public Mono<MessageMetaData> addReactive(Mailbox mailbox, MailboxMessage message

private Mono<BlobId> saveBodyContent(MailboxMessage message) {
return Mono.fromCallable(() -> MESSAGE_BODY_CONTENT_LOADER.apply(message))
.flatMap(bodyByteSource -> Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST)));
.flatMap(bodyByteSource -> Mono.from(blobStore.save(BucketName.DEFAULT, bodyByteSource, LOW_COST)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.AttachmentMetadata;
import org.apache.james.mailbox.model.Content;
Expand Down Expand Up @@ -112,7 +113,7 @@ public Flux<Pair<SimpleMailboxMessage.Builder, Record>> doRetrieve(Flux<Pair<Sim
}

private Mono<Content> retrieveFullContent(Record messageRecord) {
return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(),
return Mono.from(blobStore.readBytes(BucketName.DEFAULT,
blobIdFactory.parse(messageRecord.get(BODY_BLOB_ID)),
SIZE_BASED))
.map(bodyBytes -> new HeaderAndBodyByteContent(messageRecord.get(HEADER_CONTENT), bodyBytes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxSession;
Expand Down Expand Up @@ -214,9 +215,9 @@ void deleteMessageListenerShouldDeleteUnreferencedBlob() throws Exception {
inboxManager.delete(ImmutableList.of(appendResult.getId().getUid()), session);

assertSoftly(softly -> {
softly.assertThatThrownBy(() -> Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), attachmentBlobId)).block())
softly.assertThatThrownBy(() -> Mono.from(blobStore.readReactive(BucketName.DEFAULT, attachmentBlobId)).block())
.isInstanceOf(ObjectNotFoundException.class);
softly.assertThatThrownBy(() -> Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), messageBodyBlobId)).block())
softly.assertThatThrownBy(() -> Mono.from(blobStore.readReactive(BucketName.DEFAULT, messageBodyBlobId)).block())
.isInstanceOf(ObjectNotFoundException.class);
});
}
Expand All @@ -236,9 +237,9 @@ void deleteMessageListenerShouldNotDeleteReferencedBlob() throws Exception {
inboxManager.delete(ImmutableList.of(appendResult.getId().getUid()), session);

assertSoftly(softly -> {
assertThat(Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), attachmentBlobId)).blockOptional())
assertThat(Mono.from(blobStore.readReactive(BucketName.DEFAULT, attachmentBlobId)).blockOptional())
.isNotEmpty();
assertThat(Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), messageBodyBlobId)).blockOptional())
assertThat(Mono.from(blobStore.readReactive(BucketName.DEFAULT, messageBodyBlobId)).blockOptional())
.isNotEmpty();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.apache.james.backends.postgres.PostgresExtension;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
import org.apache.james.events.EventBusTestFixture;
import org.apache.james.events.InVMEventBus;
Expand Down Expand Up @@ -64,7 +63,7 @@ public class DeleteMessageListenerTest extends DeleteMessageListenerContract {

@BeforeAll
static void beforeAll() {
blobStore = new PassThroughBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, BLOB_ID_FACTORY);
blobStore = new PassThroughBlobStore(new MemoryBlobStoreDAO(), BLOB_ID_FACTORY);

PostgresMailboxSessionMapperFactory mapperFactory = new PostgresMailboxSessionMapperFactory(
postgresExtension.getExecutorFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.james.backends.postgres.PostgresExtension;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.PlainBlobId;
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
import org.apache.james.core.Username;
Expand Down Expand Up @@ -69,7 +68,7 @@ public class DeleteMessageListenerWithRLSTest extends DeleteMessageListenerContr

@BeforeAll
static void beforeAll() {
blobStore = new PassThroughBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, BLOB_ID_FACTORY);
blobStore = new PassThroughBlobStore(new MemoryBlobStoreDAO(), BLOB_ID_FACTORY);
BlobId.Factory blobIdFactory = new PlainBlobId.Factory();

PostgresMailboxSessionMapperFactory mapperFactory = new PostgresMailboxSessionMapperFactory(
Expand Down
Loading