java - How to import a CSV file into a BigQuery table without any column names or schema?
I'm writing a Java utility to import a few CSV files from GCS into BigQuery. I can easily achieve this with bq load, but I wanted to do it using a Dataflow job. So I'm using Dataflow's Pipeline and a ParDo transformer (which returns a TableRow to apply to BigQueryIO), and I have created a StringToRowConverter() for the transformation. Here the actual problem starts - I am forced to specify the schema for the destination table, although I don't want to create a new table if it doesn't exist - I'm only trying to load data. So I do not want to manually set the column names for the TableRow, as I have 600 columns.
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StringToRowConverter extends DoFn<String, TableRow> {

    private static Logger logger = LoggerFactory.getLogger(StringToRowConverter.class);

    @Override
    public void processElement(ProcessContext c) {
        TableRow row = new TableRow();
        // I don't know the column name here.
        row.set("do not know column name", c.element());
        c.output(row);
    }
}
Moreover, it is assumed that the table already exists in the BigQuery dataset and I don't need to create it, and also that the CSV file contains the columns in the correct order.
If there's no workaround for this scenario and the column names are needed for the data load, then I can have them in the first row of the CSV file.
Any help is appreciated.
To avoid the creation of the table, you should use BigQueryIO.Write.CreateDisposition.CREATE_NEVER of BigQueryIO.Write during the pipeline configuration. Source: https://cloud.google.com/dataflow/java-sdk/javadoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write
You don't need to know the BigQuery table schema upfront - you can discover it dynamically. For instance, you can use the BigQuery API (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get) to query the table schema and pass it as a parameter to the class StringToRowConverter. Another option, assuming that the first row is a header, is to skip the first row and use it to map the rest of the file correctly.
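Here is a minimal sketch of that first approach, assuming the google-api-services-bigquery client library; building an authorized Bigquery client is omitted, and SchemaFetcher / fetchColumnNames are hypothetical names used only for illustration. It reads the destination table's schema via tables().get() and collects the ordered column names, which could then be passed into StringToRowConverter:

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SchemaFetcher {

    // Returns the ordered column names of an existing BigQuery table.
    // "bigquery" is assumed to be an already-authorized client instance.
    public static List<String> fetchColumnNames(Bigquery bigquery, String projectId,
            String datasetId, String tableId) throws IOException {
        Table table = bigquery.tables().get(projectId, datasetId, tableId).execute();
        List<String> columnNames = new ArrayList<>();
        for (TableFieldSchema field : table.getSchema().getFields()) {
            columnNames.add(field.getName());
        }
        return columnNames;
    }
}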
The code below implements the second approach and configures the output to append to an existing BigQuery table.
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

import java.util.Arrays;

public class DfJob {

    public static class StringToRowConverter extends DoFn<String, TableRow> {

        private String[] columnNames;
        private boolean isFirstRow = true;

        @Override
        public void processElement(ProcessContext c) {
            TableRow row = new TableRow();
            String[] parts = c.element().split(",");

            if (isFirstRow) {
                // Capture the column names from the header row.
                columnNames = Arrays.copyOf(parts, parts.length);
                isFirstRow = false;
            } else {
                for (int i = 0; i < parts.length; i++) {
                    row.set(columnNames[i], parts[i]);
                }
                c.output(row);
            }
        }
    }

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);

        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.Read.from("gs://dataflow-samples/myfile.csv"))
                .apply(ParDo.of(new StringToRowConverter()))
                .apply(BigQueryIO.Write.to("mytable")
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        PipelineResult result = p.run();
    }
}
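For completeness, here is a hypothetical variant of the converter for the first approach: the column names are fetched once at pipeline-construction time (for example with the SchemaFetcher sketch above) and passed through the DoFn's constructor, so the CSV doesn't need a header row. The field must be serializable, which an ArrayList of strings is:

public static class SchemaAwareConverter extends DoFn<String, TableRow> {

    // Ordered column names, fetched from the table schema before the pipeline runs.
    private final ArrayList<String> columnNames;

    public SchemaAwareConverter(ArrayList<String> columnNames) {
        this.columnNames = columnNames;
    }

    @Override
    public void processElement(ProcessContext c) {
        String[] parts = c.element().split(",");
        TableRow row = new TableRow();
        for (int i = 0; i < parts.length; i++) {
            row.set(columnNames.get(i), parts[i]);
        }
        c.output(row);
    }
}

Wired in exactly like the second approach, e.g. .apply(ParDo.of(new SchemaAwareConverter(columnNames))).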