blob: 25c133391cb65e47c30aa8d3edd8356c46f13d27 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.pulsar.client.impl;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.UnaryOperator;
import org.apache.commons.lang3.StringUtils;
abstract class HandlerState {
protected final PulsarClientImpl client;
protected final String topic;
protected volatile URI redirectedClusterURI;
private static final AtomicReferenceFieldUpdater<HandlerState, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(HandlerState.class, State.class, "state");
private volatile State state = null;
enum State {
Uninitialized, // Not initialized
Connecting, // Client connecting to broker
Ready, // Handler is being used
Closing, // Close cmd has been sent to broker
Closed, // Broker acked the close
Terminated, // Topic associated with this handler
// has been terminated
Failed, // Handler is failed
RegisteringSchema, // Handler is registering schema
ProducerFenced, // The producer has been fenced by the broker
public HandlerState(PulsarClientImpl client, String topic) {
this.client = client;
this.topic = topic;
STATE_UPDATER.set(this, State.Uninitialized);
protected void setRedirectedClusterURI(String serviceUrl, String serviceUrlTls) throws URISyntaxException {
String url = client.conf.isUseTls() && StringUtils.isNotBlank(serviceUrlTls) ? serviceUrlTls : serviceUrl;
this.redirectedClusterURI = new URI(url);
// moves the state to ready if it wasn't closed
protected boolean changeToReadyState() {
if (STATE_UPDATER.get(this) == State.Ready) {
return true;
return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Ready)
|| STATE_UPDATER.compareAndSet(this, State.Connecting, State.Ready)
|| STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Ready));
protected boolean changeToRegisteringSchemaState() {
return STATE_UPDATER.compareAndSet(this, State.Ready, State.RegisteringSchema);
protected State getState() {
return STATE_UPDATER.get(this);
protected boolean changeToConnecting() {
if (STATE_UPDATER.get(this) == State.Connecting) {
return true;
return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Connecting)
|| STATE_UPDATER.compareAndSet(this, State.Ready, State.Connecting)
|| STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Connecting));
protected void setState(State s) {
STATE_UPDATER.set(this, s);
abstract String getHandlerName();
protected State getAndUpdateState(final UnaryOperator<State> updater) {
return STATE_UPDATER.getAndUpdate(this, updater);
public PulsarClientImpl getClient() {
return client;