Java source code examples: storm.kafka.bolt.KafkaBolt

Example 1: builds a topology that reads security logs from Kafka, parses them, and writes the parsed events back to Kafka through an unconfigured KafkaBolt.
public static void main(String[] args) throws Exception {
    // Load the topology settings from application.conf on the classpath.
    System.setProperty("config.resource", "/application.conf");
    Config config = ConfigFactory.load();

    // Kafka spout that feeds raw security log events into the topology.
    KafkaSpoutProvider provider = new KafkaSpoutProvider();
    IRichSpout spout = provider.getSpout(config);

    SecurityLogParserBolt bolt = new SecurityLogParserBolt();
    TopologyBuilder builder = new TopologyBuilder();

    // Task parallelism read from the application configuration (the constants are defined elsewhere).
    int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
    int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
    int numOfSinkTasks = config.getInt(SINK_TASK_NUM);

    // spout -> parser bolt -> Kafka sink
    builder.setSpout("ingest", spout, numOfSpoutTasks);
    BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
    boltDeclarer.shuffleGrouping("ingest");

    // The KafkaBolt is left unconfigured here; it picks up its producer properties and
    // target topic from the topology configuration at submit time (see Example 4 for such a submit method).
    KafkaBolt kafkaBolt = new KafkaBolt();
    BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
    kafkaBoltDeclarer.shuffleGrouping("parserBolt");

    StormTopology topology = builder.createTopology();

    TopologySubmitter.submit(topology, config);
}
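The KafkaBolt above is constructed without any producer settings, so it relies on finding them in the topology configuration when it prepares; that is what the submit step has to supply. A minimal sketch of such a configuration, where the broker address and topic name are placeholders and only the two KafkaBolt constants are taken from the examples on this page:

backtype.storm.Config stormConf = new backtype.storm.Config();
Map<String, String> producerProps = new HashMap<>();
producerProps.put("metadata.broker.list", "localhost:9092");             // placeholder broker list
producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
stormConf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, producerProps);         // the "kafka.broker.properties" key
stormConf.put(KafkaBolt.TOPIC, "parsed-security-logs");                  // the "topic" key; topic name is illustrative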
 
Example 2: assembles the configuration map that KafkaBolt reads from the topology config, keyed by KafkaBolt.KAFKA_BROKER_PROPERTIES and KafkaBolt.TOPIC.
private Map getKafkaConfig(Map options) {
  // Settings for the Kafka producer that KafkaBolt creates internally.
  Map<String, Object> kafkaConfig = new HashMap<>();
  Map<String, String> brokerConfig = new HashMap<>();
  String brokers = (String) Utils.get(options, BROKER_LIST, "localhost:9092");
  String topic = (String) Utils.get(options, TOPIC, KafkaUtils.DEFAULT_TOPIC);
  brokerConfig.put("metadata.broker.list", brokers);
  brokerConfig.put("serializer.class", "kafka.serializer.StringEncoder");
  brokerConfig.put("key.serializer.class", "kafka.serializer.StringEncoder");
  brokerConfig.put("request.required.acks", "1");
  // KafkaBolt looks up these two entries ("kafka.broker.properties" and "topic") in the topology config.
  kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig);
  kafkaConfig.put(KafkaBolt.TOPIC, topic);
  return kafkaConfig;
}
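A map like the one returned here only takes effect once it is merged into the configuration submitted with the topology, because that is where KafkaBolt looks up "kafka.broker.properties" and "topic". A hedged sketch of that wiring; the component names, the parallelism of 1, sourceSpout, and options are assumptions for illustration:

backtype.storm.Config stormConf = new backtype.storm.Config();
stormConf.putAll(getKafkaConfig(options));   // backtype.storm.Config extends HashMap, so putAll merges the entries

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("source", sourceSpout, 1);
builder.setBolt("kafkaSink", new KafkaBolt<String, String>(), 1)
       .shuffleGrouping("source");
// submitTopology declares checked exceptions; handle or rethrow them as Example 4 does.
StormSubmitter.submitTopology("kafka-sink-demo", stormConf, builder.createTopology());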
 
Example 3: adds a KafkaBolt&lt;String, JSONObject&gt; sink to an existing TopologyBuilder, passing the broker properties and output topic through the topology configuration.
private boolean initializeKafkaBolt(String name) {
	try {
		// The Kafka sink subscribes to the most recently added upstream component.
		String messageUpstreamComponent = messageComponents
				.get(messageComponents.size() - 1);

		System.out.println("[OpenSOC] ------" + name
				+ " is initializing from " + messageUpstreamComponent);

		// Producer settings handed to KafkaBolt through the topology configuration.
		Map<String, String> kafka_broker_properties = new HashMap<String, String>();
		kafka_broker_properties.put("zk.connect",
				config.getString("kafka.zk"));
		kafka_broker_properties.put("metadata.broker.list",
				config.getString("kafka.br"));

		// Message values are serialized as JSON, keys as plain strings.
		kafka_broker_properties.put("serializer.class",
				"com.opensoc.json.serialization.JSONKafkaSerializer");
		kafka_broker_properties.put("key.serializer.class",
				"kafka.serializer.StringEncoder");

		String output_topic = config.getString("bolt.kafka.topic");

		// These keys match KafkaBolt.KAFKA_BROKER_PROPERTIES and KafkaBolt.TOPIC.
		conf.put("kafka.broker.properties", kafka_broker_properties);
		conf.put("topic", output_topic);

		builder.setBolt(name, new KafkaBolt<String, JSONObject>(),
				config.getInt("bolt.kafka.parallelism.hint"))
				.shuffleGrouping(messageUpstreamComponent, "message")
				.setNumTasks(config.getInt("bolt.kafka.num.tasks"));
	} catch (Exception e) {
		// Abort topology construction if any required configuration key is missing.
		e.printStackTrace();
		System.exit(0);
	}
	return true;
}
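The bolt above subscribes to a stream named "message" coming from the upstream parser. Assuming the storm-kafka version in which KafkaBolt reads the tuple fields "key" (optional) and "message" and hands them to the Kafka producer, the upstream component could look roughly like the sketch below; the class name, payload layout, and JSON type are illustrative and not taken from OpenSOC:

import java.util.Map;

import org.json.simple.JSONObject;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class UpstreamParserBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Wrap the raw input line in a JSON payload; the field layout is illustrative.
        JSONObject message = new JSONObject();
        message.put("original_string", tuple.getString(0));

        // Emit on the "message" stream with the "key"/"message" fields KafkaBolt reads.
        collector.emit("message", tuple, new Values("security-log", message));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("message", new Fields("key", "message"));
    }
}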
 
Example 4: submits a topology, injecting the Kafka producer properties and the target topic into the Storm configuration when a Kafka sink is configured.
public static void submit(StormTopology topology, Config config) {
    backtype.storm.Config stormConfig = new backtype.storm.Config();
    int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
    LOG.info("Set topology.message.timeout.secs as {}", messageTimeoutSecs);
    stormConfig.setMessageTimeoutSecs(messageTimeoutSecs);

    // Pass the Kafka producer settings and the target topic to KafkaBolt via the Storm config.
    if (config.hasPath("dataSinkConfig.brokerList")) {
        Map<String, String> props = new HashMap<>();
        props.put("metadata.broker.list", config.getString("dataSinkConfig.brokerList"));
        props.put("serializer.class", config.getString("dataSinkConfig.serializerClass"));
        props.put("key.serializer.class", config.getString("dataSinkConfig.keySerializerClass"));
        stormConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
    }

    if (config.hasPath("dataSinkConfig.topic")) {
        stormConfig.put(KafkaBolt.TOPIC, config.getString("dataSinkConfig.topic"));
    }

    boolean localMode = config.getBoolean(LOCAL_MODE);
    int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
    stormConfig.setNumWorkers(numOfTotalWorkers);
    String topologyId = config.getString("topology.name");
    if(localMode) {
        LOG.info("Submitting as local mode");
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyId, stormConfig, topology);
        Utils.sleep(Long.MAX_VALUE);
    } else {
        LOG.info("Submitting as cluster mode");
        try {
            StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology);
        } catch(Exception ex) {
            LOG.error("Failed to submit topology {}", topology, ex);
            throw new IllegalStateException(ex);
        }
    }
}