Iawen's Blog

我喜欢这样自由的随手涂鸦, 因为我喜欢风......

Kafka 是一个消息系统, 原本开发自 LinkedIn, 用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司 作为多种类型的数据管道和消息系统使用。

活动流数据是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件, 然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO 使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。

1. Kafka 简介

Kafka 是一种分布式的, 基于发布 / 订阅的消息系统。主要设计目标如下:

  • 以时间复杂度为 O(1) 的方式提供消息持久化能力, 即使对 TB 级以上数据也能保证常数时间复杂度的访问性能
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输
  • 支持 Kafka Server 间的消息分区, 及分布式消费, 同时保证每个 Partition 内的消息顺序传输
  • 同时支持离线数据处理和实时数据处理
  • Scale out: 支持在线水平扩展。

1.1 安装

# 下载及解压Zookeeper, kafka
# http://archive.apache.org/dist/
wget https://mirrors.aliyun.com/apache/zookeeper/stable/apache-zookeeper-3.6.3-bin.tar.gz
tar -xvf apache-zookeeper-3.6.3-bin.tar.gz
# 修改配置文件zoo.cfg

wget https://mirrors.aliyun.com/apache/kafka/2.8.2/kafka_2.13-2.8.2.tgz
tar -xaf kafka_2.13-2.8.2.tgz

# 编辑配置文件server.properties

# 启动
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.12
export KAFKA_HOME=/usr/local/kafka_2.11
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin

zkServer.sh start
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

2. Kafka 基础概念

2.1 概念一: 生产者与消费者

对于 Kafka 来说客户端有两种基本类型: 生产者(Producer)和消费者(Consumer)。除此之外, 还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端, 但这些高阶客户端底层仍然是生产者和消费者API, 它们只不过是在上层做了封装。

2.2 主题(Topic)与分区(Partition)

在 Kafka 中, 消息以主题(Topic)来分类, 每一个主题都对应一个「消息队列」, 这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中, 势必缺少可伸缩性, 无论是生产者/消费者数目的增加, 还是消息数量的增加, 都可能耗尽系统的性能或存储。

2.3 Broker 和集群(Cluster)

一个 Kafka 服务器也称为 Broker, 它接受生产者发送的消息并存入磁盘; Broker 同时服务消费者拉取分区消息的请求, 返回目前已经提交的消息。使用特定的机器硬件, 一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。(现在动不动就百万量级..我特地去查了一把, 好像确实集群的情况下吞吐量挺高的..摁..)

2.4 多集群

随着业务发展, 我们往往需要多集群, 通常处于下面几个原因:

  • 基于数据的隔离
  • 基于安全的隔离
  • 多数据中心(容灾)

当构建多个数据中心时, 往往需要实现消息互通。举个例子, 假如用户修改了个人资料, 那么后续的请求无论被哪个数据中心处理, 这个更新需要反映出来。又或者, 多个数据中心的数据需要汇总到一个总控中心来做数据分析。

3. Kafka基本配置

3.1 服务端的配置文件(server.properties)

参数 说明
broker.id 每一个broker在集群中的唯一表示, 要求是正数。当该服务器的IP地址发生改变时, broker.id没有变化, 则不会影响consumers的消息情况
host.name broker的主机地址, 若是设置了, 那么会绑定到这个地址上, 若是没有, 会绑定到所有的接口上, 并将其中之一发送到ZK, 一般不设置
port broker server服务端口
num.network.threads broker处理消息的最大线程数, 一般情况下数量为cpu core的个数
num.io.threads broker处理磁盘IO的线程数, 数值为cpu core个数的2倍
socket.send.buffer.bytes socket的发送缓冲区, socket的调优参数SO_SNDBUFF
socket.receive.buffer.bytes socket的接受缓冲区, socket的调优参数SO_RCVBUFF
socket.request.max.bytes socket请求的最大数值, 防止serverOOM, message.max.bytes必然要小于socket.request.max.bytes, 会被topic创建时的指定参数覆盖
log.dirs=/data/kafka-logs kafka数据的存放地址, 多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1, /data/kafka-logs-2
num.partitions 每个topic的分区个数, 若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir 每个数据目录在启动时用于日志恢复和在关闭时用于刷新的线程数。对于安装在RAID阵列中的数据dirs, 建议增加此值。
log.retention.hours 日志保留小时数
log.retention.bytes 保留的日志文件的大小 -1表示不限制, 可以同时指定log.retention.bytes和log.retention.hours来混合指定保留规则。一旦日志的大小超过了log.retention.bytes就清除老的segment, 一旦某个segment的保留时间超过了规定的值同样将其清除。
log.segment.bytes topic的分区是以一堆segment文件存储的, 这个控制每个segment的大小, 会被topic创建时的指定参数覆盖
log.roll.hours 这个参数会在日志segment没有达到log.segment.bytes设置的大小, 也会强制新建一个segment会被 topic创建时的指定参数覆盖
log.retention.check.interval.ms 文件大小检查的周期时间
zookeeper.connect broker需要使用zookeeper保存meta数据
zookeeper.connection.timeout.ms zookeeper链接超时时间
message.max.bytes 表示消息体的最大大小, 单位是字节
background.threads 一些后台任务处理的线程数, 例如过期消息文件的删除等, 一般情况下不需要去做修改
queued.max.requests 等待IO线程处理的请求队列最大数, 若是等待IO的请求超过这个数值, 那么会停止接受外部消息, 应该是一种自我保护机制

3.2 生产端的配置文件(producer.properties)

参数 说明
metadata.broker.list 指定节点列表
compression.type 压缩类型, 可选none, gzip, snappy, lz4
compressed.topics 如果要压缩消息, 这里指定哪些topic要压缩消息, 默认是empty, 表示不压缩
partitioner.class 指定分区处理类。默认kafka.producer.DefaultPartitioner
request.required.acks 设置发送数据是否需要服务端的反馈, 有三个值0, 1, -1, 0:producer不会等待broker发送ack; 1:当leader接收到消息后发送ack; -1:当所有的follower都同步消息成功后发送ack
request.timeout.ms 客户端等待请求响应的最长时间
queue.enqueue.timeout.ms 当消息在producer端沉积的条数达到“queue.buffering.max.messages"后阻塞一定时间后, 队列仍然没有enqueue(producer仍然没有发送出任何消息)此时producer可以继续阻塞, 或者将消息抛弃 -1: 无阻塞超时限制, 消息不会被抛弃 0 : 立即清空队列, 消息被抛弃
serializer.class 指定序列化处理类

3.3 消费端的配置文件(consumer.properties)

参数 说明
zookeeper.connect(必需) zookeeper连接服务器地址
zookeeper.connectiontimeout.ms zookeeper连接的过期时间
zookeeper.session.timeout.ms zookeeper的session的过期时间
zookeeper.sync.time.ms 指定多久消费者更新offset到zookeeper中
group.id(必需) 消费者的group id
auto.commit.enable 是否自动提交offset信息
auto.commit.interval.ms 自动更新时间
consumer.id 当前consumer的标识
queued.max.message.chunks 最大取多少块缓存到消费者(默认10)
rebalance.max.retries 当有新的consumer加入到group时, 将会reblance
fetch.min.bytes 获取消息的最大尺寸,broker不会向consumer输出大于此值得chunk
fetch.wait.max.ms 当消息尺寸不足时, server阻塞的时间,如果超时, 立即发送给consumer

3.4 用户认证

# 创建管理用户admin, 密码是admin
kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin

# 创建生产者 iawen, 密码是 123456
kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name iawen

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:iawen --operation Write --topic test


# 创建消费者 rec_user, 密码是 123456
kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name rec_user

# 赋Topic 和 group 权限给用户
kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:rec_user --operation Read --topic test

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:reader --operation Read --group test-group

kafka-configs.sh --zookeeper 127.0.0.1:2181 --describe --entity-type users --entity-name iawen
kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181 --list

修改 server.properties 里的配置:

# 启用ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 设置admin为超级用户
super.users=User:admin
# 启用SCRAM机制, 采用SCRAM-SHA-512算法
sasl.enabled.mechanisms=SCRAM-SHA-512
# 为broker间通讯开启SCRAM机制, 采用SCRAM-SHA-512算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.137.3:9092
advertised.listeners=SASL_PLAINTEXT://192.168.137.3:9092

auto.create.topics.enable=false
KAFKA_OPTS=-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka-broker-jaas.conf kafka-server-start.sh $KAFKA_HOME/config/server.properties

kafka-broker-jaas.conf 文件内容如下:

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="123456";
};

4. Kafka 命令

4.1 常用操作


export PATH=/usr/local/confluent/bin:$PATH

kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic rec_behave --partitions 1 --replication-factor 1

kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list test_behavior

kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name __consumer_offsets --describe
kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name __consumer_offsets --alter --delete-config cleanup.policy
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --partition 0 --from-beginning --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

# 1296000000  --> 15 days, 2592000000  --> 30 days
kafka-topics.sh --zookeeper localhost:2181 -topic xxxxx --alter --config retention.ms=2592000000 #

# 查看与删除topic下的持久化文件命令;  因为kafka一直运行的话 硬盘占用越来越多; 
# kafka-topics -delete --zookeeper 127.0.0.1:2181 --topic 162btc
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group xxx --topic xxx --execute --reset-offsets --to-offset 0

4.2 SASL 模式下

  • config/kafka-client-jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="xxxxx";
};
  • config/consumer-sasl.properties
security.protocol = SASL_PLAINTEXT
sasl.mechanism = SCRAM-SHA-512
  • bin/kafka-consumer-groups-sasl.sh
ROOT=$(dirname $0)
export KAFKA_OPTS=" -Djava.security.auth.login.config=$ROOT/../config/kafka_client_jaas.conf"
exec $ROOT/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@" --command-config $ROOT/../config/consumer-sasl.properties
kafka-consumer-groups-sasl.sh --bootstrap-server 127.0.0.1:9092 --list 
kafka-consumer-groups-sasl.sh --bootstrap-server 127.0.0.1:9092 --group rec-group --describe

5. 其他

5.1 线上Kafka突发rebalance异常

参考: https://www.cnblogs.com/chanshuyi/p/kafka_rebalance_quick_guide.html