282 lines
13 KiB
Python
282 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Sync script to fetch and store projects in Firestore
|
|
This can be run manually from the command line to update the projects collection
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import concurrent.futures
|
|
import threading
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
import pytz
|
|
|
|
# Add the current directory to the Python path so we can import app and models
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from app import fetch_all_projects, convert_to_pacific_time
|
|
from models.project_model import ProjectModel
|
|
from filevine_client import FilevineClient
|
|
|
|
# Global thread-local storage for FilevineClient to avoid passing it around
|
|
# threading.local() gives each thread its own attribute namespace, so the
# `client` attribute stored here by one worker is not visible to another.
_thread_local = threading.local()
|
|
|
|
def get_filevine_client():
    """Return the FilevineClient stored for the current thread.

    Returns:
        The object previously stored on the thread-local ``client``
        attribute, or ``None`` when nothing has been stored in this thread.
    """
    try:
        return _thread_local.client
    except AttributeError:
        # No client has been registered in this thread yet.
        return None
|
|
|
|
def set_filevine_client(client):
    """Store ``client`` as the current thread's FilevineClient.

    Args:
        client: Object to keep on the thread-local ``client`` attribute.
    """
    setattr(_thread_local, "client", client)
|
|
|
|
def worker_init(client: FilevineClient):
    """Register ``client`` in the calling thread's local storage.

    Intended as a thread-pool ``initializer`` so that every worker thread
    starts with the client already available.

    Args:
        client: FilevineClient instance to bind to the thread.
    """
    # Same effect as calling set_filevine_client(client).
    _thread_local.client = client
|
|
|
|
def _pacific_or_blank(value):
    """Convert ``value`` with convert_to_pacific_time, falling back to ''."""
    return convert_to_pacific_time(value) or ''


def _first_defendant(contacts):
    """Return the first contact's 'orgContact' dict tagged "Defendant", or {}."""
    for contact in contacts or []:
        org_contact = contact.get('orgContact') or {}
        if "Defendant" in (org_contact.get('personTypes') or []):
            return org_contact
    return {}


def _team_member_with_role(team, role_name: str):
    """Return 'fullname' of the first team member holding ``role_name``, or ''."""
    for member in team or []:
        # 'teamOrgRoles' may be missing or None; treat both as "no roles".
        role_names = [role.get('name') for role in (member.get('teamOrgRoles') or [])]
        if role_name in role_names:
            return member.get('fullname')
    return ''


def _task_summaries(tasks_result, completed: bool):
    """Summarise tasks whose 'isCompleted' truthiness equals ``completed``."""
    items = (tasks_result or {}).get("items") or []
    return [
        {
            "description": task.get("body"),
            "completed": convert_to_pacific_time(task.get("completedDate")),
        }
        for task in items
        if bool(task.get("isCompleted")) == completed
    ]


def process_project(index: int, total: int, project_data: dict, client: FilevineClient) -> Dict[str, Any]:
    """
    Process a single project with all its API calls.
    This is the function that will be executed by workers in parallel.

    Fetches the project's client, contacts, detail, forms, tasks and team
    through ``client``, builds a ProjectModel from them and writes it to the
    Firestore "projects" collection under the document id ``str(project_id)``.

    Args:
        index: Position of this project, used only in progress messages.
        total: Total number of projects, used only in progress messages.
        project_data: Project dictionary as returned by the project listing.
        client: FilevineClient used for every API call.

    Returns:
        ``row.to_dict()`` for the stored project, or ``{}`` when the project
        has no id or any fetch/processing step raises.
    """
    # Keep the thread-local client in sync for this worker thread.
    set_filevine_client(client)

    p = project_data
    pid = (p.get("projectId") or {}).get("native")
    print(f"Working on {pid} ({index}/{total})")

    if pid is None:
        return {}

    try:
        c = client.fetch_client((p.get("clientId") or {}).get("native"))
        cs = client.fetch_contacts(pid)
        detail = client.fetch_project_detail(pid)
    except Exception as e:
        print(f"[WARN] Failed to fetch essential data for {pid}: {e}")
        return {}

    try:
        # Inside the try so malformed contact data yields {} like other failures.
        defendant_one = _first_defendant(cs)

        new_file_review = client.fetch_form(pid, "newFileReview") or {}
        dates_and_deadlines = client.fetch_form(pid, "datesAndDeadlines") or {}
        service_info = client.fetch_collection(pid, "serviceInfo") or []
        property_info = client.fetch_form(pid, "propertyInfo") or {}
        matter_overview = client.fetch_form(pid, "matterOverview") or {}
        fees_and_costs = client.fetch_form(pid, "feesAndCosts") or {}
        property_contacts = client.fetch_form(pid, "propertyContacts") or {}
        lease_info_np = client.fetch_form(pid, "leaseInfoNP") or {}

        tasks_result = client.fetch_project_tasks(pid)
        completed_tasks = _task_summaries(tasks_result, completed=True)
        pending_tasks = _task_summaries(tasks_result, completed=False)

        team = client.fetch_project_team(pid)
        assigned_attorney = _team_member_with_role(team, 'Assigned Attorney')
        primary_contact = _team_member_with_role(team, 'Primary')
        secondary_paralegal = _team_member_with_role(team, 'Secondary Paralegal')

        # Extract notice service and expiration dates
        notice_service_date = _pacific_or_blank(new_file_review.get("noticeServiceDate"))
        notice_expiration_date = _pacific_or_blank(new_file_review.get("noticeExpirationDate"))

        # Extract daily rent damages
        daily_rent_damages = (
            lease_info_np.get("dailyRentDamages")
            or dates_and_deadlines.get("dailyRentDamages")
            or ''
        )

        # Extract default date
        default_date = _pacific_or_blank(dates_and_deadlines.get("defaultDate"))
        case_filed_date = _pacific_or_blank(dates_and_deadlines.get("dateCaseFiled"))

        # Extract motion hearing dates
        demurrer_hearing_date = _pacific_or_blank(dates_and_deadlines.get("demurrerHearingDate"))
        motion_to_strike_hearing_date = _pacific_or_blank(dates_and_deadlines.get("mTSHearingDate"))
        motion_to_quash_hearing_date = _pacific_or_blank(dates_and_deadlines.get("mTQHearingDate"))
        other_motion_hearing_date = _pacific_or_blank(dates_and_deadlines.get("otherMotion1HearingDate"))

        # Extract MSC details
        msc_date = _pacific_or_blank(dates_and_deadlines.get("mSCDate"))
        msc_time = dates_and_deadlines.get("mSCTime") or ''  # Time field, not converting
        msc_address = dates_and_deadlines.get("mSCAddress") or ''
        msc_div_dept_room = dates_and_deadlines.get("mSCDeptDiv") or ''

        # Extract trial details
        trial_date = _pacific_or_blank(dates_and_deadlines.get("trialDate"))
        trial_time = dates_and_deadlines.get("trialTime") or ''  # Time field, not converting
        trial_address = dates_and_deadlines.get("trialAddress") or ''
        trial_div_dept_room = dates_and_deadlines.get("trialDeptDivRoom") or ''

        # Extract final result of trial/MSC
        final_result = dates_and_deadlines.get("finalResultOfTrialMSCCa") or ''

        # Extract settlement details
        date_of_settlement = _pacific_or_blank(dates_and_deadlines.get("dateOfStipulation"))
        final_obligation = dates_and_deadlines.get("finalObligationUnderTheStip") or ''
        def_comply_stip = dates_and_deadlines.get("defendantsComplyWithStip") or ''

        # Extract judgment and writ details
        judgment_date = _pacific_or_blank(dates_and_deadlines.get("dateOfJudgment"))
        writ_issued_date = _pacific_or_blank(dates_and_deadlines.get("writIssuedDate"))

        # Extract lockout and stay details
        scheduled_lockout = _pacific_or_blank(dates_and_deadlines.get("sheriffScheduledDate"))
        oppose_stays = dates_and_deadlines.get("opposeStays") or ''

        # Extract premises safety and entry code
        premises_safety = new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or ''
        matter_gate_code = property_info.get("propertyEntryCodeOrInstructions") or ''

        # Extract possession recovered date
        date_possession_recovered = _pacific_or_blank(dates_and_deadlines.get("datePossessionRecovered"))

        # Extract attorney fees and costs
        attorney_fees = fees_and_costs.get("totalAttorneysFees") or ''
        costs = fees_and_costs.get("totalCosts") or ''

        row = ProjectModel(
            client=c.get("firstName", ""),
            matter_description=p.get("projectName", ""),
            defendant_1=defendant_one.get('fullName', 'Unknown'),
            matter_open=convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")),
            notice_type=new_file_review.get("noticeType", '') or '',
            case_number=dates_and_deadlines.get('caseNumber', '') or '',
            premises_address=property_info.get("premisesAddressWithUnit", "") or '',
            premises_city=property_info.get("premisesCity", "") or '',
            responsible_attorney=assigned_attorney,
            staff_person=primary_contact,
            staff_person_2=secondary_paralegal,
            phase_name=p.get("phaseName", ""),
            completed_tasks=completed_tasks,
            pending_tasks=pending_tasks,
            notice_service_date=notice_service_date,
            notice_expiration_date=notice_expiration_date,
            # NOTE(review): keyword is spelled "case_field_date"; presumably it
            # matches the ProjectModel field name - confirm before renaming.
            case_field_date=case_filed_date,
            daily_rent_damages=daily_rent_damages,
            default_date=default_date,
            demurrer_hearing_date=demurrer_hearing_date,
            motion_to_strike_hearing_date=motion_to_strike_hearing_date,
            motion_to_quash_hearing_date=motion_to_quash_hearing_date,
            other_motion_hearing_date=other_motion_hearing_date,
            msc_date=msc_date,
            msc_time=msc_time,
            msc_address=msc_address,
            msc_div_dept_room=msc_div_dept_room,
            trial_date=trial_date,
            trial_time=trial_time,
            trial_address=trial_address,
            trial_div_dept_room=trial_div_dept_room,
            final_result=final_result,
            date_of_settlement=date_of_settlement,
            final_obligation=final_obligation,
            def_comply_stip=def_comply_stip,
            judgment_date=judgment_date,
            writ_issued_date=writ_issued_date,
            scheduled_lockout=scheduled_lockout,
            oppose_stays=oppose_stays,
            premises_safety=premises_safety,
            matter_gate_code=matter_gate_code,
            date_possession_recovered=date_possession_recovered,
            attorney_fees=attorney_fees,
            costs=costs,
            documents_url=matter_overview.get('documentShareFolderURL', '') or '',
            service_attempt_date_1=convert_to_pacific_time(next(iter(service_info), {}).get('serviceDate')),
            contacts=cs,
            project_email_address=p.get("projectEmailAddress", ""),
            number=p.get("number", ""),
            incident_date=convert_to_pacific_time(p.get("incidentDate") or detail.get("incidentDate")),
            project_id=pid,
            project_name=p.get("projectName") or detail.get("projectName"),
            project_url=p.get("projectUrl") or detail.get("projectUrl"),
            property_contacts=property_contacts
        )

        # Store the results in Firestore
        from app import db  # Import db from app

        projects_ref = db.collection("projects")

        # Add new projects
        project_id = row.project_id
        if project_id:
            projects_ref.document(str(project_id)).set(row.to_dict())

        print(f"Finished on {pid} ({index}/{total})")
        return row.to_dict()

    except Exception as e:
        # Best-effort worker: one bad project must not abort the whole sync.
        print(f"[ERROR] Processing failed for {pid}: {e}")
        return {}
|
|
|
|
def process_projects_parallel(projects: List[dict], client: FilevineClient, max_workers: int = 9) -> List[Dict[str, Any]]:
    """
    Process projects in parallel using a worker pool.

    Args:
        projects: List of project data dictionaries
        client: FilevineClient instance
        max_workers: Number of concurrent workers (default 9)

    Returns:
        List of processed project dictionaries, in completion order (not
        input order). A project whose worker raised contributes an empty
        dict.
    """
    total = len(projects)
    results: List[Dict[str, Any]] = []

    # Create a thread pool with specified number of workers
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers,
        initializer=worker_init,
        initargs=(client,),
    ) as executor:
        # Submit all tasks; positions start at 1 so progress reads "1/N".."N/N".
        future_to_project = {
            executor.submit(process_project, position, total, project, client): project
            for position, project in enumerate(projects, start=1)
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_project):
            try:
                results.append(future.result())
            except Exception as e:
                # Look up which project this future belonged to for the log.
                failed = future_to_project[future] or {}
                failed_pid = (failed.get("projectId") or {}).get("native")
                print(f"[ERROR] Processing failed for {failed_pid}: {e}")
                # Keep one entry per submitted project even on failure.
                results.append({})

    return results
|
|
|
|
def main():
    """Main function to fetch and sync projects.

    Creates a FilevineClient, lists the projects, processes them in parallel
    and prints how many produced a non-empty row. On any exception the error
    and traceback are printed and the process exits with status 1.
    """
    print("Starting project sync...")
    try:
        # Initialize Filevine client
        client = FilevineClient()
        # Return value is unused; the call is kept because it presumably
        # primes authentication on the client - TODO confirm.
        client.get_bearer_token()

        # List projects (all pages)
        projects = client.list_all_projects()
        # NOTE(review): only the first 20 projects are synced; this looks like
        # a leftover debugging limit - confirm before removing.
        projects = projects[:20]

        # Process projects in parallel
        detailed_rows = process_projects_parallel(projects, client, 9)

        # Failed projects come back as empty dicts, so count only real rows.
        synced = sum(1 for row in detailed_rows if row)
        print(f"Successfully synced {synced} of {len(detailed_rows)} projects to Firestore")

    except Exception as e:
        print(f"Error during sync: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
|
|
|
|
# Script entry point: run the sync only when executed directly, not on import.
if __name__ == "__main__":
    main()