Rocky & Sky IT技术和管理

和大家分享一些技术,聊一聊有趣的话题

Kafka 处理超大消息的方法-分块和外部存储

Kafka作为使用最广泛的流消息系统,被广泛应用企业流数据处理和互联网实时处理平台,可以在该平台传输各种类型的数据。但是Kafka对其处理的消息大小是有限制的。在最近的版本中,其最大消息大小被限制在2GB以下。官方建议最好是把单个消息的大小限制在10k左右,Kafka的设计本来就是为了处理巨量的消息,而不是用来传输大型的消息的。 我们可以通过以下的配置修改消息的大小,但是强烈不建议这样做。在Kafka系统上传输大型的消息会对整个系统的性能造成很大的负面影响,很有可能把整个系统搞崩溃。

  • broker
    • message.max.bytes :500000000B(500M),单个消息的最大值 。
    • replica.fetch.max.bytes :512000000B(512M),单个消息的最大值broker可复制的消息的最大字节数,比message.max.bytes大。这个512M肯定已经会对性能产生非常不好的影响了。
  • consumer
    • fetch.message.max.bytes:500000000B(500M) 消费者能读取的最大消息,大于或等于message.max.bytes
  • producer
    • max.request.size:500000000B(500M) 生产者能请求的最大消息,大于或等于message.max.bytes

那如果我们确实会有一些比较大的消息,比如视频信息,大的二进制文件需要传送怎么办呢?我们通常采用以下两种方式来处理。 方法1:分块,这个也是LinkedIn团队推荐的的方式: 这个方法就是把大型的数据块拆成多个小的消息,然后需要给每一个消息设置编号,并把同一个数据块的消息设置ID,我们可以用下面的方式发送大数据块。 《Kafka 处理超大消息的方法-分块和外部存储》上面的例子里,我们把一个数据拆成了3个消息发送,type指的是消息类型,如果是chuck,则指的是需要进行合并的消息数据。uuid是这个数据块的标识。chunk_pos是表示某条消息在原来的数据块拆分序号,chunk_max表示拆分的消息总数。一旦三个分片消息都接受完成,这三个消息就会被合并成原来的数据块。 每一个消息我们都会有uuid,chunk_pos和chunk_max这几个元数据。消息格式如下: 《Kafka 处理超大消息的方法-分块和外部存储》发送消息:《Kafka 处理超大消息的方法-分块和外部存储》接收消息:必须保证接受到了所有的消息才能够合并后发送给用户。要做到这一点其实要保证所有的消息不能够在接受的时候丢失。 #### 方法二:外部存储

除了直接通过Kafka发送拆分的消息外,我们还可以采用另外一种方式,把数据直接存储在另外一个外部数据存储系统中。在消息中只发送存储的Key或者ID 消息的格式如下: 《Kafka 处理超大消息的方法-分块和外部存储》消息的发送需要处理两个操作,第一个是把数据存入外部存储系统中,另外一个操作就是把带有存储的key/id的消息发送到Kafka的队列里去。 《Kafka 处理超大消息的方法-分块和外部存储》操作外部存储的部分,则需要有以下功能:写入,读取和回滚; 下面的代码是示例封装一个消息: 《Kafka 处理超大消息的方法-分块和外部存储》下面的代码示例是接收到消息并从外部存储读取数据: – 检查是否是特殊的消息,需要读取外部存储中的数据

  • 从消息里取得存储的key/ID
  • 通过key读取外部存储的数据
  • 返回结果给用户

《Kafka 处理超大消息的方法-分块和外部存储》

点赞
  1. Awesome post! Keep up the great work! :)

发表评论

电子邮件地址不会被公开。 必填项已用*标注