当前位置:首页 > 技术文章 > 正文内容

SeaTunnel 实践 | SeaTunnel 帮你快速玩转 Spark 数据处理

arlanguage5个月前 (12-16)技术文章38

Databricks 开源的 Apache Spark 对于分布式数据处理来说是一个伟大的进步。我们在使用 Spark 时发现了很多可圈可点之处,我们在此与大家分享一下我们在简化 Spark 使用和编程以及加快 Spark 在生产环境落地上做的一些努力。

01

一个 Spark Streaming 读取 Kafka 的案例


以一个线上案例为例,介绍如何使用 Spark Streaming 统计 Nginx 后端日志中每个域名下每个状态码每分钟出现的次数,并将结果数据输出到外部数据源 Elasticsearch 中。其中原始数据已经通过 Rsyslog 传输到了 Kafka 中。


02

流数据读取


从 Kafka 中每隔一段时间读取数据,生成 DStream。


val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)


具体方法请参考

Spark Streaming + Kafka Integration Guide:

https://spark.apache.org/docs/2.4.8/streaming-kafka-0-10-integration.html



03

数据清洗


日志案例


192.168.0.1 seatunnel.apache.org 127.0.0.1 0.001s [22/Feb/2021:22:12:15 +0800] "GET /seatunnel HTTP/1.1" 200 8938 "http://github.com/" - "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"


通过 Split 方法从非结构化的原始数据 message 中获取域名以及状态码字段,并组成方便聚合的结构化数据格式 Map(key -> value)


val splitList = message.split(" ")
val domain = splitList(1)
val httpCode = splitList(9)
val item = Map((domain, httpCode) -> 1L)


04

数据聚合


利用 Spark 提供的 reduceByKey 方法对数据进行聚合计算,统计每分钟每个域名下的每个错误码出现的次数,其中 mapRdd 是在清洗数据阶段组成的 RDD。


val reduceRdd = mapRdd.reduceByKey((a:Long, b:Long) => (a + b))


05

数据输出


利用 Spark 提供的 foreachRDD 方法将结果数据 reduceRdd 输出到外部数据源 Elasticsearch。


reduceRdd.foreachRDD(rdd => {
    rdd.saveToEs("es_index" + "/es_type", esCfg)
})


06

不可避免的麻烦


我们的确可以利用 Spark 提供的 API 对数据进行自由处理,但是整套逻辑的开发调试是个不小的工程,需要一定的 Spark 基础以及使用经验才能开发出稳定高效的 Spark 处理流程。


除了业务逻辑开发方面的问题,任务发布上线时可能还会遇到以下不可逃避的麻烦:


  • 应用到生产环境调试周期长
  • 逻辑变更无法快速修改上线
  • 可能存在的数据丢失与重复问题
  • 如何最大化提升程序效率
  • 缺少应用运行状态监控

因此我们开始尝试更加简单高效的 Spark 方案,并试着解决以上问题。



07

一种简单高效的方式 - SeaTunnel


Apache SeaTunnel 是下一代高性能、分布式、海量数据集成框架。通过我们的努力让 Spark 的使用更简单,更高效,并将业界和广大用户使用 Spark 的优质经验固化到 SeaTunnel 这个产品中,明显减少学习成本,加快分布式数据处理能力在生产环境落地。


SeaTunnel 项目地址:

https://github.com/apache/incubator-seatunnel


SeaTunnel 原名 Waterdrop,2021 年 10 月 12 日起更名为 SeaTunnel。


08

SeaTunnel 的特性


  • 内置丰富插件,支持各种数据产品方便快捷的传输和集成数据,批流一体;


  • 基于模块化和插件化设计,支持热插拔,带来更好的扩展性和定制能力;


  • 特有的架构设计下,使得开发配置更简单,几乎零代码,无使用成本;


  • 经历多家企业,大规模生产环境使用和海量数据的洗礼,稳定健壮。


09

SeaTunnel 的原理和工作流程


SeaTunnel 2.X 版本尚在孵化中, 这里主要介绍 SeaTunnle 1.X 版本内容。SeaTunnel 利用了 Spark 的 Streaming, SQL, DataFrame 等技术,结合 Java 的反射机制、Service Loader 等技术实现了一套完整的可插拔的数据处理工作流,如下:


SeaTunnel pipeline


多个 Transform(Filter) 构建了数据处理的 Pipeline,满足各种各样的数据处理需求,如果您熟悉 SQL,也可以直接通过 SQL 插件构建数据处理的 Pipeline,简单高效。


以下是一个启动配置文件展示:


spark {
  # Seatunnel defined streaming batch duration in seconds
  spark.streaming.batchDuration = 5

  spark.app.name = "Waterdrop"
  spark.ui.port = 13000
}

input {
  socket {}
}

filter {
  split {
    fields = ["msg", "name"]
    delimiter = ","
    result_table_name = "tmp1"
  }
  
  sql {
    sql = "select * from tmp1"
  }
}

output {
  stdout {}
}


整个配置由4个部分组成:


  • spark 是 Spark 相关的配置,可配置的 Spark 参数见:Spark Configuration;
  • input 可配置任意的 input 插件及其参数,具体参数随不同的插件而变化。input 支持包括 File, Hive, Kafka, ES, Jdbc 等插件;


  • filter 可配置任意的 filter 插件及其参数,具体参数随不同的 filter 插件而变化。filter 中的多个插件按配置顺序形成了数据处理的pipeline, 默认上一个 filter 的输出是下一个filter 的输入;
  • output 可配置任意的 output 插件及其参数,具体参数随不同的插件而变化;


  • input 和 filter 以及 output 的随意组合,构建了多种多样的数据同步场景。


10

如何使用 SeaTunnel


Step 1 : 使用 SeaTunnel 前请先准备好 Spark 和 Java 运行环境。


Step 2 : 下载 SetTunnel 安装包并解压:

https://github.com/apache/incubator-seatunnel/releases


# 以 SeaTunnel 1.5.7 为例:
wget https://github.com/apache/incubator-seatunnel/releases/download/v1.5.7/seatunnel-1.5.7.zip
unzip seatunnel-1.5.7.zip
ln -s seatunnel-1.5.7 seatunnel
cd seatunnel


Step 3 : 配置 SeaTunnel(从 kafka 消费数据,做字符串分割,输出到终端), 编辑 config/application.conf。


spark {
  # Waterdrop defined streaming batch duration in seconds
  spark.streaming.batchDuration = 5

  spark.app.name = "Waterdrop"
  spark.ui.port = 13000
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  kafka {
    topics = "mytopic"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.zookeeper.connect = "localhost:2181"
    consumer.group.id = "waterdrop_group"
  }
}

filter {
  split {
    fields = ["msg", "name"]
    delimiter = ","
  }
}

output {
  stdout {}
}


Step 4 : 启动 SeaTunnel


./bin/start-seatunnel.sh --master yarn --deploy-mode client --config ./config/application.conf


通过这样一个配置文件启动的方式即可快速实现文章开始部分介绍的手写 Spark Streaming 读取 Kafka 进行处理后写入 ElasticSearch 的逻辑。


更详细的使用方法见 Seatunnel Quick Start:

https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/quick-start


11

SeaTunnel RoadMap


SeaTunnel 后续规划主要按以下两个方面详细展开:


  1. 提供更多 Conncetor 和数据处理插件,提高易用性、可靠性、数据一致性,也欢迎各位开发者来贡献 Idea。
  2. 提升核心能力,包括但不限于兼容 Flink 最新版本、支持多插件版本、提供运行 Metrics 以及执行报告等。


详情参考 SeaTunnel RoadMap

https://github.com/orgs/apache/projects/28/views/1

扫描二维码推送至手机访问。

版权声明:本文由AR编程网发布,如需转载请注明出处。

本文链接:http://www.arlanguage.com/post/230.html

分享给朋友:

“SeaTunnel 实践 | SeaTunnel 帮你快速玩转 Spark 数据处理” 的相关文章

NGINX 路由配置与参数详解(https配置、跨域配置、socket配置)

一、概述Nginx 是一个高性能的开源Web服务器,也可以用作反向代理服务器、负载均衡器和HTTP缓存。它的设计目标是提供高并发、低内存消耗和高度可伸缩性,使其成为处理大量并发连接的理想选择。NGINX 基础部分可以参考我这篇文章:NGINX - 高级负载均衡器、Web服务器、反向代理二、https...

分享一段PHP代码的加密扩展 分享一段php代码的加密扩展怎么弄

介绍一个简洁、高性能、跨平台的 PHP7 代码加密扩展特点简单快速,经实测,几乎不影响性能兼容 OPcache、Xdebug 等其他扩展支持 Linux、macOS、Windows 等系统兼容 Apache、Nginx + PHP-fpm、命令行等运行模式加密算法较简单,这是出于速度考虑,但仍不易解...

Nginx教程

NginxNginx1. 基本概念2. centos7部署nginx1. 部署前准备2. 安装nginx3. 配置文件1. nginx目录结构2. 默认的nginx.conf1. nginx.conf内容结构:2. nginx.conf内容格式说明:3. location 语法详解1. 语法规则:2...

nginx配置集群 -websocket

nginx配置集群 -websocket前几天做一个nginx的反向代理,来代理websocket。因为上线时间的问题,所以是单节点运行。现在准备做集群优化,然后上容器环境。这样就需要配置nginx的负载均衡。不废话了,下面是配置文件。当然配置后需要验证,验证的时候会出现很多奇怪的问题。这里就不进行...

如何在本地部署WEB开发(PHP)环境

目前很多网站程序是基于PHP语言,比如比较有名的开源程序WordPress、Discuz、DedeCMS...对于大多初学者来说,本地部署WEB环境(PHP/ASP+Apache/Nginx+Mysql),一个一个安装调试是一件很麻烦的事。所以这次分享一下如何在本地快速搭建WEB环境!考虑到大家大多...

平稳运行半年的系统宕机了,记录一次排错调优的全过程

(一)前言最近发生了一件很让人头疼的事情,已经上线半年且平稳运行半年系统在年后早高峰的使用时发生了濒临宕机的情况。访问速度特别慢,后台查到大量time_wait的连接,从代码层面到架构层面到网络层面排查了几天几夜,总算是有了结果。(二)架构、问题描述先简单描述一下这个系统的架构,公网域名对应的公网I...