ebay_scraper_rust/src/db.rs

use chrono::{DateTime, Utc};
use rusqlite::Connection;
use serde::Deserialize;
use serde::Serialize;
use std::path::Path;
use strum::{EnumIter, IntoEnumIterator};
// use strum_macros::EnumIter;
use tracing::{error, info};
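// Overview of the tables defined in this module (relationships as declared by the
// FOREIGN KEY clauses in the schema constants below):
//
//     SearchURLs  1 --- *  Pages             (Pages.category -> SearchURLs.name)
//     Listings    1 --- *  Item_Appearances  (item -> Listings.item_id)
//     Pages       1 --- *  Item_Appearances  ((category, timestamp) -> Pages)
//     Listings    1 --- *  Storage_Parsed    (item -> Listings.item_id)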
pub trait DBTable {
const TABLE_NAME: &'static str;
const TABLE_SCHEMA: &'static str;
fn initialize(conn: &Connection) {
let create_table = &format!(
"CREATE TABLE IF NOT EXISTS {} (
{}
)",
Self::TABLE_NAME,
Self::TABLE_SCHEMA
);
info!("Creating table with following schema;");
info!("{} ({})", Self::TABLE_NAME, Self::TABLE_SCHEMA);
conn.execute(create_table, ()).unwrap();
}
fn get_count(conn: &Connection) -> i64 {
let mut stmt = conn
.prepare(&format!("SELECT COUNT(*) FROM {}", Self::TABLE_NAME))
.unwrap();
stmt.query_one([], |r| r.get(0))
.inspect_err(|e| error!("Failed to get count due to error \"{:?}\", returning 0", e))
.unwrap_or(0)
}
fn get_all(conn: &Connection) -> rusqlite::Result<Vec<Self>>
where
Self: Sized;
}
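// A minimal usage sketch for DBTable (assumes an in-memory connection; `Listing`
// is one of the implementors defined further down in this file):
//
//     let conn = Connection::open_in_memory().unwrap();
//     Listing::initialize(&conn);
//     assert_eq!(Listing::get_count(&conn), 0);
//     assert!(Listing::get_all(&conn).unwrap().is_empty());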
#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct SearchURL {
pub full_url: String,
pub name: String,
}
impl DBTable for SearchURL {
const TABLE_NAME: &'static str = "SearchURLs";
const TABLE_SCHEMA: &'static str = "
id INTEGER PRIMARY KEY,
url TEXT NOT NULL UNIQUE,
name TEXT NOT NULL UNIQUE";
fn get_all(conn: &Connection) -> rusqlite::Result<Vec<Self>> {
let mut stmt = conn.prepare(&format!("SELECT url, name FROM {}", Self::TABLE_NAME))?;
let iter = stmt.query_map([], |row| {
Ok(SearchURL {
full_url: row.get(0)?,
name: row.get(1)?,
})
})?;
let mut result = Vec::new();
for item in iter {
result.push(item?);
}
Ok(result)
}
}
impl SearchURL {
pub fn lookup(conn: &Connection, name: &str) -> Option<Self> {
let mut stmt = conn
.prepare(&format!(
"SELECT * FROM {} WHERE name = ?",
Self::TABLE_NAME
))
.ok()?;
stmt.query_one([name], |row| {
Ok(SearchURL {
// id: row.get(0)?,
full_url: row.get(1)?,
name: row.get(2)?,
})
})
.ok()
}
pub fn add_or_update(&self, conn: &Connection) {
conn
.execute(
&format!(
"INSERT OR REPLACE INTO {} (name, url) VALUES (?1, ?2)",
Self::TABLE_NAME
),
(&self.name, &self.full_url),
)
.unwrap();
}
pub fn names(conn: &Connection) -> Vec<String> {
let mut stmt = conn
.prepare(&format!("SELECT name FROM {}", Self::TABLE_NAME))
.unwrap();
stmt.query_map([], |row| row.get(0))
.unwrap()
.filter_map(|e| e.ok())
.collect()
}
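// `scan` walks one subdirectory per category under `downloads_dir` and expects
// each to contain a JSON file (named by `filename`) of the shape `{"url": "..."}`.
// A sketch of the expected layout (directory and file names are illustrative):
//
//     downloads/
//         ssd/
//             url.json   -> {"url": "https://www.ebay.com/sch/..."}
//         hdd/
//             url.json
//
// The directory name becomes `SearchURL::name` and the JSON's `url` field becomes
// `SearchURL::full_url`.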
pub fn scan(conn: &Connection, downloads_dir: &Path, filename: &str) {
// Grab all directories.
let dirs = std::fs::read_dir(downloads_dir)
.unwrap()
.filter_map(|e| Some(e.ok()?.path()))
.filter(|e| e.is_dir());
#[derive(Deserialize)]
struct URLJSON {
url: String,
}
// Grab url JSON's.
for dir in dirs {
let url_fpath = dir.join(filename);
if !url_fpath.exists() {
info!("Skipping {:?} as file does not exist", url_fpath);
continue;
}
let url_contents = std::fs::read_to_string(&url_fpath)
.inspect_err(|e| error!("Failed reading {}: {e}", url_fpath.display()))
.unwrap();
let su = SearchURL {
full_url: serde_json::from_str::<URLJSON>(&url_contents).unwrap().url,
name: dir.file_name().unwrap().to_str().unwrap().to_owned(),
};
info!("Adding {:?} to search urls table", su);
su.add_or_update(conn);
}
}
}
#[derive(Serialize, Debug, PartialEq, Clone, EnumIter)]
pub enum ParsedPageStatus {
PendingParse,
Ready,
}
impl TryFrom<i64> for ParsedPageStatus {
type Error = rusqlite::Error;
fn try_from(value: i64) -> Result<Self, Self::Error> {
match value {
0 => Ok(ParsedPageStatus::PendingParse),
1 => Ok(ParsedPageStatus::Ready),
_ => Err(rusqlite::Error::InvalidColumnType(
2,
"Invalid integer of {} for ParsedPageStatus".to_string(),
rusqlite::types::Type::Integer,
)),
}
}
}
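// The integer encoding round-trips with the `status as i64` writes below:
// PendingParse <-> 0, Ready <-> 1. Adding a new variant requires updating this
// TryFrom impl to match.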
#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct Page {
pub timestamp: DateTime<Utc>,
pub category: String,
pub status: ParsedPageStatus,
}
impl DBTable for Page {
const TABLE_NAME: &'static str = "Pages";
const TABLE_SCHEMA: &'static str = "
id INTEGER PRIMARY KEY,
category TEXT NOT NULL,
timestamp INTEGER NOT NULL,
status INTEGER NOT NULL,
UNIQUE(category, timestamp),
FOREIGN KEY(category) REFERENCES SearchURLs(name)
";
fn get_all(conn: &Connection) -> rusqlite::Result<Vec<Self>> {
let mut stmt = conn.prepare(&format!(
"SELECT category, timestamp, status FROM {}",
Self::TABLE_NAME
))?;
let iter = stmt.query_map([], |row| {
Ok(Page {
category: row.get(0)?,
timestamp: row.get(1)?,
status: row.get::<_, i64>(2)?.try_into()?,
})
})?;
let mut result = Vec::new();
for item in iter {
result.push(item?);
}
Ok(result)
}
}
impl Page {
pub fn lookup(conn: &Connection, timestamp: DateTime<Utc>) -> Option<Self> {
let mut stmt = conn
.prepare(&format!(
"SELECT * FROM {} WHERE timestamp = ?",
Self::TABLE_NAME
))
.ok()?;
stmt.query_one([timestamp], |row| {
Ok(Page {
// id: row.get(0)?,
category: row.get(1)?,
timestamp: row.get(2)?,
status: row.get::<_, i64>(3)?.try_into()?,
})
})
.ok()
}
pub fn add_or_update(&self, conn: &Connection) {
conn
.execute(
&format!(
"INSERT OR REPLACE INTO {} (category, timestamp, status) VALUES (?1, ?2, ?3)",
Self::TABLE_NAME
),
(&self.category, self.timestamp, self.status.clone() as i64),
)
.unwrap();
}
pub fn lookup_status(
conn: &Connection,
status: ParsedPageStatus,
category: &str,
max: usize,
) -> Vec<Self> {
let mut stmt = conn
.prepare(&format!(
"SELECT category, timestamp, status FROM {} WHERE status = {} AND category = ?1 LIMIT {}",
Self::TABLE_NAME,
status.clone() as i64,
max
))
.unwrap();
stmt.query_map([category], |row| {
Ok(Self {
category: row.get(0)?,
timestamp: row.get(1)?,
status: row.get::<_, i64>(2)?.try_into()?,
})
})
.unwrap()
.inspect(|e| info!("debugging saw {:?}", e))
.filter_map(|e| e.ok())
.collect()
}
pub fn category_stats(conn: &Connection, category: &str) -> Vec<(ParsedPageStatus, i64, i64)> {
// The per-category total does not depend on the status, so compute it once.
let cnt_category_total: i64 = conn
.prepare(&format!(
"SELECT COUNT(*) FROM {} WHERE category = ?1",
Self::TABLE_NAME
))
.unwrap()
.query_one([category], |r| r.get(0))
.inspect_err(|e| error!("Failed to get count due to error \"{:?}\", returning 0", e))
.unwrap_or(0);
let mut res: Vec<(ParsedPageStatus, i64, i64)> = vec![];
for status in ParsedPageStatus::iter() {
let cnt_category_status = conn
.prepare(&format!(
"SELECT COUNT(*) FROM {} WHERE category = ?1 AND status = {}",
Self::TABLE_NAME,
status.clone() as i64
))
.unwrap()
.query_one([category], |r| r.get(0))
.inspect_err(|e| error!("Failed to get count due to error \"{:?}\", returning 0", e))
.unwrap_or(0);
res.push((status, cnt_category_status, cnt_category_total));
}
res
}
}
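// A sketch of how the status machinery could be driven (the "flip to Ready after
// parsing" step is an assumption about intent; the category name "ssd" mirrors the
// test at the bottom of this file):
//
//     for page in Page::lookup_status(&conn, ParsedPageStatus::PendingParse, "ssd", 10) {
//         // ... parse the downloaded page, then mark it done:
//         Page { status: ParsedPageStatus::Ready, ..page }.add_or_update(&conn);
//     }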
#[derive(Serialize, Debug, PartialEq, Copy, Clone)]
pub struct ParsedStorage {
pub id: i64,
pub item: i64,
pub total_gigabytes: i64,
pub quantity: i64,
pub individual_size_gigabytes: i64,
pub parse_engine: i64,
pub needed_description_check: bool,
}
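// Note: the struct field `individual_size_gigabytes` is an i64, but the underlying
// column is declared TEXT (`sizes_gigabytes`) and is round-tripped through
// `to_string()` / `parse()` in the impls below.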
impl DBTable for ParsedStorage {
const TABLE_NAME: &'static str = "Storage_Parsed";
const TABLE_SCHEMA: &'static str = "
id INTEGER PRIMARY KEY,
item INTEGER,
total_gigabytes INTEGER,
quantity INTEGER,
sizes_gigabytes TEXT,
parse_engine INTEGER,
need_description_check INTEGER,
UNIQUE(item, parse_engine),
FOREIGN KEY(item) REFERENCES Listings(item_id)
";
fn get_all(conn: &Connection) -> rusqlite::Result<Vec<Self>> {
let mut stmt = conn.prepare(&format!("SELECT id, item, total_gigabytes, quantity, sizes_gigabytes, parse_engine, need_description_check FROM {}", Self::TABLE_NAME))?;
let iter = stmt.query_map([], |row| {
Ok(ParsedStorage {
id: row.get(0)?,
item: row.get(1)?,
total_gigabytes: row.get(2)?,
quantity: row.get(3)?,
individual_size_gigabytes: {
let r: String = row.get(4)?;
r.parse().unwrap_or(0)
},
parse_engine: row.get(5)?,
needed_description_check: row.get(6)?,
})
})?;
let mut result = Vec::new();
for item in iter {
result.push(item?);
}
Ok(result)
}
}
impl ParsedStorage {
pub fn lookup(conn: &Connection, item: i64) -> Vec<ParsedStorage> {
let mut stmt = conn
.prepare(&format!(
"SELECT * FROM {} WHERE item = ?",
Self::TABLE_NAME
))
.unwrap();
stmt.query_map([item], |row| {
Ok(ParsedStorage {
id: row.get(0)?,
item: row.get(1)?,
total_gigabytes: row.get(2)?,
quantity: row.get(3)?,
individual_size_gigabytes: {
let r: String = row.get(4)?;
r.parse().unwrap_or(0)
},
parse_engine: row.get(5)?,
needed_description_check: row.get(6)?,
})
})
.unwrap()
.map(|e| e.unwrap())
.collect()
}
pub fn add_or_update(&self, conn: &Connection) {
conn.execute(&format!("
INSERT OR REPLACE INTO {}
(item, total_gigabytes, quantity, sizes_gigabytes, parse_engine, need_description_check)
VALUES
(?1, ?2, ?3, ?4, ?5, ?6)",
Self::TABLE_NAME),
(
&self.item,
self.total_gigabytes,
self.quantity,
self.individual_size_gigabytes.to_string(),
self.parse_engine,
self.needed_description_check
)
).unwrap();
}
}
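// Each Item_Appearances row records that a given listing was observed on a scraped
// page (identified by category + timestamp), along with the bid price seen at that
// moment, if any.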
#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct ItemAppearances {
pub item: i64,
pub timestamp: DateTime<Utc>,
pub category: String,
pub current_bid_usd_cents: Option<i64>,
}
impl DBTable for ItemAppearances {
const TABLE_NAME: &'static str = "Item_Appearances";
const TABLE_SCHEMA: &'static str = "
id INTEGER PRIMARY KEY,
item INTEGER NOT NULL,
category TEXT NOT NULL,
timestamp INTEGER NOT NULL,
current_bid_usd_cents INTEGER,
UNIQUE(item, timestamp),
FOREIGN KEY(item) REFERENCES Listings(item_id),
FOREIGN KEY(category, timestamp) REFERENCES Pages(category, timestamp)
";
fn get_all(conn: &Connection) -> rusqlite::Result<Vec<Self>> {
let mut stmt = conn.prepare(&format!(
"SELECT item, category, timestamp, current_bid_usd_cents FROM {}",
Self::TABLE_NAME
))?;
let iter = stmt.query_map([], |row| {
Ok(ItemAppearances {
item: row.get(0)?,
category: row.get(1)?,
timestamp: row.get(2)?,
current_bid_usd_cents: row.get(3)?,
})
})?;
let mut result = Vec::new();
for item in iter {
result.push(item?);
}
Ok(result)
}
}
impl ItemAppearances {
pub fn add_or_update(&self, conn: &Connection) {
let count = conn
.execute(
&format!(
"
INSERT OR REPLACE INTO {}
(item, timestamp, category, current_bid_usd_cents)
VALUES
(?1, ?2, ?3, ?4)",
Self::TABLE_NAME
),
(
self.item,
&self.timestamp,
&self.category,
self.current_bid_usd_cents,
),
)
.unwrap();
if count != 1 {
panic!("Expected count to be 1 but got {}", count);
}
}
pub fn lookup(conn: &Connection, listing_id: i64) -> Vec<ItemAppearances> {
let mut stmt = conn
.prepare(&format!(
"
SELECT * FROM {}
WHERE item IS ?1",
Self::TABLE_NAME,
))
.unwrap();
stmt.query_map([listing_id], |row| {
Ok(ItemAppearances {
item: row.get(1)?,
category: row.get(2)?,
timestamp: row.get(3)?,
current_bid_usd_cents: row.get(4)?,
})
})
.unwrap()
.map(|e| e.unwrap())
.collect()
}
}
#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct Listing {
pub id: i64,
pub item_id: i64,
pub title: String,
pub buy_it_now_price_cents: Option<i64>,
pub has_best_offer: bool,
pub image_url: String,
}
impl DBTable for Listing {
const TABLE_NAME: &'static str = "Listings";
const TABLE_SCHEMA: &'static str = "
id INTEGER PRIMARY KEY,
item_id INTEGER NOT NULL UNIQUE,
title TEXT NOT NULL,
buy_it_now_usd_cents INTEGER,
has_best_offer INTEGER NOT NULL,
image_url TEXT NOT NULL
";
fn get_all(conn: &Connection) -> rusqlite::Result<Vec<Self>> {
let mut stmt = conn.prepare(&format!(
"SELECT id, item_id, title, buy_it_now_usd_cents, has_best_offer, image_url FROM {}",
Self::TABLE_NAME
))?;
let iter = stmt.query_map([], |row| {
Ok(Listing {
id: row.get(0)?,
item_id: row.get(1)?,
title: row.get(2)?,
buy_it_now_price_cents: row.get(3)?,
has_best_offer: row.get(4)?,
image_url: row.get(5)?,
})
})?;
let mut result = Vec::new();
for item in iter {
result.push(item?);
}
Ok(result)
}
}
impl Listing {
pub fn lookup(conn: &Connection, item_id: i64) -> Option<Listing> {
let mut stmt = conn
.prepare(&format!(
"SELECT * FROM {} WHERE item_id = ?",
Self::TABLE_NAME
))
.ok()?;
stmt.query_one([item_id], |row| {
Ok(Listing {
id: row.get(0)?,
item_id: row.get(1)?,
title: row.get(2)?,
buy_it_now_price_cents: row.get(3)?,
has_best_offer: row.get(4)?,
image_url: row.get(5)?,
})
})
.ok()
}
pub fn lookup_since(conn: &Connection, since: DateTime<Utc>, limit: i64) -> Vec<Self> {
let mut stmt = conn
.prepare(&format!(
"
SELECT *
FROM {0}
WHERE EXISTS (
SELECT 1
FROM {1}
WHERE
{1}.item = {0}.item_id AND
{1}.timestamp >= ?1
)
LIMIT {2}
",
Self::TABLE_NAME,
ItemAppearances::TABLE_NAME,
limit
))
.unwrap();
stmt.query_map([since], |row| {
Ok(Listing {
id: row.get(0)?,
item_id: row.get(1)?,
title: row.get(2)?,
buy_it_now_price_cents: row.get(3)?,
has_best_offer: row.get(4)?,
image_url: row.get(5)?,
})
})
.unwrap()
.map(|e| e.unwrap())
.collect()
}
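// Listings that have no corresponding Storage_Parsed row yet, i.e. items still
// awaiting a storage parse.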
pub fn lookup_non_parsed(conn: &Connection) -> Vec<(i64, String)> {
let mut stmt = conn
.prepare(&format!(
"
SELECT ei.item_id, ei.title FROM {} AS ei
LEFT JOIN {} AS sp ON ei.item_id = sp.item
WHERE sp.item IS NULL",
Self::TABLE_NAME,
ParsedStorage::TABLE_NAME
))
.unwrap();
stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
.unwrap()
.map(|e| e.unwrap())
.collect()
}
pub fn add_or_update(&self, conn: &Connection) {
let count = conn
.execute(
&format!(
"INSERT OR REPLACE INTO {}
(
item_id,
title,
buy_it_now_usd_cents,
has_best_offer,
image_url
)
VALUES (?1, ?2, ?3, ?4, ?5)",
Self::TABLE_NAME
),
(
self.item_id,
&self.title,
self.buy_it_now_price_cents,
self.has_best_offer,
self.image_url.clone(),
),
)
.unwrap();
if count != 1 {
panic!("Expected count to be 1 but got {}", count);
}
}
}
#[derive(Serialize, Debug)]
pub struct ListingsFilterResult {
listing: Listing,
history: Vec<ItemAppearances>,
parsed: Vec<ParsedStorage>,
}
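// The filter below works in USD per terabyte. A worked example of the arithmetic
// (numbers are illustrative): a listing whose latest appearance shows 5_000 cents
// ($50.00) with 2048 GB parsed comes out to (5000 / 100) / (2048 / 1024) = $25/TB,
// so it is kept only while `cents_per_tbytes_max` is strictly greater than 2_500.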
pub fn listings_get_filtered(
conn: &Connection,
since: &DateTime<Utc>,
limit: i64,
cents_per_tbytes_max: i64,
) -> Vec<ListingsFilterResult> {
// First grab all listings seen since the timestamp, including their appearance
// history and storage parses.
let listings_recent = Listing::lookup_since(conn, *since, 100_000)
.into_iter()
.map(|l| ListingsFilterResult {
listing: l.clone(),
history: ItemAppearances::lookup(conn, l.item_id),
parsed: ParsedStorage::lookup(conn, l.item_id),
})
.filter(|lr| lr.parsed.iter().any(|p| !p.needed_description_check))
.collect::<Vec<ListingsFilterResult>>();
info!(
"Found total {} listings since (str:{} epoch:{})",
listings_recent.len(),
*since,
since.timestamp()
);
// Then keep only the listings whose latest price per terabyte is below the limit.
let listings: Vec<ListingsFilterResult> = listings_recent
.into_iter()
.filter_map(|l| {
let mut history = l.history.clone();
history.sort_by_key(|h| h.timestamp);
// info!("item_id:{} history: {:?}", l.listing.item_id, history);
let cents = history
.last()
.map(|h| h.current_bid_usd_cents)
.unwrap_or(l.listing.buy_it_now_price_cents)?;
// info!("item_id:{} cents: {:?}", l.listing.item_id, cents);
let mut parses = l.parsed.clone();
parses.sort_by_key(|p| p.parse_engine);
// info!("item_id:{} parses: {:?}", l.listing.item_id, parses);
let gb = parses.last()?.total_gigabytes;
// info!("item_id:{} gb: {:?}", l.listing.item_id, gb);
let usd_per_tb = (cents as f64 / 100.0) / (gb as f64 / 1024.0);
// info!(
// "item_id: {}, gb:{}, cents:{}, usd_per_tb:{}, cents_per_tbytes_max:{}",
// l.listing.item_id, gb, cents, usd_per_tb, cents_per_tbytes_max
// );
if usd_per_tb >= (cents_per_tbytes_max as f64 / 100.0) {
None
} else {
Some(l)
}
})
.take(limit as usize)
.collect();
info!(
"Found total {} listings since (str:{} epoch:{}) filtered by price",
listings.len(),
*since,
since.timestamp()
);
listings
}
pub fn get_initialized(path: Option<&Path>) -> Connection {
let conn = match path {
Some(p) => Connection::open(p),
None => Connection::open_in_memory(),
}
.unwrap();
SearchURL::initialize(&conn);
Listing::initialize(&conn);
ParsedStorage::initialize(&conn);
Page::initialize(&conn);
ItemAppearances::initialize(&conn);
conn
}
#[derive(Serialize, Debug)]
pub struct Stats {
rows_search_url: i64,
rows_listing: i64,
rows_parsed_storage: i64,
rows_parsed_page: i64,
rows_item_appearances: i64,
}
pub fn get_stats(conn: &Connection) -> Stats {
Stats {
rows_search_url: SearchURL::get_count(conn),
rows_listing: Listing::get_count(conn),
rows_parsed_storage: ParsedStorage::get_count(conn),
rows_parsed_page: Page::get_count(conn),
rows_item_appearances: ItemAppearances::get_count(conn),
}
}
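// A typical startup sequence (a sketch; the database path, downloads directory and
// JSON filename are illustrative):
//
//     let conn = get_initialized(Some(Path::new("scraper.db")));
//     SearchURL::scan(&conn, Path::new("downloads"), "url.json");
//     info!("{:?}", get_stats(&conn));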
#[cfg(test)]
mod tests {
use super::*;
#[test_log::test]
fn sanity_check() {
let db = get_initialized(None);
let searchurl = SearchURL {
full_url: "google".to_owned(),
name: "ssd".to_owned(),
};
searchurl.add_or_update(&db);
assert_eq!(SearchURL::lookup(&db, &searchurl.name), Some(searchurl));
let listing = Listing {
id: 1,
item_id: 1234,
title: "Some Title".to_string(),
buy_it_now_price_cents: Some(123),
has_best_offer: false,
image_url: "google.com".to_string(),
};
listing.add_or_update(&db);
assert_eq!(Listing::lookup(&db, listing.item_id), Some(listing.clone()));
let parsed = ParsedStorage {
id: 1,
item: 1234,
total_gigabytes: 13,
quantity: 3,
individual_size_gigabytes: 13,
parse_engine: 9,
needed_description_check: true,
};
parsed.add_or_update(&db);
assert_eq!(ParsedStorage::lookup(&db, listing.item_id), vec![parsed]);
let page = Page {
category: "ssd".to_owned(),
timestamp: std::time::SystemTime::now().into(),
status: ParsedPageStatus::PendingParse,
};
page.add_or_update(&db);
assert_eq!(Page::lookup(&db, page.timestamp), Some(page.clone()));
assert_eq!(
Page::lookup_status(&db, ParsedPageStatus::PendingParse, "ssd", 10),
vec![page.clone()]
);
assert_eq!(
Page::lookup_status(&db, ParsedPageStatus::Ready, "ssd", 10),
vec![]
);
assert_eq!(
Page::category_stats(&db, "ssd"),
vec![
(ParsedPageStatus::PendingParse, 1, 1),
(ParsedPageStatus::Ready, 0, 1)
]
);
let appearance = ItemAppearances {
item: listing.item_id,
timestamp: page.timestamp,
category: page.category,
current_bid_usd_cents: Some(1233),
};
appearance.add_or_update(&db);
assert_eq!(
ItemAppearances::lookup(&db, listing.item_id),
vec![appearance]
);
assert_eq!(Listing::lookup_since(&db, page.timestamp, 3), vec![listing]);
assert_eq!(
Listing::lookup_since(&db, page.timestamp + chrono::Duration::seconds(1), 3),
vec![]
);
}
}