Files
rothbard/worker_pool.py
2025-11-09 15:57:22 -08:00

227 lines
10 KiB
Python

import concurrent.futures
import threading
from typing import List, Any, Callable, Tuple
import time
# Module-wide thread-local storage. Each thread keeps its own ``client``
# attribute here (written by set_filevine_client, read by get_filevine_client),
# so the FilevineClient does not have to be passed through every call.
_thread_local = threading.local()
def get_filevine_client():
    """Return the FilevineClient stored for the calling thread.

    Returns None when no client has been stored in this thread's
    thread-local storage yet.
    """
    try:
        return _thread_local.client
    except AttributeError:
        # Nothing has been stored for this thread.
        return None
def set_filevine_client(client):
    """Store *client* as the calling thread's FilevineClient."""
    setattr(_thread_local, "client", client)
def worker_init(client: 'FilevineClient'):
    """Prepare the calling thread by registering *client* as its FilevineClient.

    Intended for use as a thread-pool ``initializer``; it simply delegates to
    set_filevine_client.
    """
    set_filevine_client(client=client)
def _first_member_with_role(team, role_name: str):
    """Return the ``fullname`` of the first member of *team* holding *role_name*.

    Returns '' when nobody holds the role. A missing/None team, or a member
    whose ``teamOrgRoles`` is missing/None, is treated as having no roles.
    """
    for member in team or []:
        held_roles = [role.get('name') for role in (member.get('teamOrgRoles') or [])]
        if role_name in held_roles:
            return member.get('fullname')
    return ''


def process_project(index: int, total: int, project_data: dict, client: 'FilevineClient') -> dict:
    """
    Process a single project with all its API calls.
    This is the function that will be executed by workers in parallel.

    Args:
        index: Position of this project in the batch (used only for progress output)
        total: Number of projects in the batch (used only for progress output)
        project_data: Project dictionary; ``projectId``/``clientId`` are read
            as nested dicts with a ``native`` key
        client: FilevineClient instance used for every fetch

    Returns:
        A flat dictionary describing the project, or an empty dict when the
        project has no native project id.

    Raises:
        Any exception raised by the client calls, except a failure of
        ``fetch_project_detail``, which is logged and treated as "no detail".
    """
    # Register the client for this thread so code relying on
    # get_filevine_client() sees it; the parameter is then used directly.
    set_filevine_client(client)
    # Imported here rather than at module level (kept from the original,
    # presumably to avoid a circular import with ``app`` -- TODO confirm).
    from app import convert_to_pacific_time

    p = project_data
    pid = (p.get("projectId") or {}).get("native")
    print(f"Working on {pid} ({index}/{total})")

    # Bail out before issuing any API call: nothing can be fetched without an id.
    if pid is None:
        return {}

    def pacific(form: dict, key: str):
        """Convert ``form[key]`` to Pacific time, falling back to '' when empty."""
        return convert_to_pacific_time(form.get(key)) or ''

    client_record = client.fetch_client((p.get("clientId") or {}).get("native")) or {}
    contacts = client.fetch_contacts(pid) or []

    try:
        detail = client.fetch_project_detail(pid) or {}
    except Exception as e:
        print(f"[WARN] detail fetch failed for {pid}: {e}")
        detail = {}

    # First contact whose organisation record is tagged as a defendant.
    defendant_one = next(
        (org
         for org in ((contact.get('orgContact') or {}) for contact in contacts)
         if "Defendant" in (org.get('personTypes') or [])),
        {},
    )

    # Every form falls back to an empty dict so the .get() calls below are safe.
    new_file_review = client.fetch_form(pid, "newFileReview") or {}
    dates_and_deadlines = client.fetch_form(pid, "datesAndDeadlines") or {}
    service_info = client.fetch_collection(pid, "serviceInfo") or []
    property_info = client.fetch_form(pid, "propertyInfo") or {}
    matter_overview = client.fetch_form(pid, "matterOverview") or {}
    fees_and_costs = client.fetch_form(pid, "feesAndCosts") or {}
    property_contacts = client.fetch_form(pid, "propertyContacts") or {}
    lease_info_np = client.fetch_form(pid, "leaseInfoNP") or {}

    # Fetch the task list once and split it, instead of one API call per list.
    tasks = (client.fetch_project_tasks(pid) or {}).get("items") or []
    completed_tasks = []
    pending_tasks = []
    for task in tasks:
        summary = {"description": task.get("body"),
                   "completed": convert_to_pacific_time(task.get("completedDate"))}
        if task.get("isCompleted"):
            completed_tasks.append(summary)
        else:
            pending_tasks.append(summary)

    team = client.fetch_project_team(pid) or []
    assigned_attorney = _first_member_with_role(team, 'Assigned Attorney')
    primary_contact = _first_member_with_role(team, 'Primary')
    secondary_paralegal = _first_member_with_role(team, 'Secondary Paralegal')

    first_service = next(iter(service_info), {})

    row = {
        "client": client_record.get("firstName"),
        "matter_description": p.get("projectName"),
        "defendant_1": defendant_one.get('fullName', 'Unknown'),
        "matter_open": convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")),
        "notice_type": new_file_review.get("noticeType") or '',
        "case_number": dates_and_deadlines.get('caseNumber') or '',
        "premises_address": property_info.get("premisesAddressWithUnit") or '',
        "premises_city": property_info.get("premisesCity") or '',
        "responsible_attorney": assigned_attorney,
        "staff_person": primary_contact,
        "staff_person_2": secondary_paralegal,
        "phase_name": p.get("phaseName"),
        "completed_tasks": completed_tasks,
        "pending_tasks": pending_tasks,
        # Notice service and expiration dates
        "notice_service_date": pacific(new_file_review, "noticeServiceDate"),
        "notice_expiration_date": pacific(new_file_review, "noticeExpirationDate"),
        # NOTE(review): key reads like a typo for "case_filed_date"; kept
        # unchanged because consumers of this dict may rely on it.
        "case_field_date": pacific(dates_and_deadlines, "dateCaseFiled"),
        "daily_rent_damages": (lease_info_np.get("dailyRentDamages")
                               or dates_and_deadlines.get("dailyRentDamages")
                               or ''),
        "default_date": pacific(dates_and_deadlines, "defaultDate"),
        # Motion hearing dates
        "demurrer_hearing_date": pacific(dates_and_deadlines, "demurrerHearingDate"),
        "motion_to_strike_hearing_date": pacific(dates_and_deadlines, "mTSHearingDate"),
        "motion_to_quash_hearing_date": pacific(dates_and_deadlines, "mTQHearingDate"),
        "other_motion_hearing_date": pacific(dates_and_deadlines, "otherMotion1HearingDate"),
        # MSC details (the time field is passed through, not converted)
        "msc_date": pacific(dates_and_deadlines, "mSCDate"),
        "msc_time": dates_and_deadlines.get("mSCTime") or '',
        "msc_address": dates_and_deadlines.get("mSCAddress") or '',
        "msc_div_dept_room": dates_and_deadlines.get("mSCDeptDiv") or '',
        # Trial details (the time field is passed through, not converted)
        "trial_date": pacific(dates_and_deadlines, "trialDate"),
        "trial_time": dates_and_deadlines.get("trialTime") or '',
        "trial_address": dates_and_deadlines.get("trialAddress") or '',
        "trial_div_dept_room": dates_and_deadlines.get("trialDeptDivRoom") or '',
        # Final result of trial/MSC
        "final_result": dates_and_deadlines.get("finalResultOfTrialMSCCa") or '',
        # Settlement details
        "date_of_settlement": pacific(dates_and_deadlines, "dateOfStipulation"),
        "final_obligation": dates_and_deadlines.get("finalObligationUnderTheStip") or '',
        "def_comply_stip": dates_and_deadlines.get("defendantsComplyWithStip") or '',
        # Judgment and writ details
        "judgment_date": pacific(dates_and_deadlines, "dateOfJudgment"),
        "writ_issued_date": pacific(dates_and_deadlines, "writIssuedDate"),
        # Lockout and stay details
        "scheduled_lockout": pacific(dates_and_deadlines, "sheriffScheduledDate"),
        "oppose_stays": dates_and_deadlines.get("opposeStays") or '',
        # Premises safety and entry code
        "premises_safety": new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or '',
        "matter_gate_code": property_info.get("propertyEntryCodeOrInstructions") or '',
        "date_possession_recovered": pacific(dates_and_deadlines, "datePossessionRecovered"),
        # Attorney fees and costs
        "attorney_fees": fees_and_costs.get("totalAttorneysFees") or '',
        "costs": fees_and_costs.get("totalCosts") or '',
        "documents_url": matter_overview.get('documentShareFolderURL') or '',
        "service_attempt_date_1": convert_to_pacific_time(first_service.get('serviceDate')),
        "contacts": contacts,
        "ProjectEmailAddress": p.get("projectEmailAddress"),
        "Number": p.get("number"),
        "IncidentDate": convert_to_pacific_time(p.get("incidentDate") or detail.get("incidentDate")),
        "ProjectId": pid,
        "ProjectName": p.get("projectName") or detail.get("projectName"),
        "ProjectUrl": p.get("projectUrl") or detail.get("projectUrl"),
        "property_contacts": property_contacts,
    }
    print(f"Finished on {pid} ({index}/{total})")
    return row
def process_projects_parallel(projects: List[dict], client: 'FilevineClient', max_workers: int = 9) -> List[dict]:
    """
    Process projects in parallel using a worker pool.

    Args:
        projects: List of project data dictionaries
        client: FilevineClient instance, handed to every worker thread
        max_workers: Number of concurrent workers (default 9)

    Returns:
        List of processed project dictionaries, in the same order as
        ``projects``. A project whose processing raised an exception is
        represented by an empty dict at its position.
    """
    total = len(projects)
    results = []
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers,
        initializer=worker_init,
        initargs=(client,),
    ) as executor:
        # Submit everything up front so the work runs concurrently. Positions
        # are 1-based so progress output reads "1/N" .. "N/N".
        futures = [
            executor.submit(process_project, position, total, project, client)
            for position, project in enumerate(projects, start=1)
        ]
        # Collect in submission order (not completion order) so the output
        # lines up with the input list and is deterministic across runs.
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                print(f"[ERROR] Processing failed: {e}")
                # Keep a placeholder so positions still match the input.
                results.append({})
    return results