ETL学*笔记之02--Flink消费Kafka数据并做实时处理(整合ES7)

发布时间:2021-07-25 07:28:19

在上篇博客中介绍了hadoop、spark和flink处理离线数据的demo,这回试一下实时数据的处理,使用kafka作为input,ES作为output


1. 安装并启动Kafka

brew install kafka

安装路径为 /usr/local/Cellar/kafka/2.5.0,因为kafka依赖于zookeeper,因此brew install会自动安装一个zk,路径为?/usr/local/Cellar/zookeeper/3.5.8


安装结束后还会告诉你一些启动命令


#运行zk
zkServer start

#后台模式运行zk
brew services start zookeeper

#运行zk与kafka
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

#后台模式运行kafka
brew services start kafka

之后我们尝试往kafka里丢一些数据,进入到kafka的bin目录下


#创建一个topic
kafka-topics --create --replication-factor 1 --partitions 1 --topic test --zookeeper localhost:2181

#创建一个producer
kafka-console-producer --broker-list localhost:9092 --topic test

#创建一个consumer
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning



可以看到kafka工作正常


2. Flink监听Kafka数据并进行WordCount?

沿用之前的项目,加入kafka的相关依赖



org.apache.flink
flink-connector-kafka-0.9_2.11
1.10.1


org.apache.kafka
kafka-clients
0.9.0.1

下面上代码,LineSplitter沿用上一篇中的代码


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class StreamingWordCount {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.enableCheckpointing(1000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "127.0.0.1:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer09 consumer08 = new FlinkKafkaConsumer09("test", new SimpleStringSchema(), props);
DataStream stream = env.addSource(consumer08);

DataStream> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

counts.print();

env.execute("WordCount from Kafka data");
}
}

main方法运行起来之后,开始监听kafka相应topic的数据,kafka produce每生产一次数据,就会执行一次WordCount计算,并将结果打印到控制台。下面做些升级,每次把WordCount的结果保存保存到ES中


3. WordCount的结果写入ES

本机上已经装好了es和kibana,es版本为7.7.1,kibana版本为6.6.1,首先创建一个test索引,用来保存WordCount的记录,1列为单词,1列为单词出现的次数,用单词作为_id


加入flink-es的pom依赖,因为很容易出现版本冲突,因此单独引入es-client,并在flink-connector-elasticsearch中移除对es-client的依赖,这样可以更大限度的解决版本冲突的问题



org.apache.flink
flink-connector-elasticsearch7_2.11
1.10.1


org.elasticsearch.client
elasticsearch-rest-high-level-client


org.elasticsearch.client
elasticsearch-rest-client


org.elasticsearch
elasticsearch






org.elasticsearch.client
elasticsearch-rest-high-level-client
7.2.0



org.elasticsearch.client
elasticsearch-rest-client
7.2.0


org.elasticsearch
elasticsearch
7.2.0



com.alibaba
fastjson
1.2.71

下面附上代码,关键部分在于esSink的构建,需要至少传递两个参数(还有一些可选的配置参数),es的连接信息和SinkFunction数据处理方法,在SinkFunction中将Tuple2元组变成Map,并写入到指定索引中。程序启动之后开始监听kafka producer,没产生一条数据,就会向es中写入或更新单词出现的次数。


import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.*;

public class StreamingWordCount {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.enableCheckpointing(1000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "127.0.0.1:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer09 consumer08 = new FlinkKafkaConsumer09("test", new SimpleStringSchema(), props);
DataStream stream = env.addSource(consumer08);

DataStream> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

counts.print();

List httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

ElasticsearchSink.Builder> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, (Tuple2 element, RuntimeContext ctx, RequestIndexer indexer) -> {
Map json = new HashMap<>();
json.put("word", element.f0);
json.put("count", element.f1);
indexer.add(Requests.indexRequest().index("test").id(element.f0).source(json));
});

esSinkBuilder.setBulkFlushMaxActions(1);

counts.addSink(esSinkBuilder.build());

env.execute("WordCount from Kafka data");
}
}

4. 提交到Flink*台上运行

本机上已经安装好了单机的flink伪集群,localhost:8081进入管理页面



这样首先我们需要将代码打成jar包,在pom中加入构建插件









org.apache.maven.plugins
maven-assembly-plugin
2.6


jar-with-dependencies



com.example.study.flink.StreamingWordCount





make-assembly
package

single






使用mvn clean install 打包,在target下生成 xxx-jar-with-dependencies.jar,并上传到flink*台上,并submit,运行效果如下



在Task Managers中可以看到控制台的输出、log、JVM内存占用情况等信息


相关文档

  • 初中军训心得体会八百字
  • vivo怎么打开小爱同学
  • 华为p30pro特殊功能
  • 有哪些好看的重生完本小说?
  • 年年有鱼的新年剪纸窗花简单步骤
  • cad2014怎么把图放大
  • &quot;但愿人长久,千里共婵娟&quot;是苏轼写给对谁的思念的
  • 【算法】扩展卢卡斯详解
  • 2020年双百活动专场报告会党员个人心得观后感多篇
  • 支付宝怎么看是否结婚
  • 《春近》与《秋日二绝》阅读答案对比赏析
  • 庆祝国庆班会主题开场词
  • 岩土工程技术专业就业前景怎么样
  • IIS8 出现HTTP 500内部服务器错误解决方案
  • 结婚怎么不让别人把新房弄脏
  • 小瓜的可口做法推荐
  • 2020年单位疫情防控工作总结范文大全3篇
  • m3u8合并mp4软件_缓存视频合并App合并分段视频、m3u8等等,达人必备!
  • 墨尔本留学的个人心得体会
  • 为什么相亲后很快就结婚相亲结婚的都怎么样了
  • 士兵突击观后感600字5篇
  • 福州哪里有地方买婚纱
  • 高考生物核心考点:植物的激素调节
  • 公司年度工作计划共5篇
  • 行政的个人述职报告范文
  • 如何有效的快速提高自己的编程能力
  • 春季健康养生食谱有哪些
  • IP地址中的网络地址和主机地址分别是什么意思?
  • 令人感到心痛的语录
  • 临床执业医师考点:儿童基础免疫
  • 猜你喜欢

  • 零售药店管理制度汇总
  • 八年级英语(上)期中学业水*检测试卷
  • DSP结构与测试方法研究
  • 2018-2019学年福建省龙岩市上杭县第一中学等六校高二下学期期中考试地理试题 Word版
  • [精品课件]201x届高三英语二轮复* 专题二 语法基础 第六讲 介词、介词短语课件
  • 以暑假生活为话题的作文600字
  • 06第五章药品管理立法 PPT资料共122页
  • 物流系统控制培训课程(PPT 42页)
  • 桐城市孔城加油站管理服务有限公司企业信用报告-天眼查
  • 往年专业技术人员继续教育-信息化建设考试题库和答案解析含具体做题方法
  • 关于回到家乡舍不得离开的诗句有哪些
  • 黑龙江2015年土地估价师《管理基础与法规》:耕地占用税模拟试题
  • 2011年全国各地中考数学真题分类汇编:第29章锐角三角函数与特殊角
  • 广东省深圳市2015年高三第一次调研考试理综化学试题(一模)
  • 九年级(初三)化学 第二节物质组成的表示教案1.doc
  • 一种低抖动快锁定的时钟数据恢复电路设计
  • 有关离别的说说心情句子
  • 2020年两会学习体会:从‘治安’到‘管理’
  • 2012读书讲演比赛方案
  • 2017年中国烘焙专用油发展现状与市场前景分析(目录)
  • 化妆品行业策划方案51
  • 小学优秀作文素材安全你我他
  • 2019高考英语复*:语法专题 专题6 正反解读动词的时态和语态
  • 青海高原一株柳教学反思
  • 关于下雪的作文400字
  • 2019精选教育人教版八年级上册第二单元 试卷集.doc
  • 2019-2020年公务员考试备考行测《其他常识》考前练*题含答案解析(第六十四篇)[海南]
  • 浅谈如何培养低年级学生的计算能力[1]
  • 知秋初中作文
  • 四年级下册英语课件-M10 U2 Sam had lots of chocolate_|外研社(三起) (10)
  • 关于项目404解决方法:前提条件:项目路径都是正确的
  • 初一叙事作文《黯蓝(三)》200字(共7页PPT)
  • 树莓派 按键程序 附代码
  • 课外文言文阅读练习题
  • 基于java的在线作业提交点评系统设计与实现
  • 健康之路颈椎保健操视频
  • 关于钓鱼岛事件议论文作文
  • 我和足球的故事作文
  • 面对一朵花作文【初中初二800字】
  • 初二地理复*学案 (1)
  • 孕妇不能吃的中药
  • 自驾游汽车租赁合同
  • 电脑版