only load 7 days
This commit is contained in:
@@ -19,7 +19,7 @@ class FilevineClient:
|
||||
"x-fv-orgid": str(FV_ORG_ID),
|
||||
"x-fv-userid": str(FV_USER_ID),
|
||||
}
|
||||
|
||||
|
||||
def get_bearer_token(self) -> str:
|
||||
"""Get a new bearer token using Filevine credentials"""
|
||||
url = "https://identity.filevine.com/connect/token"
|
||||
@@ -41,16 +41,21 @@ class FilevineClient:
|
||||
self.bearer_token = token
|
||||
self.headers["Authorization"] = f"Bearer {token}"
|
||||
return token
|
||||
|
||||
def list_all_projects(self) -> List[Dict[str, Any]]:
|
||||
"""Fetch all projects from Filevine API"""
|
||||
|
||||
def list_all_projects(self, latest_activity_since: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
"""Fetch all projects from Filevine API, optionally filtered by latest activity date.
|
||||
|
||||
Args:
|
||||
latest_activity_since: Optional date string in mm/dd/yyyy, mm-dd-yyyy, or yyyy-mm-dd format.
|
||||
Only projects with activity since this date will be returned.
|
||||
"""
|
||||
base = f"{self.base_url}/Projects?limit=500"
|
||||
results = []
|
||||
last_count = None
|
||||
tries = 0
|
||||
offset = 0
|
||||
cnt = 0
|
||||
|
||||
|
||||
while True:
|
||||
cnt = len(results)
|
||||
print(f"list try {tries}, starting at {offset}, previous count {last_count}, currently at {cnt}")
|
||||
@@ -60,6 +65,11 @@ class FilevineClient:
|
||||
if last_count is not None:
|
||||
offset = offset + last_count
|
||||
params["offset"] = offset
|
||||
|
||||
# Add latestActivitySince filter if provided
|
||||
if latest_activity_since:
|
||||
params["latestActivitySince"] = latest_activity_since
|
||||
|
||||
r = requests.get(url, headers=self.headers, params=params, timeout=30)
|
||||
r.raise_for_status()
|
||||
page = r.json()
|
||||
@@ -73,42 +83,42 @@ class FilevineClient:
|
||||
if tries > 200:
|
||||
break
|
||||
return results
|
||||
|
||||
|
||||
def fetch_project_detail(self, project_id_native: int) -> Dict[str, Any]:
|
||||
"""Fetch detailed information for a specific project"""
|
||||
url = f"{self.base_url}/Projects/{project_id_native}"
|
||||
r = requests.get(url, headers=self.headers, timeout=30)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
def fetch_project_team(self, project_id_native: int) -> List[Dict[str, Any]]:
|
||||
"""Fetch team members for a specific project"""
|
||||
url = f"{self.base_url}/Projects/{project_id_native}/team?limit=1000"
|
||||
r = requests.get(url, headers=self.headers, timeout=30)
|
||||
r.raise_for_status()
|
||||
return r.json().get('items') or []
|
||||
|
||||
|
||||
def fetch_project_tasks(self, project_id_native: int) -> Dict[str, Any]:
|
||||
"""Fetch tasks for a specific project"""
|
||||
url = f"{self.base_url}/Projects/{project_id_native}/tasks"
|
||||
r = requests.get(url, headers=self.headers, timeout=30)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
def fetch_client(self, client_id_native: int) -> Dict[str, Any]:
|
||||
"""Fetch client information by client ID"""
|
||||
url = f"{self.base_url}/contacts/{client_id_native}"
|
||||
r = requests.get(url, headers=self.headers, timeout=30)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
def fetch_contacts(self, project_id_native: int) -> Optional[List[Dict[str, Any]]]:
|
||||
"""Fetch contacts for a specific project"""
|
||||
url = f"{self.base_url}/projects/{project_id_native}/contacts"
|
||||
r = requests.get(url, headers=self.headers, timeout=30)
|
||||
r.raise_for_status()
|
||||
return r.json().get("items")
|
||||
|
||||
|
||||
def fetch_form(self, project_id_native: int, form: str) -> Dict[str, Any]:
|
||||
"""Fetch a specific form for a project"""
|
||||
try:
|
||||
@@ -119,7 +129,7 @@ class FilevineClient:
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return {}
|
||||
|
||||
|
||||
def fetch_collection(self, project_id_native: int, collection: str) -> List[Dict[str, Any]]:
|
||||
"""Fetch a collection for a project"""
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user