1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| private Map<String, IRichBolt> _bolts = new HashMap<>(); private Map<String, IRichSpout> _spouts = new HashMap<>();
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException { validateUnusedId(id); initCommon(id, spout, parallelism_hint); _spouts.put(id, spout); return new SpoutGetter(id); }
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException { validateUnusedId(id); initCommon(id, bolt, parallelism_hint); _bolts.put(id, bolt); return new BoltGetter(id); }
private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException { ComponentCommon common = new ComponentCommon(); common.set_inputs(new HashMap<GlobalStreamId, Grouping>()); if(parallelism!=null) { int dop = parallelism.intValue(); if(dop < 1) { throw new IllegalArgumentException("Parallelism must be positive."); }
common.set_parallelism_hint(dop); }
Map conf = component.getComponentConfiguration(); if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf)); _commons.put(id, common); }
String currConf = _commons.get(_id).get_json_conf(); _commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
|