java - Apache Camel: asyncCallback reply threads change in Camel 17 and Camel 18/19


With Camel version 17.x, calls to the Synchronization.onComplete() callback on 'activemq:queue' were made in different threads, so if processing of the response inside onComplete was slow, messages did not get blocked or queued. As far as I understand, this should be the result of the asyncConsumer=true&defaultTaskExecutorType=ThreadPool&concurrentConsumers=2&maxConcurrentConsumers=100 configuration. Here is an example output to showcase this:

received async reply: 2000 ok
received async reply: 3000 ok
received async reply: 5000 ok
received async reply: 9000 ok
received async reply: 10000 ok
finished async reply: 2000 ok
finished async reply: 3000 ok
finished async reply: 5000 ok
finished async reply: 9000 ok
finished async reply: 10000 ok

So the "finished" logs come after the "received" logs, and each callback is called in a different thread. Replies are received asynchronously, and how long one of them takes to handle doesn't affect the receiving of the others.

After upgrading to Camel 18.x (or 19.x) this is no longer the same. Now receiving a reply and handling it (a long process) blocks the receiving of the others. This is because the same thread(s) are used to call Synchronization.onComplete(), so replies are queued until the earlier ones have been handled.

received async reply: 2000 ok
received async reply: 10000 ok
received async reply: 9000 ok
finished async reply: 2000 ok
received async reply: 3000 ok
finished async reply: 3000 ok
finished async reply: 9000 ok
finished async reply: 10000 ok
received async reply: 5000 ok
finished async reply: 5000 ok
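To make the queueing effect easier to see in isolation, here is a minimal sketch in plain Java, without Camel or ActiveMQ. It only models a fixed number of "reply threads" running slow handlers; it is not how Camel's reply handling is implemented, and the class name ReplyThreadDemo, the run method and the shortened delays are made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReplyThreadDemo {

    /**
     * Runs one slow handler per delay on a fixed pool of "reply threads"
     * (a stand-in for the threads that call onComplete) and returns the log.
     */
    public static List<String> run(int replyThreads, int[] delaysMs) throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(replyThreads);
        CountDownLatch done = new CountDownLatch(delaysMs.length);

        for (int delay : delaysMs) {
            pool.execute(() -> {
                log.add("received " + delay);
                try {
                    Thread.sleep(delay); // simulates slow handling inside the callback
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                log.add("finished " + delay);
                done.countDown();
            });
        }

        done.await();
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] delays = {90, 100, 50, 20, 30}; // same ratios as the test, scaled down
        // With 2 threads, later "received" lines wait for an earlier "finished".
        System.out.println("2 threads: " + run(2, delays));
        // With one thread per reply, no handler has to wait for another one.
        System.out.println("5 threads: " + run(5, delays));
    }
}
```

With only two threads, a third reply cannot even be logged as received until one of the first two handlers returns, which is the same pattern as in the second output.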

I thought these new properties would configure this: replyToConcurrentConsumers=2&replyToMaxConcurrentConsumers=100, so that if a new message comes in, it is processed and replied to in a new thread if the current threads are busy processing right now (and, of course, if the max count has not been reached; otherwise it is normal for a thread to process several messages and queue them).

Maybe I am wrong about how to configure the route to get a result similar to Camel 17. Increasing the replyToConcurrentConsumers property works, but I think this should be scaled dynamically and rely on replyToMaxConcurrentConsumers if needed.
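Independent of how the reply threads are scaled, one option that might keep them free is to return from the callback quickly and hand the slow work to a separate executor. The following is only a sketch of that idea in plain Java and has not been tried against Camel; CallbackHandOff, onReply and the worker pool are hypothetical names, and onReply stands in for what the body of onComplete would do.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallbackHandOff {

    // Hypothetical worker pool, separate from the threads that deliver replies.
    private final ExecutorService workers = Executors.newCachedThreadPool();

    /**
     * Meant to be called on the reply thread: it only schedules the slow
     * work and returns immediately, so the calling thread is not blocked.
     */
    public Future<String> onReply(String body, long workMs) {
        return workers.submit(() -> {
            Thread.sleep(workMs); // simulates the long-running handling
            return "finished async reply: " + body;
        });
    }

    public void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) throws Exception {
        CallbackHandOff handOff = new CallbackHandOff();
        Future<String> slow = handOff.onReply("10000 ok", 200);
        Future<String> fast = handOff.onReply("2000 ok", 20);
        // Both calls returned at once; the results arrive when the work is done.
        System.out.println(fast.get());
        System.out.println(slow.get());
        handOff.shutdown();
    }
}
```

The trade-off is that the callback returns before the work has finished, so anything that needs to know about completion (such as the latch in the test) would have to be signalled from the worker instead.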

The code:

import java.util.concurrent.CountDownLatch;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.SynchronizationAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncCallbackTest {

    private static final Logger LOG = LoggerFactory.getLogger(AsyncCallbackTest.class);

    public static void main(String[] args) throws Exception {
        // create and set up the default Camel context
        final CamelContext camelContext = new DefaultCamelContext();
        setupRoutes(camelContext);
        camelContext.start();

        // the real test
        asyncCallbackTest(camelContext);

        camelContext.stop();
        System.exit(0);
    }

    private static void setupRoutes(CamelContext camelContext) throws Exception {
        camelContext.addRoutes(new RouteBuilder() {
            public void configure() {
                // consumer side: replies with the request body plus " ok"
                from("activemq://queue:asyncTest?asyncConsumer=true&defaultTaskExecutorType=ThreadPool" +
                        "&concurrentConsumers=2&maxConcurrentConsumers=100")
                        .process(exchange -> {
                            final String msg = String.valueOf(exchange.getIn().getBody());
                            exchange.getOut().setBody(msg + " ok");
                        });
            }
        });
    }

    private static void asyncCallbackTest(CamelContext camelContext) throws Exception {
        final int[] delays = new int[]{9000, 10000, 5000, 2000, 3000};

        final CountDownLatch countDownLatch = new CountDownLatch(delays.length);

        final Synchronization callback = new SynchronizationAdapter() {
            @Override
            public void onComplete(Exchange exchange) {
                LOG.info("received async reply: " + exchange.getOut().getBody());
                // the request body is the delay; waiting simulates slow handling
                final int delay = (int) exchange.getIn().getBody();
                synchronized (this) {
                    try {
                        this.wait(delay);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                LOG.info("finished async reply: " + exchange.getOut().getBody());

                super.onComplete(exchange);
            }

            @Override
            public void onDone(Exchange exchange) {
                countDownLatch.countDown();
            }
        };

        final ProducerTemplate producerTemplate = camelContext.createProducerTemplate();

        // send one InOut request per delay; replies arrive via the callback
        for (int i = 0; i < delays.length; i++) {
            final Exchange exchange = new DefaultExchange(camelContext);
            exchange.getIn().setBody(delays[i]);
            exchange.setPattern(ExchangePattern.InOut);

            producerTemplate.asyncCallback("activemq://queue:asyncTest?" +
                            "replyToConcurrentConsumers=2&replyToMaxConcurrentConsumers=100",
                    exchange, callback);
        }

        countDownLatch.await();
        camelContext.stop();
    }

}

