Jason Pan

构建大数据处理流水线

潘忠显 / 2023-04-02


[TOC]

大数据已经势不可挡,每个企业都可以利用数据。当前各种组织面临的一个重要挑战是:设计能够将数据快速转化为战略决策的强大系统。

要应对这一挑战,就需要建立大数据处理流水线。它有非常多的应用场景,例如:

本文会依次介绍大数据相关组件选择,Kafka、Flink 和 Cassandra 的设置和启动,建立 Scala 数据处理项目,Flink 提交并运行任务。通过一个实际项目,构建出大数据处理流水线。

通过实际操作,你会熟悉各种大数据组件的使用,完成大数据处理的 Hello World!

一、组件的选择

如果你在大公司或者是使用云上的集群服务,Kafka、Flink 集群会作为基础设施,是可以直接使用的。即使在本机,这些组件也是可以通过 Docker 直接使用。但是为了更清楚地理解这些组件、语言之间的关系,下文会使用从官网下载压缩包、通过提供的脚本配置和启动 Kafka和Flink。

通常大数据的处理,会使用消息队列接收下海量数据,然后用流计算平台去消费数据并计算,最后会汇总在不同的存储中。当然,也会有多级连接的情况,比如中间计算结果会再进入到消息队列,再次经过流计算系统等等。

big-data-pipeline

Apache Spark 是一个被大规模使用的开源框架,允许近乎实时的数据处理。 Apache Spark 不提供原生流处理。Spark使用微批处理,可以模拟实时。相比于 Spark, Apache Flink 更专注于实时处理,延迟要低于 Spark,而且其作业优化是自动完成的,使用者无需关注 shuffles, broadcasts 等操作。

Apache Kafka

消息队列有不少组件可供选择。他们的侧重点会略有不同,一些侧重易于集成,另一些可能侧重健壮性等。

最受欢迎的消息代理之一是 RabbitMQ,它可以在许多用例中充当消息流的可靠来源。但当涉及到大数据应用时,有些属性 RabbitMQ 就很难保证了。比如分区容错性、高吞吐量、水平可伸缩性和强冗余。

而在这些方面,Kafka 都有一些优势,使其成为大数据实时处理的常用消息队列选择。

Scala 语言

Scala 是一种发展非常迅速的语言,它同时具备面向对象编程范式的优点,以及函数式编程语言的优点。它具有令人很强的通用性、简洁性,以及相当高的性能。Scala 可以被编译成 Java 字节码,最终由 JVM 执行。

Spark 使 Scala 成为大数据应用程序的顶级编程语言之一,Spark 使用 Scala 会获得很好的性能。这使得大数据生态系统将 Scala 视为一种优雅且高效的语言。另外,Flink 和 Kafka 都是用 Scala(以及 Java)编写的。

Apache Cassandra

Cassandra 是一个 NoSQL 分布式数据库。目前DB-Engines上排名11,这里演示选择 Cassandra 有几个原因:Apache 项目,Flink有官方连接器;配置和接口都比较简单;可顺便了解一下这个数据库。

组件版本

二、环境搭建

我这里使用的MacBook进行实验。

2.1 基本环境

安装 Java 1.8,使用OpenJDK可能会遇到一些奇怪的问题。

brew install openjdk@8
export PATH="/usr/local/opt/openjdk@8/bin:$PATH" # 可以添加到 ~/.zshrc 中

安装 Scala,根据Scala 官方指引,在 MacBook 上可以直接使用 brew 安装:

brew install coursier/formulas/coursier && cs setup

在安装完之后,会有以下命令行工具可用:

2.2 Kafka 配置

真正的解释清楚 Kafka 架构可能会很复杂,这里简化对 Kafka 的一些术语和原理做下解释。

Kafka 是一个事件流框架,这里提到的”事件“和消息队列中的”消息“两个词,具有相同的含义。事件 会被发送到指定的主题主题 包含了一系列有序的事件。主题可以被分区(partitioned)和复制(replicated)。

Kafka 对外服务的节点,叫做代理(brokers),负责主题日志。Zookeeper 主要负责协调服务器和管理主题。所谓的Kafka 集群,就是启动一个由 Zookeeper 协调的多个brokers。

通常,生产者将事件发送到指定 topic,消费者从这些主题消费。消费的时候,可以自己指定消费组,集群可以为不同的消费组记录消费到的位置,不会相互影响。

Kafka 的官方文档对其设计介绍的非常清晰,有兴趣的同学可以进一步阅读:https://kafka.apache.org/documentation/#design。也有一个相对简单的介绍,可以了解:https://www.instaclustr.com/blog/apache-kafka-architecture/

启动本地 Kafka 集群

kafka-downloads

使用 Kafka binary 包,需要从Kafka 官网上下载压缩包,进行解压(上图中 Scala 2.13 对应的版本),有两个文件夹可以关注一下:

启动 Zookeeper

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

启动一个 Broker

./bin/kafka-server-start.sh ./config/server.properties

创建一个主题(主题叫 flink-input,只复制一次,指定 server 地址):

./bin/kafka-topics.sh --create --topic flink-input --replication-factor 1 --bootstrap-server localhost:9092

上边复制因子(replication-factor)设置的只能是1,因为我们只启动了一个代理的,而复制因子不能超过代理数量。

生产和消费消息

这里先直接使用Kafka 提供的脚本工具,测试一下消费的生产和消费。

查询当前 Kafka 中有哪些主题(目前只有 “flink-input”):

./bin/kafka-topics.sh --bootstrap-server=localhost:9092 --list

生产消息(将消息塞入到指定主题),首先运行下边命令,会出现一个交互模式,在交互模式下,每一行会作为一个条消息存储在 Kafka 的主题中:

bin/kafka-console-producer.sh --topic flink-input --bootstrap-server localhost:9092

消费消息,这里跟 producer.sh 的参数区别在于加了个 --from-beginning 表示从头消费;如果不加,默认是从尾部开始消费:

bin/kafka-console-consumer.sh --topic flink-input --bootstrap-server localhost:9092 --from-beginning

start-kafka-cluster

与上节对 Kafka 的术语和框架介绍类似,这里简单地描述一下 Flink 的作业管理器任务管理器

Flink 的官方文档对架构的介绍,也是非常清楚的,图文并茂,可以进一步去了解。

需要注意的是,Flink 1.15.4 是基于 Scala 2.12 的。所以后边开发应用程序,应该在 SBT 中做配置,不然当安装了不同 Scala 版本时会有问题。

Flink 官网下载对应的包,我这里选的 “flink-1.15.4-bin-scala_2.12.tgz",不仅下载解压和上节 Kafka 的处理非常相似,其目录结构和启动方式也非常相似:

启动 Flink 集群

./bin/start-cluster.sh

启动集群之后,Flink 准备好了接收要运行的作业。可以浏览器访问http://localhost:8081,直接通过其 Web UI 检查运行状态(可以看到有1个可用的 Slot):

flink-web-ui

Flink 允许我们提交 .jar 文件。接下来一节,会创建一个简单的 Scala 应用程序,生成 .jar 作为 Flink 的作业,再进行提交任务的演示。

调试 Flink 应用程序可能很棘手,但是可以通过查询 Flink TaskExecutor 作业输出日志,快速了解具体发生了什么。需要切换到 Flink 根目录下运行:

tail ./log/flink-*-taskexecutor-*.out

2.4 配置 Cassandra

这里直接拉取容器,来启动一个 Cassandra 服务(服务会使用到9042端口):

docker run -p 9042:9042 –rm –name cassandra -d cassandra:3.11

利用镜像中的 cqlsh 跟 Cassandra 服务进行交互。CQL 代表 Cassandra 查询语言(Cassandra 不是SQL):

docker exec -it cassandra cqlsh

创建键空间和包含单列的表,进入交互模式后,输入CQL语句:

CREATE KEYSPACE cassandraSink WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : '1'};
USE cassandraSink
CREATE TABLE IF NOT EXISTS messages (payload text PRIMARY KEY);

等我们的 Flink 启动以后,就可以进入到 cqlsh 来查询键空间中是否有对应的消息了:

SELECT * FROM cassandraSink.messages;

三、Scala 应用程序

前文其实有提到 Scala 可以按照官网的步骤进行安装,安装完之后,会有 sbt 指令可以用。sbt 是 Scala 社区中首选的包管理器, sbt 还提供了一些用于开始使用 Scala 的模板项目。

创建一个Scala 2项目,非常简单(会提示输入项目名,我这里命名为 “hello-world”):

sbt new scala/hello-world.g8

运行,跳转到项目目录下,可以直接运行:

cd hello-world
sbt run

经过编译和运行,很快就会看到打印的屏幕输出:

scala-hello-world

上边运行 sbt run 的同时,会产生对应 .jar 文件。上图中,最后还将其提交到 Flink 集群进行运行:

../flink-1.15.4/bin/flink run ./target/scala-2.13/hello-world_2.13-1.0.jar

3.1 SBT 配置

前面提到 Flink 允许我们提交自己的 .jar 文件,我们需要添加非 Flink 原生的依赖项,比如 connector。这些信息必须存在于 jar 中,一个包所有依赖的 jar 被称为 fat jaruber jar。我们可以通过 sbt 管理这些包。

根据sbt 插件文档,在项目中安装插件的有效方法是创建一个"plugins.sbt"文件。此文件应位于项目根目录的 /project 目录中。

我们例子中,目前唯一插件是 assembly plugin。因此,我们的 plugins.sbt 应该是这样的:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

前边也介绍了,Flink 1.15.4 需要 Scala 2.12.7 才能运行,这里需要同时将 build.sbt 文件中的 scalaVersion := "2.13.x" 做替换:

scalaVersion := "2.12.7"

在build.sbt中,给 Scala 项目添加需要依赖的基础库:

libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "2.1.1";
libraryDependencies += "org.apache.flink" % "flink-core" % "1.15.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.15.0";
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "1.15.0";
libraryDependencies += "org.apache.flink" %% "flink-connector-cassandra" % "1.15.0";
libraryDependencies += "org.apache.flink" % "flink-clients" % "1.15.0";

build.sbt 文件中的每一条都是 设置表达式:表达式由一个键、一个运算符和一个值组成。最常见的设置表达式是依赖表达式,以上边最后一行为例:libraryDependencies 是一个键,+= 是个运算符,"org.apache.flink" % "flink-clients" % "1.15.0" 是值,% 分割的三个字符串分别代表 Maven 工件 group ID、artifact ID 和版本。

3.2 Scala 项目代码

本节会修改 hello-world 项目中的 ./src/main/scala/Main.scala 文件,将事件从 Kafka 中取出存储到 Cassandra 表的逻辑,用 Scala 代码实现。将数据从一个系统推送到另外一个系统的动作,有个术语来描述—— sink。

先看一下原来的代码文件,文件名和对象名需要相同(分别是Main.scala 和Main),对象扩展了 App trait:

object Main extends App {
  println("Hello, World!")
}

接下来,替换 Main.scala 的内容:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._

object Main extends App{

 // 环境初始化。配置 CheckpointingMode
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE)
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

 // 构造 KafkaSource,用于从中消费消息
 val source : KafkaSource[String] = KafkaSource.builder()
   .setBootstrapServers("localhost:9092")
   .setTopics("flink-input")
   .setGroupId("group1")
   .setStartingOffsets(OffsetsInitializer.earliest())
   .setValueOnlyDeserializer(new SimpleStringSchema())
   .build()

 // 定义如何处理source种获得的消息。在Scala中,CassandraSink 只能使用Tuples
 val tuples = env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource")
   .flatMap(
     new FlatMapFunction[String,Tuple1[String]]{
       override def flatMap(t: String, collector: Collector[Tuple1[String]]): Unit =
         collector.collect(Tuple1[String](t))
     }
   )

 // 将处理后的消息下沉到 Cassandra
 CassandraSink.addSink(tuples)
   .setHost("127.0.0.1")
   .setQuery("INSERT INTO cassandraKafkaSink.messages (payload) values (?);")
   .build()
   .name("flinkTestSink")

 //executing Flink job
 env.execute("Flink test")

}

3.3 项目打包

前面也曾提到过,我们要提交的 jar 文件需要将所有的依赖都打进去,因此我们需要执行指令:

sbt assembly

这样之后会得到一个完整的包,路径类似于“./target/scala-2.12/hello-world-assembly-1.0.jar”,可以看到单独的 jar 文件是12K,完整的包则是 112M。

sbt-assembly

经过前面几节的介绍,当前我们的服务器上,已经有了以下的服务和文件:

万事俱备,本节就来提交一下 Flink 作业,然后进行测试。

提交任务可以从前面提到的 http://localhost:8081/#/submit Web UI 进行操作,也可以直接命令行操作:

../flink-1.15.4/bin/flink run ./target/scala-2.12/hello-world-assembly-1.0.jar

然后在管理端页面上,可以看到,提交的任务被运行起来了:

flink-web-ui-submit-task

然后我们再往 Kafka topic 里边写入消息,写一条查一次,就可以观察到,会实时的被写入到了 Cassandra:

big-data-pipeline-test

后续

本文简单介绍了大数据处理的基本流程、组件,用 Scala 构建出 .jar 文件并提交到 Flink 做流计算任务。

实际的项目,会比本文示例项目复杂得多。

在后续文章中,会进一步介绍 Flink DataStream API (窗口,Join),任务处理的并行执行,Scala 语言进阶等内容。

参考文献