GraphX Pregel and Spark Streaming: the RDDs pushed into the rddQueue within the vprog are not processed


I am using GraphX Pregel together with Spark Streaming. I want the vertex program (the vprog) to create RDDs and push them into the rddQueue so that they get processed.

    val queueOfRDDs: Queue[RDD[Int]] = Queue.empty[RDD[Int]]
    @transient val streamingContext: StreamingContext = new StreamingContext(sc, Seconds(1))
    val inputDStream = streamingContext.queueStream(queueOfRDDs, true, null)
    inputDStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
    streamingContext.start()

    val initialMessage = "init"

    def vertexProgram(id: VertexId, attr: String, msgs: String): String = {
      queueOfRDDs.synchronized {
        for (a <- 1 to 3) {
          queueOfRDDs += sc.makeRDD(1 to 1000, 10)
          println("will add " + queueOfRDDs.size)
        }
      }
      msgs
    }

    def sendMessage(...){...}
    def messageCombiner(...){...}

    val newGraph = Pregel.apply(graph, initialMessage, 1, EdgeDirection.Out)(vertexProgram, sendMessage, messageCombiner)

The expected result is:

    will add 1
    will add 2
    will add 3
    will add 4
    will add 5
    will add 6
    will add 7
    ...
    -------------------------------------------
    Time: 1503048820000 ms
    -------------------------------------------
    (0,100)
    (6,100)
    (3,100)
    (9,100)
    (4,100)
    (1,100)
    (7,100)
    (8,100)
    (5,100)
    (2,100)

    -------------------------------------------
    Time: 1503048820000 ms
    -------------------------------------------
    (0,100)
    (6,100)
    (3,100)
    (9,100)
    (4,100)
    (1,100)
    (7,100)
    (8,100)
    (5,100)
    (2,100)

    ...

    -------------------------------------------
    Time: 1503048820000 ms
    -------------------------------------------
    (0,100)
    (6,100)
    (3,100)
    (9,100)
    (4,100)
    (1,100)
    (7,100)
    (8,100)
    (5,100)
    (2,100)

But I got this result instead:

    will add 1
    will add 2
    will add 3
    will add 4
    will add 5
    will add 6
    will add 7
    ...

The RDDs pushed into queueOfRDDs are not processed, even though the queue's size does increase. Can anyone help me, please?

TL;DR: This cannot work.

This code doesn't look right. It looks like you are trying to create and initialize RDDs within a task (vertexProgram), presumably by making the SparkContext lazy or by using an object wrapper.

Your program appends to a local copy of the queue, which is not visible to the actual driver program. Even if it were, those RDDs would belong to different contexts.
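The "local copy" problem can be illustrated without Spark at all. The sketch below is a simplified, hypothetical simulation, not Spark's actual code path: `VertexTask`, `shipToExecutor` and `simulate` are made-up names, plain Java serialization stands in for Spark's task serialization, and `Int`s stand in for RDDs. The "executor" appends to its own deserialized copy of the queue, so the driver's queue (the one `queueStream` would be polling) stays empty.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical stand-in for the vertex program: it holds a reference to the
// driver's queue, much like vertexProgram captures queueOfRDDs.
class VertexTask(val queue: mutable.Queue[Int]) extends Serializable {
  def run(id: Int): Unit = queue.synchronized { queue += id }
}

object ClosureCopyDemo {
  // Java-serialization round trip, roughly analogous to shipping a task
  // from the driver to an executor: the result is a copy, not the original.
  def shipToExecutor[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (size of the driver's queue, size of the executor's copy).
  def simulate(): (Int, Int) = {
    val driverQueue = mutable.Queue.empty[Int] // plays the role of queueOfRDDs
    val executorTask = shipToExecutor(new VertexTask(driverQueue))
    (1 to 3).foreach(executorTask.run)         // the "executor" appends 3 items
    (driverQueue.size, executorTask.queue.size)
  }

  def main(args: Array[String]): Unit = {
    val (driverSize, executorSize) = simulate()
    println(s"driver queue: $driverSize, executor copy: $executorSize")
  }
}
```

Running `main` prints `driver queue: 0, executor copy: 3`: the size increases on the copy (as in your "will add" output), while the queue that the driver would actually read stays empty.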

