Apache Spark - Why does Complete output mode require aggregation?
I am working with the latest Structured Streaming in Apache Spark 2.2 and got the following exception:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Why does Complete output mode require a streaming aggregation? What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?
scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")  // <-- convert time column back from Timestamp to Int

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
     |   start
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Project [cast(time#10 as bigint) AS time#15L, id#6]
+- Deduplicate [id#6], true
   +- Project [cast(time#5 as timestamp) AS time#10, id#6]
      +- Project [_1#2 AS time#5, _2#3 AS id#6]
         +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:247)
  ... 57 elided
From the Structured Streaming Programming Guide - other queries (excluding aggregations, mapGroupsWithState and flatMapGroupsWithState):
Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.
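For contrast, here is a minimal sketch of a query that Complete mode does accept, because it contains a streaming aggregation. It is not taken from the guide: the `groupBy("id").count()` step, the `counts` query name, and the sample rows are my own assumptions, and it is meant for a local spark-shell style session.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder
  .master("local[2]")
  .appName("complete-mode-sketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

val source = MemoryStream[(Int, Int)]

// The aggregation keeps the Result Table bounded by the number of distinct ids.
val counts = source.toDS.toDF("time", "id").groupBy("id").count()

val q = counts.writeStream
  .format("memory")
  .queryName("counts") // hypothetical table name
  .outputMode(OutputMode.Complete)
  .start()

source.addData((1, 1), (2, 1), (3, 2))
q.processAllAvailable() // block until the rows added above are processed

// Complete mode writes the whole (aggregated) Result Table on every trigger.
val result = spark.table("counts")
  .collect()
  .map(r => (r.getInt(0), r.getLong(1)))
  .toSet

q.stop()
```

With an aggregation in the plan the analysis check passes, and every trigger rewrites one row per `id` rather than one row per input record.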
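To answer the question:
What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?
Probably an OOM (OutOfMemoryError), since Spark would have to keep every row it has ever received.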
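To make the memory argument concrete, here is a small plain-Scala model (no Spark involved, everything in it is hypothetical) that compares how much state has to be retained after each micro-batch. With a per-`id` aggregation the state is bounded by the number of distinct keys; without one, a Complete-style result would have to hold every row seen so far.

```scala
// Hypothetical model of a stream: 5 micro-batches of 1000 events over 10 distinct ids.
final case class Event(time: Int, id: Int)

val batches: Seq[Seq[Event]] =
  (0 until 5).map(b => (0 until 1000).map(i => Event(b * 1000 + i, i % 10)))

// Aggregated state: one running count per id, updated batch by batch.
val aggregatedSizes: Seq[Int] =
  batches
    .scanLeft(Map.empty[Int, Long]) { (state, batch) =>
      batch.foldLeft(state)((s, e) => s.updated(e.id, s.getOrElse(e.id, 0L) + 1L))
    }
    .tail          // drop the empty initial state
    .map(_.size)   // rows retained after each batch

// Unaggregated "complete" state: every event ever received is retained.
val rawSizes: Seq[Int] =
  batches
    .scanLeft(Vector.empty[Event])(_ ++ _)
    .tail
    .map(_.size)

println(s"aggregated state sizes: $aggregatedSizes")
println(s"raw state sizes:        $rawSizes")
```

The aggregated state stays at 10 rows per batch while the raw state grows by 1000 rows each batch, which is the unbounded growth that would eventually exhaust memory on a long-running stream.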
The puzzling part is why dropDuplicates("id") is not marked as an aggregation.