本贴最后更新于 264 天前,其中的信息可能已经时过境迁

37 套精品 Java 架构师高并发高性能高可用分布式集群电商缓存性能调优设计项目实战视教程 置顶! 有更新!

四、接口示例

1. 生产者

kafka 客户端发布 record(消息) 到 kafka 集群。

新的生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。

简单示例

使用 producer 发送一个有序的 key/value(键值对),放到 java 的 main 方法里就能直接运行

package com.boom.kafka.KafkaDemo;

 

import java.util.Properties;

 

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

 

public class ProducerDemo {

public static void main(String[] args) {

Properties props = new Properties();

 props.put("bootstrap.servers""192.168.211.133:9092");

 props.put("acks""all");

 props.put("retries", 0);

 props.put("batch.size", 16384);

 props.put("linger.ms", 1);

 props.put("buffer.memory", 33554432);

 props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");

 props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");

 

 Producer<String, String> producer = new KafkaProducer(props);

 for(int i = 0; i < 10; i++)

     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 

 producer.close();

}

}

 

生产者的缓冲空间池保留尚未发送到服务器的消息,后台 I/O 线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。

send()方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。

ack是判别请求是否为完整的条件(就是是判断是不是成功发送了)。我们指定了“all”将会阻塞消息,这种设置性能最低,但是是最可靠的。

retries,如果请求失败,生产者会自动重试,我们指定是 0 次,如果启用重试,则会有重复消息的可能性。

producer(生产者)缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃 ”的分区都有 1 个缓冲区)。

默认缓冲可立即发送,即遍缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于 0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于 TCP 的算法,例如上面的代码段,可能 100 条消息在一个请求发送,因为我们设置了 linger(逗留) 时间为 1 毫秒,然后,如果我们没有填满缓冲区,这个设置将增加 1 毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比 0 大,以少量的延迟代价换取更少的,更有效的请求。

buffer.memory 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 max.block.ms 设定,之后它将抛出一个 TimeoutException。

key.serializervalue.serializer示例,将用户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使用附带的ByteArraySerializaerStringSerializer处理简单的 string 或 byte 类型。

send(record,Callback)

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

异步发送一条消息到 topic,并调用 callback(当发送已确认)。

send 是异步的,并且一旦消息被保存在等待发送的消息缓存中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。

发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的 offset 和消息的时间戳。如果 topic 使用的是 CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果 topic 使用的是 LogAppendTime,则追加消息时,时间戳是 broker 的本地时间。

由于 send 调用是异步的,它将为分配消息的此消息的 RecordMetadata 返回一个Future。如果 future 调用,则将阻塞,直到相关请求完成并返回该消息的 metadata,或抛出发送异常。

阻塞调用

如果要模拟一个简单的阻塞调用,你可以调用 get() 方法。

 byte[] key = "key".getBytes();

 byte[] value = "value".getBytes();

 ProducerRecord<byte[],byte[]> record = new  ProducerRecord<byte[],byte[]>("my-topic", key, value)

 producer.send(record).get();

完全无阻塞调用 -callback

完全无阻塞的话, 可以利用回调参数提供的请求完成时将调用的回调通知。

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);

 producer.send(myRecord,

               new Callback() {

                   public void onCompletion(RecordMetadata metadata, Exception e) {

         &nbsnbsp;             if(e != null)

                           e.printStackTrace();

                       System.out.println("The offset of the record we just sent is: " + metadata.offset());

                   }

               });

 

回调顺序注意

发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 之前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

注意:callback 一般在生产者的 I/O 线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在 callback 主体中使用自己的Executor来并行处理。

pecified by:

send in interface Producer<K,V>

Parameters:

record - 发送的记录(消息)
callback - 用户提供的 callback,服务器来调用这个 callback 来应答结果(null 表示没有 callback)。

Throws:

InterruptException - 如果线程在阻塞中断。
SerializationException - 如果 key 或 value 不是给定有效配置的 serializers。
TimeoutException - 如果获取元数据或消息分配内存话费的时间超过 max.block.ms。
KafkaException - Kafka 有关的错误(不属于公共 API 的异常)。


2. 消费者

简单示例

package com.boom.kafka.KafkaDemo;

 

import java.util.Arrays;

import java.util.Properties;

 

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

 

public class ConsumerDemo {

public static void main(String[] args) {

Properties props = new Properties();

     props.put("bootstrap.servers""192.168.211.133:9092");

     props.put("group.id""test");

     props.put("enable.auto.commit""true");

     props.put("auto.commit.interval.ms""1000");

     props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");

     props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");

     KafkaConsumer<String, String> consumer = new KafkaConsumer(props);

     consumer.subscribe(Arrays.asList("test""my-topic"));

     while (true) {

         ConsumerRecords<String, String> records = consumer.poll(100);

         for (ConsumerRecord<String, String> record : records)

             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

     }

}

}

 



3. 连接器

http://docs.confluent.io/2.0.0/connect/connect-hdfs/docs/index.html

http://kafka.apache.org/documentation/#connect

 


 

http://orchome.com/

http://orchome.com/6

 

 

解析 storm 的 KafkaSpout

http://blog.csdn.net/fengyedeyanlei/article/details/52485165

http://www.myexception.cn/cloud/1830075.html

http://www.cnblogs.com/cruze/p/4241181.html

http://tianxingzhe.blog.51cto.com/3390077/1701258/

 

 

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:PipeSoloSymWide 等,欢迎大家加入,贡献开源。

    3141 引用 • 3905 回帖 • 654 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    19 引用 • 18 回帖 • 207 关注
  • MQ
    8 引用 • 12 回帖
  • 消息队列
    13 引用 • 5 回帖
感谢    关注    收藏    赞同    反对    举报    分享