From cfd7020e1b07f57f866bec4a971bb75fd1cf445c Mon Sep 17 00:00:00 2001 From: Bryce Date: Fri, 7 Nov 2025 15:03:20 -0800 Subject: [PATCH] multi --- app.py | 293 +++++++++++++++++++++++++------------------------ worker_pool.py | 232 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 380 insertions(+), 145 deletions(-) create mode 100644 worker_pool.py diff --git a/app.py b/app.py index d5b9f88..bb86e60 100644 --- a/app.py +++ b/app.py @@ -135,166 +135,169 @@ def fetch_all_projects(): # Fetch details for each detailed_rows = [] - for p in projects: - pid = (p.get("projectId") or {}).get("native") - c = fetch_client(bearer, (p.get("clientId") or {}).get("native")) - cs = fetch_contacts(bearer, pid) + # for p in projects: + # pid = (p.get("projectId") or {}).get("native") + # c = fetch_client(bearer, (p.get("clientId") or {}).get("native")) + # cs = fetch_contacts(bearer, pid) - if pid is None: - continue - try: - detail = fetch_project_detail(bearer, pid) - except Exception as e: - print(f"[WARN] detail fetch failed for {pid}: {e}") - detail = {} - from pprint import pprint - defendant_one = next((c.get('orgContact', {}) for c in cs if "Defendant" in c.get('orgContact', {}).get('personTypes', [])), {}) - new_file_review = fetch_form(bearer, pid, "newFileReview") or {} - dates_and_deadlines = fetch_form(bearer, pid, "datesAndDeadlines") or {} - service_info = fetch_collection(bearer, pid, "serviceInfo") or [] - property_info = fetch_form(bearer, pid, "propertyInfo") - matter_overview = fetch_form(bearer, pid, "matterOverview") - fees_and_costs = fetch_form(bearer, pid, "feesAndCosts") or {} - property_contacts = fetch_form(bearer, pid, "propertyContacts") or {} - pprint(property_contacts) - lease_info_np = fetch_form(bearer, pid, "leaseInfoNP") or {} + # if pid is None: + # continue + # try: + # detail = fetch_project_detail(bearer, pid) + # except Exception as e: + # print(f"[WARN] detail fetch failed for {pid}: {e}") + # detail = {} + # from pprint import pprint + # defendant_one = next((c.get('orgContact', {}) for c in cs if "Defendant" in c.get('orgContact', {}).get('personTypes', [])), {}) + # new_file_review = fetch_form(bearer, pid, "newFileReview") or {} + # dates_and_deadlines = fetch_form(bearer, pid, "datesAndDeadlines") or {} + # service_info = fetch_collection(bearer, pid, "serviceInfo") or [] + # property_info = fetch_form(bearer, pid, "propertyInfo") + # matter_overview = fetch_form(bearer, pid, "matterOverview") + # fees_and_costs = fetch_form(bearer, pid, "feesAndCosts") or {} + # property_contacts = fetch_form(bearer, pid, "propertyContacts") or {} + # pprint(property_contacts) + # lease_info_np = fetch_form(bearer, pid, "leaseInfoNP") or {} - completed_tasks = [{"description": x.get("body") , - "completed": convert_to_pacific_time(x.get("completedDate"))} - for x in fetch_project_tasks(bearer, pid).get("items") - if x.get("isCompleted")] - pending_tasks = [{"description": x.get("body") , - "completed": convert_to_pacific_time(x.get("completedDate"))} - for x in fetch_project_tasks(bearer, pid).get("items") - if not x.get("isCompleted")] + # completed_tasks = [{"description": x.get("body") , + # "completed": convert_to_pacific_time(x.get("completedDate"))} + # for x in fetch_project_tasks(bearer, pid).get("items") + # if x.get("isCompleted")] + # pending_tasks = [{"description": x.get("body") , + # "completed": convert_to_pacific_time(x.get("completedDate"))} + # for x in fetch_project_tasks(bearer, pid).get("items") + # if not x.get("isCompleted")] - team = fetch_project_team(bearer, pid) - assigned_attorney = next((m.get('fullname') - for m in team - if ('Assigned Attorney' in [r.get('name') for r in m.get('teamOrgRoles')]) - ), '') - primary_contact = next((m.get('fullname') - for m in team - if ('Primary' in [r.get('name') for r in m.get('teamOrgRoles')]) - ), '') - secondary_paralegal = next((m.get('fullname') - for m in team - if ('Secondary Paralegal' in [r.get('name') for r in m.get('teamOrgRoles')]) - ), '') + # team = fetch_project_team(bearer, pid) + # assigned_attorney = next((m.get('fullname') + # for m in team + # if ('Assigned Attorney' in [r.get('name') for r in m.get('teamOrgRoles')]) + # ), '') + # primary_contact = next((m.get('fullname') + # for m in team + # if ('Primary' in [r.get('name') for r in m.get('teamOrgRoles')]) + # ), '') + # secondary_paralegal = next((m.get('fullname') + # for m in team + # if ('Secondary Paralegal' in [r.get('name') for r in m.get('teamOrgRoles')]) + # ), '') - # Extract notice service and expiration dates - notice_service_date = convert_to_pacific_time(new_file_review.get("noticeServiceDate")) or '' - notice_expiration_date = convert_to_pacific_time(new_file_review.get("noticeExpirationDate")) or '' + # # Extract notice service and expiration dates + # notice_service_date = convert_to_pacific_time(new_file_review.get("noticeServiceDate")) or '' + # notice_expiration_date = convert_to_pacific_time(new_file_review.get("noticeExpirationDate")) or '' - # Extract daily rent damages - daily_rent_damages = lease_info_np.get("dailyRentDamages") or dates_and_deadlines.get("dailyRentDamages") or '' + # # Extract daily rent damages + # daily_rent_damages = lease_info_np.get("dailyRentDamages") or dates_and_deadlines.get("dailyRentDamages") or '' - # Extract default date - default_date = convert_to_pacific_time(dates_and_deadlines.get("defaultDate")) or '' - case_filed_date = convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled")) or '' + # # Extract default date + # default_date = convert_to_pacific_time(dates_and_deadlines.get("defaultDate")) or '' + # case_filed_date = convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled")) or '' - # Extract motion hearing dates - demurrer_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("demurrerHearingDate")) or '' - motion_to_strike_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTSHearingDate")) or '' - motion_to_quash_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTQHearingDate")) or '' - other_motion_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("otherMotion1HearingDate")) or '' + # # Extract motion hearing dates + # demurrer_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("demurrerHearingDate")) or '' + # motion_to_strike_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTSHearingDate")) or '' + # motion_to_quash_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTQHearingDate")) or '' + # other_motion_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("otherMotion1HearingDate")) or '' - # Extract MSC details - msc_date = convert_to_pacific_time(dates_and_deadlines.get("mSCDate")) or '' - msc_time = dates_and_deadlines.get("mSCTime") or '' # Time field, not converting - msc_address = dates_and_deadlines.get("mSCAddress") or '' - msc_div_dept_room = dates_and_deadlines.get("mSCDeptDiv") or '' + # # Extract MSC details + # msc_date = convert_to_pacific_time(dates_and_deadlines.get("mSCDate")) or '' + # msc_time = dates_and_deadlines.get("mSCTime") or '' # Time field, not converting + # msc_address = dates_and_deadlines.get("mSCAddress") or '' + # msc_div_dept_room = dates_and_deadlines.get("mSCDeptDiv") or '' - # Extract trial details - trial_date = convert_to_pacific_time(dates_and_deadlines.get("trialDate")) or '' - trial_time = dates_and_deadlines.get("trialTime") or '' # Time field, not converting - trial_address = dates_and_deadlines.get("trialAddress") or '' - trial_div_dept_room = dates_and_deadlines.get("trialDeptDivRoom") or '' + # # Extract trial details + # trial_date = convert_to_pacific_time(dates_and_deadlines.get("trialDate")) or '' + # trial_time = dates_and_deadlines.get("trialTime") or '' # Time field, not converting + # trial_address = dates_and_deadlines.get("trialAddress") or '' + # trial_div_dept_room = dates_and_deadlines.get("trialDeptDivRoom") or '' - # Extract final result of trial/MSC - final_result = dates_and_deadlines.get("finalResultOfTrialMSCCa") or '' + # # Extract final result of trial/MSC + # final_result = dates_and_deadlines.get("finalResultOfTrialMSCCa") or '' - # Extract settlement details - date_of_settlement = convert_to_pacific_time(dates_and_deadlines.get("dateOfStipulation")) or '' - final_obligation = dates_and_deadlines.get("finalObligationUnderTheStip") or '' - def_comply_stip = dates_and_deadlines.get("defendantsComplyWithStip") or '' + # # Extract settlement details + # date_of_settlement = convert_to_pacific_time(dates_and_deadlines.get("dateOfStipulation")) or '' + # final_obligation = dates_and_deadlines.get("finalObligationUnderTheStip") or '' + # def_comply_stip = dates_and_deadlines.get("defendantsComplyWithStip") or '' - # Extract judgment and writ details - judgment_date = convert_to_pacific_time(dates_and_deadlines.get("dateOfJudgment")) or '' - writ_issued_date = convert_to_pacific_time(dates_and_deadlines.get("writIssuedDate")) or '' + # # Extract judgment and writ details + # judgment_date = convert_to_pacific_time(dates_and_deadlines.get("dateOfJudgment")) or '' + # writ_issued_date = convert_to_pacific_time(dates_and_deadlines.get("writIssuedDate")) or '' - # Extract lockout and stay details - scheduled_lockout = convert_to_pacific_time(dates_and_deadlines.get("sheriffScheduledDate")) or '' - oppose_stays = dates_and_deadlines.get("opposeStays") or '' + # # Extract lockout and stay details + # scheduled_lockout = convert_to_pacific_time(dates_and_deadlines.get("sheriffScheduledDate")) or '' + # oppose_stays = dates_and_deadlines.get("opposeStays") or '' - # Extract premises safety and entry code - premises_safety = new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or '' - matter_gate_code = property_info.get("propertyEntryCodeOrInstructions") or '' + # # Extract premises safety and entry code + # premises_safety = new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or '' + # matter_gate_code = property_info.get("propertyEntryCodeOrInstructions") or '' - # Extract possession recovered date - date_possession_recovered = convert_to_pacific_time(dates_and_deadlines.get("datePossessionRecovered")) or '' + # # Extract possession recovered date + # date_possession_recovered = convert_to_pacific_time(dates_and_deadlines.get("datePossessionRecovered")) or '' - # Extract attorney fees and costs - attorney_fees = fees_and_costs.get("totalAttorneysFees") or '' - costs = fees_and_costs.get("totalCosts") or '' + # # Extract attorney fees and costs + # attorney_fees = fees_and_costs.get("totalAttorneysFees") or '' + # costs = fees_and_costs.get("totalCosts") or '' - row = { - "client": c.get("firstName"), - "matter_description": p.get("projectName"), - "defendant_1": defendant_one.get('fullName', 'Unknown'), - "matter_open": convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")), - "notice_type": new_file_review.get("noticeType", '') or '', - "case_number": dates_and_deadlines.get('caseNumber', '') or '', - "premises_address": property_info.get("premisesAddressWithUnit") or '', - "premises_city": property_info.get("premisesCity") or '', - "responsible_attorney": assigned_attorney, - "staff_person": primary_contact, - "staff_person_2": secondary_paralegal, - "phase_name": p.get("phaseName"), - "completed_tasks": completed_tasks, - "pending_tasks": pending_tasks, - "notice_service_date": notice_service_date, - "notice_expiration_date": notice_expiration_date, - "case_field_date": case_filed_date, - "daily_rent_damages": daily_rent_damages, - "default_date": default_date, - "demurrer_hearing_date": demurrer_hearing_date, - "motion_to_strike_hearing_date": motion_to_strike_hearing_date, - "motion_to_quash_hearing_date": motion_to_quash_hearing_date, - "other_motion_hearing_date": other_motion_hearing_date, - "msc_date": msc_date, - "msc_time": msc_time, - "msc_address": msc_address, - "msc_div_dept_room": msc_div_dept_room, - "trial_date": trial_date, - "trial_time": trial_time, - "trial_address": trial_address, - "trial_div_dept_room": trial_div_dept_room, - "final_result": final_result, - "date_of_settlement": date_of_settlement, - "final_obligation": final_obligation, - "def_comply_stip": def_comply_stip, - "judgment_date": judgment_date, - "writ_issued_date": writ_issued_date, - "scheduled_lockout": scheduled_lockout, - "oppose_stays": oppose_stays, - "premises_safety": premises_safety, - "matter_gate_code": matter_gate_code, - "date_possession_recovered": date_possession_recovered, - "attorney_fees": attorney_fees, - "costs": costs, - "documents_url": matter_overview.get('documentShareFolderURL') or '', - "service_attempt_date_1": convert_to_pacific_time(next(iter(service_info), {}).get('serviceDate')), - "contacts": cs, - "ProjectEmailAddress": p.get("projectEmailAddress"), - "Number": p.get("number"), - "IncidentDate": convert_to_pacific_time(p.get("incidentDate") or detail.get("incidentDate")), - "ProjectId": pid, - "ProjectName": p.get("projectName") or detail.get("projectName"), - "ProjectUrl": p.get("projectUrl") or detail.get("projectUrl"), - } - detailed_rows.append(row) + # row = { + # "client": c.get("firstName"), + # "matter_description": p.get("projectName"), + # "defendant_1": defendant_one.get('fullName', 'Unknown'), + # "matter_open": convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")), + # "notice_type": new_file_review.get("noticeType", '') or '', + # "case_number": dates_and_deadlines.get('caseNumber', '') or '', + # "premises_address": property_info.get("premisesAddressWithUnit") or '', + # "premises_city": property_info.get("premisesCity") or '', + # "responsible_attorney": assigned_attorney, + # "staff_person": primary_contact, + # "staff_person_2": secondary_paralegal, + # "phase_name": p.get("phaseName"), + # "completed_tasks": completed_tasks, + # "pending_tasks": pending_tasks, + # "notice_service_date": notice_service_date, + # "notice_expiration_date": notice_expiration_date, + # "case_field_date": case_filed_date, + # "daily_rent_damages": daily_rent_damages, + # "default_date": default_date, + # "demurrer_hearing_date": demurrer_hearing_date, + # "motion_to_strike_hearing_date": motion_to_strike_hearing_date, + # "motion_to_quash_hearing_date": motion_to_quash_hearing_date, + # "other_motion_hearing_date": other_motion_hearing_date, + # "msc_date": msc_date, + # "msc_time": msc_time, + # "msc_address": msc_address, + # "msc_div_dept_room": msc_div_dept_room, + # "trial_date": trial_date, + # "trial_time": trial_time, + # "trial_address": trial_address, + # "trial_div_dept_room": trial_div_dept_room, + # "final_result": final_result, + # "date_of_settlement": date_of_settlement, + # "final_obligation": final_obligation, + # "def_comply_stip": def_comply_stip, + # "judgment_date": judgment_date, + # "writ_issued_date": writ_issued_date, + # "scheduled_lockout": scheduled_lockout, + # "oppose_stays": oppose_stays, + # "premises_safety": premises_safety, + # "matter_gate_code": matter_gate_code, + # "date_possession_recovered": date_possession_recovered, + # "attorney_fees": attorney_fees, + # "costs": costs, + # "documents_url": matter_overview.get('documentShareFolderURL') or '', + # "service_attempt_date_1": convert_to_pacific_time(next(iter(service_info), {}).get('serviceDate')), + # "contacts": cs, + # "ProjectEmailAddress": p.get("projectEmailAddress"), + # "Number": p.get("number"), + # "IncidentDate": convert_to_pacific_time(p.get("incidentDate") or detail.get("incidentDate")), + # "ProjectId": pid, + # "ProjectName": p.get("projectName") or detail.get("projectName"), + # "ProjectUrl": p.get("projectUrl") or detail.get("projectUrl"), + # } + # detailed_rows.append(row) + + import worker_pool + detailed_rows = worker_pool.process_projects_parallel(projects, bearer, 5) # Store the results in Firestore projects_ref = db.collection("projects") # Clear existing projects @@ -387,7 +390,7 @@ def get_filevine_bearer(): resp.raise_for_status() js = resp.json() token = js.get("access_token") - print(f"Got bearer {token}") + print(f"Got bearer js", js) return token @@ -405,7 +408,7 @@ def list_all_projects(bearer: str): offset = 0 # TODO we probably need to sync the data with fierbase cnt = 0 - while True: + while len(results) < 200: cnt = len(results) print(f"list try {tries}, starting at {offset}, previous count {last_count}, currently at {cnt}") tries += 1 @@ -414,12 +417,12 @@ def list_all_projects(bearer: str): if last_count is not None: # Some deployments use LastID/Offset pagination; adapt if needed offset = offset + last_count - print(f"OFFSET f{offset}") params["offset"] = offset r = requests.get(url, headers=headers, params=params, timeout=30) r.raise_for_status() page = r.json() from pprint import pprint + print(f"Fetched page. Headers: {r.headers}, Offset: {offset}") items = page.get("items", []) results.extend(items) has_more = page.get("hasMore") diff --git a/worker_pool.py b/worker_pool.py new file mode 100644 index 0000000..5936ab0 --- /dev/null +++ b/worker_pool.py @@ -0,0 +1,232 @@ +import concurrent.futures +import threading +from typing import List, Any, Callable, Tuple +import time + +# Global thread-local storage for bearer token to avoid passing it around +_thread_local = threading.local() + +def get_bearer_token(): + """Get bearer token from thread local storage""" + return getattr(_thread_local, 'bearer', None) + +def set_bearer_token(token): + """Set bearer token in thread local storage""" + _thread_local.bearer = token + +def worker_init(bearer_token: str): + """Initialize worker with bearer token""" + set_bearer_token(bearer_token) + +def process_project(project_data: dict, bearer_token: str) -> dict: + """ + Process a single project with all its API calls. + This is the function that will be executed by workers in parallel. + """ + # Set the bearer token for this thread + set_bearer_token(bearer_token) + + from app import ( + fetch_client, + fetch_contacts, + fetch_project_detail, + fetch_form, + fetch_collection, + fetch_project_tasks, + fetch_project_team, + convert_to_pacific_time + ) + + p = project_data + pid = (p.get("projectId") or {}).get("native") + print(f"Working on {pid}") + c = fetch_client(bearer_token, (p.get("clientId") or {}).get("native")) + cs = fetch_contacts(bearer_token, pid) + + if pid is None: + return {} + + try: + detail = fetch_project_detail(bearer_token, pid) + except Exception as e: + print(f"[WARN] detail fetch failed for {pid}: {e}") + detail = {} + + defendant_one = next((c.get('orgContact', {}) for c in cs if "Defendant" in c.get('orgContact', {}).get('personTypes', [])), {}) + + new_file_review = fetch_form(bearer_token, pid, "newFileReview") or {} + dates_and_deadlines = fetch_form(bearer_token, pid, "datesAndDeadlines") or {} + service_info = fetch_collection(bearer_token, pid, "serviceInfo") or [] + property_info = fetch_form(bearer_token, pid, "propertyInfo") + matter_overview = fetch_form(bearer_token, pid, "matterOverview") + fees_and_costs = fetch_form(bearer_token, pid, "feesAndCosts") or {} + property_contacts = fetch_form(bearer_token, pid, "propertyContacts") or {} + lease_info_np = fetch_form(bearer_token, pid, "leaseInfoNP") or {} + + completed_tasks = [{"description": x.get("body"), + "completed": convert_to_pacific_time(x.get("completedDate"))} + for x in fetch_project_tasks(bearer_token, pid).get("items") + if x.get("isCompleted")] + pending_tasks = [{"description": x.get("body"), + "completed": convert_to_pacific_time(x.get("completedDate"))} + for x in fetch_project_tasks(bearer_token, pid).get("items") + if not x.get("isCompleted")] + + team = fetch_project_team(bearer_token, pid) + assigned_attorney = next((m.get('fullname') + for m in team + if ('Assigned Attorney' in [r.get('name') for r in m.get('teamOrgRoles')]) + ), '') + primary_contact = next((m.get('fullname') + for m in team + if ('Primary' in [r.get('name') for r in m.get('teamOrgRoles')]) + ), '') + secondary_paralegal = next((m.get('fullname') + for m in team + if ('Secondary Paralegal' in [r.get('name') for r in m.get('teamOrgRoles')]) + ), '') + + # Extract notice service and expiration dates + notice_service_date = convert_to_pacific_time(new_file_review.get("noticeServiceDate")) or '' + notice_expiration_date = convert_to_pacific_time(new_file_review.get("noticeExpirationDate")) or '' + + # Extract daily rent damages + daily_rent_damages = lease_info_np.get("dailyRentDamages") or dates_and_deadlines.get("dailyRentDamages") or '' + + # Extract default date + default_date = convert_to_pacific_time(dates_and_deadlines.get("defaultDate")) or '' + case_filed_date = convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled")) or '' + + # Extract motion hearing dates + demurrer_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("demurrerHearingDate")) or '' + motion_to_strike_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTSHearingDate")) or '' + motion_to_quash_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("mTQHearingDate")) or '' + other_motion_hearing_date = convert_to_pacific_time(dates_and_deadlines.get("otherMotion1HearingDate")) or '' + + # Extract MSC details + msc_date = convert_to_pacific_time(dates_and_deadlines.get("mSCDate")) or '' + msc_time = dates_and_deadlines.get("mSCTime") or '' # Time field, not converting + msc_address = dates_and_deadlines.get("mSCAddress") or '' + msc_div_dept_room = dates_and_deadlines.get("mSCDeptDiv") or '' + + # Extract trial details + trial_date = convert_to_pacific_time(dates_and_deadlines.get("trialDate")) or '' + trial_time = dates_and_deadlines.get("trialTime") or '' # Time field, not converting + trial_address = dates_and_deadlines.get("trialAddress") or '' + trial_div_dept_room = dates_and_deadlines.get("trialDeptDivRoom") or '' + + # Extract final result of trial/MSC + final_result = dates_and_deadlines.get("finalResultOfTrialMSCCa") or '' + + # Extract settlement details + date_of_settlement = convert_to_pacific_time(dates_and_deadlines.get("dateOfStipulation")) or '' + final_obligation = dates_and_deadlines.get("finalObligationUnderTheStip") or '' + def_comply_stip = dates_and_deadlines.get("defendantsComplyWithStip") or '' + + # Extract judgment and writ details + judgment_date = convert_to_pacific_time(dates_and_deadlines.get("dateOfJudgment")) or '' + writ_issued_date = convert_to_pacific_time(dates_and_deadlines.get("writIssuedDate")) or '' + + # Extract lockout and stay details + scheduled_lockout = convert_to_pacific_time(dates_and_deadlines.get("sheriffScheduledDate")) or '' + oppose_stays = dates_and_deadlines.get("opposeStays") or '' + + # Extract premises safety and entry code + premises_safety = new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or '' + matter_gate_code = property_info.get("propertyEntryCodeOrInstructions") or '' + + # Extract possession recovered date + date_possession_recovered = convert_to_pacific_time(dates_and_deadlines.get("datePossessionRecovered")) or '' + + # Extract attorney fees and costs + attorney_fees = fees_and_costs.get("totalAttorneysFees") or '' + costs = fees_and_costs.get("totalCosts") or '' + + row = { + "client": c.get("firstName"), + "matter_description": p.get("projectName"), + "defendant_1": defendant_one.get('fullName', 'Unknown'), + "matter_open": convert_to_pacific_time(dates_and_deadlines.get("dateCaseFiled") or p.get("createdDate")), + "notice_type": new_file_review.get("noticeType", '') or '', + "case_number": dates_and_deadlines.get('caseNumber', '') or '', + "premises_address": property_info.get("premisesAddressWithUnit") or '', + "premises_city": property_info.get("premisesCity") or '', + "responsible_attorney": assigned_attorney, + "staff_person": primary_contact, + "staff_person_2": secondary_paralegal, + "phase_name": p.get("phaseName"), + "completed_tasks": completed_tasks, + "pending_tasks": pending_tasks, + "notice_service_date": notice_service_date, + "notice_expiration_date": notice_expiration_date, + "case_field_date": case_filed_date, + "daily_rent_damages": daily_rent_damages, + "default_date": default_date, + "demurrer_hearing_date": demurrer_hearing_date, + "motion_to_strike_hearing_date": motion_to_strike_hearing_date, + "motion_to_quash_hearing_date": motion_to_quash_hearing_date, + "other_motion_hearing_date": other_motion_hearing_date, + "msc_date": msc_date, + "msc_time": msc_time, + "msc_address": msc_address, + "msc_div_dept_room": msc_div_dept_room, + "trial_date": trial_date, + "trial_time": trial_time, + "trial_address": trial_address, + "trial_div_dept_room": trial_div_dept_room, + "final_result": final_result, + "date_of_settlement": date_of_settlement, + "final_obligation": final_obligation, + "def_comply_stip": def_comply_stip, + "judgment_date": judgment_date, + "writ_issued_date": writ_issued_date, + "scheduled_lockout": scheduled_lockout, + "oppose_stays": oppose_stays, + "premises_safety": premises_safety, + "matter_gate_code": matter_gate_code, + "date_possession_recovered": date_possession_recovered, + "attorney_fees": attorney_fees, + "costs": costs, + "documents_url": matter_overview.get('documentShareFolderURL') or '', + "service_attempt_date_1": convert_to_pacific_time(next(iter(service_info), {}).get('serviceDate')), + "contacts": cs, + "ProjectEmailAddress": p.get("projectEmailAddress"), + "Number": p.get("number"), + "IncidentDate": convert_to_pacific_time(p.get("incidentDate") or detail.get("incidentDate")), + "ProjectId": pid, + "ProjectName": p.get("projectName") or detail.get("projectName"), + "ProjectUrl": p.get("projectUrl") or detail.get("projectUrl"), + } + print(f"finished {pid}") + + return row + +def process_projects_parallel(projects: List[dict], bearer_token: str, max_workers: int = 9) -> List[dict]: + """ + Process projects in parallel using a worker pool. + + Args: + projects: List of project data dictionaries + bearer_token: Filevine API bearer token + max_workers: Number of concurrent workers (default 20) + + Returns: + List of processed project dictionaries + """ + # Create a thread pool with specified number of workers + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers, initializer=worker_init, initargs=(bearer_token,)) as executor: + # Submit all tasks to the executor + future_to_project = {executor.submit(process_project, project, bearer_token): project for project in projects} + + # Collect results as they complete + results = [] + for future in concurrent.futures.as_completed(future_to_project): + try: + result = future.result() + results.append(result) + except Exception as e: + print(f"[ERROR] Processing failed: {e}") + # Add empty dict or handle error appropriately + results.append({}) + + return results