scala - Spark Interactive/Adhoc Job which can take Dynamic Arguments for Spark Context
I am looking for a solution for an interactive/ad-hoc Spark job. I have a few arguments that I need to pass to the Spark job. That part is fine, but I want these arguments to be selected by a user from a dropdown menu in a form.
So, for example, my spark-submit job looks like the one below, with the following arguments: "prod /opt/var/var-spark/application.conf vc fx yes yes yes".
$SPARK_HOME/bin/spark-submit \
  --class main.gmr.SparkGMR \
  --deploy-mode client \
  --verbose \
  --driver-java-options "-Dlog4j.configuration=file:///opt/var/spark-2.1.0-bin-hadoop2.7/conf/log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/var/spark-2.1.0-bin-hadoop2.7/conf/log4j.properties" \
  file:///opt/var/var-spark/var-spark-assembly-1.0.jar \
  prod /opt/var/var-spark/application.conf vc fx yes yes yes
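(The entry point itself is not shown in the question; roughly, the seven positional values arrive in the main method's args array. The class and argument names below are just guesses for illustration, not the actual code of the jar.)

import org.apache.spark.sql.SparkSession

object SparkGMR {
  def main(args: Array[String]): Unit = {
    // Hypothetical names for the seven positional arguments shown above.
    val Array(env, confPath, viewCode, fxCode, flag1, flag2, flag3) = args

    val spark = SparkSession.builder()
      .appName(s"var-spark-$env-$viewCode-$fxCode")
      .getOrCreate()

    // ... read confPath, build and cache the DataFrames, then the job ends
    // (which is exactly the problem described below).
  }
}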
Now I want to keep this job running, because it caches many DataFrames in memory that can be used for later analysis. The problem is that the job dies, and the in-memory DataFrames/views are not there any more.
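(For context, this is the kind of state the running session holds onto; the path and view name below are made up for illustration.)

// Cached DataFrames and temp views live only as long as the SparkSession,
// which is why losing the job loses them too.
val positions = spark.read.parquet("/opt/var/var-spark/data/positions")
positions.cache()
positions.createOrReplaceTempView("positions")

// Later ad-hoc queries in the same running session hit the cache:
spark.sql("select count(*) from positions").show()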
Also, I want to submit different arguments to the job the next time, e.g. "prod /opt/var/var-spark/application.conf sc dx yes yes yes".
Approaches I have tried: I tried to use the Livy API /batches endpoint to submit the job with arguments, but the job starts, does its processing, and then dies. The /sessions endpoint would be the ideal choice, but it does not allow me to submit the class name and arguments as parameters in the request.
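(For reference, this is roughly how a long-lived Livy session could be driven from code, with the dropdown values interpolated into statements instead of being passed as job arguments. The Livy URL, the hard-coded session id, and the statement body are placeholder assumptions, and the sketch assumes a JDK 11+ client for java.net.http.)

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object LivySessionClient {
  // Placeholder Livy URL; adjust for your cluster.
  private val livyUrl = "http://localhost:8998"
  private val client  = HttpClient.newHttpClient()

  private def post(path: String, json: String): String = {
    val req = HttpRequest.newBuilder(URI.create(livyUrl + path))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(json))
      .build()
    client.send(req, HttpResponse.BodyHandlers.ofString()).body()
  }

  def main(args: Array[String]): Unit = {
    // 1. Create a long-lived interactive session (kind "spark" = Scala).
    println(post("/sessions", """{"kind": "spark"}"""))

    // 2. Once the session reports state "idle", run statements inside it.
    //    The values picked in the dropdown are interpolated into the code,
    //    and anything cached by earlier statements is still there.
    val view = "vc"; val fx = "fx"                    // values from the UI
    val code    = s"""spark.sql("select '$view' as view, '$fx' as fx").show()"""
    val escaped = code.replace("\\", "\\\\").replace("\"", "\\\"")
    // Session id 0 is hard-coded here; a real client would parse it from
    // the /sessions response JSON.
    println(post("/sessions/0/statements", s"""{"code": "$escaped"}"""))
  }
}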
I also tried to use Spark Structured Streaming with the following code to get the arguments into a DataFrame, but it fails with the error below:
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 4042)
  .load()

import spark.implicits._

val words = lines.as[String].flatMap(_.split(",")).collect
var a = words(0)
var b = words(1)
var c = words(2)
var d = words(3)
var e = words(4)

val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
Error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; textSocket
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
I want to get the arguments into a, b, c, d, e in the above code, so that I can pass dynamic arguments to the queries that run in the job.
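(A sketch of how this could be made to work: collect() is not allowed directly on a streaming Dataset, which is what triggers the AnalysisException above. Pulling each micro-batch to the driver inside foreachBatch avoids that, but foreachBatch only exists from Spark 2.4 onward, while the paths above suggest Spark 2.1.0, so this assumes an upgrade; the socket host/port are the ones from the question and the sql query is hypothetical.)

import org.apache.spark.sql.{Dataset, SparkSession}

object DynamicArgsStream {
  def main(cliArgs: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dynamic-args").getOrCreate()
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 4042)
      .load()

    // Handle one micro-batch on the driver; collect() is legal here because
    // `batch` is a plain (non-streaming) Dataset.
    val handleBatch: (Dataset[String], Long) => Unit = (batch, _) =>
      batch.collect().foreach { line =>
        // Assumes one comma-separated request per line: a,b,c,d,e
        val Array(a, b, c, d, e) = line.split(",").map(_.trim)
        println(s"request: $a $b $c $d $e")
        // e.g. spark.sql(s"select * from some_cached_view where vc = '$a' and fx = '$b'").show()
      }

    // Calling collect() on the streaming Dataset caused the error above;
    // foreachBatch (Spark 2.4+) is the supported way to get at the rows.
    val query = lines.as[String].writeStream
      .foreachBatch(handleBatch)
      .start()

    query.awaitTermination()
  }
}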
Any clues or any other approach would be appreciated.
Thanks