Apache Spark - Why does Complete output mode require aggregation?


I am working with the latest Structured Streaming in Apache Spark 2.2 and got the following exception:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

Why does Complete output mode require a streaming aggregation? What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?

scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")  // <-- convert time column back from Timestamp to a number

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing in Complete output mode
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
     |   start
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Project [cast(time#10 as bigint) AS time#15L, id#6]
+- Deduplicate [id#6], true
   +- Project [cast(time#5 as timestamp) AS time#10, id#6]
      +- Project [_1#2 AS time#5, _2#3 AS id#6]
         +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:247)
  ... 57 elided

From the Structured Streaming Programming Guide, on other queries (excluding aggregations, mapGroupsWithState and flatMapGroupsWithState):

Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.

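To answer the question:

What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?

Probably an OOM. Complete mode writes the whole result table to the sink on every trigger, so without an aggregation to collapse rows, Spark would have to keep every row it has ever received.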
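The growth behind that guess can be illustrated with a toy model in plain Scala. This is only a sketch and does not use Spark or reflect how Spark stores state: the object `CompleteModeSketch` and both of its functions are hypothetical names made up for illustration. It compares how many rows a "result table" would hold after each micro-batch when every row is retained versus when rows are folded into one count per id.

```scala
// Toy model only: no Spark involved, and not how Spark manages state.
object CompleteModeSketch {
  // Rows held after each micro-batch when every input row must be kept
  // (what Complete mode without an aggregation would require).
  def unaggregatedSizes(batches: Seq[Seq[(Long, Int)]]): Seq[Int] =
    batches.scanLeft(Vector.empty[(Long, Int)])(_ ++ _).tail.map(_.size)

  // Rows held after each micro-batch when rows are folded into one count
  // per id (similar in spirit to a groupBy("id").count() aggregation).
  def aggregatedSizes(batches: Seq[Seq[(Long, Int)]]): Seq[Int] =
    batches
      .scanLeft(Map.empty[Int, Long]) { (counts, batch) =>
        batch.foldLeft(counts) { case (acc, (_, id)) =>
          acc.updated(id, acc.getOrElse(id, 0L) + 1L)
        }
      }
      .tail
      .map(_.size)

  def main(args: Array[String]): Unit = {
    // Three micro-batches of (time, id) rows with only two distinct ids.
    val batches = Seq(
      Seq((1L, 1), (2L, 2)),
      Seq((3L, 1), (4L, 1)),
      Seq((5L, 2), (6L, 1))
    )
    println(unaggregatedSizes(batches)) // List(2, 4, 6)
    println(aggregatedSizes(batches))   // List(2, 2, 2)
  }
}
```

In this model the unaggregated table grows with the total number of rows ever seen, while the aggregated one is bounded by the number of distinct keys, which is the difference the programming guide's wording points at.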

The puzzling part is why dropDuplicates("id") is not marked as an aggregation.

