c# - Observable from chained Tasks -


i'm trying create observable each item produced via asynchronous task. next item should produced via async call on result of previous item (co-recursion). in "generate" parlance - except generate not support async (nor support delegate on initial state.

var ob = observable.generate(    async () => await producefirst(),        // task<t> producefirst()    prev => continue(prev)                   // bool continue(t);    async prev => await producenext(prev)    // task<t> producenext(t)    item => item ); 

as more concrete example, peek messages servicebus queue fetching them 100 messages @ time, implement producefirst, continue , producenext follows:

task<ienumerable<brokeredmessage>> producefirst()  {     const int batchsize = 100;     return _servicebusreceiver.peekbatchasync(batchsize); }  bool continue(ienumerable<brokeredmessage> prev) {     return prev.any(); }  async task<ienumerable<brokeredmessage>> producenext(ienumerable<brokeredmessage> prev)  {     const int batchsize = 100;     return (await _servicebusreceiver.peekbatchasync(prev.last().sequencenumber, batchsize + 1)).skip(1) } 

and call .selectmany(i => i) on iobservable<ienumerable<brokeredmessage>> turn iobservable<brokeredmessage>

where _servicebusreceiver instance of interface follows:

public interface iservicebusreceiver {     task<ienumerable<brokeredmessage>> peekbatchasync(int batchsize);     task<ienumerable<brokeredmessage>> peekbatchasync(long fromsequencenumber, int batchsize); } 

and brokeredmessage https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

if going roll own async generate function recommend use of recursive scheduling instead of wrapping while loop.

public static iobservable<tresult> generate<tresult>(     func<task<tresult>> initialstate,     func<tresult, bool> condition,     func<tresult, task<tresult>> iterate,     func<tresult, tresult> resultselector,     ischeduler scheduler = null)  {   var s = scheduler ?? scheduler.default;    return observable.create<tresult>(async obs => {     return s.schedule(await initialstate(), async (state, self) =>      {       if (!condition(state))       {         obs.oncompleted();         return;       }        obs.onnext(resultselector(state));        self(await iterate(state));      });   }); } 

this has couple of advantages. first, able cancel this, simple while loop there no way cancel directly, in fact don't return subscribe function until observable has completed. second, lets control scheduling/asynchrony of each item (which makes testing breeze), makes better overall fit library


Comments

Popular posts from this blog

qt - Using float or double for own QML classes -

Create Outlook appointment via C# .Net -

ios - Swift Array Resetting Itself -