From 448933ae67fa61df45299414bc9446e2a3618eab Mon Sep 17 00:00:00 2001 From: hak8or Date: Sun, 6 Jul 2025 23:38:27 -0400 Subject: [PATCH] Added stats, parallel parsing of pages, and filtered fetch of listings --- Cargo.lock | 52 +++++ Cargo.toml | 1 + src/db.rs | 175 +++++++++++++++-- src/main.rs | 349 ++++++++++++++++----------------- src/parser.rs | 139 +++++++++++++ src/parser_ebay.rs | 7 +- systemd/scraper_webapi.service | 11 ++ 7 files changed, 528 insertions(+), 206 deletions(-) create mode 100644 systemd/scraper_webapi.service diff --git a/Cargo.lock b/Cargo.lock index 13c05d4..23cd164 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -515,6 +515,31 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crypto-common" version = "0.1.6" @@ -655,6 +680,7 @@ dependencies = [ "clap", "dirs", "lazy_static", + "rayon", "regex", "rusqlite", "scraper", @@ -672,6 +698,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2972feb8dffe7bc8c5463b1dacda1b0dfbed3710e50f977d965429692d74cd8" +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "encode_unicode" version = "1.0.0" @@ -1523,6 +1555,26 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.13" diff --git a/Cargo.toml b/Cargo.toml index 22c2340..40d86bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ chrono = { version = "0.4.41", features = ["serde"] } clap = { version = "4.5.40", features = ["derive"] } dirs = "6.0.0" lazy_static = "1.5.0" +rayon = "1.10.0" regex = "1.11.1" rusqlite = { version = "0.36.0", features = ["bundled", "chrono"] } scraper = "0.23.1" diff --git a/src/db.rs b/src/db.rs index 8d035c2..f2ddfbe 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,8 +1,9 @@ use chrono::{DateTime, Utc}; use rusqlite::Connection; +use serde::Deserialize; use serde::Serialize; use std::path::Path; -use tracing::info; +use tracing::{error, info}; pub trait DBTable { const TABLE_NAME: &'static str; @@ -20,6 +21,16 @@ pub trait DBTable { conn.execute(create_table, ()).unwrap(); } + fn get_count(conn: &Connection) -> i64 { + let mut stmt = conn + .prepare(&format!("SELECT COUNT(*) FROM {}", 
Self::TABLE_NAME)) + .ok() + .unwrap(); + stmt.query_one([], |r| r.get(0)) + .inspect_err(|e| error!("Failed to get count due to error\"{:?}\", returning 0", e)) + .unwrap_or(0) + } + fn get_all(conn: &Connection) -> rusqlite::Result> where Self: Sized; @@ -95,6 +106,38 @@ impl SearchURL { .flatten() .collect() } + + pub fn scan(conn: &Connection, downloads_dir: &Path, filename: &str) { + // Grab all directories. + let dirs = std::fs::read_dir(downloads_dir) + .unwrap() + .filter_map(|e| Some(e.ok()?.path())) + .filter(|e| e.is_dir()); + + #[derive(Deserialize)] + struct URLJSON { + url: String, + } + + // Grab url JSON's. + for dir in dirs { + let url_fpath = dir.join(filename); + if !url_fpath.exists() { + info!("Skipping {:?} as file does not exist", url_fpath); + continue; + } + + let url_contents = std::fs::read_to_string(&url_fpath) + .inspect_err(|e| error!("Failed reading {}: {e}", url_fpath.display())) + .unwrap(); + let su = SearchURL { + full_url: serde_json::from_str::(&url_contents).unwrap().url, + name: dir.file_name().unwrap().to_str().unwrap().to_owned(), + }; + info!("Adding {:?} to search urls table", su); + su.add_or_update(&conn); + } + } } #[derive(Serialize, Debug, PartialEq, Clone)] @@ -213,7 +256,10 @@ impl DBTable for ParsedStorage { impl ParsedStorage { pub fn lookup(conn: &Connection, item: i64) -> Vec { let mut stmt = conn - .prepare(&format!("SELECT * FROM {} WHERE id = ?", Self::TABLE_NAME)) + .prepare(&format!( + "SELECT * FROM {} WHERE item = ?", + Self::TABLE_NAME + )) .ok() .unwrap(); stmt.query_map([item], |row| { @@ -414,24 +460,25 @@ impl Listing { let mut stmt = conn .prepare(&format!( " - SELECT * - FROM {0} - WHERE EXISTS ( - SELECT 1 - FROM {1} - WHERE - {1}.item = {0}.item_id AND - {1}.timestamp >= ?1 - ) - LIMIT ?2 - ", + SELECT * + FROM {0} + WHERE EXISTS ( + SELECT 1 + FROM {1} + WHERE + {1}.item = {0}.item_id AND + {1}.timestamp >= ?1 + ) + LIMIT {2} + ", Self::TABLE_NAME, - ItemAppearances::TABLE_NAME + ItemAppearances::TABLE_NAME, + limit )) .ok() .unwrap(); - stmt.query_map([since.timestamp(), limit], |row| { + stmt.query_map([since], |row| { Ok(Listing { id: row.get(0)?, item_id: row.get(1)?, @@ -496,6 +543,77 @@ impl Listing { } } +#[derive(Serialize, Debug)] +pub struct ListingsFilterResult { + listing: Listing, + history: Vec, + parsed: Vec, +} + +pub fn listings_get_filtered( + conn: &Connection, + since: &DateTime, + limit: i64, + cents_per_tbytes_max: i64, +) -> Vec { + // First grab all appearances seen since the timestamp including their + // history and parsings. + let listings_recent = Listing::lookup_since(conn, *since, 100_000) + .into_iter() + .map(|l| ListingsFilterResult { + listing: l.clone(), + history: ItemAppearances::lookup(conn, l.item_id), + parsed: ParsedStorage::lookup(conn, l.item_id), + }) + .filter(|lr| lr.parsed.iter().any(|p| !p.needed_description_check)) + .collect::>(); + info!( + "Found total {} listings since (str:{} epoch:{})", + listings_recent.len(), + *since, + since.timestamp() + ); + + // Then for each listing grab if within our price range. 
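+ // The newest appearance's bid (or the buy-it-now price when no bid was recorded) and the latest parse engine's total_gigabytes give usd_per_tb = (cents / 100) / (gb / 1024); anything at or above cents_per_tbytes_max (also expressed in cents per TB) is dropped.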
+ let listings: Vec = listings_recent + .into_iter() + .filter_map(|l| { + let mut history = l.history.clone(); + history.sort_by_key(|h| h.timestamp); + // info!("item_id:{} history: {:?}", l.listing.item_id, history); + let cents = history + .last() + .map(|h| h.current_bid_usd_cents) + .unwrap_or(l.listing.buy_it_now_price_cents)?; + // info!("item_id:{} cents: {:?}", l.listing.item_id, cents); + let mut parses = l.parsed.clone(); + parses.sort_by_key(|p| p.parse_engine); + // info!("item_id:{} parses: {:?}", l.listing.item_id, parses); + let gb = parses.last()?.total_gigabytes; + // info!("item_id:{} gb: {:?}", l.listing.item_id, gb); + let usd_per_tb = (cents as f64 / 100.0) / (gb as f64 / 1024.0); + // info!( + // "item_id: {}, gb:{}, cents:{}, usd_per_tb:{}, cents_per_tbytes_max:{}", + // l.listing.item_id, gb, cents, usd_per_tb, cents_per_tbytes_max + // ); + if usd_per_tb >= (cents_per_tbytes_max as f64 / 100.0) { + None + } else { + Some(l) + } + }) + .take(limit as usize) + .collect(); + info!( + "Found total {} listings since (str:{} epoch:{}) filtered by price", + listings.len(), + *since, + since.timestamp() + ); + + listings +} + pub fn get_initialized(path: Option<&Path>) -> Connection { let conn = match path { Some(p) => Connection::open(&p), @@ -512,11 +630,30 @@ pub fn get_initialized(path: Option<&Path>) -> Connection { conn } +#[derive(Serialize, Debug)] +pub struct Stats { + rows_search_url: i64, + rows_listing: i64, + rows_parsed_storage: i64, + rows_parsed_page: i64, + rows_item_appearances: i64, +} + +pub fn get_stats(conn: &Connection) -> Stats { + Stats { + rows_search_url: SearchURL::get_count(conn), + rows_listing: Listing::get_count(conn), + rows_parsed_storage: ParsedStorage::get_count(conn), + rows_parsed_page: ParsedPage::get_count(conn), + rows_item_appearances: ItemAppearances::get_count(conn), + } +} + #[cfg(test)] mod tests { use super::*; - #[test] + #[test_log::test] fn sanity_check() { let db = get_initialized(None); @@ -548,7 +685,7 @@ mod tests { needed_description_check: true, }; parsed.add_or_update(&db); - assert_eq!(ParsedStorage::lookup(&db, listing.id), vec![parsed]); + assert_eq!(ParsedStorage::lookup(&db, listing.item_id), vec![parsed]); let page = ParsedPage { category: "ssd".to_owned(), @@ -570,5 +707,9 @@ mod tests { ); assert_eq!(Listing::lookup_since(&db, page.timestamp, 3), vec![listing]); + assert_eq!( + Listing::lookup_since(&db, page.timestamp + chrono::Duration::seconds(1), 3), + vec![] + ); } } diff --git a/src/main.rs b/src/main.rs index 156bed3..8dd7a65 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,17 @@ use actix_web::{App, HttpServer, Responder, Result, get, post, web, web::Data}; -use chrono::DateTime; +use chrono::{DateTime, Utc}; use clap::Parser; use ebay_scraper_rust::db::{ DBTable, ItemAppearances, Listing, ParsedPage, ParsedStorage, SearchURL, get_initialized, + get_stats, listings_get_filtered, }; -use ebay_scraper_rust::{parser_ebay, parser_storage}; +use ebay_scraper_rust::parser::parse_dir; +use ebay_scraper_rust::parser_storage; use serde::{Deserialize, Serialize}; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::Mutex; use std::time::Instant; -use tracing::{error, info, instrument}; +use tracing::{info, instrument}; use tracing_subscriber::filter::EnvFilter; use tracing_subscriber::fmt; @@ -26,28 +28,32 @@ mod xdg_dirs; )] struct Args {} -#[get("/page/{timestamp}")] -async fn page_get( - db: Data>, - timestamp: web::Path, -) -> Result { - Ok(web::Json(ParsedPage::lookup( - 
&db.lock().unwrap(), - chrono::DateTime::from_timestamp(*timestamp, 0).unwrap(), - ))) +#[derive(Deserialize, Debug)] +struct ListingsFilter { + since: Option, + limit: Option, + cents_per_tbytes_max: Option, } -#[get("/listing/{id}/history")] -async fn listing_history_get( +#[get("/listings")] +async fn listings_filtered_get( db: Data>, - id: web::Path, + filter: web::Query, ) -> Result { - let history: Vec<_> = ItemAppearances::lookup(&db.lock().unwrap(), *id) - .iter() - .inspect(|e| info!("got: {:?}", e)) - .filter_map(|e| Some((e.timestamp, e.current_bid_usd_cents?))) - .collect(); - Ok(web::Json(history)) + let start = Instant::now(); + let res = listings_get_filtered( + &db.lock().unwrap(), + &DateTime::::from_timestamp(filter.since.unwrap_or(0), 0).unwrap(), + filter.limit.unwrap_or(1_000), + filter.cents_per_tbytes_max.unwrap_or(100_00), + ); + let elapsed = start.elapsed().as_micros() as f64 / 1000.0; + info!( + "Took {elapsed} milliseconds with {} listings found for a filter of {:?}", + res.len(), + filter + ); + Ok(web::Json(res)) } #[get("/listing/{id}")] @@ -70,7 +76,39 @@ async fn listing_since_get( ))) } -#[post("listing/parse")] +#[get("/listing/{id}/parsed")] +async fn listing_parse_get( + db: Data>, + id: web::Path, +) -> Result { + Ok(web::Json(ParsedStorage::lookup(&db.lock().unwrap(), *id))) +} + +#[derive(Serialize)] +struct APIHistory { + when: DateTime, + current_bid_usd_cents: i64, +} + +#[get("/listing/{id}/history")] +async fn listing_history_get( + db: Data>, + id: web::Path, +) -> Result { + let history: Vec<_> = ItemAppearances::lookup(&db.lock().unwrap(), *id) + .iter() + // .inspect(|e| info!("got: {:?}", e)) + .filter_map(|e| { + Some(APIHistory { + when: e.timestamp, + current_bid_usd_cents: e.current_bid_usd_cents?, + }) + }) + .collect(); + Ok(web::Json(history)) +} + +#[post("/listing/parse")] async fn parse_listings(db: Data>) -> Result { let mut cnt = 0; let db_unlocked = db.lock().unwrap(); @@ -83,130 +121,37 @@ async fn parse_listings(db: Data>) -> Result>, - id: web::Path, -) -> Result { - Ok(web::Json(ParsedStorage::lookup(&db.lock().unwrap(), *id))) +#[get("/category")] +async fn category_getnames(db: Data>) -> Result { + Ok(web::Json(SearchURL::names(&db.lock().unwrap()))) } -pub fn timestamps_from_dir(path: &Path) -> Vec { - if !std::fs::exists(path).expect("Directory must exist") { - panic!( - "Directory {:?} does not exist, cannot grab timestamps from there.", - path - ); - } - - std::fs::read_dir(path) - .unwrap() - .map(|fpath| fpath.unwrap().path()) - .filter_map(|fstem| { - fstem - .file_stem() - .and_then(|s| s.to_str()) - .expect("Invalid file name") - .parse() - .ok() - }) - .collect() -} - -#[post("page/parse/{category}")] +#[post("/category/{category}/parse")] #[instrument(skip_all)] -async fn parse_post( +async fn category_parse( db: Data>, downloaddir: Data, category: web::Path, ) -> Result { - let dir = &downloaddir.join(category.clone()); + let start = Instant::now(); + let count = parse_dir( + &downloaddir.join(category.clone()), + &category, + &db.lock().unwrap(), + ) + .unwrap(); + let elapsed = start.elapsed().as_micros() as f64 / 1000.0; - // Ensure the category is created. 
- let url_fpath = dir.join("url.json"); - let url_contents = std::fs::read_to_string(&url_fpath) - .inspect_err(|e| error!("Failed reading {}: {e}", url_fpath.display()))?; - #[derive(Deserialize)] - struct URLJSON { - url: String, - } - let su = SearchURL { - full_url: serde_json::from_str::(&url_contents).unwrap().url, - name: category.to_string(), - }; - su.add_or_update(&db.lock().unwrap()); - - // Find all pages. - let pages = timestamps_from_dir(dir); - - // See what pages haven't been seen before. - let to_parse = pages.iter().filter(|t| { - let ts = chrono::DateTime::from_timestamp(**t, 0).unwrap(); - let p = ParsedPage::lookup(&db.lock().unwrap(), ts); - - // Timestamp never seen before, lets pass it on. - if p.is_none() { - info!( - "Page of timestamp:{} and catagory:{category} never seen before, processing ...", - ts.timestamp() - ); - return true; - } - - // Timestamp was seen before *and* from the same catagory, don't pass - // it on. - if p.unwrap().category == *category { - info!( - "Page of timestamp:{} and catagory:{category} seen before, skipping ...", - ts.timestamp() - ); - return false; - } - - info!( - "Page of timestamp:{} seen before, but not of catagory:{category}, processing ...", - ts.timestamp() - ); - return true; - }); - - let mut added_count = 0; - for p in to_parse { - let ts = chrono::DateTime::from_timestamp(*p, 0).unwrap(); - ParsedPage { - timestamp: ts, - category: category.to_string(), - } - .add_or_update(&db.lock().unwrap()); - - let page_path = dir.join(format!("{}.html", ts.timestamp())); - let page_contents = std::fs::read_to_string(&page_path) - .inspect_err(|e| error!("Failed reading {}, error:{e}", page_path.display()))?; - let elements = parser_ebay::extract_data_from_html(&page_contents, &ts, &category).unwrap(); - info!( - "Page {} contains {} elements", - ts.timestamp(), - elements.len() - ); - - added_count += elements.len(); - for e in elements { - e.0.add_or_update(&db.lock().unwrap()); - e.1.add_or_update(&db.lock().unwrap()); - info!( - "From page {}, inserting id:{}, title:{}", - ts.timestamp(), - e.0.item_id, - e.0.title - ); - } - } - - info!("Added {added_count} listings"); - Ok(added_count.to_string()) + info!("Added {count} listings, took {elapsed} ms."); + Ok(count.to_string()) } -#[get("admin")] +#[get("/stats")] +async fn stats_get(db: Data>) -> Result { + Ok(web::Json(get_stats(&db.lock().unwrap()))) +} + +#[get("/admin")] async fn admin_get(db: Data>) -> Result { let db = db.lock().unwrap(); let query_start_time = Instant::now(); @@ -240,6 +185,12 @@ async fn admin_get(db: Data>) -> Result>) -> Result>) -> Result "#, ); + Ok(web::Html::new(&html)) } fn generate_table(title: &str, data: &[T]) -> String { - let mut table_html = format!( - "
<h3>{} ({} rows)</h3><table><thead><tr>
", - title, - data.len() - ); + use serde_json::Value; - if data.len() > 0 { - for header in serde_json::to_value(&data[0]) - .unwrap() - .as_object() - .unwrap() - .keys() - { - table_html.push_str(&format!("", header)); - } - table_html.push_str(""); - - for item in data { - table_html.push_str(""); - let item_json = serde_json::to_value(item).unwrap(); - if let Some(obj) = item_json.as_object() { - for (_key, value) in obj.iter() { - table_html - .push_str(&format!("", value.to_string().replace("\"", ""))); - } - } - table_html.push_str(""); - } + if data.is_empty() { + return format!( + "
<h3>{} (0 rows)</h3><table><thead></thead><tbody></tbody></table>
", + title + ); } - table_html.push_str(""); - table_html + let mut headers: Vec = serde_json::to_value(&data[0]) + .unwrap_or(Value::Null) + .as_object() + .map_or(Vec::new(), |obj| obj.keys().cloned().collect()); + + // Define the desired order for specific columns. + let desired_order = ["id", "item", "item_id", "timestamp"]; + + // Sort the headers. Columns in `desired_order` come first, + // in that order. The rest are sorted alphabetically. + headers.sort_by(|a, b| { + let a_pos = desired_order + .iter() + .position(|&p| p == a) + .unwrap_or(usize::MAX); + let b_pos = desired_order + .iter() + .position(|&p| p == b) + .unwrap_or(usize::MAX); + a_pos.cmp(&b_pos).then_with(|| a.cmp(b)) + }); + + // Create the HTML for the table header row. + let header_html = headers + .iter() + .map(|header| format!("{}", header)) + .collect::(); + + // Create the HTML for all the table body rows. + let body_html = data + .iter() + .map(|item| { + let item_json = serde_json::to_value(item).unwrap_or(Value::Null); + let obj = item_json.as_object(); + + // Create all cells for a single row. + let cells_html = headers + .iter() + .map(|header| { + let value = obj.and_then(|o| o.get(header)).unwrap_or(&Value::Null); + // Remove quotes from the resulting JSON string value for cleaner output. + format!("{}", value.to_string().replace('"', "")) + }) + .collect::(); + + format!("{}", cells_html) + }) + .collect::(); + + // Assemble the final table HTML. + format!( + "
<h3>{} ({} rows)</h3><table><thead><tr>{}</tr></thead><tbody>{}</tbody></table>
", + title, + data.len(), + header_html, + body_html + ) } #[actix_web::main] @@ -332,15 +301,25 @@ async fn main() -> std::io::Result<()> { ); let db_mutex = Data::new(Mutex::new(get_initialized(None))); + // Prepare our backend via pulling in what catagories we are preconfigured with. + SearchURL::scan(&db_mutex.lock().unwrap(), &scrapedatadir, "url.json"); + HttpServer::new(move || { App::new() - .service(page_get) + // .service(page_get) + // Listing handlers .service(listing_get) + .service(listings_filtered_get) .service(listing_history_get) .service(listing_since_get) - .service(parse_post) + // Category handlers .service(parse_listings) + .service(category_parse) + .service(category_getnames) + // Gnarly info dump .service(admin_get) + .service(stats_get) + // Stuff which is passed into every request. .app_data(db_mutex.clone()) .app_data(Data::new(scrapedatadir.clone())) }) diff --git a/src/parser.rs b/src/parser.rs index 8b13789..6a73cf2 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -1 +1,140 @@ +use crate::{ + db::{ParsedPage, SearchURL}, + parser_ebay, +}; +use rayon::prelude::*; +use serde::Deserialize; +use serde_json; +use std::path::Path; +use std::time::Instant; +use tracing::{debug, error, info}; +fn timestamps_from_dir(path: &Path) -> Vec { + if !std::fs::exists(path).expect("Directory must exist") { + panic!( + "Directory {:?} does not exist, cannot grab timestamps from there.", + path + ); + } + + std::fs::read_dir(path) + .unwrap() + .map(|fpath| fpath.unwrap().path()) + .filter_map(|fstem| { + fstem + .file_stem() + .and_then(|s| s.to_str()) + .expect("Invalid file name") + .parse() + .ok() + }) + .collect() +} + +pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Option { + // Ensure the category is created. + let url_fpath = dir.join("url.json"); + let url_contents = std::fs::read_to_string(&url_fpath) + .inspect_err(|e| error!("Failed reading {}: {e}", url_fpath.display())) + .ok()?; + + #[derive(Deserialize)] + struct URLJSON { + url: String, + } + let su = SearchURL { + full_url: serde_json::from_str::(&url_contents).unwrap().url, + name: category.to_string(), + }; + su.add_or_update(&db); + + // See all pages haven't been seen before. + let query_start_time = Instant::now(); + let to_parse = timestamps_from_dir(dir).into_iter().filter(|t| { + let ts = chrono::DateTime::from_timestamp(*t, 0).unwrap(); + let p = ParsedPage::lookup(&db, ts); + + // Timestamp never seen before, lets pass it on. + if p.is_none() { + info!( + "Page Timestamp:{} Catagory:{category} never seen before, processing ...", + ts.timestamp() + ); + return true; + } + + // Timestamp was seen before *and* from the same catagory, don't pass + // it on. + if p.unwrap().category == *category { + info!( + "Page Timestamp:{} Catagory:{category} seen before, skipping ...", + ts.timestamp() + ); + return false; + } + + info!( + "Page Timestamp:{} Catagory:{category} seen before, but not of catagory:{category}, processing ...", + ts.timestamp() + ); + return true; + }).collect::>(); + let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0; + info!("Time spent finding pages to parse:{total_query_time} ms"); + + // For each page, read the file and parse it. 
+ let query_start_time = Instant::now(); + let to_add = to_parse + .par_iter() + .map(|p| { + let ts = chrono::DateTime::from_timestamp(*p, 0).unwrap(); + let paged_info = ParsedPage { + timestamp: ts, + category: category.to_string(), + }; + + let page_path = dir.join(format!("{}.html", ts.timestamp())); + let page_contents = std::fs::read_to_string(&page_path) + .inspect_err(|e| error!("Failed reading {}, error:{e}", page_path.display())) + .ok()?; + let elements = + parser_ebay::parse_from_ebay_page(&page_contents, &ts, &category).unwrap(); + info!( + "Page Timestamp:{} Catagory:{category}, found {} elements", + ts.timestamp(), + elements.len() + ); + + Some((paged_info, elements)) + }) + .collect::>(); + let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0; + info!("Time spent reading and parsing pages: {total_query_time} ms"); + + // And lastly add it to our database! + let query_start_time = Instant::now(); + let mut added_count = 0; + for iter in to_add { + if iter.is_none() { + continue; + } + let (paged_info, elements) = iter.unwrap(); + paged_info.add_or_update(&db); + + for e in elements { + added_count += 1; + e.0.add_or_update(&db); + e.1.add_or_update(&db); + debug!( + "Page Timestamp:{} Catagory:{category}, inserting id:{}, title:{}", + paged_info.timestamp.timestamp(), + e.0.item_id, + e.0.title + ); + } + } + let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0; + info!("Time spent adding parsed pages: {total_query_time} ms"); + + return Some(added_count); +} diff --git a/src/parser_ebay.rs b/src/parser_ebay.rs index 26b78b5..b586ddc 100644 --- a/src/parser_ebay.rs +++ b/src/parser_ebay.rs @@ -18,7 +18,6 @@ fn parse_price(price_text: &str) -> Option { if let Some(first_part) = lower_price_text.split(" to ").next() { if let Some(caps) = PRICE_REGEX.captures(first_part) { if let Some(price_match) = caps.get(1) { - info!("Price string:{:?} parsed!", price_match); return price_match.as_str().replace(',', "").parse().ok(); } } @@ -49,7 +48,7 @@ fn parse_price(price_text: &str) -> Option { } /// Extracts item data from HTML content. -pub fn extract_data_from_html( +pub fn parse_from_ebay_page( html_content: &str, timestamp: &chrono::DateTime, category: &str, @@ -98,7 +97,7 @@ pub fn extract_data_from_html( continue; } if id.unwrap() == 123456 { - info!("Skipping {:?} due to bogus ID of 123456", element); + info!("Skipping element due to bogus ID of 123456"); continue; } @@ -178,7 +177,7 @@ mod tests { fn parse() { let timestamp = chrono::DateTime::from_timestamp(1750369463, 0).unwrap(); let html = include_str!("../test_data/scraper/raw_scraped/ssd/1750369463.html"); - let parsed = extract_data_from_html(html, ×tamp, "ssd").unwrap(); + let parsed = parse_from_ebay_page(html, ×tamp, "ssd").unwrap(); // assert_eq!(parsed.len(), 62); let parsed = parsed.first_chunk::<10>().unwrap(); diff --git a/systemd/scraper_webapi.service b/systemd/scraper_webapi.service new file mode 100644 index 0000000..1194546 --- /dev/null +++ b/systemd/scraper_webapi.service @@ -0,0 +1,11 @@ +[Unit] +Description=Run a single instance of a the web api +After=syslog.target network.target + +[Service] +Type=exec +#Environment=XDG_DATA_HOME=/home/hak8or/code/ebay_scraper_rust/.tmp_run +ExecStart=/usr/local/bin/ebay_scraper_rust + +[Install] +WantedBy=multi-user.target
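
Notes on the new endpoints and the parallel parse (not part of the diff above):

The new GET /listings handler takes three optional query parameters, defaulting to since=0, limit=1000, and cents_per_tbytes_max=10000. For example,

    GET /listings?since=1750369463&limit=50&cents_per_tbytes_max=1500

returns up to 50 listings seen since that epoch timestamp, restricted to listings with at least one parse not flagged for a description check and whose latest price works out to below $15/TB.

The parallel page parsing itself is a rayon fan-out over page timestamps followed by single-threaded database writes. A minimal, self-contained sketch of that shape (the timestamps and the stubbed parse step below are placeholders for the real directory scan and parser_ebay::parse_from_ebay_page call):

    use rayon::prelude::*;

    fn main() {
        // Stand-ins for the page timestamps found in a category's download directory.
        let timestamps: Vec<i64> = vec![1750369463, 1750369500, 1750369600];

        // Read and parse every page in parallel; a page that cannot be read yields None.
        let parsed: Vec<Option<(i64, usize)>> = timestamps
            .par_iter()
            .map(|ts| {
                // Real code: read "<ts>.html" and run parser_ebay::parse_from_ebay_page.
                let element_count = (*ts % 7) as usize;
                Some((*ts, element_count))
            })
            .collect();

        // As in parse_dir(), the rusqlite inserts stay serial on the calling thread.
        for (ts, count) in parsed.into_iter().flatten() {
            println!("page {ts}: {count} listings to insert");
        }
    }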