blob: b054bf2c3c8e785fa1ad68714dfa174a14dcef66 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket;
import static java.lang.String.format;
import static org.apache.pulsar.websocket.WebSocketError.FailedToCreateProducer;
import static org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON;
import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
/**
* Websocket end-point url handler to handle incoming message coming from client. Websocket end-point url handler to
* handle incoming message coming from client.
* <p>
* On every produced message from client it calls broker to persists it.
* </p>
*
*/
public class ProducerHandler extends AbstractWebSocketHandler {
private Producer producer;
private final LongAdder numMsgsSent;
private final LongAdder numMsgsFailed;
private final LongAdder numBytesSent;
private final StatsBuckets publishLatencyStatsUSec;
private volatile long msgPublishedCounter = 0;
private static final AtomicLongFieldUpdater<ProducerHandler> MSG_PUBLISHED_COUNTER_UPDATER =
AtomicLongFieldUpdater.newUpdater(ProducerHandler.class, "msgPublishedCounter");
public static final long[] ENTRY_LATENCY_BUCKETS_USEC = { 500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
200_000, 1000_000 };
public ProducerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
super(service, request, response);
this.numMsgsSent = new LongAdder();
this.numBytesSent = new LongAdder();
this.numMsgsFailed = new LongAdder();
this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
try {
ProducerConfiguration conf = getProducerConfiguration();
this.producer = service.getPulsarClient().createProducer(topic, conf);
if (!this.service.addProducer(this)) {
log.warn("[{}:{}] Failed to add producer handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
}
} catch (Exception e) {
log.warn("[{}:{}] Failed in creating producer on topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic, e);
try {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create producer");
} catch (IOException e1) {
log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
e1.getMessage(), e1);
}
}
}
@Override
public void close() throws IOException {
if (producer != null) {
if (!this.service.removeProducer(this)) {
log.warn("[{}] Failed to remove producer handler", producer.getTopic());
}
producer.closeAsync().thenAccept(x -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Closed producer asynchronously", producer.getTopic());
}
}).exceptionally(exception -> {
log.warn("[{}] Failed to close producer", producer.getTopic(), exception);
return null;
});
}
}
@Override
public void onWebSocketText(String message) {
ProducerMessage sendRequest;
byte[] rawPayload = null;
String requestContext = null;
try {
sendRequest = ObjectMapperFactory.getThreadLocal().readValue(message, ProducerMessage.class);
requestContext = sendRequest.context;
rawPayload = Base64.getDecoder().decode(sendRequest.payload);
} catch (IOException e) {
sendAckResponse(new ProducerAck(FailedToDeserializeFromJSON, e.getMessage(), null, null));
return;
} catch (IllegalArgumentException e) {
String msg = format("Invalid Base64 message-payload error=%s", e.getMessage());
sendAckResponse(new ProducerAck(PayloadEncodingError, msg, null, requestContext));
return;
}
final long msgSize = rawPayload.length;
MessageBuilder builder = MessageBuilder.create().setContent(rawPayload);
if (sendRequest.properties != null) {
builder.setProperties(sendRequest.properties);
}
if (sendRequest.key != null) {
builder.setKey(sendRequest.key);
}
if (sendRequest.replicationClusters != null) {
builder.setReplicationClusters(sendRequest.replicationClusters);
}
Message msg = builder.build();
final long now = System.nanoTime();
producer.sendAsync(msg).thenAccept(msgId -> {
updateSentMsgStats(msgSize, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - now));
if (isConnected()) {
String messageId = Base64.getEncoder().encodeToString(msgId.toByteArray());
sendAckResponse(new ProducerAck(messageId, sendRequest.context));
}
}).exceptionally(exception -> {
numMsgsFailed.increment();
sendAckResponse(
new ProducerAck(UnknownError, exception.getMessage(), null, sendRequest.context));
return null;
});
}
public Producer getProducer() {
return this.producer;
}
public long getAndResetNumMsgsSent() {
return numMsgsSent.sumThenReset();
}
public long getAndResetNumBytesSent() {
return numBytesSent.sumThenReset();
}
public long getAndResetNumMsgsFailed() {
return numMsgsFailed.sumThenReset();
}
public long[] getAndResetPublishLatencyStatsUSec() {
publishLatencyStatsUSec.refresh();
return publishLatencyStatsUSec.getBuckets();
}
public StatsBuckets getPublishLatencyStatsUSec() {
return this.publishLatencyStatsUSec;
}
public long getMsgPublishedCounter() {
return MSG_PUBLISHED_COUNTER_UPDATER.get(this);
}
@Override
protected Boolean isAuthorized(String authRole) throws Exception {
return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole);
}
private void sendAckResponse(ProducerAck response) {
try {
String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(response);
getSession().getRemote().sendString(msg);
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to generate ack json-response {}", producer.getTopic(), e.getMessage(), e);
} catch (Exception e) {
log.warn("[{}] Failed to send ack {}", producer.getTopic(), e.getMessage(), e);
}
}
private void updateSentMsgStats(long msgSize, long latencyUsec) {
this.publishLatencyStatsUSec.addValue(latencyUsec);
this.numBytesSent.add(msgSize);
this.numMsgsSent.increment();
MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
}
private ProducerConfiguration getProducerConfiguration() {
ProducerConfiguration conf = new ProducerConfiguration();
// Set to false to prevent the server thread from being blocked if a lot of messages are pending.
conf.setBlockIfQueueFull(false);
if (queryParams.containsKey("sendTimeoutMillis")) {
conf.setSendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
}
if (queryParams.containsKey("batchingEnabled")) {
conf.setBatchingEnabled(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
}
if (queryParams.containsKey("batchingMaxMessages")) {
conf.setBatchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
}
if (queryParams.containsKey("maxPendingMessages")) {
conf.setMaxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
}
if (queryParams.containsKey("batchingMaxPublishDelay")) {
conf.setBatchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
TimeUnit.MILLISECONDS);
}
if (queryParams.containsKey("messageRoutingMode")) {
conf.setMessageRoutingMode(MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode")));
}
if (queryParams.containsKey("compressionType")) {
conf.setCompressionType(CompressionType.valueOf(queryParams.get("compressionType")));
}
return conf;
}
private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class);
}