/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package backtype.storm.coordination;

import backtype.storm.Constants;
import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.grouping.PartialKeyGrouping;
import backtype.storm.topology.BaseConfigurationDeclarer;
import backtype.storm.topology.BasicBoltExecutor;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.InputDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BatchSubtopologyBuilder {
    Map<String, Component> _bolts = new HashMap<String, Component>();
    Component _masterBolt;
    String _masterId;

    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
        Integer p = boltParallelism == null ? null : boltParallelism.intValue();
        _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
        _masterId = masterBoltId;
    }

    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
        this(masterBoltId, masterBolt, null);
    }

    public BoltDeclarer getMasterDeclarer() {
        return new BoltDeclarerImpl(_masterBolt);
    }

    public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
        return setBolt(id, bolt, null);
    }

    public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
        return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
    }

    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
        return setBolt(id, bolt, null);
    }

    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
        return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
    }

    private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
        Integer p = null;
        if (parallelism != null)
            p = parallelism.intValue();
        Component component = new Component(bolt, p);
        _bolts.put(id, component);
        return new BoltDeclarerImpl(component);
    }

    public void extendTopology(TopologyBuilder builder) {
        BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
        for (InputDeclaration decl : _masterBolt.declarations) {
            decl.declare(declarer);
        }
        for (Map conf : _masterBolt.componentConfs) {
            declarer.addConfigurations(conf);
        }
        for (String id : _bolts.keySet()) {
            Component component = _bolts.get(id);
            Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
            for (String c : componentBoltSubscriptions(component)) {
                SourceArgs source;
                if (c.equals(_masterId)) {
                    source = SourceArgs.single();
                } else {
                    source = SourceArgs.all();
                }
                coordinatedArgs.put(c, source);
            }

            BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(component.bolt, coordinatedArgs, null), component.parallelism);
            for (Map conf : component.componentConfs) {
                input.addConfigurations(conf);
            }
            for (String c : componentBoltSubscriptions(component)) {
                input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
            }
            for (InputDeclaration d : component.declarations) {
                d.declare(input);
            }
        }
    }

    private Set<String> componentBoltSubscriptions(Component component) {
        Set<String> ret = new HashSet<String>();
        for (InputDeclaration d : component.declarations) {
            ret.add(d.getComponent());
        }
        return ret;
    }

    private static class Component {
        public IRichBolt bolt;
        public Integer parallelism;
        public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
        public List<Map> componentConfs = new ArrayList<Map>();

        public Component(IRichBolt bolt, Integer parallelism) {
            this.bolt = bolt;
            this.parallelism = parallelism;
        }
    }

    private static interface InputDeclaration {
        void declare(InputDeclarer declarer);

        String getComponent();
    }

    private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
        Component _component;

        public BoltDeclarerImpl(Component component) {
            _component = component;
        }

        @Override
        public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.fieldsGrouping(component, fields);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.fieldsGrouping(component, streamId, fields);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer globalGrouping(final String component) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.globalGrouping(component);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer globalGrouping(final String component, final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.globalGrouping(component, streamId);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer shuffleGrouping(final String component) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.shuffleGrouping(component);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.shuffleGrouping(component, streamId);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer localOrShuffleGrouping(final String component) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.localOrShuffleGrouping(component);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.localOrShuffleGrouping(component, streamId);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer localFirstGrouping(final String componentId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.localFirstGrouping(componentId);
                }

                @Override
                public String getComponent() {
                    return componentId;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer localFirstGrouping(final String component, final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.localFirstGrouping(component, streamId);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer noneGrouping(final String component) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.noneGrouping(component);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer noneGrouping(final String component, final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.noneGrouping(component, streamId);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer allGrouping(final String component) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.allGrouping(component);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer allGrouping(final String component, final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.allGrouping(component, streamId);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer directGrouping(final String component) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.directGrouping(component);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer directGrouping(final String component, final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.directGrouping(component, streamId);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
            return customGrouping(componentId, new PartialKeyGrouping(fields));
        }

        @Override
        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
        }

        @Override
        public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.customGrouping(component, grouping);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.customGrouping(component, streamId, grouping);
                }

                @Override
                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        @Override
        public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.grouping(stream, grouping);
                }

                @Override
                public String getComponent() {
                    return stream.get_componentId();
                }
            });
            return this;
        }

        private void addDeclaration(InputDeclaration declaration) {
            _component.declarations.add(declaration);
        }

        @Override
        public BoltDeclarer addConfigurations(Map conf) {
            _component.componentConfs.add(conf);
            return this;
        }
    }
}
