Any plans to bring RxDart closer to the canonical implementation of Rx?

I understand that some tradeoffs had to be made to implement Rx in Dart (as outlined in this doc), but in my opinion the best thing about Rx is lost in the process: parity of features and API with other Rx implementations. It makes my RxSwift or RxJS skills, for example, less applicable and transferable to RxDart, and vice versa.
Are there any plans to make APIs closer to canonical Rx such as RxJS?

2 thoughts on “Any plans to bring RxDart closer to the canonical implementation of Rx?”

  1. “I see. Any way to have it more “inline”/concise like RxJS or RxSwift examples I added to this post? Ideally I’d like to wrap my observable creation in a single method in my, let’s say, view model class.”

    Yep! To create streams, there are a couple ways. First, using StreamControllers (most visually similar to what you were looking to do):

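    import 'dart:async';
    import 'package:rxdart/rxdart.dart';

    Stream<String> createObservable() {
      final controller = new StreamController<String>();
      controller.add("First event will be emitted!");
      controller.addError(new Exception("Oh Noes! This error will be emitted"));
      controller.close(); // Close the controller so listeners receive onDone
      return new Observable(controller.stream);
    }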

    You can then listen to these like so:

    createObservable().listen((x) => print("Next: $x"),
            onError: (e, s) => print("Error: $e"),
            onDone: () => print("Completed"));

    Alternatively, the exact same thing can be achieved using async generator functions! These might be a bit advanced / look funny at first, and only return raw Streams (would have to be converted to an observable outside if you need da Rx Powers), but are really cool and quite expressive. Any values you yield from the function will be emitted as items in your stream! If you throw any errors, those will be propagated as well to the listener! This lets you write “synchronous looking” code that is in fact async 🙂

    Stream<String> createStreamWithGenerator() async* { // Notice that little async* 
      yield "First event will be emitted!";
      // Run a fake expensive operation for 2 seconds 
      await new Future.delayed(new Duration(seconds: 2)); 
      // This exception will be sent to the `onError` handlers
      throw new Exception("Oh Noes! Then this error will be emitted");
      // Then the stream closes implicitly
    }

    You can then listen to these like so:

    createStreamWithGenerator().listen((x) => print("Next: $x"),
            onError: (dynamic error) => print("Error: $error"),
            onDone: () => print("Completed"));

    Or, if you needed to beef that async function up and flatmap it,

    new Observable(createStreamWithGenerator()).flatMap((x) => new Observable.just("Flatmapped"))
            .listen((x) => print("Next: $x"),
            onError: (e, s) => print("Error: $e"),
            onDone: () => print("Completed"));

    If you wanted to test these streams, you could do so using the Test package + Stream matchers:

    test('Creates a stream and throws an error', () async {
        // expectLater returns a Future, so the test waits for the stream matchers to finish
        await expectLater(createObservable(), emitsInOrder(["First event will be emitted!", emitsError(isException), emitsDone]));
    });

    “That makes sense. Although I’d love to have more of my fellow RxSwift peeps swayed towards trying Dart through RxDart. And my main argument usually is that it’s exactly the same thing you already know….”

    Ya, it’s something we’ve thought a lot about, but it’s a difficult tradeoff. We’re certainly trying to accommodate new users as much as we can, but feel it would be a disservice in many ways to try to Rx-ify everything over the Dart standards. What if one Stream in DartLand cancelled when an error occurred, then you switch to a non-Rx Stream and it doesn’t? Or one stream can be listened to multiple times whereas every other stream can’t be? Those details could become maddening — “Wait, am I dealing with Rx where it will cancel or regular Streams here?! Oh, I can’t listen twice to a stream I had in the flatMap chain!?”

    A bit harder up front for adoption, but will make all our lives much easier when peeps start writing a bit more Dart, and let us be friends with the wider Dart community!
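
    To make the two Dart defaults mentioned above concrete (an error does not cancel the subscription, and an ordinary stream can only be listened to once), here is a minimal sketch that uses only `dart:async`, written in current Dart syntax without `new`. The helper names `valueErrorValue`, `collect`, and `secondListenThrows` are made up for illustration and are not part of RxDart or the Dart SDK.

    ```dart
    import 'dart:async';

    /// Hypothetical helper: a single-subscription stream that emits
    /// a value, then an error, then another value, then closes.
    Stream<int> valueErrorValue() {
      final controller = StreamController<int>();
      controller
        ..add(1)
        ..addError(StateError('boom'))
        ..add(2)
        ..close();
      return controller.stream;
    }

    /// Hypothetical helper: records every callback the listener receives.
    Future<List<String>> collect(Stream<int> stream,
        {bool cancelOnError = false}) {
      final log = <String>[];
      final finished = Completer<List<String>>();
      stream.listen(
        (x) => log.add('next $x'),
        onError: (Object e) {
          log.add('error');
          // With cancelOnError the subscription ends here and onDone never fires.
          if (cancelOnError) finished.complete(log);
        },
        onDone: () {
          log.add('done');
          finished.complete(log);
        },
        cancelOnError: cancelOnError,
      );
      return finished.future;
    }

    /// Hypothetical helper: true if listening a second time throws.
    bool secondListenThrows(Stream<int> stream) {
      stream.listen((_) {});
      try {
        stream.listen((_) {});
        return false;
      } on StateError {
        return true;
      }
    }

    Future<void> main() async {
      // Dart default: the error is delivered and the stream keeps going.
      print(await collect(valueErrorValue()));
      // [next 1, error, next 2, done]

      // Opting in to the Rx-like behaviour: stop at the first error.
      print(await collect(valueErrorValue(), cancelOnError: true));
      // [next 1, error]

      print(secondListenThrows(StreamController<int>().stream)); // true
      print(secondListenThrows(StreamController<int>.broadcast().stream)); // false
    }
    ```

    So the Rx-style "terminate on error" is available per listener via `cancelOnError: true`, and listening more than once is opt-in via a broadcast stream, rather than either being the default.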