Apache Spark - Why does Complete output mode require aggregation?
I am working with the latest Structured Streaming in Apache Spark 2.2 and got the following exception:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Why does Complete output mode require a streaming aggregation? What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?
scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")       // <-- convert time column back from Timestamp to Int

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
     |   start
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Project [cast(time#10 as bigint) AS time#15L, id#6]
+- Deduplicate [id#6], true
   +- Project [cast(time#5 as timestamp) AS time#10, id#6]
      +- Project [_1#2 AS time#5, _2#3 AS id#6]
         +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:247)
  ... 57 elided
From the Structured Streaming Programming Guide, under "Other queries" (excluding aggregations, mapGroupsWithState, and flatMapGroupsWithState):
Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.
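For contrast, here is a minimal sketch of a query that the analyzer should accept in Complete mode, because it contains a streaming aggregation. It assumes a spark-shell session (so `spark` and `spark.implicits._` are in scope); the query name `dups_agg` and the sample rows are made up for illustration. Note that `min("time")` per `id` only approximates `dropDuplicates("id")`: it keeps the smallest timestamp rather than the first row to arrive.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.min
import org.apache.spark.sql.streaming.OutputMode

// Assumption: running inside spark-shell, where `spark` and its implicits exist.
implicit val sqlContext: SQLContext = spark.sqlContext
val aggSource = MemoryStream[(Int, Int)]

// One output row per id: a streaming aggregation, so the result table
// is bounded by the number of distinct ids rather than by the input size.
val firstSeen = aggSource.toDS().toDF("time", "id").
  groupBy("id").
  agg(min("time") as "time")

val aggQuery = firstSeen.
  writeStream.
  format("memory").
  queryName("dups_agg").              // hypothetical table name
  outputMode(OutputMode.Complete()).  // allowed: the plan contains an aggregation
  start()

aggSource.addData((1, 1), (2, 1), (3, 2))  // illustrative rows: (time, id)
aggQuery.processAllAvailable()
spark.table("dups_agg").show()
aggQuery.stop()
```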
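To answer the question:
What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?
Probably an OutOfMemoryError (OOM), since the result table would have to hold every row ever received.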
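The memory argument can be illustrated with a toy model in plain Scala (no Spark involved). It is only a sketch under made-up assumptions: five micro-batches of 1000 rows each, with ids drawn from 10 distinct keys. The unaggregated "result table" grows with every batch, while the aggregated one stays at one row per key.

```scala
// Hypothetical input: 5 micro-batches of 1000 (time, id) rows, 10 distinct ids.
val batches: Seq[Seq[(Int, Int)]] =
  (0 until 5).map(b => (0 until 1000).map(i => (b * 1000 + i, i % 10)))

var unaggregated = Vector.empty[(Int, Int)]  // every row ever seen
var counts = Map.empty[Int, Long]            // one entry per distinct id

for ((batch, n) <- batches.zipWithIndex) {
  // Without aggregation, Complete mode would have to retain all rows.
  unaggregated = unaggregated ++ batch
  // With aggregation, only the per-key state is retained.
  counts = batch.foldLeft(counts) { case (acc, (_, id)) =>
    acc.updated(id, acc.getOrElse(id, 0L) + 1L)
  }
  println(s"batch $n: unaggregated rows = ${unaggregated.size}, aggregated rows = ${counts.size}")
}
// Final line printed: batch 4: unaggregated rows = 5000, aggregated rows = 10
```

On an unbounded stream the first number never stops growing, which is the scenario the guide calls infeasible.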
The puzzling part is why dropDuplicates("id") is not marked as an aggregation.
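Since the analyzer treats the Deduplicate node as a non-aggregate operator, one option is to run the same deduplication in Append mode instead. The following is a sketch rather than a verified fix for the original setup: it assumes a spark-shell session, uses a made-up query name `dups_append`, and leaves out the `checkpointLocation` option, because the comment in the question notes that the memory sink supports checkpointing for Complete mode only.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode

// Assumption: running inside spark-shell, where `spark` and its implicits exist.
implicit val sqlContext: SQLContext = spark.sqlContext
val appendSource = MemoryStream[(Int, Int)]

val dedupedIds = appendSource.toDS().toDF("time", "id").
  dropDuplicates("id")

val appendQuery = dedupedIds.
  writeStream.
  format("memory").
  queryName("dups_append").         // hypothetical table name
  outputMode(OutputMode.Append()).  // each new distinct id is emitted once
  start()

appendSource.addData((1, 1), (2, 1), (3, 2))  // illustrative rows: (time, id)
appendQuery.processAllAvailable()
spark.table("dups_append").show()
appendQuery.stop()
```

Without a watermark, the deduplication state itself still grows with the number of distinct ids, so this changes what is written to the sink, not how much state Spark keeps.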