#67 Understanding BitTorrent 6: Download the whole file
Introduction
In this blog post, we will dive deep into the creation of a basic BitTorrent client using Rust. We'll start by setting up our project, parsing command-line arguments, and managing asynchronous file downloads. Whether you're a seasoned Rustacean or new to the language, this guide will provide you with a practical understanding of how to leverage Rust's capabilities for network programming. By the end of this tutorial, you'll have a working BitTorrent client that can download files from peers, and you'll gain insights into Rust’s asynchronous programming model that could be applied to a wide range of other projects.
Overview of the Code
Our BitTorrent client in Rust is structured around several key components, each playing a critical role in the functionality of the application. The program is designed to handle command-line arguments, read torrent file metadata, manage downloads, and save files to the disk. Let’s break down the primary sections of our code to better understand their purposes and interactions.
Command Line Parsing
The backbone of user interaction with our BitTorrent client is the clap crate, which we use to parse command-line arguments. Our implementation utilizes the Args struct to define expected inputs and the Commands enum to differentiate between various user commands, focusing on the Download command for this tutorial. This approach allows us to neatly encapsulate command options and provide a clear interface for users to interact with the program.
Asynchronous Setup
Given the I/O-heavy nature of a file download application, asynchronous programming is essential. We leverage Rust’s tokio runtime to handle asynchronous tasks efficiently. The use of the #[tokio::main] macro simplifies the setup of the async environment, allowing us to focus on the logic of handling torrent file operations and network I/O without worrying about the underlying thread management.
Reading Torrent Files
The initial step in any BitTorrent download is to parse the .torrent file, which contains metadata about the files to be downloaded. The Torrent::read function is designed to asynchronously read and parse this metadata, preparing the client to manage the download process. This functionality is crucial for understanding what needs to be downloaded and how the pieces of the file are structured across different peers.
Download Management
Once the torrent metadata is loaded, the core of our application involves managing the downloads. This includes connecting to peers, requesting pieces of the file, and handling the data as it is received. Our download logic checks the type of file structure (single or multiple files) and processes the incoming data accordingly, ensuring that each piece is correctly assembled and saved to the local filesystem.
File Handling
Handling files efficiently is vital in a BitTorrent client. Depending on whether the torrent describes a single file or multiple files, our application either writes a single continuous stream to the disk or handles multiple file paths and writes data to each file appropriately. This part of the code is especially sensitive to errors, so careful management of file paths and write operations is essential to avoid data corruption.
Detailed Code Breakdown
- Parsing Command Line Arguments
Throughout this series we have used the clap crate for command-line parsing, so we continue with it here.
The Download command takes two fields:
- output: the path where the downloaded file should be written
- torrent: the path to the torrent file
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Args {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    Download {
        #[arg(short)]
        output: PathBuf,
        torrent: PathBuf,
    },
}
- Asynchronous Main Function
Tokio is one of the most popular crates for writing asynchronous applications in Rust.
Tokio provides the #[tokio::main] macro, which sets up a Runtime without requiring the user to use Runtime or Builder directly.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    Ok(())
}
Inside the main function, we parse the command.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();
    match args.command {
        Commands::Download { output, torrent } => {
        }
    }
    Ok(())
}
- Reading and Parsing the Torrent File
I explained reading the torrent file in a previous blog post.
I will not explain much about the read function, but if you are curious about it, you can see it here.
let t = Torrent::read(torrent).await?;
println!("Starting download for {}", t.info.name);
- Downloading Files
main.rs
This is the main part of this blog post.
We need to create a download::all function, which downloads every piece and returns the downloaded files.
let files = download::all(&t).await?;
match &t.info.keys {
    Keys::SingleFile { .. } => {
        tokio::fs::write(
            &output,
            files.into_iter().next().expect("always one file").bytes(),
        )
        .await?;
    }
    Keys::MultiFile { .. } => {
        // `while let Some(file) = files.into_iter().next()` would move
        // `files` on every iteration; a plain `for` loop is what we want.
        for file in files {
            let file_path = file.path().join(std::path::MAIN_SEPARATOR_STR);
            tokio::fs::write(&file_path, file.bytes()).await?;
        }
    }
}
println!("Downloaded test.torrent to {}.", output.display());
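One caveat with the MultiFile arm above: tokio::fs::write does not create missing parent directories, so a torrent with nested paths will fail unless they are created first. A hedged sketch of such a guard, shown with std::fs for brevity (in the async client the equivalent would be tokio::fs::create_dir_all):

```rust
use std::path::Path;

/// Ensure the parent directory of `file_path` exists, then write the bytes.
fn write_with_dirs(file_path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    if let Some(parent) = file_path.parent() {
        // create_dir_all is a no-op if the directory already exists.
        std::fs::create_dir_all(parent)?;
    }
    std::fs::write(file_path, bytes)
}
```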
download.rs
The all function takes a reference to the Torrent struct, which contains the information parsed from the torrent file.
pub async fn all(t: &Torrent) -> anyhow::Result<Downloaded> {
}
First of all, we need to discover peers that hold pieces of the file.
I explained peer discovery in this blog post.
pub async fn all(t: &Torrent) -> anyhow::Result<Downloaded> {
    let info_hash = t.info_hash();
    let request = tracker::http::Request::new(&info_hash, t.length());
    let addr = tracker::get_addr(&t.announce)?;
    let peers = {
        let res = reqwest::get(request.url(&addr.to_string())).await?;
        let res: tracker::http::Response =
            serde_bencode::from_bytes(&res.bytes().await?).context("parse response")?;
        res.peers.0
    };
}
peers is a Peers struct that holds a Vec of peer addresses.
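The Peers type itself is not shown in this post. As a minimal sketch (the exact definition here is an assumption, not the series' code), a compact tracker response encodes each peer in 6 bytes: 4 for the IPv4 address followed by 2 for the port in big-endian (BEP 23):

```rust
use std::net::{Ipv4Addr, SocketAddrV4};

/// A minimal stand-in for the series' `Peers` type: a list of peer addresses.
pub struct Peers(pub Vec<SocketAddrV4>);

impl Peers {
    /// Decode the "compact" peer list: 6 bytes per peer,
    /// 4 for the IPv4 address and 2 for the port (big-endian).
    pub fn from_compact(bytes: &[u8]) -> Option<Peers> {
        if bytes.len() % 6 != 0 {
            return None;
        }
        let peers = bytes
            .chunks_exact(6)
            .map(|c| {
                let ip = Ipv4Addr::new(c[0], c[1], c[2], c[3]);
                let port = u16::from_be_bytes([c[4], c[5]]);
                SocketAddrV4::new(ip, port)
            })
            .collect();
        Some(Peers(peers))
    }
}
```

In the real client this decoding would live inside the serde_bencode Deserialize impl for Peers, but the byte layout is the same.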
To establish a connection with a peer asynchronously, we construct a Peer by calling Peer::new().
When dealing with I/O-bound tasks in Rust, we can convert a regular iterator into a stream using futures_util::stream::iter() and then apply combinators such as map and buffer_unordered. The buffer_unordered combinator controls how many asynchronous operations may run concurrently; here, up to 5 handshakes are in flight at the same time.
let mut peers = futures_util::stream::iter(peers)
    .map(|peer_addr| async move {
        let peer = Peer::new(peer_addr, &info_hash).await;
        (peer_addr, peer)
    })
    .buffer_unordered(5);

let mut peer_list = Vec::new();
while let Some((peer_addr, peer)) = peers.next().await {
    match peer {
        Ok(peer) => {
            eprintln!("Completed handshake with {peer_addr}");
            peer_list.push(peer);
            if peer_list.len() > 5 {
                break;
            }
        }
        Err(e) => {
            eprintln!("Could not handshake with {peer_addr}. Disconnecting: {e}");
        }
    }
}
drop(peers);
We assume every piece is held by at least one peer; if some piece has no peers, the assert!(no_peers.is_empty()) below will fail.
let mut peers = peer_list;
let mut need_pieces = BinaryHeap::new();
let mut no_peers = Vec::new();
for piece_i in 0..t.info.pieces.0.len() {
    let piece = Piece::new(piece_i, &t, &peers);
    if piece.peers().is_empty() {
        no_peers.push(piece);
    } else {
        need_pieces.push(piece);
    }
}
assert!(no_peers.is_empty());
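For BinaryHeap to hand us the most useful piece first, Piece must implement Ord. The series' actual ordering is not shown in this post; a common strategy, sketched below purely as an assumption, is rarest-first: a piece held by fewer peers sorts higher (BinaryHeap is a max-heap, so the peer count is reversed):

```rust
use std::cmp::{Ordering, Reverse};

/// A simplified stand-in for the series' `Piece` type.
#[derive(PartialEq, Eq)]
struct PieceInfo {
    index: usize,
    /// How many connected peers advertise this piece.
    peer_count: usize,
}

impl Ord for PieceInfo {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap pops the largest element first, so reverse the
        // peer count: rarer pieces (fewer peers) compare as larger.
        Reverse(self.peer_count)
            .cmp(&Reverse(other.peer_count))
            .then_with(|| self.index.cmp(&other.index))
    }
}

impl PartialOrd for PieceInfo {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
```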
Initializes a vector all_pieces filled with zeros. Its size is based on the total length of the data to be downloaded, as specified by t.length().
let mut all_pieces = vec![0; t.length()];
Continuously processes each piece needed for the file. The loop extracts pieces from need_pieces until none are left.
while let Some(piece) = need_pieces.pop() {
}
Calculates the actual length of the current piece and determines the number of blocks within that piece. This accounts for potentially variable piece sizes, especially for the last piece of the file.
let plength = piece.length();
let npiece = piece.index();
let piece_length = plength.min(t.length() - plength * npiece);
let total_blocks = if piece_length % BLOCK_SIZE as usize == 0 {
    piece_length / BLOCK_SIZE as usize
} else {
    (piece_length / BLOCK_SIZE as usize) + 1
};
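BLOCK_SIZE is not defined in this post; by convention, BitTorrent clients request blocks of 16 KiB (1 << 14 bytes). The arithmetic above, extracted into a plain function for illustration, can be checked on a small example:

```rust
/// Conventional BitTorrent block size: 16 KiB.
const BLOCK_SIZE: u32 = 1 << 14;

/// Returns (actual piece length, number of blocks) for piece `npiece`,
/// given the nominal piece length and the torrent's total length.
/// The last piece is usually shorter than the nominal piece length.
fn piece_geometry(plength: usize, total_length: usize, npiece: usize) -> (usize, usize) {
    let piece_length = plength.min(total_length - plength * npiece);
    let total_blocks = if piece_length % BLOCK_SIZE as usize == 0 {
        piece_length / BLOCK_SIZE as usize
    } else {
        (piece_length / BLOCK_SIZE as usize) + 1
    };
    (piece_length, total_blocks)
}
```

The branch is equivalent to a ceiling division of piece_length by BLOCK_SIZE.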
Filters and collects peers that have the piece, ensuring that only relevant peers are used for downloading the current piece.
let peers: Vec<_> = peers
    .iter_mut()
    .enumerate()
    .filter_map(|(peer_i, peer)| piece.peers().contains(&peer_i).then_some(peer))
    .collect();
Initializes channels for managing block download tasks: submit dispatches block download requests, tasks is the receiving side handed to the peers, and the finish sender carries completed blocks back to the done receiver.
let (submit, tasks) = kanal::bounded_async(total_blocks);
for block in 0..total_blocks {
    submit
        .send(block)
        .await
        .expect("bound holds all these limits");
}
let (finish, mut done) = tokio::sync::mpsc::channel(total_blocks);
Each peer is tasked asynchronously with downloading parts of the piece. FuturesUnordered allows for handling these tasks concurrently, not sequentially.
let mut participants = futures_util::stream::FuturesUnordered::new();
for peer in peers {
    participants.push(peer.participate(
        piece.index() as u32,
        total_blocks as u32,
        piece_length as u32,
        submit.clone(),
        tasks.clone(),
        finish.clone(),
    ));
}
A loop that concurrently handles participants finishing and blocks being received. Blocks are inserted into all_blocks at the correct positions, and the loop breaks when all blocks are received or no more data is forthcoming.
let mut all_blocks: Vec<u8> = vec![0; piece_length];
let mut bytes_received = 0;
loop {
    tokio::select! {
        joined = participants.next(), if !participants.is_empty() => {
            // if a participant ends early, it's either slow or failed
            match joined {
                None => {}
                Some(Ok(_)) => {}
                Some(Err(_)) => {}
            }
        },
        piece = done.recv() => {
            // keep track of the bytes in the message
            if let Some(piece) = piece {
                all_blocks[piece.begin() as usize..][..piece.block().len()]
                    .copy_from_slice(piece.block());
                bytes_received += piece.block().len();
                if bytes_received == piece_length {
                    break;
                }
            } else {
                break;
            }
        },
    }
}
After all blocks for a piece are received, their integrity is verified against the expected SHA1 hash provided in the torrent metadata.
let mut hasher = Sha1::new();
hasher.update(&all_blocks);
let hash: [u8; 20] = hasher.finalize().try_into().expect("SHA-1 digest is always 20 bytes");
assert_eq!(hash, piece.hash());
The fully assembled and verified piece is copied into the correct position in all_pieces, ensuring that each piece is stored in order based on its index.
all_pieces[piece.index() * t.info.plength..][..piece_length].copy_from_slice(&all_blocks);
Finally, return the downloaded file.
Ok(Downloaded {
    bytes: all_pieces,
    files: match &t.info.keys {
        Keys::SingleFile { length } => vec![File {
            length: *length,
            path: vec![t.info.name.clone()],
        }],
        Keys::MultiFile { files } => files.clone(),
    },
})
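The Downloaded type and its iterator are not shown in this post; main.rs only relies on files.into_iter() yielding items with bytes() and path() accessors. A hedged sketch of how that could work, assuming Downloaded holds one contiguous buffer that gets sliced per file in order (all names below are guesses, not the series' exact API):

```rust
/// One file entry from the torrent's info dictionary.
#[derive(Clone)]
pub struct File {
    pub length: usize,
    pub path: Vec<String>,
}

/// The fully downloaded torrent: one contiguous buffer plus file metadata.
pub struct Downloaded {
    bytes: Vec<u8>,
    files: Vec<File>,
}

/// A single file's slice of the downloaded buffer.
pub struct DownloadedFile {
    file: File,
    bytes: Vec<u8>,
}

impl DownloadedFile {
    pub fn path(&self) -> &[String] {
        &self.file.path
    }
    pub fn bytes(&self) -> &[u8] {
        &self.bytes
    }
}

impl IntoIterator for Downloaded {
    type Item = DownloadedFile;
    type IntoIter = std::vec::IntoIter<DownloadedFile>;

    fn into_iter(self) -> Self::IntoIter {
        // Walk the contiguous buffer, cutting one chunk per file, in order.
        let mut offset = 0;
        let mut out = Vec::with_capacity(self.files.len());
        for file in self.files {
            let end = offset + file.length;
            out.push(DownloadedFile {
                bytes: self.bytes[offset..end].to_vec(),
                file,
            });
            offset = end;
        }
        out.into_iter()
    }
}
```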
It's a bit long, so you might get lost along the way.
I published the whole code here.
Conclusion
Today, we've explored the depths of creating a BitTorrent client in Rust, tackling the complexities of peer-to-peer file downloads. By breaking down the intricate details of managing asynchronous downloads, selecting peers, and handling data blocks, we've demonstrated not only Rust's capability to handle complex network operations but also its prowess in maintaining robust and efficient code execution.
Thank you for reading the whole Understanding BitTorrent series. I hope you enjoyed it.
Discussion