Files
rothbard/sync.py
Bryce 3633923fa7 refactor: overhaul sync script with CLI modes, batch writes, and archive tracking
Add argparse with full/last_n/oldest_percent/hybrid/single sync modes.

Implement batch Firestore writes to reduce API overhead.

Add is_archived flag based on phase_name during sync.

Track last_synced_at on each project for incremental sync.

Improve logging with structured prefixes and worker summaries.

Remove dead code (duplicate date function, sync_single helper).
2026-05-12 23:40:25 -07:00

461 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Sync Filevine projects into the Firestore ``projects`` collection.

Intended to be run manually from the command line, e.g.::

    python sync.py --mode hybrid --days 14 --percent 20

Available modes (parsed in main()): full, last_n, oldest_percent,
hybrid (the default) and single (which requires --project-id).
"""
import sys
import os
import concurrent.futures
import threading
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import pytz
from dotenv import load_dotenv
# Load variables from a .env file, if one is found, into the process
# environment before the project modules imported further down are loaded.
load_dotenv()
# Add this script's own directory (not the current working directory) to the
# module search path so the app, models and filevine_client imports resolve.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def batch_write_to_firestore(db, collection_name: str, documents: List[tuple], batch_size: int = 500):
    """Write documents to Firestore in batches from the main thread.

    A batch whose commit fails is logged and skipped so the remaining batches
    are still attempted; the summary line and the return value report how many
    documents were actually committed, so failures are not hidden.

    Args:
        db: Firestore client
        collection_name: Name of the collection
        documents: List of (doc_id, data) tuples; doc_id is passed through str()
        batch_size: Number of documents per batch (must be >= 1)

    Returns:
        int: Number of documents successfully committed.

    Raises:
        ValueError: If batch_size is less than 1.
    """
    if batch_size < 1:
        # A negative step would silently skip every document; zero would make
        # range() raise an opaque error.
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    collection = db.collection(collection_name)
    total = len(documents)
    written = 0
    failed = 0
    for start in range(0, total, batch_size):
        chunk = documents[start:start + batch_size]
        try:
            write_batch = db.batch()
            for doc_id, data in chunk:
                write_batch.set(collection.document(str(doc_id)), data)
            write_batch.commit()
        except Exception as e:
            failed += len(chunk)
            print(f"[ERROR] Batch write failed for documents {start + 1}-{start + len(chunk)} of {total}: {e}")
            continue
        written += len(chunk)
        print(f"[BATCH] Wrote {written}/{total} documents")
    if failed:
        print(f"[BATCH] Completed writing {written} documents to Firestore ({failed} failed)")
    else:
        print(f"[BATCH] Completed writing {written} documents to Firestore")
    return written
def convert_to_pacific_time(date_str):
    """Convert an ISO 8601 timestamp to a Pacific Time date formatted as YYYY-MM-DD.

    Args:
        date_str (str): ISO 8601 string, e.g. "2025-10-24T19:20:22.377Z".
            A trailing "Z" or an explicit UTC offset is honoured; a timestamp
            without any offset is treated as UTC. A bare calendar date such as
            "2025-10-24" has no time of day and is returned unshifted.

    Returns:
        str: Date formatted as YYYY-MM-DD in America/Los_Angeles, or an empty
        string if the input is empty or cannot be parsed (a warning is printed
        for unparseable input).
    """
    if not date_str:
        return ''
    try:
        text = date_str.strip()
        parsed = datetime.fromisoformat(text.replace('Z', '+00:00'))
    except (ValueError, AttributeError, TypeError) as e:
        print(f"[WARN] Date conversion failed for '{date_str}': {e}")
        return ''
    if len(text) == 10:
        # "YYYY-MM-DD" carries no time of day; treating it as UTC midnight and
        # converting would always move it to the previous Pacific day.
        return parsed.strftime('%Y-%m-%d')
    if parsed.tzinfo is None:
        # No offset in the string: assume the value is already in UTC.
        parsed = parsed.replace(tzinfo=pytz.UTC)
    # astimezone() converts from whatever offset the string carried instead of
    # overwriting that offset with UTC.
    pacific_time = parsed.astimezone(pytz.timezone('America/Los_Angeles'))
    return pacific_time.strftime('%Y-%m-%d')
def extract_domains_from_emails(emails: List[str]) -> List[str]:
    """Extract the unique, lower-cased domains from a list of email addresses.

    The domain is everything after the last "@" (a domain can never contain
    "@"), with surrounding whitespace removed. Entries that are not strings,
    have no "@", or have nothing after it are skipped rather than contributing
    an empty or wrong domain.

    Args:
        emails (List[str]): List of email addresses; may be empty or None

    Returns:
        List[str]: Sorted list of unique domains extracted from the emails
    """
    if not emails:
        return []
    domains = set()
    for email in emails:
        if not isinstance(email, str):
            continue
        _, at_sign, domain = email.strip().rpartition('@')
        domain = domain.lower()
        if at_sign and domain:
            domains.add(domain)
    return sorted(domains)
from models.project_model import ProjectModel
from filevine_client import FilevineClient
# One attribute slot per thread: each thread only sees the FilevineClient
# that was bound on that same thread.
_thread_local = threading.local()


def get_filevine_client():
    """Return the FilevineClient bound to the current thread, or None if unset."""
    try:
        return _thread_local.client
    except AttributeError:
        return None


def set_filevine_client(client):
    """Bind ``client`` as the current thread's FilevineClient."""
    setattr(_thread_local, "client", client)
def worker_init(client: FilevineClient):
    """Thread-pool initializer that binds ``client`` to the calling worker thread.

    Passed as ``initializer=`` to the ThreadPoolExecutor so each worker thread
    starts with the shared client available through get_filevine_client().
    """
    _thread_local.client = client
def _first_defendant(contacts) -> Dict[str, Any]:
    """Return the orgContact of the first contact whose personTypes include "Defendant", else {}."""
    for contact in contacts or []:
        org_contact = (contact or {}).get('orgContact') or {}
        if "Defendant" in (org_contact.get('personTypes') or []):
            return org_contact
    return {}


def _team_member_with_role(team, role_name: str):
    """Return the fullname of the first team member holding ``role_name``, else ''.

    Members with a missing/None teamOrgRoles list are skipped instead of
    raising, so one malformed team entry does not fail the whole project.
    """
    for member in team or []:
        if not member:
            continue
        role_names = {role.get('name') for role in (member.get('teamOrgRoles') or []) if role}
        if role_name in role_names:
            return member.get('fullname')
    return ''


def _split_tasks(tasks_result):
    """Split a project-tasks response into (completed, pending) task summaries.

    Each summary is {"description": task body, "completed": completedDate as a
    Pacific YYYY-MM-DD string, '' when absent}.
    """
    completed, pending = [], []
    for task in (tasks_result or {}).get("items") or []:
        if not task:
            continue
        summary = {"description": task.get("body"),
                   "completed": convert_to_pacific_time(task.get("completedDate"))}
        if task.get("isCompleted"):
            completed.append(summary)
        else:
            pending.append(summary)
    return completed, pending


def _property_manager_emails(property_contacts) -> List[str]:
    """Return the lower-cased email addresses listed on propertyManager1..propertyManager4."""
    emails = []
    for slot in range(1, 5):
        manager = property_contacts.get(f'propertyManager{slot}')
        if not manager:
            continue
        for entry in manager.get('emails') or []:
            address = entry.get('address') if entry else None
            if address:
                emails.append(address.lower())
    return emails


def process_project(index: int, total: int, project_data: dict, client: FilevineClient) -> Dict[str, Any]:
    """
    Fetch every Filevine resource for one project and flatten it into a row dict.

    This is the function executed by workers in parallel (see
    process_projects_parallel).

    Args:
        index: 0-based position of the project in the batch (used for logging).
        total: Number of projects in the batch (used for logging).
        project_data: Project summary dict; the ID is read from projectId.native.
        client: FilevineClient used for all API calls.

    Returns:
        ProjectModel(...).to_dict() for the project, or {} when the project has
        no ID or any fetch/processing step fails (the error is printed).
    """
    # Also expose the client through thread-local storage for any code that
    # reads it via get_filevine_client().
    set_filevine_client(client)
    p = project_data
    pid = (p.get("projectId") or {}).get("native")
    if pid is None:
        print(f"[SKIP] Missing projectId for item {index}")
        return {}
    project_name = p.get("projectName", "")
    try:
        client_record = client.fetch_client((p.get("clientId") or {}).get("native")) or {}
        contacts = client.fetch_contacts(pid) or []
        detail = client.fetch_project_detail(pid) or {}
    except Exception as e:
        print(f"[ERROR] Failed to fetch essential data for project {pid} '{project_name}': {e}")
        return {}
    try:
        defendant_one = _first_defendant(contacts)
        new_file_review = client.fetch_form(pid, "newFileReview") or {}
        dates_and_deadlines = client.fetch_form(pid, "datesAndDeadlines") or {}
        service_info = client.fetch_collection(pid, "serviceInfo") or []
        property_info = client.fetch_form(pid, "propertyInfo") or {}
        matter_overview = client.fetch_form(pid, "matterOverview") or {}
        fees_and_costs = client.fetch_form(pid, "feesAndCosts") or {}
        property_contacts = client.fetch_form(pid, "propertyContacts") or {}
        lease_info_np = client.fetch_form(pid, "leaseInfoNP") or {}
        completed_tasks, pending_tasks = _split_tasks(client.fetch_project_tasks(pid))
        team = client.fetch_project_team(pid)

        def deadline_date(field: str) -> str:
            """Pacific YYYY-MM-DD for a datesAndDeadlines field ('' when unset)."""
            return convert_to_pacific_time(dates_and_deadlines.get(field))

        case_filed_date = deadline_date("dateCaseFiled")
        valid_property_managers = _property_manager_emails(property_contacts)
        first_service = next(iter(service_info), None) or {}
        row = ProjectModel(
            client=client_record.get("firstName", ""),
            matter_description=p.get("projectName", ""),
            defendant_1=defendant_one.get('fullName', 'Unknown'),
            matter_open=convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")),
            notice_type=new_file_review.get("noticeType", '') or '',
            case_number=dates_and_deadlines.get('caseNumber', '') or '',
            premises_address=property_info.get("premisesAddressWithUnit", "") or '',
            premises_city=property_info.get("premisesCity", "") or '',
            responsible_attorney=_team_member_with_role(team, 'Assigned Attorney'),
            staff_person=_team_member_with_role(team, 'Primary'),
            staff_person_2=_team_member_with_role(team, 'Secondary Paralegal'),
            phase_name=p.get("phaseName", ""),
            completed_tasks=completed_tasks,
            pending_tasks=pending_tasks,
            notice_service_date=convert_to_pacific_time(new_file_review.get("noticeServiceDate")),
            notice_expiration_date=convert_to_pacific_time(new_file_review.get("noticeExpirationDate")),
            # NOTE(review): case_field_date looks like a misspelling of
            # case_filed_date; both are populated, presumably for backward
            # compatibility — confirm before removing either.
            case_field_date=case_filed_date,
            case_filed_date=case_filed_date,
            daily_rent_damages=lease_info_np.get("dailyRentDamages") or dates_and_deadlines.get("dailyRentDamages") or '',
            default_date=deadline_date("defaultDate"),
            demurrer_hearing_date=deadline_date("demurrerHearingDate"),
            motion_to_strike_hearing_date=deadline_date("mTSHearingDate"),
            motion_to_quash_hearing_date=deadline_date("mTQHearingDate"),
            other_motion_hearing_date=deadline_date("otherMotion1HearingDate"),
            msc_date=deadline_date("mSCDate"),
            msc_time=dates_and_deadlines.get("mSCTime") or '',  # time field, not converted
            msc_address=dates_and_deadlines.get("mSCAddress") or '',
            msc_div_dept_room=dates_and_deadlines.get("mSCDeptDiv") or '',
            trial_date=deadline_date("trialDate"),
            trial_time=dates_and_deadlines.get("trialTime") or '',  # time field, not converted
            trial_address=dates_and_deadlines.get("trialAddress") or '',
            trial_div_dept_room=dates_and_deadlines.get("trialDeptDivRoom") or '',
            final_result=dates_and_deadlines.get("finalResultOfTrialMSCCa") or '',
            date_of_settlement=deadline_date("dateOfStipulation"),
            final_obligation=dates_and_deadlines.get("finalObligationUnderTheStip") or '',
            def_comply_stip=dates_and_deadlines.get("defendantsComplyWithStip") or '',
            judgment_date=deadline_date("dateOfJudgment"),
            writ_issued_date=deadline_date("writIssuedDate"),
            scheduled_lockout=deadline_date("sheriffScheduledDate"),
            oppose_stays=dates_and_deadlines.get("opposeStays") or '',
            premises_safety=new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or '',
            matter_gate_code=property_info.get("propertyEntryCodeOrInstructions") or '',
            date_possession_recovered=deadline_date("datePossessionRecovered"),
            attorney_fees=fees_and_costs.get("totalAttorneysFees") or '',
            costs=fees_and_costs.get("totalCosts") or '',
            documents_url=matter_overview.get('documentShareFolderURL', '') or '',
            service_attempt_date_1=convert_to_pacific_time(first_service.get('serviceDate')),
            contacts=contacts,
            project_email_address=p.get("projectEmailAddress", ""),
            number=p.get("number", "") or matter_overview.get('matterNumber', ''),
            incident_date=convert_to_pacific_time(p.get("incidentDate") or detail.get("incidentDate")),
            project_id=pid,
            project_name=p.get("projectName") or detail.get("projectName"),
            project_url=p.get("projectUrl") or detail.get("projectUrl"),
            viewing_emails=valid_property_managers,
            viewing_domains=extract_domains_from_emails(valid_property_managers),
            last_synced_at=datetime.now(pytz.UTC).isoformat()
        )
        # index is 0-based; show a 1-based counter so the last project reads N/N.
        print(f"[{index + 1}/{total}] Saved: {pid} | Matter {row.number} | {project_name}")
        return row.to_dict()
    except Exception as e:
        print(f"[ERROR] Failed to process project {pid} '{project_name}': {e}")
        import traceback
        traceback.print_exc()
        return {}
def process_projects_parallel(projects: List[dict], client: FilevineClient, max_workers: int = 10) -> List[Dict[str, Any]]:
    """
    Process projects in parallel using a worker pool.

    Args:
        projects: List of project data dictionaries (as accepted by process_project)
        client: FilevineClient instance, shared by all workers and also bound
            into each worker thread via worker_init
        max_workers: Number of concurrent workers (default 10)

    Returns:
        One dict per input project, in the same order as ``projects`` so the
        output is deterministic. An entry is {} when that project was skipped
        or its processing failed.
    """
    total = len(projects)
    results: List[Dict[str, Any]] = [{} for _ in range(total)]
    success_count = 0
    fail_count = 0
    print(f"[WORKERS] Starting parallel processing of {total} projects with {max_workers} workers...")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers, initializer=worker_init, initargs=(client,)) as executor:
        future_to_index = {
            executor.submit(process_project, indx, total, project, client): indx
            for indx, project in enumerate(projects)
        }
        for future in concurrent.futures.as_completed(future_to_index):
            indx = future_to_index[future]
            try:
                # Normalise a None result so callers can always call .get().
                result = future.result() or {}
            except Exception as e:
                fail_count += 1
                pid = ((projects[indx] or {}).get("projectId") or {}).get("native")
                print(f"[ERROR] Worker thread failed for project {pid}: {e}")
                continue
            results[indx] = result
            if result.get('ProjectId'):
                success_count += 1
            else:
                fail_count += 1
    print(f"[WORKERS] Completed: {success_count} succeeded, {fail_count} failed, {total} total")
    return results
def get_oldest_unsynced_projects(db, fraction: float = 0.2) -> List[int]:
    """Get the oldest fraction of projects by last_synced_at from Firestore.

    Documents are ordered by their ``last_synced_at`` value (compared as a
    string), ascending. Missing, null, or empty values sort first, and ties keep
    stream order. Documents whose ID is not an integer (e.g. "None") are
    ignored, both when counting and when selecting.

    Args:
        db: Firestore client
        fraction: Fraction of projects to return (default 0.2 = 1/5th). At least
            one ID is returned whenever any valid project exists.

    Returns:
        List of project IDs (native) that need syncing, oldest first; [] on error
    """
    try:
        candidates = []  # (sort_key, project_id, value_for_logging)
        for doc in db.collection("projects").stream():
            try:
                project_id = int(doc.id)
            except (TypeError, ValueError):
                continue
            data = doc.to_dict() or {}
            synced_at = data.get("last_synced_at")
            # `or ""` keeps None comparable with timestamp strings.
            candidates.append((str(synced_at or ""), project_id, data.get("last_synced_at", "N/A")))
        total = len(candidates)
        count_to_sync = max(1, int(total * fraction))
        candidates.sort(key=lambda item: item[0])
        selected = candidates[:count_to_sync]
        result_ids = [project_id for _, project_id, _ in selected]
        print(f"[SYNC STRATEGY] {total} projects in Firestore, will sync oldest {len(result_ids)} ({fraction*100:.0f}%)")
        if selected:
            _, oldest_id, oldest_at = selected[0]
            print(f"[SYNC STRATEGY] Oldest: ID={oldest_id}, last_synced_at='{oldest_at}'")
            if len(selected) > 1:
                _, cutoff_id, cutoff_at = selected[-1]
                print(f"[SYNC STRATEGY] Cutoff: ID={cutoff_id}, last_synced_at='{cutoff_at}'")
        return result_ids
    except Exception as e:
        print(f"[ERROR] Failed to get oldest unsynced projects: {e}")
        import traceback
        traceback.print_exc()
        return []
def _native_project_id(project):
    """Return project["projectId"]["native"], or None if any level is missing or None."""
    return ((project or {}).get("projectId") or {}).get("native")


def main():
    """Parse CLI options, select projects per --mode, process them, and batch-write them to Firestore.

    Exits with status 1 if any step raises.
    """
    import argparse
    parser = argparse.ArgumentParser(description='Sync Filevine projects to Firestore')
    parser.add_argument('--mode', choices=['full', 'last_n', 'oldest_percent', 'hybrid', 'single'],
                        default='hybrid', help='Sync mode: full=all projects, last_n=recently active, oldest_percent=oldest by last_synced_at, hybrid=last_n+oldest_percent, single=one project')
    parser.add_argument('--days', type=int, default=14, help='Number of days of activity for last_n and hybrid modes (default: 14)')
    parser.add_argument('--percent', type=float, default=20.0, help='Percentage for oldest_percent and hybrid modes (default: 20)')
    parser.add_argument('--project-id', type=int, help='Project ID for single mode (required when mode=single)')
    parser.add_argument('--workers', type=int, default=10, help='Number of concurrent worker threads (default: 10)')
    args = parser.parse_args()
    if args.mode == 'single' and not args.project_id:
        parser.error("--project-id is required when mode is 'single'")
    if args.mode in ('oldest_percent', 'hybrid') and not 0 < args.percent <= 100:
        parser.error("--percent must be greater than 0 and at most 100")
    if args.workers < 1:
        parser.error("--workers must be at least 1")
    print(f"[SYNC] Starting sync - mode={args.mode}, workers={args.workers}")
    try:
        client = FilevineClient()
        client.get_bearer_token()
        from app import db
        if args.mode == 'full':
            print("[MODE] Full sync - fetching all projects")
            projects = client.list_all_projects()
        elif args.mode == 'last_n':
            days_ago = (datetime.now() - timedelta(days=args.days)).strftime('%Y-%m-%d')
            print(f"[MODE] Last {args.days} days - fetching active since {days_ago}")
            projects = client.list_all_projects(latest_activity_since=days_ago)
        elif args.mode == 'oldest_percent':
            # Build the ID set once so the filter below is O(1) per project.
            oldest_ids = set(get_oldest_unsynced_projects(db, fraction=args.percent / 100.0))
            print(f"[MODE] Oldest {args.percent}% - fetching {len(oldest_ids)} projects")
            all_projects = client.list_all_projects()
            projects = [p for p in all_projects if _native_project_id(p) in oldest_ids]
        elif args.mode == 'single':
            print(f"[MODE] Single project - fetching project {args.project_id}")
            project_detail = client.fetch_project_detail(args.project_id)
            projects = [project_detail] if project_detail else []
        elif args.mode == 'hybrid':
            print("[MODE] Hybrid - active + oldest")
            days_ago = (datetime.now() - timedelta(days=args.days)).strftime('%Y-%m-%d')
            active_projects = client.list_all_projects(latest_activity_since=days_ago)
            active_ids = {_native_project_id(p) for p in active_projects}
            print(f"[SYNC] {len(active_projects)} active since {days_ago}")
            oldest_ids = get_oldest_unsynced_projects(db, fraction=args.percent / 100.0)
            # Drop None so projects lacking an ID are not pulled in by accident.
            all_ids_to_sync = (active_ids | set(oldest_ids)) - {None}
            print(f"[SYNC] {len(all_ids_to_sync)} total unique to sync")
            all_projects = client.list_all_projects()
            projects = [p for p in all_projects if _native_project_id(p) in all_ids_to_sync]
        # Process projects in parallel
        detailed_rows = process_projects_parallel(projects, client, max_workers=args.workers)
        # Batch write all results to Firestore
        documents = []
        for row in detailed_rows:
            if not row or not row.get('ProjectId'):
                continue
            # NOTE(review): rows are keyed by 'ProjectId' here but archived
            # status reads 'phase_name'; confirm ProjectModel.to_dict() emits
            # that exact key, otherwise is_archived is always False.
            row['is_archived'] = (row.get('phase_name') == 'Archived')
            documents.append((row['ProjectId'], row))
        batch_write_to_firestore(db, "projects", documents)
        print(f"[SYNC] Complete - {len(documents)} projects saved to Firestore")
    except Exception as e:
        print(f"Error during sync: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()