/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.rest;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroWriter;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonWriter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerAcks;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.data.ProducerMessages;
/**
* Contains methods used by the REST API to produce, consume and read messages to/from Pulsar topics.
*/
@Slf4j
public class TopicsBase extends PersistentTopicsBase {
private static final String defaultProducerName = "RestProducer";
// Publish messages to a topic; the topic can be partitioned or non-partitioned.
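// An illustrative request body for this entry point (field names follow ProducerMessages; the values
// are hypothetical): {"valueSchema":"...","schemaVersion":-1,"messages":[{"key":"k1","payload":"hello"}]}.
// A schemaVersion of -1 means no existing version is referenced, so the supplied schema is added instead.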
protected void publishMessages(AsyncResponse asyncResponse, ProducerMessages request, boolean authoritative) {
String topic = topicName.getPartitionedTopicName();
try {
if (pulsar().getBrokerService().getOwningTopics().containsKey(topic)
|| !findOwnerBrokerForTopic(authoritative, asyncResponse)) {
// If we've already done the lookup, or after the lookup this broker owns some of the partitions,
// then proceed to publish the messages; otherwise asyncResponse is completed by the lookup.
addOrGetSchemaForTopic(getSchemaData(request.getKeySchema(), request.getValueSchema()),
request.getSchemaVersion() == -1 ? null : new LongSchemaVersion(request.getSchemaVersion()))
.thenAccept(schemaMeta -> {
// Both schema version and schema data are necessary.
if (schemaMeta.getLeft() != null && schemaMeta.getRight() != null) {
internalPublishMessages(topicName, request, pulsar().getBrokerService()
.getOwningTopics().get(topic).values(), asyncResponse,
AutoConsumeSchema.getSchema(schemaMeta.getLeft().toSchemaInfo()),
schemaMeta.getRight());
} else {
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR,
"Fail to add or retrieve schema."));
}
}).exceptionally(e -> {
if (log.isDebugEnabled()) {
log.debug("Fail to publish message: " + e.getMessage());
}
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message:"
+ e.getMessage()));
return null;
});
}
} catch (Exception e) {
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message: "
+ e.getMessage()));
}
}
// Publish messages to a single partition of a partitioned topic.
protected void publishMessagesToPartition(AsyncResponse asyncResponse, ProducerMessages request,
boolean authoritative, int partition) {
if (topicName.isPartitioned()) {
asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Topic name can't contain "
+ "'-partition-' suffix."));
return;
}
String topic = topicName.getPartitionedTopicName();
try {
// If this broker owns the partition then proceed to publish the messages, else do a lookup.
if ((pulsar().getBrokerService().getOwningTopics().containsKey(topic)
&& pulsar().getBrokerService().getOwningTopics().get(topic)
.contains(partition))
|| !findOwnerBrokerForTopic(authoritative, asyncResponse)) {
addOrGetSchemaForTopic(getSchemaData(request.getKeySchema(), request.getValueSchema()),
request.getSchemaVersion() == -1 ? null : new LongSchemaVersion(request.getSchemaVersion()))
.thenAccept(schemaMeta -> {
// Both schema version and schema data are necessary.
if (schemaMeta.getLeft() != null && schemaMeta.getRight() != null) {
internalPublishMessagesToPartition(topicName, request, partition, asyncResponse,
AutoConsumeSchema.getSchema(schemaMeta.getLeft().toSchemaInfo()),
schemaMeta.getRight());
} else {
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR,
"Fail to add or retrieve schema."));
}
}).exceptionally(e -> {
if (log.isDebugEnabled()) {
log.debug("Fail to publish message to single partition: " + e.getLocalizedMessage());
}
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message"
+ "to single partition: "
+ e.getMessage()));
return null;
});
}
} catch (Exception e) {
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message: "
+ e.getMessage()));
}
}
private void internalPublishMessagesToPartition(TopicName topicName, ProducerMessages request,
int partition, AsyncResponse asyncResponse,
Schema schema, SchemaVersion schemaVersion) {
try {
String producerName = (null == request.getProducerName() || request.getProducerName().isEmpty())
? defaultProducerName : request.getProducerName();
List<Message> messages = buildMessage(request, schema, producerName, topicName);
List<CompletableFuture<PositionImpl>> publishResults = new ArrayList<>();
List<ProducerAck> produceMessageResults = new ArrayList<>();
for (int index = 0; index < messages.size(); index++) {
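// Stash the target partition index in the ack's messageId field; processPublishMessageResults
// later replaces it with the full ledgerId:entryId:partition message id once the publish completes.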
ProducerAck produceMessageResult = new ProducerAck();
produceMessageResult.setMessageId(partition + "");
produceMessageResults.add(produceMessageResult);
publishResults.add(publishSingleMessageToPartition(topicName.getPartition(partition).toString(),
messages.get(index)));
}
FutureUtil.waitForAll(publishResults).thenRun(() -> {
processPublishMessageResults(produceMessageResults, publishResults);
asyncResponse.resume(Response.ok().entity(new ProducerAcks(produceMessageResults,
((LongSchemaVersion) schemaVersion).getVersion())).build());
}).exceptionally(e -> {
// Some messages may have been published successfully, so still return OK with a result for each
// individual message.
processPublishMessageResults(produceMessageResults, publishResults);
asyncResponse.resume(Response.ok().entity(new ProducerAcks(produceMessageResults,
((LongSchemaVersion) schemaVersion).getVersion())).build());
return null;
});
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Fail publish messages to single partition with rest produce message "
+ "request for topic {}: {} ", topicName, e.getCause());
}
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
private void internalPublishMessages(TopicName topicName, ProducerMessages request,
List<Integer> partitionIndexes,
AsyncResponse asyncResponse, Schema schema,
SchemaVersion schemaVersion) {
if (partitionIndexes.size() < 1) {
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR,
new BrokerServiceException.TopicNotFoundException("Topic not owned by current broker.")));
return;
}
try {
String producerName = (null == request.getProducerName() || request.getProducerName().isEmpty())
? defaultProducerName : request.getProducerName();
List<Message> messages = buildMessage(request, schema, producerName, topicName);
List<CompletableFuture<PositionImpl>> publishResults = new ArrayList<>();
List<ProducerAck> produceMessageResults = new ArrayList<>();
// Try to publish the messages across all partitions this broker owns, in round-robin fashion.
for (int index = 0; index < messages.size(); index++) {
ProducerAck produceMessageResult = new ProducerAck();
produceMessageResult.setMessageId(partitionIndexes.get(index % partitionIndexes.size()) + "");
produceMessageResults.add(produceMessageResult);
publishResults.add(publishSingleMessageToPartition(topicName.getPartition(partitionIndexes
.get(index % partitionIndexes.size())).toString(),
messages.get(index)));
}
FutureUtil.waitForAll(publishResults).thenRun(() -> {
processPublishMessageResults(produceMessageResults, publishResults);
asyncResponse.resume(Response.ok().entity(new ProducerAcks(produceMessageResults,
((LongSchemaVersion) schemaVersion).getVersion())).build());
}).exceptionally(e -> {
// Some messages may have been published successfully, so still return OK with a result for each
// individual message.
processPublishMessageResults(produceMessageResults, publishResults);
asyncResponse.resume(Response.ok().entity(new ProducerAcks(produceMessageResults,
((LongSchemaVersion) schemaVersion).getVersion())).build());
return null;
});
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Fail to publish messages with rest produce message request for topic {}: {} ",
topicName, e.getCause());
}
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
private CompletableFuture<PositionImpl> publishSingleMessageToPartition(String topic, Message message) {
CompletableFuture<PositionImpl> publishResult = new CompletableFuture<>();
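// Look the topic up locally without creating it (the second argument to getTopic is
// createIfMissing = false); an absent topic means this broker no longer owns the partition.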
pulsar().getBrokerService().getTopic(topic, false)
.thenAccept(t -> {
// TODO: Check message backlog and fail if backlog too large.
if (!t.isPresent()) {
// Topic not found; remove it from the owning partition list.
publishResult.completeExceptionally(new BrokerServiceException.TopicNotFoundException("Topic not "
+ "owned by current broker."));
TopicName topicName = TopicName.get(topic);
pulsar().getBrokerService().getOwningTopics().get(topicName.getPartitionedTopicName())
.remove(topicName.getPartitionIndex());
} else {
try {
t.get().publishMessage(messageToByteBuf(message),
RestMessagePublishContext.get(publishResult, t.get(), System.nanoTime()));
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Fail to publish single messages to topic {}: {} ",
topicName, e.getCause());
}
publishResult.completeExceptionally(e);
}
}
});
return publishResult;
}
// Process results for all message publishing attempts
private void processPublishMessageResults(List<ProducerAck> produceMessageResults,
List<CompletableFuture<PositionImpl>> publishResults) {
for (int index = 0; index < publishResults.size(); index++) {
try {
PositionImpl position = publishResults.get(index).get();
MessageId messageId = new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
Integer.parseInt(produceMessageResults.get(index).getMessageId()));
produceMessageResults.get(index).setMessageId(messageId.toString());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Fail publish [{}] message with rest produce message request for topic {}",
index, topicName);
}
if (e instanceof BrokerServiceException.TopicNotFoundException) {
// Topic ownership might have changed; force a new lookup.
pulsar().getBrokerService().getOwningTopics().remove(topicName.getPartitionedTopicName());
}
extractException(e, produceMessageResults.get(index));
}
}
}
// Set an error code based on the exception, indicating whether the client should retry with the same broker.
private void extractException(Exception e, ProducerAck produceMessageResult) {
if (!(e instanceof BrokerServiceException.TopicFencedException || e instanceof ManagedLedgerException)) {
produceMessageResult.setErrorCode(2);
} else {
produceMessageResult.setErrorCode(1);
}
produceMessageResult.setErrorMsg(e.getMessage());
}
// Look up the topic owner for the given topic. Returns whether asyncResponse has been completed,
// which indicates a redirect or an exception.
private boolean findOwnerBrokerForTopic(boolean authoritative, AsyncResponse asyncResponse) {
PartitionedTopicMetadata metadata = internalGetPartitionedMetadata(authoritative, false);
List<String> redirectAddresses = Collections.synchronizedList(new ArrayList<>());
CompletableFuture<Boolean> future = new CompletableFuture<>();
List<CompletableFuture<Void>> lookupFutures = new ArrayList<>();
if (!topicName.isPartitioned() && metadata.partitions > 1) {
// Partitioned topic with multiple partitions, need to do look up for each partition.
for (int index = 0; index < metadata.partitions; index++) {
lookupFutures.add(lookUpBrokerForTopic(topicName.getPartition(index),
authoritative, redirectAddresses));
}
} else {
// Non-partitioned topic or specific topic partition.
lookupFutures.add(lookUpBrokerForTopic(topicName, authoritative, redirectAddresses));
}
FutureUtil.waitForAll(lookupFutures)
.thenRun(() -> {
processLookUpResult(redirectAddresses, asyncResponse, future);
}).exceptionally(e -> {
processLookUpResult(redirectAddresses, asyncResponse, future);
return null;
});
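// Block until all partition lookups complete; the ownership answer is needed before deciding
// whether to publish locally or redirect.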
try {
return future.get();
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Fail to lookup topic for rest produce message request for topic {}.", topicName.toString());
}
if (!asyncResponse.isDone()) {
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Internal error: "
+ e.getMessage()));
}
return true;
}
}
private void processLookUpResult(List<String> redirectAddresses, AsyncResponse asyncResponse,
CompletableFuture<Boolean> future) {
// Current broker doesn't own the topic or any partition of the topic; redirect the client to a
// broker that owns partitions of the topic or knows which broker does.
if (!pulsar().getBrokerService().getOwningTopics().containsKey(topicName.getPartitionedTopicName())) {
if (redirectAddresses.isEmpty()) {
// No broker to redirect to, meaning the lookup failed for some partitions;
// the client should retry with other brokers.
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Can't find owner of given topic."));
future.complete(true);
} else {
// Redirect the client to a broker that owns the topic or knows which broker does.
try {
if (log.isDebugEnabled()) {
log.debug("Redirect rest produce request for topic {} from {} to {}.",
topicName, pulsar().getWebServiceAddress(), redirectAddresses.get(0));
}
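// Rebuild the request URI against the redirect target, keeping the original path and query
// while swapping in the target broker's host and port.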
URL redirectAddress = new URL(redirectAddresses.get(0));
URI redirectURI = UriBuilder.fromUri(uri.getRequestUri())
.host(redirectAddress.getHost())
.port(redirectAddress.getPort())
.build();
asyncResponse.resume(Response.temporaryRedirect(redirectURI).build());
future.complete(true);
} catch (Exception e) {
log.error("Error in preparing redirect url with rest produce message request for topic {}: {}",
topicName, e.getMessage(), e);
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR,
"Fail to redirect client request."));
future.complete(true);
}
}
} else {
future.complete(false);
}
}
// Look up topic owner for non-partitioned topic or single topic partition.
private CompletableFuture<Void> lookUpBrokerForTopic(TopicName partitionedTopicName,
boolean authoritative, List<String> redirectAddresses) {
CompletableFuture<Void> future = new CompletableFuture<>();
if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
if (log.isDebugEnabled()) {
log.debug("Too many concurrent lookup request.");
}
future.completeExceptionally(new BrokerServiceException.TooManyRequestsException("Too many "
+ "concurrent lookup requests"));
return future;
}
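// Resolve the broker serving this topic; loadTopicsInBundle is false so the lookup does not
// trigger loading of other topics in the same bundle.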
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
.getBrokerServiceUrlAsync(partitionedTopicName,
LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build());
lookupFuture.thenAccept(optionalResult -> {
if (optionalResult == null || !optionalResult.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Fail to lookup topic for rest produce message request for topic {}.",
partitionedTopicName);
}
completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
return;
}
LookupResult result = optionalResult.get();
if (result.getLookupData().getHttpUrl().equals(pulsar().getWebServiceAddress())) {
// Current broker owns the topic; record the partition in the owning-topics map.
if (log.isDebugEnabled()) {
log.debug("Complete topic look up for rest produce message request for topic {}, "
+ "current broker is owner broker: {}",
partitionedTopicName, result.getLookupData());
}
pulsar().getBrokerService().getOwningTopics().computeIfAbsent(partitionedTopicName
.getPartitionedTopicName(),
(key) -> ConcurrentOpenHashSet.<Integer>newBuilder().build())
.add(partitionedTopicName.getPartitionIndex());
completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
} else {
// Current broker doesn't own the topic or doesn't know who owns it.
if (log.isDebugEnabled()) {
log.debug("Complete topic look up for rest produce message request for topic {}, "
+ "current broker is not owner broker: {}",
partitionedTopicName, result.getLookupData());
}
if (result.isRedirect()) {
// Redirect lookup.
completeLookup(Pair.of(Arrays.asList(result.getLookupData().getHttpUrl(),
result.getLookupData().getHttpUrlTls()), false), redirectAddresses, future);
} else {
// Found owner for topic.
completeLookup(Pair.of(Arrays.asList(result.getLookupData().getHttpUrl(),
result.getLookupData().getHttpUrlTls()), true), redirectAddresses, future);
}
}
}).exceptionally(exception -> {
if (log.isDebugEnabled()) {
log.debug("Fail to lookup broker with rest produce message request for topic {}: {}",
partitionedTopicName, exception.getMessage());
}
completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
return null;
});
return future;
}
private CompletableFuture<Pair<SchemaData, SchemaVersion>> addOrGetSchemaForTopic(SchemaData schemaData,
LongSchemaVersion schemaVersion) {
CompletableFuture<Pair<SchemaData, SchemaVersion>> future = new CompletableFuture<>();
// If a schema version is present, try to fetch the existing schema.
if (null != schemaVersion) {
String id = TopicName.get(topicName.getPartitionedTopicName()).getSchemaName();
SchemaRegistry.SchemaAndMetadata schemaAndMetadata;
try {
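// Note: this blocks the calling thread until the schema registry responds.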
schemaAndMetadata = pulsar().getSchemaRegistryService().getSchema(id, schemaVersion).get();
future.complete(Pair.of(schemaAndMetadata.schema, schemaAndMetadata.version));
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Fail to retrieve schema of version {} for topic {}: {}",
schemaVersion.getVersion(), topicName, e.getMessage());
}
future.completeExceptionally(e);
}
} else if (null != schemaData) {
// Otherwise try to add the schema to the topic.
SchemaVersion sv;
try {
sv = addSchema(schemaData).get();
future.complete(Pair.of(schemaData, sv));
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Fail to add schema {} for topic {}: {}",
new String(schemaData.toSchemaInfo().getSchema()), topicName, e.getMessage());
}
future.completeExceptionally(e);
}
} else {
// Neither a schema version nor schema data was supplied; complete with nulls so the caller
// reports an error.
future.complete(Pair.of(null, null));
}
return future;
}
// Add a new schema to schema registry for a topic
private CompletableFuture<SchemaVersion> addSchema(SchemaData schemaData) {
// Only need to add the schema to the first partition this broker owns, since the schema id in the
// schema registry is the same for all partitions: the partitioned topic name.
List<Integer> partitions = pulsar().getBrokerService().getOwningTopics()
.get(topicName.getPartitionedTopicName()).values();
CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
for (int index = 0; index < partitions.size(); index++) {
CompletableFuture<SchemaVersion> future = new CompletableFuture<>();
String topicPartitionName = topicName.getPartition(partitions.get(index)).toString();
pulsar().getBrokerService().getTopic(topicPartitionName, false)
.thenAccept(topic -> {
if (!topic.isPresent()) {
future.completeExceptionally(new BrokerServiceException.TopicNotFoundException(
"Topic " + topicPartitionName + " not found"));
} else {
topic.get().addSchema(schemaData).thenAccept(schemaVersion -> future.complete(schemaVersion))
.exceptionally(exception -> {
future.completeExceptionally(exception);
return null;
});
}
});
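// Wait for this partition's attempt; on success use its version and stop, otherwise fall through
// and try the next owned partition.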
try {
result.complete(future.get());
break;
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Fail to add schema to topic " + topicName.getPartitionedTopicName()
+ " for partition " + partitions.get(index) + " for REST produce request.");
}
}
}
// Not able to add schema to any partition
if (!result.isDone()) {
result.completeExceptionally(new SchemaException("Unable to add schema " + schemaData
+ " to topic " + topicName.getPartitionedTopicName()));
}
return result;
}
// Build SchemaData from the passed-in schema strings.
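// Illustrative inputs (hypothetical values): each schema string is a serialized SchemaInfo such as
// {"name":"","type":"STRING","schema":"","properties":{}}. A null or empty value schema defaults to
// UTF-8 strings, and supplying a key schema produces a SEPARATED KeyValue schema.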
private SchemaData getSchemaData(String keySchema, String valueSchema) {
try {
SchemaInfoImpl valueSchemaInfo = (valueSchema == null || valueSchema.isEmpty())
? (SchemaInfoImpl) StringSchema.utf8().getSchemaInfo() :
ObjectMapperFactory.getThreadLocal()
.readValue(valueSchema, SchemaInfoImpl.class);
if (null == valueSchemaInfo.getName()) {
valueSchemaInfo.setName(valueSchemaInfo.getType().toString());
}
// Value schema only
if (keySchema == null || keySchema.isEmpty()) {
return SchemaData.builder()
.data(valueSchemaInfo.getSchema())
.isDeleted(false)
.user("Rest Producer")
.timestamp(System.currentTimeMillis())
.type(valueSchemaInfo.getType())
.props(valueSchemaInfo.getProperties())
.build();
} else {
// Key_Value schema
SchemaInfoImpl keySchemaInfo = ObjectMapperFactory.getThreadLocal()
.readValue(keySchema, SchemaInfoImpl.class);
if (null == keySchemaInfo.getName()) {
keySchemaInfo.setName(keySchemaInfo.getType().toString());
}
SchemaInfo schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo("KVSchema-"
+ topicName.getPartitionedTopicName(),
keySchemaInfo, valueSchemaInfo,
KeyValueEncodingType.SEPARATED);
return SchemaData.builder()
.data(schemaInfo.getSchema())
.isDeleted(false)
.user("Rest Producer")
.timestamp(System.currentTimeMillis())
.type(schemaInfo.getType())
.props(schemaInfo.getProperties())
.build();
}
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug("Fail to parse schema info for rest produce request with key schema {} and value schema {}"
, keySchema, valueSchema);
}
return null;
}
}
// Convert message to ByteBuf
public ByteBuf messageToByteBuf(Message message) {
checkArgument(message instanceof MessageImpl, "Message must be of type MessageImpl.");
MessageImpl msg = (MessageImpl) message;
MessageMetadata messageMetadata = msg.getMessageBuilder();
ByteBuf payload = msg.getDataBuffer();
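// REST-produced messages are not compressed; record NONE plus the uncompressed size so brokers
// and consumers can parse the payload.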
messageMetadata.setCompression(CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE));
messageMetadata.setUncompressedSize(payload.readableBytes());
return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, payload);
}
// Build Pulsar messages from the REST request.
private List<Message> buildMessage(ProducerMessages producerMessages, Schema schema,
String producerName, TopicName topicName) {
List<ProducerMessage> messages = producerMessages.getMessages();
List<Message> pulsarMessages = new ArrayList<>();
for (ProducerMessage message : messages) {
MessageMetadata messageMetadata = new MessageMetadata();
messageMetadata.setProducerName(producerName);
messageMetadata.setPublishTime(System.currentTimeMillis());
messageMetadata.setSequenceId(message.getSequenceId());
if (null != message.getReplicationClusters()) {
messageMetadata.addAllReplicateTos(message.getReplicationClusters());
}
if (null != message.getProperties()) {
messageMetadata.addAllProperties(message.getProperties().entrySet().stream().map(entry -> {
org.apache.pulsar.common.api.proto.KeyValue keyValue =
new org.apache.pulsar.common.api.proto.KeyValue();
keyValue.setKey(entry.getKey());
keyValue.setValue(entry.getValue());
return keyValue;
}).collect(Collectors.toList()));
}
if (null != message.getKey()) {
// If a key schema is present, encode the partition key with it; otherwise use the plain-text key.
if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
messageMetadata.setPartitionKey(
Base64.getEncoder().encodeToString(encodeWithSchema(message.getKey(),
kvSchema.getKeySchema())));
messageMetadata.setPartitionKeyB64Encoded(true);
} else {
messageMetadata.setPartitionKey(message.getKey());
messageMetadata.setPartitionKeyB64Encoded(false);
}
}
if (null != message.getEventTime() && !message.getEventTime().isEmpty()) {
messageMetadata.setEventTime(Long.parseLong(message.getEventTime()));
}
if (message.isDisableReplication()) {
messageMetadata.clearReplicateTo();
messageMetadata.addReplicateTo("__local__");
}
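// An absolute delivery time takes precedence, though note it is only applied when an event time is
// also set; otherwise deliverAfterMs is applied relative to the event time recorded in the metadata.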
if (message.getDeliverAt() != 0 && messageMetadata.hasEventTime()) {
messageMetadata.setDeliverAtTime(message.getDeliverAt());
} else if (message.getDeliverAfterMs() != 0) {
messageMetadata.setDeliverAtTime(messageMetadata.getEventTime() + message.getDeliverAfterMs());
}
if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
pulsarMessages.add(MessageImpl.create(messageMetadata,
ByteBuffer.wrap(encodeWithSchema(message.getPayload(), kvSchema.getValueSchema())),
schema, topicName.toString()));
} else {
pulsarMessages.add(MessageImpl.create(messageMetadata,
ByteBuffer.wrap(encodeWithSchema(message.getPayload(), schema)), schema,
topicName.toString()));
}
}
return pulsarMessages;
}
// Encode the message with the corresponding schema, doing any necessary conversion before encoding.
private byte[] encodeWithSchema(String input, Schema schema) {
try {
switch (schema.getSchemaInfo().getType()) {
case INT8:
return schema.encode(Byte.parseByte(input));
case INT16:
return schema.encode(Short.parseShort(input));
case INT32:
return schema.encode(Integer.parseInt(input));
case INT64:
return schema.encode(Long.parseLong(input));
case STRING:
return schema.encode(input);
case FLOAT:
return schema.encode(Float.parseFloat(input));
case DOUBLE:
return schema.encode(Double.parseDouble(input));
case BOOLEAN:
return schema.encode(Boolean.parseBoolean(input));
case BYTES:
return schema.encode(input.getBytes());
case DATE:
return schema.encode(DateFormat.getDateInstance().parse(input));
case TIME:
return schema.encode(new Time(Long.parseLong(input)));
case TIMESTAMP:
return schema.encode(new Timestamp(Long.parseLong(input)));
case INSTANT:
return schema.encode(Instant.parse(input));
case LOCAL_DATE:
return schema.encode(LocalDate.parse(input));
case LOCAL_TIME:
return schema.encode(LocalTime.parse(input));
case LOCAL_DATE_TIME:
return schema.encode(LocalDateTime.parse(input));
case JSON:
GenericJsonWriter jsonWriter = new GenericJsonWriter();
return jsonWriter.write(new GenericJsonRecord(null, null,
ObjectMapperFactory.getThreadLocal().readTree(input), schema.getSchemaInfo()));
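// For AVRO, the JSON input is decoded against the Avro schema with Avro's JSON decoder and then
// re-encoded as Avro binary by the generic writer.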
case AVRO:
AvroBaseStructSchema avroSchema = ((AvroBaseStructSchema) schema);
Decoder decoder = DecoderFactory.get().jsonDecoder(avroSchema.getAvroSchema(), input);
DatumReader<GenericData.Record> reader = new GenericDatumReader<>(avroSchema.getAvroSchema());
GenericRecord genericRecord = reader.read(null, decoder);
GenericAvroWriter avroWriter = new GenericAvroWriter(avroSchema.getAvroSchema());
return avroWriter.write(new GenericAvroRecord(null,
avroSchema.getAvroSchema(), null, genericRecord));
case PROTOBUF_NATIVE:
case KEY_VALUE:
default:
throw new PulsarClientException.InvalidMessageException("Unsupported schema type: "
+ schema.getSchemaInfo().getType());
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Fail to encode value {} with schema {} for rest produce request", input,
new String(schema.getSchemaInfo().getSchema()));
}
return new byte[0];
}
}
// Release lookup semaphore and add result to redirectAddresses if current broker doesn't own the topic.
private synchronized void completeLookup(Pair<List<String>, Boolean> result, List<String> redirectAddresses,
CompletableFuture<Void> future) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
// Left is the lookup result (secure/insecure addresses) if the lookup succeeded; Right indicates
// whether the address is the owner's address, as opposed to an address to redirect the lookup to.
if (!result.getLeft().isEmpty()) {
if (result.getRight()) {
// If the address is the owner of the topic partition, add it to the head so it takes priority
// over brokers that are mere lookup redirects.
redirectAddresses.add(0, isRequestHttps() ? result.getLeft().get(1) : result.getLeft().get(0));
} else {
redirectAddresses.add(isRequestHttps()
? result.getLeft().get(1) : result.getLeft().get(0));
}
}
future.complete(null);
}
public void validateProducePermission() throws Exception {
if (pulsar().getConfiguration().isAuthenticationEnabled()
&& pulsar().getBrokerService().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId())) {
throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
}
AuthenticationParameters authParams = authParams();
boolean isAuthorized;
try {
isAuthorized = pulsar().getBrokerService().getAuthorizationService()
.allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, authParams)
.get(config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (TimeoutException e) {
log.warn("Time-out {} sec while checking authorization on {} ",
config().getMetadataStoreOperationTimeoutSeconds(), topicName);
throw new RestException(Status.INTERNAL_SERVER_ERROR, "Time-out while checking authorization");
} catch (Exception e) {
log.warn("Producer-client with Role - {} {} failed to get permissions for topic - {}. {}",
authParams.getClientRole(), authParams.getOriginalPrincipal(), topicName, e.getMessage());
throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get permissions");
}
if (!isAuthorized) {
throw new RestException(Status.UNAUTHORIZED, "Unauthorized to produce to topic " + topicName);
}
}
}
}