#!/usr/bin/env python3
"""
Sync script to fetch and store projects in Firestore.

This can be run manually from the command line to update the projects
collection.
"""

import sys
import os
import concurrent.futures
import threading
from typing import List, Dict, Any, Optional
from datetime import datetime
import pytz
from dotenv import load_dotenv

load_dotenv()

# Add the current directory to the Python path so we can import app and models
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# These imports rely on the sys.path tweak above, so they must stay below it.
from models.project_model import ProjectModel  # noqa: E402
from filevine_client import FilevineClient  # noqa: E402

# Target timezone for every date written to Firestore.
_PACIFIC_TZ = pytz.timezone('America/Los_Angeles')

# Keys of the propertyContacts form that may each hold a property manager.
_PROPERTY_MANAGER_KEYS = (
    'propertyManager1',
    'propertyManager2',
    'propertyManager3',
    'propertyManager4',
)

# NOTE(review): the sync is restricted to this single project id by default.
# This looks like a debugging leftover -- confirm, then change the default of
# main() to None to sync every project.
DEFAULT_ONLY_PROJECT_ID = 15914808

# Global thread-local storage for FilevineClient to avoid passing it around
_thread_local = threading.local()


def convert_to_pacific_time(date_str):
    """Convert a UTC date string to Pacific Time and format as YYYY-MM-DD.

    Args:
        date_str (str): Date string in ISO 8601 format
            (e.g., "2025-10-24T19:20:22.377Z"). A value without any offset
            is treated as UTC; an explicit offset is honoured.

    Returns:
        str: Date formatted as YYYY-MM-DD in Pacific Time, or an empty
        string if the input is empty or cannot be parsed (a warning is
        printed in the latter case).
    """
    if not date_str:
        return ''

    try:
        parsed = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        # Only naive values are assumed to be UTC. Overwriting tzinfo on an
        # aware value would silently discard its real offset.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=pytz.UTC)
        # NOTE(review): a date-only input ("2025-10-24") parses as midnight
        # UTC and therefore formats as the previous day in Pacific Time.
        return parsed.astimezone(_PACIFIC_TZ).strftime('%Y-%m-%d')
    except (ValueError, AttributeError, TypeError) as e:
        # TypeError covers non-string inputs (e.g. an already-parsed
        # datetime), whose .replace() rejects string arguments.
        print(f"[WARN] Date conversion failed for '{date_str}': {e}")
        return ''


def get_filevine_client():
    """Get FilevineClient from thread local storage (None if unset)."""
    return getattr(_thread_local, 'client', None)


def set_filevine_client(client):
    """Set FilevineClient in thread local storage."""
    _thread_local.client = client


def worker_init(client: FilevineClient):
    """Initialize worker with FilevineClient.

    Used as the ThreadPoolExecutor initializer in process_projects_parallel.
    """
    set_filevine_client(client)


def _first_defendant(contacts) -> dict:
    """Return the orgContact of the first contact typed "Defendant", or {}."""
    for contact in contacts or []:
        org_contact = contact.get('orgContact') or {}
        if "Defendant" in (org_contact.get('personTypes') or []):
            return org_contact
    return {}


def _team_member_with_role(team, role_name: str):
    """Return the fullname of the first team member holding role_name, or ''."""
    for member in team or []:
        roles = member.get('teamOrgRoles') or []
        if any(role.get('name') == role_name for role in roles):
            return member.get('fullname')
    return ''


def _split_tasks(tasks_result):
    """Split a task listing into (completed, pending) summary lists."""
    completed, pending = [], []
    for task in (tasks_result or {}).get("items") or []:
        summary = {
            "description": task.get("body"),
            "completed": convert_to_pacific_time(task.get("completedDate")),
        }
        if task.get("isCompleted"):
            completed.append(summary)
        else:
            pending.append(summary)
    return completed, pending


def _property_manager_emails(property_contacts: dict) -> List[str]:
    """Collect the lower-cased e-mail addresses of all property managers."""
    addresses = []
    for key in _PROPERTY_MANAGER_KEYS:
        manager = property_contacts.get(key)
        if not manager:
            continue
        for email in manager.get('emails') or []:
            if email and email.get('address'):
                addresses.append(email.get('address').lower())
    return addresses


def process_project(index: int, total: int, project_data: dict, client: FilevineClient) -> Dict[str, Any]:
    """
    Process a single project with all its API calls.

    This is the function that will be executed by workers in parallel.

    Args:
        index: Position of this project, used only for progress output.
        total: Total number of projects, used only for progress output.
        project_data: Project dictionary as returned by the project listing.
        client: FilevineClient used for every fetch.

    Returns:
        The stored row as a dictionary (row.to_dict()), or an empty dict when
        the project has no id or any fetch/processing step fails.
    """
    # Keep the thread-local in sync for any code that reads it via
    # get_filevine_client().
    set_filevine_client(client)

    p = project_data
    pid = (p.get("projectId") or {}).get("native")
    print(f"Working on {pid} ({index}/{total})")

    if pid is None:
        return {}

    try:
        client_record = client.fetch_client((p.get("clientId") or {}).get("native")) or {}
        cs = client.fetch_contacts(pid)
        detail = client.fetch_project_detail(pid) or {}
    except Exception as e:
        print(f"[WARN] Failed to fetch essential data for {pid}: {e}")
        return {}

    try:
        defendant_one = _first_defendant(cs)

        new_file_review = client.fetch_form(pid, "newFileReview") or {}
        dates_and_deadlines = client.fetch_form(pid, "datesAndDeadlines") or {}
        service_info = client.fetch_collection(pid, "serviceInfo") or []
        property_info = client.fetch_form(pid, "propertyInfo") or {}
        matter_overview = client.fetch_form(pid, "matterOverview") or {}
        fees_and_costs = client.fetch_form(pid, "feesAndCosts") or {}
        property_contacts = client.fetch_form(pid, "propertyContacts") or {}
        lease_info_np = client.fetch_form(pid, "leaseInfoNP") or {}

        completed_tasks, pending_tasks = _split_tasks(client.fetch_project_tasks(pid))

        team = client.fetch_project_team(pid)
        assigned_attorney = _team_member_with_role(team, 'Assigned Attorney')
        primary_contact = _team_member_with_role(team, 'Primary')
        secondary_paralegal = _team_member_with_role(team, 'Secondary Paralegal')

        # Extract notice service and expiration dates
        notice_service_date = convert_to_pacific_time(new_file_review.get("noticeServiceDate"))
        notice_expiration_date = convert_to_pacific_time(new_file_review.get("noticeExpirationDate"))

        # Extract daily rent damages
        daily_rent_damages = (
            lease_info_np.get("dailyRentDamages")
            or dates_and_deadlines.get("dailyRentDamages")
            or ''
        )

        # Extract default date
        default_date = convert_to_pacific_time(dates_and_deadlines.get("defaultDate"))
        case_filed_date = convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled"))

        # Extract motion hearing dates
        demurrer_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("demurrerHearingDate"))
        motion_to_strike_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTSHearingDate"))
        motion_to_quash_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTQHearingDate"))
        other_motion_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("otherMotion1HearingDate"))

        # Extract MSC details
        msc_date = convert_to_pacific_time(dates_and_deadlines.get("mSCDate"))
        msc_time = dates_and_deadlines.get("mSCTime") or ''  # Time field, not converting
        msc_address = dates_and_deadlines.get("mSCAddress") or ''
        msc_div_dept_room = dates_and_deadlines.get("mSCDeptDiv") or ''

        # Extract trial details
        trial_date = convert_to_pacific_time(dates_and_deadlines.get("trialDate"))
        trial_time = dates_and_deadlines.get("trialTime") or ''  # Time field, not converting
        trial_address = dates_and_deadlines.get("trialAddress") or ''
        trial_div_dept_room = dates_and_deadlines.get("trialDeptDivRoom") or ''

        # Extract final result of trial/MSC
        final_result = dates_and_deadlines.get("finalResultOfTrialMSCCa") or ''

        # Extract settlement details
        date_of_settlement = convert_to_pacific_time(dates_and_deadlines.get("dateOfStipulation"))
        final_obligation = dates_and_deadlines.get("finalObligationUnderTheStip") or ''
        def_comply_stip = dates_and_deadlines.get("defendantsComplyWithStip") or ''

        # Extract judgment and writ details
        judgment_date = convert_to_pacific_time(dates_and_deadlines.get("dateOfJudgment"))
        writ_issued_date = convert_to_pacific_time(dates_and_deadlines.get("writIssuedDate"))

        # Extract lockout and stay details
        scheduled_lockout = convert_to_pacific_time(dates_and_deadlines.get("sheriffScheduledDate"))
        oppose_stays = dates_and_deadlines.get("opposeStays") or ''

        # Extract premises safety and entry code
        premises_safety = new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or ''
        matter_gate_code = property_info.get("propertyEntryCodeOrInstructions") or ''

        # Extract possession recovered date
        date_possession_recovered = convert_to_pacific_time(dates_and_deadlines.get("datePossessionRecovered"))

        # Extract attorney fees and costs
        attorney_fees = fees_and_costs.get("totalAttorneysFees") or ''
        costs = fees_and_costs.get("totalCosts") or ''

        # E-mail addresses of the property managers, passed on as viewing_emails
        valid_property_managers = _property_manager_emails(property_contacts)

        row = ProjectModel(
            client=client_record.get("firstName", ""),
            matter_description=p.get("projectName", ""),
            defendant_1=defendant_one.get('fullName', 'Unknown'),
            matter_open=convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")),
            notice_type=new_file_review.get("noticeType", '') or '',
            case_number=dates_and_deadlines.get('caseNumber', '') or '',
            premises_address=property_info.get("premisesAddressWithUnit", "") or '',
            premises_city=property_info.get("premisesCity", "") or '',
            responsible_attorney=assigned_attorney,
            staff_person=primary_contact,
            staff_person_2=secondary_paralegal,
            phase_name=p.get("phaseName", ""),
            completed_tasks=completed_tasks,
            pending_tasks=pending_tasks,
            notice_service_date=notice_service_date,
            notice_expiration_date=notice_expiration_date,
            # Keyword spelling ("case_field_date") is kept exactly as the
            # original call used it.
            case_field_date=case_filed_date,
            daily_rent_damages=daily_rent_damages,
            default_date=default_date,
            demurrer_hearing_date=demurrer_hearing_date,
            motion_to_strike_hearing_date=motion_to_strike_hearing_date,
            motion_to_quash_hearing_date=motion_to_quash_hearing_date,
            other_motion_hearing_date=other_motion_hearing_date,
            msc_date=msc_date,
            msc_time=msc_time,
            msc_address=msc_address,
            msc_div_dept_room=msc_div_dept_room,
            trial_date=trial_date,
            trial_time=trial_time,
            trial_address=trial_address,
            trial_div_dept_room=trial_div_dept_room,
            final_result=final_result,
            date_of_settlement=date_of_settlement,
            final_obligation=final_obligation,
            def_comply_stip=def_comply_stip,
            judgment_date=judgment_date,
            writ_issued_date=writ_issued_date,
            scheduled_lockout=scheduled_lockout,
            oppose_stays=oppose_stays,
            premises_safety=premises_safety,
            matter_gate_code=matter_gate_code,
            date_possession_recovered=date_possession_recovered,
            attorney_fees=attorney_fees,
            costs=costs,
            documents_url=matter_overview.get('documentShareFolderURL', '') or '',
            service_attempt_date_1=convert_to_pacific_time(next(iter(service_info), {}).get('serviceDate')),
            contacts=cs,
            project_email_address=p.get("projectEmailAddress", ""),
            number=p.get("number", ""),
            incident_date=convert_to_pacific_time(p.get("incidentDate") or detail.get("incidentDate")),
            project_id=pid,
            project_name=p.get("projectName") or detail.get("projectName"),
            project_url=p.get("projectUrl") or detail.get("projectUrl"),
            viewing_emails=valid_property_managers,
        )

        # Store the results in Firestore.
        # Imported inside the function, as in the original code; presumably
        # this defers app initialisation until it is needed -- confirm.
        from app import db

        projects_ref = db.collection("projects")

        # Add new projects
        project_id = row.project_id
        if project_id:
            projects_ref.document(str(project_id)).set(row.to_dict())

        print(f"Finished on {pid} ({index}/{total})")
        return row.to_dict()

    except Exception as e:
        print(f"[ERROR] Processing failed for {pid}: {e}")
        return {}


def process_projects_parallel(projects: List[dict], client: FilevineClient, max_workers: int = 9) -> List[Dict[str, Any]]:
    """
    Process projects in parallel using a worker pool.

    Args:
        projects: List of project data dictionaries
        client: FilevineClient instance
        max_workers: Number of concurrent workers (default 9)

    Returns:
        List of processed project dictionaries, in completion order (not
        input order). A failed project contributes an empty dict.
    """
    total = len(projects)
    results: List[Dict[str, Any]] = []

    # Create a thread pool with specified number of workers
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers,
        initializer=worker_init,
        initargs=(client,),
    ) as executor:
        # Submit all tasks to the executor. Positions start at 1 so the
        # progress output reads "(1/N)" .. "(N/N)".
        future_to_project = {
            executor.submit(process_project, position, total, project, client): project
            for position, project in enumerate(projects, start=1)
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_project):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"[ERROR] Processing failed: {e}")
                # Keep one entry per project so callers can count failures.
                results.append({})

    return results


def main(only_project_id: Optional[int] = DEFAULT_ONLY_PROJECT_ID):
    """Main function to fetch and sync projects.

    Args:
        only_project_id: When not None, only the project whose native id
            equals this value is synced. Pass None to sync every project
            returned by the listing.

    Exits the process with status 1 if the sync raises.
    """
    print("Starting project sync...")
    try:
        # Initialize Filevine client
        client = FilevineClient()
        # Return value is not needed here; the call is kept in case it has a
        # side effect such as priming authentication -- confirm.
        client.get_bearer_token()

        # List projects (all pages)
        projects = client.list_all_projects()
        if only_project_id is not None:
            projects = [
                p for p in projects
                if (p.get("projectId") or {}).get("native") == only_project_id
            ]

        # Process projects in parallel
        detailed_rows = process_projects_parallel(projects, client, 9)

        # Failed or skipped projects come back as empty dicts and must not be
        # reported as synced.
        synced_count = sum(1 for row in detailed_rows if row)
        failed_count = len(detailed_rows) - synced_count
        print(f"Successfully synced {synced_count} projects to Firestore")
        if failed_count:
            print(f"[WARN] {failed_count} projects failed or were skipped")

    except Exception as e:
        print(f"Error during sync: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()