为什么需要kafka kafka用来干嘛的

1.卡夫卡是什么?Kafka是一个分布式、分区、多副本和多订户的平台,最初由Linkedin开发。它是一个基于ZooKeeper协调的分布式流处理平台,由Scala和Java编写。它通常用于web日志、

本文最后更新时间:  2023-03-26 11:58:39

1.卡夫卡是什么?

Kafka是一个分布式、分区、多副本和多订户的平台,最初由Linkedin开发。它是一个基于ZooKeeper协调的分布式流处理平台,由Scala和Java编写。它通常用于web日志、访问日志、消息服务等。Linkedin在2010年将其贡献给Apache基金会,成为顶级开源项目。

1.卡夫卡的核心建筑和功能

Kafka的核心功能是:高性能的消息发送和高性能的消息消费;其核心架构如下:

2.卡夫卡的主要功能可以用下面三句话来概括。

生产者发送消息给Kafka服务器消费者从Kafka服务器读取消息Kafka服务器依托ZooKeeper集群进行服务的协调管理

3.卡夫卡涉及的术语

Broker:一般中文翻译为broker、broker、agent,但这些词并不能很好的表达broker的意思,所以只知道broker的意思是Kafka服务器;

Message: message,使用紧凑的二进制字节数据保存消息,节省内存空和传输效率;

主题:Topic,代表一种消息;

分区:Partition,kafka采用主题-分区-消息的三级结构来分配负载;

Offset: displacement,给分区下的每条消息分配一个位移值,表示消息在分区中的位置;应当指出,该消息还具有位移概念;

副本:copy,备份多个日志,防止数据丢失;有两种类型:领导者副本和追随者副本。

领导者和追随者:领导者和追随者。通常情况下,领导对外提供服务。跟随者只是被动地跟随领导的状态,与领导保持步调一致,充当领导的替补,让领导挂机后能被选举为新领导接手工作;

ISR(in-sync replica):表示保持与leaderreplica到replica集的同步,

4.Kafka的安装和使用实例

1)下载安装包

卡夫卡下载地址:https://kafka.apache.org/downloads

最新版本是:

kafka_2.11-2.3.1.tgzkafka_2.12-2.3.1.tgz

在这两个文件中,2.11/2.12分别表示编译Kafka的Scala语言版本,后面的2.3.1就是Kafka的版本。从稳定性来看,Scala2.11版本2.11编译的Kafka更好,我们可以下载Kafka _ 2.11-2.3.1.tgz。

2)安装

下载后上传到我们的服务器。

解压缩:tar -zxvf kafka_2.11-2.3.1.tgcd kafka_2.11-2.3.1

启动Kafka(启动前应安装JDK8)

Kafka解压后可以直接运行,进入可执行目录,查看可执行文件,如下图所示:

3)启动ZooKeeper

首先要启动ZooKeeper服务器(ZooKeeper是为Kafka提供协调服务的工具),Kafka内置了ZooKeeper,可以直接启动:

./zookeeper-server-start.sh ../config/zookeeper.properties

启动日志如下所示:

[2019-10-28 09:22:17,425] INFOReading configuration from: ../config/zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)[2019-10-28 09:22:17,437] INFOautopurge.snapRetainCount set to 3(org.apache.zookeeper.server.DatadirCleanupManager)[2019-10-28 09:22:17,437] INFOautopurge.purgeInterval set to 0(org.apache.zookeeper.server.DatadirCleanupManager)[2019-10-28 09:22:17,437] INFO Purgetask is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)[2019-10-28 09:22:17,438] WARNEither no config or no quorum defined in config, running in standalone mode(org.apache.zookeeper.server.quorum.QuorumPeerMain)[2019-10-28 09:22:17,487] INFOReading configuration from: ../config/zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)[2019-10-28 09:22:17,488] INFOStarting server (org.apache.zookeeper.server.ZooKeeperServerMain)[2019-10-28 09:22:17,512] INFOServerenvironment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf,built on 03/06/2019 16:18 GMT (org.apache.zookeeper.server.ZooKeeperServer)[2019-10-28 09:22:17,512] INFOServer environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)[2019-10-28 09:22:17,512] INFOServer environment:java.version=1.8.0_25(org.apache.zookeeper.server.ZooKeeperServer)…[2019-10-28 09:22:17,586] INFObinding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

当启动日志中出现0.0.0.0/0.0.0: 2181时,启动基本成功(使用端口2181)。

4)启动卡夫卡

然后启动Kafka服务器并启动命令:

./kafka-server-start.sh ../config/server.properties

启动日志如下所示:

[2019-10-28 09:22:45,571] INFORegistered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)[2019-10-28 09:22:46,495] INFORegistered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)[2019-10-28 09:22:46,497] INFOstarting (kafka.server.KafkaServer)[2019-10-28 09:22:46,499] INFOConnecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)[2019-10-28 09:22:46,547] INFO[ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)…[2019-10-28 09:22:50,272] INFO Kafkaversion: 2.3.1 (org.apache.kafka.common.utils.AppInfoParser)[2019-10-28 09:22:50,273] INFO KafkacommitId: 18a913733fb71c01 (org.apache.kafka.common.utils.AppInfoParser)[2019-10-28 09:22:50,273] INFO KafkastartTimeMs: 1572225770265 (org.apache.kafka.common.utils.AppInfoParser)[2019-10-28 09:22:50,281] INFO [KafkaServerid=0] started (kafka.server.KafkaServer)

启动日志中出现[kafka server id = 0]started(Kafka . server . Kafka server)表示Kafka启动成功(默认服务端口为9092)

5)创建一个主题

Kafka启动成功后,我们会用消息的发送和消费来创建一个话题。下面的日志将创建一个名为test的主题,其中只有一个分区和一个副本。

./kafka-topics.sh --create--zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1

成功创建主题后,您可以查看主题的状态:

6)检查题目

命令:

./kafka-topics.sh --describe --zookeeperlocalhost:2181 --topic test

信息显示如下:

[root@localhost bin]# ./kafka-topics.sh--describe --zookeeper localhost:2181 --topic testTopic:test PartitionCount:1 ReplicationFactor:1Configs:Topic:test Partition: 0 Leader: 0Replicas:0 Isr: 0

7)生产新闻

Kafka提供脚本攻击,可以持续接收标准输入,并发送到指定主题。脚本工具启动命令如下:

./kafka-console-producer.sh--broker-list localhost:9092 --topic test

控制台输入信息:

[root@localhost bin]# ./kafka-console-producer.sh--broker-list localhost:9092 --topic test>hello world!>this my first Kafka example;

打开这个控制台后,可以连续输入字符形成消息,按回车键后文本就会发出。按Ctrl+C退出。

8)消费者新闻

卡夫卡也提供了相应的剧本。用户使用指定主题下的消息,并将它们打印到标准输出。

./kafka-console-consumer.sh--bootstrap-server localhost:9092 --topic test --from-beginning

控制打印如下:

[root@localhost bin]# ./kafka-console-consumer.sh--bootstrap-server localhost:9092 --topic test --from-beginninghello world!this my first Kafka example;

二。卡夫卡设计思想的要点

1.消息引擎系统

消息引擎系统用于在不同的应用程序之间传输消息,因此其他设计包括两个重要因素:消息设计和传输协议设计。

消息设计:Kafka消息是用二进制方式来保存的结构化消息;传输协议设计:Kafka自定义一套二进制的消息传输协议;

2.消息引擎范例:

1)消息队列模型

基于队列提供消息传输服务,主要用于进程/线程之间的通信。该模型定义了消息队列、发送方和接收方,并提供了点对点的消息传递模式。示例:客服接听电话

2)发布/订阅模型

定义了生产者和消费者、发布者和订阅者等角色。发布者生成消息并将其发送到指定的主题,所有订阅了该主题的订阅者都可以接收到该主题下的所有消息。例如:报纸订阅

Kafka同时支持这两种消息引擎范式,其初衷是为了解决互联网公司中超海量数据的实时传输。

3.卡夫卡的高通量低延迟设计思想

卡夫卡设计之初,考虑了以下四个方面。

Kafka实现高吞吐量、低延迟的设计思想是,每次写操作只将数据写入操作系统的页面缓存,然后由操作系统决定何时将页面缓存中的数据写回磁盘。在读取消息时,首先会从OS的页面缓存中读取,如果命中,会通过页面缓存直接发送到网络的Socket(零拷贝技术)。

三。卡夫卡使用场景

Kafka以其消息引擎而闻名,该引擎特别适合在生产环境中处理那些流数据。下面是一些实际应用中的典型使用场景。

温馨提示:内容均由网友自行发布提供,仅用于学习交流,如有版权问题,请联系我们。