Trouble with shared state updates between threads using Arc and Mutex in Rust
I'm working on a Rust application where I need to manage shared state between two threads using Arc and Mutex. The application has a Node struct that holds some data, and I'm trying to update this data in one thread and read it in another. However, I'm facing an issue where updates made in the handle_client thread are not visible in the do_broadcast thread. I suspect this might be due to the way I'm handling locking, but I'm not sure how to resolve it.
Here's a simplified version of my code:
#[derive(Serialize, Deserialize, Debug)]
struct Message<P> {
    // fields omitted for brevity
}

#[derive(Debug)]
struct Node {
    msg_id: usize,
    sum: usize,
    log: HashMap<String, usize>,
    node_id: String,
    node_ids: Vec<String>,
}

impl Node {
    fn step(&mut self, input: Message<Payload>, output: &mut io::StdoutLock) -> anyhow::Result<()> {
        // implementation details
    }

    fn broadcast(&mut self, output: &mut io::StdoutLock) {
        // implementation details
    }
}

fn main() -> anyhow::Result<()> {
    let node = Arc::new(Mutex::new(Node {
        msg_id: 0,
        sum: 0,
        log: HashMap::new(),
        node_id: "node1".to_string(),
        node_ids: vec!["node2".to_string()],
    }));

    let _node = node.clone();
    let handle_client = thread::spawn(move || {
        let stdin = io::stdin().lock();
        let inputs = serde_json::Deserializer::from_reader(stdin).into_iter::<Message<Payload>>();
        let mut stdout = io::stdout().lock();
        for input in inputs {
            thread::sleep(Duration::from_millis(1));
            let input = input.context("cannot deserialize the input message").unwrap();
            _node.lock().unwrap().step(input, &mut stdout).unwrap();
        }
    });

    let _node = node.clone();
    let do_broadcast = thread::spawn(move || loop {
        let mut stdout = io::stdout().lock();
        _node.lock().unwrap().broadcast(&mut stdout);
        thread::sleep(Duration::from_millis(100));
    });

    handle_client.join().unwrap();
    do_broadcast.join().unwrap();
    Ok(())
}
In the handle_client thread, I'm processing user input and updating the Node instance. In the do_broadcast thread, I'm trying to read the updated values to broadcast them. However, the updates are not visible in the do_broadcast thread.
What could be causing this issue, and how can I ensure that the do_broadcast thread sees the updates made by the handle_client thread?
Additional Information:
I attempted a workaround by modifying the loop in the handle_client thread as follows:
for input in inputs {
    thread::sleep(Duration::from_millis(1));
    let input = input.context("cannot deserialize the input message").unwrap();
    _node.lock().unwrap().step(input, &mut stdout).unwrap();
    break; // Breaking after the first input
}
Adding break after processing the first input resolves the issue, and the do_broadcast thread is able to see the updated values. However, this solution is not viable for my application because I need to continuously listen for user input without breaking out of the loop.
How can I modify my code to continuously process user input in the handle_client thread while ensuring that updates are visible in the do_broadcast thread?
Answers
The way you share node is already correct. A Mutex guarantees exclusive access across threads, not just within one, and cloning an Arc does not copy the Node: every clone points at the same Mutex<Node>. Any update made while holding the lock is visible to the next thread that takes the lock.
The problem is almost certainly the other lock in your code: io::stdout().lock(). In handle_client you take the stdout lock once, before the loop, and the StdoutLock guard lives until the closure returns. do_broadcast starts every iteration with io::stdout().lock(), so it blocks on that call for as long as handle_client is alive and never reaches broadcast. That is also why adding break appears to fix it: the handle_client closure returns after the first message, the guard is dropped, and do_broadcast can finally run.
To fix it, hold the stdout lock only while you are actually writing. Acquire it inside the loop body so that it is released before the thread blocks on stdin again, and in do_broadcast release both guards before sleeping. Taking the two locks in the same order in both threads (stdout first, then node) avoids introducing a deadlock.
Here's how you can modify your code to achieve this:
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use anyhow::Context;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Message<P> {
    // fields omitted for brevity
}

#[derive(Debug)]
struct Node {
    msg_id: usize,
    sum: usize,
    log: HashMap<String, usize>,
    node_id: String,
    node_ids: Vec<String>,
}

impl Node {
    fn step(&mut self, input: Message<Payload>, output: &mut io::StdoutLock) -> anyhow::Result<()> {
        // implementation details
        Ok(())
    }

    fn broadcast(&mut self, output: &mut io::StdoutLock) {
        // implementation details
    }
}

fn main() -> anyhow::Result<()> {
    let node = Arc::new(Mutex::new(Node {
        msg_id: 0,
        sum: 0,
        log: HashMap::new(),
        node_id: "node1".to_string(),
        node_ids: vec!["node2".to_string()],
    }));

    let handle_client = {
        let node = Arc::clone(&node);
        thread::spawn(move || {
            let stdin = io::stdin().lock();
            let inputs = serde_json::Deserializer::from_reader(stdin).into_iter::<Message<Payload>>();
            for input in inputs {
                let input = input.context("cannot deserialize the input message").unwrap();
                // Lock stdout for this message only. The guard is dropped at the
                // end of the iteration, before the thread blocks on stdin again.
                let mut stdout = io::stdout().lock();
                // The temporary MutexGuard is dropped at the end of this statement.
                node.lock().unwrap().step(input, &mut stdout).unwrap();
            }
        })
    };

    let do_broadcast = {
        let node = Arc::clone(&node);
        thread::spawn(move || loop {
            {
                // Same lock order as handle_client: stdout first, then node.
                let mut stdout = io::stdout().lock();
                node.lock().unwrap().broadcast(&mut stdout);
            } // Both guards are released here, before sleeping.
            thread::sleep(Duration::from_millis(100));
        })
    };

    handle_client.join().unwrap();
    do_broadcast.join().unwrap();
    Ok(())
}
In this modified version, both threads still reach the same Mutex<Node> through their clones of the Arc, exactly as before. The difference is that neither thread holds the stdout lock or the node lock while it is waiting for input or sleeping, so do_broadcast gets a turn between messages and sees every update that step has made.
Note that do_broadcast loops forever, so do_broadcast.join() never returns. If the program should exit once stdin is closed, you will need a separate shutdown signal for that thread.
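If you want to see how a long-lived lock guard can keep another thread out even when the shared data itself is wired up correctly, here is a minimal sketch. The names demo, output and count are hypothetical: output is a plain Mutex<()> standing in for the stdout lock, and count stands in for the Node. It uses try_lock instead of lock so the second thread reports that it was shut out rather than hanging.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Returns (blocked_while_held, acquired_after_drop, final_count).
fn demo() -> (bool, bool, usize) {
    // Hypothetical stand-ins: `output` for the stdout lock, `count` for the Node.
    let output = Arc::new(Mutex::new(()));
    let count = Arc::new(Mutex::new(0usize));

    // The main thread plays handle_client and holds the output lock
    // for as long as `guard` is alive.
    let guard = output.lock().unwrap();

    // A second thread plays do_broadcast and cannot get the output lock.
    let blocked_while_held = {
        let output = Arc::clone(&output);
        thread::spawn(move || {
            let blocked = output.try_lock().is_err();
            blocked
        })
        .join()
        .unwrap()
    };

    // Dropping the guard is what `break` did by accident in the question.
    drop(guard);

    // Now the second thread gets the output lock and can update the shared state.
    let acquired_after_drop = {
        let output = Arc::clone(&output);
        let count = Arc::clone(&count);
        thread::spawn(move || {
            let acquired = output.try_lock().is_ok();
            *count.lock().unwrap() += 1;
            acquired
        })
        .join()
        .unwrap()
    };

    // The update made in the other thread is visible through our Arc clone.
    let final_count = *count.lock().unwrap();
    (blocked_while_held, acquired_after_drop, final_count)
}

fn main() {
    let (blocked, acquired, n) = demo();
    println!("blocked while held: {blocked}, acquired after drop: {acquired}, count: {n}");
}
```

The shared counter is never the problem here; the only thing that changes between the two attempts is whether the first thread is still holding the output guard.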