blob: 7b327dac45c40ba8fd2918df2ae90fc1643bcc9b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.spout;
import static org.apache.storm.spout.CheckPointState.State.COMMITTED;
import static org.apache.storm.spout.CheckPointState.State.COMMITTING;
import static org.apache.storm.spout.CheckPointState.State.PREPARING;
/**
 * Captures the current state of the transaction in {@link CheckpointSpout}. The state transitions are as follows.
 * <pre>{@code
 *               ROLLBACK(tx2)
 *               <-------------                 PREPARE(tx2)                      COMMIT(tx2)
 * COMMITTED(tx1)-------------> PREPARING(tx2) --------------> COMMITTING(tx2) -----------------> COMMITTED (tx2)
 * }</pre>
 *
 * <p>During recovery, if a previous transaction is in PREPARING state, it is rolled back since all bolts in the topology might not have
 * prepared (saved) the data for commit. If the previous transaction is in COMMITTING state, it is rolled forward (committed) since some
 * bolts might have already committed the data.
 *
 * <p>During normal flow, the state transitions from PREPARING to COMMITTING to COMMITTED. In case of failures the
 * prepare/commit operation is retried.
 */
public class CheckPointState {
    // Not final: the no-arg constructor below leaves both fields at their defaults.
    private long txid;
    private State state;

    /**
     * Creates a checkpoint state for the given transaction.
     *
     * @param txid  the transaction id
     * @param state the state of that transaction
     */
    public CheckPointState(long txid, State state) {
        this.txid = txid;
        this.state = state;
    }

    // for kryo
    public CheckPointState() {
    }

    public long getTxid() {
        return txid;
    }

    public State getState() {
        return state;
    }

    /**
     * Get the next state based on this checkpoint state.
     *
     * <ul>
     * <li>PREPARING: while recovering, falls back to COMMITTED of the previous transaction ({@code txid - 1});
     * otherwise moves to COMMITTING of the same transaction.</li>
     * <li>COMMITTING: always moves to COMMITTED of the same transaction.</li>
     * <li>COMMITTED: while recovering, returns this same instance; otherwise moves to PREPARING of the next
     * transaction ({@code txid + 1}).</li>
     * </ul>
     *
     * @param recovering if in recovering phase
     * @return the next checkpoint state based on this state.
     * @throws IllegalStateException if the state of this checkpoint has not been set
     */
    public CheckPointState nextState(boolean recovering) {
        switch (requireState()) {
            case PREPARING:
                return recovering
                    ? new CheckPointState(txid - 1, State.COMMITTED)
                    : new CheckPointState(txid, State.COMMITTING);
            case COMMITTING:
                return new CheckPointState(txid, State.COMMITTED);
            case COMMITTED:
                return recovering ? this : new CheckPointState(txid + 1, State.PREPARING);
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    /**
     * Get the next action to perform based on this checkpoint state.
     *
     * <ul>
     * <li>PREPARING: ROLLBACK while recovering, otherwise PREPARE.</li>
     * <li>COMMITTING: always COMMIT.</li>
     * <li>COMMITTED: INITSTATE while recovering, otherwise PREPARE.</li>
     * </ul>
     *
     * @param recovering if in recovering phase
     * @return the next action to perform based on this state
     * @throws IllegalStateException if the state of this checkpoint has not been set
     */
    public Action nextAction(boolean recovering) {
        switch (requireState()) {
            case PREPARING:
                return recovering ? Action.ROLLBACK : Action.PREPARE;
            case COMMITTING:
                return Action.COMMIT;
            case COMMITTED:
                return recovering ? Action.INITSTATE : Action.PREPARE;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    /**
     * Returns the current state, failing with a descriptive error when it is unset.
     *
     * <p>Switching on a null enum value throws a bare NullPointerException before the {@code default} branch
     * can run, so the check is made explicitly here.
     */
    private State requireState() {
        if (state == null) {
            throw new IllegalStateException("Checkpoint state is not set (txid=" + txid + ")");
        }
        return state;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        CheckPointState that = (CheckPointState) o;
        // Enum constants are singletons, so reference comparison is sufficient (and null-safe).
        return txid == that.txid && state == that.state;
    }

    @Override
    public int hashCode() {
        // Fold the 64-bit txid into 32 bits, then mix in the state (0 when unset).
        int result = (int) (txid ^ (txid >>> 32));
        result = 31 * result + (state != null ? state.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "CheckPointState{"
            + "txid=" + txid
            + ", state=" + state
            + '}';
    }

    public enum State {
        /**
         * The checkpoint spout has committed the transaction.
         */
        COMMITTED,
        /**
         * The checkpoint spout has started committing the transaction and the commit is in progress.
         */
        COMMITTING,
        /**
         * The checkpoint spout has started preparing the transaction for commit and the prepare is in progress.
         */
        PREPARING
    }

    public enum Action {
        /**
         * prepare transaction for commit.
         */
        PREPARE,
        /**
         * commit the previously prepared transaction.
         */
        COMMIT,
        /**
         * rollback the previously prepared transaction.
         */
        ROLLBACK,
        /**
         * initialize the state.
         */
        INITSTATE
    }
}