574 lines
25 KiB
Python
574 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Sync script to fetch and store projects in Firestore
|
|
This can be run manually from the command line to update the projects collection
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import concurrent.futures
|
|
import threading
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime, timedelta
|
|
import pytz
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
|
|
# Add the current directory to the Python path so we can import app and models
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
def batch_write_to_firestore(db, collection_name: str, documents: List[tuple], batch_size: int = 500):
    """Write documents to Firestore in batches from the calling thread.

    Each chunk of ``batch_size`` documents is committed as one write batch.
    A failed commit is logged and skipped so later chunks are still attempted;
    the number of documents lost to failed chunks is reported at the end.

    Args:
        db: Firestore client (used for ``collection()`` and ``batch()``).
        collection_name: Name of the collection to write into.
        documents: List of ``(doc_id, data)`` tuples; ``doc_id`` is passed
            through ``str()`` before being used as the document ID.
        batch_size: Number of documents per committed batch (default 500).

    Returns:
        int: Number of documents whose batch committed successfully.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    # A negative size would make range() yield nothing and silently skip
    # every write; zero would fail inside range() with a less clear message.
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    collection = db.collection(collection_name)
    total = len(documents)
    written = 0
    failed = 0

    for start in range(0, total, batch_size):
        chunk = documents[start:start + batch_size]
        try:
            write_batch = db.batch()
            for doc_id, data in chunk:
                write_batch.set(collection.document(str(doc_id)), data)
            write_batch.commit()
        except Exception as e:
            # Best effort: record the loss and move on so one bad chunk does
            # not block the remaining ones.
            failed += len(chunk)
            print(f"[ERROR] Batch write failed for documents {start}-{start + len(chunk) - 1}: {e}")
            continue
        written += len(chunk)
        print(f"[BATCH] Wrote {written}/{total} documents")

    print(f"[BATCH] Completed writing {written} documents to Firestore")
    if failed:
        print(f"[BATCH] {failed} documents were NOT written due to batch failures")
    return written
|
|
|
|
def convert_to_pacific_time(date_str):
    """Convert an ISO 8601 timestamp to a Pacific Time calendar date (YYYY-MM-DD).

    A timestamp carrying an explicit offset (``Z`` or ``+HH:MM``/``-HH:MM``)
    is converted from that offset. A timestamp without an offset is assumed
    to be UTC.

    Args:
        date_str (str): ISO 8601 timestamp (e.g., "2025-10-24T19:20:22.377Z").

    Returns:
        str: Date in America/Los_Angeles formatted as YYYY-MM-DD, or '' if the
        input is empty or cannot be parsed (a warning is printed in that case).
    """
    if not date_str:
        return ''

    try:
        parsed = datetime.fromisoformat(date_str.replace('Z', '+00:00'))

        # Only attach UTC to naive values. Calling replace(tzinfo=...) on an
        # aware value would discard its real offset instead of converting it.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=pytz.UTC)

        pacific_time = parsed.astimezone(pytz.timezone('America/Los_Angeles'))
        return pacific_time.strftime('%Y-%m-%d')
    except (ValueError, AttributeError) as e:
        print(f"[WARN] Date conversion failed for '{date_str}': {e}")
        return ''
|
|
|
|
|
|
def parse_local_date(date_str):
    """Return the calendar-date portion of an ISO 8601 string as YYYY-MM-DD.

    Filevine date-picker fields arrive as midnight UTC (for example
    "2026-05-29T00:00:00Z") but denote the user's local calendar date. The
    date is therefore taken exactly as written, with no timezone shift.

    Args:
        date_str (str): ISO 8601 date or date-time string.

    Returns:
        str: "YYYY-MM-DD", or '' when ``date_str`` is falsy or cannot be
        parsed (unparseable input also prints a warning).
    """
    if date_str:
        try:
            normalized = date_str.replace('Z', '+00:00')
            return datetime.fromisoformat(normalized).strftime('%Y-%m-%d')
        except (ValueError, AttributeError) as err:
            print(f"[WARN] Date parse failed for '{date_str}': {err}")
    return ''
|
|
|
|
|
|
def extract_domains_from_emails(emails: List[str]) -> List[str]:
    """Extract the unique, lower-cased domains from a list of email addresses.

    The domain is everything after the LAST ``@`` (the local part may itself
    contain a quoted ``@``). Surrounding whitespace is ignored. Entries that
    are not strings, contain no ``@``, or have an empty domain are skipped.

    Args:
        emails (List[str]): Email addresses; may be empty or None.

    Returns:
        List[str]: Sorted list of unique domains.
    """
    if not emails:
        return []

    domains = set()
    for email in emails:
        if not isinstance(email, str):
            continue
        _, at_sign, domain = email.strip().rpartition('@')
        domain = domain.strip().lower()
        # at_sign is '' when the address has no '@'; skip those and "user@".
        if at_sign and domain:
            domains.add(domain)

    return sorted(domains)
|
|
|
|
|
|
def record_sync_stats(db, recent_successes: int, oldest_successes: int, failures: int):
    """Add this run's sync counts to today's document in ``sync_stats``.

    The document ID is ``sync_<YYYY-MM-DD>``, using the current date in
    America/Los_Angeles.
    - If the document exists, its counters are incremented and ``updated_at``
      is refreshed.
    - Otherwise it is created with ``date``, the counters and ``created_at``.

    Any error is printed and swallowed so stats recording never fails a sync.

    NOTE: the get-then-update below is not atomic; two overlapping sync runs
    could lose an increment. A Firestore transaction or ``Increment``
    transform would close that gap.

    Args:
        db: Firestore client
        recent_successes: Number of recently active projects updated
        oldest_successes: Number of oldest projects updated
        failures: Number of failed updates
    """
    # Stats are bucketed by Pacific calendar day; timestamps are stored in UTC.
    today = datetime.now(pytz.timezone('America/Los_Angeles')).strftime('%Y-%m-%d')
    doc_id = f"sync_{today}"
    now_iso = datetime.now(pytz.UTC).isoformat()

    try:
        doc_ref = db.collection("sync_stats").document(doc_id)
        snapshot = doc_ref.get()
        if snapshot.exists:
            current = snapshot.to_dict() or {}
            # `or 0` also covers a counter explicitly stored as null.
            doc_ref.update({
                "recent_successes": (current.get("recent_successes") or 0) + recent_successes,
                "oldest_successes": (current.get("oldest_successes") or 0) + oldest_successes,
                "failures": (current.get("failures") or 0) + failures,
                "updated_at": now_iso,
            })
        else:
            doc_ref.set({
                "date": today,
                "recent_successes": recent_successes,
                "oldest_successes": oldest_successes,
                "failures": failures,
                "created_at": now_iso,
            })
        print(f"[STATS] Recorded sync stats: recent={recent_successes}, oldest={oldest_successes}, failures={failures}")
    except Exception as e:
        print(f"[ERROR] Failed to record sync stats: {e}")
|
|
|
|
from models.project_model import ProjectModel
|
|
from filevine_client import FilevineClient
|
|
|
|
# Module-level thread-local slot holding the FilevineClient for the current
# thread. set_filevine_client() writes it (called from the thread-pool
# initializer worker_init and from process_project), and get_filevine_client()
# reads it back, returning None when unset.
_thread_local = threading.local()
|
|
|
|
def get_filevine_client():
    """Return this thread's FilevineClient, or None if none has been stored."""
    try:
        return _thread_local.client
    except AttributeError:
        # Nothing stored on this thread yet.
        return None
|
|
|
|
def set_filevine_client(client):
    """Store *client* as the FilevineClient for the calling thread."""
    setattr(_thread_local, 'client', client)
|
|
|
|
def worker_init(client: FilevineClient):
    """Thread-pool initializer: bind *client* to the newly started worker thread."""
    _thread_local.client = client
|
|
|
|
def _first_defendant(contacts) -> dict:
    """Return the ``orgContact`` of the first contact whose personTypes include "Defendant", else {}."""
    for contact in contacts:
        org_contact = contact.get('orgContact') or {}
        if "Defendant" in (org_contact.get('personTypes') or []):
            return org_contact
    return {}


def _member_with_role(team, role_name: str):
    """Return the ``fullname`` of the first team member holding *role_name*, else ''."""
    for member in team or []:
        roles = member.get('teamOrgRoles') or []
        if any(role and role.get('name') == role_name for role in roles):
            return member.get('fullname')
    return ''


def process_project(index: int, total: int, project_data: dict, client: FilevineClient) -> Dict[str, Any]:
    """Fetch all Filevine data for one project and flatten it into a row dict.

    This is the unit of work run by the thread pool in
    ``process_projects_parallel``.

    Args:
        index: 0-based position of the project in the batch (used in logs).
        total: Number of projects in the batch (used in logs).
        project_data: One project item from the Filevine project listing.
        client: FilevineClient used for every API call.

    Returns:
        ``ProjectModel(...).to_dict()`` on success. Returns {} when the project
        has no ``projectId.native`` or when any fetch/transform step raises;
        the error is printed in that case.
    """
    # Keep the thread-local slot in sync for anything that reads it via
    # get_filevine_client(); this function itself uses `client` directly.
    set_filevine_client(client)

    p = project_data
    pid = (p.get("projectId") or {}).get("native")
    if pid is None:
        print(f"[SKIP] Missing projectId for item {index}")
        return {}

    project_name = p.get("projectName", "")
    try:
        c = client.fetch_client((p.get("clientId") or {}).get("native"))
        cs = client.fetch_contacts(pid)
        detail = client.fetch_project_detail(pid)
    except Exception as e:
        print(f"[ERROR] Failed to fetch essential data for project {pid} '{project_name}': {e}")
        return {}

    try:
        # Inside the try so a malformed contact list is logged as a per-project
        # failure instead of escaping to the worker pool.
        defendant_one = _first_defendant(cs)

        new_file_review = client.fetch_form(pid, "newFileReview") or {}
        dates_and_deadlines = client.fetch_form(pid, "datesAndDeadlines") or {}
        service_info = client.fetch_collection(pid, "serviceInfo") or []
        property_info = client.fetch_form(pid, "propertyInfo") or {}
        matter_overview = client.fetch_form(pid, "matterOverview") or {}
        fees_and_costs = client.fetch_form(pid, "feesAndCosts") or {}
        property_contacts = client.fetch_form(pid, "propertyContacts") or {}
        lease_info_np = client.fetch_form(pid, "leaseInfoNP") or {}

        # Task timestamps are real instants, so they get Pacific conversion.
        task_items = client.fetch_project_tasks(pid).get("items", [])
        completed_tasks = [
            {"description": t.get("body"), "completed": convert_to_pacific_time(t.get("completedDate"))}
            for t in task_items if t.get("isCompleted")
        ]
        pending_tasks = [
            {"description": t.get("body"), "completed": convert_to_pacific_time(t.get("completedDate"))}
            for t in task_items if not t.get("isCompleted")
        ]

        team = client.fetch_project_team(pid)
        assigned_attorney = _member_with_role(team, 'Assigned Attorney')
        primary_contact = _member_with_role(team, 'Primary')
        secondary_paralegal = _member_with_role(team, 'Secondary Paralegal')

        def dd_date(key):
            """Local calendar date for a datesAndDeadlines date-picker field ('' if unset)."""
            return parse_local_date(dates_and_deadlines.get(key))

        # Notice service and expiration dates
        notice_service_date = parse_local_date(new_file_review.get("noticeServiceDate"))
        notice_expiration_date = parse_local_date(new_file_review.get("noticeExpirationDate"))

        daily_rent_damages = lease_info_np.get("dailyRentDamages") or dates_and_deadlines.get("dailyRentDamages") or ''

        case_filed_raw = dates_and_deadlines.get("dateCaseFiled")
        case_filed_date = parse_local_date(case_filed_raw)
        if case_filed_raw:
            matter_open = case_filed_date
        else:
            # NOTE(review): createdDate appears to be a system UTC timestamp
            # rather than a date-picker value, so it is converted to Pacific
            # like the task timestamps above.
            matter_open = convert_to_pacific_time(p.get("createdDate"))

        # Time/address fields are free text and are passed through unchanged.
        msc_time = dates_and_deadlines.get("mSCTime") or ''
        trial_time = dates_and_deadlines.get("trialTime") or ''

        property_managers = [property_contacts.get(f'propertyManager{i}') for i in range(1, 5)]
        valid_property_managers = [
            e.get('address').lower()
            for pm in property_managers
            if pm and pm.get('emails')
            for e in pm.get('emails')
            if e and e.get('address')
        ]

        row = ProjectModel(
            client=c.get("firstName", ""),
            matter_description=p.get("projectName", ""),
            defendant_1=defendant_one.get('fullName', 'Unknown'),
            matter_open=matter_open,
            notice_type=new_file_review.get("noticeType", '') or '',
            case_number=dates_and_deadlines.get('caseNumber', '') or '',
            premises_address=property_info.get("premisesAddressWithUnit", "") or '',
            premises_city=property_info.get("premisesCity", "") or '',
            responsible_attorney=assigned_attorney,
            staff_person=primary_contact,
            staff_person_2=secondary_paralegal,
            phase_name=p.get("phaseName", ""),
            completed_tasks=completed_tasks,
            pending_tasks=pending_tasks,
            notice_service_date=notice_service_date,
            notice_expiration_date=notice_expiration_date,
            # Both spellings are passed, as in the original call.
            case_field_date=case_filed_date,
            case_filed_date=case_filed_date,
            daily_rent_damages=daily_rent_damages,
            default_date=dd_date("defaultDate"),
            demurrer_hearing_date=dd_date("demurrerHearingDate"),
            motion_to_strike_hearing_date=dd_date("mTSHearingDate"),
            motion_to_quash_hearing_date=dd_date("mTQHearingDate"),
            other_motion_hearing_date=dd_date("otherMotion1HearingDate"),
            msc_date=dd_date("mSCDate"),
            msc_time=msc_time,
            msc_address=dates_and_deadlines.get("mSCAddress") or '',
            msc_div_dept_room=dates_and_deadlines.get("mSCDeptDiv") or '',
            trial_date=dd_date("trialDate"),
            trial_time=trial_time,
            trial_address=dates_and_deadlines.get("trialAddress") or '',
            trial_div_dept_room=dates_and_deadlines.get("trialDeptDivRoom") or '',
            final_result=dates_and_deadlines.get("finalResultOfTrialMSCCa") or '',
            date_of_settlement=dd_date("dateOfStipulation"),
            final_obligation=dates_and_deadlines.get("finalObligationUnderTheStip") or '',
            def_comply_stip=dates_and_deadlines.get("defendantsComplyWithStip") or '',
            judgment_date=dd_date("dateOfJudgment"),
            writ_issued_date=dd_date("writIssuedDate"),
            scheduled_lockout=dd_date("sheriffScheduledDate"),
            oppose_stays=dates_and_deadlines.get("opposeStays") or '',
            premises_safety=new_file_review.get("lockoutSafetyIssuesOrSpecialCareIssues") or '',
            matter_gate_code=property_info.get("propertyEntryCodeOrInstructions") or '',
            date_possession_recovered=dd_date("datePossessionRecovered"),
            attorney_fees=fees_and_costs.get("totalAttorneysFees") or '',
            costs=fees_and_costs.get("totalCosts") or '',
            documents_url=matter_overview.get('documentShareFolderURL', '') or '',
            service_attempt_date_1=parse_local_date(next(iter(service_info), {}).get('serviceDate')),
            contacts=cs,
            project_email_address=p.get("projectEmailAddress", ""),
            number=p.get("number", "") or matter_overview.get('matterNumber', ''),
            incident_date=parse_local_date(p.get("incidentDate") or detail.get("incidentDate")),
            project_id=pid,
            project_name=p.get("projectName") or detail.get("projectName"),
            project_url=p.get("projectUrl") or detail.get("projectUrl"),
            viewing_emails=valid_property_managers,
            viewing_domains=extract_domains_from_emails(valid_property_managers),
            last_synced_at=datetime.now(pytz.UTC).isoformat(),
        )
        # index is 0-based (from enumerate in the caller); show 1-based progress.
        print(f"[{index + 1}/{total}] Saved: {pid} | Matter {row.number} | {project_name}")
        return row.to_dict()

    except Exception as e:
        print(f"[ERROR] Failed to process project {pid} '{project_name}': {e}")
        import traceback
        traceback.print_exc()
        return {}
|
|
|
|
def process_projects_parallel(projects: List[dict], client: FilevineClient, max_workers: int = 10) -> List[Dict[str, Any]]:
    """
    Process projects in parallel using a thread pool.

    Each project is handled by ``process_project``. Results are collected in
    completion order, not input order. A project counts as a success when its
    result dict has a truthy ``'ProjectId'``. A failed project contributes an
    empty dict (or whatever falsy/ID-less dict ``process_project`` returned).

    Args:
        projects: List of project data dictionaries
        client: FilevineClient instance, also bound to each worker thread via
            ``worker_init``
        max_workers: Number of concurrent workers (default 10)

    Returns:
        List of processed project dictionaries, one per input project
    """
    import traceback

    total = len(projects)
    success_count = 0
    fail_count = 0
    results: List[Dict[str, Any]] = []

    print(f"[WORKERS] Starting parallel processing of {total} projects with {max_workers} workers...")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers, initializer=worker_init, initargs=(client,)) as executor:
        future_to_project = {
            executor.submit(process_project, indx, total, project, client): project
            for indx, project in enumerate(projects)
        }

        for future in concurrent.futures.as_completed(future_to_project):
            try:
                result = future.result()
            except Exception as e:
                # process_project catches most errors itself; anything landing
                # here escaped its handlers, so identify the project and show
                # the trace. Lookup is guarded so logging can never crash the loop.
                try:
                    pid = future_to_project[future]["projectId"]["native"]
                except (KeyError, TypeError):
                    pid = "<unknown>"
                print(f"[ERROR] Worker thread failed for project {pid}: {e}")
                traceback.print_exception(type(e), e, e.__traceback__)
                fail_count += 1
                results.append({})
                continue

            if result and result.get('ProjectId'):
                success_count += 1
            else:
                fail_count += 1
            results.append(result)

    print(f"[WORKERS] Completed: {success_count} succeeded, {fail_count} failed, {total} total")
    return results
|
|
|
|
def get_oldest_unsynced_projects(db, fraction: float = 0.2) -> List[int]:
    """Return IDs of the least-recently-synced active projects in Firestore.

    The function:
    - streams the whole ``projects`` collection;
    - drops archived documents and documents whose ID is not an integer
      (e.g. the literal "None");
    - orders the rest by ``last_synced_at`` ascending, with a missing, null
      or empty value first;
    - returns the first ``max(1, int(total * fraction))`` IDs.

    Args:
        db: Firestore client
        fraction: Fraction of active projects to return (default 0.2 = 1/5th)

    Returns:
        List of project IDs (native, as int), oldest first. Returns [] when
        there are no eligible projects or the query fails (error is printed).
    """
    try:
        # (project_id, data) for active documents with integer IDs. Filtering
        # before selection keeps junk docs from consuming selection slots.
        candidates = []
        for doc in db.collection("projects").stream():
            data = doc.to_dict() or {}  # to_dict() once per doc
            if data.get("is_archived"):
                continue
            try:
                project_id = int(doc.id)
            except (TypeError, ValueError):
                continue
            candidates.append((project_id, data))

        total = len(candidates)
        count_to_sync = max(1, int(total * fraction))

        # last_synced_at is written as a UTC isoformat() string, so string
        # order is chronological. `or ""` sorts never-synced docs first and
        # prevents comparing None with str.
        candidates.sort(key=lambda item: item[1].get("last_synced_at") or "")
        selected = candidates[:count_to_sync]
        result_ids = [project_id for project_id, _ in selected]

        print(f"[SYNC STRATEGY] {total} active projects in Firestore, will sync oldest {len(result_ids)} ({fraction*100:.0f}%)")
        if selected:
            oldest_id, oldest_data = selected[0]
            print(f"[SYNC STRATEGY] Oldest: ID={oldest_id}, last_synced_at='{oldest_data.get('last_synced_at', 'N/A')}'")
            if len(selected) > 1:
                cutoff_id, cutoff_data = selected[-1]
                print(f"[SYNC STRATEGY] Cutoff: ID={cutoff_id}, last_synced_at='{cutoff_data.get('last_synced_at', 'N/A')}'")

        return result_ids

    except Exception as e:
        print(f"[ERROR] Failed to get oldest unsynced projects: {e}")
        import traceback
        traceback.print_exc()
        return []
|
|
|
|
|
|
def _native_id(project: dict):
    """Return ``projectId.native`` from a Filevine project item, or None if absent."""
    return (project.get("projectId") or {}).get("native")


def _save_rows(db, rows: List[Dict[str, Any]]) -> List[tuple]:
    """Tag successful rows with ``is_archived`` and batch-write them to ``projects``.

    Rows without a truthy ``ProjectId`` (failed projects) are skipped.

    Returns:
        The ``(ProjectId, row)`` tuples handed to ``batch_write_to_firestore``.
    """
    documents = []
    for row in rows:
        project_id = row.get('ProjectId')
        if project_id:
            # NOTE(review): the ID is read as 'ProjectId' but the phase as
            # 'phase_name' -- confirm ProjectModel.to_dict() really uses that
            # key, otherwise is_archived is always False.
            row['is_archived'] = (row.get('phase_name') == 'Archived')
            documents.append((project_id, row))
    batch_write_to_firestore(db, "projects", documents)
    return documents


def main():
    """Parse CLI arguments, fetch the selected Filevine projects, and sync them to Firestore.

    Modes:
        full           - every project
        last_n         - projects with activity in the last ``--days`` days
        oldest_percent - the ``--percent`` least-recently-synced projects
        hybrid         - union of last_n and oldest_percent (default)
        single         - only ``--project-id``

    Exits with status 1 on any unhandled error.
    """
    import argparse
    import traceback

    parser = argparse.ArgumentParser(description='Sync Filevine projects to Firestore')
    parser.add_argument('--mode', choices=['full', 'last_n', 'oldest_percent', 'hybrid', 'single'],
                        default='hybrid', help='Sync mode: full=all projects, last_n=recently active, oldest_percent=oldest by last_synced_at, hybrid=last_n+oldest_percent, single=one project')
    parser.add_argument('--days', type=int, default=14, help='Number of days for last_n mode (default: 14)')
    parser.add_argument('--percent', type=float, default=20.0, help='Percentage for oldest_percent mode (default: 20)')
    parser.add_argument('--project-id', type=int, help='Project ID for single mode (required when mode=single)')
    parser.add_argument('--workers', type=int, default=10, help='Number of concurrent worker threads (default: 10)')
    args = parser.parse_args()

    if args.mode == 'single' and not args.project_id:
        parser.error("--project-id is required when mode is 'single'")
    if args.workers < 1:
        parser.error("--workers must be at least 1")

    workers = args.workers
    print(f"[SYNC] Starting sync - mode={args.mode}, workers={workers}")
    try:
        client = FilevineClient()
        client.get_bearer_token()
        from app import db

        # Only hybrid/oldest_percent populate these; they drive stats attribution.
        active_ids = set()
        oldest_ids = set()

        if args.mode == 'full':
            print("[MODE] Full sync - fetching all projects")
            projects = client.list_all_projects()

        elif args.mode == 'last_n':
            days_ago = (datetime.now() - timedelta(days=args.days)).strftime('%Y-%m-%d')
            print(f"[MODE] Last {args.days} days - fetching active since {days_ago}")
            projects = client.list_all_projects(latest_activity_since=days_ago)

        elif args.mode == 'oldest_percent':
            # Build the ID set once; the original rebuilt it for every project.
            oldest_ids = set(get_oldest_unsynced_projects(db, fraction=args.percent / 100.0))
            print(f"[MODE] Oldest {args.percent}% - fetching {len(oldest_ids)} projects")
            projects = [p for p in client.list_all_projects() if _native_id(p) in oldest_ids]

        elif args.mode == 'single':
            print(f"[MODE] Single project - fetching project {args.project_id}")
            project_detail = client.fetch_project_detail(args.project_id)
            projects = [project_detail] if project_detail else []

        else:  # 'hybrid' -- argparse `choices` guarantees no other value
            print("[MODE] Hybrid - active + oldest")
            days_ago = (datetime.now() - timedelta(days=args.days)).strftime('%Y-%m-%d')
            active_projects = client.list_all_projects(latest_activity_since=days_ago)
            active_ids = {_native_id(p) for p in active_projects}
            print(f"[SYNC] {len(active_projects)} active since {days_ago}")

            oldest_ids = set(get_oldest_unsynced_projects(db, fraction=args.percent / 100.0))

            all_ids_to_sync = active_ids | oldest_ids
            print(f"[SYNC] {len(all_ids_to_sync)} total unique to sync")
            projects = [p for p in client.list_all_projects() if _native_id(p) in all_ids_to_sync]

        detailed_rows = process_projects_parallel(projects, client, max_workers=workers)
        documents = _save_rows(db, detailed_rows)

        if args.mode == 'hybrid':
            # Attribute each synced project to the source(s) that selected it.
            synced_ids = {pid for pid, _ in documents}
            recent_successes = sum(1 for pid in synced_ids if pid in active_ids)
            oldest_successes = sum(1 for pid in synced_ids if pid in oldest_ids)
            record_sync_stats(db, recent_successes, oldest_successes, len(detailed_rows) - len(synced_ids))
        else:
            success_count = len(documents)
            fail_count = len(detailed_rows) - success_count
            if args.mode == 'oldest_percent':
                record_sync_stats(db, 0, success_count, fail_count)
            else:
                record_sync_stats(db, success_count, 0, fail_count)

        print(f"[SYNC] Complete - {len(documents)} projects saved to Firestore")

    except Exception as e:
        print(f"Error during sync: {e}")
        traceback.print_exc()
        sys.exit(1)
|
|
|
|
# Script entry point: run the sync only when executed directly, not on import.
if __name__ == "__main__":
    main()
|