scala - Spark Interactive/Adhoc Job which can take Dynamic Arguments for Spark Context -


i looking solution interactive/adhoc spark job. have arguments need pass spark job. fine, want pass these arguments selected user form dropdown menu user.

so e.g. spark-submit job looks below, following arguments "prod /opt/var/var-spark/application.conf vc fx yes yes yes".

$spark_home/bin/spark-submit \   --class main.gmr.sparkgmr \   --deploy-mode client \   --verbose \   --driver-java-options "-dlog4j.configuration=file:///opt/var/spark-2.1.0-bin-hadoop2.7/conf/log4j.properties" \   --conf "spark.executor.extrajavaoptions=-dlog4j.configuration=file:///opt/var/spark-2.1.0-bin-hadoop2.7/conf/log4j.properties" \   file:///opt/var/var-spark/var-spark-assembly-1.0.jar \   prod /opt/var/var-spark/application.conf vc fx yes yes yes 

now want make job running because caches many dataframes in memory can used later analysis. problem job dies , in-memory dataframes/views not there more.

also, want submit different arguments job next time, e.g. "prod /opt/var/var-spark/application.conf sc dx yes yes yes".

approaches tried: tried use livy api /batches endpoint submit job arguments job starts, processing , dies. /sessions endpoint ideal choice not allow me submit class name , arguments parameters in request header.

also tried use spark structured streaming following code get arguments dataframe, fails error below:

 val lines = spark.readstream       .format("socket")       .option("host", "localhost")       .option("port", 4042)       .load();      import spark.implicits._;     val words = lines.as[string].flatmap(_.split(",")).collect;        var =  words(0);     var b = words(1);     var c = words(2);     var d = words(3);     var e = words(4);      val wordcounts = words.groupby("value").count();      val query = wordcounts.writestream      .outputmode("complete")       .format("console")       .start()         query.awaittermination() 

error:

exception in thread "main" org.apache.spark.sql.analysisexception: queries streaming sources must executed writestream.start();; textsocket @ org.apache.spark.sql.catalyst.analysis.unsupportedoperationchecker$.org$apache$spark$sql$catalyst$analysis$unsupportedoperationchecker$$throwerror(unsupportedoperationchecker.scala:196)

i want arguments in a, b, c, d, e above code , can pass dynamic arguments queries run in job.

any clues , other approach appreciated.

thanks


Comments

Popular posts from this blog

Is there a better way to structure post methods in Class Based Views -

performance - Why is XCHG reg, reg a 3 micro-op instruction on modern Intel architectures? -

jquery - Responsive Navbar with Sub Navbar -