Vert.x with RxJava: implementing caching for a service call


Say SomeOtherService in one verticle uses UserService in a different verticle, and the communication happens over the event bus. To represent it:

    class SomeOtherService {

        final UserService userService = new UserService();

        // Mutable state
        final Map<String, Single<String>> cache = new HashMap<>(); // not synchronized?

        public Single<String> getUserSessionInfo(String id) {
            // Seems not safe!
            return cache.computeIfAbsent(id, _id -> {
                log("Could not find " + id + " in cache. Connecting to UserService...");
                // Uses the generated proxy to send a message over the event bus
                return userService.getUserSessionInfo(id);
            });
        }
    }

// Somewhere in another verticle/microservice on another machine.

    class UserService {

        public Single<String> getUserSessionInfo(String id) {
            return Single.fromCallable(() -> {
                waitForOneSecond();
                log("getUserSessionInfo " + id);
                if (id.equals("1"))
                    return "one";
                if (id.equals("2"))
                    return "two";
                else
                    throw new Exception("Could not"); // legal?
            });
        }
    }

And the client code, which subscribes and decides on the scheduler:

    final Observable<String> obs = Observable.from(new String[] {"1", "1"});

    // Emulating a sequential call of 'getUserSessionInfo', forked onto a separate scheduler
    obs.flatMap(id -> {
        log("flatMap"); // on main thread
        return someOtherService.getUserSessionInfo(id)
                // Forking. Can thread starvation happen (since I have 10 threads in the pool)?
                .subscribeOn(schedulerA)
                .toObservable();
    }).subscribe(
        x -> log("next: " + x)
    );

The question is: how good is this solution of using a HashMap for the cache (since it is shared state here) together with the computeIfAbsent method?

Even though we are using the event loop and the event bus, that does not save us from shared state and possible concurrency issues, assuming a long-running operation (like getUserSessionInfo(id)) happens on a separate scheduler/thread?

Should I use a ReplaySubject instead to implement caching? What are the best practices for Vert.x + RxJava?

It seems that as long as cache.computeIfAbsent runs on the event loop it is safe, because the calls are sequential?
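If the map could ever be reached from more than one thread (for example from callbacks running on a separate scheduler), one option is to make the shared state itself thread-safe. The following is only a minimal sketch under stated assumptions: it uses the JDK's CompletableFuture as a stand-in for Single so that it runs without RxJava or Vert.x, and InFlightCache and its loader parameter are hypothetical names, not part of either library. It stores the in-flight future in a ConcurrentHashMap, so concurrent callers asking for the same id share one call, and it evicts failed calls so that an error is not cached forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper for illustration; not a Vert.x or RxJava class.
class InFlightCache<K, V> {

    // Thread-safe map of key -> pending or completed call.
    private final ConcurrentMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();

    // Assumed to return quickly and do its real work asynchronously.
    private final Function<K, CompletableFuture<V>> loader;

    InFlightCache(Function<K, CompletableFuture<V>> loader) {
        this.loader = loader;
    }

    CompletableFuture<V> get(K key) {
        // ConcurrentHashMap applies the loader at most once per absent key.
        CompletableFuture<V> future = cache.computeIfAbsent(key, loader);

        // Evict failures outside computeIfAbsent, so the map is never
        // modified from inside its own mapping function.
        future.whenComplete((value, error) -> {
            if (error != null) {
                cache.remove(key, future);
            }
        });
        return future;
    }
}
```

With this shape, two back-to-back calls to get("1") return the same future and the loader runs once; a call that fails is removed, so the next get for that id tries again.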

Sorry, that is a lot of questions. I guess I can narrow it down to: what are the best practices for implementing a cache for service calls in Vert.x with RxJava?

The whole example is here:

I think I found the answer here: http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/

    Observable<Data> source = Observable
        .concat(memory, diskWithCache, networkWithSave)
        .first();

and then, when saving, using map.put(..) explicitly instead of using computeIfAbsent.

And as long as I'm on the event loop, I'm safe to use an unsynchronized cache map.
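To make the "memory first, then network with an explicit put" idea concrete, here is a rough sketch under stated assumptions. It is not the RxJava code from the linked post: CompletableFuture stands in for Observable/Single, a plain Executor stands in for the Vert.x event loop, and ConfinedCache, eventLoop and network are hypothetical names. The point it tries to show is thread confinement: the HashMap stays unsynchronized, and that is only safe if the executor passed in runs its tasks on a single thread, because every read and write of the map is dispatched to it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical helper for illustration; not a Vert.x or RxJava class.
class ConfinedCache {

    // Unsynchronized on purpose: only ever touched on the eventLoop executor,
    // which is assumed to be single-threaded.
    private final Map<String, String> memory = new HashMap<>();

    private final Executor eventLoop;
    private final Function<String, CompletableFuture<String>> network;

    ConfinedCache(Executor eventLoop, Function<String, CompletableFuture<String>> network) {
        this.eventLoop = eventLoop;
        this.network = network;
    }

    CompletableFuture<String> get(String id) {
        return CompletableFuture
            // 1. "memory" source: read the map on the event loop.
            .supplyAsync(() -> memory.get(id), eventLoop)
            .thenCompose(hit -> {
                if (hit != null) {
                    return CompletableFuture.completedFuture(hit);
                }
                // 2. "network with save": call the service, then hop back to
                //    the event loop for the explicit put.
                return network.apply(id).thenApplyAsync(value -> {
                    memory.put(id, value);
                    return value;
                }, eventLoop);
            });
    }
}
```

Note that, unlike a cache of in-flight calls, two requests for the same id that both miss before the first result is saved will both go to the network; the second put simply overwrites the first.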

