How should I make a `u128` addition operation atomic?

ghz 8 months ago ⋅ 82 views

Background:

I'm running in a multi-threaded environment where 64-bit values are added to a global counter (u128) 25-50 times a second.

My Starting Point

I wrote a little snippet:

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug)]
pub struct AtomicU128 {
    lsv : AtomicU64,
    msv : AtomicU64,
}

impl AtomicU128 { 
    fn fetch_add(&self, value : u64) -> u128 {
        let mut previous = self.lsv.fetch_add(value, Ordering::SeqCst) as u128;
        
        previous += if previous + value as u128 > u64::MAX.into() {
            let x = self.msv.fetch_add(1, Ordering::SeqCst);
            x as u128
        } else { 0 };
        
        previous
    }
}

fn main() {
    let a_u128 : AtomicU128 = AtomicU128 { lsv: AtomicU64::new(u64::MAX), msv: AtomicU64::new(0), };
    
    a_u128.fetch_add(1);
    
    println!("{:?}",a_u128);
}

which has a gaping hole in how it handles overflow of the 64-bit `lsv`.

If I wrap the operation in a Mutex, should I bother with the AtomicU64 types?

Edit:

I put the following together, but it feels a little cumbersome:

use std::sync::Mutex;

#[derive(Debug)]
struct AtomicU128 {
    inner: Mutex<u128>,
}

impl AtomicU128 {
    fn new(value: u128) -> Self {
        AtomicU128 {
            inner: Mutex::new(value),
        }
    }

    fn add(&self, value: u128) {
        let mut inner = self.inner.lock().unwrap();
        *inner += value;
    }

    fn get(&self) -> u128 {
        *self.inner.lock().unwrap()
    }
}

fn main() {
    let v = AtomicU128::new(u64::MAX.into());
    
    println!("Before {:?}",v.get());
    v.add(1);
    println!("After  {:?}",v.get());
}

Is there a better way to safely add two u128 values together?

Answers

Using a Mutex to protect access to a shared u128 value is a reasonable approach to ensure thread safety. However, using AtomicU64 types can still be beneficial in certain situations, especially if you want to avoid locking the entire value for every operation.

Here's a modified version of your AtomicU128 struct that uses two AtomicU64 words instead of a Mutex, so additions never take a lock:

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug)]
struct AtomicU128 {
    lsv: AtomicU64,
    msv: AtomicU64,
}

impl AtomicU128 {
    fn new(value: u128) -> Self {
        let lsv = (value & u64::MAX as u128) as u64;
        let msv = (value >> 64) as u64;
        AtomicU128 {
            lsv: AtomicU64::new(lsv),
            msv: AtomicU64::new(msv),
        }
    }

    fn add(&self, value: u128) {
        let lsv = value as u64;
        let msv = (value >> 64) as u64;

        // fetch_add wraps on overflow, so add to the low word first
        let prev_lsv = self.lsv.fetch_add(lsv, Ordering::Relaxed);

        // If the low-word addition wrapped, carry one into the high word.
        // Each wrap of lsv corresponds to exactly one thread observing
        // overflow here, so the carries add up correctly under contention.
        let carry = if prev_lsv.checked_add(lsv).is_none() { 1 } else { 0 };
        self.msv.fetch_add(msv + carry, Ordering::Relaxed);
    }

    fn get(&self) -> u128 {
        // Note: these are two separate loads, so a reader can observe a
        // torn value while a concurrent add is between its two fetch_adds.
        let lsv = self.lsv.load(Ordering::Relaxed) as u128;
        let msv = self.msv.load(Ordering::Relaxed) as u128;
        lsv | (msv << 64)
    }
}

fn main() {
    let v = AtomicU128::new(u64::MAX.into());
    
    println!("Before {:?}", v.get());
    v.add(1);
    println!("After  {:?}", v.get());
}

This implementation:

  • Splits the u128 value into two AtomicU64 words, so additions never take a lock.
  • Handles overflow by carrying from the least significant word into the most significant one.
  • Provides add and get methods for modifying and accessing the value, respectively.

This approach avoids a lock on every addition, but note that get is not a single atomic read: it can observe a torn value while another thread is between its two fetch_adds. At 25-50 additions per second, contention is negligible, so the simpler Mutex&lt;u128&gt; version from your edit is likely fast enough and easier to reason about.
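For what it's worth, here is a minimal sketch (std only; the thread and iteration counts are arbitrary) showing that the Mutex-based version from your edit stays correct under concurrent adds, including when the total crosses the u64 boundary:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Same shape as the Mutex-based AtomicU128 from the question.
struct AtomicU128 {
    inner: Mutex<u128>,
}

impl AtomicU128 {
    fn new(value: u128) -> Self {
        AtomicU128 { inner: Mutex::new(value) }
    }

    fn add(&self, value: u128) {
        *self.inner.lock().unwrap() += value;
    }

    fn get(&self) -> u128 {
        *self.inner.lock().unwrap()
    }
}

fn main() {
    // Start just below the u64 boundary so the adds must carry past it.
    let counter = Arc::new(AtomicU128::new(u64::MAX as u128 - 500));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..1000 {
                    c.add(1);
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    // 4 threads * 1000 adds of 1 each; no updates are lost.
    assert_eq!(counter.get(), u64::MAX as u128 - 500 + 4000);
    println!("final: {}", counter.get());
}
```

Since the lock is held only for the duration of one `+=`, the critical section is tiny; at your update rate the threads will essentially never block each other.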