.NET Rx C# Observable.FromEventPattern does not run OnCompleted

ghz 8 months ago ⋅ 121 views

I cannot figure out why the following code never runs OnCompleted. Can anyone please help? Thanks.

Basically, every time I press a key I fire an event and convert that event to an observable so that I can subscribe an observer to it. The observer simply prints a line of text. Also, is it a good idea to dispose of the subscription once the observable has been consumed (that is, to call Dispose inside the do-while loop)?

I'm using .NET Rx via NuGet: System.Reactive (6.0.0).

[Screenshot: the resulting terminal output]

//Create observable from Observable.FromEventPattern
public class MainApp
{
    public event EventHandler<DeviceEventArgs>? DeviceEvent;
    private ConsoleKeyInfo ch;

    public void Start()
    {
        string[] deviceNames = ["Loop", "Sick", "RFID Reader", "Barrier"];
        string[] deviceStatus = ["On", "Off"];
        var rand = new Random();

        IDisposable devEventSubscription;
        var deviceEventObservables = Observable.FromEventPattern<EventHandler<DeviceEventArgs>, DeviceEventArgs>(
                h => DeviceEvent += h,
                h => DeviceEvent -= h)
            .Select(ep => ep.EventArgs);
        do
        {
            Console.WriteLine("Press space bar or enter to exit the program");
            ch = Console.ReadKey();
            Console.WriteLine();

            // Assume ch is data stream from the device that need to convert to Observables
            Console.WriteLine($"Char {ch.KeyChar} was entered");

            devEventSubscription = deviceEventObservables.Subscribe((eventArgs) =>
            {
                Console.WriteLine($"Received an observable. Time: {eventArgs.Epoch}. Type: {eventArgs.DeviceName}. Status: {eventArgs.Status}");
            },
            () => Console.WriteLine("Completed"));

            if (DeviceEvent != null)
            {
                DeviceEvent(null, new DeviceEventArgs()
                {
                    Epoch = DateTimeOffset.Now.ToUnixTimeSeconds(),
                    Status = deviceStatus[rand.Next(deviceStatus.Length)],
                    DeviceName = deviceNames[rand.Next(deviceNames.Length)]
                });
            }
            // Question: why don't I see the "Completed" message from OnCompleted, and
            // should I move this line out of the do-while loop?
            devEventSubscription?.Dispose();

        } while (!char.IsWhiteSpace(ch.KeyChar));
        
        Console.ReadKey();
    }
}


class Sample
{
    private static void Main()
    {
        var m = new MainApp();
        m.Start();
    }
}

Answers

It seems like your code has a couple of issues. Let's address them step by step.

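  1. OnCompleted not firing: An observable created with Observable.FromEventPattern never completes on its own. A .NET event has no notion of "end of stream", so Rx has nothing to translate into OnCompleted. Calling Dispose on the subscription only detaches the handler from the event; it does not notify the observer, so "Completed" is never printed no matter where the Dispose call is placed.

  2. Subscription management: Disposing inside the loop does not lose events in your code, because you also subscribe again on every iteration, but the repeated subscribe/dispose is unnecessary. Keep Subscribe and Dispose paired. The usual pattern is to subscribe once before the loop and dispose once after it. Subscribing on every iteration without disposing would stack subscriptions, and each event would then be printed once per accumulated subscription.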
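The first point can be checked in isolation. The following is a minimal sketch, not the code from the question: the `Ticker` class and its `Tick` event are hypothetical stand-ins for the device class, and the observer writes to a list instead of the console so the result is easy to inspect.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical stand-in for the class that owns the event.
public class Ticker
{
    public event EventHandler<EventArgs>? Tick;
    public void Raise() => Tick?.Invoke(this, EventArgs.Empty);
}

public static class DisposeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var ticker = new Ticker();

        var ticks = Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
            h => ticker.Tick += h,
            h => ticker.Tick -= h);

        IDisposable subscription = ticks.Subscribe(
            _ => log.Add("OnNext"),
            () => log.Add("OnCompleted"));

        ticker.Raise();          // delivered to the observer
        subscription.Dispose();  // detaches the handler; the observer is not notified
        ticker.Raise();          // no handler is attached any more

        return log;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

The log ends up containing a single "OnNext" entry and no "OnCompleted", which matches what the question observes.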

Here's a revised version of your code:

using System;
using System.Reactive.Linq;

public class MainApp
{
    public event EventHandler<DeviceEventArgs>? DeviceEvent;
    private ConsoleKeyInfo ch;
    private IDisposable? devEventSubscription;

    public void Start()
    {
        string[] deviceNames = { "Loop", "Sick", "RFID Reader", "Barrier" };
        string[] deviceStatus = { "On", "Off" };
        var rand = new Random();

        var deviceEventObservables = Observable.FromEventPattern<EventHandler<DeviceEventArgs>, DeviceEventArgs>(
                h => DeviceEvent += h,
                h => DeviceEvent -= h)
            .Select(ep => ep.EventArgs);

        // Subscribe once, before the loop. Subscribing on every iteration
        // without disposing would stack handlers and print each event several times.
        devEventSubscription = deviceEventObservables.Subscribe((eventArgs) =>
            {
                Console.WriteLine($"Received an observable. Time: {eventArgs.Epoch}. Type: {eventArgs.DeviceName}. Status: {eventArgs.Status}");
            },
            // Never invoked here: an event-based observable does not complete.
            () => Console.WriteLine("Completed"));

        do
        {
            Console.WriteLine("Press space bar or enter to exit the program");
            ch = Console.ReadKey();
            Console.WriteLine();

            // Assume ch is the data stream from the device that needs to be converted to observables
            Console.WriteLine($"Char {ch.KeyChar} was entered");

            if (DeviceEvent != null)
            {
                DeviceEvent(null, new DeviceEventArgs()
                {
                    Epoch = DateTimeOffset.Now.ToUnixTimeSeconds(),
                    Status = deviceStatus[rand.Next(deviceStatus.Length)],
                    DeviceName = deviceNames[rand.Next(deviceNames.Length)]
                });
            }

        } while (!char.IsWhiteSpace(ch.KeyChar));

        // Dispose the subscription outside the loop
        devEventSubscription?.Dispose();
        Console.ReadKey();
    }
}

public class DeviceEventArgs : EventArgs
{
    public long Epoch { get; set; }
    public string Status { get; set; } = "";
    public string DeviceName { get; set; } = "";
}

class Sample
{
    private static void Main()
    {
        var m = new MainApp();
        m.Start();
    }
}

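This version moves the disposal of the subscription outside the loop, so it happens once, when the program is about to exit. Be aware that "Completed" will still not be printed: Dispose only unsubscribes, and the event-based observable never signals completion by itself. If you need a completion notification, end the sequence explicitly, for example by composing it with an operator such as TakeUntil or Take, which call OnCompleted on the observer when their condition is met.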
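One way to obtain a completion notification is sketched below, under stated assumptions: the `Device` class, its `Changed` event, and the `stop` subject are hypothetical names introduced for illustration, and a `Subject<Unit>` is only one of several possible stop signals. `TakeUntil` forwards events until the other sequence produces a value, then completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for the class that owns the event.
public class Device
{
    public event EventHandler<EventArgs>? Changed;
    public void Raise() => Changed?.Invoke(this, EventArgs.Empty);
}

public static class CompletionDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var device = new Device();
        using var stop = new Subject<Unit>();   // assumed stop signal

        var changes = Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
                h => device.Changed += h,
                h => device.Changed -= h)
            .TakeUntil(stop);                   // complete when stop emits

        using var subscription = changes.Subscribe(
            _ => log.Add("OnNext"),
            () => log.Add("Completed"));

        device.Raise();
        device.Raise();
        stop.OnNext(Unit.Default);  // ends the sequence: OnCompleted runs, handler is detached
        device.Raise();             // ignored, the sequence has already completed

        return log;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

In the question's program, the stop signal would be raised when the user presses space or enter, just before the loop exits.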