| package org.apache.helix.manager.zk; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.I0Itec.zkclient.DataUpdater; |
| import org.I0Itec.zkclient.exception.ZkBadVersionException; |
| import org.I0Itec.zkclient.exception.ZkNoNodeException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.zookeeper.data.Stat; |
| |
| public class HelixGroupCommit<T> { |
| private static Logger LOG = LoggerFactory.getLogger(HelixGroupCommit.class); |
| |
| private static class Queue<T> { |
| final AtomicReference<Thread> _running = new AtomicReference<Thread>(); |
| final ConcurrentLinkedQueue<Entry<T>> _pending = new ConcurrentLinkedQueue<Entry<T>>(); |
| } |
| |
| private static class Entry<T> { |
| final String _key; |
| final DataUpdater<T> _updater; |
| AtomicBoolean _sent = new AtomicBoolean(false); |
| boolean _isSuccess; |
| |
| Entry(String key, DataUpdater<T> updater) { |
| _key = key; |
| _updater = updater; |
| _isSuccess = true; |
| } |
| } |
| |
| private final Queue<T>[] _queues = new Queue[100]; |
| |
| public HelixGroupCommit() { |
| // Don't use Arrays.fill(); |
| for (int i = 0; i < _queues.length; ++i) { |
| _queues[i] = new Queue<T>(); |
| } |
| } |
| |
| private Queue<T> getQueue(String key) { |
| return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length]; |
| } |
| |
| public boolean commit(ZkBaseDataAccessor<T> accessor, int options, String key, |
| DataUpdater<T> updater) { |
| Queue<T> queue = getQueue(key); |
| Entry<T> entry = new Entry<T>(key, updater); |
| |
| queue._pending.add(entry); |
| |
| while (!entry._sent.get()) { |
| if (queue._running.compareAndSet(null, Thread.currentThread())) { |
| ArrayList<Entry<T>> processed = new ArrayList<Entry<T>>(); |
| boolean success = true; |
| try { |
| Entry<T> first = queue._pending.peek(); |
| if (first == null) { |
| return true; |
| } |
| |
| String mergedKey = first._key; |
| |
| boolean retry; |
| do { |
| retry = false; |
| |
| try { |
| T merged = null; |
| |
| Stat readStat = new Stat(); |
| |
| // to create a new znode, we need set version to -1 |
| readStat.setVersion(-1); |
| try { |
| // accessor will fallback to zk if not found in cache |
| merged = accessor.get(mergedKey, readStat, options); |
| } catch (ZkNoNodeException e) { |
| // OK |
| } |
| |
| // iterate over processed if we are retrying |
| Iterator<Entry<T>> it = processed.iterator(); |
| while (it.hasNext()) { |
| Entry<T> ent = it.next(); |
| if (!ent._key.equals(mergedKey)) { |
| continue; |
| } |
| merged = ent._updater.update(merged); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("After merging processed entry. path: " + mergedKey + ", value: " + merged); |
| } |
| } |
| |
| // iterate over queue._pending for newly coming requests |
| it = queue._pending.iterator(); |
| while (it.hasNext()) { |
| Entry<T> ent = it.next(); |
| if (!ent._key.equals(mergedKey)) { |
| continue; |
| } |
| processed.add(ent); |
| merged = ent._updater.update(merged); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("After merging pending entry. path: " + mergedKey + ", value: " + merged); |
| } |
| |
| it.remove(); |
| } |
| success = accessor.set(mergedKey, merged, readStat.getVersion(), options); |
| if (!success) { |
| LOG.error("Fail to group commit. path: " + mergedKey + ", value: " + merged |
| + ", version: " + readStat.getVersion()); |
| } |
| } catch (ZkBadVersionException e) { |
| retry = true; |
| } |
| } while (retry); |
| } finally { |
| queue._running.set(null); |
| for (Entry<T> e : processed) { |
| synchronized (e) { |
| e._sent.set(true); |
| e._isSuccess = success; |
| e.notify(); |
| } |
| } |
| } |
| } else { |
| synchronized (entry) { |
| try { |
| entry.wait(10); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| return false; |
| } |
| } |
| } |
| } |
| return entry._isSuccess; |
| } |
| } |