aboutsummaryrefslogtreecommitdiff
path: root/src/bin.rs
diff options
context:
space:
mode:
authorMariot Tsitoara <[email protected]>2021-01-06 17:32:23 +0100
committerMariot Tsitoara <[email protected]>2021-01-06 17:32:23 +0100
commit0a42f67b3ac10ac27ec7ee1bcf62bc8628c232e0 (patch)
treef83a0f39469cf890be8d42582d5421cded762a43 /src/bin.rs
parentLimit runtime for reloading (diff)
downloadchan-downloader-0a42f67b3ac10ac27ec7ee1bcf62bc8628c232e0.tar.xz
chan-downloader-0a42f67b3ac10ac27ec7ee1bcf62bc8628c232e0.zip
Use concurrent downloadsv0.2.0
Diffstat (limited to 'src/bin.rs')
-rw-r--r--src/bin.rs102
1 files changed, 66 insertions, 36 deletions
diff --git a/src/bin.rs b/src/bin.rs
index e980e47..c77faf2 100644
--- a/src/bin.rs
+++ b/src/bin.rs
@@ -8,13 +8,20 @@ use std::fs::create_dir_all;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use std::thread;
+use std::sync::Mutex;
+use futures::stream::StreamExt;
use clap::App;
use indicatif::{ProgressBar, ProgressStyle};
-use reqwest::blocking::Client;
+use lazy_static::lazy_static;
+use reqwest::{Client, Error};
use chan_downloader::{get_image_links, get_page_content, get_thread_infos, save_image};
+lazy_static! {
+ static ref DOWNLOADED_FILES: Mutex<Vec<String>> = Mutex::new(Vec::new());
+}
+
fn main() {
env_logger::init();
let yaml = load_yaml!("cli.yml");
@@ -25,36 +32,43 @@ fn main() {
let reload: bool = matches.is_present("reload");
let interval: u64 = matches.value_of("interval").unwrap_or("5").parse().unwrap();
let limit: u64 = matches.value_of("limit").unwrap_or("120").parse().unwrap();
+ let concurrent: usize = matches.value_of("concurrent").unwrap_or("2").parse().unwrap();
info!("Downloading images from {} to {}", thread, output);
let directory = create_directory(thread, &output);
- let mut downloaded_files: Vec<String> = Vec::new();
-
let start = Instant::now();
let wait_time = Duration::from_secs(60 * interval);
let limit_time = if reload { Duration::from_secs(60 * limit) } else { Duration::from_secs(0) };
loop {
let load_start = Instant::now();
- explore_thread(thread, &directory, &mut downloaded_files);
+ explore_thread(thread, &directory, concurrent).unwrap();
let runtime = start.elapsed();
let load_runtime = load_start.elapsed();
if runtime > limit_time {
- info!( "Runtime exceeded, exiting.");
+ info!("Runtime exceeded, exiting.");
break;
};
if let Some(remaining) = wait_time.checked_sub(load_runtime) {
- info!( "Schedule slice has time left over; sleeping for {:?}", remaining);
+ info!("Schedule slice has time left over; sleeping for {:?}", remaining);
thread::sleep(remaining);
}
info!("Downloader executed one more time for {:?}", load_runtime);
}
}
-fn explore_thread(thread_link: &str, directory: &PathBuf, downloaded_files: &mut Vec<String>) {
- let client = Client::builder().user_agent("reqwest").build().unwrap();
- let page_string = match get_page_content(thread_link, &client) {
+fn mark_as_downloaded(file: &str) -> Result<&str, &str> {
+ let mut db = DOWNLOADED_FILES.lock().map_err(|_| "Failed to acquire MutexGuard")?;
+ db.push(file.to_string());
+ Ok(file)
+}
+
+#[tokio::main]
+async fn explore_thread(thread_link: &str, directory: &PathBuf, concurrent: usize) -> Result<(), Error> {
+ let start = Instant::now();
+ let client = Client::builder().user_agent("reqwest").build()?;
+ let page_string = match get_page_content(thread_link, &client).await {
Ok(page_string) => {
info!("Loaded content from {}", thread_link);
page_string
@@ -65,42 +79,58 @@ fn explore_thread(thread_link: &str, directory: &PathBuf, downloaded_files: &mut
String::from("")
},
};
- let (links_iter, number_of_links) = get_image_links(page_string.as_str());
- let pb = ProgressBar::new(number_of_links as u64);
+ let links_vec = get_image_links(page_string.as_str());
+ let pb = ProgressBar::new(links_vec.len() as u64);
pb.set_style(ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg} ({eta})")
.progress_chars("#>-"));
pb.tick();
- for cap in links_iter.step_by(2) {
- let img_path = directory.join(&cap[2]);
- let image_path = img_path.to_str().unwrap();
- if downloaded_files.contains(&String::from(image_path)) {
- info!("Image {} previously downloaded. Skipped", img_path.display());
- } else if !img_path.exists() {
- match save_image(
- format!("https:{}", &cap[1]).as_str(),
- image_path,
- &client,
- ) {
- Ok(path) => {
- info!("Saved image to {}", &path);
- downloaded_files.push(path);
+ let fetches = futures::stream::iter(
+ links_vec.into_iter().map(|link| {
+ let client = &client;
+ let pb = &pb;
+ async move {
+ let img_path = directory.join(link.name);
+ let image_path = img_path.to_str().unwrap();
+ let has_been_downloaded = async {
+ let db = DOWNLOADED_FILES.lock().map_err(|_| String::from("Failed to acquire MutexGuard")).unwrap();
+ db.contains(&String::from(image_path))
+ }.await;
+
+ if has_been_downloaded {
+ info!("Image {} previously downloaded. Skipped", img_path.display());
+ } else if !img_path.exists() {
+ match save_image(
+ format!("https:{}", link.url).as_str(),
+ image_path,
+ &client,
+ ).await {
+ Ok(path) => {
+ info!("Saved image to {}", &path);
+ let result = mark_as_downloaded(&path).unwrap();
+ info!("{} added to downloaded files", result);
+ }
+ Err(err) => {
+ error!("Couldn't save image {}", image_path);
+ eprintln!("Error: {}", err);
+ },
+ }
+ } else {
+ info!("Image {} already exists. Skipped", img_path.display());
+ let result = mark_as_downloaded(image_path).unwrap();
+ info!("{} added to downloaded files", result);
}
- Err(err) => {
- error!("Couldn't save image {}", image_path);
- eprintln!("Error: {}", err);
- },
+ pb.inc(1);
}
- } else {
- downloaded_files.push(String::from(image_path));
- info!("Image {} already exists. Skipped", img_path.display());
- }
- pb.set_message(&cap[2].to_string());
- pb.inc(1);
- }
+ })
+ ).buffer_unordered(concurrent).collect::<Vec<()>>();
+ fetches.await;
+
pb.finish_with_message("Done");
+ info!("Done in {:?}", start.elapsed());
+ Ok(())
}
fn create_directory(thread_link: &str, output: &str) -> PathBuf {