partition.time-extractor.timestamp-pattern: the pattern used to build a timestamp from the partition values. It must match the partition columns declared in the DDL (here $dt $hr:$mi:00).
sink.partition-commit.trigger: the partition commit trigger type. With partition-time, the source table must define a watermark; a partition is committed once the watermark passes the time extracted from the partition plus sink.partition-commit.delay.
sink.partition-commit.delay: the delay that controls when a partition is committed.
sink.partition-commit.policy.kind: the partition commit policy. metastore and success-file are usually used together: the first registers the partition in the Metastore, the second writes a success file into the partition directory.

%flink.ssql
drop table if exists datagen;
CREATE TABLE datagen (
  f_sequence INT,
  f_random INT,
  f_random_str STRING,
  ts AS localtimestamp,
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.f_sequence.kind' = 'sequence',
  'fields.f_sequence.start' = '1',
  'fields.f_sequence.end' = '50',
  'fields.f_random.min' = '1',
  'fields.f_random.max' = '50',
  'fields.f_random_str.length' = '10'
);

%flink
// Switch to the Hive dialect (Scala paragraph, so it runs under %flink rather than %flink.ssql)
import org.apache.flink.table.api.SqlDialect
stenv.getConfig().setSqlDialect(SqlDialect.HIVE)

%flink.ssql
drop table if exists hive_table;
CREATE TABLE hive_table (
  f_sequence INT,
  f_random INT,
  f_random_str STRING
) PARTITIONED BY (dt STRING, hr STRING, mi STRING)
STORED AS parquet
TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

%flink.ssql
insert into hive_table
select f_sequence, f_random, f_random_str,
  DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
from datagen;
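The partition-time commit rule can be easier to follow as a few lines of plain code. The following is a minimal Python sketch, not Flink's implementation: the function names extract_partition_time and should_commit are hypothetical, and the sample partition values are made up for illustration. It fills the $dt $hr:$mi:00 pattern with partition values and commits only when the watermark has passed the partition time plus the delay.

```python
from datetime import datetime, timedelta

def extract_partition_time(pattern, partition):
    """Fill the $name placeholders with partition values and parse the result.

    Hypothetical helper: mimics what the timestamp pattern is used for,
    it is not the Flink extractor itself.
    """
    text = pattern
    # Replace longer names first so a short name cannot clobber a longer one.
    for name in sorted(partition, key=len, reverse=True):
        text = text.replace("$" + name, partition[name])
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")

def should_commit(watermark, partition, pattern, delay):
    """Partition-time trigger: commit once the watermark passes partition time + delay."""
    return watermark > extract_partition_time(pattern, partition) + delay

PATTERN = "$dt $hr:$mi:00"        # same pattern as in the table properties
DELAY = timedelta(minutes=1)      # corresponds to a delay of '1 min'
partition = {"dt": "2020-07-19", "hr": "10", "mi": "05"}  # illustrative values

if __name__ == "__main__":
    for wm in (datetime(2020, 7, 19, 10, 5, 30), datetime(2020, 7, 19, 10, 6, 1)):
        print(wm, should_commit(wm, partition, PATTERN, DELAY))
```

With one-minute partitions and a one-minute delay, the 10:05 partition in this sketch stays uncommitted while the watermark is still inside 10:05 and becomes eligible only after the watermark moves past 10:06:00.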
streaming-source.enable: turns on streaming reads of the Hive table.
streaming-source.monitor-interval: how often to check for newly created files or partitions.
streaming-source.consume-order: either create-time or partition-time; partition-time is recommended so that the latest data is read.
streaming-source.consume-start-offset: the partition time to start reading from.

%flink.ssql(type=update)
SELECT count(*) FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-19') */;
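To make the effect of a start offset combined with partition-time ordering concrete, here is a small Python sketch. It is an illustration only, not how Flink selects partitions internally: partition_time and partitions_to_read are hypothetical names, the partition list is invented, and treating the start offset as inclusive is an assumption of this sketch.

```python
from datetime import datetime

def partition_time(p):
    """Build the partition timestamp from dt/hr/mi, following '$dt $hr:$mi:00'."""
    return datetime.strptime(f"{p['dt']} {p['hr']}:{p['mi']}:00", "%Y-%m-%d %H:%M:%S")

def partitions_to_read(partitions, start_offset):
    """Keep partitions at or after the start offset, ordered by partition time.

    Assumption: the offset is inclusive and given as yyyy-MM-dd.
    """
    start = datetime.strptime(start_offset, "%Y-%m-%d")
    selected = [p for p in partitions if partition_time(p) >= start]
    return sorted(selected, key=partition_time)

# Invented partitions, deliberately listed out of order.
partitions = [
    {"dt": "2020-07-19", "hr": "00", "mi": "01"},
    {"dt": "2020-07-18", "hr": "23", "mi": "59"},
    {"dt": "2020-07-19", "hr": "00", "mi": "00"},
]

if __name__ == "__main__":
    for p in partitions_to_read(partitions, "2020-07-19"):
        print(p["dt"], p["hr"], p["mi"])
```

In this sketch the 2020-07-18 partition is skipped and the two 2020-07-19 partitions are returned in time order, which is the behaviour the partition-time setting is meant to suggest.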