From 3633923fa76cd0296eb12613cc11614692e342b9 Mon Sep 17 00:00:00 2001 From: Bryce Date: Tue, 12 May 2026 23:40:25 -0700 Subject: [PATCH] refactor: overhaul sync script with CLI modes, batch writes, and archive tracking Add argparse with full/last_n/oldest_percent/hybrid/single sync modes. Implement batch Firestore writes to reduce API overhead. Add is_archived flag based on phase_name during sync. Track last_synced_at on each project for incremental sync. Improve logging with structured prefixes and worker summaries. Remove dead code (duplicate date function, sync_single helper). --- sync.py | 218 +++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 151 insertions(+), 67 deletions(-) diff --git a/sync.py b/sync.py index fc34901..57ed024 100644 --- a/sync.py +++ b/sync.py @@ -9,7 +9,7 @@ import os import concurrent.futures import threading from typing import List, Dict, Any, Optional -from datetime import datetime +from datetime import datetime, timedelta import pytz from dotenv import load_dotenv load_dotenv() @@ -17,6 +17,34 @@ load_dotenv() # Add the current directory to the Python path so we can import app and models sys.path.append(os.path.dirname(os.path.abspath(__file__))) +def batch_write_to_firestore(db, collection_name: str, documents: List[tuple], batch_size: int = 500): + """Write documents to Firestore in batches from the main thread. + + Args: + db: Firestore client + collection_name: Name of the collection + documents: List of (doc_id, data) tuples + batch_size: Number of documents per batch + """ + collection = db.collection(collection_name) + total = len(documents) + written = 0 + + for i in range(0, total, batch_size): + batch = documents[i:i + batch_size] + try: + write_batch = db.batch() + for doc_id, data in batch: + ref = collection.document(str(doc_id)) + write_batch.set(ref, data) + write_batch.commit() + written += len(batch) + print(f"[BATCH] Wrote {written}/{total} documents") + except Exception as e: + print(f"[ERROR] Batch write failed: {e}") + + print(f"[BATCH] Completed writing {written} documents to Firestore") + def convert_to_pacific_time(date_str): """Convert UTC date string to Pacific Time and format as YYYY-MM-DD. @@ -67,22 +95,6 @@ def extract_domains_from_emails(emails: List[str]) -> List[str]: return sorted(list(domains)) - try: - # Parse the UTC datetime - utc_time = datetime.fromisoformat(date_str.replace('Z', '+00:00')) - - # Set timezone to UTC - utc_time = utc_time.replace(tzinfo=pytz.UTC) - - # Convert to Pacific Time - pacific_time = utc_time.astimezone(pytz.timezone('America/Los_Angeles')) - - # Format as YYYY-MM-DD - return pacific_time.strftime('%m/%d/%Y') - except (ValueError, AttributeError) as e: - print(f"[WARN] Date conversion failed for '{date_str}': {e}") - return '' - from models.project_model import ProjectModel from filevine_client import FilevineClient @@ -111,18 +123,19 @@ def process_project(index: int, total: int, project_data: dict, client: Filevine p = project_data pid = (p.get("projectId") or {}).get("native") - print(f"Working on {pid} ({index}/{total})") client = get_filevine_client() if pid is None: + print(f"[SKIP] Missing projectId for item {index}") return {} + project_name = p.get("projectName", "") try: c = client.fetch_client((p.get("clientId") or {}).get("native")) cs = client.fetch_contacts(pid) detail = client.fetch_project_detail(pid) except Exception as e: - print(f"[WARN] Failed to fetch essential data for {pid}: {e}") + print(f"[ERROR] Failed to fetch essential data for project {pid} '{project_name}': {e}") return {} defendant_one = next((c.get('orgContact', {}) for c in cs if "Defendant" in c.get('orgContact', {}).get('personTypes', [])), {}) @@ -171,9 +184,6 @@ def process_project(index: int, total: int, project_data: dict, client: Filevine # Extract default date default_date = convert_to_pacific_time(dates_and_deadlines.get("defaultDate")) or '' case_filed_date = convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled")) or '' - cf = dates_and_deadlines.get("dateCaseFiled") - from pprint import pprint - print(f"CASE FILED {case_filed_date} {cf}") # Extract motion hearing dates demurrer_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("demurrerHearingDate")) or '' @@ -219,12 +229,8 @@ def process_project(index: int, total: int, project_data: dict, client: Filevine # Extract attorney fees and costs attorney_fees = fees_and_costs.get("totalAttorneysFees") or '' costs = fees_and_costs.get("totalCosts") or '' - from pprint import pprint property_managers = [property_contacts.get('propertyManager1'), property_contacts.get('propertyManager2'), property_contacts.get('propertyManager3'), property_contacts.get('propertyManager4')] - import itertools - # valid_property_managers = list(itertools.chain(*)) valid_property_managers = [e.get('address').lower() for pm in property_managers if pm and pm.get('emails') for e in pm.get('emails') if e and e.get('address')] - print(valid_property_managers) row = ProjectModel( @@ -284,79 +290,165 @@ def process_project(index: int, total: int, project_data: dict, client: Filevine project_url=p.get("projectUrl") or detail.get("projectUrl"), #property_contacts=property_contacts viewing_emails = valid_property_managers, - viewing_domains = extract_domains_from_emails(valid_property_managers) + viewing_domains = extract_domains_from_emails(valid_property_managers), + last_synced_at=datetime.now(pytz.UTC).isoformat() ) - # Store the results in Firestore - from app import db # Import db from app - - projects_ref = db.collection("projects") - from pprint import pprint - # pprint([p.get("number"), property_info, new_file_review]) - - # Add new projects - project_id = row.project_id - if project_id: - projects_ref.document(str(project_id)).set(row.to_dict()) - - print(f"Finished on {pid} Matter {row.number} ({index}/{total})") + print(f"[{index}/{total}] Saved: {pid} | Matter {row.number} | {project_name}") return row.to_dict() except Exception as e: - print(f"[ERROR] Processing failed for {pid}: {e}") + print(f"[ERROR] Failed to process project {pid} '{project_name}': {e}") + import traceback + traceback.print_exc() return {} -def process_projects_parallel(projects: List[dict], client: FilevineClient, max_workers: int = 9) -> List[Dict[str, Any]]: +def process_projects_parallel(projects: List[dict], client: FilevineClient, max_workers: int = 10) -> List[Dict[str, Any]]: """ Process projects in parallel using a worker pool. Args: projects: List of project data dictionaries client: FilevineClient instance - max_workers: Number of concurrent workers (default 9) + max_workers: Number of concurrent workers (default 10) Returns: List of processed project dictionaries """ - # Create a thread pool with specified number of workers total = len(projects) + success_count = 0 + fail_count = 0 + + print(f"[WORKERS] Starting parallel processing of {total} projects with {max_workers} workers...") + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers, initializer=worker_init, initargs=(client,)) as executor: - # Submit all tasks to the executor future_to_project = {executor.submit(process_project, indx, total, project, client): project for indx, project in enumerate(projects)} - # Collect results as they complete results = [] for future in concurrent.futures.as_completed(future_to_project): try: result = future.result() + if result and result.get('ProjectId'): + success_count += 1 + else: + fail_count += 1 results.append(result) except Exception as e: - print(f"[ERROR] Processing failed: {e}") - # Add empty dict or handle error appropriately + fail_count += 1 + print(f"[ERROR] Worker thread failed: {e}") results.append({}) + print(f"[WORKERS] Completed: {success_count} succeeded, {fail_count} failed, {total} total") return results +def get_oldest_unsynced_projects(db, fraction: float = 0.2) -> List[int]: + """Get the oldest fraction of projects by last_synced_at from Firestore. + + Args: + db: Firestore client + fraction: Fraction of projects to return (default 0.2 = 1/5th) + + Returns: + List of project IDs (native) that need syncing + """ + try: + projects_ref = db.collection("projects") + all_docs = list(projects_ref.stream()) + total = len(all_docs) + count_to_sync = max(1, int(total * fraction)) + + # Sort by last_synced_at ascending (empty strings first, then oldest timestamps) + sorted_docs = sorted(all_docs, key=lambda doc: doc.to_dict().get("last_synced_at", "")) + selected_docs = sorted_docs[:count_to_sync] + result_ids = [int(doc.id) for doc in selected_docs if doc.id and doc.id != "None"] + + print(f"[SYNC STRATEGY] {total} projects in Firestore, will sync oldest {len(result_ids)} ({fraction*100:.0f}%)") + if selected_docs: + sample = selected_docs[0].to_dict() + print(f"[SYNC STRATEGY] Oldest: ID={result_ids[0]}, last_synced_at='{sample.get('last_synced_at', 'N/A')}'") + if len(selected_docs) > 1: + sample = selected_docs[-1].to_dict() + print(f"[SYNC STRATEGY] Cutoff: ID={result_ids[-1]}, last_synced_at='{sample.get('last_synced_at', 'N/A')}'") + return result_ids + + except Exception as e: + print(f"[ERROR] Failed to get oldest unsynced projects: {e}") + import traceback + traceback.print_exc() + return [] + + def main(): """Main function to fetch and sync projects""" - print("Starting project sync...") + import argparse + + parser = argparse.ArgumentParser(description='Sync Filevine projects to Firestore') + parser.add_argument('--mode', choices=['full', 'last_n', 'oldest_percent', 'hybrid', 'single'], + default='hybrid', help='Sync mode: full=all projects, last_n=recently active, oldest_percent=oldest by last_synced_at, hybrid=last_n+oldest_percent, single=one project') + parser.add_argument('--days', type=int, default=14, help='Number of days for last_n mode (default: 14)') + parser.add_argument('--percent', type=float, default=20.0, help='Percentage for oldest_percent mode (default: 20)') + parser.add_argument('--project-id', type=int, help='Project ID for single mode (required when mode=single)') + args = parser.parse_args() + + if args.mode == 'single' and not args.project_id: + parser.error("--project-id is required when mode is 'single'") + + print(f"[SYNC] Starting sync - mode={args.mode}, workers=10") try: - # Initialize Filevine client client = FilevineClient() - bearer = client.get_bearer_token() + client.get_bearer_token() + from app import db - # List projects (all pages) with filter for projects updated in the last 7 days - from datetime import datetime, timedelta - seven_days_ago = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d') - projects = client.list_all_projects(latest_activity_since=seven_days_ago) + if args.mode == 'full': + print("[MODE] Full sync - fetching all projects") + projects = client.list_all_projects() + + elif args.mode == 'last_n': + days_ago = (datetime.now() - timedelta(days=args.days)).strftime('%Y-%m-%d') + print(f"[MODE] Last {args.days} days - fetching active since {days_ago}") + projects = client.list_all_projects(latest_activity_since=days_ago) + + elif args.mode == 'oldest_percent': + fraction = args.percent / 100.0 + oldest_ids = get_oldest_unsynced_projects(db, fraction=fraction) + print(f"[MODE] Oldest {args.percent}% - fetching {len(oldest_ids)} projects") + + all_projects = client.list_all_projects() + projects = [p for p in all_projects if p.get("projectId", {}).get("native") in set(oldest_ids)] + + elif args.mode == 'single': + print(f"[MODE] Single project - fetching project {args.project_id}") + project_detail = client.fetch_project_detail(args.project_id) + projects = [project_detail] if project_detail else [] + + elif args.mode == 'hybrid': + print("[MODE] Hybrid - active + oldest") - #projects = [p for p in projects if (p.get("projectId") or {}).get("native") == 15914808] - #projects = projects[:10] + days_ago = (datetime.now() - timedelta(days=args.days)).strftime('%Y-%m-%d') + active_projects = client.list_all_projects(latest_activity_since=days_ago) + active_ids = {p.get("projectId", {}).get("native") for p in active_projects} + print(f"[SYNC] {len(active_projects)} active since {days_ago}") + + fraction = args.percent / 100.0 + oldest_ids = get_oldest_unsynced_projects(db, fraction=fraction) + + all_ids_to_sync = active_ids.union(set(oldest_ids)) + print(f"[SYNC] {len(all_ids_to_sync)} total unique to sync") + + all_projects = client.list_all_projects() + projects = [p for p in all_projects if p.get("projectId", {}).get("native") in all_ids_to_sync] # Process projects in parallel - detailed_rows = process_projects_parallel(projects, client, 9) + detailed_rows = process_projects_parallel(projects, client, max_workers=10) + # Batch write all results to Firestore + documents = [] + for row in detailed_rows: + if row.get('ProjectId'): + row['is_archived'] = (row.get('phase_name') == 'Archived') + documents.append((row.get('ProjectId'), row)) + batch_write_to_firestore(db, "projects", documents) - print(f"Successfully synced {len(detailed_rows)} projects to Firestore") + print(f"[SYNC] Complete - {len(documents)} projects saved to Firestore") except Exception as e: print(f"Error during sync: {e}") @@ -364,13 +456,5 @@ def main(): traceback.print_exc() sys.exit(1) -def sync_single(x): - client = FilevineClient() - z = process_project(0, 1, client.fetch_project_detail(x), client) - from pprint import pprint - - #pprint(z) - - if __name__ == "__main__": main()