GraphX Pregel and Spark Streaming: the RDDs pushed into the rddQueue within the vprog are not processed
I am using GraphX Pregel together with Spark Streaming. I want the vertex program (the vprog) to create RDDs and push them into the rddQueue so that they get processed.
    import scala.collection.mutable.Queue
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Queue that feeds the DStream; sc is the existing SparkContext
    val queueOfRDDs: Queue[RDD[Int]] = Queue.empty[RDD[Int]]
    @transient val streamingContext: StreamingContext = new StreamingContext(sc, Seconds(1))
    val inputDStream = streamingContext.queueStream(queueOfRDDs, true, null)
    inputDStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
    streamingContext.start()

    val initialMessage = "init"

    // vprog: tries to create RDDs and enqueue them from inside the vertex program
    def vertexProgram(id: VertexId, attr: String, msgs: String): String = {
      queueOfRDDs.synchronized {
        for (a <- 1 to 3) {
          queueOfRDDs += sc.makeRDD(1 to 1000, 10)
          println("will add " + queueOfRDDs.size)
        }
      }
      msgs
    }
    def sendMessage(...){...}
    def messageCombiner(...){...}

    val newGraph = Pregel.apply(graph, initialMessage, 1, EdgeDirection.Out)(vertexProgram, sendMessage, messageCombiner)
The expected result is:

    will add 1
    will add 2
    will add 3
    will add 4
    will add 5
    will add 6
    will add 7
    ...
    -------------------------------------------
    Time: 1503048820000 ms
    -------------------------------------------
    (0,100)
    (6,100)
    (3,100)
    (9,100)
    (4,100)
    (1,100)
    (7,100)
    (8,100)
    (5,100)
    (2,100)
    -------------------------------------------
    Time: 1503048820000 ms
    -------------------------------------------
    (0,100)
    (6,100)
    (3,100)
    (9,100)
    (4,100)
    (1,100)
    (7,100)
    (8,100)
    (5,100)
    (2,100)
    ...
    -------------------------------------------
    Time: 1503048820000 ms
    -------------------------------------------
    (0,100)
    (6,100)
    (3,100)
    (9,100)
    (4,100)
    (1,100)
    (7,100)
    (8,100)
    (5,100)
    (2,100)
But the result I got is:

    will add 1
    will add 2
    will add 3
    will add 4
    will add 5
    will add 6
    will add 7
    ...
The RDDs pushed into queueOfRDDs (its size increased) are not processed. Can anyone help me, please?
TL;DR: It cannot work.

This code doesn't look right. It looks like you are trying to create or initialize an RDD within a task (vertexProgram), possibly by making SparkContext lazy or by using an object wrapper.

Your program appends to a local copy of the queue, which is not visible to the actual driver program. Even if it were visible, the RDDs would correspond to different contexts.
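To illustrate the "local copy" point without needing a cluster, here is a minimal sketch in plain Scala. It assumes that a round trip through Java serialization is a fair stand-in for what happens to objects captured by a closure when Spark ships a task; LocalCopyDemo and roundTrip are hypothetical names used only for this illustration, not Spark APIs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object LocalCopyDemo {
  // Hypothetical helper: serialize a value and read it back, which yields an
  // independent copy, much like an object captured in a task closure.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Stands in for queueOfRDDs as it exists on the driver.
    val driverQueue = mutable.Queue.empty[Int]
    // Stands in for the copy that code running inside a task would see.
    val taskQueue = roundTrip(driverQueue)

    // Appending inside the "task" only changes the copy.
    taskQueue += 1
    taskQueue += 2

    println(s"task copy size: ${taskQueue.size}")     // prints "task copy size: 2"
    println(s"driver queue size: ${driverQueue.size}") // prints "driver queue size: 0"
  }
}
```

The size reported inside the task grows, as the "will add" lines in the question do, while the queue that queueStream actually reads from stays empty, so no batch ever has data to print.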