StructedStreaming消费Kafka数据存储到HDFS中,以前正常存储,突然就存储不进去了,可以新建文件夹,但是数据写入不进去了。
分析:通过流写出到控制台,能消费数据,说明消费正常,但是就是写不进HDFS中,说明写时不能触发HDFS保存。
/**
* 消费数据,写到控制台
*/
val query = spark.sql(sql)
.writeStream
.format("console")
.outputMode("append")
.start()
网上也没找到相关解决方案,所以自己只能考虑,换种写数据的方式,通过foreachBatch这种方式,可以对每一条流进行触发。
foreachBatch允许在每个微批次的输出上进行任意操作和自定义逻辑。
foreachBatch(…)允许您指定对流式查询的每个微批次的输出数据执行的函数。从 Spark 2.4 开始,Scala、Java 和 Python 都支持这一点。它有两个参数:一个 DataFrame 或 Dataset,其中包含微批次的输出数据和微批次的唯一 ID。
// 原来写数据方式
val query = spark.sql(sql)
.coalesce(1)
.writeStream
.format("parquet")
.outputMode("append")
.option("truncate", "false")
.option("path", hdfs_save_path)
.option("checkpointLocation", checkPointDir)
.partitionBy("part_date")
.trigger(Trigger.ProcessingTime("1 minutes"))
.queryName("game_event")
.start()
// 修改为foreachBatch的方式
val query = spark.sql(sql)
.coalesce(1)
.writeStream.foreachBatch((batchDF: DataFrame, batchId: Long) =>
batchDF.
write.format("parquet")
.mode("append")
.partitionBy("part_date")
.save(hdfs_save_path))
.outputMode("append")
.option("checkpointLocation", checkPointDir)
.trigger(Trigger.ProcessingTime("1 minutes"))
.queryName("game_event")
.start()
修改后,意外的发现可以正常些数据了。
修改写入方式后,有可能还会报一个
Offsets out of range with no configured reset policy for partitions
因篇幅问题不能全部显示,请点此查看更多更全内容