| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.spout; |
| |
| import static org.apache.storm.spout.CheckPointState.State.COMMITTED; |
| import static org.apache.storm.spout.CheckPointState.State.COMMITTING; |
| import static org.apache.storm.spout.CheckPointState.State.PREPARING; |
| |
| /** |
| * Captures the current state of the transaction in {@link CheckpointSpout}. The state transitions are as follows. |
| * <pre> |
| * ROLLBACK(tx2) |
| * <------------- PREPARE(tx2) COMMIT(tx2) |
| * COMMITTED(tx1)-------------> PREPARING(tx2) --------------> COMMITTING(tx2) -----------------> COMMITTED (tx2) |
| * |
| * </pre> |
| * |
| * <p>During recovery, if a previous transaction is in PREPARING state, it is rolled back since all bolts in the topology might not have |
| * prepared (saved) the data for commit. If the previous transaction is in COMMITTING state, it is rolled forward (committed) since some |
| * bolts might have already committed the data. |
| * |
| * <p>During normal flow, the state transitions from PREPARING to COMMITTING to COMMITTED. In case of failures the |
| * prepare/commit operation is retried. |
| */ |
| public class CheckPointState { |
| private long txid; |
| private State state; |
| |
| public CheckPointState(long txid, State state) { |
| this.txid = txid; |
| this.state = state; |
| } |
| |
| // for kryo |
| public CheckPointState() { |
| } |
| |
| public long getTxid() { |
| return txid; |
| } |
| |
| public State getState() { |
| return state; |
| } |
| |
| /** |
| * Get the next state based on this checkpoint state. |
| * |
| * @param recovering if in recovering phase |
| * @return the next checkpoint state based on this state. |
| */ |
| public CheckPointState nextState(boolean recovering) { |
| CheckPointState nextState; |
| switch (state) { |
| case PREPARING: |
| nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING); |
| break; |
| case COMMITTING: |
| nextState = new CheckPointState(txid, COMMITTED); |
| break; |
| case COMMITTED: |
| nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING); |
| break; |
| default: |
| throw new IllegalStateException("Unknown state " + state); |
| } |
| return nextState; |
| } |
| |
| /** |
| * Get the next action to perform based on this checkpoint state. |
| * |
| * @param recovering if in recovering phase |
| * @return the next action to perform based on this state |
| */ |
| public Action nextAction(boolean recovering) { |
| Action action; |
| switch (state) { |
| case PREPARING: |
| action = recovering ? Action.ROLLBACK : Action.PREPARE; |
| break; |
| case COMMITTING: |
| action = Action.COMMIT; |
| break; |
| case COMMITTED: |
| action = recovering ? Action.INITSTATE : Action.PREPARE; |
| break; |
| default: |
| throw new IllegalStateException("Unknown state " + state); |
| } |
| return action; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| |
| CheckPointState that = (CheckPointState) o; |
| |
| if (txid != that.txid) { |
| return false; |
| } |
| return state == that.state; |
| |
| } |
| |
| @Override |
| public int hashCode() { |
| int result = (int) (txid ^ (txid >>> 32)); |
| result = 31 * result + (state != null ? state.hashCode() : 0); |
| return result; |
| } |
| |
| @Override |
| public String toString() { |
| return "CheckPointState{" |
| + "txid=" + txid |
| + ", state=" + state |
| + '}'; |
| } |
| |
| public enum State { |
| /** |
| * The checkpoint spout has committed the transaction. |
| */ |
| COMMITTED, |
| /** |
| * The checkpoint spout has started committing the transaction and the commit is in progress. |
| */ |
| COMMITTING, |
| /** |
| * The checkpoint spout has started preparing the transaction for commit and the prepare is in progress. |
| */ |
| PREPARING |
| } |
| |
| public enum Action { |
| /** |
| * prepare transaction for commit. |
| */ |
| PREPARE, |
| /** |
| * commit the previously prepared transaction. |
| */ |
| COMMIT, |
| /** |
| * rollback the previously prepared transaction. |
| */ |
| ROLLBACK, |
| /** |
| * initialize the state. |
| */ |
| INITSTATE |
| } |
| } |