Java – Processing a very large XML file with Hadoop MapReduce
I am new to Hadoop and want to read data from a large XML file (8 GB). As I have read in many blogs, in order to process an XML file you have to write a custom XmlInputFormat class, so I used one of the custom XmlInputFormat classes published in a blog. After running the Hadoop job, it takes a very long time to complete even with a 150 MB XML file as input. I need to pass one large XML chunk at a time to the mapper. Below is the custom InputFormat class:
public class xmlinputformat extends textinputformat { public static final string start_tag_key = "xmlinput.start"; public static final string end_tag_key = "xmlinput.end"; public recordreader<longwritable, text> createrecordreader(inputsplit split, taskattemptcontext context) { return new xmlrecordreader(); } public static class xmlrecordreader extends recordreader<longwritable, text> { private byte[] starttag; private byte[] endtag; private long start; private long end; private fsdatainputstream fsin; private dataoutputbuffer buffer = new dataoutputbuffer(); private longwritable key = new longwritable(); private text value = new text(); @override public void initialize(inputsplit split, taskattemptcontext context) throws ioexception, interruptedexception { configuration conf = context.getconfiguration(); starttag = conf.get(start_tag_key).getbytes("utf-8"); endtag = conf.get(end_tag_key).getbytes("utf-8"); filesplit filesplit = (filesplit) split; // open file , seek start of split start = filesplit.getstart(); end = start + filesplit.getlength(); path file = filesplit.getpath(); filesystem fs = file.getfilesystem(conf); fsin = fs.open(filesplit.getpath()); fsin.seek(start); } @override public boolean nextkeyvalue() throws ioexception, interruptedexception { if (fsin.getpos() < end) { if (readuntilmatch(starttag, false)) { try { buffer.write(starttag); if (readuntilmatch(endtag, true)) { key.set(fsin.getpos()); value.set(buffer.getdata(), 0, buffer.getlength()); return true; } } { buffer.reset(); } } } return false; } @override public longwritable getcurrentkey() throws ioexception, interruptedexception { return key; } @override public text getcurrentvalue() throws ioexception, interruptedexception { return value; } @override public void close() throws ioexception { fsin.close(); } @override public float getprogress() throws ioexception { return math.min(1.0f, (fsin.getpos() - start) / (float)(end - start)); } private boolean readuntilmatch(byte[] match, boolean 
withinblock) throws ioexception { int = 0; while (true) { int noofbyteindicator = fsin.read(); // end of file: if (noofbyteindicator == -1) return false; // save buffer: if (withinblock) buffer.write(noofbyteindicator); string buffertoprint = new string(buffer.getdata()); // check if we're matching: if (noofbyteindicator == match[i]) { i++; if (i >= match.length) return true; } else = 0; // see if we've passed stop point: if (!withinblock && == 0 && fsin.getpos() >= end) return false; } } }}
Does anyone have a working example? Everywhere I searched I found the same custom InputFormat class. From what I have read, a 200 MB file should be processed in less than a minute.
I am adding my mapper class below in case it is needed.
public class xmlmapper extends mapper<longwritable, text, text, text> { @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { map<string,string> mapvalue = null; string document = value.tostring(); // system.out.println("‘" + document + "‘"); try { xmlstreamreader reader = xmlinputfactory.newinstance() .createxmlstreamreader( new bytearrayinputstream(document.getbytes())); string propertyid = ""; string currentelement = ""; string newvalue= ""; stringbuffer sb = new stringbuffer(); while (reader.hasnext()) { int code = reader.next(); switch (code) { case xmlstreamconstants.start_element: currentelement = reader.getlocalname(); break; case xmlstreamconstants.end_element: currentelement = ""; break; case xmlstreamconstants.characters: if (currentelement.equalsignorecase("id")) { propertyid = reader.gettext(); sb.append(propertyid).append(","); } else if (currentelement.equalsignorecase("supplier")) { sb.append(reader.gettext()).append(","); } else if (currentelement.equalsignorecase("value")) { sb.append(reader.gettext()).append("\n"); } break; } } reader.close(); context.write(new text(propertyid), new text(sb.tostring())); } catch (exception e) { throw new ioexception(e); } }}
Below is a sample of the XML I need to process.
<root> <row> <id>0000000001</id> <supplier>1111111111</supplier> <value>tpk100</value> </row> <row> <id>0000000002</id> <supplier>1111111111</supplier> <value>tpk101</value> </row> </root>
I need to process the data within each <row></row> tag.
Comments
Post a Comment