scala - Spark: create a sessionId based on timestamp
I want to do the following transformation. Given a data frame that records whether a user logged in, my aim is to create a sessionId for each record based on the timestamp and a pre-defined value timeout = 20.
A session period is defined as: [first record --> first record + timeout]
For instance, the original dataframe is the following:
scala> val df = sc.parallelize(List(
  ("user1", 0), ("user1", 3), ("user1", 15),
  ("user1", 22), ("user1", 28), ("user1", 41),
  ("user1", 45), ("user1", 85), ("user1", 90)
)).toDF("user_id", "timestamp")
df: org.apache.spark.sql.DataFrame = [user_id: string, timestamp: int]
+-------+---------+
|user_id|timestamp|
+-------+---------+
|user1  |0        |
|user1  |3        |
|user1  |15       |
|user1  |22       |
|user1  |28       |
|user1  |41       |
|user1  |45       |
|user1  |85       |
|user1  |90       |
+-------+---------+
The goal is:
+-------+---------+----------+
|user_id|timestamp|session_id|
+-------+---------+----------+
|user1  |0        | 0        |-> first record (session 0: period [0->20])
|user1  |3        | 0        |
|user1  |15       | 0        |
|user1  |22       | 1        |-> 22 not in [0->20] -> new session (period [22->42])
|user1  |28       | 1        |
|user1  |41       | 1        |
|user1  |45       | 2        |-> 45 not in [22->42] -> new session (period [45->65])
|user1  |85       | 3        |
|user1  |90       | 3        |
+-------+---------+----------+
Is there an elegant solution to this problem, preferably in Scala?
Thanks in advance!
This may not be an elegant solution, but it worked for the given data format.
sc.parallelize(List(
  ("user1", 0), ("user1", 3), ("user1", 15),
  ("user1", 22), ("user1", 28), ("user1", 41),
  ("user1", 45), ("user1", 85), ("user1", 90)
)).toDF("user_id", "timestamp").map { x =>
  val userId = x.getAs[String]("user_id")
  val timestamp = x.getAs[Int]("timestamp")
  // Integer division puts each timestamp into a fixed 20-unit bucket
  val session = timestamp / 20
  (userId, timestamp, session)
}.toDF("user_id", "timestamp", "session").show()
result
You can change timestamp / 20 according to your needs.
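Note that timestamp / 20 groups records into fixed buckets (0-19, 20-39, ...), so 41 / 20 gives 2, whereas the goal table in the question puts 41 in session 1. If each session really has to start at its own first record, the ids depend on the previous session's start, so a sequential pass per user is one way to get them. Below is a minimal sketch in plain Scala (no Spark), assuming all of one user's timestamps fit in memory and that the window is inclusive at both ends, as the [0->20] notation suggests. The helper name sessionIds is hypothetical and not part of the original answer.

```scala
// Hypothetical helper, not from the original answer: assigns session ids to
// one user's timestamps, where a session covers [start, start + timeout].
def sessionIds(timestamps: Seq[Int], timeout: Int): Seq[Int] = {
  val sorted = timestamps.sorted
  sorted.headOption match {
    case None => Seq.empty
    case Some(first) =>
      // State carried along: (start of the current session, current session id)
      sorted.tail.scanLeft((first, 0)) { case ((start, id), ts) =>
        if (ts <= start + timeout) (start, id) // still inside the current window
        else (ts, id + 1)                      // window expired: ts opens a new session
      }.map(_._2)
  }
}

val timestamps = Seq(0, 3, 15, 22, 28, 41, 45, 85, 90)
val ids = sessionIds(timestamps, timeout = 20)

// Print each timestamp next to its session id
timestamps.zip(ids).foreach { case (ts, id) => println(s"$ts -> $id") }
```

The snippet can be pasted into the Scala REPL or spark-shell. To use it on the DataFrame, one option would be to group the records by user_id (for example with groupByKey and flatMapGroups on a Dataset) and call the helper on each user's timestamps. This is only a sketch of the idea and pulls all of a user's records onto a single executor.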