From 9331b55e0878fc40e44af39b08bf0f41ea99708f Mon Sep 17 00:00:00 2001 From: hak8or Date: Mon, 1 Sep 2025 10:26:37 -0400 Subject: [PATCH] Ehhh, lets rethink this ... (parallel status) --- Cargo.lock | 45 +++++++ Cargo.toml | 2 + readme.md | 354 +++++++++++++++++++++++++++++++++++++++++++++++++- src/db.rs | 128 ++++++++++++++++-- src/lib.rs | 1 + src/main.rs | 109 +++++++++++----- src/parser.rs | 93 +++++++------ 7 files changed, 638 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23cd164..a9e7a4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -680,6 +680,7 @@ dependencies = [ "clap", "dirs", "lazy_static", + "liblzma", "rayon", "regex", "rusqlite", @@ -687,6 +688,7 @@ dependencies = [ "serde", "serde_json", "similar-asserts", + "strum", "test-log", "tracing", "tracing-subscriber", @@ -1161,6 +1163,26 @@ version = "0.2.174" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" +[[package]] +name = "liblzma" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0791ab7e08ccc8e0ce893f6906eb2703ed8739d8e89b57c0714e71bad09024c8" +dependencies = [ + "liblzma-sys", +] + +[[package]] +name = "liblzma-sys" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b9596486f6d60c3bbe644c0e1be1aa6ccc472ad630fe8927b456973d7cb736" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "libredox" version = "0.1.3" @@ -1891,6 +1913,29 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32" +dependencies = [ + "phf", + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "syn" version = "2.0.103" diff --git a/Cargo.toml b/Cargo.toml index 40d86bc..c6ad35f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,12 +9,14 @@ chrono = { version = "0.4.41", features = ["serde"] } clap = { version = "4.5.40", features = ["derive"] } dirs = "6.0.0" lazy_static = "1.5.0" +liblzma = "0.4.2" rayon = "1.10.0" regex = "1.11.1" rusqlite = { version = "0.36.0", features = ["bundled", "chrono"] } scraper = "0.23.1" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" +strum = { version = "0.27.1", features = ["std", "derive", "phf", "strum_macros"] } test-log = { version = "0.2.17", features = ["trace"] } tracing = { version = "0.1.41", features = ["attributes"] } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } diff --git a/readme.md b/readme.md index 5018f14..bb32b81 100644 --- a/readme.md +++ b/readme.md @@ -3,10 +3,352 @@ This is a dumb little tool which ingests raw HTML files, does some parsing on them, and serves the results over a web API. 
```bash -export URL_BASE="http://scraper.homelab.hak8or.com:8080"; \ -echo run0 && http POST "$URL_BASE/page/parse/ssd" && \ -echo run1 && http POST "$URL_BASE/listing/parse" && \ -echo run2 && http GET "$URL_BASE/listing/since/12345678/2" && \ -echo run3 && http GET "$URL_BASE/listing/388484391867" && \ -echo run4 && http GET "$URL_BASE/listing/286605201240/history" +export URL_BASE="localhost:9876"; \ +echo run0 && http POST "$URL_BASE/category/ssd/discover" && \ +echo run1 && http POST "$URL_BASE/category/ssd/parse" && \ +echo run2 && http GET "$URL_BASE/category/ssd/parse" && \ +echo run3 && http POST "$URL_BASE/listing/parse" && \ +echo run4 && http GET "$URL_BASE/listings" since:=10099 limit:=10 cents_per_tbytes_max:=900 && \ +echo run5 && http GET "$URL_BASE/listing/267267322597" && \ +echo run6 && http GET "$URL_BASE/listing/267267322597/history" && +echo run7 && http GET "$URL_BASE/listing/267267322597/parsed" +``` + +``` +run0 +HTTP/1.1 200 OK +content-length: 0 +content-type: text/plain; charset=utf-8 +date: Thu, 10 Jul 2025 04:26:49 GMT + + + + +run1 +HTTP/1.1 200 OK +content-length: 0 +content-type: text/plain; charset=utf-8 +date: Thu, 10 Jul 2025 04:26:49 GMT + + + + +run2 +HTTP/1.1 200 OK +content-length: 36 +content-type: application/json +date: Thu, 10 Jul 2025 04:26:49 GMT + +[ + [ + "PendingParse", + 1, + 1 + ], + [ + "Ready", + 0, + 1 + ] +] + + +run3 +HTTP/1.1 200 OK +content-length: 2 +content-type: application/json +date: Thu, 10 Jul 2025 04:26:49 GMT + +62 + + +run4 +HTTP/1.1 200 OK +content-length: 4232 +content-type: application/json +date: Thu, 10 Jul 2025 04:26:49 GMT + +[ + { + "history": [ + { + "category": "ssd", + "current_bid_usd_cents": 1260, + "item": 286605201240, + "timestamp": "2025-06-19T21:44:23Z" + } + ], + "listing": { + "buy_it_now_price_cents": null, + "has_best_offer": true, + "id": 5, + "image_url": "https://i.ebayimg.com/images/g/3NoAAeSwPrtoDb1O/s-l500.webp", + "item_id": 286605201240, + "title": "Fanxiang M.2 SSD 1TB NVMe PCIe Gen 3x 4 M2 Internal Solid State Drive 3500MB/s" + }, + "parsed": [ + { + "id": 5, + "individual_size_gigabytes": 1024, + "item": 286605201240, + "needed_description_check": false, + "parse_engine": 0, + "quantity": 1, + "total_gigabytes": 1024 + } + ] + }, + { + "history": [ + { + "category": "ssd", + "current_bid_usd_cents": 2400, + "item": 177133381123, + "timestamp": "2025-06-19T21:44:23Z" + } + ], + "listing": { + "buy_it_now_price_cents": null, + "has_best_offer": false, + "id": 22, + "image_url": "https://i.ebayimg.com/images/g/-VMAAOSwaX1oNyx4/s-l500.webp", + "item_id": 177133381123, + "title": "SanDisk professional G-DRIVE SSD 2TB, A+ condition" + }, + "parsed": [ + { + "id": 22, + "individual_size_gigabytes": 2048, + "item": 177133381123, + "needed_description_check": false, + "parse_engine": 0, + "quantity": 1, + "total_gigabytes": 2048 + } + ] + }, + { + "history": [ + { + "category": "ssd", + "current_bid_usd_cents": 3108, + "item": 187263467837, + "timestamp": "2025-06-19T21:44:23Z" + } + ], + "listing": { + "buy_it_now_price_cents": null, + "has_best_offer": false, + "id": 35, + "image_url": "https://i.ebayimg.com/images/g/hn8AAOSw1hJoNrJm/s-l500.webp", + "item_id": 187263467837, + "title": "Used Fanxiang S880 4TB SSD NVME M.2 SSD PCIe 4x4 7300MBS Solid State Drive" + }, + "parsed": [ + { + "id": 35, + "individual_size_gigabytes": 4096, + "item": 187263467837, + "needed_description_check": false, + "parse_engine": 0, + "quantity": 1, + "total_gigabytes": 4096 + } + ] + }, + { + "history": [ + { + 
"category": "ssd", + "current_bid_usd_cents": 1000, + "item": 267267367821, + "timestamp": "2025-06-19T21:44:23Z" + } + ], + "listing": { + "buy_it_now_price_cents": null, + "has_best_offer": false, + "id": 37, + "image_url": "https://i.ebayimg.com/images/g/Cr8AAOSwXY1oN6m8/s-l500.webp", + "item_id": 267267367821, + "title": "(Lot of 6) Samsung MZ-VLB2560 256GB M.2 NVMe Internal SSD (MZVLB256HBHQ-000H1)" + }, + "parsed": [ + { + "id": 37, + "individual_size_gigabytes": 256, + "item": 267267367821, + "needed_description_check": false, + "parse_engine": 0, + "quantity": 6, + "total_gigabytes": 1536 + } + ] + }, + { + "history": [ + { + "category": "ssd", + "current_bid_usd_cents": 4600, + "item": 187263491149, + "timestamp": "2025-06-19T21:44:23Z" + } + ], + "listing": { + "buy_it_now_price_cents": null, + "has_best_offer": false, + "id": 44, + "image_url": "https://i.ebayimg.com/images/g/v2EAAOSwg9poNrTr/s-l500.webp", + "item_id": 187263491149, + "title": "Used Silicon Power 4TB US75 Nvme PCIe Gen4x4 M.2 2280 SSD R/W Up to 7000/6500 MB" + }, + "parsed": [ + { + "id": 44, + "individual_size_gigabytes": 4096, + "item": 187263491149, + "needed_description_check": false, + "parse_engine": 0, + "quantity": 1, + "total_gigabytes": 4096 + } + ] + }, + { + "history": [ + { + "category": "ssd", + "current_bid_usd_cents": 1000, + "item": 267267351339, + "timestamp": "2025-06-19T21:44:23Z" + } + ], + "listing": { + "buy_it_now_price_cents": null, + "has_best_offer": false, + "id": 46, + "image_url": "https://i.ebayimg.com/images/g/z8EAAOSwyKZoN6TW/s-l500.webp", + "item_id": 267267351339, + "title": "(Lot of 6) Used -Micron MTFDDAV256TBN 256GB, M.2 2280 Solid State Drive" + }, + "parsed": [ + { + "id": 46, + "individual_size_gigabytes": 256, + "item": 267267351339, + "needed_description_check": false, + "parse_engine": 0, + "quantity": 6, + "total_gigabytes": 1536 + } + ] + }, + { + "history": [ + { + "category": "ssd", + "current_bid_usd_cents": 99, + "item": 306325087069, + "timestamp": "2025-06-19T21:44:23Z" + } + ], + "listing": { + "buy_it_now_price_cents": null, + "has_best_offer": false, + "id": 59, + "image_url": "https://i.ebayimg.com/images/g/zuUAAOSwIoJoN5yC/s-l500.webp", + "item_id": 306325087069, + "title": "T298 ~ HP OEM Desktop Z240 Workstation Heatsink w NVMe M.2 256GB SSD 826414-001" + }, + "parsed": [ + { + "id": 59, + "individual_size_gigabytes": 256, + "item": 306325087069, + "needed_description_check": false, + "parse_engine": 0, + "quantity": 1, + "total_gigabytes": 256 + } + ] + }, + { + "history": [ + { + "category": "ssd", + "current_bid_usd_cents": 1000, + "item": 267267322597, + "timestamp": "2025-06-19T21:44:23Z" + } + ], + "listing": { + "buy_it_now_price_cents": null, + "has_best_offer": false, + "id": 60, + "image_url": "https://i.ebayimg.com/images/g/r8YAAOSwlkdoN5uW/s-l500.webp", + "item_id": 267267322597, + "title": "(Lot of 5) Used - Micro 1100 256GB SATA III 2.5\" SSD MTFDDAK256TBN" + }, + "parsed": [ + { + "id": 60, + "individual_size_gigabytes": 256, + "item": 267267322597, + "needed_description_check": false, + "parse_engine": 0, + "quantity": 5, + "total_gigabytes": 1280 + } + ] + } +] + + +run5 +HTTP/1.1 200 OK +content-length: 237 +content-type: application/json +date: Thu, 10 Jul 2025 04:26:49 GMT + +{ + "buy_it_now_price_cents": null, + "has_best_offer": false, + "id": 60, + "image_url": "https://i.ebayimg.com/images/g/r8YAAOSwlkdoN5uW/s-l500.webp", + "item_id": 267267322597, + "title": "(Lot of 5) Used - Micro 1100 256GB SATA III 2.5\" SSD MTFDDAK256TBN" +} 
+ + +run6 +HTTP/1.1 200 OK +content-length: 62 +content-type: application/json +date: Thu, 10 Jul 2025 04:26:50 GMT + +[ + { + "current_bid_usd_cents": 1000, + "when": "2025-06-19T21:44:23Z" + } +] + + +run7 +HTTP/1.1 200 OK +content-length: 149 +content-type: application/json +date: Thu, 10 Jul 2025 04:26:50 GMT + +[ + { + "id": 60, + "individual_size_gigabytes": 256, + "item": 267267322597, + "needed_description_check": false, + "parse_engine": 0, + "quantity": 5, + "total_gigabytes": 1280 + } +] ``` diff --git a/src/db.rs b/src/db.rs index f2ddfbe..61a87e9 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,6 +3,8 @@ use rusqlite::Connection; use serde::Deserialize; use serde::Serialize; use std::path::Path; +use strum::{EnumIter, IntoEnumIterator}; +// use strum_macros::EnumIter; use tracing::{error, info}; pub trait DBTable { @@ -140,30 +142,54 @@ impl SearchURL { } } +#[derive(Serialize, Debug, PartialEq, Clone, EnumIter)] +pub enum ParsedPageStatus { + PendingParse, + Ready, +} +impl TryFrom for ParsedPageStatus { + type Error = rusqlite::Error; + + fn try_from(value: i64) -> Result { + match value { + 0 => Ok(ParsedPageStatus::PendingParse), + 1 => Ok(ParsedPageStatus::Ready), + _ => Err(rusqlite::Error::InvalidColumnType( + 2, + "Invalid integer of {} for ParsedPageStatus".to_string(), + rusqlite::types::Type::Integer, + )), + } + } +} + #[derive(Serialize, Debug, PartialEq, Clone)] -pub struct ParsedPage { +pub struct Page { pub timestamp: DateTime, pub category: String, + pub status: ParsedPageStatus, } -impl DBTable for ParsedPage { - const TABLE_NAME: &'static str = "Pages_Parsed"; +impl DBTable for Page { + const TABLE_NAME: &'static str = "Pages"; const TABLE_SCHEMA: &'static str = " id INTEGER PRIMARY KEY, category TEXT NOT NULL, timestamp INTEGER NOT NULL, + status INTEGER NOT NULL, UNIQUE(category, timestamp) FOREIGN KEY(category) REFERENCES SearchURLs(name) "; fn get_all(conn: &Connection) -> rusqlite::Result> { let mut stmt = conn.prepare(&format!( - "SELECT category, timestamp FROM {}", + "SELECT category, timestamp, status FROM {}", Self::TABLE_NAME ))?; let iter = stmt.query_map([], |row| { - Ok(ParsedPage { + Ok(Page { category: row.get(0)?, timestamp: row.get(1)?, + status: row.get::<_, i64>(2)?.try_into().unwrap(), }) })?; @@ -174,7 +200,7 @@ impl DBTable for ParsedPage { Ok(result) } } -impl ParsedPage { +impl Page { pub fn lookup(conn: &Connection, timestamp: DateTime) -> Option { let mut stmt = conn .prepare(&format!( @@ -183,10 +209,11 @@ impl ParsedPage { )) .ok()?; stmt.query_one([timestamp], |row| { - Ok(ParsedPage { + Ok(Page { // id: row.get(0)?, category: row.get(1)?, timestamp: row.get(2)?, + status: row.get::<_, i64>(3)?.try_into().unwrap(), }) }) .ok() @@ -196,13 +223,70 @@ impl ParsedPage { let _ = conn .execute( &format!( - "INSERT OR REPLACE INTO {} (category, timestamp) VALUES (?1, ?2)", + "INSERT OR REPLACE INTO {} (category, timestamp, status) VALUES (?1, ?2, ?3)", Self::TABLE_NAME ), - (&self.category, self.timestamp), + (&self.category, self.timestamp, self.status.clone() as i64), ) .unwrap(); } + + pub fn lookup_status( + conn: &Connection, + status: ParsedPageStatus, + category: &str, + max: usize, + ) -> Vec { + let mut stmt = conn + .prepare(&format!( + "SELECT category, timestamp, status FROM {} WHERE status = {} AND category = ?1 LIMIT {}", + Self::TABLE_NAME, + status.clone() as i64, + max + )) + .unwrap(); + stmt.query_map([category], |row| { + Ok(Self { + category: row.get(0)?, + timestamp: row.get(1)?, + status: row.get::<_, 
i64>(2)?.try_into().unwrap(), + }) + }) + .unwrap() + .inspect(|e| info!("debugging saw {:?}", e)) + .filter_map(|e| e.ok()) + .collect() + } + + pub fn category_stats(conn: &Connection, category: &str) -> Vec<(ParsedPageStatus, i64, i64)> { + let mut res: Vec<(ParsedPageStatus, i64, i64)> = vec![]; + + for status in ParsedPageStatus::iter() { + let cnt_category_status = conn + .prepare(&format!( + "SELECT COUNT(*) FROM {} WHERE category = ?1 AND status = {}", + Self::TABLE_NAME, + status.clone() as i64 + )) + .ok() + .unwrap() + .query_one([category], |r| r.get(0)) + .inspect_err(|e| error!("Failed to get count due to error\"{:?}\", returning 0", e)) + .unwrap_or(0); + let cnt_category_total = conn + .prepare(&format!( + "SELECT COUNT(*) FROM {} WHERE category = ?1", + Self::TABLE_NAME + )) + .ok() + .unwrap() + .query_one([category], |r| r.get(0)) + .inspect_err(|e| error!("Failed to get count due to error\"{:?}\", returning 0", e)) + .unwrap_or(0); + res.push((status, cnt_category_status, cnt_category_total)); + } + res + } } #[derive(Serialize, Debug, PartialEq, Copy, Clone)] @@ -318,7 +402,7 @@ impl DBTable for ItemAppearances { current_bid_usd_cents INTEGER, UNIQUE(item, timestamp), FOREIGN KEY(item) REFERENCES Listings(item_id), - FOREIGN KEY(category, timestamp) REFERENCES Pages_Parsed(category, timestamp) + FOREIGN KEY(category, timestamp) REFERENCES Pages(category, timestamp) "; fn get_all(conn: &Connection) -> rusqlite::Result> { @@ -624,7 +708,7 @@ pub fn get_initialized(path: Option<&Path>) -> Connection { SearchURL::initialize(&conn); Listing::initialize(&conn); ParsedStorage::initialize(&conn); - ParsedPage::initialize(&conn); + Page::initialize(&conn); ItemAppearances::initialize(&conn); conn @@ -644,7 +728,7 @@ pub fn get_stats(conn: &Connection) -> Stats { rows_search_url: SearchURL::get_count(conn), rows_listing: Listing::get_count(conn), rows_parsed_storage: ParsedStorage::get_count(conn), - rows_parsed_page: ParsedPage::get_count(conn), + rows_parsed_page: Page::get_count(conn), rows_item_appearances: ItemAppearances::get_count(conn), } } @@ -687,12 +771,28 @@ mod tests { parsed.add_or_update(&db); assert_eq!(ParsedStorage::lookup(&db, listing.item_id), vec![parsed]); - let page = ParsedPage { + let page = Page { category: "ssd".to_owned(), timestamp: std::time::SystemTime::now().into(), + status: ParsedPageStatus::PendingParse, }; page.add_or_update(&db); - assert_eq!(ParsedPage::lookup(&db, page.timestamp), Some(page.clone())); + assert_eq!(Page::lookup(&db, page.timestamp), Some(page.clone())); + assert_eq!( + Page::lookup_status(&db, ParsedPageStatus::PendingParse, "ssd", 10), + vec![page.clone()] + ); + assert_eq!( + Page::lookup_status(&db, ParsedPageStatus::Ready, "ssd", 10), + vec![] + ); + assert_eq!( + Page::category_stats(&db, "ssd"), + vec![ + (ParsedPageStatus::PendingParse, 1, 1), + (ParsedPageStatus::Ready, 0, 1) + ] + ); let apperance = ItemAppearances { item: listing.item_id, diff --git a/src/lib.rs b/src/lib.rs index 92ecfdf..6c8fd85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,3 +2,4 @@ pub mod db; pub mod parser; pub mod parser_ebay; pub mod parser_storage; +pub mod xdg_dirs; diff --git a/src/main.rs b/src/main.rs index a214a6a..f2668e4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,14 @@ -use actix_web::{App, HttpServer, Responder, Result, get, post, web, web::Data}; +use actix_web::{App, HttpServer, Responder, Result, get, post, rt, web, web::Data}; use chrono::{DateTime, Utc}; use clap::Parser; -use ebay_scraper_rust::db::{ - DBTable, 
ItemAppearances, Listing, ParsedPage, ParsedStorage, SearchURL, get_initialized, - get_stats, listings_get_filtered, -}; -use ebay_scraper_rust::parser::parse_dir; +use ebay_scraper_rust::db; +use ebay_scraper_rust::db::DBTable; +use ebay_scraper_rust::db::Page; +use ebay_scraper_rust::parser; use ebay_scraper_rust::parser_storage; +use ebay_scraper_rust::xdg_dirs; +// use rt::mpsc; +// use rt::time::timeout; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::sync::Mutex; @@ -18,8 +20,6 @@ use tracing_subscriber::fmt; use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -mod xdg_dirs; - #[derive(Parser, Debug)] #[clap( name = "ebay-scraper-rust", @@ -41,7 +41,7 @@ async fn listings_filtered_get( filter: web::Query, ) -> Result { let start = Instant::now(); - let res = listings_get_filtered( + let res = db::listings_get_filtered( &db.lock().unwrap(), &DateTime::::from_timestamp(filter.since.unwrap_or(0), 0).unwrap(), filter.limit.unwrap_or(1_000), @@ -61,7 +61,7 @@ async fn listing_get( db: Data>, id: web::Path, ) -> Result { - Ok(web::Json(Listing::lookup(&db.lock().unwrap(), *id))) + Ok(web::Json(db::Listing::lookup(&db.lock().unwrap(), *id))) } #[get("/listing/{id}/parsed")] @@ -69,7 +69,10 @@ async fn listing_parse_get( db: Data>, id: web::Path, ) -> Result { - Ok(web::Json(ParsedStorage::lookup(&db.lock().unwrap(), *id))) + Ok(web::Json(db::ParsedStorage::lookup( + &db.lock().unwrap(), + *id, + ))) } #[derive(Serialize)] @@ -83,7 +86,7 @@ async fn listing_history_get( db: Data>, id: web::Path, ) -> Result { - let history: Vec<_> = ItemAppearances::lookup(&db.lock().unwrap(), *id) + let history: Vec<_> = db::ItemAppearances::lookup(&db.lock().unwrap(), *id) .iter() // .inspect(|e| info!("got: {:?}", e)) .filter_map(|e| { @@ -100,7 +103,7 @@ async fn listing_history_get( async fn parse_listings(db: Data>) -> Result { let mut cnt = 0; let db_unlocked = db.lock().unwrap(); - Listing::lookup_non_parsed(&db_unlocked) + db::Listing::lookup_non_parsed(&db_unlocked) .iter() .map(|l| parser_storage::parse_size_and_quantity(l.0, &l.1)) .inspect(|_| cnt = cnt + 1) @@ -111,7 +114,33 @@ async fn parse_listings(db: Data>) -> Result>) -> Result { - Ok(web::Json(SearchURL::names(&db.lock().unwrap()))) + Ok(web::Json(db::SearchURL::names(&db.lock().unwrap()))) +} + +async fn category_discover_worker( + db: Data>, + downloaddir: Data, + category: web::Path, +) { +} + +#[post("/category/{category}/discover")] +#[instrument(skip_all)] +async fn category_discover( + db: Data>, + downloaddir: Data, + category: web::Path, +) -> Result { + let start = Instant::now(); + parser::add_pages( + &db.lock().unwrap(), + &downloaddir.join(category.clone()), + &category, + ); + let elapsed = start.elapsed().as_micros() as f64 / 1000.0; + + info!("Added many pages to the category, took {elapsed} ms."); + Ok("") } #[post("/category/{category}/parse")] @@ -121,33 +150,40 @@ async fn category_parse( downloaddir: Data, category: web::Path, ) -> Result { - let start = Instant::now(); - let count = parse_dir( - &downloaddir.join(category.clone()), - &category, - &db.lock().unwrap(), - ) - .unwrap(); - let elapsed = start.elapsed().as_micros() as f64 / 1000.0; + parser::parse_pages(&db.lock().unwrap(), &downloaddir, &category, 100); + Ok("") +} - info!("Added {count} listings, took {elapsed} ms."); - Ok(count.to_string()) +#[get("/category/{category}/parse")] +#[instrument(skip_all)] +async fn category_parse_get( + db: Data>, + category: 
web::Path, +) -> Result { + let start = Instant::now(); + let stats = Page::category_stats(&db.lock().unwrap(), &category); + stats + .iter() + .for_each(|(status, cnt, total)| info!("{:?} {} {}", status, cnt, total)); + let elapsed = start.elapsed().as_micros() as f64 / 1000.0; + info!("Found, took {elapsed} ms."); + Ok(web::Json(stats)) } #[get("/stats")] async fn stats_get(db: Data>) -> Result { - Ok(web::Json(get_stats(&db.lock().unwrap()))) + Ok(web::Json(db::get_stats(&db.lock().unwrap()))) } #[get("/admin")] async fn admin_get(db: Data>) -> Result { let db = db.lock().unwrap(); let query_start_time = Instant::now(); - let search_urls = SearchURL::get_all(&db).unwrap_or_default(); - let parsed_pages = ParsedPage::get_all(&db).unwrap_or_default(); - let parsed_storages = ParsedStorage::get_all(&db).unwrap_or_default(); - let item_appearances = ItemAppearances::get_all(&db).unwrap_or_default(); - let listings = Listing::get_all(&db).unwrap_or_default(); + let search_urls = db::SearchURL::get_all(&db).unwrap_or_default(); + let parsed_pages = db::Page::get_all(&db).unwrap_or_default(); + let parsed_storages = db::ParsedStorage::get_all(&db).unwrap_or_default(); + let item_appearances = db::ItemAppearances::get_all(&db).unwrap_or_default(); + let listings = db::Listing::get_all(&db).unwrap_or_default(); let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0; let html_gen_start_time = Instant::now(); @@ -274,6 +310,10 @@ fn generate_table(title: &str, data: &[T]) -> String { ) } +async fn pages_pickup() -> std::io::Result<()> { + Ok(()) +} + #[actix_web::main] async fn main() -> std::io::Result<()> { tracing_subscriber::registry() @@ -287,10 +327,10 @@ async fn main() -> std::io::Result<()> { "Starting with scraped data dir of \"{}\".", scrapedatadir.to_str().unwrap() ); - let db_mutex = Data::new(Mutex::new(get_initialized(None))); + let db_mutex = Data::new(Mutex::new(db::get_initialized(None))); // Prepare our backend via pulling in what catagories we are preconfigured with. - SearchURL::scan(&db_mutex.lock().unwrap(), &scrapedatadir, "url.json"); + db::SearchURL::scan(&db_mutex.lock().unwrap(), &scrapedatadir, "url.json"); HttpServer::new(move || { App::new() @@ -298,10 +338,13 @@ async fn main() -> std::io::Result<()> { .service(listing_get) .service(listings_filtered_get) .service(listing_history_get) + .service(listing_parse_get) // Category handlers .service(parse_listings) .service(category_parse) + .service(category_discover) .service(category_getnames) + .service(category_parse_get) // Gnarly info dump .service(admin_get) .service(stats_get) @@ -312,4 +355,6 @@ async fn main() -> std::io::Result<()> { .bind(("0.0.0.0", 9876))? .run() .await + + // tokio::join!(server, pages_pickup) } diff --git a/src/parser.rs b/src/parser.rs index 6a73cf2..98907e4 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -1,5 +1,5 @@ use crate::{ - db::{ParsedPage, SearchURL}, + db::{Page, ParsedPageStatus, SearchURL}, parser_ebay, }; use rayon::prelude::*; @@ -31,12 +31,12 @@ fn timestamps_from_dir(path: &Path) -> Vec { .collect() } -pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Option { +pub fn add_pages(db: &rusqlite::Connection, dir: &Path, category: &str) { // Ensure the category is created. 
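+    // url.json sits alongside the downloaded {timestamp}.html pages for this
+    // category and holds the eBay search URL they were scraped from.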
let url_fpath = dir.join("url.json"); let url_contents = std::fs::read_to_string(&url_fpath) .inspect_err(|e| error!("Failed reading {}: {e}", url_fpath.display())) - .ok()?; + .unwrap(); #[derive(Deserialize)] struct URLJSON { @@ -50,62 +50,75 @@ pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Optio // See all pages haven't been seen before. let query_start_time = Instant::now(); - let to_parse = timestamps_from_dir(dir).into_iter().filter(|t| { - let ts = chrono::DateTime::from_timestamp(*t, 0).unwrap(); - let p = ParsedPage::lookup(&db, ts); + let to_parse = timestamps_from_dir(dir) + .into_iter() + .filter(|t| { + let ts = chrono::DateTime::from_timestamp(*t, 0).unwrap(); + let p = Page::lookup(&db, ts); + + // Timestamp never seen before, lets pass it on. + if p.is_none() { + info!( + "Page Timestamp:{} Catagory:{category} never seen before", + ts.timestamp() + ); + return true; + } + + // Timestamp was seen before *and* from the same catagory, don't pass it on. + if p.unwrap().category == *category { + info!( + "Page Timestamp:{} Catagory:{category} seen before, skipping", + ts.timestamp() + ); + return false; + } - // Timestamp never seen before, lets pass it on. - if p.is_none() { info!( - "Page Timestamp:{} Catagory:{category} never seen before, processing ...", + "Page Timestamp:{} Catagory:{category} seen before, but not of catagory:{category}", ts.timestamp() ); return true; - } - - // Timestamp was seen before *and* from the same catagory, don't pass - // it on. - if p.unwrap().category == *category { - info!( - "Page Timestamp:{} Catagory:{category} seen before, skipping ...", - ts.timestamp() - ); - return false; - } - - info!( - "Page Timestamp:{} Catagory:{category} seen before, but not of catagory:{category}, processing ...", - ts.timestamp() - ); - return true; - }).collect::>(); + }) + .collect::>(); let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0; info!("Time spent finding pages to parse:{total_query_time} ms"); - // For each page, read the file and parse it. + // Say we are going to parse the pages. 
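+    // Each discovered timestamp becomes a Page row in status PendingParse; no
+    // HTML is read here. parse_pages() later fetches these rows in batches via
+    // Page::lookup_status() and does the actual parsing.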
+ let query_start_time = Instant::now(); + let pages = to_parse.iter().map(|p| Page { + timestamp: chrono::DateTime::from_timestamp(*p, 0).unwrap(), + category: category.to_string(), + status: crate::db::ParsedPageStatus::PendingParse, + }); + for p in pages { + p.add_or_update(&db); + } + let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0; + info!("Time spent inserting pages marked as ready to parse:{total_query_time} ms"); +} + +pub fn parse_pages(db: &rusqlite::Connection, dir: &Path, category: &str, batch: usize) { + let to_parse = Page::lookup_status(&db, ParsedPageStatus::PendingParse, category, batch); let query_start_time = Instant::now(); let to_add = to_parse .par_iter() .map(|p| { - let ts = chrono::DateTime::from_timestamp(*p, 0).unwrap(); - let paged_info = ParsedPage { - timestamp: ts, - category: category.to_string(), - }; - - let page_path = dir.join(format!("{}.html", ts.timestamp())); + let page_path = dir + .join(category) + .join(format!("{}.html", p.timestamp.timestamp())); let page_contents = std::fs::read_to_string(&page_path) .inspect_err(|e| error!("Failed reading {}, error:{e}", page_path.display())) .ok()?; let elements = - parser_ebay::parse_from_ebay_page(&page_contents, &ts, &category).unwrap(); + parser_ebay::parse_from_ebay_page(&page_contents, &p.timestamp, &category).unwrap(); info!( "Page Timestamp:{} Catagory:{category}, found {} elements", - ts.timestamp(), + p.timestamp.timestamp(), elements.len() ); - Some((paged_info, elements)) + Some((p, elements)) }) .collect::>(); let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0; @@ -113,7 +126,6 @@ pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Optio // And lastly add it to our database! let query_start_time = Instant::now(); - let mut added_count = 0; for iter in to_add { if iter.is_none() { continue; @@ -122,7 +134,6 @@ pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Optio paged_info.add_or_update(&db); for e in elements { - added_count += 1; e.0.add_or_update(&db); e.1.add_or_update(&db); debug!( @@ -135,6 +146,4 @@ pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Optio } let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0; info!("Time spent adding parsed pages: {total_query_time} ms"); - - return Some(added_count); }
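
The empty `pages_pickup()` stub and the commented-out `tokio::join!(server, pages_pickup)` in main.rs hint at a background worker that drains pending pages without waiting for a `POST /category/{category}/parse`. Below is a minimal sketch of that idea, not part of this patch: it assumes `SearchURL::names` returns a `Vec<String>`, that the worker is handed clones of the `Data<Mutex<Connection>>` handle and of the scrape directory path, and the 30-second interval and batch size of 100 are placeholders.

```rust
use std::{path::PathBuf, sync::Mutex, time::Duration};

use actix_web::web::Data;
use rusqlite::Connection;

use ebay_scraper_rust::{db, parser};

/// Hypothetical background worker: periodically parse one batch of pages that
/// are still marked PendingParse, for every category the database knows about.
async fn pages_pickup(db: Data<Mutex<Connection>>, scrapedatadir: PathBuf) -> std::io::Result<()> {
    loop {
        // Assumed to return the category names registered via SearchURL::scan.
        let categories = db::SearchURL::names(&db.lock().unwrap());
        for category in categories {
            // Same call the POST /category/{category}/parse handler makes.
            parser::parse_pages(&db.lock().unwrap(), &scrapedatadir, &category, 100);
        }
        actix_web::rt::time::sleep(Duration::from_secs(30)).await;
    }
}
```

Wiring it in would roughly follow the commented-out line in main(): bind `HttpServer::new(...).run()` to a variable instead of awaiting it directly, then await both futures together (e.g. via `tokio::join!` if tokio is taken as a direct dependency, or by spawning the worker with `actix_web::rt::spawn`).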