Files
rothbard/sync.py
2025-11-20 22:42:33 -08:00

326 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Sync script to fetch and store projects in Firestore
This can be run manually from the command line to update the projects collection
"""
import sys
import os
import concurrent.futures
import threading
from typing import List, Dict, Any, Optional
from datetime import datetime
import pytz
from dotenv import load_dotenv

# Load environment variables (e.g. Filevine/Firestore credentials) from a
# local .env file before any client code reads them.
load_dotenv()

# Add the current directory to the Python path so we can import app and models
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def convert_to_pacific_time(date_str):
    """Convert an ISO 8601 date string to Pacific Time, formatted MM/DD/YYYY.

    Args:
        date_str (str): Date string in ISO 8601 format, typically UTC with a
            trailing 'Z' (e.g. "2025-10-24T19:20:22.377Z"). A timestamp with
            an explicit non-UTC offset (e.g. "+05:00") is honored as-is; a
            naive timestamp is assumed to be UTC.

    Returns:
        str: Date formatted as MM/DD/YYYY in America/Los_Angeles time, or ''
            if the input is empty or unparseable.
            NOTE: the previous docstring claimed YYYY-MM-DD, but the code has
            always emitted MM/DD/YYYY; that output format is kept since
            downstream consumers receive it.
    """
    if not date_str:
        return ''
    # Local import keeps module-level dependencies unchanged; zoneinfo is
    # stdlib (3.9+) and avoids the pytz replace()/localize() pitfall.
    from zoneinfo import ZoneInfo
    try:
        # fromisoformat() does not accept the 'Z' suffix before Python 3.11,
        # so normalize it to an explicit UTC offset first.
        parsed = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        # Only assume UTC for naive timestamps. The old code did
        # replace(tzinfo=UTC) unconditionally, which silently clobbered any
        # explicit non-UTC offset already parsed from the string.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=ZoneInfo('UTC'))
        pacific_time = parsed.astimezone(ZoneInfo('America/Los_Angeles'))
        return pacific_time.strftime('%m/%d/%Y')
    except (ValueError, AttributeError) as e:
        # Best-effort conversion: warn and fall back to an empty string so a
        # single malformed date never aborts a project sync.
        print(f"[WARN] Date conversion failed for '{date_str}': {e}")
        return ''
from models.project_model import ProjectModel
from filevine_client import FilevineClient
# Per-thread storage for the FilevineClient so worker threads can reach their
# client without it being threaded through every call.
_thread_local = threading.local()


def get_filevine_client():
    """Return the FilevineClient bound to the current thread, or None if unset."""
    try:
        return _thread_local.client
    except AttributeError:
        return None


def set_filevine_client(client):
    """Bind *client* to the current thread's local storage."""
    _thread_local.client = client
def worker_init(client: FilevineClient):
    """Thread-pool initializer: bind the shared FilevineClient to this worker's thread-local slot."""
    set_filevine_client(client)
def process_project(index: int, total: int, project_data: dict, client: FilevineClient) -> Dict[str, Any]:
    """
    Process a single project: fetch all of its Filevine forms/collections,
    assemble a ProjectModel, and upsert it into the Firestore "projects"
    collection. Designed to run concurrently in worker threads.

    Args:
        index: Position of this project in the batch (progress logging only).
        total: Total number of projects in the batch (progress logging only).
        project_data: Raw project dict from the Filevine project-list API.
        client: FilevineClient instance used for every API call.

    Returns:
        The stored project as a dict, or {} when the project id is missing or
        any fetch/processing step fails.
    """
    # Bind the client to this thread so code using the thread-local accessor
    # sees the same instance. (The old code immediately read it back into a
    # local via get_filevine_client(); we just use the parameter directly.)
    set_filevine_client(client)
    p = project_data
    pid = (p.get("projectId") or {}).get("native")
    print(f"Working on {pid} ({index}/{total})")
    if pid is None:
        return {}
    try:
        c = client.fetch_client((p.get("clientId") or {}).get("native"))
        cs = client.fetch_contacts(pid)
        detail = client.fetch_project_detail(pid)
    except Exception as e:
        print(f"[WARN] Failed to fetch essential data for {pid}: {e}")
        return {}
    # First contact whose orgContact personTypes include "Defendant".
    defendant_one = next(
        (contact.get('orgContact', {})
         for contact in cs
         if "Defendant" in contact.get('orgContact', {}).get('personTypes', [])),
        {})
    try:
        # Forms / collections feeding the ProjectModel fields below; each
        # falls back to an empty container when the form is absent.
        new_file_review = client.fetch_form(pid, "newFileReview") or {}
        dates_and_deadlines = client.fetch_form(pid, "datesAndDeadlines") or {}
        service_info = client.fetch_collection(pid, "serviceInfo") or []
        property_info = client.fetch_form(pid, "propertyInfo") or {}
        matter_overview = client.fetch_form(pid, "matterOverview") or {}
        fees_and_costs = client.fetch_form(pid, "feesAndCosts") or {}
        property_contacts = client.fetch_form(pid, "propertyContacts") or {}
        lease_info_np = client.fetch_form(pid, "leaseInfoNP") or {}

        # Split tasks into completed / pending; completion dates in Pacific time.
        tasks_result = client.fetch_project_tasks(pid)
        task_items = tasks_result.get("items", [])
        completed_tasks = [{"description": t.get("body"),
                            "completed": convert_to_pacific_time(t.get("completedDate"))}
                           for t in task_items
                           if t.get("isCompleted")]
        pending_tasks = [{"description": t.get("body"),
                          "completed": convert_to_pacific_time(t.get("completedDate"))}
                         for t in task_items
                         if not t.get("isCompleted")]

        # Team members by org role. The old code iterated
        # m.get('teamOrgRoles') directly and crashed (TypeError) when a
        # member lacked that key; `or []` guards against it.
        team = client.fetch_project_team(pid)

        def _member_with_role(role_name):
            # Full name of the first team member holding the given org role,
            # or '' when nobody holds it.
            return next((m.get('fullname')
                         for m in team
                         if role_name in [r.get('name')
                                          for r in (m.get('teamOrgRoles') or [])]),
                        '')

        assigned_attorney = _member_with_role('Assigned Attorney')
        primary_contact = _member_with_role('Primary')
        secondary_paralegal = _member_with_role('Secondary Paralegal')

        # Notice service and expiration dates
        notice_service_date = convert_to_pacific_time(new_file_review.get("noticeServiceDate")) or ''
        notice_expiration_date = convert_to_pacific_time(new_file_review.get("noticeExpirationDate")) or ''
        # Daily rent damages: lease form takes precedence over deadlines form
        daily_rent_damages = lease_info_np.get("dailyRentDamages") or dates_and_deadlines.get("dailyRentDamages") or ''
        # Default and case-filed dates
        default_date = convert_to_pacific_time(dates_and_deadlines.get("defaultDate")) or ''
        case_filed_date = convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled")) or ''
        # Motion hearing dates
        demurrer_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("demurrerHearingDate")) or ''
        motion_to_strike_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTSHearingDate")) or ''
        motion_to_quash_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTQHearingDate")) or ''
        other_motion_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("otherMotion1HearingDate")) or ''
        # MSC details
        msc_date = convert_to_pacific_time(dates_and_deadlines.get("mSCDate")) or ''
        msc_time = dates_and_deadlines.get("mSCTime") or ''  # time-of-day field, not converted
        msc_address = dates_and_deadlines.get("mSCAddress") or ''
        msc_div_dept_room = dates_and_deadlines.get("mSCDeptDiv") or ''
        # Trial details
        trial_date = convert_to_pacific_time(dates_and_deadlines.get("trialDate")) or ''
        trial_time = dates_and_deadlines.get("trialTime") or ''  # time-of-day field, not converted
        trial_address = dates_and_deadlines.get("trialAddress") or ''
        trial_div_dept_room = dates_and_deadlines.get("trialDeptDivRoom") or ''
        # Final result of trial/MSC
        final_result = dates_and_deadlines.get("finalResultOfTrialMSCCa") or ''
        # Settlement details
        date_of_settlement = convert_to_pacific_time(dates_and_deadlines.get("dateOfStipulation")) or ''
        final_obligation = dates_and_deadlines.get("finalObligationUnderTheStip") or ''
        def_comply_stip = dates_and_deadlines.get("defendantsComplyWithStip") or ''
        # Judgment and writ details
        judgment_date = convert_to_pacific_time(dates_and_deadlines.get("dateOfJudgment")) or ''
        writ_issued_date = convert_to_pacific_time(dates_and_deadlines.get("writIssuedDate")) or ''
        # Lockout and stay details
        scheduled_lockout = convert_to_pacific_time(dates_and_deadlines.get("sheriffScheduledDate")) or ''
        oppose_stays = dates_and_deadlines.get("opposeStays") or ''
        # Premises safety notes and entry code
        premises_safety = new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or ''
        matter_gate_code = property_info.get("propertyEntryCodeOrInstructions") or ''
        # Possession recovered date
        date_possession_recovered = convert_to_pacific_time(dates_and_deadlines.get("datePossessionRecovered")) or ''
        # Attorney fees and costs
        attorney_fees = fees_and_costs.get("totalAttorneysFees") or ''
        costs = fees_and_costs.get("totalCosts") or ''

        # Lower-cased email addresses collected from up to four
        # property-manager contacts; presumably these are granted viewing
        # access to the matter (see viewing_emails below) -- TODO confirm.
        # (Previously this list was pprint'ed to stdout on every run; that
        # debug output, an unused itertools import, and commented-out dead
        # code have been removed.)
        property_managers = [property_contacts.get(f'propertyManager{i}')
                             for i in range(1, 5)]
        valid_property_managers = [e.get('address').lower()
                                   for pm in property_managers
                                   if pm and pm.get('emails')
                                   for e in pm.get('emails')
                                   if e and e.get('address')]

        row = ProjectModel(
            client=c.get("firstName", ""),
            matter_description=p.get("projectName", ""),
            defendant_1=defendant_one.get('fullName', 'Unknown'),
            matter_open=convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")),
            notice_type=new_file_review.get("noticeType", '') or '',
            case_number=dates_and_deadlines.get('caseNumber', '') or '',
            premises_address=property_info.get("premisesAddressWithUnit", "") or '',
            premises_city=property_info.get("premisesCity", "") or '',
            responsible_attorney=assigned_attorney,
            staff_person=primary_contact,
            staff_person_2=secondary_paralegal,
            phase_name=p.get("phaseName", ""),
            completed_tasks=completed_tasks,
            pending_tasks=pending_tasks,
            notice_service_date=notice_service_date,
            notice_expiration_date=notice_expiration_date,
            case_field_date=case_filed_date,  # (sic: "field") -- ProjectModel's existing kwarg name
            daily_rent_damages=daily_rent_damages,
            default_date=default_date,
            demurrer_hearing_date=demurrer_hearing_date,
            motion_to_strike_hearing_date=motion_to_strike_hearing_date,
            motion_to_quash_hearing_date=motion_to_quash_hearing_date,
            other_motion_hearing_date=other_motion_hearing_date,
            msc_date=msc_date,
            msc_time=msc_time,
            msc_address=msc_address,
            msc_div_dept_room=msc_div_dept_room,
            trial_date=trial_date,
            trial_time=trial_time,
            trial_address=trial_address,
            trial_div_dept_room=trial_div_dept_room,
            final_result=final_result,
            date_of_settlement=date_of_settlement,
            final_obligation=final_obligation,
            def_comply_stip=def_comply_stip,
            judgment_date=judgment_date,
            writ_issued_date=writ_issued_date,
            scheduled_lockout=scheduled_lockout,
            oppose_stays=oppose_stays,
            premises_safety=premises_safety,
            matter_gate_code=matter_gate_code,
            date_possession_recovered=date_possession_recovered,
            attorney_fees=attorney_fees,
            costs=costs,
            documents_url=matter_overview.get('documentShareFolderURL', '') or '',
            service_attempt_date_1=convert_to_pacific_time(next(iter(service_info), {}).get('serviceDate')),
            contacts=cs,
            project_email_address=p.get("projectEmailAddress", ""),
            number=p.get("number", ""),
            incident_date=convert_to_pacific_time(p.get("incidentDate") or detail.get("incidentDate")),
            project_id=pid,
            project_name=p.get("projectName") or detail.get("projectName"),
            project_url=p.get("projectUrl") or detail.get("projectUrl"),
            viewing_emails=valid_property_managers,
        )

        # Imported here (not at module top) -- presumably to avoid a circular
        # import with app at load time; verify before hoisting.
        from app import db
        projects_ref = db.collection("projects")
        # Upsert keyed by the project's native id.
        project_id = row.project_id
        if project_id:
            projects_ref.document(str(project_id)).set(row.to_dict())
        print(f"Finished on {pid} Matter {row.number} ({index}/{total})")
        return row.to_dict()
    except Exception as e:
        # Worker boundary: log and return {} so one bad project doesn't kill
        # the whole batch.
        print(f"[ERROR] Processing failed for {pid}: {e}")
        return {}
def process_projects_parallel(projects: List[dict], client: FilevineClient, max_workers: int = 9) -> List[Dict[str, Any]]:
    """
    Fan out process_project over a thread pool and gather the results.

    Args:
        projects: List of raw project dicts from the Filevine list API.
        client: Shared FilevineClient instance handed to every worker.
        max_workers: Number of concurrent worker threads (default 9).

    Returns:
        One result dict per project, in completion (not submission) order;
        a failed project contributes an empty dict.
    """
    total = len(projects)
    results: List[Dict[str, Any]] = []
    # Each worker thread gets the client bound to its thread-local slot via
    # worker_init; the client is also passed explicitly to each task.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers,
        initializer=worker_init,
        initargs=(client,),
    ) as pool:
        futures = [pool.submit(process_project, position, total, proj, client)
                   for position, proj in enumerate(projects)]
        for fut in concurrent.futures.as_completed(futures):
            try:
                results.append(fut.result())
            except Exception as e:
                # A worker raised despite its own guards; record a placeholder
                # so the caller still gets one entry per project.
                print(f"[ERROR] Processing failed: {e}")
                results.append({})
    return results
def main():
    """Fetch recently-active Filevine projects and sync them to Firestore.

    Exits the process with status 1 on any unrecoverable error.
    """
    print("Starting project sync...")
    try:
        client = FilevineClient()
        # Fetch a bearer token up front -- presumably this primes the
        # client's auth state for the worker threads (the returned token
        # itself was never used); verify against FilevineClient.
        client.get_bearer_token()
        # Sync projects with activity in the last 14 days. (The old code's
        # comment and variable name said "seven days" while actually using
        # timedelta(days=14); the 14-day window is the real behavior, kept.)
        from datetime import timedelta
        activity_cutoff = (datetime.now() - timedelta(days=14)).strftime('%Y-%m-%d')
        projects = client.list_all_projects(latest_activity_since=activity_cutoff)
        # Process projects in parallel
        detailed_rows = process_projects_parallel(projects, client, 9)
        print(f"Successfully synced {len(detailed_rows)} projects to Firestore")
    except Exception as e:
        print(f"Error during sync: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()