Scala - Insert rows into a dataset depending on the time difference between rows
How do I go about inserting rows into an existing dataset? The table shown is a dumbed-down version of the data I have available in my dataset.
I want to insert a new row whenever the time between 2 consecutive rows crosses the next full hour. All columns except the new time should be the same as in the row above.
Either datetime or date & time is sufficient. I put them all in the example to show that I have access to all of them.
My general idea is to calculate the number of hour skips between 2 rows, create a new dataset from that, join it with the original one, and sort it.
Current data:

    +--------+-------------------+----------+--------+
    |  status|           datetime|      date|    time|
    +--------+-------------------+----------+--------+
    |   start|2017-01-01 07:15:12|2017-01-01|07:15:12|
    |    init|2017-01-01 07:22:12|2017-01-01|07:22:12|
    |a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
    |b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
    |c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
    +--------+-------------------+----------+--------+

Desired result:

    +--------+-------------------+----------+--------+
    |  status|           datetime|      date|    time|
    +--------+-------------------+----------+--------+
    |   start|2017-01-01 07:15:12|2017-01-01|07:15:12|
    |    init|2017-01-01 07:22:12|2017-01-01|07:22:12|
    |a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
    |a_status|2017-01-01 08:00:00|2017-01-01|08:00:00|
    |a_status|2017-01-01 09:00:00|2017-01-01|09:00:00|
    |a_status|2017-01-01 10:00:00|2017-01-01|10:00:00|
    |b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
    |b_status|2017-01-01 11:00:00|2017-01-01|11:00:00|
    |c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
    +--------+-------------------+----------+--------+

My first thought was calculating the time difference in hours and, if it is >= 1, generating that number of rows in a new dataset and joining it with the original. The problem is that this doesn't detect the skip from b_status to c_status, since the difference there is only 3/4 of an hour.

    // time difference to the next row, in hours
    val df9 = df3.withColumn("time_diff",
      (unix_timestamp(lead($"datetime", 1).over(Window.orderBy("datetime"))) - unix_timestamp($"datetime")) / 60 / 60)
    df9.show()

My next thought was extracting the hour part of the time field and subtracting those values. The result is an Int with the correct number of lines, though the jump from hour 23 to hour 00 would need to be handled separately.
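A minimal sketch, in plain Scala outside Spark, of one way to count the hour skips that sidesteps both issues mentioned above (the 3/4-hour gap and the 23-to-00 jump): truncate both timestamps to the hour and count the hours between the truncated values. The names `HourGaps` and `crossedHourBoundaries` are made up for illustration, and the timestamp format is assumed to match the example table.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object HourGaps {
  // Assumed to match the datetime column, e.g. "2017-01-01 07:31:12".
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  /** Number of full-hour marks (hh:00:00) that lie in the interval (from, to]. */
  def crossedHourBoundaries(from: String, to: String): Long = {
    // Dropping minutes and seconds first means a 45-minute gap that
    // crosses an hour mark still counts as 1.
    val a = LocalDateTime.parse(from, fmt).truncatedTo(ChronoUnit.HOURS)
    val b = LocalDateTime.parse(to, fmt).truncatedTo(ChronoUnit.HOURS)
    // Both values carry the date, so a jump over midnight needs no special case.
    ChronoUnit.HOURS.between(a, b)
  }
}
```

With the example data, `HourGaps.crossedHourBoundaries("2017-01-01 10:30:12", "2017-01-01 11:15:12")` is 1 although only 0.75 hours elapsed, and a gap from 23:50:00 to 00:10:00 on the following day also counts as 1 rather than -23.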
I also read about the explode function, since it can generate new rows with the same data, though I have no idea yet whether that function is applicable in my case.
Does anyone have hints or implementations that could help me out? Maybe there is an easier way to achieve this. Have a nice weekend.
You can achieve the desired result using the explode function, but you need a fairly complex combination of hour, lead, Window, udf, unix_timestamp, select, SimpleDateFormat and a few more functions, as explained below.
Given the dataframe

    +--------+-------------------+----------+--------+
    |status  |datetime           |date      |time    |
    +--------+-------------------+----------+--------+
    |start   |2017-01-01 07:15:12|2017-01-01|07:15:12|
    |init    |2017-01-01 07:22:12|2017-01-01|07:22:12|
    |a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
    |b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
    |c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
    +--------+-------------------+----------+--------+

only the status and datetime columns are important, because the date and time columns can be derived from the datetime column once it has been changed. So select those two columns, plus the hour difference to the next row, as
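
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{DateType, TimestampType}
    // the $"..." syntax also needs spark.implicits._ from your SparkSession

    val df2 = df.select(
      $"status",
      unix_timestamp($"datetime").cast(TimestampType).as("datetime"),
      // hour-of-day of the next row minus hour-of-day of this row
      (hour(lead($"datetime", 1).over(Window.orderBy("datetime"))) - hour($"datetime")).as("hour")
    )

which should give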

    +--------+---------------------+----+
    |status  |datetime             |hour|
    +--------+---------------------+----+
    |start   |2017-01-01 07:15:12.0|0   |
    |init    |2017-01-01 07:22:12.0|0   |
    |a_status|2017-01-01 07:31:12.0|3   |
    |b_status|2017-01-01 10:30:12.0|1   |
    |c_status|2017-01-01 11:15:12.0|null|
    +--------+---------------------+----+

Now that you have the hour difference, you can build an array of datetimes running from the row's datetime value up to that value plus the hour difference. This can be done by defining a udf function:
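
    import java.text.SimpleDateFormat
    import java.util.Calendar

    // date: the row's datetime, hour: its hour of day, value: hour difference to the next row
    def getDiffDateArray = udf((date: String, hour: Int, value: Int) => {
      // rows are only expanded when the hour difference is greater than 1
      if ((value - 1) > 0) {
        var array = Array.empty[String]
        for (time <- 0 to value) {
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          val originalDate = format.parse(date)
          val calendar = Calendar.getInstance
          calendar.setTimeInMillis(originalDate.getTime)
          calendar.set(Calendar.HOUR_OF_DAY, hour + time)
          // time == 0 keeps the original timestamp; later entries are full hours
          if (time != 0) {
            calendar.set(Calendar.MINUTE, 0)
            calendar.set(Calendar.SECOND, 0)
          }
          array = array ++ Array(format.format(calendar.getTime))
        }
        array
      } else Array(date)
    })

You can then use the explode function on that array of datetimes, and derive the date and time columns, by doing the following: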

    // extracts the HH:mm:ss part of a datetime string
    def getTimeFromDateTime = udf((date: String) => {
      val parseFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val timeFormat = new SimpleDateFormat("HH:mm:ss")
      val time = parseFormat.parse(date)
      timeFormat.format(time)
    })

    df2.withColumn("datetime", explode(getDiffDateArray($"datetime", hour($"datetime"),
        when($"hour".isNotNull, $"hour").otherwise(lit(0)))))  // the last row has no next row
      .drop("hour")
      .withColumn("date", $"datetime".cast(DateType))
      .withColumn("time", getTimeFromDateTime($"datetime"))
      .show(false)

This should give the following result:
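
    +--------+-------------------+----------+--------+
    |status  |datetime           |date      |time    |
    +--------+-------------------+----------+--------+
    |start   |2017-01-01 07:15:12|2017-01-01|07:15:12|
    |init    |2017-01-01 07:22:12|2017-01-01|07:22:12|
    |a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
    |a_status|2017-01-01 08:00:00|2017-01-01|08:00:00|
    |a_status|2017-01-01 09:00:00|2017-01-01|09:00:00|
    |a_status|2017-01-01 10:00:00|2017-01-01|10:00:00|
    |b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
    |c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
    +--------+-------------------+----------+--------+

Note that the b_status row at 11:00:00 from the question's expected output is missing here, because getDiffDateArray only expands a row when the hour difference is greater than 1. Changing that condition to value > 0 should add the row. The hour subtraction also turns negative across midnight, so that case would still need separate handling.

I hope the answer is helpful.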
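To make the intended fill logic easier to check, here is a minimal sketch of the same idea on plain Scala collections, without Spark. It is not the code from the answer: `HourFill`, `Row` and `fillHours` are hypothetical names, the rows carry only status and datetime (date and time can be derived from datetime), and `java.time` is swapped in for `SimpleDateFormat` and `Calendar`. Each row is paired with its successor and `flatMap` plays the role of explode.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object HourFill {
  // Assumed to match the datetime column, e.g. "2017-01-01 07:31:12".
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Hypothetical row type for this sketch.
  final case class Row(status: String, datetime: String)

  /** Full-hour timestamps strictly after `from` and strictly before `to`. */
  private def hourMarks(from: LocalDateTime, to: LocalDateTime): Seq[LocalDateTime] = {
    val first = from.truncatedTo(ChronoUnit.HOURS).plusHours(1)
    Iterator.iterate(first)(_.plusHours(1)).takeWhile(_.isBefore(to)).toList
  }

  /** Repeats each row at every full hour that passes before the next row. */
  def fillHours(rows: Seq[Row]): Seq[Row] = {
    // The fixed-width format sorts chronologically when compared as text.
    val sorted = rows.sortBy(_.datetime)
    // Pair every row with its successor; the last row has none.
    val successors = sorted.drop(1).map(r => Option(r)) :+ None
    sorted.zip(successors).flatMap {
      case (row, Some(next)) =>
        val marks = hourMarks(
          LocalDateTime.parse(row.datetime, fmt),
          LocalDateTime.parse(next.datetime, fmt))
        // Keep the original row, then copy it once per crossed hour mark.
        row +: marks.map(m => row.copy(datetime = m.format(fmt)))
      case (row, None) => Seq(row)
    }
  }
}
```

Applied to the five example rows, this sketch yields nine rows, including b_status at 11:00:00. Because the hour marks are generated from full timestamps rather than from the hour of day, a gap that spans midnight is filled the same way.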