Kafka
- Apache Kafka® 是 一个分布式流处理平台,是 使用 Scala 编写具有高水平扩展和高吞吐量的分布式消息系统
- 可以让你****发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
- 可以****储存流式的记录,并且有较好的容错性。
- 可以在流式记录产生时就****进行处理。
- Kafka 使用场景
- 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于消息队列)
- 构建实时流式应用程序,对这些流数据进行转换或者影响。
- 特性:高吞吐、低延迟、高伸缩(易扩展)、持久性、可靠性、容错性、高并发
核心原理
Broker存储消息
- 在kafka种每条消息记录中包含一个 key ,一个 value 和一个 timestamp(时间戳)
- Topic:主题
- Topic 就是数据主题,可以理解为一种类型的消息数据集合,类似于RabbitMQ中的Queue,用于存储数据
- 一个 topic 可以拥有一个或者多个消费者来订阅它的数据
- Partition :分区
- Topic是消息的集合,如果消息总量过大,那么会存在存储与并发性能问题(类比于数据库数据太多),因此Topic会分为多个分区。每个分区会存储Topic的一部分数据,以此来解决存储扩展,以及并发访问的问题.
- **例:如果Topic总共9条记录,有3个Partition **
那么Partition 1可能有编号1、4、7的数据
那么Partition 2可能有编号2、3、5的数据
那么Partition 3可能有编号6、8、9的数据
- **例:如果Topic总共9条记录,有3个Partition **
- Kafka 默认的分区数量是1,也就是一个Topic默认只有一个分区,可以通过配置进行修改
- Partition 数量不是越多越好,可以通过实际使用情况来进行优化设置
- 计算公式:系统期望最大吞吐量/生产者吞吐量 、系统期望最大吞吐量/消费者吞吐量。在两者之间取最大值
- Partition 数量越多占用资源就越多
- segment:段
- Kafka消息数据是存储到磁盘上,并记录到一个log文件当中的。为了防止log文件不断变大导致检索效率过低,Partition被分为多个段来保存数据。
- 每个段的结构包含一个日志文件(.log)、一个偏移量索引文件(.index)、一个时间戳索引文件(.timeindex)。三个文件是成套出现的,如图:
- 出现如下情况时,就会出现分段现象:
- log文件达到固定大小:默认大小是1 G,由参数控制:
log.segment.bytes
- 超过一定时间没有向log文件写入数据:默认事件是一周,由参数控制:
log.roll.hours
或者log.roll.ms
- index文件达到固定大小:默认大小是10 M
- log文件达到固定大小:默认大小是1 G,由参数控制:
- 日志文件:文件以该段中最小的offset命名
- 偏移量索引
- 使用的是稀疏索引,记录的消息偏移量以及消息在log文件中的物理地址
- 默认情况下每生成4 K的消息,就会生成一个索引记录,
- 使用的是稀疏索引,记录的消息偏移量以及消息在log文件中的物理地址
- 时间戳索引:有两种,一种是消息发送时间,一种是消息落盘时间,默认是落盘时间。通过参数控制:
log.message.timestamp.type=CreateTime/LogAppendTime
- 消息结构如图:
- 根据 offset 查找消息流程
- 现根据 offset 确定数据是在哪个log文件中,选择名称中小于offset的最大的那个log文件
- 找到log文件对应的偏移量索引文件,使用二分查找找到最接近的索引
- 再根据索引去log文件中找到对应数据
- 日志清理:log文件是不断追加的,需要清理日志,保证磁盘空间。自动清理日志配置:
- 需要打开清理开关:
log.cleanner.enable=true
- 定制清理策略是删除还是压缩:
log.cleanner.policy=delete/compact
。压缩是将相同key合并为最后一个value:
- 设置清理周期:
log.retention.check.interval.ms=3000000
- 定制清理规则:
- 过期日志清理:
log.retention.hours
、log.retention.minutes
、log.retention.ms
- 清理超过的大小的日志:
log.retention.bytes
、log.segment.bytes
- 过期日志清理:
- 需要打开清理开关:
- Topic是消息的集合,如果消息总量过大,那么会存在存储与并发性能问题(类比于数据库数据太多),因此Topic会分为多个分区。每个分区会存储Topic的一部分数据,以此来解决存储扩展,以及并发访问的问题.
- Replication:副本
每个Partition还会被复制到其它服务器作为Replication,这是一种冗余备份策略,副本满足以下原则:-
副本数量小于等于Broker节点数量
-
同一broker上只能存在同一个Partition的一个Replication
-
每个Partition的Replication中,有一个leader节点 ,零或多个follower节点
- leader节点负责监听Broker变化、监听Topic变化、监听Partition变化、获取和管理Broker、Topic、Partition的信息、管理Partition的主从信息、处理此分区的****所有的读写请求, follower仅仅被动的复制数据
- leader宕机后,会从follower中选举出新的leader
-
副本在Broker的分布规则
- 副本个数不能大于Broker
- 第一个分区的第一个副本是随机分布到一个Broker上
- 其他分区的第一个副本相当于第一个分区的第一个副本往后移一个Broker。
- 决定好第一个副本之后,其余的每个分区随机分布
- 例如3个Broker,4个分区,2个副本。假如第一个分区的第一个副本随机分布到了第二个Broker上,那么第二个副本的第一个分区分布在第三个Broker上,第三个分区的第一个分布到第一个Broker上,第四个分区的第一个副本分布到第二个broker上。分区的第一个副本作为leader节点,剩余的副本随机分布,作为follower节点,如图:
-
生产者发送消息
- Producers:消息的生产者
- 生产者往某个Topic上发布消息,生产者可以指定消息发布到Topic上的哪一个分区。
- 生产者发送消息是批量发送的
- 多少条发送一次:默认是16K发送一次,使用参数
batch.size
控制, - 批量发送的等待时间:如果没有到16K达到了阈值也会发送:
linger.ms
- 多少条发送一次:默认是16K发送一次,使用参数
- 如果生产者没有指定分区,根据消息中的
key
进行判断- 消息中的
key
为空,那么默认会采用轮询的方式,将数据分配到不同的分区上。 - 消息中的
key
不为空,则会对key
值进行哈希运算,根据hash运算结果对分区数量取模,决定到哪个分区上
- 消息中的
- 流程
- Broker 响应
- 消息发送到Borker之后,为了保证消息的可靠性需要返回个标志给生产者,返回标志有以下三种方案:
- 消息只要被leader节点接受就返回
- 消息被半数以上的follower节点存储成功就返回
- **消息被所有follower存储成功再返回 **
- 生产者发送消息的代码中有一个参数
acks
pros.put("acks","1");
:leader存储完成之后就返回ACKpros.put("acks","0");
:不等待ACKpros.put("acks","-1");
:leader和全部follower存储成功就返回ACK。此时消息重复次数最好设为0
- kafka第三种方案的实现
- ISR是动态列表,kafka中有一个in-sync replica set将所有活动的follower节点维护起来,简称ISR。
- 当follower节点超过一定时间没有与leader节点同步数据,那就会从ISR列表中移除。当follower节点恢复从leader节点同步数据,那么就会重新加入到ISRl列表中,使用参数
replica.lag.time.max.ms
设置 - ISR列表中的所有节点存储完成之后再返回确认标志
- 消息发送到Borker之后,为了保证消息的可靠性需要返回个标志给生产者,返回标志有以下三种方案:
消费者消费消息
- Consumers:消费者
消费者通过poll方式主动拉取消息,可以控制每次拉取消息的数量 - Consumer Group:消费者组
- 一个消费者组由一个或多个消费者实例组成。通常情况下,每个 Topic 都会有一些消费组,一个消费组对应一个"逻辑订阅者"。这就是发布和订阅的概念,只不过订阅者是一组消费者而不是单个的消费者。
- 默认情况下发布到 Topic 中的每条记录被分配给订阅消费组中的一个消费者实例,消息记录会负载平衡到每一个消费者实例
- 如果Partition数量 等于 消费者组中的消费者数量,则一个消费者消费一个Partition中的消息
- 如果Partition数量 小于 消费者组中的消费者数量,则有一个消费者消费不到消息
- 如果Partition数量 大于 消费者组中的消费者数量,则一个消费者消费多个Partition中的消息
- 分配策略:
- **Range Assignor(范围分配):默认使用的是范围分配的策略,尽量均分消息然后分配到每个消费者 **
- Round Robin Assignor(轮询策略):采用轮询分配到每一个消费者
- Sticky Assignor(粘滞策略):尽可能跟上次相同、尽可能均匀
- 指定消费:通过指定消费者消费某个分区
TopicPatition tp = new TopicPatiton("ass5part",0); consumer.assign(Arrays.asList(tp));
- rebalance 分区重分配
- 当分区数发生变更,或者消费组中的消费者数量发生变化都会会触发重分配
- 从Broker中找到一个 GroupCoordinator 作为主持人
- 找到所有的消费者,叫做join group
- 从所有的消费者选出一个leader,根据leader定制一个方案上报给主持人
- 主持人将方案通知到所有消费者
- 当分区数发生变更,或者消费组中的消费者数量发生变化都会会触发重分配
- 如果一个消费者属于多个消费者组,那么这个消费者可能会收到多个组的信息。
- Consumer Offset
- 消息是被记录到log文件中不断追加的,不会删除,所以每个消费者读取之后,需要记录一个位置,方便下次的顺序读取,这个对应log文件内容的Offset存储于一个固定的Topic之中,名称叫做:_consumer_offsets
- _consumer_offsets 默认会创建50个分区,默认为1个副本。这5O个分区均匀的轮流分布在Broker上。这个Topic中的消息主要存储了两种内容:
- GroupMetadata:保存了消费者组中各个消费者组的信息(每个消息者有编号)
- OffsetAndMetadata:消费者组和各个partition的offset元数据
- 消息是被记录到log文件中不断追加的,不会删除,所以每个消费者读取之后,需要记录一个位置,方便下次的顺序读取,这个对应log文件内容的Offset存储于一个固定的Topic之中,名称叫做:_consumer_offsets
- 消费者组的编号经过hash运算之后模50,获取到的值就是该消费者组对应的分区编号
- 新连接的消费者组默认消费连接之后的消息,要消费之前的消息需要手工设置消费的offset
- 当消费者消费完消息后会自动发送消息到该队列,修改对应的offset的值。也可以关闭
enable.auto.commit
,使用手工提交:- 同步提交:
consumer.commitSync();
- 异步提交:
consumer.commitAsync();
- 同步提交:
幂等性和事务
- 消息幂等性,使用PID(Producer ID)和Sequence Number。只能保证单分区单会话的幂等性
- 事务使用场景
- 发送多条消息
- 发送消息到多个Topic 或者多个Partition
- 消费以后然后发出
- 事务实现原理
- 二阶段提交(
2PC
),步骤:- 生产者通过
initTransactions API
向Coordinator注册事务ID。 - **Coordinator记录事务日志。 **
- **生产者把消息写入目标分区。 **
- 分区和Coordinator的交互。当事务完成以后,消息的状态应该是已提交, 这样消费者才可以消费到。
- 生产者通过
- 二阶段提交(
Stream:流
-
允许一个应用程序作为一个流处理器,消费一个或者多个 topic 产生的输入流,然后生产一个输出流到一个或多个 topic 中去,在输入输出流中进行有效的转换。
-
官方演示:
- 该演示内容是在输入topic与输出topic之间建立一个流处理器,计算输入的单词的次数,并将结果发送给输出topic
- 创建一个输入topic和一个输出topic
- 输入:
bin/kafka-topics.sh --create --bootstrap-server 192.168.18.22:9092 --topic stream-plaintext-input --partitions 1 --replication-factor 1
- 输出:
bin/kafka-topics.sh --create --bootstrap-server 192.168.18.22:9092 --topic stream-wordcount-output --partitions 1 --replication-factor 1 --config cleanup.policy=compact
- 输入:
- 运行WordCount程序:
- 使用命令
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
WordCountDemo
代码位于libs目录的jar包中:kafka-streams-examples-xxx.jar
- 使用命令
- 启动生产者,向输入topic发送消息
./bin/kafka-console-producer.sh --bootstrap-server 192.168.18.22:9092 --topic stream-plaintext-input
- 启动消费者,获取输出topic的内容:
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.18.22:9092 --topic stream-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer -property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Interceptor:拦截器
- 针对Producer进行拦截,拦截器(interceptor)是在Kafka 0.10版本被引入的。
- Producer允许用户指定多个interceptor按序作用于同一条消息从 而形成一个拦截链(interceptor chain)。
- 可以使用Interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些 定制化需求,比如修改消息等。
存储速度
- Kafka具有相当高的吞吐量与速度,怎么实现的呢,主要有以下4中因素
- 顺序IO
- 采用顺序IO,读写的数据在磁盘上是集中存储的,无需重复寻址过程
- Kafka消息是不断追加在本地磁盘的文件末尾的,因此使吞吐量得到显著提升
- 索引
- 使用偏移量所以以及时间戳索引搜寻数据,提高数据读取效率
- 批量读写和文件压缩
- 把所有消息变成一个批量文件,进行合理压缩减少网络IO损耗
- 零拷贝
- 概念
- 用户空间、内核空间
操作系统虚拟内存分为用户空间和内核空间,避免用户空间操作内核空间,保证内核安全。如果想要从磁盘读取数据,需要先将磁盘数据拷贝到内核缓冲区,然后从内核缓冲区拷贝到用户缓冲区,最后再拷贝给用户。过程如图
- DMA 拷贝
没有DMA之前拷贝数据是CPU去执行,这样如果传输数据量大会占用大量CPU资源从而出现问题,因此出现了DMA拷贝
将IO设备与内存的数据传输的工作交给DMA控制器,解放CPU
- 传统IO模型需要经过4步
- 优化后的IO
- 用户空间、内核空间
- 概念
- 整体结构流转:
高可靠架构
-
文件同步
- leader节点会根据每个节点的LEO(Log End Offset,下一条待写入的Offset,即最新的Offset+1),找到 HW (ISR中最小的LEO)。为了保证不丢失消费者最多只能消费到 HW 之前位置的消息。
- 随着follower 节点的不断同步消息,HW会发生变化,消费者能消费的消息也随之变化。当LEO和HW重叠,所有消息都可以 消费了:
- 主从同步步骤:
- follower节点向Leader发送fetch请求,leader向follower发送数据后更新follower的LEO
- follower接收到数据响应之后,更新自己的LEO
- Leader更新HW
- leader节点会根据每个节点的LEO(Log End Offset,下一条待写入的Offset,即最新的Offset+1),找到 HW (ISR中最小的LEO)。为了保证不丢失消费者最多只能消费到 HW 之前位置的消息。
-
故障处理
- follower节点宕机恢复之后,会将自己记录的HW之后的数据删除,然后从Leader节点同步数据
- Leader节点故障之后会选举出Leader节点,其他节点会将高于HW的数据删除,然后从新的Leader节点获取数据
-
leader节点的选举
- Broker Controller:使用 Zookeeper 选出一个Broker作为选举的主持
- 从 ISR 之中选择新的 leader,如果ISR 为空,那么会从 OSR 中选取leader
- AR (所有follower节点)= ISR(活动的follower节点) + OSR(不活动的follower节点)。超过一定时间没有同步leader节点数据follower视为非活动节点
- ISR 中的节点编号最小的变为leader节点
使用实例
- 生产者
package com.qingshan.commit; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * @author: qingshan */ public class CommitProducer { public static void main(String[] args) { Properties props=new Properties(); //props.put("bootstrap.servers","192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095"); props.put("bootstrap.servers","192.168.44.160:9092"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // 0 发出去就确认 | 1 leader 落盘就确认| all 所有Follower同步完才确认 props.put("acks","1"); // 异常自动重试次数 props.put("retries",3); // 多少条数据发送一次,默认16K props.put("batch.size",16384); // 批量发送的等待时间 props.put("linger.ms",5); // 客户端缓冲区大小,默认32M,满了也会触发消息发送 props.put("buffer.memory",33554432); // 获取元数据时生产者的阻塞时间,超时后抛出异常 props.put("max.block.ms",3000); Producer<String,String> producer = new KafkaProducer<String,String>(props); for (int i =0 ;i<100;i++) { producer.send(new ProducerRecord<String,String>("commit-test",Integer.toString(i),Integer.toString(i))); } producer.close(); } }
- 消费者
package com.qingshan.commit; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; /** * @author: qingshan */ public class CommitConsumer { public static void main(String[] args) { Properties props= new Properties(); //props.put("bootstrap.servers","192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095"); props.put("bootstrap.servers","192.168.44.160:9092"); props.put("group.id","gp-test-group1"); // 是否自动提交偏移量,只有commit之后才更新消费组的 offset props.put("enable.auto.commit","true"); // 消费者自动提交的间隔 props.put("auto.commit.interval.ms","1000"); // 从最早的数据开始消费 earliest | latest | none props.put("auto.offset.reset","earliest"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props); // 订阅 topic consumer.subscribe(Arrays.asList("commit-test")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); // 手动提交 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d ,key =%s, value= %s, partition= %s%n" ,record.offset(),record.key(),record.value(),record.partition()); buffer.add(record); } if (buffer.size() >= minBatchSize) { // 同步提交 consumer.commitSync(); buffer.clear(); } } } }
- 创建三个拦截器,针对于产生的消息进行不同处理
- 导入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.1.1</version> </dependency>
- **Time Interceptor:在消息内容前边加上时间戳 **
package com.study.kafka.interceptor; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class TimeInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { } /** * 发送消息到Borker之前调用,在消息被序列化和落地到具体的partition 之前执行 * 可以修改指定实际存储到哪个topic 的 partition 上去 * 该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。 * Producer确保在消息被序列化 以及计算分区前调用该方法。 * 用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic和分区,否则会影响目标分区的计算 */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 创建一个新的record,把时间戳写入消息体的最前部 return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } }
- Counter Interceptor:统计消息发送成功与失败的数量
package com.study.kafka.interceptor; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class CounterInterceptor implements ProducerInterceptor<String, String> { private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } /** * 该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。 * onAcknowledgement运行在producer的IO线程中,因此不要在方法中放入很重的逻辑,否则会拖慢producer的消息发送效率 * */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 统计成功和失败的次数 if (exception == null) { successCounter++; } else { errorCounter++; } } /** *关闭interceptor,主要用于执行一些资源清理工作 */ @Override public void close() { // 保存结果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } }
- Producer调用拦截器
package com.study.kafka.interceptor; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class InterceptorProducer { public static void main(String[] args) throws Exception { // 1 设置配置信息 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.100.249:9092"); // 默认为1;当为all时候值为-1,表示所有的都需要同步(一致性最高相对性能也会有所降低) props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2 构建拦截链 List<String> interceptors = new ArrayList<>(); interceptors.add("com.study.kafka.interceptor.TimeInterceptor"); interceptors.add("com.study.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); String topic = "test"; Producer<String, String> producer = new KafkaProducer<>(props); // 3 发送消息 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i); producer.send(record); // message0 , -> 123129374927,message0 // 成功: // 失败: } // 4 一定要关闭producer,这样才会调用interceptor的close方法 producer.close(); } }
- Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外 倘若指定了多个interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可 能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
- 导入依赖
使用场景
- 消息中间件
- 消息系统被用于各种场景(解耦数据生产者,缓存未处理的消息),通常消息传递使用较低的吞吐量,但可能要求较低的端到端延迟。在此方面Kafka 可以与传统的消息传递系统(ActiveMQ 和 RabbitMQ)相媲美
- kafka 可以替换传统的消息系统,与大多数消息系统比较,kafka 有更好的吞吐量,副本和故障转移等功能,内置分区提供强大的持久性来满足这一要求这有利于处理大规模的消息。
- 跟踪网站活动
- **kafka 的最初始作用就是是将用户活动跟踪管道重建为一组实时发布-订阅源。 **
- **把网站活动(浏览网页、搜索或其他的用户操作)发布到中心 topic,其中每个活动类型有一个 topic。 **
- 这些订阅源提供一系列用例,包括实时处理、实时监视、对加载到Hadoop或离线数据仓库系统的数据进行离线处理和报告等。
- 每个用户浏览网页时都生成了许多活动信息,因此活动跟踪的数据量通常非常大。使用 kafka就相当合适。
- **kafka 的最初始作用就是是将用户活动跟踪管道重建为一组实时发布-订阅源。 **
- 日志聚合
- 日志聚合系统:从服务器收集物理日志文件,并将其置于一个中心系统(文件服务器或HDFS)进行处理。可以使用 kafka来作为日志聚合解决方案。
- kafka 从这些日志文件中提取信息,并将其抽象为一个更加清晰的消息流。 这样可以实现更低的延迟处理且易于支持多个数据源及分布式数据的消耗。
- 与 Scribe 或 Flume 等以日志为中心的系统相比,Kafka具备同样出色的性能、更强的耐用性(因为复制功能)和更低的端到端延迟。
- 流处理
- 从0.10.0.0开始,kafka 支持轻量,但功能强大的流处理。
- kafka消息处理包含多个阶段。
- 原始输入数据是从kafka主题消费的,然后汇总,丰富,或者以其他的方式处理转化为新主题以供进一步消费或后续处理。
- 例如:一个推荐新闻文章,文章内容可能从“articles”主题获取;然后进一步处理内容,得到一个处理后的新内容,最后推荐给用户。这种处理是基于单个主题的实时数据流。
- 除了Kafka Streams,还有 Apache Storm 和 Apache Samza 也是不错的流处理框架。
- 事件采集
- Event sourcing是一种应用程序设计风格,按时间来记录状态的更改。
- Kafka 可以存储非常多的日志数据,为基于 event sourcing 的应用程序提供强有力的支持
- 提交日志
- **kafka 可以从外部为分布式系统提供日志提交功能。 日志有助于记录节点和行为间的数据,采用重新同步机制可以从失败节点恢复数据。 **
- Kafka的日志压缩 功能支持这一用法。 这一点与Apache BookKeeper 项目类似
集群部署
- **首先安装 分布式协调服务zookeeper,zookeeper安装 **
- 下载安装包,解压之后,通过复制得到一份配置文件:
cp zoo_sample.cfg zoo.cfg
,修改zoo.cfg
dataDir=/data/zookeeper/data dataLogDir=/data/zookeeper/logs server.1=192.168.18.22:8880:7770 server.2=192.168.18.24:8880:7770 server.3=192.168.18.27:8880:7770
- 将配置文件拷贝到另外两个节点的服务器上
- 分别在三个节点的
/data/zookeeper/data
目录下创建myid
文件, 第一个节点文件内容为 1,第二个节点文件内容为 2,第三个节点文件内容为 3。 - 分别在三台服务器上执行启动命令:
./bin/zkServer.sh start
- **确定启动是否正常 **
./bin/zkServer.sh status
- 下载安装包,解压之后,通过复制得到一份配置文件:
- kafka的安装
-
**进入到 **
config
目录修改server.properties
文件,三台服务器上都进行修改,broker.id
每台服务器不一样都要修改broker.id=0 listeners=PLAINTEXT://192.168.18.22:9092 log.dirs=/data/kafka/kafka-logs zookeeper.connect=192.168.18.22:2181,192.168.18.24:2181,192.168.18.27:2181
broker.id=1 listeners=PLAINTEXT://192.168.18.24:9092 log.dirs=/data/kafka/kafka-logs zookeeper.connect=192.168.18.22:2181,192.168.18.24:2181,192.168.18.27:2181
broker.id=2 listeners=PLAINTEXT://192.168.18.27:9092 log.dirs=/data/kafka/kafka-logs zookeeper.connect=192.168.18.22:2181,192.168.18.24:2181,192.168.18.27:2181
-
启动:
./bin/kafka-server-start.sh config/server.properties &
-
**创建topic(默认需要手动创建,可以修改配置文件让kafka自动创建不存在的topic) **
- 创建一个名称为 test topic ,指定 test topic有一个分区,一个副本。
./bin/kafka-topics.sh --create --topic test --bootstrap-server 192.168.18.22:9092 --partitions 1 --replication-factor
- 查看topic信息
./bin/kafka-topics.sh --describe --topic test --bootstrap-server 192.168.18.22:9092
- 创建一个名称为 test topic ,指定 test topic有一个分区,一个副本。
-
发送消息:用kafka自带的命名模拟消息发送
./bin/kafka-console-producer.sh --bootstrap-server 192.168.18.22:9092 --topic test
-
消费消息:用kafka自带的命名模拟消息消费
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.18.22:9092 --topic test --from-beginning
-
监控
- kafka web console
- kafka monitor
- kafka manager
- kafka-eagle