Trouble with shared state updates between threads using Arc and Mutex in Rust

I'm working on a Rust application where I need to manage shared state between two threads using Arc and Mutex. The application has a Node struct that holds some data, and I'm trying to update this data in one thread and read it in another. However, I'm facing an issue where updates made in the handle_client thread are not visible in the do_broadcast thread. I suspect this might be due to the way I'm handling locking, but I'm not sure how to resolve it.

Here's a simplified version of my code:

#[derive(Serialize, Deserialize, Debug)]
struct Message<P> {
    // fields omitted for brevity
}

#[derive(Debug)]
struct Node {
    msg_id: usize,
    sum: usize,
    log: HashMap<String, usize>,
    node_id: String,
    node_ids: Vec<String>,
}

impl Node {
    fn step(&mut self, input: Message<Payload>, output: &mut io::StdoutLock) -> anyhow::Result<()> {
        // implementation details
    }

    fn broadcast(&mut self, output: &mut io::StdoutLock) {
        // implementation details
    }
}

fn main() -> anyhow::Result<()> {
    let node = Arc::new(Mutex::new(Node {
        msg_id: 0,
        sum: 0,
        log: HashMap::new(),
        node_id: "node1".to_string(),
        node_ids: vec!["node2".to_string()],
    }));

    let _node = node.clone();

    let handle_client = thread::spawn(move || {
        let stdin = io::stdin().lock();
        let inputs = serde_json::Deserializer::from_reader(stdin).into_iter::<Message<Payload>>();
        let mut stdout = io::stdout().lock();

        for input in inputs {
            thread::sleep(Duration::from_millis(1));
            let input = input.context("cannot deserialize the input message").unwrap();
            _node.lock().unwrap().step(input, &mut stdout).unwrap();
        }
    });

    let _node = node.clone();

    let do_broadcast = thread::spawn(move || loop {
        let mut stdout = io::stdout().lock();
        _node.lock().unwrap().broadcast(&mut stdout);
        thread::sleep(Duration::from_millis(100));
    });

    handle_client.join().unwrap();
    do_broadcast.join().unwrap();

    Ok(())
}

In the handle_client thread, I'm processing user input and updating the Node instance. In the do_broadcast thread, I'm trying to read the updated values to broadcast them. However, the updates are not visible in the do_broadcast thread.

What could be causing this issue, and how can I ensure that the do_broadcast thread sees the updates made by the handle_client thread?
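For reference, sharing an Arc<Mutex<T>> is enough on its own for one thread to see another thread's writes: unlocking the mutex publishes everything written while it was held to the next thread that locks it. The minimal sketch below uses a hypothetical Counter struct in place of Node. It is only meant to show that the Arc/Mutex pattern itself is not what hides the updates.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the question's Node struct.
#[derive(Debug, Default)]
struct Counter {
    sum: usize,
}

/// Adds each value to a shared counter on a writer thread, then reads the
/// total back on a different reader thread.
fn sum_across_threads(values: Vec<usize>) -> usize {
    let shared = Arc::new(Mutex::new(Counter::default()));

    // Writer: each lock/unlock pair publishes the update to later lockers.
    let writer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            for v in values {
                shared.lock().unwrap().sum += v;
            }
        })
    };
    // Wait for the writer so the reader runs strictly after it.
    writer.join().unwrap();

    // Reader: a separate thread sees every update made by the writer.
    let reader = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let guard = shared.lock().unwrap();
            guard.sum
        })
    };
    reader.join().unwrap()
}

fn main() {
    let total = sum_across_threads(vec![1, 2, 3]);
    println!("reader saw sum = {total}"); // prints "reader saw sum = 6"
}
```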

Additional Information:

I attempted a workaround by modifying the loop in the handle_client thread as follows:

for input in inputs {
    thread::sleep(Duration::from_millis(1));
    let input = input.context("cannot deserialize the input message").unwrap();
    _node.lock().unwrap().step(input, &mut stdout).unwrap();
    break;  // Breaking after the first input
}

Adding break after processing the first input resolves the issue, and the do_broadcast thread is able to see the updated values. However, this solution is not viable for my application because I need to continuously listen for user input without breaking out of the loop.

How can I modify my code to continuously process user input in the handle_client thread while ensuring that updates are visible in the do_broadcast thread?
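The fact that break makes the problem disappear suggests that some lock is held for the whole lifetime of the loop and only released when the handle_client thread returns. The sketch below reproduces that shape with plain std types so it can be checked deterministically. As assumptions made for illustration, a Mutex<Vec<String>> stands in for the stdout lock and an mpsc channel stands in for stdin. While the client thread keeps its guard across a blocking read, no other thread can acquire the same lock.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Returns (busy_while_client_runs, free_after_client_exits).
/// Assumption: `out` plays the role of the stdout lock, the channel plays stdin.
fn lock_held_across_blocking_loop() -> (bool, bool) {
    let out = Arc::new(Mutex::new(Vec::<String>::new()));
    let (input_tx, input_rx) = mpsc::channel::<u32>();
    let (ready_tx, ready_rx) = mpsc::channel::<()>();

    let client = {
        let out = Arc::clone(&out);
        thread::spawn(move || {
            // Same shape as the question: the "stdout" guard is taken once,
            // before the loop, and lives until the thread returns.
            let mut guard = out.lock().unwrap();
            ready_tx.send(()).unwrap();
            // Iterating the receiver blocks for input while `guard` is held.
            for msg in input_rx {
                guard.push(format!("client handled {msg}"));
            }
        })
    };

    // Wait until the client definitely holds the lock.
    ready_rx.recv().unwrap();
    // Another thread cannot get the lock. With the real stdout, lock() would
    // block here instead of failing, which is what stalls do_broadcast.
    let busy = out.try_lock().is_err();

    // Closing the input ends the client's loop, much like `break` does.
    drop(input_tx);
    client.join().unwrap();
    let free = out.try_lock().is_ok();
    (busy, free)
}

fn main() {
    let (busy, free) = lock_held_across_blocking_loop();
    println!("busy while client runs: {busy}, free after it exits: {free}");
}
```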

Answers

The Arc<Mutex<Node>> part of your code is already correct. Both closures receive a clone of the same Arc, so they share a single Mutex and a single Node, and a Mutex guarantees that everything one thread writes before unlocking is visible to the next thread that locks it. Your updates are not being lost; the do_broadcast thread is simply never reaching the broadcast call.

The culprit is the lock on standard output. In handle_client you call io::stdout().lock() once, before the loop, so that thread holds the process-wide stdout lock for as long as the loop runs, including all the time it spends blocked waiting for the next message on stdin. The io::stdout().lock() call at the top of each do_broadcast iteration therefore waits until handle_client returns. That is why adding break appears to fix it: the thread exits after one message, its StdoutLock is dropped, and do_broadcast can finally lock stdout and read the updated Node.

The fix is to hold each lock only for as long as you actually need it: take the stdout lock per message inside the loop, and release both the stdout lock and the Node lock before sleeping in do_broadcast. Take the two locks in the same order (stdout first, then the Node) in both threads so they cannot deadlock.

Here's how you can modify your code to achieve this:

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use std::io;
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use anyhow::Context;

#[derive(Serialize, Deserialize, Debug)]
struct Message<P> {
    // fields omitted for brevity
}

#[derive(Debug)]
struct Node {
    msg_id: usize,
    sum: usize,
    log: HashMap<String, usize>,
    node_id: String,
    node_ids: Vec<String>,
}

impl Node {
    fn step(&mut self, input: Message<Payload>, output: &mut io::StdoutLock) -> anyhow::Result<()> {
        // implementation details
        Ok(())
    }

    fn broadcast(&mut self, output: &mut io::StdoutLock) {
        // implementation details
    }
}

fn main() -> anyhow::Result<()> {
    let node = Arc::new(Mutex::new(Node {
        msg_id: 0,
        sum: 0,
        log: HashMap::new(),
        node_id: "node1".to_string(),
        node_ids: vec!["node2".to_string()],
    }));

    let handle_client = {
        let node = node.clone();
        thread::spawn(move || {
            let stdin = io::stdin().lock();
            let inputs = serde_json::Deserializer::from_reader(stdin).into_iter::<Message<Payload>>();

            for input in inputs {
                let input = input.context("cannot deserialize the input message").unwrap();
                // Lock stdout for this message only. It is released at the end of
                // each iteration, so it is not held while waiting on stdin.
                let mut stdout = io::stdout().lock();
                node.lock().unwrap().step(input, &mut stdout).unwrap();
            }
        })
    };

    let do_broadcast = {
        let node = node.clone();
        thread::spawn(move || loop {
            {
                // Both guards live only inside this block, so they are
                // released before the thread goes to sleep.
                let mut stdout = io::stdout().lock();
                node.lock().unwrap().broadcast(&mut stdout);
            }
            thread::sleep(Duration::from_millis(100));
        })
    };

    handle_client.join().unwrap();
    do_broadcast.join().unwrap();

    Ok(())
}

In this version, stdout is locked only around each individual message, and no lock is held while a thread blocks on stdin or sleeps. Both threads still reach the same Mutex<Node> through their clones of the Arc, so every step performed by handle_client is visible to the next broadcast call, and you can keep reading input in a loop without the break.
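The sketch below exercises a locking pattern in which stdout is locked per message, no lock is held across a blocking read or a sleep, and both threads take the locks in the same order (stdout, then node). To make it testable without real stdin, it swaps stdin for an mpsc channel and reduces Node to a hypothetical sum field; stdout is the real io::stdout(). The AtomicBool stop flag and the returned list of observed sums are additions for testing only, not part of the original program.

```rust
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical, reduced version of the question's Node.
#[derive(Debug, Default)]
struct Node {
    sum: usize,
}

/// Feeds `values` to a client thread while a broadcaster thread periodically
/// reads the shared Node. Returns every sum the broadcaster observed.
fn run(values: Vec<usize>) -> Vec<usize> {
    let node = Arc::new(Mutex::new(Node::default()));
    let done = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<usize>(); // stand-in for stdin

    let client = {
        let node = Arc::clone(&node);
        thread::spawn(move || {
            for v in rx {
                // Lock stdout for this message only, then the node.
                let mut out = io::stdout().lock();
                let mut n = node.lock().unwrap();
                n.sum += v;
                let sum = n.sum;
                writeln!(out, "client: added {v}, sum={sum}").unwrap();
                // `n` and `out` go out of scope at the end of each iteration,
                // so neither lock is held while the loop waits for input.
            }
        })
    };

    let broadcaster = {
        let node = Arc::clone(&node);
        let done = Arc::clone(&done);
        thread::spawn(move || {
            let mut seen = Vec::new();
            loop {
                // Read the flag first: if it is already set, the client has
                // finished, so the sum read below is final.
                let finished = done.load(Ordering::SeqCst);
                {
                    let mut out = io::stdout().lock();
                    let sum = node.lock().unwrap().sum;
                    writeln!(out, "broadcast: sum={sum}").unwrap();
                    seen.push(sum);
                } // Both locks are released here, before sleeping.
                if finished {
                    return seen;
                }
                thread::sleep(Duration::from_millis(5));
            }
        })
    };

    for v in values {
        tx.send(v).unwrap();
    }
    drop(tx); // close the input so the client loop ends
    client.join().unwrap();
    done.store(true, Ordering::SeqCst);
    broadcaster.join().unwrap()
}

fn main() {
    let seen = run(vec![1, 2, 3]);
    println!("broadcaster observed: {seen:?}");
}
```

Because every read goes through the same Mutex, the sums the broadcaster observes can only grow, and its last observation always includes every update the client made.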