Vert.x with RxJava: implementing caching for a service call
Say SomeOtherService in one verticle uses UserService in a different verticle, and the communication happens over the event bus. To represent it:

    class SomeOtherService {
        final UserService userService = new UserService();

        // mutable state
        final Map<String, Single<String>> cache = new HashMap<>(); // not synchronized?

        public Single<String> getUserSessionInfo(String id) {
            // seems not safe!
            return cache.computeIfAbsent(id, _id -> {
                log("could not find " + id + " in cache. Connecting to UserService...");
                // uses the generated proxy to send a message over the event bus
                return userService.getUserSessionInfo(id);
            });
        }
    }

    // somewhere in a verticle/micro-service on some machine
    class UserService {
        public Single<String> getUserSessionInfo(String id) {
            return Single.fromCallable(() -> {
                waitForOneSecond();
                log("getUserSessionInfo " + id);
                if (id.equals("1")) return "one";
                if (id.equals("2")) return "two";
                else throw new Exception("could not"); // legal?
            });
        }
    }

And the client code, which subscribes and decides on the scheduler:

    final Observable<String> obs = Observable.from(new String[] {"1", "1"});

    // emulating sequential calls of 'getUserSessionInfo', forked onto a separate scheduler
    obs.flatMap(id -> {
        log("flatMap"); // on main thread
        return someOtherService.getUserSessionInfo(id)
            .subscribeOn(schedulerA) // forking; can thread starvation happen (I have 10 threads in the pool)?
            .toObservable();
    }).subscribe(x -> log("next: " + x));

The question is how sound this solution is: using a HashMap as the cache (it is shared state here) through the computeIfAbsent method.
Even though I'm using the event loop and the event bus, that does not save me from shared state and possible concurrency issues, assuming the long operation (like getUserSessionInfo(id)) happens on a separate scheduler/thread?
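To make the multi-threaded case concrete, here is a minimal sketch of what I would expect to need if the cache really were touched from several threads at once. It uses only the JDK: CompletableFuture is swapped in for Single, and fetchFromUserService, hammer and the "session-" values are hypothetical names invented for illustration. It is not the Vert.x or RxJava way of doing it, just an illustration of ConcurrentHashMap.computeIfAbsent, which is documented to apply the mapping function at most once per key.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentCacheSketch {
    // Counts how often the (hypothetical) remote call is actually started.
    static final AtomicInteger remoteCalls = new AtomicInteger();

    // Thread-safe map: computeIfAbsent applies the function at most once per key.
    static final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    // Hypothetical stand-in for userService.getUserSessionInfo(id).
    static CompletableFuture<String> fetchFromUserService(String id) {
        remoteCalls.incrementAndGet();
        return CompletableFuture.completedFuture("session-" + id);
    }

    static CompletableFuture<String> getUserSessionInfo(String id) {
        return cache.computeIfAbsent(id, ConcurrentCacheSketch::fetchFromUserService);
    }

    // Requests the same id from several threads at once and returns
    // the total number of remote calls started so far.
    static int hammer(String id, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // line the threads up so they race on the same key
                    getUserSessionInfo(id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return remoteCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("remote calls: " + hammer("1", 10));
    }
}
```

With a plain HashMap in place of the ConcurrentHashMap the same race would have no such guarantee, which is exactly the concern above.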
Should I use ReplaySubject instead to implement caching? What are the best practices for Vert.x + RxJava?
It seems that as long as cache.computeIfAbsent runs on the event loop, it is safe, because the calls are sequential?
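A rough way to picture this, as a sketch under stated assumptions and not real Vert.x code: a single-thread executor stands in for the event loop, a small fixed pool stands in for the scheduler that runs the slow call, and CompletableFuture replaces Single. All names here (EventLoopCacheSketch, fetchFromUserService, the "session-" values) are hypothetical. The plain HashMap is only ever touched from the one event-loop thread, so the accesses are serialized even though the slow work runs elsewhere.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class EventLoopCacheSketch {
    // Single thread standing in for a Vert.x event loop (an assumption of this sketch).
    final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    // Separate pool standing in for the scheduler that runs the slow call.
    final ExecutorService workers = Executors.newFixedThreadPool(4);
    // Plain, unsynchronized HashMap: only accessed from the eventLoop thread.
    final Map<String, CompletableFuture<String>> cache = new HashMap<>();
    final AtomicInteger remoteCalls = new AtomicInteger();

    // Hypothetical slow remote call, executed off the event loop.
    CompletableFuture<String> fetchFromUserService(String id) {
        remoteCalls.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> "session-" + id, workers);
    }

    // Every cache lookup is submitted to the event loop, so lookups never overlap.
    CompletableFuture<String> getUserSessionInfo(String id) {
        CompletableFuture<CompletableFuture<String>> cached = CompletableFuture.supplyAsync(
            () -> cache.computeIfAbsent(id, this::fetchFromUserService), eventLoop);
        return cached.thenCompose(f -> f); // flatten; does not touch the map
    }

    void shutdown() {
        eventLoop.shutdown();
        workers.shutdown();
    }

    public static void main(String[] args) {
        EventLoopCacheSketch s = new EventLoopCacheSketch();
        CompletableFuture<String> a = s.getUserSessionInfo("1");
        CompletableFuture<String> b = s.getUserSessionInfo("1");
        System.out.println(a.join() + " " + b.join() + ", remote calls: " + s.remoteCalls.get());
        s.shutdown();
    }
}
```

One difference to keep in mind: a CompletableFuture is eager and remembers its result, whereas a Single built with Single.fromCallable is lazy and re-runs the callable for every subscriber, so this sketch does not carry over to the RxJava code one-to-one.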
Sorry, that's a lot of questions. I guess I can narrow it down to: what are the best practices for implementing a cache for service calls in Vert.x with RxJava?
The whole example is here:
I think I found the answer here: http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/

    Observable<Data> source = Observable
        .concat(memory, diskWithCache, networkWithSave)
        .first();

and, when saving, to call map.put(..) explicitly instead of using computeIfAbsent.
And as long as I'm on the event loop, I'm safe to use an unsynchronized cache map.
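To show the shape of that idea without RxJava, here is a simplified, synchronous sketch using only the JDK. It is an analog of concat(...).first(), not the article's code: the sources are asked in order, the first one that has data wins, and the network source saves its result with an explicit put. The names FirstSourceWinsSketch, fromMemory, fromNetworkWithSave and the "session-" values are hypothetical, and the disk source is left out for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

class FirstSourceWinsSketch {
    // Unsynchronized map; assumed to be used from a single thread only.
    final Map<String, String> memory = new HashMap<>();
    int networkCalls = 0;

    // Memory source: empty when nothing was saved for this id yet.
    Optional<String> fromMemory(String id) {
        return Optional.ofNullable(memory.get(id));
    }

    // Hypothetical network source that saves its result with an explicit put.
    Optional<String> fromNetworkWithSave(String id) {
        networkCalls++;
        String data = "session-" + id;
        memory.put(id, data);
        return Optional.of(data);
    }

    // Ask the sources in order and return the first value found.
    String load(String id) {
        List<Function<String, Optional<String>>> sources = new ArrayList<>();
        sources.add(this::fromMemory);
        sources.add(this::fromNetworkWithSave);
        for (Function<String, Optional<String>> source : sources) {
            Optional<String> hit = source.apply(id);
            if (hit.isPresent()) {
                return hit.get(); // later sources are never called
            }
        }
        throw new IllegalStateException("no source produced data for " + id);
    }

    public static void main(String[] args) {
        FirstSourceWinsSketch s = new FirstSourceWinsSketch();
        System.out.println(s.load("1")); // goes to the network source and saves
        System.out.println(s.load("1")); // answered from memory
        System.out.println("network calls: " + s.networkCalls);
    }
}
```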