scala - Insert rows into a dataset depending on the time difference between rows


How would I go about inserting rows into an existing dataset? The table shown is a dumbed-down version of the data I have available in the dataset.

I want to insert a new row whenever the time between two consecutive rows jumps over the next full hour. All columns except the new time should be the same as in the row above.

Either datetime or date & time would be sufficient. I put them all in the example to show that I have access to all of them.

My general idea is to calculate the number of hour skips between two rows, create a new dataset from that, join it with the original one, and sort it.

+--------+-------------------+----------+--------+
|  status|           datetime|      date|    time|
+--------+-------------------+----------+--------+
|   start|2017-01-01 07:15:12|2017-01-01|07:15:12|
|    init|2017-01-01 07:22:12|2017-01-01|07:22:12|
|a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
|b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
|c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
+--------+-------------------+----------+--------+

+--------+-------------------+----------+--------+
|  status|           datetime|      date|    time|
+--------+-------------------+----------+--------+
|   start|2017-01-01 07:15:12|2017-01-01|07:15:12|
|    init|2017-01-01 07:22:12|2017-01-01|07:22:12|
|a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
|a_status|2017-01-01 08:00:00|2017-01-01|08:00:00|
|a_status|2017-01-01 09:00:00|2017-01-01|09:00:00|
|a_status|2017-01-01 10:00:00|2017-01-01|10:00:00|
|b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
|b_status|2017-01-01 11:00:00|2017-01-01|11:00:00|
|c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
+--------+-------------------+----------+--------+

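My first thought was to calculate the time difference in hours and, if it is >= 1, generate that number of rows in a new dataset and join it with the original. The problem is that this doesn't detect the skip from b_status to c_status, since the difference there is only 3/4 of an hour.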

val df9 = df3.withColumn("time_diff", ((unix_timestamp(lead($"datetime", 1).over(Window.orderBy("datetime"))) - unix_timestamp($"datetime"))/60/60))
df9.show

My next thought was to extract the hour part of the time field and subtract those. The result would be an int with the correct number of lines, though the jump over midnight (hour 23 to hour 00) would need to be handled separately.

I have also read about the explode function, since it generates new rows with the same data, though I have no idea yet whether that function is applicable in this case.

Does anyone have hints or implementations to help me out? Maybe there is an easier way to achieve this. Have a nice weekend.
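To make the difference between the two ideas concrete, here is a minimal sketch in plain Scala (java.time only, no Spark). The object and function names are made up for illustration. Truncating both timestamps to the hour before taking the difference counts the full-hour marks that were crossed, and it also copes with the jump over midnight, which a plain subtraction of the hour fields does not.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical helper object, not part of the original question.
object HourBoundaries {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Elapsed time between two timestamps, in fractional hours.
  def elapsedHours(from: String, to: String): Double = {
    val a = LocalDateTime.parse(from, fmt)
    val b = LocalDateTime.parse(to, fmt)
    ChronoUnit.SECONDS.between(a, b) / 3600.0
  }

  // Number of full-hour marks (hh:00:00) lying in the interval (from, to].
  def boundariesCrossed(from: String, to: String): Long = {
    val a = LocalDateTime.parse(from, fmt).truncatedTo(ChronoUnit.HOURS)
    val b = LocalDateTime.parse(to, fmt).truncatedTo(ChronoUnit.HOURS)
    ChronoUnit.HOURS.between(a, b)
  }
}
```

For b_status to c_status, elapsedHours is below 1 while boundariesCrossed reports that one full hour (11:00:00) was passed, which is exactly the case the first approach misses.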

You can achieve the desired result using the explode function, but you need a fairly complex combination of hour, lead, Window, udf, unix_timestamp, select, SimpleDateFormat and a few more functions, as explained below.

Given the dataframe

+--------+-------------------+----------+--------+
|status  |datetime           |date      |time    |
+--------+-------------------+----------+--------+
|start   |2017-01-01 07:15:12|2017-01-01|07:15:12|
|init    |2017-01-01 07:22:12|2017-01-01|07:22:12|
|a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
|b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
|c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
+--------+-------------------+----------+--------+

Only the status and datetime columns are important, because the date and time columns can be derived from the datetime column once it has been changed. So you have to select those two and compute the hour difference to the next row as

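import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DateType, TimestampType}
// the $"..." syntax assumes import spark.implicits._ for a SparkSession named spark

val df2 = df.select($"status", unix_timestamp($"datetime").cast(TimestampType).as("datetime"), (hour(lead($"datetime", 1).over(Window.orderBy("datetime"))) - hour($"datetime")).as("hour"))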

which should give

+--------+---------------------+----+
|status  |datetime             |hour|
+--------+---------------------+----+
|start   |2017-01-01 07:15:12.0|0   |
|init    |2017-01-01 07:22:12.0|0   |
|a_status|2017-01-01 07:31:12.0|3   |
|b_status|2017-01-01 10:30:12.0|1   |
|c_status|2017-01-01 11:15:12.0|null|
+--------+---------------------+----+

Now that you have the hour difference, you can build an array of datetimes running from the row's own datetime value up to the datetime given by the hour difference. This can be done by defining a udf function

import java.text.SimpleDateFormat
import java.util.Calendar

def getDiffDateArray = udf((date: String, hour: Int, value: Int) => {
  // only expands when the hour difference is at least 2
  if ((value - 1) > 0) {
    var array = Array.empty[String]
    for (time <- 0 to value) {
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val originalDate = format.parse(date)
      val calendar = Calendar.getInstance
      calendar.setTimeInMillis(originalDate.getTime)
      calendar.set(Calendar.HOUR_OF_DAY, hour + time)
      // time == 0 keeps the original timestamp; later entries snap to the full hour
      if (time != 0) {
        calendar.set(Calendar.MINUTE, 0)
        calendar.set(Calendar.SECOND, 0)
      }
      array = array ++ Array(format.format(calendar.getTime))
    }
    array
  }
  else Array(date)
})

You can then use the explode function on the array of datetimes and add the date and time columns by doing the following

// formats the time part (HH:mm:ss) of a datetime string
def getTimeFromDateTime = udf((date: String) => {
  val parseFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val timeFormat = new SimpleDateFormat("HH:mm:ss")
  val time = parseFormat.parse(date)
  timeFormat.format(time)
})

// a null hour difference (last row) is treated as 0, so the row is kept unchanged
df2.withColumn("datetime", explode(getDiffDateArray($"datetime", hour($"datetime"), when($"hour".isNotNull, $"hour").otherwise(lit(0)))))
    .drop("hour")
    .withColumn("date", $"datetime".cast(DateType))
    .withColumn("time", getTimeFromDateTime($"datetime"))
    .show(false)

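This should give you the following result. Note that because of the (value - 1) > 0 check in the udf, a difference of a single hour (b_status to c_status) is not expanded, so the 11:00:00 row from the question is missing; changing the check to value > 0 should add it.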

+--------+-------------------+----------+--------+
|status  |datetime           |date      |time    |
+--------+-------------------+----------+--------+
|start   |2017-01-01 07:15:12|2017-01-01|07:15:12|
|init    |2017-01-01 07:22:12|2017-01-01|07:22:12|
|a_status|2017-01-01 07:31:12|2017-01-01|07:31:12|
|a_status|2017-01-01 08:00:00|2017-01-01|08:00:00|
|a_status|2017-01-01 09:00:00|2017-01-01|09:00:00|
|a_status|2017-01-01 10:00:00|2017-01-01|10:00:00|
|b_status|2017-01-01 10:30:12|2017-01-01|10:30:12|
|c_status|2017-01-01 11:15:12|2017-01-01|11:15:12|
+--------+-------------------+----------+--------+
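If you want to check the expected rows without a Spark session, the same expansion can be sketched on a plain Scala collection. This is only an illustration under stated assumptions: Row, hourMarks and fill are hypothetical names, the rows are assumed to be sorted by datetime already, and only status and datetime are carried along. Unlike the udf above, this sketch also fills gaps that cross just one full hour.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical stand-in for the dataframe logic, using java.time only.
object FillHours {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Hypothetical row type for illustration: just status and datetime.
  final case class Row(status: String, datetime: String)

  // Full-hour marks strictly after `from` and strictly before `to`.
  def hourMarks(from: LocalDateTime, to: LocalDateTime): List[LocalDateTime] = {
    val first = from.truncatedTo(ChronoUnit.HOURS).plusHours(1)
    Iterator.iterate(first)(_.plusHours(1)).takeWhile(_.isBefore(to)).toList
  }

  // Rows must already be sorted by datetime. Each gap is filled with
  // copies of the earlier row, stamped at the full hours in between.
  def fill(rows: List[Row]): List[Row] = {
    // datetime of the following row, or None for the last row
    val nextTimes =
      rows.drop(1).map(r => Option(LocalDateTime.parse(r.datetime, fmt))) :+ None
    rows.zip(nextTimes).flatMap { case (row, next) =>
      val start = LocalDateTime.parse(row.datetime, fmt)
      val extra = next.toList
        .flatMap(hourMarks(start, _))
        .map(t => row.copy(datetime = t.format(fmt)))
      row :: extra
    }
  }
}
```

Calling FillHours.fill on the five example rows returns nine rows, including the b_status row at 11:00:00 that the question asks for. The date and time columns can then be derived from the datetime string as before.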

I hope the answer is helpful.

