import concurrent.futures
import threading
import time
from typing import Any, Callable, List, Tuple

# Global thread-local storage for bearer token to avoid passing it around
_thread_local = threading.local()


def get_bearer_token():
    """Return the bearer token stored for the current thread, or None if unset."""
    return getattr(_thread_local, 'bearer', None)


def set_bearer_token(token):
    """Store *token* as the bearer token for the current thread only."""
    _thread_local.bearer = token


def worker_init(bearer_token: str):
    """Thread-pool initializer: seed the worker thread's bearer token."""
    set_bearer_token(bearer_token)


def _native(record: dict, key: str):
    """Return ``record[key]["native"]``, or None when either level is missing."""
    return (record.get(key) or {}).get("native")


def _first_defendant(contacts) -> dict:
    """Return the ``orgContact`` of the first contact typed "Defendant", else {}."""
    for contact in contacts or []:
        org_contact = contact.get('orgContact') or {}
        if "Defendant" in (org_contact.get('personTypes') or []):
            return org_contact
    return {}


def _member_with_role(team, role_name: str) -> str:
    """Return the ``fullname`` of the first team member holding *role_name*, else ''."""
    for member in team or []:
        roles = member.get('teamOrgRoles') or []
        if any(role.get('name') == role_name for role in roles):
            return member.get('fullname')
    return ''


def process_project(index: int, total: int, project_data: dict, bearer_token: str) -> dict:
    """
    Process a single project with all its API calls.
    This is the function that will be executed by workers in parallel.

    Args:
        index: Position of this project in the batch (used only for progress output)
        total: Size of the batch (used only for progress output)
        project_data: Project dictionary; ``projectId.native`` identifies the project
        bearer_token: API bearer token passed to every fetch helper

    Returns:
        A flat dictionary describing the project, or ``{}`` when the project
        has no id (in which case no API call is made).

    Raises:
        Whatever the ``app`` fetch helpers raise, except for the project-detail
        fetch, whose failure is logged and treated as an empty detail.
    """
    # Set the bearer token for this thread (redundant with worker_init when run
    # through the pool, but keeps direct calls working).
    set_bearer_token(bearer_token)

    p = project_data
    pid = _native(p, "projectId")
    print(f"Working on {pid} ({index}/{total})")

    # Guard before any API traffic: nothing can be fetched without a project id.
    if pid is None:
        return {}

    # Imported lazily, as in the original code (presumably to avoid a circular
    # import with ``app`` -- TODO confirm).
    from app import (
        fetch_client,
        fetch_contacts,
        fetch_project_detail,
        fetch_form,
        fetch_collection,
        fetch_project_tasks,
        fetch_project_team,
        convert_to_pacific_time,
    )

    def pacific_or_blank(form: dict, key: str):
        """Convert ``form[key]`` with convert_to_pacific_time, '' when falsy."""
        return convert_to_pacific_time(form.get(key)) or ''

    client = fetch_client(bearer_token, _native(p, "clientId"))
    contacts = fetch_contacts(bearer_token, pid)

    try:
        detail = fetch_project_detail(bearer_token, pid) or {}
    except Exception as e:
        # Best effort: detail only supplies fallbacks for a few fields below.
        print(f"[WARN] detail fetch failed for {pid}: {e}")
        detail = {}

    defendant_one = _first_defendant(contacts)

    # Every form falls back to {} so a missing form yields blank fields
    # instead of an AttributeError on ``None.get``.
    new_file_review = fetch_form(bearer_token, pid, "newFileReview") or {}
    dates_and_deadlines = fetch_form(bearer_token, pid, "datesAndDeadlines") or {}
    service_info = fetch_collection(bearer_token, pid, "serviceInfo") or []
    property_info = fetch_form(bearer_token, pid, "propertyInfo") or {}
    matter_overview = fetch_form(bearer_token, pid, "matterOverview") or {}
    fees_and_costs = fetch_form(bearer_token, pid, "feesAndCosts") or {}
    property_contacts = fetch_form(bearer_token, pid, "propertyContacts") or {}
    lease_info_np = fetch_form(bearer_token, pid, "leaseInfoNP") or {}

    # Fetch the task list once and split it, rather than calling the API twice.
    task_items = (fetch_project_tasks(bearer_token, pid) or {}).get("items") or []
    completed_tasks = []
    pending_tasks = []
    for task in task_items:
        entry = {
            "description": task.get("body"),
            # Pending tasks carry the same "completed" key as completed ones.
            "completed": convert_to_pacific_time(task.get("completedDate")),
        }
        if task.get("isCompleted"):
            completed_tasks.append(entry)
        else:
            pending_tasks.append(entry)

    team = fetch_project_team(bearer_token, pid) or []
    assigned_attorney = _member_with_role(team, 'Assigned Attorney')
    primary_contact = _member_with_role(team, 'Primary')
    secondary_paralegal = _member_with_role(team, 'Secondary Paralegal')

    # Extract notice service and expiration dates
    notice_service_date = pacific_or_blank(new_file_review, "noticeServiceDate")
    notice_expiration_date = pacific_or_blank(new_file_review, "noticeExpirationDate")

    # Extract daily rent damages (lease form wins over dates-and-deadlines)
    daily_rent_damages = (
        lease_info_np.get("dailyRentDamages")
        or dates_and_deadlines.get("dailyRentDamages")
        or ''
    )

    # Extract default date
    default_date = pacific_or_blank(dates_and_deadlines, "defaultDate")
    case_filed_date = pacific_or_blank(dates_and_deadlines, "dateCaseFiled")

    # Extract motion hearing dates
    demurrer_hearing_date = pacific_or_blank(dates_and_deadlines, "demurrerHearingDate")
    motion_to_strike_hearing_date = pacific_or_blank(dates_and_deadlines, "mTSHearingDate")
    motion_to_quash_hearing_date = pacific_or_blank(dates_and_deadlines, "mTQHearingDate")
    other_motion_hearing_date = pacific_or_blank(dates_and_deadlines, "otherMotion1HearingDate")

    # Extract MSC details
    msc_date = pacific_or_blank(dates_and_deadlines, "mSCDate")
    msc_time = dates_and_deadlines.get("mSCTime") or ''  # Time field, not converting
    msc_address = dates_and_deadlines.get("mSCAddress") or ''
    msc_div_dept_room = dates_and_deadlines.get("mSCDeptDiv") or ''

    # Extract trial details
    trial_date = pacific_or_blank(dates_and_deadlines, "trialDate")
    trial_time = dates_and_deadlines.get("trialTime") or ''  # Time field, not converting
    trial_address = dates_and_deadlines.get("trialAddress") or ''
    trial_div_dept_room = dates_and_deadlines.get("trialDeptDivRoom") or ''

    # Extract final result of trial/MSC
    final_result = dates_and_deadlines.get("finalResultOfTrialMSCCa") or ''

    # Extract settlement details
    date_of_settlement = pacific_or_blank(dates_and_deadlines, "dateOfStipulation")
    final_obligation = dates_and_deadlines.get("finalObligationUnderTheStip") or ''
    def_comply_stip = dates_and_deadlines.get("defendantsComplyWithStip") or ''

    # Extract judgment and writ details
    judgment_date = pacific_or_blank(dates_and_deadlines, "dateOfJudgment")
    writ_issued_date = pacific_or_blank(dates_and_deadlines, "writIssuedDate")

    # Extract lockout and stay details
    scheduled_lockout = pacific_or_blank(dates_and_deadlines, "sheriffScheduledDate")
    oppose_stays = dates_and_deadlines.get("opposeStays") or ''

    # Extract premises safety and entry code
    premises_safety = new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or ''
    matter_gate_code = property_info.get("propertyEntryCodeOrInstructions") or ''

    # Extract possession recovered date
    date_possession_recovered = pacific_or_blank(dates_and_deadlines, "datePossessionRecovered")

    # Extract attorney fees and costs
    attorney_fees = fees_and_costs.get("totalAttorneysFees") or ''
    costs = fees_and_costs.get("totalCosts") or ''

    # Only the first service-info entry feeds "service_attempt_date_1".
    first_service = next(iter(service_info), {})

    row = {
        "client": (client or {}).get("firstName"),
        "matter_description": p.get("projectName"),
        "defendant_1": defendant_one.get('fullName', 'Unknown'),
        "matter_open": convert_to_pacific_time(
            dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")
        ),
        "notice_type": new_file_review.get("noticeType", '') or '',
        "case_number": dates_and_deadlines.get('caseNumber', '') or '',
        "premises_address": property_info.get("premisesAddressWithUnit") or '',
        "premises_city": property_info.get("premisesCity") or '',
        "responsible_attorney": assigned_attorney,
        "staff_person": primary_contact,
        "staff_person_2": secondary_paralegal,
        "phase_name": p.get("phaseName"),
        "completed_tasks": completed_tasks,
        "pending_tasks": pending_tasks,
        "notice_service_date": notice_service_date,
        "notice_expiration_date": notice_expiration_date,
        # NOTE(review): key reads "field", probably meant "filed"; left as-is
        # because downstream consumers may depend on the existing key.
        "case_field_date": case_filed_date,
        "daily_rent_damages": daily_rent_damages,
        "default_date": default_date,
        "demurrer_hearing_date": demurrer_hearing_date,
        "motion_to_strike_hearing_date": motion_to_strike_hearing_date,
        "motion_to_quash_hearing_date": motion_to_quash_hearing_date,
        "other_motion_hearing_date": other_motion_hearing_date,
        "msc_date": msc_date,
        "msc_time": msc_time,
        "msc_address": msc_address,
        "msc_div_dept_room": msc_div_dept_room,
        "trial_date": trial_date,
        "trial_time": trial_time,
        "trial_address": trial_address,
        "trial_div_dept_room": trial_div_dept_room,
        "final_result": final_result,
        "date_of_settlement": date_of_settlement,
        "final_obligation": final_obligation,
        "def_comply_stip": def_comply_stip,
        "judgment_date": judgment_date,
        "writ_issued_date": writ_issued_date,
        "scheduled_lockout": scheduled_lockout,
        "oppose_stays": oppose_stays,
        "premises_safety": premises_safety,
        "matter_gate_code": matter_gate_code,
        "date_possession_recovered": date_possession_recovered,
        "attorney_fees": attorney_fees,
        "costs": costs,
        "documents_url": matter_overview.get('documentShareFolderURL') or '',
        "service_attempt_date_1": convert_to_pacific_time(first_service.get('serviceDate')),
        "contacts": contacts,
        "ProjectEmailAddress": p.get("projectEmailAddress"),
        "Number": p.get("number"),
        "IncidentDate": convert_to_pacific_time(
            p.get("incidentDate") or detail.get("incidentDate")
        ),
        "ProjectId": pid,
        "ProjectName": p.get("projectName") or detail.get("projectName"),
        "ProjectUrl": p.get("projectUrl") or detail.get("projectUrl"),
        "property_contacts": property_contacts,
    }
    print(f"Finished on {pid} ({index}/{total})")
    return row


def process_projects_parallel(projects: List[dict], bearer_token: str, max_workers: int = 9) -> List[dict]:
    """
    Process projects in parallel using a worker pool.

    Args:
        projects: List of project data dictionaries
        bearer_token: Filevine API bearer token
        max_workers: Number of concurrent workers (default 9)

    Returns:
        List of processed project dictionaries, in the order the workers
        finish (not the input order). A project whose processing raised
        contributes an empty dict.
    """
    total = len(projects)
    results = []

    # Create a thread pool with specified number of workers
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers,
        initializer=worker_init,
        initargs=(bearer_token,),
    ) as executor:
        # Submit all tasks to the executor; positions are 1-based so the
        # progress output runs from (1/total) to (total/total).
        future_to_project = {
            executor.submit(process_project, position, total, project, bearer_token): project
            for position, project in enumerate(projects, start=1)
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_project):
            try:
                results.append(future.result())
            except Exception as e:
                failed = future_to_project[future]
                failed_id = _native(failed, "projectId") if isinstance(failed, dict) else None
                print(f"[ERROR] Processing failed for {failed_id}: {e}")
                # Keep one entry per submitted project so the result count
                # still matches the input count.
                results.append({})

    return results