Python 之kafka Consumer和Producer

Admin 2019-06-10 15:13:15 Linux服务,Python

Topic 和 Partition

Topic 在逻辑上可以被认为是一个 queue,每条消费都必须指定它的 Topic,可以简单理解为必须指明把这条消息放进哪个queue 里。为了使得 Kafka 的吞吐率可以线性提高,物理上把 Topic 分成一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。若创建 topic1 和 topic2 两个 topic,且分别有 3个和 6 个分区,则整个集群上会相应会生成共 9 个文件夹。

Kafka Python Client常见的有三个库(我这里使用kafka-python库)

安装

pip install kafka-python

生产者(Producer)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['192.168.8.152:9092'])

for i in range(5):
  future = producer.send("test", value=b"aa.bb.cc.{0}".format(i), key=b"{0}".format(i)) #, partition = i)
  try:
    record_metadata = future.get(timeout=30)

    print "topic: {0} partition: {1} offset: {2}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)
  except Exception,e:
  	print str(e)

producer.close()


Producer发送消息到 broker 时,会根据 Paritition 机制选择将其存储到哪一个 Partition。如果 Partition 机制设置合理,所有消息可以均匀分布到不同的 Partition 里,这样就实现了负载均衡。如果一个 Topic 对应一个文件,那这个文件所在的机器 I/O 将会成为这个 Topic 的性能瓶颈,而有了 Partition 后,不同的消息可以并行写入不同 broker 的不同 Partition 里,极大的提高了吞吐率。可以在 $KAFKA_HOME/config/server.properties 中通过配置项 num.partitions 来指定新建 Topic 的默认 Partition 数量,也可在创建 Topic 时通过参数指定,同时也可以在 Topic 创建之后通过 Kafka 提供的工具修改。


在发送一条消息时,可以指定这条消息的 key,Producer 根据这个 key 和 Partition 机制来判断应该将这条消息发送到哪个 Parition。Paritition 机制可以通过指定 Producer 的 paritition这一参数来指定,该参数必须实现 kafka.producer.Partitioner 接口。(每个 Parition 都会有个序号, 默认从0开始)


future.get函数等待单条消息发送完成或超时(必须有此函数或使用time.sleep代替)

消费者(Consumer)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer

consumer = KafkaConsumer("test", group_id='group1', bootstrap_servers=['192.168.8.152:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

group_id:指定此消费者实例属于的组名(可不指定)。同一 Topic 的一条消息只能被同一个 Consumer Group 内的一个 Consumer 消费,但多个 Consumer Group 可同时消费这一消息。这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer)和单播(发给某一个 Consumer)的手段。一个 Topic 可以对应多个 Consumer Group。如果需要实现广播,只要每个 Consumer 有一个独立的 Group 就可以了。要实现单播只要所有的 Consumer 在同一个 Group 里。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic。

实时上,Kafka 的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用 Storm 这种实时流处理系统对消息进行实时在线处理,同时使用 Hadoop 这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的 Consumer 属于不同的 Consumer Group 即可。


参考文档

相关文章
最新推荐