scala - How can I convert an arbitrary number of columns in a Spark DataFrame from Timestamps to Longs?
I'm writing in Scala, using Spark 1.6, and I don't have the option of switching to a newer version. I'm attempting to merge two DataFrames: one pulled in from Avro files on a Hadoop cluster and one pulled in from a Teradata DB. I can read them both in fine, and both are guaranteed to have the same column names in the same order, but when I try to merge them using
data1.unionAll(data2)
I hit an error because Avro converts timestamps to longs, so the datatypes of the two don't match on those fields. This process is repeated several times, and I know there is at least one timestamp field in the tables, but there could be more and I don't know their names, so I'm attempting to write a general method that converts an arbitrary number of columns from timestamps to longs. This is what I have so far:
def transformTimestamps(df: DataFrame): DataFrame = {
  val convert_timestamp_udf = udf((time: Timestamp) => time.getTime())
  df.dtypes.foreach { f =>
    val fName = f._1
    val fType = f._2
    if (fType == "TimestampType") {
      println("Found timestamp col: " + fName)
      df.withColumn(fName, convert_timestamp_udf(df.col(fName)))
      df.printSchema()
    }
  }
  return df
}
From the printouts I can tell the method correctly recognizes the timestamp columns, but the .withColumn transformation is not working. Printing the schema on the next line does not show the updated column. Additionally, I've tried creating an entirely new column with the transformed values, and that is not added to the df either. Can anyone spot why this isn't working?
The following line is a transformation:

df.withColumn(fname, convert_timestamp_udf(df.col(fname)))

It returns a new DataFrame rather than modifying the original one, so the change is lost because the result is never assigned. To make the assignment work you can create a temporary DataFrame and reassign it inside the loop:
def transformTimestamps(df: DataFrame): DataFrame = {
  val convert_timestamp_udf = udf((time: Timestamp) => time.getTime())
  var tempDF = df
  df.schema.foreach { f =>
    val fName = f.name
    val fType = f.dataType
    if (fType.toString == "TimestampType") {
      println("Found timestamp col: " + fName)
      tempDF = tempDF.withColumn(fName, convert_timestamp_udf(df.col(fName)))
      tempDF.printSchema()
    }
  }
  tempDF
}
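To tie this back to the original merge, here is a minimal usage sketch (assuming the data1 and data2 names from the question, and that the Avro side already stores those columns as longs, so running the method on it is a no-op):

// Hypothetical usage: normalize timestamp columns on both sides, then merge.
val fixed1 = transformTimestamps(data1)
val fixed2 = transformTimestamps(data2)
val merged = fixed1.unionAll(fixed2)  // schemas now line up, so unionAll succeeds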
I hope this answer is helpful.
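As a side note, the same idea can be written without the mutable variable by folding over the schema fields. This is only a sketch of an equivalent, more functional formulation under the same assumptions (java.sql.Timestamp input, epoch milliseconds output); it is not part of the original answer:

import java.sql.Timestamp
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.TimestampType

def transformTimestampsFold(df: DataFrame): DataFrame = {
  // Convert java.sql.Timestamp values to epoch milliseconds, as above.
  val toMillis = udf((time: Timestamp) => time.getTime())
  // Replace each timestamp column in turn, threading the DataFrame through the fold.
  df.schema.fields.foldLeft(df) { (acc, field) =>
    if (field.dataType == TimestampType) acc.withColumn(field.name, toMillis(acc.col(field.name)))
    else acc
  }
}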