java - How to reset the last accumulation value in EventStream? -
i have merged eventstream n observable values. values want smallest one. example:
    var<integer> = var.newsimplevar(2);     var<integer> b = var.newsimplevar(3);     var<integer> c = var.newsimplevar(4);     ...      eventstream<integer> m = eventstreams.merge(a.values(), b.values(), c.values(), ...);      m = m.filter(integer -> integer > 1).accumulate((i1, i2) -> math.min(i1, i2));     m.subscribe(integer -> system.out.println(integer));       a.setvalue(0); // @ point input filtered , not emitted dont want last value "2" in accumulation.      b.setvalue(5);     c.setvalue(3);     //output "2".   my problem want after first filtered value new init value accumulate. in case example "integer.max_value".
so next compare in accumulate isn't: 
 "math.min(2,5)" -> "math.min(2,3)" 
 
 "math.min(max_value,5)" -> "math.min(5,3)".
so output shouldn't be:
 2, 2, 2, 2, 2 
 
 -> 2 : output minimum 2
 b -> 3 : output minimum 2
 c -> 4 : output minimum 2
a -> 0 : ok condition (value < 1) true. reset or better repeat stream (without holding last value 2 in accumulation)
b -> 5 : output minimum 5
 c -> 3 : output minimum 3
 -> 4 : output minimum 3
 ...
i use
eventstreams.combine(a.values(), b.values(), c.values())             .map(t3 -> t3.map((a, b, c) -> min3(a, b, c)));   where define min3 take minimum of 3 values, ignoring zeroes.
Comments
Post a Comment