高手的存在,就是让服务10亿人的时候,你感觉只是为你一个人服务......

kafka入门

目录
  1. 1. 介绍
  2. 2. 与其他常用Message Queue对比
    1. 2.1. 术语
    2. 2.2. 拓扑结构
    3. 2.3. Topics 和Logs
    4. 2.4. 分布式
    5. 2.5. Producers
    6. 2.6. Consumers
    7. 2.7. 消息的有序性
  3. 3. 环境搭建
    1. 3.1. 下载kafka
    2. 3.2. 启动服务
      1. 3.2.1. 先启动zookeeper
      2. 3.2.2. 再启动kafka
    3. 3.3. 创建 topic
    4. 3.4. 发送消息
    5. 3.5. 启动consumer
    6. 3.6. 搭建一个多个broker的集群

公司的一些项目用的ActivemMQ,最近有个项目开始尝试使用Kafka,有幸帮忙测试了kafka-Producer和Consumer消息吞吐相关的性能,包括broker切换消息的丢失率。在此之前,我对kafka完全不了解。
kafka 作为分布式的消息系统,性能大大超过传统的ActiveMQ,市场前景很广,虽然目前还有一些不足的地方,但以后很可能会是分布式消息系统的主流,有必要学习一下!

介绍

kafka是Linkedin于2010年12月份开源的消息系统,它主要用于处理活跃的流式数据。活跃的流式数据在web网站应用中非常常见,这些数据包括网站的pv、用户访问了什么内容,搜索了什么内容等。 这些数据通常以日志的形式记录下来,然后每隔一段时间进行一次统计处理。
传统的日志分析系统提供了一种离线处理日志信息的可扩展方案,但若要进行实时处理,通常会有较大延迟。而现有的消息(队列)系统能够很好的处理实时或者近似实时的应用,但未处理的数据通常不会写到磁盘上,这对于Hadoop之类(一小时或者一天只处理一部分数据)的离线应用而言,可能存在问题。Kafka正是为了解决以上问题而设计的,它能够很好地离线和在线应用。

与其他常用Message Queue对比

  • RabbitMQ
    RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
  • Redis
    Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
  • ZeroMQ
    ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。
  • ActiveMQ
    ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
    -Kafka/Jafka
    Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

    术语

    首先让我们看几个基本的消息系统术语:
  • Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  • Partition
    Parition是物理上的概念,每个Topic包含一个或多个Partition.
  • Producer
    负责发布消息到Kafka broker
  • Consumer
    消息消费者,向Kafka broker读取消息的客户端。
  • Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

    拓扑结构

    Alt text
    如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
    需要提到的一点:客户端和服务端通过TCP协议通信。Kafka提供了Java客户端,并且对多种语言都提供了支持。

    Topics 和Logs

    先来看一下Kafka提供的一个抽象概念:topic.
    一个topic是对一组消息的归纳。Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。对每个topic,Kafka 对它的日志进行了分区,如下图所示:
    Alt text
    每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每个消息都有一个连续的序列号叫做offset,用来在分区中唯一的标识这个消息。
    在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。

实际上每个consumer唯一需要维护的数据是消息在日志中的位置,也就是offset.这个offset有consumer来维护:一般情况下随着consumer不断的读取消息,这offset的值不断增加,但其实consumer可以以任意的顺序读取消息,比如它可以将offset设置成为一个旧的值来重读之前的消息。

以上特点的结合,使Kafka consumers非常的轻量级:它们可以在不对集群和其他consumer造成影响的情况下读取消息。你可以使用命令行来”tail”消息而不会对其他正在消费消息的consumer造成影响。

将日志分区可以达到以下目的:首先这使得每个日志的数量不会太大,可以在单个服务上保存。另外每个分区可以单独发布和消费,为并发操作topic提供了一种可能。

分布式

每个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。副本使Kafka具备了容错能力。
每个分区都由一个服务器作为“leader”,零或若干服务器作为“followers”,leader负责处理消息的读和写,followers则去复制leader.如果leader down了,followers中的一台则会自动成为leader。集群中的每个服务都会同时扮演两个角色:作为它所持有的一部分分区的leader,同时作为其他分区的followers,这样集群就会据有较好的负载均衡。

Producers

Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。使用的更多的是第二种,我们也是使用的这个。

Consumers

发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到所有的consumer中。Consumers可以加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的机器上。如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。更常见的是,每个topic都有若干数量的consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。
Alt text
由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个

消息的有序性

相比传统的消息系统,Kafka可以很好的保证有序性。
传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。

在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。


环境搭建

下载kafka

下载最新版本的kafka,我用的kafka_2.9.2-0.8.1.1

我是放在/opt 目录下的
tar -xvf kafka_2.9.2-0.8.1.1.tgz

启动服务

先启动zookeeper

Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。

cd kafka_2.9.2-0.8.1.1/bin/
bash zookeeper-server-start.sh /opt/kafka_2.9.2-0.8.1.1/config/zookeeper.properties &

这里可能会报错:
Unrecognized VM option ‘+UseCompressedOops’
Could not create the Java virtual machine.

解决方法:
查看 bin/kafka-run-class.sh
找到
if [ -z “$KAFKA_JVM_PERFORMANCE_OPTS” ]; then
KAFKA_JVM_PERFORMANCE_OPTS=”-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true”
fi
去掉-XX:+UseCompressedOops
再启用就可以了

再启动kafka

bash kafka-server-start.sh /opt/kafka_2.9.2-0.8.1.1/config/server.properties &

如果报错,启动不了,可以多开几个终端试一试。

创建 topic

创建一个叫做“test”的topic,它只有一个分区,一个副本。

bash kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

可以通过list命令查看创建的topic:

bash kafka-topics.sh –list –zookeeper localhost:2181

除了手动创建topic,还可以配置broker让它自动创建topic.

发送消息

Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。
运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

bash kafka-console-producer.sh –broker-list localhost:9092 –topic
test

ctrl+c可以退出发送。

启动consumer

Kafka也有一个命令行consumer可以读取消息并输出到标准输出

bash kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning

搭建一个多个broker的集群

刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点也都是在本机上的:
首先为每个节点编写配置文件:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

在拷贝出的新文件中添加以下参数:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

broker.id在集群中唯一的标注一个节点,因为在同一个机器上,所以必须制定不同的端口和日志文件,避免数据被覆盖。
刚才已经启动可Zookeeper和一个节点,现在启动另外两个节点:

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

创建一个拥有3个副本的topic:

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic my-replicated-topic

现在我们搭建了一个集群,怎么知道每个节点的信息呢?运行“”describe topics”命令就可以了:

bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

下面解释一下这些输出。第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。
leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.
在我们的例子中,节点1是作为leader运行。
向topic发送消息:

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic my-replicated-topic
my test message 1my test message 2

消费这些消息:

bin/kafka-console-consumer.sh –zookeeper localhost:2181 –from-beginning –topic my-replicated-topic

my test message 1
my test message 2

测试一下容错能力.Broker 1作为leader运行,现在我们kill掉它:

ps | grep server-1.properties7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java…
kill -9 7564

另外一个节点被选做了leader,node 1 不再出现在 in-sync 副本列表中:

bin/kafka-topics.sh –describe –zookeeper localhost:218192 –topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

虽然最初负责续写消息的leader down掉了,但之前的消息还是可以消费的:

bin/kafka-console-consumer.sh –zookeeper localhost:2181 –from-beginning –topic my-replicated-topic

my test message 1
my test message 2