Apache Kafka

本贴最后更新于 2628 天前,其中的信息可能已经时移世改

Apache kafka 是分布式发布-订阅消息系统。
主要的组件:

·话题(Topic)

话题是特定类型的消息流
消息是字节的有效负载(payload)
而话题就是消息的分类名 或者 种子(Feed)名
已经发布的消息保存在一组服务器上面,它们被成为代理(Broker)或者Kafka集群

·生产者(Producer)

生产者是能够发布消息到话题的对象,任意对象!

·消费者(Consumer)

消费者可以订阅一个或者多个消息,并且从Broker中拉取数据,进而消费这些已经发布的消息。

架构如图所示

生产者代码

/** 
 * Instantiates a new Kafka producer. 
 * 
 * @param topic the topic 
 * @param directoryPath the directory path 
 */ 
public KafkaMailProducer(String topic, String directoryPath) { 
       props.put("serializer.class", "kafka.serializer.StringEncoder"); 
       props.put("metadata.broker.list", "localhost:9092"); 
       producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props)); 
       this.topic = topic; 
       this.directoryPath = directoryPath; 
} 

public void run() { 
      Path dir = Paths.get(directoryPath); 
      try { 
           new WatchDir(dir).start(); 
           new ReadDir(dir).start(); 
      } catch (IOException e) { 
           e.printStackTrace(); 
      } 
} 

消费者代码

public KafkaMailConsumer(String topic) { 
       consumer = 
Kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); 
       this.topic = topic; 
} 

/** 
 * Creates the consumer config. 
 * 
 * @return the consumer config 
 */ 
private static ConsumerConfig createConsumerConfig() { 
      Properties props = new Properties(); 
      props.put("zookeeper.connect", KafkaMailProperties.zkConnect); 
      props.put("group.id", KafkaMailProperties.groupId); 
      props.put("zookeeper.session.timeout.ms", "400"); 
      props.put("zookeeper.sync.time.ms", "200"); 
      props.put("auto.commit.interval.ms", "1000"); 
      return new ConsumerConfig(props); 
} 

public void run() { 
      Map topicCountMap = new HashMap(); 
      topicCountMap.put(topic, new Integer(1)); 
      Map>> consumerMap = consumer.createMessageStreams(topicCountMap); 
      KafkaStream stream = consumerMap.get(topic).get(0); 
      ConsumerIterator it = stream.iterator();
      while (it.hasNext()) 
      System.out.println(new String(it.next().message())); 
}

相关帖子

回帖

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...