blob: 84e712be4981c34a8a6906ff914efa7b5f348268 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
/**
 * WebSocket end-point handler that reads messages from a topic through a Pulsar {@link Reader} and pushes
 * them to the connected client.
 * <p>
 * <b>receive:</b> socket-proxy keeps pushing messages to client by writing into session.<br/>
 * </p>
 * <p>
 * Flow control: after each message is handed to the session the pending counter is incremented and the next
 * read is only scheduled while the counter is below {@code maxPendingMessages}. Every text frame received from
 * the client decrements the counter and, if delivery had been paused, resumes it.
 * </p>
 */
public class ReaderHandler extends AbstractWebSocketHandler {

    private static final Logger log = LoggerFactory.getLogger(ReaderHandler.class);

    /** Subscription name of the consumer backing the reader; empty until the reader has been created. */
    private String subscription;
    private final ReaderConfiguration conf;
    /** Remains {@code null} when reader creation failed in the constructor. */
    private Reader reader;

    /** Number of un-acknowledged messages at which reading from the topic is paused. */
    private final int maxPendingMessages;
    private final AtomicInteger pendingMessages = new AtomicInteger();

    // Resettable counters, drained through the getAndReset* accessors.
    private final LongAdder numMsgsDelivered;
    private final LongAdder numBytesDelivered;
    // Running total of delivered messages; never reset by this class.
    private volatile long msgDeliveredCounter = 0;
    private static final AtomicLongFieldUpdater<ReaderHandler> MSG_DELIVERED_COUNTER_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ReaderHandler.class, "msgDeliveredCounter");

    /**
     * Builds the reader configuration from the query parameters, creates the reader on the requested topic and
     * registers this handler with the service.
     * <p>
     * If creating the reader fails, the failure is logged and an HTTP 500 error is sent on {@code response};
     * no exception is propagated for that case. An invalid {@code receiverQueueSize} query parameter is parsed
     * before that guarded section, so a {@link NumberFormatException} from it does propagate.
     *
     * @param service
     *            WebSocket service providing the Pulsar client, executor and reader registry
     * @param request
     *            the HTTP upgrade request
     * @param response
     *            the upgrade response, used to report a reader creation failure
     */
    public ReaderHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
        super(service, request, response);
        this.subscription = "";
        this.conf = getReaderConfiguration();
        // A receiver queue size of 0 still has to allow one message in flight.
        this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : conf.getReceiverQueueSize();
        this.numMsgsDelivered = new LongAdder();
        this.numBytesDelivered = new LongAdder();

        try {
            this.reader = service.getPulsarClient().createReader(topic, getMessageId(), conf);
            this.subscription = ((ReaderImpl) this.reader).getConsumer().getSubscription();
            if (!this.service.addReader(this)) {
                log.warn("[{}:{}] Failed to add reader handler for topic {}", request.getRemoteAddr(),
                        request.getRemotePort(), topic);
            }
        } catch (Exception e) {
            log.warn("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(),
                    request.getRemotePort(), subscription, topic, e);
            try {
                response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create reader");
            } catch (IOException e1) {
                log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
                        e1.getMessage(), e1);
            }
        }
    }

    /**
     * Asynchronously reads the next message from the reader, serializes it to JSON and writes it to the
     * WebSocket session. Schedules the following read on the service executor while fewer than
     * {@code maxPendingMessages} messages are pending.
     */
    private void receiveMessage() {
        if (log.isDebugEnabled()) {
            log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(),
                    topic, subscription);
        }

        reader.readNextAsync().thenAccept(msg -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Got message {}", getSession().getRemoteAddress(), topic, subscription,
                        msg.getMessageId());
            }

            // Fetch the payload once; it is needed both for the encoded body and for the byte statistics.
            final byte[] data = msg.getData();
            final long msgSize = data.length;

            ConsumerMessage dm = new ConsumerMessage();
            dm.messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray());
            dm.payload = Base64.getEncoder().encodeToString(data);
            dm.properties = msg.getProperties();
            dm.publishTime = DateFormatter.format(msg.getPublishTime());
            if (msg.getEventTime() != 0) {
                dm.eventTime = DateFormatter.format(msg.getEventTime());
            }
            if (msg.hasKey()) {
                dm.key = msg.getKey();
            }

            final String json;
            try {
                json = ObjectMapperFactory.getThreadLocal().writeValueAsString(dm);
            } catch (JsonProcessingException e) {
                log.warn("[{}/{}] Failed to serialize msg {} to JSON", reader.getTopic(), subscription,
                        msg.getMessageId(), e);
                close(WebSocketError.FailedToSerializeToJSON);
                // Nothing was sent and the session is being closed: do not count the message as pending and
                // do not schedule another read.
                return;
            }

            getSession().getRemote().sendString(json, new WriteCallback() {
                @Override
                public void writeFailed(Throwable th) {
                    log.warn("[{}/{}] Failed to deliver msg to {} {}", reader.getTopic(), subscription,
                            getRemote().getInetSocketAddress().toString(), th.getMessage());
                    pendingMessages.decrementAndGet();
                    // schedule receive as one of the delivery failed
                    service.getExecutor().execute(ReaderHandler.this::receiveMessage);
                }

                @Override
                public void writeSuccess() {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}/{}] message is delivered successfully to {} ", reader.getTopic(),
                                subscription, getRemote().getInetSocketAddress().toString());
                    }
                    updateDeliverMsgStat(msgSize);
                }
            });

            int pending = pendingMessages.incrementAndGet();
            if (pending < maxPendingMessages) {
                // Start next read in a separate thread to avoid recursion
                service.getExecutor().execute(this::receiveMessage);
            }
        }).exceptionally(exception -> {
            // The throwable is passed as the trailing argument (no placeholder) so that it is logged as the
            // exception rather than competing for a "{}" slot.
            log.warn("[{}/{}] Failed to deliver msg to {}", reader.getTopic(), subscription,
                    getRemote().getInetSocketAddress().toString(), exception);
            return null;
        });
    }

    /**
     * Starts pushing messages to the client as soon as the WebSocket session is established.
     */
    @Override
    public void onWebSocketConnect(Session session) {
        super.onWebSocketConnect(session);
        receiveMessage();
    }

    /**
     * Treats any text frame from the client as an acknowledgement of one delivered message and resumes
     * delivery if it had been paused because {@code maxPendingMessages} was reached.
     */
    @Override
    public void onWebSocketText(String message) {
        super.onWebSocketText(message);

        // We should have received an ack
        // but reader doesn't send an ack to broker here because already reader did
        int pending = pendingMessages.getAndDecrement();
        if (pending >= maxPendingMessages) {
            // Resume delivery
            receiveMessage();
        }
    }

    /**
     * Unregisters this handler from the service and closes the reader asynchronously. Does nothing when the
     * reader was never created.
     */
    @Override
    public void close() throws IOException {
        if (reader == null) {
            return;
        }

        if (!this.service.removeReader(this)) {
            log.warn("[{}] Failed to remove reader handler", reader.getTopic());
        }
        reader.closeAsync().thenAccept(x -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Closed reader asynchronously", reader.getTopic());
            }
        }).exceptionally(exception -> {
            log.warn("[{}] Failed to close reader", reader.getTopic(), exception);
            return null;
        });
    }

    /**
     * @return the consumer that backs the reader
     */
    public Consumer getConsumer() {
        return ((ReaderImpl) reader).getConsumer();
    }

    /**
     * @return the subscription name taken from the reader's consumer, or an empty string if the reader could
     *         not be created
     */
    public String getSubscription() {
        return subscription;
    }

    /**
     * @return always {@link SubscriptionType#Exclusive}
     */
    public SubscriptionType getSubscriptionType() {
        return SubscriptionType.Exclusive;
    }

    /**
     * @return the number of messages delivered since the previous call; the counter is reset
     */
    public long getAndResetNumMsgsDelivered() {
        return numMsgsDelivered.sumThenReset();
    }

    /**
     * @return the number of payload bytes delivered since the previous call; the counter is reset
     */
    public long getAndResetNumBytesDelivered() {
        return numBytesDelivered.sumThenReset();
    }

    /**
     * @return the running total of delivered messages
     */
    public long getMsgDeliveredCounter() {
        return MSG_DELIVERED_COUNTER_UPDATER.get(this);
    }

    /**
     * Records one successfully delivered message in the delivery statistics.
     *
     * @param msgSize
     *            payload size of the delivered message, in bytes
     */
    protected void updateDeliverMsgStat(long msgSize) {
        numMsgsDelivered.increment();
        MSG_DELIVERED_COUNTER_UPDATER.incrementAndGet(this);
        numBytesDelivered.add(msgSize);
    }

    /**
     * Builds the reader configuration from the {@code readerName} and {@code receiverQueueSize} query
     * parameters. The receiver queue size is capped at 1000.
     *
     * @return the reader configuration for this connection
     * @throws NumberFormatException
     *             if {@code receiverQueueSize} is present but is not a parsable integer
     */
    private ReaderConfiguration getReaderConfiguration() {
        ReaderConfiguration readerConf = new ReaderConfiguration();

        if (queryParams.containsKey("readerName")) {
            readerConf.setReaderName(queryParams.get("readerName"));
        }

        if (queryParams.containsKey("receiverQueueSize")) {
            readerConf.setReceiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")), 1000));
        }

        return readerConf;
    }

    /**
     * Checks with the authorization manager whether {@code authRole} may consume from the topic.
     */
    @Override
    protected Boolean isAuthorized(String authRole) throws Exception {
        return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole);
    }

    /**
     * Resolves the start position from the {@code messageId} query parameter: {@code "earliest"},
     * {@code "latest"}, or a Base64-encoded serialized message id. A missing or blank parameter means
     * {@link MessageId#latest}.
     *
     * @return the message id the reader should start from
     * @throws IOException
     *             declared for the deserialization of an encoded message id
     */
    private MessageId getMessageId() throws IOException {
        final String requested = queryParams.get("messageId");
        if (!isNotBlank(requested) || "latest".equals(requested)) {
            return MessageId.latest;
        }
        if ("earliest".equals(requested)) {
            return MessageId.earliest;
        }
        return MessageIdImpl.fromByteArray(Base64.getDecoder().decode(requested));
    }
}