Java源码示例:storm.kafka.bolt.KafkaBolt
示例1
public static void main(String[] args) throws Exception{
System.setProperty("config.resource", "/application.conf");
Config config = ConfigFactory.load();
KafkaSpoutProvider provider = new KafkaSpoutProvider();
IRichSpout spout = provider.getSpout(config);
SecurityLogParserBolt bolt = new SecurityLogParserBolt();
TopologyBuilder builder = new TopologyBuilder();
int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
builder.setSpout("ingest", spout, numOfSpoutTasks);
BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
boltDeclarer.shuffleGrouping("ingest");
KafkaBolt kafkaBolt = new KafkaBolt();
BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
kafkaBoltDeclarer.shuffleGrouping("parserBolt");
StormTopology topology = builder.createTopology();
TopologySubmitter.submit(topology, config);
}
示例2
private Map getKafkaConfig(Map options) {
Map kafkaConfig = new HashMap();
Map brokerConfig = new HashMap();
String brokers = (String) Utils.get(options, BROKER_LIST, "localhost:9092");
String topic = (String) Utils.get(options, TOPIC, KafkaUtils.DEFAULT_TOPIC);
brokerConfig.put("metadata.broker.list", brokers);
brokerConfig.put("serializer.class", "kafka.serializer.StringEncoder");
brokerConfig.put("key.serializer.class", "kafka.serializer.StringEncoder");
brokerConfig.put("request.required.acks", "1");
kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig);
kafkaConfig.put(KafkaBolt.TOPIC, topic);
return kafkaConfig;
}
示例3
private boolean initializeKafkaBolt(String name) {
try {
String messageUpstreamComponent = messageComponents
.get(messageComponents.size() - 1);
System.out.println("[OpenSOC] ------" + name
+ " is initializing from " + messageUpstreamComponent);
Map<String, String> kafka_broker_properties = new HashMap<String, String>();
kafka_broker_properties.put("zk.connect",
config.getString("kafka.zk"));
kafka_broker_properties.put("metadata.broker.list",
config.getString("kafka.br"));
kafka_broker_properties.put("serializer.class",
"com.opensoc.json.serialization.JSONKafkaSerializer");
kafka_broker_properties.put("key.serializer.class",
"kafka.serializer.StringEncoder");
String output_topic = config.getString("bolt.kafka.topic");
conf.put("kafka.broker.properties", kafka_broker_properties);
conf.put("topic", output_topic);
builder.setBolt(name, new KafkaBolt<String, JSONObject>(),
config.getInt("bolt.kafka.parallelism.hint"))
.shuffleGrouping(messageUpstreamComponent, "message")
.setNumTasks(config.getInt("bolt.kafka.num.tasks"));
} catch (Exception e) {
e.printStackTrace();
System.exit(0);
}
return true;
}
示例4
public static void submit(StormTopology topology, Config config){
backtype.storm.Config stormConfig = new backtype.storm.Config();
int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS)?config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
LOG.info("Set topology.message.timeout.secs as {}",messageTimeoutSecs);
stormConfig.setMessageTimeoutSecs(messageTimeoutSecs);
// set kafka sink
if(config.hasPath("dataSinkConfig.brokerList")){
Map props = new HashMap<>();
props.put("metadata.broker.list", config.getString("dataSinkConfig.brokerList"));
props.put("serializer.class", config.getString("dataSinkConfig.serializerClass"));
props.put("key.serializer.class", config.getString("dataSinkConfig.keySerializerClass"));
stormConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
}
if(config.hasPath("dataSinkConfig.serializerClass")){
stormConfig.put(KafkaBolt.TOPIC, config.getString("dataSinkConfig.topic"));
}
if(config.hasPath("dataSinkConfig.topic")){
stormConfig.put(KafkaBolt.TOPIC, config.getString("dataSinkConfig.topic"));
}
boolean localMode = config.getBoolean(LOCAL_MODE);
int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
stormConfig.setNumWorkers(numOfTotalWorkers);
String topologyId = config.getString("topology.name");
if(localMode) {
LOG.info("Submitting as local mode");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyId, stormConfig, topology);
Utils.sleep(Long.MAX_VALUE);
}else{
LOG.info("Submitting as cluster mode");
try {
StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology);
} catch(Exception ex) {
LOG.error("fail submitting topology {}", topology, ex);
throw new IllegalStateException(ex);
}
}
}