Scala - Insert rows into a dataset depending on the time difference between rows
How do I go about inserting rows into an existing dataset? The table shown is a dumbed-down version of the data I have available in the dataset.
I want to insert a new row whenever the time between two consecutive rows jumps over the next full hour. All columns except the new time should be the same as in the row above.
Either datetime or date and time is sufficient. I put all of them in the example to show that I have access to all of them.
My general idea is to calculate the number of hour skips between two rows, create a new dataset, join it with the original one, and sort it.
+--------+-------------------+----------+--------+
|  status|           datetime|      date|    time|
+--------+-------------------+----------+--------+
|   start|2017-01-01 07:15:12|2017-01-01|07:15:12|
|    init|2017-01-01 07:22:12|2017-01-01|07:22:12|
|a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
|b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
|c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
+--------+-------------------+----------+--------+

+--------+-------------------+----------+--------+
|  status|           datetime|      date|    time|
+--------+-------------------+----------+--------+
|   start|2017-01-01 07:15:12|2017-01-01|07:15:12|
|    init|2017-01-01 07:22:12|2017-01-01|07:22:12|
|a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
|a_status|2017-01-01 08:00:00|2017-01-01|08:00:00|
|a_status|2017-01-01 09:00:00|2017-01-01|09:00:00|
|a_status|2017-01-01 10:00:00|2017-01-01|10:00:00|
|b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
|b_status|2017-01-01 11:00:00|2017-01-01|11:00:00|
|c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
+--------+-------------------+----------+--------+
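My first thought was calculating the time difference in hours and, if it is >= 1, generating that number of rows in a new dataset and joining it with the original. The problem is that this doesn't detect the skip from b_status to c_status, since that difference is only 3/4 of an hour.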
val df9 = df3.withColumn("time_diff", ((unix_timestamp(lead($"datetime", 1).over(Window.orderBy("datetime"))) - unix_timestamp($"datetime"))/60/60)).show
My next thought was extracting the hour part of the time field and subtracting those. The result is an int with the correct number of lines, though the jump from HH24 to HH00 needs to be handled separately.
I read about the explode function, since it can generate new rows with the same data, though I have no idea yet whether that function is applicable in my case.
Does anyone have hints or implementations to help me out? Maybe there is an easier way to achieve this. Have a nice weekend.
You can achieve the desired result using the explode function, but it needs a fairly complex combination of hour, lead, Window, udf, unix_timestamp, select, SimpleDateFormat, and several more functions, as explained below.
Given the dataframe

+--------+-------------------+----------+--------+
|status  |datetime           |date      |time    |
+--------+-------------------+----------+--------+
|start   |2017-01-01 07:15:12|2017-01-01|07:15:12|
|init    |2017-01-01 07:22:12|2017-01-01|07:22:12|
|a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
|b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
|c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
+--------+-------------------+----------+--------+
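Only the status and datetime columns are important, as the date and time columns can be derived from the datetime column once it has been changed. So you have to select those two, along with the hour difference to the next row (lead looks ahead, not back), as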
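import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DateType, TimestampType}

// Keep status and datetime, and add the difference between the next row's hour and this row's hour.
val df2 = df.select(
  $"status",
  unix_timestamp($"datetime").cast(TimestampType).as("datetime"),
  (hour(lead($"datetime", 1).over(Window.orderBy("datetime"))) - hour($"datetime")).as("hour"))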
which should give
+--------+---------------------+----+
|status  |datetime             |hour|
+--------+---------------------+----+
|start   |2017-01-01 07:15:12.0|0   |
|init    |2017-01-01 07:22:12.0|0   |
|a_status|2017-01-01 07:31:12.0|3   |
|b_status|2017-01-01 10:30:12.0|1   |
|c_status|2017-01-01 11:15:12.0|null|
+--------+---------------------+----+
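Subtracting hour values works for the sample data, but it goes negative when consecutive rows straddle midnight, which is the HH24 to HH00 case raised in the question. A minimal sketch of one way around that, in plain Scala with java.time rather than Spark, is to truncate both timestamps to the hour and count the hours between them. The object and method names here are hypothetical, chosen only for illustration.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object HourBoundaries {
  // Same layout as the datetime column in the sample data.
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Number of full-hour marks (hh:00:00) lying in the interval (from, to].
  // Truncating to the hour first means a date change is handled like any other hour.
  def crossed(from: String, to: String): Long = {
    val a = LocalDateTime.parse(from, fmt).truncatedTo(ChronoUnit.HOURS)
    val b = LocalDateTime.parse(to, fmt).truncatedTo(ChronoUnit.HOURS)
    ChronoUnit.HOURS.between(a, b)
  }
}
```

For the a_status and b_status rows this gives the same counts as the hour column above, and it stays positive for a pair such as 23:40:00 followed by 00:10:00 on the next day.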
Now that you have the hour difference, you can build an array of datetime values running from the row's datetime up to the hour difference. This can be done by defining a udf function
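import java.text.SimpleDateFormat
import java.util.Calendar

def getDiffDateArray = udf((date: String, hour: Int, value: Int) => {
  // Expands only when the hour difference is greater than 1;
  // a difference of exactly 1 falls through to the else branch.
  if ((value - 1) > 0) {
    var array = Array.empty[String]
    for (time <- 0 to value) {
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val originalDate = format.parse(date)
      val calendar = Calendar.getInstance
      calendar.setTimeInMillis(originalDate.getTime)
      calendar.set(Calendar.HOUR_OF_DAY, hour + time)
      // The first entry keeps the original minutes and seconds; the rest are full hours.
      if (time != 0) {
        calendar.set(Calendar.MINUTE, 0)
        calendar.set(Calendar.SECOND, 0)
      }
      array = array ++ Array(format.format(calendar.getTime))
    }
    array
  } else Array(date)
})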
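The expansion of a single row can also be sketched outside Spark with java.time, which avoids the mutable Calendar. This is an illustrative alternative, not the answer's own code: it assumes the next row's datetime is passed in directly instead of an hour difference, and the names HourFill and expand are made up for the example.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object HourFill {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Returns the row's own timestamp followed by every full hour that lies
  // strictly between it and the next row's timestamp.
  // `next` is None for the last row, which is then returned unchanged.
  def expand(current: String, next: Option[String]): Seq[String] = {
    val start = LocalDateTime.parse(current, fmt)
    val firstMark = start.truncatedTo(ChronoUnit.HOURS).plusHours(1)
    val marks = next.map(LocalDateTime.parse(_, fmt)) match {
      case Some(end) =>
        Iterator.iterate(firstMark)(_.plusHours(1)).takeWhile(_.isBefore(end)).toList
      case None => Nil
    }
    (start :: marks).map(_.format(fmt))
  }
}
```

Because the marks stop strictly before the next timestamp, a next row that already sits on a full hour is not duplicated, and a gap of a single hour boundary still produces one fill entry.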
You can then use the explode function on the array of datetime values and derive the date and time columns by doing the following
// Extracts the HH:mm:ss part from a datetime string.
def getTimeFromDateTime = udf((date: String) => {
  val parseFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val timeFormat = new SimpleDateFormat("HH:mm:ss")
  val time = parseFormat.parse(date)
  timeFormat.format(time)
})

// A null hour difference (last row) is treated as 0, so that row is kept as is.
df2.withColumn("datetime", explode(getDiffDateArray($"datetime", hour($"datetime"), when($"hour".isNotNull, $"hour").otherwise(lit(0)))))
  .drop("hour")
  .withColumn("date", $"datetime".cast(DateType))
  .withColumn("time", getTimeFromDateTime($"datetime"))
  .show(false)
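This should give the result below. Note that, unlike the expected output in the question, it has no b_status row at 11:00:00: the udf only expands a row when the check (value - 1) > 0 passes, so an hour difference of exactly 1 is left alone. Changing that check to value > 0 should emit the single-hour fill row as well.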
+--------+-------------------+----------+--------+
|status  |datetime           |date      |time    |
+--------+-------------------+----------+--------+
|start   |2017-01-01 07:15:12|2017-01-01|07:15:12|
|init    |2017-01-01 07:22:12|2017-01-01|07:22:12|
|a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
|a_status|2017-01-01 08:00:00|2017-01-01|08:00:00|
|a_status|2017-01-01 09:00:00|2017-01-01|09:00:00|
|a_status|2017-01-01 10:00:00|2017-01-01|10:00:00|
|b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
|c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
+--------+-------------------+----------+--------+
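As a cross-check of the overall logic outside Spark, the pairing of each row with its successor (what lead does over the window) can be sketched on plain Scala collections. This is only an illustration under the assumption that the data fits in memory; Event and FillHours are hypothetical names, and the date and time columns are left out because they can be derived from the timestamp.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// One row of the sample data, reduced to the two columns that matter.
final case class Event(status: String, at: LocalDateTime)

object FillHours {
  // For every event, emit the event itself plus one copy per full hour
  // that lies strictly between it and the following event.
  def fill(events: Seq[Event]): Seq[Event] = {
    val sorted = events.sortWith((a, b) => a.at.isBefore(b.at))
    // Timestamp of the following event; None for the last one (like lead returning null).
    val nexts = sorted.drop(1).map(e => Option(e.at)) :+ None
    sorted.zip(nexts).flatMap { case (e, next) =>
      val firstMark = e.at.truncatedTo(ChronoUnit.HOURS).plusHours(1)
      val marks = next.toList.flatMap { end =>
        Iterator.iterate(firstMark)(_.plusHours(1)).takeWhile(_.isBefore(end)).toList
      }
      // Fill rows repeat the status of the row above, with only the time changed.
      e :: marks.map(m => e.copy(at = m))
    }
  }
}
```

Applied to the five sample rows, this yields the nine rows of the expected table in the question, including the b_status row at 11:00:00.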
I hope the answer is helpful.