| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.client.impl; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.pulsar.client.util.TypeCheckUtil.checkType; |
| import java.nio.ByteBuffer; |
| import java.util.Base64; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.client.api.TypedMessageBuilder; |
| import org.apache.pulsar.client.api.schema.KeyValueSchema; |
| import org.apache.pulsar.client.impl.transaction.TransactionImpl; |
| import org.apache.pulsar.common.api.proto.MessageMetadata; |
| import org.apache.pulsar.common.schema.KeyValueEncodingType; |
| import org.apache.pulsar.common.schema.SchemaType; |
| |
| public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { |
| |
| private static final long serialVersionUID = 0L; |
| |
| private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0); |
| |
| private final transient ProducerBase<?> producer; |
| private final transient MessageMetadata msgMetadata = new MessageMetadata(); |
| private final transient Schema<T> schema; |
| private transient ByteBuffer content; |
| private final transient TransactionImpl txn; |
| |
| public TypedMessageBuilderImpl(ProducerBase<?> producer, Schema<T> schema) { |
| this(producer, schema, null); |
| } |
| |
| public TypedMessageBuilderImpl(ProducerBase<?> producer, |
| Schema<T> schema, |
| TransactionImpl txn) { |
| this.producer = producer; |
| this.schema = schema; |
| this.content = EMPTY_CONTENT; |
| this.txn = txn; |
| } |
| |
| private long beforeSend() { |
| if (txn == null) { |
| return -1L; |
| } |
| msgMetadata.setTxnidLeastBits(txn.getTxnIdLeastBits()); |
| msgMetadata.setTxnidMostBits(txn.getTxnIdMostBits()); |
| return -1L; |
| } |
| |
| @Override |
| public MessageId send() throws PulsarClientException { |
| try { |
| // enqueue the message to the buffer |
| CompletableFuture<MessageId> sendFuture = sendAsync(); |
| |
| if (!sendFuture.isDone()) { |
| // the send request wasn't completed yet (e.g. not failing at enqueuing), then attempt to triggerFlush |
| // it out |
| producer.triggerFlush(); |
| } |
| |
| return sendFuture.get(); |
| } catch (Exception e) { |
| throw PulsarClientException.unwrap(e); |
| } |
| } |
| |
| @Override |
| public CompletableFuture<MessageId> sendAsync() { |
| Message<T> message = getMessage(); |
| CompletableFuture<MessageId> sendFuture; |
| if (txn != null) { |
| sendFuture = producer.internalSendWithTxnAsync(message, txn); |
| txn.registerSendOp(sendFuture); |
| } else { |
| sendFuture = producer.internalSendAsync(message); |
| } |
| return sendFuture; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> key(String key) { |
| getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument( |
| keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED, |
| "This method is not allowed to set keys when in encoding type is SEPARATED")); |
| if (key == null) { |
| msgMetadata.setNullPartitionKey(true); |
| return this; |
| } |
| msgMetadata.setPartitionKey(key); |
| msgMetadata.setPartitionKeyB64Encoded(false); |
| return this; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> keyBytes(byte[] key) { |
| getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument( |
| keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED, |
| "This method is not allowed to set keys when in encoding type is SEPARATED")); |
| if (key == null) { |
| msgMetadata.setNullPartitionKey(true); |
| return this; |
| } |
| msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(key)); |
| msgMetadata.setPartitionKeyB64Encoded(true); |
| return this; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> orderingKey(byte[] orderingKey) { |
| msgMetadata.setOrderingKey(orderingKey); |
| return this; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> value(T value) { |
| if (value == null) { |
| msgMetadata.setNullValue(true); |
| return this; |
| } |
| |
| return getKeyValueSchema().map(keyValueSchema -> { |
| if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { |
| setSeparateKeyValue(value, keyValueSchema); |
| return this; |
| } else { |
| return null; |
| } |
| }).orElseGet(() -> { |
| content = ByteBuffer.wrap(schema.encode(value)); |
| return this; |
| }); |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> property(String name, String value) { |
| checkArgument(name != null, "Need Non-Null name"); |
| checkArgument(value != null, "Need Non-Null value for name: " + name); |
| msgMetadata.addProperty() |
| .setKey(name) |
| .setValue(value); |
| return this; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> properties(Map<String, String> properties) { |
| for (Map.Entry<String, String> entry : properties.entrySet()) { |
| checkArgument(entry.getKey() != null, "Need Non-Null key"); |
| checkArgument(entry.getValue() != null, "Need Non-Null value for key: " + entry.getKey()); |
| msgMetadata.addProperty() |
| .setKey(entry.getKey()) |
| .setValue(entry.getValue()); |
| } |
| |
| return this; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> eventTime(long timestamp) { |
| checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp); |
| msgMetadata.setEventTime(timestamp); |
| return this; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> sequenceId(long sequenceId) { |
| checkArgument(sequenceId >= 0); |
| msgMetadata.setSequenceId(sequenceId); |
| return this; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> replicationClusters(List<String> clusters) { |
| Objects.requireNonNull(clusters); |
| msgMetadata.clearReplicateTo(); |
| msgMetadata.addAllReplicateTos(clusters); |
| return this; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> disableReplication() { |
| msgMetadata.clearReplicateTo(); |
| msgMetadata.addReplicateTo("__local__"); |
| return this; |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> deliverAfter(long delay, TimeUnit unit) { |
| return deliverAt(System.currentTimeMillis() + unit.toMillis(delay)); |
| } |
| |
| @Override |
| public TypedMessageBuilder<T> deliverAt(long timestamp) { |
| msgMetadata.setDeliverAtTime(timestamp); |
| return this; |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public TypedMessageBuilder<T> loadConf(Map<String, Object> config) { |
| config.forEach((key, value) -> { |
| switch (key) { |
| case CONF_KEY: |
| this.key(checkType(value, String.class)); |
| break; |
| case CONF_PROPERTIES: |
| this.properties(checkType(value, Map.class)); |
| break; |
| case CONF_EVENT_TIME: |
| this.eventTime(checkType(value, Long.class)); |
| break; |
| case CONF_SEQUENCE_ID: |
| this.sequenceId(checkType(value, Long.class)); |
| break; |
| case CONF_REPLICATION_CLUSTERS: |
| this.replicationClusters(checkType(value, List.class)); |
| break; |
| case CONF_DISABLE_REPLICATION: |
| boolean disableReplication = checkType(value, Boolean.class); |
| if (disableReplication) { |
| this.disableReplication(); |
| } |
| break; |
| case CONF_DELIVERY_AFTER_SECONDS: |
| this.deliverAfter(checkType(value, Long.class), TimeUnit.SECONDS); |
| break; |
| case CONF_DELIVERY_AT: |
| this.deliverAt(checkType(value, Long.class)); |
| break; |
| default: |
| throw new RuntimeException("Invalid message config key '" + key + "'"); |
| } |
| }); |
| return this; |
| } |
| |
| public MessageMetadata getMetadataBuilder() { |
| return msgMetadata; |
| } |
| |
| public Message<T> getMessage() { |
| beforeSend(); |
| return MessageImpl.create(msgMetadata, content, schema, producer != null ? producer.getTopic() : null); |
| } |
| |
| public long getPublishTime() { |
| return msgMetadata.getPublishTime(); |
| } |
| |
| public boolean hasKey() { |
| return msgMetadata.hasPartitionKey(); |
| } |
| |
| public String getKey() { |
| return msgMetadata.getPartitionKey(); |
| } |
| |
| public ByteBuffer getContent() { |
| return content; |
| } |
| |
| private Optional<KeyValueSchema<?, ?>> getKeyValueSchema() { |
| if (schema.getSchemaInfo() != null |
| && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE |
| // The schema's class could also be AutoProduceBytesSchema when its type is KEY_VALUE |
| && schema instanceof KeyValueSchema) { |
| return Optional.of((KeyValueSchema<?, ?>) schema); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <K, V> void setSeparateKeyValue(T value, KeyValueSchema<K, V> keyValueSchema) { |
| checkArgument(value instanceof org.apache.pulsar.common.schema.KeyValue); |
| org.apache.pulsar.common.schema.KeyValue<K, V> keyValue = |
| (org.apache.pulsar.common.schema.KeyValue<K, V>) value; |
| |
| // set key as the message key |
| if (keyValue.getKey() != null) { |
| msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString( |
| keyValueSchema.getKeySchema().encode(keyValue.getKey()))); |
| msgMetadata.setPartitionKeyB64Encoded(true); |
| } else { |
| msgMetadata.setNullPartitionKey(true); |
| } |
| |
| // set value as the payload |
| if (keyValue.getValue() != null) { |
| content = ByteBuffer.wrap(keyValueSchema.getValueSchema().encode(keyValue.getValue())); |
| } else { |
| msgMetadata.setNullValue(true); |
| } |
| } |
| } |