[HELP] Sending data to multiple TcpStreams upon receiving data from another TcpStream
from SuddenlyBlowGreen@lemmy.world to rust@lemmy.ml on 26 Sep 2023 21:09
https://lemmy.world/post/5780632

Hey guys,

I’ve been stuck on this problem for a while, maybe someone with more tokio/async experience can help me with it.

The goal of the program is this: It’s a TCP server that accepts connections. Any data/text received should be sent to the other connections.

The problem stems prom the face that I need write access to the vector of OwnedWriteHalfs in two places, simultaneously.

So when I first lock the RwLock of the vector to listen in a loop for the messages from the MPSC, that locks the vector to the part of the code that should put the connections in that vector.

Once, when I put it in the vector of OwnedWriteHalfs, and once when I go over the list to send the messages to the other clients.

Anybody got any ideas or pointers? Thanks!

The code:

use std::{io::Error, sync::Arc};

use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
            net::{tcp::{OwnedReadHalf, OwnedWriteHalf},
                  TcpListener},
            sync::{mpsc::{self, UnboundedReceiver, UnboundedSender},
                   RwLock}};

async fn stream_handler(stream : OwnedReadHalf, sender : UnboundedSender<(String, String)>)
{
    let addr = (&stream).peer_addr().unwrap().to_string();

    let mut reader = BufReader::new(stream);

    let mut buffer : Vec = vec![];

    while let Ok(n) = reader.read_until(b'\n', &mut buffer).await
    {
        if n == 0
        {
            break;
        }

        let message = String::from_utf8_lossy(&buffer[..]);

        sender.send((addr.clone(), message.to_string())).unwrap();

        buffer.clear();
    }
}

async fn send_to_others(
    recv : &mut UnboundedReceiver<(String, String)>,
    writes : Arc>>
)
{
    loop
    {
        let write_clone = writes.clone();
        let mut write = write_clone.write().await;
        if let Some(msg) = recv.recv().await
        {
            for stream in write.iter_mut()
            {
                let mut writer = BufWriter::new(stream);

                let message = format!("INCOMING: {} - {}", msg.0, msg.1);

                _ = writer.write_all(message.as_bytes()).await;
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Error>
{
    let listener = TcpListener::bind("0.0.0.0:6667").await?;

    let (send, mut receive) : (
        UnboundedSender<(String, String)>,
        UnboundedReceiver<(String, String)>
    ) = mpsc::unbounded_channel();

    let writes : Arc>> = Arc::new(RwLock::new(vec![]));

    let writes_clone = writes.clone();

    tokio::spawn(async move {
        send_to_others(&mut receive, writes_clone).await;
    });

    loop
    {
        println!("starting loop");
        let send = send.clone();
        let writes = writes.clone();

        // This runs once per connection, and starts a background thread running the 'stream_handler' function
        if let Ok((stream, address)) = listener.accept().await
        {
            let (read, write) = stream.into_split();

            let mut writes = writes.write().await;

            writes.push(write);

            println!("New client: {:#?}", address.to_string());

            tokio::spawn(async move {
                stream_handler(read, send).await;
            });
        }
    }
}

#rust

threaded - newest