Files
rothbard/sync.py
Bryce 662be72f6a feat: Implement comprehensive project data model and synchronization system
- Added ProjectModel class in models/project_model.py to define structure for Filevine project data with proper type hints and conversion methods (to_dict/from_dict)
- Implemented get_firestore_document() helper function in app.py for retrieving specific Firestore documents
- Enhanced dashboard pagination in app.py with improved error handling and debugging output for property contacts and project IDs
- Overhauled sync.py with:
  * Parallel processing using ThreadPoolExecutor for efficient project synchronization
  * Comprehensive extraction of project data from Filevine forms (newFileReview, datesAndDeadlines, propertyInfo, etc.)
  * Improved error handling and logging throughout the sync process
  * Proper handling of date conversions and field mappings from Filevine to Firestore
  * Added property contacts email extraction and viewing_emails array population
  * Added support for filtering projects by specific ProjectId (15914808) for targeted sync
- Added proper initialization of Filevine client in worker threads using thread-local storage
- Improved handling of optional fields and default values in ProjectModel
- Added detailed logging for progress tracking during synchronization

This implementation enables reliable synchronization of Filevine project data to Firestore with proper data modeling and error handling, supporting the dashboard's data requirements.
2025-11-09 20:21:53 -08:00

291 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Sync script to fetch and store projects in Firestore
This can be run manually from the command line to update the projects collection
"""
import sys
import os
import concurrent.futures
import threading
from typing import List, Dict, Any, Optional
from datetime import datetime
import pytz
# Add the current directory to the Python path so we can import app and models
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from app import fetch_all_projects, convert_to_pacific_time
from models.project_model import ProjectModel
from filevine_client import FilevineClient
# Per-thread FilevineClient storage: each worker thread holds its own client
# reference so it does not have to be threaded through every call signature.
_thread_local = threading.local()
def get_filevine_client():
    """Return the FilevineClient bound to the current thread, or None if unset."""
    try:
        return _thread_local.client
    except AttributeError:
        return None
def set_filevine_client(client):
    """Bind *client* to the current thread's local storage for later lookup."""
    setattr(_thread_local, 'client', client)
def worker_init(client: FilevineClient):
    """Thread-pool initializer: seed each worker thread's local storage with
    the shared FilevineClient so thread code can retrieve it later."""
    set_filevine_client(client)
def process_project(index: int, total: int, project_data: dict, client: FilevineClient) -> Dict[str, Any]:
    """
    Fetch all Filevine data for one project, build a ProjectModel row,
    and write it to the Firestore ``projects`` collection.

    Args:
        index: Position of this project in the batch (progress logging only).
        total: Total number of projects in the batch (progress logging only).
        project_data: Raw project dict from the Filevine project listing.
        client: FilevineClient used for every API call made here.

    Returns:
        The synced row as a dict (``ProjectModel.to_dict()``), or an empty
        dict when the project id is missing or a fetch/processing step fails.
    """
    # Publish the client to thread-local storage so any helper that calls
    # get_filevine_client() on this thread sees the same instance.
    set_filevine_client(client)
    p = project_data
    pid = (p.get("projectId") or {}).get("native")
    print(f"Working on {pid} ({index}/{total})")
    if pid is None:
        return {}
    try:
        # Essential data: without these three we cannot build a row at all.
        c = client.fetch_client((p.get("clientId") or {}).get("native"))
        cs = client.fetch_contacts(pid)
        detail = client.fetch_project_detail(pid)
    except Exception as e:
        print(f"[WARN] Failed to fetch essential data for {pid}: {e}")
        return {}
    # First contact whose personTypes include "Defendant" ({} when none found).
    defendant_one = next((contact.get('orgContact', {})
                          for contact in cs
                          if "Defendant" in contact.get('orgContact', {}).get('personTypes', [])),
                         {})
    try:
        # Forms and collections; each falls back to an empty container on None.
        new_file_review = client.fetch_form(pid, "newFileReview") or {}
        dates_and_deadlines = client.fetch_form(pid, "datesAndDeadlines") or {}
        service_info = client.fetch_collection(pid, "serviceInfo") or []
        property_info = client.fetch_form(pid, "propertyInfo") or {}
        matter_overview = client.fetch_form(pid, "matterOverview") or {}
        fees_and_costs = client.fetch_form(pid, "feesAndCosts") or {}
        property_contacts = client.fetch_form(pid, "propertyContacts") or {}
        lease_info_np = client.fetch_form(pid, "leaseInfoNP") or {}
        # Split tasks into completed vs pending.
        # NOTE(review): pending tasks also carry a "completed" key (converted
        # completedDate, normally empty) — kept as-is for schema compatibility.
        tasks_result = client.fetch_project_tasks(pid)
        completed_tasks = [{"description": x.get("body"),
                            "completed": convert_to_pacific_time(x.get("completedDate"))}
                           for x in tasks_result.get("items", [])
                           if x.get("isCompleted")]
        pending_tasks = [{"description": x.get("body"),
                          "completed": convert_to_pacific_time(x.get("completedDate"))}
                         for x in tasks_result.get("items", [])
                         if not x.get("isCompleted")]
        # Team roles: pick the first member holding each named org role
        # (empty string when nobody holds the role).
        team = client.fetch_project_team(pid)

        def _member_with_role(role: str) -> str:
            """Full name of the first team member with the given org role, or ''."""
            return next((m.get('fullname')
                         for m in team
                         if role in [r.get('name') for r in m.get('teamOrgRoles')]), '')

        assigned_attorney = _member_with_role('Assigned Attorney')
        primary_contact = _member_with_role('Primary')
        secondary_paralegal = _member_with_role('Secondary Paralegal')
        # Notice service and expiration dates
        notice_service_date = convert_to_pacific_time(new_file_review.get("noticeServiceDate")) or ''
        notice_expiration_date = convert_to_pacific_time(new_file_review.get("noticeExpirationDate")) or ''
        # Daily rent damages: the lease form takes precedence over the dates form
        daily_rent_damages = lease_info_np.get("dailyRentDamages") or dates_and_deadlines.get("dailyRentDamages") or ''
        # Default and case-filed dates
        default_date = convert_to_pacific_time(dates_and_deadlines.get("defaultDate")) or ''
        case_filed_date = convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled")) or ''
        # Motion hearing dates
        demurrer_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("demurrerHearingDate")) or ''
        motion_to_strike_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTSHearingDate")) or ''
        motion_to_quash_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTQHearingDate")) or ''
        other_motion_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("otherMotion1HearingDate")) or ''
        # MSC details
        msc_date = convert_to_pacific_time(dates_and_deadlines.get("mSCDate")) or ''
        msc_time = dates_and_deadlines.get("mSCTime") or ''  # time-of-day string, no TZ conversion
        msc_address = dates_and_deadlines.get("mSCAddress") or ''
        msc_div_dept_room = dates_and_deadlines.get("mSCDeptDiv") or ''
        # Trial details
        trial_date = convert_to_pacific_time(dates_and_deadlines.get("trialDate")) or ''
        trial_time = dates_and_deadlines.get("trialTime") or ''  # time-of-day string, no TZ conversion
        trial_address = dates_and_deadlines.get("trialAddress") or ''
        trial_div_dept_room = dates_and_deadlines.get("trialDeptDivRoom") or ''
        # Final result of trial/MSC
        final_result = dates_and_deadlines.get("finalResultOfTrialMSCCa") or ''
        # Settlement (stipulation) details
        date_of_settlement = convert_to_pacific_time(dates_and_deadlines.get("dateOfStipulation")) or ''
        final_obligation = dates_and_deadlines.get("finalObligationUnderTheStip") or ''
        def_comply_stip = dates_and_deadlines.get("defendantsComplyWithStip") or ''
        # Judgment and writ details
        judgment_date = convert_to_pacific_time(dates_and_deadlines.get("dateOfJudgment")) or ''
        writ_issued_date = convert_to_pacific_time(dates_and_deadlines.get("writIssuedDate")) or ''
        # Lockout and stay details
        scheduled_lockout = convert_to_pacific_time(dates_and_deadlines.get("sheriffScheduledDate")) or ''
        oppose_stays = dates_and_deadlines.get("opposeStays") or ''
        # Premises safety notes and entry code
        premises_safety = new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or ''
        matter_gate_code = property_info.get("propertyEntryCodeOrInstructions") or ''
        # Possession recovered date
        date_possession_recovered = convert_to_pacific_time(dates_and_deadlines.get("datePossessionRecovered")) or ''
        # Attorney fees and costs
        attorney_fees = fees_and_costs.get("totalAttorneysFees") or ''
        costs = fees_and_costs.get("totalCosts") or ''
        # Lower-cased email addresses from the up-to-four property manager
        # contacts; these populate the row's viewing_emails array.
        property_managers = [property_contacts.get(f'propertyManager{i}') for i in range(1, 5)]
        viewing_emails = [e.get('address').lower()
                          for pm in property_managers
                          if pm and pm.get('emails')
                          for e in pm.get('emails')
                          if e and e.get('address')]
        row = ProjectModel(
            client=c.get("firstName", ""),
            matter_description=p.get("projectName", ""),
            defendant_1=defendant_one.get('fullName', 'Unknown'),
            matter_open=convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")),
            notice_type=new_file_review.get("noticeType", '') or '',
            case_number=dates_and_deadlines.get('caseNumber', '') or '',
            premises_address=property_info.get("premisesAddressWithUnit", "") or '',
            premises_city=property_info.get("premisesCity", "") or '',
            responsible_attorney=assigned_attorney,
            staff_person=primary_contact,
            staff_person_2=secondary_paralegal,
            phase_name=p.get("phaseName", ""),
            completed_tasks=completed_tasks,
            pending_tasks=pending_tasks,
            notice_service_date=notice_service_date,
            notice_expiration_date=notice_expiration_date,
            case_field_date=case_filed_date,  # (sic) ProjectModel's field is spelled "case_field_date"
            daily_rent_damages=daily_rent_damages,
            default_date=default_date,
            demurrer_hearing_date=demurrer_hearing_date,
            motion_to_strike_hearing_date=motion_to_strike_hearing_date,
            motion_to_quash_hearing_date=motion_to_quash_hearing_date,
            other_motion_hearing_date=other_motion_hearing_date,
            msc_date=msc_date,
            msc_time=msc_time,
            msc_address=msc_address,
            msc_div_dept_room=msc_div_dept_room,
            trial_date=trial_date,
            trial_time=trial_time,
            trial_address=trial_address,
            trial_div_dept_room=trial_div_dept_room,
            final_result=final_result,
            date_of_settlement=date_of_settlement,
            final_obligation=final_obligation,
            def_comply_stip=def_comply_stip,
            judgment_date=judgment_date,
            writ_issued_date=writ_issued_date,
            scheduled_lockout=scheduled_lockout,
            oppose_stays=oppose_stays,
            premises_safety=premises_safety,
            matter_gate_code=matter_gate_code,
            date_possession_recovered=date_possession_recovered,
            attorney_fees=attorney_fees,
            costs=costs,
            documents_url=matter_overview.get('documentShareFolderURL', '') or '',
            # First service attempt date, if any service records exist.
            service_attempt_date_1=convert_to_pacific_time(next(iter(service_info), {}).get('serviceDate')),
            contacts=cs,
            project_email_address=p.get("projectEmailAddress", ""),
            number=p.get("number", ""),
            incident_date=convert_to_pacific_time(p.get("incidentDate") or detail.get("incidentDate")),
            project_id=pid,
            project_name=p.get("projectName") or detail.get("projectName"),
            project_url=p.get("projectUrl") or detail.get("projectUrl"),
            viewing_emails=viewing_emails,
        )
        # Persist to Firestore keyed by the Filevine project id.
        from app import db  # local import avoids a circular import with app
        projects_ref = db.collection("projects")
        project_id = row.project_id
        if project_id:
            projects_ref.document(str(project_id)).set(row.to_dict())
        print(f"Finished on {pid} ({index}/{total})")
        return row.to_dict()
    except Exception as e:
        print(f"[ERROR] Processing failed for {pid}: {e}")
        return {}
def process_projects_parallel(projects: List[dict], client: FilevineClient, max_workers: int = 9) -> List[Dict[str, Any]]:
    """
    Fan project processing out across a thread pool.

    Args:
        projects: Raw project dicts to process.
        client: Shared FilevineClient handed to every worker.
        max_workers: Size of the thread pool (default 9).

    Returns:
        One result dict per project, in completion order (not input order);
        an empty dict marks a project whose future raised.
    """
    total = len(projects)
    results: List[Dict[str, Any]] = []
    pool = concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers, initializer=worker_init, initargs=(client,))
    with pool as executor:
        # Queue every project up front, then drain futures as they finish.
        pending = [executor.submit(process_project, position, total, proj, client)
                   for position, proj in enumerate(projects)]
        for done in concurrent.futures.as_completed(pending):
            try:
                results.append(done.result())
            except Exception as exc:
                # Keep the output length equal to the input length.
                print(f"[ERROR] Processing failed: {exc}")
                results.append({})
    return results
def main():
    """Fetch Filevine projects and sync them to Firestore, exiting 1 on failure."""
    print("Starting project sync...")
    try:
        client = FilevineClient()
        # Authenticate up front; the token is presumably cached on the client
        # for reuse by worker threads — return value itself is not needed here.
        client.get_bearer_token()
        # List projects (all pages)
        projects = client.list_all_projects()
        # NOTE(review): hard-coded filter restricts the sync to a single
        # project (15914808) for targeted testing — remove to sync everything.
        projects = [p for p in projects if (p.get("projectId") or {}).get("native") == 15914808]
        # Process projects in parallel
        detailed_rows = process_projects_parallel(projects, client, 9)
        print(f"Successfully synced {len(detailed_rows)} projects to Firestore")
    except Exception as e:
        print(f"Error during sync: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
if __name__ == "__main__":
main()