博客
关于我
Flink Sql on Zeppelin(6)——Hive Streaming
阅读量:323 次
发布时间:2019-03-01

本文共 2485 字,大约阅读时间需要 8 分钟。

概述

  • Flink 1.11 于上周二正式发布,之前我也分享过其新特性。本期内容定为探讨 Flink 与 Hive 的集成,原计划准备实时数仓数据,但由于数据获取和时间限制,转而以理论分享为主。
  • 在参与前,请确保将 Flink 连接 Hive 所需的包添加至 lib 目录。
  • 接下来,我将从 Hive Streaming Sink 和 Hive Streaming Source 两个方面展开讨论。

Hive Streaming Sink

  • 首先,了解 Hive Streaming Sink 的官方描述及其工作原理。以下是其主要参数解析:
    • partition.time-extractor.timestamp-pattern:与 DDL 中的分区字段一致,用于抽取时间戳。
    • sink.partition-commit.trigger:分区触发器类型,需 Source 表中定义 Watermark,当 Watermark 超过提取分区时间加上 sink.partition-commit.delay 时,触发分区提交。
    • sink.partition-commit.delay:延迟时间,用于控制分区提交的时机。
    • sink.partition-commit.policy.kind:定义分区提交策略,通常采用 metastore 和 success-file策略,以通知 Metastore 并成功文件合并。
  • 为了演示效果,我们创建了一个 Source 表和 Hive Sink 表,以下是代码示例:
  • %flink.ssqldrop table if exists datagen; CREATE TABLE datagen ( f_sequence INT, f_random INT, f_random_str STRING, ts AS localtimestamp, WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen', 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='50', 'fields.f_random.min'='1', 'fields.f_random.max'='50', 'fields.f_random_str.length'='10');
    %flink.ssql//切换到 Hive 方言 stenv.getConfig().setSqlDialect(SqlDialect.HIVE);%flink.ssqldrop table if exists hive_table; CREATE TABLE hive_table ( f_sequence INT, f_random INT, f_random_str STRING) PARTITIONED BY (dt STRING, hr STRING, mi STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file');
    %flink.ssql insert into hive_table select f_sequence, f_random, f_random_str, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') from datagen;
  • 完成后,通过 Batch 模式查询 hive_table 验证数据是否已成功写入。

Hive Streaming Source

  • Hive Streaming Source 同样支持两种表类型:分区表和非分区表。分区表监控新分区产生,非分区表监控新文件生成,均需原子性处理文件或分区信息。
  • Source 参数解析如下:
    • stream-source.enable:开启流模式。
    • stream-source.monitor-interval:监控新文件/分区产生的间隔时间。
    • stream-source.consume-order:可选 create-time 或 partition-time,建议使用 partition-time 以确保读取最新数据。
    • stream-source.consume-start-offset:指定起始分区时间。
  • 通过以下 SQL 代码测试 Hive Streaming Source:
  • %flink.ssql(type=update)SELECT count(*) FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-19')*/;
  • 测试结果表明,若在 HDFS 目录中直接插入新文件,Hive Source 无法读取新数据。建议在启动 Sink 写入数据后,观察 Source 是否能正确读取最新数据。

最后

  • 本次分享仅涉及 Hive Streaming Sink 和 Source,Hive Dim Source 及其他高级功能将在后续文章中详细探讨。
  • 如需进一步了解或遇到问题,可加入 Flink on Zeppelin 钉钉群进行讨论。

转载地址:http://edmo.baihongyu.com/

你可能感兴趣的文章
Objective-C实现number of digits解字符数算法(附完整源码)
查看>>
Objective-C实现NumberOfIslands岛屿的个数算法(附完整源码)
查看>>
Objective-C实现n皇后问题算法(附完整源码)
查看>>
Objective-C实现OCR文字识别(附完整源码)
查看>>
Objective-C实现odd even sort奇偶排序算法(附完整源码)
查看>>
Objective-C实现page rank算法(附完整源码)
查看>>
Objective-C实现PageRank算法(附完整源码)
查看>>
Objective-C实现pascalTriangle帕斯卡三角形算法(附完整源码)
查看>>
Objective-C实现perfect cube完全立方数算法(附完整源码)
查看>>
Objective-C实现PNG图片格式转换BMP图片格式(附完整源码)
查看>>
Objective-C实现pollard rho大数分解算法(附完整源码)
查看>>
Objective-C实现quick select快速选择算法(附完整源码)
查看>>
Objective-C实现recursive bubble sor递归冒泡排序算法(附完整源码)
查看>>
Objective-C实现recursive insertion sort递归插入排序算法(附完整源码)
查看>>
Objective-C实现RedBlackTree红黑树算法(附完整源码)
查看>>
Objective-C实现redis分布式锁(附完整源码)
查看>>
Objective-C实现reverse letters反向字母算法(附完整源码)
查看>>
Objective-C实现ripple adder涟波加法器算法(附完整源码)
查看>>
Objective-C实现RodCutting棒材切割最大利润算法(附完整源码)
查看>>
Objective-C实现Romberg算法(附完整源码)
查看>>