Added stats, parallel parsing of pages, and filtered fetch of listings
All checks were successful
Cargo Build & Test / Rust project - latest (1.85.1) (push) Successful in 3m34s
Cargo Build & Test / Rust project - latest (1.87) (push) Successful in 4m3s
Cargo Build & Test / Rust project - latest (1.86) (push) Successful in 4m13s
Cargo Build & Test / Rust project - latest (1.88) (push) Successful in 9m44s

This commit is contained in:
2025-07-06 23:38:27 -04:00
parent bbca1f3bcb
commit 448933ae67
7 changed files with 528 additions and 206 deletions

View File

@@ -1,15 +1,17 @@
use actix_web::{App, HttpServer, Responder, Result, get, post, web, web::Data};
use chrono::DateTime;
use chrono::{DateTime, Utc};
use clap::Parser;
use ebay_scraper_rust::db::{
DBTable, ItemAppearances, Listing, ParsedPage, ParsedStorage, SearchURL, get_initialized,
get_stats, listings_get_filtered,
};
use ebay_scraper_rust::{parser_ebay, parser_storage};
use ebay_scraper_rust::parser::parse_dir;
use ebay_scraper_rust::parser_storage;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::Instant;
use tracing::{error, info, instrument};
use tracing::{info, instrument};
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::fmt;
@@ -26,28 +28,32 @@ mod xdg_dirs;
)]
struct Args {}
#[get("/page/{timestamp}")]
async fn page_get(
db: Data<Mutex<rusqlite::Connection>>,
timestamp: web::Path<i64>,
) -> Result<impl Responder> {
Ok(web::Json(ParsedPage::lookup(
&db.lock().unwrap(),
chrono::DateTime::from_timestamp(*timestamp, 0).unwrap(),
)))
#[derive(Deserialize, Debug)]
struct ListingsFilter {
since: Option<i64>,
limit: Option<i64>,
cents_per_tbytes_max: Option<i64>,
}
#[get("/listing/{id}/history")]
async fn listing_history_get(
#[get("/listings")]
async fn listings_filtered_get(
db: Data<Mutex<rusqlite::Connection>>,
id: web::Path<i64>,
filter: web::Query<ListingsFilter>,
) -> Result<impl Responder> {
let history: Vec<_> = ItemAppearances::lookup(&db.lock().unwrap(), *id)
.iter()
.inspect(|e| info!("got: {:?}", e))
.filter_map(|e| Some((e.timestamp, e.current_bid_usd_cents?)))
.collect();
Ok(web::Json(history))
let start = Instant::now();
let res = listings_get_filtered(
&db.lock().unwrap(),
&DateTime::<Utc>::from_timestamp(filter.since.unwrap_or(0), 0).unwrap(),
filter.limit.unwrap_or(1_000),
filter.cents_per_tbytes_max.unwrap_or(100_00),
);
let elapsed = start.elapsed().as_micros() as f64 / 1000.0;
info!(
"Took {elapsed} milliseconds with {} listings found for a filter of {:?}",
res.len(),
filter
);
Ok(web::Json(res))
}
#[get("/listing/{id}")]
@@ -70,7 +76,39 @@ async fn listing_since_get(
)))
}
#[post("listing/parse")]
#[get("/listing/{id}/parsed")]
async fn listing_parse_get(
db: Data<Mutex<rusqlite::Connection>>,
id: web::Path<i64>,
) -> Result<impl Responder> {
Ok(web::Json(ParsedStorage::lookup(&db.lock().unwrap(), *id)))
}
#[derive(Serialize)]
struct APIHistory {
when: DateTime<Utc>,
current_bid_usd_cents: i64,
}
#[get("/listing/{id}/history")]
async fn listing_history_get(
db: Data<Mutex<rusqlite::Connection>>,
id: web::Path<i64>,
) -> Result<impl Responder> {
let history: Vec<_> = ItemAppearances::lookup(&db.lock().unwrap(), *id)
.iter()
// .inspect(|e| info!("got: {:?}", e))
.filter_map(|e| {
Some(APIHistory {
when: e.timestamp,
current_bid_usd_cents: e.current_bid_usd_cents?,
})
})
.collect();
Ok(web::Json(history))
}
#[post("/listing/parse")]
async fn parse_listings(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Responder> {
let mut cnt = 0;
let db_unlocked = db.lock().unwrap();
@@ -83,130 +121,37 @@ async fn parse_listings(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Re
Ok(web::Json(cnt))
}
#[get("listing/parse/{id}")]
async fn listing_parse_get(
db: Data<Mutex<rusqlite::Connection>>,
id: web::Path<i64>,
) -> Result<impl Responder> {
Ok(web::Json(ParsedStorage::lookup(&db.lock().unwrap(), *id)))
#[get("/category")]
async fn category_getnames(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Responder> {
Ok(web::Json(SearchURL::names(&db.lock().unwrap())))
}
pub fn timestamps_from_dir(path: &Path) -> Vec<i64> {
if !std::fs::exists(path).expect("Directory must exist") {
panic!(
"Directory {:?} does not exist, cannot grab timestamps from there.",
path
);
}
std::fs::read_dir(path)
.unwrap()
.map(|fpath| fpath.unwrap().path())
.filter_map(|fstem| {
fstem
.file_stem()
.and_then(|s| s.to_str())
.expect("Invalid file name")
.parse()
.ok()
})
.collect()
}
#[post("page/parse/{category}")]
#[post("/category/{category}/parse")]
#[instrument(skip_all)]
async fn parse_post(
async fn category_parse(
db: Data<Mutex<rusqlite::Connection>>,
downloaddir: Data<PathBuf>,
category: web::Path<String>,
) -> Result<impl Responder> {
let dir = &downloaddir.join(category.clone());
let start = Instant::now();
let count = parse_dir(
&downloaddir.join(category.clone()),
&category,
&db.lock().unwrap(),
)
.unwrap();
let elapsed = start.elapsed().as_micros() as f64 / 1000.0;
// Ensure the category is created.
let url_fpath = dir.join("url.json");
let url_contents = std::fs::read_to_string(&url_fpath)
.inspect_err(|e| error!("Failed reading {}: {e}", url_fpath.display()))?;
#[derive(Deserialize)]
struct URLJSON {
url: String,
}
let su = SearchURL {
full_url: serde_json::from_str::<URLJSON>(&url_contents).unwrap().url,
name: category.to_string(),
};
su.add_or_update(&db.lock().unwrap());
// Find all pages.
let pages = timestamps_from_dir(dir);
// See what pages haven't been seen before.
let to_parse = pages.iter().filter(|t| {
let ts = chrono::DateTime::from_timestamp(**t, 0).unwrap();
let p = ParsedPage::lookup(&db.lock().unwrap(), ts);
// Timestamp never seen before, lets pass it on.
if p.is_none() {
info!(
"Page of timestamp:{} and catagory:{category} never seen before, processing ...",
ts.timestamp()
);
return true;
}
// Timestamp was seen before *and* from the same catagory, don't pass
// it on.
if p.unwrap().category == *category {
info!(
"Page of timestamp:{} and catagory:{category} seen before, skipping ...",
ts.timestamp()
);
return false;
}
info!(
"Page of timestamp:{} seen before, but not of catagory:{category}, processing ...",
ts.timestamp()
);
return true;
});
let mut added_count = 0;
for p in to_parse {
let ts = chrono::DateTime::from_timestamp(*p, 0).unwrap();
ParsedPage {
timestamp: ts,
category: category.to_string(),
}
.add_or_update(&db.lock().unwrap());
let page_path = dir.join(format!("{}.html", ts.timestamp()));
let page_contents = std::fs::read_to_string(&page_path)
.inspect_err(|e| error!("Failed reading {}, error:{e}", page_path.display()))?;
let elements = parser_ebay::extract_data_from_html(&page_contents, &ts, &category).unwrap();
info!(
"Page {} contains {} elements",
ts.timestamp(),
elements.len()
);
added_count += elements.len();
for e in elements {
e.0.add_or_update(&db.lock().unwrap());
e.1.add_or_update(&db.lock().unwrap());
info!(
"From page {}, inserting id:{}, title:{}",
ts.timestamp(),
e.0.item_id,
e.0.title
);
}
}
info!("Added {added_count} listings");
Ok(added_count.to_string())
info!("Added {count} listings, took {elapsed} ms.");
Ok(count.to_string())
}
#[get("admin")]
#[get("/stats")]
async fn stats_get(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Responder> {
Ok(web::Json(get_stats(&db.lock().unwrap())))
}
#[get("/admin")]
async fn admin_get(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Responder> {
let db = db.lock().unwrap();
let query_start_time = Instant::now();
@@ -240,6 +185,12 @@ async fn admin_get(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Respond
"#,
);
html.push_str(&generate_table("SearchURLs", &search_urls));
html.push_str(&generate_table("Pages_Parsed", &parsed_pages));
html.push_str(&generate_table("Storage_Parsed", &parsed_storages));
html.push_str(&generate_table("Item_Appearances", &item_appearances));
html.push_str(&generate_table("Listings", &listings));
// Performance Metrics
let html_gen_time = html_gen_start_time.elapsed().as_micros() as f64 / 1000.0;
html.push_str(&format!(
@@ -251,23 +202,6 @@ async fn admin_get(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Respond
));
info!("DB Query ms: {total_query_time}, HTML Generation ms:{html_gen_time}");
// --- Tables ---
// SearchURLs
html.push_str(&generate_table("SearchURLs", &search_urls));
// ParsedPages
html.push_str(&generate_table("Pages_Parsed", &parsed_pages));
// ParsedStorage
html.push_str(&generate_table("Storage_Parsed", &parsed_storages));
// ItemAppearances
html.push_str(&generate_table("Item_Appearances", &item_appearances));
// Listings
html.push_str(&generate_table("Listings", &listings));
// Footer and Scripts
html.push_str(
r#"
@@ -279,42 +213,77 @@ async fn admin_get(db: Data<Mutex<rusqlite::Connection>>) -> Result<impl Respond
</body>
</html>"#,
);
Ok(web::Html::new(&html))
}
fn generate_table<T: Serialize>(title: &str, data: &[T]) -> String {
let mut table_html = format!(
"<h2>{} ({} rows)</h2><table class='pure-table pure-table-bordered sortable-table'><thead><tr>",
title,
data.len()
);
use serde_json::Value;
if data.len() > 0 {
for header in serde_json::to_value(&data[0])
.unwrap()
.as_object()
.unwrap()
.keys()
{
table_html.push_str(&format!("<th>{}</th>", header));
}
table_html.push_str("</tr></thead><tbody>");
for item in data {
table_html.push_str("<tr>");
let item_json = serde_json::to_value(item).unwrap();
if let Some(obj) = item_json.as_object() {
for (_key, value) in obj.iter() {
table_html
.push_str(&format!("<td>{}</td>", value.to_string().replace("\"", "")));
}
}
table_html.push_str("</tr>");
}
if data.is_empty() {
return format!(
"<h2>{} (0 rows)</h2><table class='pure-table pure-table-bordered pure-table-striped sortable-table'><thead><tr></tr></thead><tbody></tbody></table>",
title
);
}
table_html.push_str("</tbody></table>");
table_html
let mut headers: Vec<String> = serde_json::to_value(&data[0])
.unwrap_or(Value::Null)
.as_object()
.map_or(Vec::new(), |obj| obj.keys().cloned().collect());
// Define the desired order for specific columns.
let desired_order = ["id", "item", "item_id", "timestamp"];
// Sort the headers. Columns in `desired_order` come first,
// in that order. The rest are sorted alphabetically.
headers.sort_by(|a, b| {
let a_pos = desired_order
.iter()
.position(|&p| p == a)
.unwrap_or(usize::MAX);
let b_pos = desired_order
.iter()
.position(|&p| p == b)
.unwrap_or(usize::MAX);
a_pos.cmp(&b_pos).then_with(|| a.cmp(b))
});
// Create the HTML for the table header row.
let header_html = headers
.iter()
.map(|header| format!("<th>{}</th>", header))
.collect::<String>();
// Create the HTML for all the table body rows.
let body_html = data
.iter()
.map(|item| {
let item_json = serde_json::to_value(item).unwrap_or(Value::Null);
let obj = item_json.as_object();
// Create all cells for a single row.
let cells_html = headers
.iter()
.map(|header| {
let value = obj.and_then(|o| o.get(header)).unwrap_or(&Value::Null);
// Remove quotes from the resulting JSON string value for cleaner output.
format!("<td>{}</td>", value.to_string().replace('"', ""))
})
.collect::<String>();
format!("<tr>{}</tr>", cells_html)
})
.collect::<String>();
// Assemble the final table HTML.
format!(
"<h2>{} ({} rows)</h2><table class='pure-table pure-table-bordered pure-table-striped sortable-table'><thead><tr>{}</tr></thead><tbody>{}</tbody></table>",
title,
data.len(),
header_html,
body_html
)
}
#[actix_web::main]
@@ -332,15 +301,25 @@ async fn main() -> std::io::Result<()> {
);
let db_mutex = Data::new(Mutex::new(get_initialized(None)));
// Prepare our backend via pulling in what catagories we are preconfigured with.
SearchURL::scan(&db_mutex.lock().unwrap(), &scrapedatadir, "url.json");
HttpServer::new(move || {
App::new()
.service(page_get)
// .service(page_get)
// Listing handlers
.service(listing_get)
.service(listings_filtered_get)
.service(listing_history_get)
.service(listing_since_get)
.service(parse_post)
// Category handlers
.service(parse_listings)
.service(category_parse)
.service(category_getnames)
// Gnarly info dump
.service(admin_get)
.service(stats_get)
// Stuff which is passed into every request.
.app_data(db_mutex.clone())
.app_data(Data::new(scrapedatadir.clone()))
})