# flume整合kafka简单配置及操作步骤
- 本文假设 Flume 和 Kafka 服务均已安装完毕，只记录配置工作，因此省略安装步骤。
- 文中的路径等信息均按实际服务器目录记录，使用时请按自己的环境替换对应目录以及命令中的 IP 和端口。
# 创建主题
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --create --zookeeper 172.27.132.81:3181 --replication-factor 1 --partitions 1 --topic CMCC_PRO_HJDEVICE_SC
# 查看主题列表
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --list --zookeeper 172.29.73.51:2181
# 通过控制台消费主题
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --bootstrap-server 172.27.132.83:9200 --topic CMCC_PRO_HJDEVICE_SC --from-beginning
# 通过控制台生产消息
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-console-producer.sh --broker-list 172.27.132.83:9200 --topic CMCC_PRO_HJDEVICE_SC
# 删除指定主题（需要在 broker 的 server.properties 中设置 delete.topic.enable=true，否则主题只会被标记为删除而不会真正删除）
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --delete --topic CMCC_PRO_HJDEVICE_SC --zookeeper 172.27.132.81:3181
# 查看某个主题的详细信息
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper 172.27.132.81:3181 --describe --topic CMCC_PRO_HJDEVICE_SC
显示结果如下:
Topic: CMCC_PRO_HJDEVICE_SC PartitionCount:12 ReplicationFactor:3 Configs:
Topic: CMCC_PRO_HJDEVICE_SC Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: CMCC_PRO_HJDEVICE_SC Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: CMCC_PRO_HJDEVICE_SC Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: CMCC_PRO_HJDEVICE_SC Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: CMCC_PRO_HJDEVICE_SC Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: CMCC_PRO_HJDEVICE_SC Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: CMCC_PRO_HJDEVICE_SC Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: CMCC_PRO_HJDEVICE_SC Partition: 7 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: CMCC_PRO_HJDEVICE_SC Partition: 8 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: CMCC_PRO_HJDEVICE_SC Partition: 9 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: CMCC_PRO_HJDEVICE_SC Partition: 10 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: CMCC_PRO_HJDEVICE_SC Partition: 11 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
# 查询指定主题内的消息总数：--time -1 返回各分区的最新 offset，--time -2 返回各分区的最早 offset，按分区作差后求和即为消息总数
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.27.132.83:9200 --topic CMCC_PRO_HJDEVICE_SC --time -1
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.27.132.83:9200 --topic CMCC_PRO_HJDEVICE_SC --time -2
# 查看 ZooKeeper 上记录的所有消费组（即使用旧版消费者 API、offset 保存在 ZooKeeper 中的消费组）
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-consumer-groups.sh --zookeeper 172.27.132.81:3181 --list
# 查看指定 broker 上的所有消费组（即使用新版消费者 API、offset 保存在 Kafka 中的消费组）
/inf_data/kafka_2.11-0.11.0.0/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 172.27.132.83:9200 --list
# 查看主要配置文件（此处路径已按本机目录映射，实际使用时请结合 Flume 配置文件所在的目录进行调整）
iwc@hncmccbd70:/inf_data/kafka_2.11-0.11.0.0> cat /inf_data/flume-properties/k2k.properties
# 配置说明（整合的核心就是下面这份配置）
# 定义这个 agent 中各个组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置 source 组件:r1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 172.29.73.33:9092
# 这里也可以手动指定消费组
# a1.sources.r1.kafka.consumer.group.id = CMCC_PRO_HJDEVICE_SC_GROUP
a1.sources.r1.kafka.topics = PRD_HJDEVICE_SC
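# 拦截器 i1：KafkaSource 会把来源主题名写入事件头 topic，而 KafkaSink 会优先按事件头 topic 选择写入的主题；
# 这里用 static 拦截器把事件头 topic 覆盖为目标主题，避免消息被写到与来源主题同名的主题中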
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.value = CMCC_PRO_HJDEVICE_SC
# 描述和配置 sink 组件:k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 172.27.132.81:9200,172.27.132.82:9200,172.27.132.83:9200
a1.sinks.k1.topic = CMCC_PRO_HJDEVICE_SC
# 描述和配置 channel 组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# 描述和配置 source channel sink 之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 启动脚本（脚本中 -f k2k.properties 使用的是相对路径，需要先 cd 到 /inf_data/flume-properties 目录再执行）:
nohup sh /inf_data/flume-properties/kafka2kafka.sh &
# 查看脚本内容
iwc@hncmccbd70:/inf_data/kafka_2.11-0.11.0.0> cat /inf_data/flume-properties/kafka2kafka.sh
/opt/cloudera/parcels/CDH/lib/flume-ng/bin/flume-ng agent -c /opt/cloudera/parcels/CDH/lib/flume-ng/conf -f k2k.properties -n a1 ##-Dflume.root.logger=INFO,console
# 注意事项:
# 实际配置时应根据自己的环境编写命令和配置内容；需要对 Kafka 和 Flume 有一定程度的了解，并参阅官方文档明确各个配置参数的含义。
# 查询资料:
- Kafka中文官方文档
- Flume官网: https://flume.apache.org/
- 以及网络上的资料和视频学习资料(度娘的比较多就不列了)