diff options
| author | Mariot Tsitoara <[email protected]> | 2021-01-06 17:32:23 +0100 |
|---|---|---|
| committer | Mariot Tsitoara <[email protected]> | 2021-01-06 17:32:23 +0100 |
| commit | 0a42f67b3ac10ac27ec7ee1bcf62bc8628c232e0 (patch) | |
| tree | f83a0f39469cf890be8d42582d5421cded762a43 /src/bin.rs | |
| parent | Limit runtime for reloading (diff) | |
| download | chan-downloader-0a42f67b3ac10ac27ec7ee1bcf62bc8628c232e0.tar.xz chan-downloader-0a42f67b3ac10ac27ec7ee1bcf62bc8628c232e0.zip | |
Use concurrent downloads (v0.2.0)
Diffstat (limited to 'src/bin.rs')
| -rw-r--r-- | src/bin.rs | 102 |
1 files changed, 66 insertions, 36 deletions
@@ -8,13 +8,20 @@ use std::fs::create_dir_all; use std::path::PathBuf; use std::time::{Duration, Instant}; use std::thread; +use std::sync::Mutex; +use futures::stream::StreamExt; use clap::App; use indicatif::{ProgressBar, ProgressStyle}; -use reqwest::blocking::Client; +use lazy_static::lazy_static; +use reqwest::{Client, Error}; use chan_downloader::{get_image_links, get_page_content, get_thread_infos, save_image}; +lazy_static! { + static ref DOWNLOADED_FILES: Mutex<Vec<String>> = Mutex::new(Vec::new()); +} + fn main() { env_logger::init(); let yaml = load_yaml!("cli.yml"); @@ -25,36 +32,43 @@ fn main() { let reload: bool = matches.is_present("reload"); let interval: u64 = matches.value_of("interval").unwrap_or("5").parse().unwrap(); let limit: u64 = matches.value_of("limit").unwrap_or("120").parse().unwrap(); + let concurrent: usize = matches.value_of("concurrent").unwrap_or("2").parse().unwrap(); info!("Downloading images from {} to {}", thread, output); let directory = create_directory(thread, &output); - let mut downloaded_files: Vec<String> = Vec::new(); - let start = Instant::now(); let wait_time = Duration::from_secs(60 * interval); let limit_time = if reload { Duration::from_secs(60 * limit) } else { Duration::from_secs(0) }; loop { let load_start = Instant::now(); - explore_thread(thread, &directory, &mut downloaded_files); + explore_thread(thread, &directory, concurrent).unwrap(); let runtime = start.elapsed(); let load_runtime = load_start.elapsed(); if runtime > limit_time { - info!( "Runtime exceeded, exiting."); + info!("Runtime exceeded, exiting."); break; }; if let Some(remaining) = wait_time.checked_sub(load_runtime) { - info!( "Schedule slice has time left over; sleeping for {:?}", remaining); + info!("Schedule slice has time left over; sleeping for {:?}", remaining); thread::sleep(remaining); } info!("Downloader executed one more time for {:?}", load_runtime); } } -fn explore_thread(thread_link: &str, directory: &PathBuf, downloaded_files: 
&mut Vec<String>) { - let client = Client::builder().user_agent("reqwest").build().unwrap(); - let page_string = match get_page_content(thread_link, &client) { +fn mark_as_downloaded(file: &str) -> Result<&str, &str> { + let mut db = DOWNLOADED_FILES.lock().map_err(|_| "Failed to acquire MutexGuard")?; + db.push(file.to_string()); + Ok(file) +} + +#[tokio::main] +async fn explore_thread(thread_link: &str, directory: &PathBuf, concurrent: usize) -> Result<(), Error> { + let start = Instant::now(); + let client = Client::builder().user_agent("reqwest").build()?; + let page_string = match get_page_content(thread_link, &client).await { Ok(page_string) => { info!("Loaded content from {}", thread_link); page_string @@ -65,42 +79,58 @@ fn explore_thread(thread_link: &str, directory: &PathBuf, downloaded_files: &mut String::from("") }, }; - let (links_iter, number_of_links) = get_image_links(page_string.as_str()); - let pb = ProgressBar::new(number_of_links as u64); + let links_vec = get_image_links(page_string.as_str()); + let pb = ProgressBar::new(links_vec.len() as u64); pb.set_style(ProgressStyle::default_bar() .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg} ({eta})") .progress_chars("#>-")); pb.tick(); - for cap in links_iter.step_by(2) { - let img_path = directory.join(&cap[2]); - let image_path = img_path.to_str().unwrap(); - if downloaded_files.contains(&String::from(image_path)) { - info!("Image {} previously downloaded. 
Skipped", img_path.display()); - } else if !img_path.exists() { - match save_image( - format!("https:{}", &cap[1]).as_str(), - image_path, - &client, - ) { - Ok(path) => { - info!("Saved image to {}", &path); - downloaded_files.push(path); + let fetches = futures::stream::iter( + links_vec.into_iter().map(|link| { + let client = &client; + let pb = &pb; + async move { + let img_path = directory.join(link.name); + let image_path = img_path.to_str().unwrap(); + let has_been_downloaded = async { + let db = DOWNLOADED_FILES.lock().map_err(|_| String::from("Failed to acquire MutexGuard")).unwrap(); + db.contains(&String::from(image_path)) + }.await; + + if has_been_downloaded { + info!("Image {} previously downloaded. Skipped", img_path.display()); + } else if !img_path.exists() { + match save_image( + format!("https:{}", link.url).as_str(), + image_path, + &client, + ).await { + Ok(path) => { + info!("Saved image to {}", &path); + let result = mark_as_downloaded(&path).unwrap(); + info!("{} added to downloaded files", result); + } + Err(err) => { + error!("Couldn't save image {}", image_path); + eprintln!("Error: {}", err); + }, + } + } else { + info!("Image {} already exists. Skipped", img_path.display()); + let result = mark_as_downloaded(image_path).unwrap(); + info!("{} added to downloaded files", result); } - Err(err) => { - error!("Couldn't save image {}", image_path); - eprintln!("Error: {}", err); - }, + pb.inc(1); } - } else { - downloaded_files.push(String::from(image_path)); - info!("Image {} already exists. Skipped", img_path.display()); - } - pb.set_message(&cap[2].to_string()); - pb.inc(1); - } + }) + ).buffer_unordered(concurrent).collect::<Vec<()>>(); + fetches.await; + pb.finish_with_message("Done"); + info!("Done in {:?}", start.elapsed()); + Ok(()) } fn create_directory(thread_link: &str, output: &str) -> PathBuf { |