【原创】
本文作者:Denghui.Zhou,欢迎交流讨论。
原文链接:https://jianwi.com/articles/bigdata/kafka/reference.html
版权声明:原创不易,转载请注明出处,谢谢!

1、命令行客户端

集群参数

# Zookeeper地址
zkinfo=s33.dc.domain:2181,s34.dc.domain:2181,s35.dc.domain:2181,s36.dc.domain:2181,s37.dc.domain:2181/kafka
# Kafka Broker地址
kafkainfo=s31.dc.domain:9092,s32.dc.domain:9092,s33.dc.domain:9092,s34.dc.domain:9092,s35.dc.domain:9092,s36.dc.domain:9092,s37.dc.domain:9092

创建主题

kafka-topics --create --zookeeper $zkinfo --replication-factor 2 --partitions 21 --topic test

验证主题创建成功

kafka-topics --list --zookeeper $zkinfo

生产消息

kafka-console-producer --broker-list $kafkainfo --topic test

消费消息

kafka-console-consumer --bootstrap-server $kafkainfo --topic test --from-beginning

2、Java API 用法

生产者生产消息简单样例:

package com.domain.dc.kafkaexamples;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * SimpleProducer
 *
 * @author Denghui.Zhou
 * @date 2017/12/19
 */
public class SimpleProducer {
    public static void main(String[] args) {
        // kafka 代理服务地址
        final String bootstrapServers = "s31.dc.domain:9092,s32.dc.domain:9092,s33.dc.domain:9092,s34.dc.domain:9092,s35.dc.domain:9092,s36.dc.domain:9092,s37.dc.domain:9092";
        // 生成以 ufoId 开头的连续事件
        long total = Long.parseLong(args[0]);
        long ufoId = Math.round(Math.random() * Integer.MAX_VALUE);

        // 设置客户端 Java 属性
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONFIG, "1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (long i = 0; i < total; i++) {
            String key = Long.toString(ufoId++);
            long runtime = System.currentTimeMillis();
            double latitude = (Math.random() * (2 * 76.92386517)) - 76.92386517;
            double longitude = (Math.random() * 360.0) - 180.0;
            String msg = runtime + "," + latitude + "," + longitude;
            try {
                ProducerRecord<String, String> data = new ProducerRecord<>("test", key, msg);
                System.out.printf("发送消息:<key: %s>, <msg: %s>, <partition: %d>\n", key, msg, data.partition());
                producer.send(data);
                long wait = Math.round(Math.random() * 100);
                Thread.sleep(wait);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.close();

    }
}

Kafka Java 客户端 API 生产者:
Kafka Java客户端API生产者

消费者消费消息简单样例:

package com.domain.dc.kafkaexamples;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * SimpleConsumer
 *
 * @author Denghui.Zhou
 * @date 2017/12/19
 */
public class SimpleConsumer {
    public static void main(String[] args) {
        // Kafka 代理服务地址
        final String bootstrapServers = "s31.dc.domain:9092,s32.dc.domain:9092,s33.dc.domain:9092,s34.dc.domain:9092,s35.dc.domain:9092,s36.dc.domain:9092,s37.dc.domain:9092";
        // 设置客户端 Java 参数
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 指定消费组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 启用自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 要订阅的主题列表
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("接收消息:<key: %s>, <msg: %s>, <partition: %d>, <offset: %d>\n", record.key(), record.value(), record.partition(), record.offset());
                }
            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
        }
        consumer.close();

    }

}

Kafka Java 客户端 API 消费者:
Kafka Java客户端API消费者

3、Python API 用法

安装 kafka-python

pip install kafka-python

生产消息

from kafka import KafkaProducer
# Kafka Broker 地址
kafkainfo='s31.dc.domain:9092,s32.dc.domain:9092,s33.dc.domain:9092,s34.dc.domain:9092,s35.dc.domain:9092,s36.dc.domain:9092,s37.dc.domain:9092'
producer = KafkaProducer(bootstrap_servers=kafkainfo, compression_type='gzip')
for i in range(10):
    producer.send('test', b'msg %d' % i)
producer.flush()

Kafka Python API 生产者:
Kafka Python API生产者
消费消息

from kafka import KafkaConsumer
# Kafka Broker 地址
kafkainfo='s31.dc.domain:9092,s32.dc.domain:9092,s33.dc.domain:9092,s34.dc.domain:9092,s35.dc.domain:9092,s36.dc.domain:9092,s37.dc.domain:9092'
consumer = KafkaConsumer('test', bootstrap_servers=kafkainfo)
for msg in consumer:
    print (msg)

Kafka Python API 消费者:
Kafka Python API消费者

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:PipeSoloSymWide 等,欢迎大家加入,贡献开源。

    3157 引用 • 3905 回帖 • 654 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    19 引用 • 18 回帖 • 207 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    72 引用 • 106 回帖 • 1 关注
感谢    关注    收藏    赞同    反对    举报    分享