from SuddenlyBlowGreen@lemmy.world to rust@lemmy.ml on 26 Sep 2023 21:09
https://lemmy.world/post/5780632
Hey guys,
I’ve been stuck on this problem for a while, maybe someone with more tokio/async experience can help me with it.
The goal of the program is this: It’s a TCP server that accepts connections. Any data/text received should be sent to the other connections.
The problem stems prom the face that I need write access to the vector of OwnedWriteHalf
s in two places, simultaneously.
So when I first lock the RwLock of the vector to listen in a loop for the messages from the MPSC, that locks the vector to the part of the code that should put the connections in that vector.
Once, when I put it in the vector of OwnedWriteHalf
s, and once when I go over the list to send the messages to the other clients.
Anybody got any ideas or pointers? Thanks!
The code:
use std::{io::Error, sync::Arc}; use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, net::{tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener}, sync::{mpsc::{self, UnboundedReceiver, UnboundedSender}, RwLock}}; async fn stream_handler(stream : OwnedReadHalf, sender : UnboundedSender<(String, String)>) { let addr = (&stream).peer_addr().unwrap().to_string(); let mut reader = BufReader::new(stream); let mut buffer : Vec = vec![]; while let Ok(n) = reader.read_until(b'\n', &mut buffer).await { if n == 0 { break; } let message = String::from_utf8_lossy(&buffer[..]); sender.send((addr.clone(), message.to_string())).unwrap(); buffer.clear(); } } async fn send_to_others( recv : &mut UnboundedReceiver<(String, String)>, writes : Arc>> ) { loop { let write_clone = writes.clone(); let mut write = write_clone.write().await; if let Some(msg) = recv.recv().await { for stream in write.iter_mut() { let mut writer = BufWriter::new(stream); let message = format!("INCOMING: {} - {}", msg.0, msg.1); _ = writer.write_all(message.as_bytes()).await; } } } } #[tokio::main] async fn main() -> Result<(), Error> { let listener = TcpListener::bind("0.0.0.0:6667").await?; let (send, mut receive) : ( UnboundedSender<(String, String)>, UnboundedReceiver<(String, String)> ) = mpsc::unbounded_channel(); let writes : Arc>> = Arc::new(RwLock::new(vec![])); let writes_clone = writes.clone(); tokio::spawn(async move { send_to_others(&mut receive, writes_clone).await; }); loop { println!("starting loop"); let send = send.clone(); let writes = writes.clone(); // This runs once per connection, and starts a background thread running the 'stream_handler' function if let Ok((stream, address)) = listener.accept().await { let (read, write) = stream.into_split(); let mut writes = writes.write().await; writes.push(write); println!("New client: {:#?}", address.to_string()); tokio::spawn(async move { stream_handler(read, send).await; }); } } }
threaded - newest