搜索
您的当前位置:首页正文

StructedStreaming消费Kafka数据突然存储不到HDFS

来源:步旅网

问题描述:

StructedStreaming消费Kafka数据存储到HDFS中,以前正常存储,突然就存储不进去了,可以新建文件夹,但是数据写入不进去了。
分析:通过流写出到控制台,能消费数据,说明消费正常,但是就是写不进HDFS中,说明写时不能触发HDFS保存。

/**
* 消费数据,写到控制台
*/
val query = spark.sql(sql)
    .writeStream
    .format("console")
    .outputMode("append")
    .start()

解决方案

网上也没找到相关解决方案,所以自己只能考虑,换种写数据的方式,通过foreachBatch这种方式,可以对每一条流进行触发。
foreachBatch允许在每个微批次的输出上进行任意操作和自定义逻辑。
foreachBatch(…)允许您指定对流式查询的每个微批次的输出数据执行的函数。从 Spark 2.4 开始,Scala、Java 和 Python 都支持这一点。它有两个参数:一个 DataFrame 或 Dataset,其中包含微批次的输出数据和微批次的唯一 ID。

// 原来写数据方式
val query = spark.sql(sql)
    .coalesce(1)
    .writeStream
    .format("parquet")
    .outputMode("append")
    .option("truncate", "false")
    .option("path", hdfs_save_path)
    .option("checkpointLocation", checkPointDir)
    .partitionBy("part_date")
    .trigger(Trigger.ProcessingTime("1 minutes"))
    .queryName("game_event")
    .start()
// 修改为foreachBatch的方式
 val query = spark.sql(sql)
            .coalesce(1)
            .writeStream.foreachBatch((batchDF: DataFrame, batchId: Long) =>
            batchDF.
              write.format("parquet")
              .mode("append")
              .partitionBy("part_date")
              .save(hdfs_save_path))
            .outputMode("append")
            .option("checkpointLocation", checkPointDir)
            .trigger(Trigger.ProcessingTime("1 minutes"))
            .queryName("game_event")
            .start()

修改后,意外的发现可以正常些数据了。

注意事项:

修改写入方式后,有可能还会报一个

Offsets out of range with no configured reset policy for partitions

因篇幅问题不能全部显示,请点此查看更多更全内容

Top