Commit 9331b55e08: "Ehhh, lets rethink this ... (parallel status)"
Date: 2025-09-01 10:26:37 -04:00
7 changed files with 638 additions and 94 deletions

All checks were successful:
Cargo Build & Test / Rust project - latest (1.85.1) (push): successful in 4m31s
Cargo Build & Test / Rust project - latest (1.86) (push): successful in 4m40s
Cargo Build & Test / Rust project - latest (1.87) (push): successful in 4m55s
Cargo Build & Test / Rust project - latest (1.88) (push): successful in 10m2s

Cargo.lock (generated; 45 changed lines)

@@ -680,6 +680,7 @@ dependencies = [
 "clap",
 "dirs",
 "lazy_static",
+"liblzma",
 "rayon",
 "regex",
 "rusqlite",
@@ -687,6 +688,7 @@ dependencies = [
 "serde",
 "serde_json",
 "similar-asserts",
+"strum",
 "test-log",
 "tracing",
 "tracing-subscriber",
@@ -1161,6 +1163,26 @@ version = "0.2.174"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776"

+[[package]]
+name = "liblzma"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0791ab7e08ccc8e0ce893f6906eb2703ed8739d8e89b57c0714e71bad09024c8"
+dependencies = [
+ "liblzma-sys",
+]
+
+[[package]]
+name = "liblzma-sys"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "01b9596486f6d60c3bbe644c0e1be1aa6ccc472ad630fe8927b456973d7cb736"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+]
+
[[package]]
name = "libredox"
version = "0.1.3"
@@ -1891,6 +1913,29 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"

+[[package]]
+name = "strum"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32"
+dependencies = [
+ "phf",
+ "strum_macros",
+]
+
+[[package]]
+name = "strum_macros"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn",
+]
+
[[package]]
name = "syn"
version = "2.0.103"

Cargo.toml

@@ -9,12 +9,14 @@
 chrono = { version = "0.4.41", features = ["serde"] }
 clap = { version = "4.5.40", features = ["derive"] }
 dirs = "6.0.0"
 lazy_static = "1.5.0"
+liblzma = "0.4.2"
 rayon = "1.10.0"
 regex = "1.11.1"
 rusqlite = { version = "0.36.0", features = ["bundled", "chrono"] }
 scraper = "0.23.1"
 serde = { version = "1.0.219", features = ["derive"] }
 serde_json = "1.0.140"
+strum = { version = "0.27.1", features = ["std", "derive", "phf", "strum_macros"] }
 test-log = { version = "0.2.17", features = ["trace"] }
 tracing = { version = "0.1.41", features = ["attributes"] }
 tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
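
strum is pulled in with the "derive" and "phf" features for the EnumIter machinery that src/db.rs uses below. A standalone sketch of the pattern (the Status enum here is a stand-in for ParsedPageStatus):

```rust
use strum::{EnumIter, IntoEnumIterator};

// Fieldless enum: EnumIter generates Status::iter() over all variants.
#[derive(Debug, Clone, Copy, EnumIter)]
enum Status {
    PendingParse,
    Ready,
}

fn main() {
    for status in Status::iter() {
        // The cast mirrors how db.rs encodes a variant for SQLite.
        println!("{status:?} is stored as {}", status as i64);
    }
}
```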

readme.md (354 changed lines)

@@ -3,10 +3,352 @@
This is a dumb little tool which ingests raw HTML files, does some parsing on them, and serves the results over a web API.
```bash
-export URL_BASE="http://scraper.homelab.hak8or.com:8080"; \
-echo run0 && http POST "$URL_BASE/page/parse/ssd" && \
-echo run1 && http POST "$URL_BASE/listing/parse" && \
-echo run2 && http GET "$URL_BASE/listing/since/12345678/2" && \
-echo run3 && http GET "$URL_BASE/listing/388484391867" && \
-echo run4 && http GET "$URL_BASE/listing/286605201240/history"
+export URL_BASE="localhost:9876"; \
+echo run0 && http POST "$URL_BASE/category/ssd/discover" && \
+echo run1 && http POST "$URL_BASE/category/ssd/parse" && \
+echo run2 && http GET "$URL_BASE/category/ssd/parse" && \
+echo run3 && http POST "$URL_BASE/listing/parse" && \
+echo run4 && http GET "$URL_BASE/listings" since:=10099 limit:=10 cents_per_tbytes_max:=900 && \
+echo run5 && http GET "$URL_BASE/listing/267267322597" && \
+echo run6 && http GET "$URL_BASE/listing/267267322597/history" &&
+echo run7 && http GET "$URL_BASE/listing/267267322597/parsed"
```
```
run0
HTTP/1.1 200 OK
content-length: 0
content-type: text/plain; charset=utf-8
date: Thu, 10 Jul 2025 04:26:49 GMT
run1
HTTP/1.1 200 OK
content-length: 0
content-type: text/plain; charset=utf-8
date: Thu, 10 Jul 2025 04:26:49 GMT
run2
HTTP/1.1 200 OK
content-length: 36
content-type: application/json
date: Thu, 10 Jul 2025 04:26:49 GMT
[
[
"PendingParse",
1,
1
],
[
"Ready",
0,
1
]
]
run3
HTTP/1.1 200 OK
content-length: 2
content-type: application/json
date: Thu, 10 Jul 2025 04:26:49 GMT
62
run4
HTTP/1.1 200 OK
content-length: 4232
content-type: application/json
date: Thu, 10 Jul 2025 04:26:49 GMT
[
{
"history": [
{
"category": "ssd",
"current_bid_usd_cents": 1260,
"item": 286605201240,
"timestamp": "2025-06-19T21:44:23Z"
}
],
"listing": {
"buy_it_now_price_cents": null,
"has_best_offer": true,
"id": 5,
"image_url": "https://i.ebayimg.com/images/g/3NoAAeSwPrtoDb1O/s-l500.webp",
"item_id": 286605201240,
"title": "Fanxiang M.2 SSD 1TB NVMe PCIe Gen 3x 4 M2 Internal Solid State Drive 3500MB/s"
},
"parsed": [
{
"id": 5,
"individual_size_gigabytes": 1024,
"item": 286605201240,
"needed_description_check": false,
"parse_engine": 0,
"quantity": 1,
"total_gigabytes": 1024
}
]
},
{
"history": [
{
"category": "ssd",
"current_bid_usd_cents": 2400,
"item": 177133381123,
"timestamp": "2025-06-19T21:44:23Z"
}
],
"listing": {
"buy_it_now_price_cents": null,
"has_best_offer": false,
"id": 22,
"image_url": "https://i.ebayimg.com/images/g/-VMAAOSwaX1oNyx4/s-l500.webp",
"item_id": 177133381123,
"title": "SanDisk professional G-DRIVE SSD 2TB, A+ condition"
},
"parsed": [
{
"id": 22,
"individual_size_gigabytes": 2048,
"item": 177133381123,
"needed_description_check": false,
"parse_engine": 0,
"quantity": 1,
"total_gigabytes": 2048
}
]
},
{
"history": [
{
"category": "ssd",
"current_bid_usd_cents": 3108,
"item": 187263467837,
"timestamp": "2025-06-19T21:44:23Z"
}
],
"listing": {
"buy_it_now_price_cents": null,
"has_best_offer": false,
"id": 35,
"image_url": "https://i.ebayimg.com/images/g/hn8AAOSw1hJoNrJm/s-l500.webp",
"item_id": 187263467837,
"title": "Used Fanxiang S880 4TB SSD NVME M.2 SSD PCIe 4x4 7300MBS Solid State Drive"
},
"parsed": [
{
"id": 35,
"individual_size_gigabytes": 4096,
"item": 187263467837,
"needed_description_check": false,
"parse_engine": 0,
"quantity": 1,
"total_gigabytes": 4096
}
]
},
{
"history": [
{
"category": "ssd",
"current_bid_usd_cents": 1000,
"item": 267267367821,
"timestamp": "2025-06-19T21:44:23Z"
}
],
"listing": {
"buy_it_now_price_cents": null,
"has_best_offer": false,
"id": 37,
"image_url": "https://i.ebayimg.com/images/g/Cr8AAOSwXY1oN6m8/s-l500.webp",
"item_id": 267267367821,
"title": "(Lot of 6) Samsung MZ-VLB2560 256GB M.2 NVMe Internal SSD (MZVLB256HBHQ-000H1)"
},
"parsed": [
{
"id": 37,
"individual_size_gigabytes": 256,
"item": 267267367821,
"needed_description_check": false,
"parse_engine": 0,
"quantity": 6,
"total_gigabytes": 1536
}
]
},
{
"history": [
{
"category": "ssd",
"current_bid_usd_cents": 4600,
"item": 187263491149,
"timestamp": "2025-06-19T21:44:23Z"
}
],
"listing": {
"buy_it_now_price_cents": null,
"has_best_offer": false,
"id": 44,
"image_url": "https://i.ebayimg.com/images/g/v2EAAOSwg9poNrTr/s-l500.webp",
"item_id": 187263491149,
"title": "Used Silicon Power 4TB US75 Nvme PCIe Gen4x4 M.2 2280 SSD R/W Up to 7000/6500 MB"
},
"parsed": [
{
"id": 44,
"individual_size_gigabytes": 4096,
"item": 187263491149,
"needed_description_check": false,
"parse_engine": 0,
"quantity": 1,
"total_gigabytes": 4096
}
]
},
{
"history": [
{
"category": "ssd",
"current_bid_usd_cents": 1000,
"item": 267267351339,
"timestamp": "2025-06-19T21:44:23Z"
}
],
"listing": {
"buy_it_now_price_cents": null,
"has_best_offer": false,
"id": 46,
"image_url": "https://i.ebayimg.com/images/g/z8EAAOSwyKZoN6TW/s-l500.webp",
"item_id": 267267351339,
"title": "(Lot of 6) Used -Micron MTFDDAV256TBN 256GB, M.2 2280 Solid State Drive"
},
"parsed": [
{
"id": 46,
"individual_size_gigabytes": 256,
"item": 267267351339,
"needed_description_check": false,
"parse_engine": 0,
"quantity": 6,
"total_gigabytes": 1536
}
]
},
{
"history": [
{
"category": "ssd",
"current_bid_usd_cents": 99,
"item": 306325087069,
"timestamp": "2025-06-19T21:44:23Z"
}
],
"listing": {
"buy_it_now_price_cents": null,
"has_best_offer": false,
"id": 59,
"image_url": "https://i.ebayimg.com/images/g/zuUAAOSwIoJoN5yC/s-l500.webp",
"item_id": 306325087069,
"title": "T298 ~ HP OEM Desktop Z240 Workstation Heatsink w NVMe M.2 256GB SSD 826414-001"
},
"parsed": [
{
"id": 59,
"individual_size_gigabytes": 256,
"item": 306325087069,
"needed_description_check": false,
"parse_engine": 0,
"quantity": 1,
"total_gigabytes": 256
}
]
},
{
"history": [
{
"category": "ssd",
"current_bid_usd_cents": 1000,
"item": 267267322597,
"timestamp": "2025-06-19T21:44:23Z"
}
],
"listing": {
"buy_it_now_price_cents": null,
"has_best_offer": false,
"id": 60,
"image_url": "https://i.ebayimg.com/images/g/r8YAAOSwlkdoN5uW/s-l500.webp",
"item_id": 267267322597,
"title": "(Lot of 5) Used - Micro 1100 256GB SATA III 2.5\" SSD MTFDDAK256TBN"
},
"parsed": [
{
"id": 60,
"individual_size_gigabytes": 256,
"item": 267267322597,
"needed_description_check": false,
"parse_engine": 0,
"quantity": 5,
"total_gigabytes": 1280
}
]
}
]
run5
HTTP/1.1 200 OK
content-length: 237
content-type: application/json
date: Thu, 10 Jul 2025 04:26:49 GMT
{
"buy_it_now_price_cents": null,
"has_best_offer": false,
"id": 60,
"image_url": "https://i.ebayimg.com/images/g/r8YAAOSwlkdoN5uW/s-l500.webp",
"item_id": 267267322597,
"title": "(Lot of 5) Used - Micro 1100 256GB SATA III 2.5\" SSD MTFDDAK256TBN"
}
run6
HTTP/1.1 200 OK
content-length: 62
content-type: application/json
date: Thu, 10 Jul 2025 04:26:50 GMT
[
{
"current_bid_usd_cents": 1000,
"when": "2025-06-19T21:44:23Z"
}
]
run7
HTTP/1.1 200 OK
content-length: 149
content-type: application/json
date: Thu, 10 Jul 2025 04:26:50 GMT
[
{
"id": 60,
"individual_size_gigabytes": 256,
"item": 267267322597,
"needed_description_check": false,
"parse_engine": 0,
"quantity": 5,
"total_gigabytes": 1280
}
]
```

src/db.rs (128 changed lines)

@@ -3,6 +3,8 @@ use rusqlite::Connection;
use serde::Deserialize;
use serde::Serialize;
use std::path::Path;
+use strum::{EnumIter, IntoEnumIterator};
+// use strum_macros::EnumIter;
use tracing::{error, info};

pub trait DBTable {
@@ -140,30 +142,54 @@ impl SearchURL {
    }
}

+#[derive(Serialize, Debug, PartialEq, Clone, EnumIter)]
+pub enum ParsedPageStatus {
+    PendingParse,
+    Ready,
+}
+
+impl TryFrom<i64> for ParsedPageStatus {
+    type Error = rusqlite::Error;
+    fn try_from(value: i64) -> Result<Self, Self::Error> {
+        match value {
+            0 => Ok(ParsedPageStatus::PendingParse),
+            1 => Ok(ParsedPageStatus::Ready),
+            _ => Err(rusqlite::Error::InvalidColumnType(
+                2,
+                format!("Invalid integer of {value} for ParsedPageStatus"),
+                rusqlite::types::Type::Integer,
+            )),
+        }
+    }
+}
+
#[derive(Serialize, Debug, PartialEq, Clone)]
-pub struct ParsedPage {
+pub struct Page {
    pub timestamp: DateTime<Utc>,
    pub category: String,
+    pub status: ParsedPageStatus,
}

-impl DBTable for ParsedPage {
-    const TABLE_NAME: &'static str = "Pages_Parsed";
+impl DBTable for Page {
+    const TABLE_NAME: &'static str = "Pages";
    const TABLE_SCHEMA: &'static str = "
        id INTEGER PRIMARY KEY,
        category TEXT NOT NULL,
        timestamp INTEGER NOT NULL,
+        status INTEGER NOT NULL,
        UNIQUE(category, timestamp)
        FOREIGN KEY(category) REFERENCES SearchURLs(name)
    ";

    fn get_all(conn: &Connection) -> rusqlite::Result<Vec<Self>> {
        let mut stmt = conn.prepare(&format!(
-            "SELECT category, timestamp FROM {}",
+            "SELECT category, timestamp, status FROM {}",
            Self::TABLE_NAME
        ))?;
        let iter = stmt.query_map([], |row| {
-            Ok(ParsedPage {
+            Ok(Page {
                category: row.get(0)?,
                timestamp: row.get(1)?,
+                status: row.get::<_, i64>(2)?.try_into().unwrap(),
            })
        })?;
@@ -174,7 +200,7 @@
        Ok(result)
    }
}

-impl ParsedPage {
+impl Page {
    pub fn lookup(conn: &Connection, timestamp: DateTime<Utc>) -> Option<Self> {
        let mut stmt = conn
            .prepare(&format!(
@@ -183,10 +209,11 @@
            ))
            .ok()?;
        stmt.query_one([timestamp], |row| {
-            Ok(ParsedPage {
+            Ok(Page {
                // id: row.get(0)?,
                category: row.get(1)?,
                timestamp: row.get(2)?,
+                status: row.get::<_, i64>(3)?.try_into().unwrap(),
            })
        })
        .ok()
@@ -196,13 +223,70 @@
        let _ = conn
            .execute(
                &format!(
-                    "INSERT OR REPLACE INTO {} (category, timestamp) VALUES (?1, ?2)",
+                    "INSERT OR REPLACE INTO {} (category, timestamp, status) VALUES (?1, ?2, ?3)",
                    Self::TABLE_NAME
                ),
-                (&self.category, self.timestamp),
+                (&self.category, self.timestamp, self.status.clone() as i64),
            )
            .unwrap();
    }
+
+    pub fn lookup_status(
+        conn: &Connection,
+        status: ParsedPageStatus,
+        category: &str,
+        max: usize,
+    ) -> Vec<Self> {
+        let mut stmt = conn
+            .prepare(&format!(
+                "SELECT category, timestamp, status FROM {} WHERE status = {} AND category = ?1 LIMIT {}",
+                Self::TABLE_NAME,
+                status.clone() as i64,
+                max
+            ))
+            .unwrap();
+        stmt.query_map([category], |row| {
+            Ok(Self {
+                category: row.get(0)?,
+                timestamp: row.get(1)?,
+                status: row.get::<_, i64>(2)?.try_into().unwrap(),
+            })
+        })
+        .unwrap()
+        .inspect(|e| info!("debugging saw {:?}", e))
+        .filter_map(|e| e.ok())
+        .collect()
+    }
+
+    pub fn category_stats(conn: &Connection, category: &str) -> Vec<(ParsedPageStatus, i64, i64)> {
+        let mut res: Vec<(ParsedPageStatus, i64, i64)> = vec![];
+        for status in ParsedPageStatus::iter() {
+            let cnt_category_status = conn
+                .prepare(&format!(
+                    "SELECT COUNT(*) FROM {} WHERE category = ?1 AND status = {}",
+                    Self::TABLE_NAME,
+                    status.clone() as i64
+                ))
+                .ok()
+                .unwrap()
+                .query_one([category], |r| r.get(0))
+                .inspect_err(|e| error!("Failed to get count due to error \"{:?}\", returning 0", e))
+                .unwrap_or(0);
+            let cnt_category_total = conn
+                .prepare(&format!(
+                    "SELECT COUNT(*) FROM {} WHERE category = ?1",
+                    Self::TABLE_NAME
+                ))
+                .ok()
+                .unwrap()
+                .query_one([category], |r| r.get(0))
+                .inspect_err(|e| error!("Failed to get count due to error \"{:?}\", returning 0", e))
+                .unwrap_or(0);
+            res.push((status, cnt_category_status, cnt_category_total));
+        }
+        res
+    }
}

#[derive(Serialize, Debug, PartialEq, Copy, Clone)]
@@ -318,7 +402,7 @@ impl DBTable for ItemAppearances {
        current_bid_usd_cents INTEGER,
        UNIQUE(item, timestamp),
        FOREIGN KEY(item) REFERENCES Listings(item_id),
-        FOREIGN KEY(category, timestamp) REFERENCES Pages_Parsed(category, timestamp)
+        FOREIGN KEY(category, timestamp) REFERENCES Pages(category, timestamp)
    ";

    fn get_all(conn: &Connection) -> rusqlite::Result<Vec<Self>> {
@@ -624,7 +708,7 @@ pub fn get_initialized(path: Option<&Path>) -> Connection {
    SearchURL::initialize(&conn);
    Listing::initialize(&conn);
    ParsedStorage::initialize(&conn);
-    ParsedPage::initialize(&conn);
+    Page::initialize(&conn);
    ItemAppearances::initialize(&conn);

    conn
@@ -644,7 +728,7 @@ pub fn get_stats(conn: &Connection) -> Stats {
        rows_search_url: SearchURL::get_count(conn),
        rows_listing: Listing::get_count(conn),
        rows_parsed_storage: ParsedStorage::get_count(conn),
-        rows_parsed_page: ParsedPage::get_count(conn),
+        rows_parsed_page: Page::get_count(conn),
        rows_item_appearances: ItemAppearances::get_count(conn),
    }
}
@@ -687,12 +771,28 @@ mod tests {
        parsed.add_or_update(&db);
        assert_eq!(ParsedStorage::lookup(&db, listing.item_id), vec![parsed]);

-        let page = ParsedPage {
+        let page = Page {
            category: "ssd".to_owned(),
            timestamp: std::time::SystemTime::now().into(),
+            status: ParsedPageStatus::PendingParse,
        };
        page.add_or_update(&db);
-        assert_eq!(ParsedPage::lookup(&db, page.timestamp), Some(page.clone()));
+        assert_eq!(Page::lookup(&db, page.timestamp), Some(page.clone()));
+        assert_eq!(
+            Page::lookup_status(&db, ParsedPageStatus::PendingParse, "ssd", 10),
+            vec![page.clone()]
+        );
+        assert_eq!(
+            Page::lookup_status(&db, ParsedPageStatus::Ready, "ssd", 10),
+            vec![]
+        );
+        assert_eq!(
+            Page::category_stats(&db, "ssd"),
+            vec![
+                (ParsedPageStatus::PendingParse, 1, 1),
+                (ParsedPageStatus::Ready, 0, 1)
+            ]
+        );

        let apperance = ItemAppearances {
            item: listing.item_id,

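For orientation, the new status column stores the enum's discriminant as an INTEGER and leans on the TryFrom<i64> impl when reading rows back. A minimal standalone sketch of that round trip (hypothetical demo table, not part of the commit):

```rust
use rusqlite::Connection;

// Hypothetical demo of the encoding Page uses: write the enum as an
// INTEGER discriminant, read it back through TryFrom<i64>.
fn status_roundtrip() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch("CREATE TABLE demo (status INTEGER NOT NULL)")?;
    conn.execute(
        "INSERT INTO demo (status) VALUES (?1)",
        [ParsedPageStatus::PendingParse as i64],
    )?;
    let raw: i64 = conn.query_row("SELECT status FROM demo", [], |r| r.get(0))?;
    assert_eq!(ParsedPageStatus::try_from(raw)?, ParsedPageStatus::PendingParse);
    Ok(())
}
```
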
src/lib.rs

@@ -2,3 +2,4 @@ pub mod db;
pub mod parser;
pub mod parser_ebay;
pub mod parser_storage;
+pub mod xdg_dirs;

src/main.rs

@@ -1,12 +1,14 @@
-use actix_web::{App, HttpServer, Responder, Result, get, post, web, web::Data};
+use actix_web::{App, HttpServer, Responder, Result, get, post, rt, web, web::Data};
use chrono::{DateTime, Utc};
use clap::Parser;
-use ebay_scraper_rust::db::{
-    DBTable, ItemAppearances, Listing, ParsedPage, ParsedStorage, SearchURL, get_initialized,
-    get_stats, listings_get_filtered,
-};
-use ebay_scraper_rust::parser::parse_dir;
+use ebay_scraper_rust::db;
+use ebay_scraper_rust::db::DBTable;
+use ebay_scraper_rust::db::Page;
+use ebay_scraper_rust::parser;
use ebay_scraper_rust::parser_storage;
+use ebay_scraper_rust::xdg_dirs;
+// use rt::mpsc;
+// use rt::time::timeout;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Mutex;
@@ -18,8 +20,6 @@ use tracing_subscriber::fmt;
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

-mod xdg_dirs;
-
#[derive(Parser, Debug)]
#[clap(
    name = "ebay-scraper-rust",
@@ -41,7 +41,7 @@ async fn listings_filtered_get(
    filter: web::Query<ListingsFilter>,
) -> Result<impl Responder> {
    let start = Instant::now();
-    let res = listings_get_filtered(
+    let res = db::listings_get_filtered(
        &db.lock().unwrap(),
        &DateTime::<Utc>::from_timestamp(filter.since.unwrap_or(0), 0).unwrap(),
        filter.limit.unwrap_or(1_000),
@@ -61,7 +61,7 @@ async fn listing_get(
    db: Data<Mutex<rusqlite::Connection>>,
    id: web::Path<i64>,
) -> Result<impl Responder> {
-    Ok(web::Json(Listing::lookup(&db.lock().unwrap(), *id)))
+    Ok(web::Json(db::Listing::lookup(&db.lock().unwrap(), *id)))
}

#[get("/listing/{id}/parsed")]
@@ -69,7 +69,10 @@ async fn listing_parse_get(
    db: Data<Mutex<rusqlite::Connection>>,
    id: web::Path<i64>,
) -> Result<impl Responder> {
-    Ok(web::Json(ParsedStorage::lookup(&db.lock().unwrap(), *id)))
+    Ok(web::Json(db::ParsedStorage::lookup(
+        &db.lock().unwrap(),
+        *id,
+    )))
}

#[derive(Serialize)]
@@ -83,7 +86,7 @@ async fn listing_history_get(
    db: Data<Mutex<rusqlite::Connection>>,
    id: web::Path<i64>,
) -> Result<impl Responder> {
-    let history: Vec<_> = ItemAppearances::lookup(&db.lock().unwrap(), *id)
+    let history: Vec<_> = db::ItemAppearances::lookup(&db.lock().unwrap(), *id)
        .iter()
        // .inspect(|e| info!("got: {:?}", e))
        .filter_map(|e| {
@@ -100,7 +103,7 @@ async fn listing_history_get(
async fn parse_listings(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Responder> {
    let mut cnt = 0;
    let db_unlocked = db.lock().unwrap();
-    Listing::lookup_non_parsed(&db_unlocked)
+    db::Listing::lookup_non_parsed(&db_unlocked)
        .iter()
        .map(|l| parser_storage::parse_size_and_quantity(l.0, &l.1))
        .inspect(|_| cnt = cnt + 1)
@@ -111,7 +114,33 @@ async fn parse_listings(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Responder> {
#[get("/category")]
async fn category_getnames(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Responder> {
-    Ok(web::Json(SearchURL::names(&db.lock().unwrap())))
+    Ok(web::Json(db::SearchURL::names(&db.lock().unwrap())))
}
+
+async fn category_discover_worker(
+    db: Data<Mutex<rusqlite::Connection>>,
+    downloaddir: Data<PathBuf>,
+    category: web::Path<String>,
+) {
+}
+
+#[post("/category/{category}/discover")]
+#[instrument(skip_all)]
+async fn category_discover(
+    db: Data<Mutex<rusqlite::Connection>>,
+    downloaddir: Data<PathBuf>,
+    category: web::Path<String>,
+) -> Result<impl Responder> {
+    let start = Instant::now();
+    parser::add_pages(
+        &db.lock().unwrap(),
+        &downloaddir.join(category.clone()),
+        &category,
+    );
+    let elapsed = start.elapsed().as_micros() as f64 / 1000.0;
+    info!("Added many pages to the category, took {elapsed} ms.");
+    Ok("")
+}

#[post("/category/{category}/parse")]
@@ -121,33 +150,40 @@ async fn category_parse(
    downloaddir: Data<PathBuf>,
    category: web::Path<String>,
) -> Result<impl Responder> {
-    let start = Instant::now();
-    let count = parse_dir(
-        &downloaddir.join(category.clone()),
-        &category,
-        &db.lock().unwrap(),
-    )
-    .unwrap();
-    let elapsed = start.elapsed().as_micros() as f64 / 1000.0;
-    info!("Added {count} listings, took {elapsed} ms.");
-    Ok(count.to_string())
+    parser::parse_pages(&db.lock().unwrap(), &downloaddir, &category, 100);
+    Ok("")
+}
+
+#[get("/category/{category}/parse")]
+#[instrument(skip_all)]
+async fn category_parse_get(
+    db: Data<Mutex<rusqlite::Connection>>,
+    category: web::Path<String>,
+) -> Result<impl Responder> {
+    let start = Instant::now();
+    let stats = Page::category_stats(&db.lock().unwrap(), &category);
+    stats
+        .iter()
+        .for_each(|(status, cnt, total)| info!("{:?} {} {}", status, cnt, total));
+    let elapsed = start.elapsed().as_micros() as f64 / 1000.0;
+    info!("Found, took {elapsed} ms.");
+    Ok(web::Json(stats))
}

#[get("/stats")]
async fn stats_get(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Responder> {
-    Ok(web::Json(get_stats(&db.lock().unwrap())))
+    Ok(web::Json(db::get_stats(&db.lock().unwrap())))
}

#[get("/admin")]
async fn admin_get(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Responder> {
    let db = db.lock().unwrap();

    let query_start_time = Instant::now();
-    let search_urls = SearchURL::get_all(&db).unwrap_or_default();
-    let parsed_pages = ParsedPage::get_all(&db).unwrap_or_default();
-    let parsed_storages = ParsedStorage::get_all(&db).unwrap_or_default();
-    let item_appearances = ItemAppearances::get_all(&db).unwrap_or_default();
-    let listings = Listing::get_all(&db).unwrap_or_default();
+    let search_urls = db::SearchURL::get_all(&db).unwrap_or_default();
+    let parsed_pages = db::Page::get_all(&db).unwrap_or_default();
+    let parsed_storages = db::ParsedStorage::get_all(&db).unwrap_or_default();
+    let item_appearances = db::ItemAppearances::get_all(&db).unwrap_or_default();
+    let listings = db::Listing::get_all(&db).unwrap_or_default();
    let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0;

    let html_gen_start_time = Instant::now();
@@ -274,6 +310,10 @@ fn generate_table<T: Serialize>(title: &str, data: &[T]) -> String {
    )
}

+async fn pages_pickup() -> std::io::Result<()> {
+    Ok(())
+}
+
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    tracing_subscriber::registry()
@@ -287,10 +327,10 @@ async fn main() -> std::io::Result<()> {
        "Starting with scraped data dir of \"{}\".",
        scrapedatadir.to_str().unwrap()
    );
-    let db_mutex = Data::new(Mutex::new(get_initialized(None)));
+    let db_mutex = Data::new(Mutex::new(db::get_initialized(None)));

    // Prepare our backend via pulling in what categories we are preconfigured with.
-    SearchURL::scan(&db_mutex.lock().unwrap(), &scrapedatadir, "url.json");
+    db::SearchURL::scan(&db_mutex.lock().unwrap(), &scrapedatadir, "url.json");

    HttpServer::new(move || {
        App::new()
@@ -298,10 +338,13 @@ async fn main() -> std::io::Result<()> {
            .service(listing_get)
            .service(listings_filtered_get)
            .service(listing_history_get)
+            .service(listing_parse_get)
            // Category handlers
            .service(parse_listings)
            .service(category_parse)
+            .service(category_discover)
            .service(category_getnames)
+            .service(category_parse_get)
            // Gnarly info dump
            .service(admin_get)
            .service(stats_get)
@@ -312,4 +355,6 @@ async fn main() -> std::io::Result<()> {
    .bind(("0.0.0.0", 9876))?
    .run()
    .await
+
+    // tokio::join!(server, pages_pickup)
}
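
The empty pages_pickup() stub and the commented-out tokio::join! suggest the next step: a background task that drains pending pages alongside the HTTP server. One speculative way to wire that up with the actix runtime (the interval and task body here are invented, not from this commit):

```rust
use actix_web::rt;
use std::time::Duration;

// Speculative wiring: spawn a periodic pickup loop, then drive the server.
async fn serve_with_pickup(server: actix_web::dev::Server) -> std::io::Result<()> {
    rt::spawn(async {
        loop {
            rt::time::sleep(Duration::from_secs(60)).await;
            // pages_pickup().await would drain a batch of pending pages here.
        }
    });
    server.await
}
```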

src/parser.rs

@@ -1,5 +1,5 @@
use crate::{
-    db::{ParsedPage, SearchURL},
+    db::{Page, ParsedPageStatus, SearchURL},
    parser_ebay,
};
use rayon::prelude::*;
@@ -31,12 +31,12 @@ fn timestamps_from_dir(path: &Path) -> Vec<i64> {
        .collect()
}

-pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Option<usize> {
+pub fn add_pages(db: &rusqlite::Connection, dir: &Path, category: &str) {
    // Ensure the category is created.
    let url_fpath = dir.join("url.json");
    let url_contents = std::fs::read_to_string(&url_fpath)
        .inspect_err(|e| error!("Failed reading {}: {e}", url_fpath.display()))
-        .ok()?;
+        .unwrap();

    #[derive(Deserialize)]
    struct URLJSON {
@@ -50,62 +50,75 @@ pub fn add_pages(db: &rusqlite::Connection, dir: &Path, category: &str) {
    // See all pages haven't been seen before.
    let query_start_time = Instant::now();
-    let to_parse = timestamps_from_dir(dir).into_iter().filter(|t| {
-        let ts = chrono::DateTime::from_timestamp(*t, 0).unwrap();
-        let p = ParsedPage::lookup(&db, ts);
+    let to_parse = timestamps_from_dir(dir)
+        .into_iter()
+        .filter(|t| {
+            let ts = chrono::DateTime::from_timestamp(*t, 0).unwrap();
+            let p = Page::lookup(&db, ts);

            // Timestamp never seen before, lets pass it on.
            if p.is_none() {
                info!(
-                    "Page Timestamp:{} Category:{category} never seen before, processing ...",
+                    "Page Timestamp:{} Category:{category} never seen before",
                    ts.timestamp()
                );
                return true;
            }

-            // Timestamp was seen before *and* from the same category, don't pass
-            // it on.
+            // Timestamp was seen before *and* from the same category, don't pass it on.
            if p.unwrap().category == *category {
                info!(
-                    "Page Timestamp:{} Category:{category} seen before, skipping ...",
+                    "Page Timestamp:{} Category:{category} seen before, skipping",
                    ts.timestamp()
                );
                return false;
            }

            info!(
-                "Page Timestamp:{} Category:{category} seen before, but not of category:{category}, processing ...",
+                "Page Timestamp:{} Category:{category} seen before, but not of category:{category}",
                ts.timestamp()
            );
            return true;
-    }).collect::<Vec<_>>();
+        })
+        .collect::<Vec<_>>();

    let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0;
    info!("Time spent finding pages to parse:{total_query_time} ms");

-    // For each page, read the file and parse it.
+    // Say we are going to parse the pages.
+    let query_start_time = Instant::now();
+    let pages = to_parse.iter().map(|p| Page {
+        timestamp: chrono::DateTime::from_timestamp(*p, 0).unwrap(),
+        category: category.to_string(),
+        status: crate::db::ParsedPageStatus::PendingParse,
+    });
+    for p in pages {
+        p.add_or_update(&db);
+    }
+    let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0;
+    info!("Time spent inserting pages marked as ready to parse:{total_query_time} ms");
+}
+
+pub fn parse_pages(db: &rusqlite::Connection, dir: &Path, category: &str, batch: usize) {
+    let to_parse = Page::lookup_status(&db, ParsedPageStatus::PendingParse, category, batch);
+
    let query_start_time = Instant::now();
    let to_add = to_parse
        .par_iter()
        .map(|p| {
-            let ts = chrono::DateTime::from_timestamp(*p, 0).unwrap();
-            let paged_info = ParsedPage {
-                timestamp: ts,
-                category: category.to_string(),
-            };
-
-            let page_path = dir.join(format!("{}.html", ts.timestamp()));
+            let page_path = dir
+                .join(category)
+                .join(format!("{}.html", p.timestamp.timestamp()));
            let page_contents = std::fs::read_to_string(&page_path)
                .inspect_err(|e| error!("Failed reading {}, error:{e}", page_path.display()))
                .ok()?;

            let elements =
-                parser_ebay::parse_from_ebay_page(&page_contents, &ts, &category).unwrap();
+                parser_ebay::parse_from_ebay_page(&page_contents, &p.timestamp, &category).unwrap();
            info!(
                "Page Timestamp:{} Category:{category}, found {} elements",
-                ts.timestamp(),
+                p.timestamp.timestamp(),
                elements.len()
            );

-            Some((paged_info, elements))
+            Some((p, elements))
        })
        .collect::<Vec<_>>();

    let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0;
@@ -113,7 +126,6 @@ pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Option<usize> {
    // And lastly add it to our database!
    let query_start_time = Instant::now();
-    let mut added_count = 0;
    for iter in to_add {
        if iter.is_none() {
            continue;
@@ -122,7 +134,6 @@ pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Option<usize> {
        paged_info.add_or_update(&db);
        for e in elements {
-            added_count += 1;
            e.0.add_or_update(&db);
            e.1.add_or_update(&db);
            debug!(
@@ -135,6 +146,4 @@ pub fn parse_dir(dir: &Path, category: &str, db: &rusqlite::Connection) -> Option<usize> {
    }
    let total_query_time = query_start_time.elapsed().as_micros() as f64 / 1000.0;
    info!("Time spent adding parsed pages: {total_query_time} ms");
-
-    return Some(added_count);
}
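
Net effect: the old single-pass parse_dir is now a two-phase pipeline. add_pages only records which page timestamps exist on disk, marking them PendingParse; parse_pages later drains a bounded batch of those rows. A sketch of driving both phases directly, mirroring what the two /category/{category} endpoints do (the directory path is a placeholder):

```rust
use std::path::Path;

// Placeholder-path sketch of the new two-phase flow for one category.
fn refresh_ssd(db: &rusqlite::Connection) {
    let download_dir = Path::new("/tmp/scraped"); // hypothetical base dir
    // Phase 1 (POST /category/ssd/discover): register page files as PendingParse.
    ebay_scraper_rust::parser::add_pages(db, &download_dir.join("ssd"), "ssd");
    // Phase 2 (POST /category/ssd/parse): parse up to 100 pending pages.
    ebay_scraper_rust::parser::parse_pages(db, download_dir, "ssd", 100);
}
```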