java - Apache Camel: asyncCallback reply threads change in Camel 17 and Camel 18/19 -
with camel version 17.x calls on synchroniztion.oncomplete() callback on 'activemq:queue' done in different threads , if processing of response inside oncomplete slow messages did not got blocked , queued. should result of asyncconsumer=true&defaulttaskexecutortype=threadpool&concurrentconsumers=2&maxconcurrentconsumers=100 configuration understand. example output showcase :
received async reply: 2000 ok received async reply: 3000 ok received async reply: 5000 ok received async reply: 9000 ok received async reply: 10000 ok finished async reply: 2000 ok finished async reply: 3000 ok finished async reply: 5000 ok finished async reply: 9000 ok finished async reply: 10000 ok
so "finished" logs after "received" each called in different thread. replies received async , how long 1 handled doesn't affect receiving of others.
after upgrading camel 18.x (or 19.x) no longer same. , receiving reply , handling (long process) blocks receiving of others. because same thread(s) used call synchroniztion.oncomplete() replies queued until handled.
received async reply: 2000 ok received async reply: 10000 ok received async reply: 9000 ok finished async reply: 2000 ok received async reply: 3000 ok finished async reply: 3000 ok finished async reply: 9000 ok finished async reply: 10000 ok received async reply: 5000 ok finished async reply: 5000 ok
i thought these new properties configure this: replytoconcurrentconsumers=2&replytomaxconcurrentconsumers=100 , if new message comes processed , replied in new thread if current threads processing right (and of course if max count not reached it's normal thread process several messages , queue them)
maybe wrong how configure route similar result in camel 17. increasing replytoconcurrentconsumers property works think should scaled dynamically , rely on replytomaxconcurrentconsumers if needed.
the code :
public class asynccallbacktest { private static final logger log = loggerfactory.getlogger(asynccallbacktest.class); public static void main(string[] args) throws exception { // create , setup default camel context final camelcontext camelcontext = new defaultcamelcontext(); setuproutes(camelcontext); camelcontext.start(); // real test asynccallbacktest(camelcontext); camelcontext.stop(); system.exit(0); } private static void setuproutes(camelcontext camelcontext) throws exception { camelcontext.addroutes(new routebuilder() { public void configure() { from("activemq://queue:asynctest?asyncconsumer=true&defaulttaskexecutortype=threadpool" + "&concurrentconsumers=2&maxconcurrentconsumers=100") .process(exchange -> { final string msg = string.valueof(exchange.getin().getbody()); exchange.getout().setbody(msg + " ok"); }); } }); } private static void asynccallbacktest(camelcontext camelcontext) throws exception { final int[] delays = new int[]{9000, 10000, 5000, 2000, 3000}; final countdownlatch countdownlatch = new countdownlatch(delays.length); final synchronization callback = new synchronizationadapter() { @override public void oncomplete(exchange exchange) { log.info("received async reply: " + exchange.getout().getbody()); final int delay = (int) exchange.getin().getbody(); synchronized (this) { try { this.wait(delay); } catch (interruptedexception e) { e.printstacktrace(); } } log.info("finished async reply: " + exchange.getout().getbody()); super.oncomplete(exchange); } @override public void ondone(exchange exchange) { countdownlatch.countdown(); } }; final producertemplate producertemplate = camelcontext.createproducertemplate(); (int = 0; < delays.length; i++) { final exchange exchange = new defaultexchange(camelcontext); exchange.getin().setbody(delays[i]); exchange.setpattern(exchangepattern.inout); producertemplate.asynccallback("activemq://queue:asynctest?" + "replytoconcurrentconsumers=2&replytomaxconcurrentconsumers=100", exchange, callback); } countdownlatch.await(); camelcontext.stop(); }
}
Comments
Post a Comment