6 Commits

Author SHA1 Message Date
8528be9faa Add Bonanza Produce multi-invoice statement template
- Added multi-invoice template for Bonanza Produce with :multi and :multi-match? flags
- Template uses keywords for statement header to identify multi-invoice format
- Extracts invoice-number, date, customer-identifier (from RETURN line), and total
- Parses 4 invoices from statement PDF 13595522.pdf
- All tests pass (29 assertions, 0 failures, 0 errors)

- Added test: parse-bonanza-produce-statement-13595522
- Updated invoice-template-creator skill: emphasized test-first approach
2026-02-08 07:33:50 -08:00
9b0382e587 Add invoice-template-creator skill for automated template generation
New repository-based skill at .claude/skills/invoice-template-creator/:
- SKILL.md: Complete guide for creating invoice parsing templates
- references/examples.md: Common patterns and template examples
- Covers vendor identification, regex patterns, field extraction
- Includes testing strategies and common pitfalls

Updated AGENTS.md with reference to the new skill.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-08 07:33:46 -08:00
ef64d5fc41 Extract customer name and address for Bonanza Produce
- customer-identifier field: customer name (e.g., 'NICK THE GREEK')
- account-number field: street address (e.g., '600 VISTA WAY')
- Combined they provide full customer identification with address
- Updated test to verify both fields and their concatenation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-08 07:33:33 -08:00
13ac73c7c5 Add location extraction for Bonanza Produce invoices
- Extract city/state/zip in location field
- Customer address now split across 3 fields:
  - customer-identifier: customer name
  - account-number: street address
  - location: city, state zip
- All components verified in test

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-08 07:33:30 -08:00
f51f4a0b8e Improve Bonanza Produce customer identifier extraction
- Extract customer name in customer-identifier field
- Extract street address in account-number field
- Use non-greedy regex with lookahead to capture clean values
- Update test to verify both name and address extraction

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-08 07:33:10 -08:00
9283f9238f Add Bonanza Produce invoice template
- Add new PDF template for Bonanza Produce vendor
- Template uses phone number 530-544-4136 as unique identifier
- Extracts invoice number, date, customer identifier, and total
- Includes passing test for invoice 03881260

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-08 07:33:05 -08:00
29 changed files with 685 additions and 3055 deletions

View File

@@ -1,174 +0,0 @@
---
name: clojure-eval
description: Evaluate Clojure code via nREPL using clj-nrepl-eval. Use this when you need to test code, check if edited files compile, verify function behavior, or interact with a running REPL session.
---
# Clojure REPL Evaluation
## When to Use This Skill
Use this skill when you need to:
- **Verify that edited Clojure files compile and load correctly**
- Test function behavior interactively
- Check the current state of the REPL
- Debug code by evaluating expressions
- Require or load namespaces for testing
- Validate that code changes work before committing
## How It Works
The `clj-nrepl-eval` command evaluates Clojure code against an nREPL server. **Session state persists between evaluations**, so you can require a namespace in one evaluation and use it in subsequent calls. Each host:port combination maintains its own session file.
## Instructions
### 0. Discover and select nREPL server
First, discover what nREPL servers are running in the current directory:
```bash
clj-nrepl-eval --discover-ports
```
This will show all nREPL servers (Clojure, Babashka, shadow-cljs, etc.) running in the current project directory.
**Then use the AskUserQuestion tool:**
- **If ports are discovered:** Prompt user to select which nREPL port to use:
- **question:** "Which nREPL port would you like to use?"
- **header:** "nREPL Port"
- **options:** Present each discovered port as an option with:
- **label:** The port number
- **description:** The server type and status (e.g., "Clojure nREPL server in current directory")
- Include up to 4 discovered ports as options
- The user can select "Other" to enter a custom port number
- **If no ports are discovered:** Prompt user how to start an nREPL server:
- **question:** "No nREPL servers found. How would you like to start one?"
- **header:** "Start nREPL"
- **options:**
- **label:** "deps.edn alias", **description:** "Find and use an nREPL alias in deps.edn"
- **label:** "Leiningen", **description:** "Start nREPL using 'lein repl'"
- The user can select "Other" for alternative methods or if they already have a server running on a specific port
IMPORTANT: IF you start a REPL do not supply a port let the nREPL start and return the port that it was started on.
### 1. Evaluate Clojure Code
> Evaluation automatically connects to the given port
Use the `-p` flag to specify the port and pass your Clojure code.
**Recommended: Pass code as a command-line argument:**
```bash
clj-nrepl-eval -p <PORT> "(+ 1 2 3)"
```
**For multiple expressions (single line):**
```bash
clj-nrepl-eval -p <PORT> "(def x 10) (+ x 20)"
```
**Alternative: Using heredoc (may require permission approval for multiline commands):**
```bash
clj-nrepl-eval -p <PORT> <<'EOF'
(def x 10)
(+ x 20)
EOF
```
**Alternative: Via stdin pipe:**
```bash
echo "(+ 1 2 3)" | clj-nrepl-eval -p <PORT>
```
### 2. Display nREPL Sessions
**Discover all nREPL servers in current directory:**
```bash
clj-nrepl-eval --discover-ports
```
Shows all running nREPL servers in the current project directory, including their type (clj/bb/basilisp) and whether they match the current working directory.
**Check previously connected sessions:**
```bash
clj-nrepl-eval --connected-ports
```
Shows only connections you have made before (appears after first evaluation on a port).
### 3. Common Patterns
**Require a namespace (always use :reload to pick up changes):**
```bash
clj-nrepl-eval -p <PORT> "(require '[my.namespace :as ns] :reload)"
```
**Test a function after requiring:**
```bash
clj-nrepl-eval -p <PORT> "(ns/my-function arg1 arg2)"
```
**Check if a file compiles:**
```bash
clj-nrepl-eval -p <PORT> "(require 'my.namespace :reload)"
```
**Multiple expressions:**
```bash
clj-nrepl-eval -p <PORT> "(def x 10) (* x 2) (+ x 5)"
```
**Complex multiline code (using heredoc):**
```bash
clj-nrepl-eval -p <PORT> <<'EOF'
(def x 10)
(* x 2)
(+ x 5)
EOF
```
*Note: Heredoc syntax may require permission approval.*
**With custom timeout (in milliseconds):**
```bash
clj-nrepl-eval -p <PORT> --timeout 5000 "(long-running-fn)"
```
**Reset the session (clears all state):**
```bash
clj-nrepl-eval -p <PORT> --reset-session
clj-nrepl-eval -p <PORT> --reset-session "(def x 1)"
```
## Available Options
- `-p, --port PORT` - nREPL port (required)
- `-H, --host HOST` - nREPL host (default: 127.0.0.1)
- `-t, --timeout MILLISECONDS` - Timeout (default: 120000 = 2 minutes)
- `-r, --reset-session` - Reset the persistent nREPL session
- `-c, --connected-ports` - List previously connected nREPL sessions
- `-d, --discover-ports` - Discover nREPL servers in current directory
- `-h, --help` - Show help message
## Important Notes
- **Prefer command-line arguments:** Pass code as quoted strings: `clj-nrepl-eval -p <PORT> "(+ 1 2 3)"` - works with existing permissions
- **Heredoc for complex code:** Use heredoc (`<<'EOF' ... EOF`) for truly multiline code, but note it may require permission approval
- **Sessions persist:** State (vars, namespaces, loaded libraries) persists across invocations until the nREPL server restarts or `--reset-session` is used
- **Automatic delimiter repair:** The tool automatically repairs missing or mismatched parentheses
- **Always use :reload:** When requiring namespaces, use `:reload` to pick up recent changes
- **Default timeout:** 2 minutes (120000ms) - increase for long-running operations
- **Input precedence:** Command-line arguments take precedence over stdin
## Typical Workflow
1. Discover nREPL servers: `clj-nrepl-eval --discover-ports`
2. Use **AskUserQuestion** tool to prompt user to select a port
3. Require namespace:
```bash
clj-nrepl-eval -p <PORT> "(require '[my.ns :as ns] :reload)"
```
4. Test function:
```bash
clj-nrepl-eval -p <PORT> "(ns/my-fn ...)"
```
5. Iterate: Make changes, re-require with `:reload`, test again

View File

@@ -1,82 +0,0 @@
# clj-nrepl-eval Examples
## Discovery
```bash
clj-nrepl-eval --connected-ports
```
## Heredoc for Multiline Code
```bash
clj-nrepl-eval -p 7888 <<'EOF'
(defn greet [name]
(str "Hello, " name "!"))
(greet "Claude")
EOF
```
### Heredoc Simplifies String Escaping
Heredoc avoids shell escaping issues with quotes, backslashes, and special characters:
```bash
# With heredoc - no escaping needed
clj-nrepl-eval -p 7888 <<'EOF'
(def regex #"\\d{3}-\\d{4}")
(def message "She said \"Hello!\" and waved")
(def path "C:\\Users\\name\\file.txt")
(println message)
EOF
# Without heredoc - requires complex escaping
clj-nrepl-eval -p 7888 "(def message \"She said \\\"Hello!\\\" and waved\")"
```
## Working with Project Namespaces
```bash
# Test a function after requiring
clj-nrepl-eval -p 7888 <<'EOF'
(require '[clojure-mcp-light.delimiter-repair :as dr] :reload)
(dr/delimiter-error? "(defn foo [x]")
EOF
```
## Verify Compilation After Edit
```bash
# If this returns nil, the file compiled successfully
clj-nrepl-eval -p 7888 "(require 'clojure-mcp-light.hook :reload)"
```
## Session Management
```bash
# Reset session if state becomes corrupted
clj-nrepl-eval -p 7888 --reset-session
```
## Common Workflow Patterns
### Load, Test, Iterate
```bash
# After editing a file, reload and test in one command
clj-nrepl-eval -p 7888 <<'EOF'
(require '[my.namespace :as ns] :reload)
(ns/my-function test-data)
EOF
```
### Run Tests After Changes
```bash
clj-nrepl-eval -p 7888 <<'EOF'
(require '[my.project.core :as core] :reload)
(require '[my.project.core-test :as test] :reload)
(clojure.test/run-tests 'my.project.core-test)
EOF
```

File diff suppressed because one or more lines are too long

3
.gitattributes vendored
View File

@@ -1,3 +0,0 @@
# Use bd merge for beads JSONL files
.beads/issues.jsonl merge=beads

View File

@@ -1,127 +0,0 @@
---
name: gitea-tea
description: Use tea CLI to create, manage, and checkout Gitea pull requests. Use this when opening a PR, managing PRs, or checking out PRs on the gitea remote (gitea.story-basking.ts.net).
---
# Gitea Tea CLI Skill
This skill covers using `tea` (Gitea's official CLI) for pull request workflows in this project.
## When to Use This Skill
Use this skill when you need to:
- Create a PR from a working branch to master on the gitea remote
- Open, list, or view PRs
- Checkout a PR locally for review or iteration
- Manage PR state (close, reopen, merge)
## Project Setup
The gitea remote is `gitea.story-basking.ts.net` with repo slug `notid/integreat`. The default push remote is **gitea**, NOT origin and NOT deploy.
In this project's environment:
- Gitea login is pre-configured for `gitea.story-basking.ts.net`
- Repo slug: `notid/integreat`
- Target branch for PRs: `master`
- The git remote named `gitea` points to this instance
## Creating a PR
Use `tea pulls create` to open a PR from the current branch to master. Always specify `-r notid/integreat -b master`:
```bash
tea pulls create -r notid/integreat -b master --title "Title" --description "Body"
```
Common flags:
- `-t, --title` - PR title
- `-d, --description` - PR body/description (use heredoc or file for long descriptions)
- `-a, --assignees` - Comma-separated usernames to assign
- `-L, --labels` - Comma-separated labels to apply
- `-m, --milestone` - Milestone to assign
**Writing a multiline description:**
```bash
tea pulls create -r notid/integreat -b master \
-t "feat: add feature" \
-d "$(cat <<'EOF'
## Summary
- Bullet point one
- Bullet point two
EOF
)"
```
Or write the body to a temp file first and reference it.
## Listing PRs
```bash
tea pulls list -r notid/integreat # List open PRs
tea pulls list -r notid/integreat --state all # All PRs
tea pulls list -r notid/integreat --limit 10 -o simple # Limit output, simple format
```
## Opening a PR in Browser
```bash
tea open pr <number> -r notid/integreat
tea open pr create -r notid/integreat # Open web UI to create a PR
```
## Checking Out a PR Locally
```bash
tea pulls checkout <number> -r notid/integreat
```
This fetches and checks out the PR branch locally.
## Managing PR State
**Close a PR:**
```bash
tea pulls close <number> -r notid/integreat --confirm
```
**Reopen a closed PR:**
```bash
tea pulls reopen <number> -r notid/integreat --confirm
```
**Merge a PR:**
```bash
tea pulls merge <number> -r notid/integreat --confirm
```
**Edit a PR (title, description, etc.):**
```bash
tea pulls edit <number> -r notid/integreat --title "New title" --description "New body"
```
## Full PR Creation Workflow
1. Ensure the branch is pushed to gitea:
```bash
git push gitea <branch-name>
```
2. Create the PR with tea:
```bash
tea pulls create -r notid/integreat -b master \
--title "feat: description of change" \
--description "Detailed PR body here"
```
3. Open the PR in browser to verify:
```bash
tea open pr <number> -r notid/integreat
```
## Tips
- Always use `-r notid/integreat` to specify the repo explicitly
- Use `-b master` to set the target branch (default may differ)
- The `--confirm` flag is required for destructive actions (close, merge)
- Use `-o simple`, `-o json`, `-o table`, etc. to control output format

View File

@@ -1,9 +0,0 @@
## Pull Requests on Gitea
This project uses **gitea story-basking.ts net** as the primary remote for PRs. Use `tea` (the Gitea CLI) to create and manage pull requests. The gitea remote is the one you push to, NOT origin and NOT deploy.
**When opening a PR**, load and follow the **gitea-tea** skill. In short:
- Target branch is always `master`
- Use `tea pulls create -r notid/integreat -b master --title "..." --description "..."`
Use 'bd' for task tracking

Binary file not shown.

View File

@@ -1,376 +0,0 @@
---
title: Rebase Invoice Templates, Merge to Master, and Integrate Branches
type: refactor
date: 2026-02-08
---
# Rebase Invoice Templates, Merge to Master, and Integrate Branches
## Overview
This plan outlines a series of git operations to reorganize the branch structure by:
1. Creating a rebase commit with all invoice template changes
2. Applying those changes onto `master`
3. Removing them from the current `clauding` branch
4. Merging `master` back into `clauding`
5. Finally merging `clauding` into `get-transactions2-page-working`
## Current State
### Branch Structure (as of Feb 8, 2026)
```
master (dc021b8c)
├─ deploy/master (dc021b8c)
└─ (other branches)
└─ clauding (0155d91e) - HEAD
├─ 16 commits ahead of master
└─ Contains invoice template work for Bonanza Produce
├─ db1cb194 Add Bonanza Produce invoice template
├─ ec754233 Improve Bonanza Produce customer identifier extraction
├─ af7bc324 Add location extraction for Bonanza Produce invoices
├─ 62107c99 Extract customer name and address for Bonanza Produce
├─ 7ecd569e Add invoice-template-creator skill for automated template generation
└─ 0155d91e Add Bonanza Produce multi-invoice statement template
```
### Merge Base
- **Merge base between `clauding` and `master`**: `dc021b8c`
- **Commits on `clauding` since merge base**: 16 commits
- **Invoice template commits**: 6 commits (db1cb194 through 0155d91e)
## Problem Statement
The current branch structure has:
1. Invoice template work mixed with other feature development in `clauding`
2. No clear separation between invoice template changes and transaction page work
3. A desire to get invoice template changes merged to `master` independently
4. A need to reorganize branches to prepare for merging `get-transactions2-page-working`
## Proposed Solution
Use git rebase and merge operations to create a cleaner branch hierarchy:
1. **Create a new branch** (`invoice-templates-rebased`) with only invoice template commits
2. **Rebase those commits** onto current `master`
3. **Merge** this clean branch to `master`
4. **Remove invoice template commits** from `clauding` branch
5. **Merge `master` into `clauding`** to sync
6. **Merge `clauding` into `get-transactions2-page-working`**
## Implementation Steps
### Phase 1: Extract and Rebase Invoice Templates
#### Step 1.1: Identify Invoice Template Commits
```bash
# From clauding branch, find the range of invoice template commits
git log --oneline --reverse dc021b8c..clauding
```
**Invoice template commits to extract** (6 commits in order):
1. `db1cb194` - Add Bonanza Produce invoice template
2. `ec754233` - Improve Bonanza Produce customer identifier extraction
3. `af7bc324` - Add location extraction for Bonanza Produce invoices
4. `62107c99` - Extract customer name and address for Bonanza Produce
5. `7ecd569e` - Add invoice-template-creator skill for automated template generation
6. `0155d91e` - Add Bonanza Produce multi-invoice statement template
#### Step 1.2: Create Rebased Branch
```bash
# Create a new branch from master with only invoice template commits
git checkout master
git pull origin master # Ensure master is up to date
git checkout -b invoice-templates-rebased
# Cherry-pick the invoice template commits in order
git cherry-pick db1cb194
git cherry-pick ec754233
git cherry-pick af7bc324
git cherry-pick 62107c99
git cherry-pick 7ecd569e
git cherry-pick 0155d91e
# Resolve any conflicts that arise during cherry-pick
# Run tests after each cherry-pick if conflicts occur
```
#### Step 1.3: Verify Rebased Branch
```bash
# Verify the commits are correctly applied
git log --oneline master..invoice-templates-rebased
# Run tests to ensure invoice templates still work
lein test auto-ap.parse.templates-test
```
#### Step 1.4: Merge to Master
```bash
# Merge the clean invoice templates to master
git checkout master
git merge invoice-templates-rebased --no-edit
# Push to remote
git push origin master
```
### Phase 2: Clean Up Clauding Branch
#### Step 2.1: Remove Invoice Template Commits from Clauding
```bash
# From clauding branch, find the commit before the first invoice template
git log --oneline clauding | grep -B1 "db1cb194"
# Suppose that's commit X, rebase clauding to remove invoice templates
git checkout clauding
# Option A: Interactive rebase (recommended for cleanup)
git rebase -i <commit-before-invoice-templates>
# In the editor, delete lines corresponding to invoice template commits:
# db1cb194
# ec754233
# af7bc324
# 62107c99
# 7ecd569e
# 0155d91e
# Save and exit to rebase
# Resolve any conflicts that occur
# Run tests after rebase
```
**OR**
```bash
# Option B: Hard reset to commit before invoice templates
# Identify the commit hash before db1cb194 (let's call it COMMIT_X)
git reset --hard COMMIT_X
# Then add back any non-invoice template commits from clauding
# (commits after the invoice templates that should remain)
git cherry-pick <non-invoice-commits-if-any>
```
#### Step 2.2: Verify Clauding Branch Cleanup
```bash
# Verify invoice template commits are removed
git log --oneline | grep -i "bonanza" # Should be empty
# Verify other commits remain
git log --oneline -20
# Run tests to ensure nothing broke
lein test
```
#### Step 2.3: Force Push Updated Clauding
```bash
# Force push the cleaned branch (use --force-with-lease for safety)
git push --force-with-lease origin clauding
```
### Phase 3: Sync Clauding with Master
#### Step 3.1: Merge Master into Clauding
```bash
git checkout clauding
git merge master --no-edit
# Resolve any conflicts
# Run tests
```
#### Step 3.2: Push Synced Clauding
```bash
git push origin clauding
```
### Phase 4: Final Merge to get-transactions2-page-working
#### Step 4.1: Merge Clauding to get-transactions2-page-working
```bash
git checkout get-transactions2-page-working
git merge clauding --no-edit
# Resolve any conflicts
# Run tests
```
#### Step 4.2: Push Final Branch
```bash
git push origin get-transactions2-page-working
```
## Acceptance Criteria
### Pre-operations Validation
- [ ] All invoice template commits identified correctly (6 commits)
- [ ] Merge base commit (`dc021b8c`) confirmed
- [ ] Current branch state documented
- [ ] Team notified of branch manipulation
### Post-Rebase Validation
- [ ] `invoice-templates-rebased` branch created from `master`
- [ ] All 6 invoice template commits applied correctly
- [ ] All invoice template tests pass
- [ ] No conflicts or unexpected changes during cherry-pick
### Post-Master Validation
- [ ] Invoice templates merged to `master`
- [ ] Changes pushed to remote `master`
- [ ] CI/CD passes on `master`
### Post-Cleanup Validation
- [ ] `clauding` branch has only non-invoice template commits
- [ ] No Bonanza Produce commits remain in `clauding` history
- [ ] All `clauding` tests pass
- [ ] Force push successful
### Post-Sync Validation
- [ ] `clauding` merged with `master`
- [ ] All conflicts resolved
- [ ] Changes pushed to remote
### Final Merge Validation
- [ ] `get-transactions2-page-working` merged with `clauding`
- [ ] All conflicts resolved
- [ ] Final tests pass
- [ ] Changes pushed to remote
## Success Metrics
- **Branch structure**: Invoice templates cleanly separated on `master`
- **Commit history**: Linear, no duplicate invoice template commits
- **Tests passing**: 100% of existing tests pass after each step
- **No data loss**: All work preserved in appropriate branches
- **Branch clarity**: Each branch has a clear, focused purpose
## Dependencies & Risks
### Dependencies
- [ ] All current work on `clauding` should be backed up or committed
- [ ] Team should be aware of branch manipulation to avoid force pushing
- [ ] CI/CD should be monitored during operations
### Risks
1. **Force push risk**: Force pushing `clauding` will rewrite history
- **Mitigation**: Use `--force-with-lease`, notify team beforehand
2. **Conflict resolution**: Multiple merge/conflict resolution points
- **Mitigation**: Test after each step, resolve conflicts carefully
3. **Work loss**: Potential to lose commits if operations go wrong
- **Mitigation**: Create backups, verify each step before proceeding
4. **CI/CD disruption**: Force pushes may affect CI/CD pipelines
- **Mitigation**: Coordinate with team, avoid during active deployments
### Contingency Plan
If something goes wrong:
1. **Recover `clauding` branch**:
```bash
git checkout clauding
git reset --hard origin/clauding # Restore from remote backup
```
2. **Recover master**:
```bash
git checkout master
git reset --hard origin/master # Restore from deploy/master
```
3. **Manual cherry-pick recovery**: If rebasing failed, manually cherry-pick remaining commits
## Alternative Approaches Considered
### Approach 1: Squash and Merge
**Pros**: Single clean commit, simple history
**Cons**: Loses individual commit history and context
**Rejected because**: Team uses merge commits (not squash), and individual commit history is valuable for tracking invoice template development.
### Approach 2: Keep Branches Separate
**Pros**: No branch manipulation needed
**Cons**: Branches remain tangled, harder to track progress
**Rejected because**: Goal is to cleanly separate invoice templates from transaction work.
### Approach 3: Rebase Clauding Onto Master
**Pros**: Linear history
**Cons**: Requires force push, may lose merge context
**Rejected because**: Current team workflow uses merge commits, and merging master into clauding preserves the integration point.
### Approach 4: Create New Branch Instead of Cleanup
**Pros**: Less risky, preserves full history
**Cons**: Accumulates branches, harder to track
**Rejected because**: Goal is cleanup and reorganization, not preservation.
## Related Work
- **Previous invoice template work**: `2026-02-07-feat-add-invoice-template-03881260-plan.md`
- **Current branch structure**: `clauding` has hierarchical relationship with `get-transactions2-page-working`
- **Team git workflow**: Uses merge commits (not rebasing), per repo research
## References & Research
### Internal References
- **Branch management patterns**: Repo research analysis (see `task_id: ses_3c2287be8ffe9icFi5jHEspaqh`)
- **Invoice template location**: `src/clj/auto_ap/parse/templates.clj`
- **Current branch structure**: Git log analysis
### Git Operations Documentation
- **Cherry-pick**: `git cherry-pick <commit>`
- **Interactive rebase**: `git rebase -i <base>`
- **Force push with lease**: `git push --force-with-lease`
- **Merge commits**: `git merge <branch> --no-edit`
### File Locations
- Templates: `src/clj/auto_ap/parse/templates.clj`
- Parser logic: `src/clj/auto_ap/parse.clj`
- Invoice PDF: `dev-resources/INVOICE - 03881260.pdf`
## Testing Plan
### Before Each Major Step
```bash
# Verify current branch state
git branch -vv
git log --oneline -10
# Run all tests
lein test
# Run specific invoice template tests
lein test auto-ap.parse.templates-test
```
### After Each Major Step
- Verify commit count and order
- Run full test suite
- Check for unintended changes
- Verify remote branch state matches local
## Notes
- **Team coordination**: Inform team before force pushing to avoid conflicts
- **Backup strategy**: All commits are preserved in the rebase process
- **Testing**: Verify at each step to catch issues early
- **Safety first**: Use `--force-with-lease` instead of `--force`
- **Documentation**: This plan serves as documentation for the operation

View File

@@ -1,250 +0,0 @@
---
title: Move Detailed Sales Data to DuckDB and Parquet
type: refactor
status: active
date: 2026-04-24
---
# Move Detailed Sales Data to DuckDB and Parquet
## Overview
Detailed sales records (orders, charges, line items, refunds) are currently stored in Datomic. Because Datomic is append-only, this high-volume data causes significant storage bloat. We will move these details to Parquet files stored on S3, using DuckDB as the query engine for views and summaries, while keeping the high-level `sales-summaries` in Datomic for ledger calculations.
---
## Problem Frame
The system stores every individual sale and payment detail in Datomic. While useful for auditing, this data is rarely accessed in detail after a few weeks, yet it permanently increases the Datomic database size. The app needs a "colder" but still queryable storage layer for these details.
---
## Requirements Trace
- R1. Detailed sales/payment entities must be moved from Datomic to Parquet files on S3.
- R2. `sales-summaries` must remain in Datomic to ensure ledger calculations remain performant and stable.
- R3. The "Sales Orders" and "Payments" views must continue to function (filtering, sorting, pagination) by querying the Parquet files via DuckDB.
- R4. The daily sales summary job must be updated to aggregate data from DuckDB instead of Datomic.
- R5. The system must handle "voids" of payments/orders in an immutable file format.
---
## Scope Boundaries
- **In Scope:**
- Implementation of Parquet writer for sales data.
- DuckDB integration for reading S3 Parquet files.
- Migration of existing detailed data from Datomic to S3.
- Updating the summary aggregation job.
- **Out of Scope:**
- Moving `sales-summaries` out of Datomic.
- Implementing a real-time streaming pipeline (sticking to batch/daily flushes).
---
## Context & Research
### Relevant Code and Patterns
- **Production Flow:** `auto-ap.square.core3`, `auto-ap.ezcater.core`, and `auto-ap.routes.ezcater-xls` all produce tagged maps that are currently sent to `dc/transact`.
- **Read Flow:** `auto-ap.datomic.sales-orders` and `auto-ap.ssr.payments` perform the current Datomic queries.
- **Aggregation:** `auto-ap.jobs.sales-summaries` uses `dc/q` to sum totals for the day.
---
## Key Technical Decisions
- **Storage Format:** Parquet. It is columnar, highly compressed, and natively supported by DuckDB.
- **Storage Location:** AWS S3. This removes the need for a managed database server.
- **Query Engine:** DuckDB. It can query Parquet files directly on S3 without importing them into a local database.
- **Write Strategy:** Daily Batch. To avoid the "small file problem" in S3/Parquet, data will be buffered (locally or in a staging table) and flushed as one file per day: `s3://bucket/sales-details/YYYY-MM-DD.parquet`.
- **Voiding Strategy:** Append-only log. A "void" is simply a new record with the same `external-id` and a `status: voided`. The read query will always select the record with the latest timestamp for a given ID.
---
## Implementation Units
- U1. **S3 Storage & DuckDB Infrastructure**
**Goal:** Setup the S3 bucket structure and the DuckDB connection utility.
**Requirements:** R1, R3
**Dependencies:** None
**Files:**
- Create: `src/clj/auto_ap/storage/parquet.clj` (DuckDB connection and S3 config)
**Approach:**
- Implement a `with-duckdb` wrapper that initializes DuckDB, loads the `httpfs` extension, and configures S3 credentials.
**Verification:**
- A test that can run a simple `SELECT 1` via DuckDB.
---
- U2. **Parquet Writer Implementation**
**Goal:** Create a service to convert sales maps into Parquet files and upload them to S3.
**Requirements:** R1
**Dependencies:** U1
**Files:**
- Modify: `src/clj/auto_ap/storage/parquet.clj`
- Test: `test/clj/auto_ap/storage/parquet_test.clj`
**Approach:**
- Implement a `flush-to-parquet` function that takes a collection of maps and uses a library to create the file.
- Implement the S3 upload logic.
- **Recovery:** Implement a "flush-log" in the local SQLite WAL. Mark records as `flushed: true` only after receiving a successful 200 OK from S3. On startup, the system should check for unflushed records and trigger a retry.
**Test scenarios:**
- Happy path: Write a list of 10 sales orders to a Parquet file and verify it exists on S3.
- Error path: Simulate an S3 connection failure during flush and verify that records remain in the local WAL and are successfully flushed on the next attempt.
- Edge case: Handle empty data sets without creating empty files.
**Verification:**
- Successful upload of a Parquet file that is readable by an external DuckDB CLI.
---
- U3. **Redirect Production Flow**
**Goal:** Change the Square/EzCater integrations to write to the Parquet writer instead of Datomic.
**Requirements:** R1
**Dependencies:** U2
**Files:**
- Modify: `src/clj/auto_ap/square/core3.clj`
- Modify: `src/clj/auto_ap/ezcater/core.clj`
- Modify: `src/clj/auto_ap/routes/ezcater_xls.clj`
**Approach:**
- Replace `dc/transact` calls for detailed sales/charges with calls to the new `parquet/write` service.
- *Note:* Keep the transaction for any related entities that must stay in Datomic (e.g., Client updates).
**Verification:**
- Run a Square import and verify that no new detailed entities appear in Datomic, but a new Parquet file is created.
---
- U4. **DuckDB Read Layer for Views**
**Goal:** Update the "Sales Orders" and "Payments" views to fetch data from DuckDB.
**Requirements:** R3, R5
**Dependencies:** U1
**Files:**
- Modify: `src/clj/auto_ap/datomic/sales_orders.clj`
- Modify: `src/clj/auto_ap/ssr/payments.clj`
- Test: `test/clj/auto_ap/integration/graphql/checks.clj`
**Approach:**
- Replace Datomic `q` and `pull` calls with DuckDB SQL queries.
- **Performance:** To optimize pagination, implement a "Metadata Index" file on S3 (or a Datomic entity) that stores the total record count per day. Use this to calculate pagination totals without scanning all Parquet files.
- **Deterministic Voids:** Use a combination of `timestamp` and a monotonic `sequence_number` for the `QUALIFY` clause to ensure deterministic results for records updated in the same millisecond.
- Map DuckDB result sets back to the existing map formats used by the views to minimize frontend changes.
**Test scenarios:**
- Happy path: List payments for a client across a date range.
- Integration: Void a payment in S3 and verify the view shows it as voided.
- Performance: Verify pagination totals load in < 200ms using the metadata index.
- Edge case: Handle two updates to the same record in the same millisecond and verify the latest sequence number wins.
**Verification:**
- The Payments table in the UI loads correctly and reflects the data in S3.
---
- U5. **Update Summary Aggregation Job**
**Goal:** Update the `sales-summaries` job to calculate totals using DuckDB.
**Requirements:** R2, R4
**Dependencies:** U1
**Files:**
- Modify: `src/clj/auto_ap/jobs/sales_summaries.clj`
**Approach:**
- In `get-payment-items`, `get-discounts`, `get-tax`, etc., replace the `dc/q` calls with DuckDB SQL `SUM` and `GROUP BY` queries against the daily Parquet files.
- Ensure the results are still written to the `sales-summary` entities in Datomic.
**Verification:**
- Run the `sales-summaries-v2` job and verify that the resulting Datomic summaries match the values in the S3 Parquet files.
---
- U6. **Historical Data Migration**
**Goal:** Move all existing detailed sales data from Datomic to Parquet files.
**Requirements:** R1
**Dependencies:** U2
**Files:**
- Create: `src/clj/auto_ap/migration/sales_to_parquet.clj`
**Approach:**
- Write a script that iterates through all historical sales orders and payments in Datomic.
- Group them by **Business Date** (the date of the sale, not the transaction date) to ensure consistency with future DuckDB queries.
- Write each day's data to the corresponding `YYYY-MM-DD.parquet` file on S3.
- Log any records with missing dates to a "dead-letter" file for manual review.
**Verification:**
- Count of records in Datomic vs count of records in S3.
---
- U7. **Datomic Cleanup**
**Goal:** Remove the detailed data from Datomic to reclaim space.
**Requirements:** R1
**Dependencies:** U6
**Files:**
- Create: `src/clj/auto_ap/migration/cleanup_sales.clj`
**Approach:**
- Use `[:db/retractEntity ...]` to remove all `#:sales-order`, `#:charge`, and `#:sales-refund` entities.
- **Batching:** Perform retractions in batches (e.g., by month) with a cooldown period between batches to avoid excessive Datomic transaction log bloat and performance degradation.
- *Safety:* Only run this after verifying U6 and U4.
**Verification:**
- Datomic database size decreases; detailed queries in Datomic return empty, while DuckDB queries return data.
---
## System-Wide Impact
- **Interaction graph:** The integration cores now depend on the Parquet/S3 service. The SSR views and Background Jobs now depend on the DuckDB service.
- **Error propagation:** S3 downtime will now cause "Sales Orders" views to fail and the Summary Job to fail. We should implement basic retry logic in the DuckDB wrapper.
- **State lifecycle risks:** There is a window between the "production" of a sale and the "flush" to Parquet. If the app crashes before a flush, data could be lost. *Mitigation:* Use a small local SQLite file as a write-ahead log for the daily buffer.
---
## Risks & Dependencies
| Risk | Mitigation |
|------|------------|
| S3 Latency for Views | Use DuckDB's caching and only query the files for the requested date range. |
| Data Loss before Flush | Implement a local SQLite staging file for the current day's data. |
| Schema Drift | Use a strict schema for Parquet files; handle missing columns in SQL with `COALESCE`. |
---
## Sources & References
- Related code: `src/clj/auto_ap/jobs/sales_summaries.clj`
- Related code: `src/clj/auto_ap/ssr/payments.clj`
- External docs: [DuckDB S3 Integration](https://duckdb.org/docs/extensions/httpfs)

View File

@@ -1,2 +0,0 @@
{"seq-no":1777103077792,"record":{"id":2}}{"seq-no":1777103077984,"record":{"id":1,"name":"test"}}{"seq-no":1777103126496,"record":{"id":2}}
{"seq-no":1777103126692,"record":{"id":1,"name":"test"}}

View File

@@ -93,14 +93,18 @@
[hiccup "2.0.0-alpha2"]
;; needed for java 11
[javax.xml.bind/jaxb-api "2.4.0-b180830.0359"]
[io.forward/clojure-mail "1.0.8"]
[lambdaisland/edn-lines "1.0.10"]
[org.duckdb/duckdb_jdbc "1.1.0"]
[org.xerial/sqlite-jdbc "3.45.1.0"]
[com.fasterxml.jackson.core/jackson-core "2.12.0"]
[com.fasterxml.jackson.core/jackson-databind "2.12.0"]
;; needed for java 11
[javax.xml.bind/jaxb-api "2.4.0-b180830.0359"]
[io.forward/clojure-mail "1.0.8"]
[lambdaisland/edn-lines "1.0.10"]]
:managed-dependencies [;; explicit dependencies to get to latest versions for above
[com.fasterxml.jackson.core/jackson-core "2.12.0"]
[com.fasterxml.jackson.core/jackson-databind "2.12.0"]
[com.fasterxml.jackson.core/jackson-annotations "2.12.0"]
[com.fasterxml.jackson.dataformat/jackson-dataformat-cbor "2.12.0"]
[commons-codec "1.12"]]
:plugins [[lein-ring "0.9.7"]
[lein-cljsbuild "1.1.5"]
[lein-ancient "0.6.15"]]
:clean-targets ^{:protect false} ["resources/public/js/compiled" "target"]
@@ -140,7 +144,7 @@
[com.bhauman/rebel-readline-cljs "0.1.4" :exclusions [org.clojure/clojurescript]]
[javax.servlet/servlet-api "2.5"]]
:plugins [[lein-pdo "0.1.1"]]
:jvm-opts ["-Dconfig=config/dev.edn" "-Xms4G" "-Xmx20G" "-XX:-OmitStackTraceInFastThrow" "-Djava.library.path=/home/noti/.local/lib"]}
:jvm-opts ["-Dconfig=config/dev.edn" "-Xms4G" "-Xmx20G" "-XX:-OmitStackTraceInFastThrow"]}
:uberjar
{:java-cmd "/usr/lib/jvm/java-11-openjdk/bin/java"

View File

@@ -1,180 +1,171 @@
(ns auto-ap.datomic.sales-orders
(:require
[auto-ap.storage.parquet :as pq]
[auto-ap.time :as atime]
[clj-time.coerce :as coerce]
[auto-ap.datomic
:refer [add-sorter-fields-2
apply-pagination
apply-sort-3
conn
merge-query
pull-id
pull-many
query2
visible-clients]]
[clj-time.coerce :as c]
[clj-time.core :as time]
[clojure.set :as set]
[clojure.string :as str]
[com.brunobonacci.mulog :as mu]
[ring.util.codec :as ring-codec]))
[datomic.api :as dc]
[iol-ion.query]))
(defn- payment-methods->charges [pm-str]
(when (not-empty pm-str)
(mapv (fn [pm] {:charge/type-name pm})
(str/split pm-str #","))))
(defn <-datomic [result]
(-> result
(update :sales-order/date c/from-date)
(update :sales-order/charges (fn [cs]
(map (fn [c]
(-> c
(update :charge/processor :db/ident)
(set/rename-keys {:expected-deposit/_charges :expected-deposit})
(update :expected-deposit first)))
cs)))))
(defn <-row
"Convert a flat parquet row into the shape consumers expect."
[row]
(let [pm (:payment-methods row)]
(-> row
(set/rename-keys
{:external-id :sales-order/external-id
:location :sales-order/location
:total :sales-order/total
:tax :sales-order/tax
:tip :sales-order/tip
:discount :sales-order/discount
:service-charge :sales-order/service-charge
:vendor :sales-order/vendor
:client-code :sales-order/client-code
:date :sales-order/date
:source :sales-order/source
:reference-link :sales-order/reference-link
:payment-methods :sales-order/payment-methods
:processors :sales-order/processors
:categories :sales-order/categories})
(update :sales-order/date #(some-> % str))
(dissoc :entity-type :_seq-no)
(assoc :sales-order/charges (payment-methods->charges pm)))))
(def default-read '[:db/id
:sales-order/external-id,
:sales-order/location,
:sales-order/date,
:sales-order/total,
:sales-order/tax,
:sales-order/tip,
:sales-order/line-items,
:sales-order/discount,
:sales-order/returns,
:sales-order/service-charge,
:sales-order/vendor,
:sales-order/source,
:sales-order/reference-link,
{:sales-order/client [:client/name :db/id :client/code]
:sales-order/charges [
:charge/type-name,
:charge/total,
:charge/tax,
:charge/tip,
:charge/external-id,
:charge/note,
:charge/date,
:charge/client,
:charge/location,
:charge/reference-link,
{:charge/processor [:db/ident]} {:expected-deposit/_charges [:db/id]}]}])
(defn build-where-clause [args]
(let [clauses (keep identity
[(when-let [c (:client-code args)]
(str "external_id.client = '" c "'"))
(when-let [v (:vendor args)]
(str "external_id.vendor = '" (name v) "'"))
(when-let [l (:location args)]
(str "location = '" l "'"))])]
(when (seq clauses)
(str "WHERE " (str/join " AND " clauses)))))
(defn raw-graphql-ids [db args]
(let [visible-clients (set (map :db/id (:clients args)))
selected-clients (->> (cond
(:client-id args)
(set/intersection #{(:client-id args)}
visible-clients)
(defn build-sort-clause [args]
(let [sort (or (:sort args) "date")
order (or (:order args) "DESC")]
(str "ORDER BY " sort " " order)))
(def page-size 100)
(:client-code args)
(set/intersection #{(pull-id db [:client/code (:client-code args)])}
visible-clients)
(defn raw-graphql-ids [args]
(let [start (some-> (:start (:date-range args)) .toString)
end (some-> (:end (:date-range args)) (.substring 0 10))
limit (or (:limit args) page-size)
offset (or (:offset args) 0)]
(when start
(let [result (pq/get-sales-orders start end
{:client (:client-code args)
:vendor (:vendor args)
:location (:location args)
:sort (or (:sort args) "date")
:order "DESC"
:limit limit
:offset offset})]
{:ids (mapv #(str (:external-id %)) (:rows result))
:rows (:rows result)
:count (:count result)}))))
:else
visible-clients)
(take 10)
set)
_ (mu/log ::selected-clients
:selected-clients selected-clients)
query (cond-> {:query {:find []
:in ['$ '[?clients ?start-date ?end-date]]
:where '[[(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]]}
:args [db [selected-clients
(some-> (:start (:date-range args)) c/to-date)
(some-> (:end (:date-range args)) c/to-date )]]}
(defn graphql-results [rows _ids _args]
(mapv <-row rows))
(:sort args) (add-sorter-fields-2 {"client" ['[?e :sales-order/client ?c]
'[?c :client/name ?sort-client]]
"location" ['[?e :sales-order/location ?sort-location]]
"source" ['[?e :sales-order/source ?sort-source]]
"date" ['[?e :sales-order/date ?sort-date]]
"total" ['[?e :sales-order/total ?sort-total]]
"tax" ['[?e :sales-order/tax ?sort-tax]]
"tip" ['[?e :sales-order/tip ?sort-tip]]}
args)
(:category args)
(merge-query {:query {:in ['?category]
:where ['[?e :sales-order/line-items ?li]
'[?li :order-line-item/category ?category]]}
:args [(:category args)]})
(defn- extract-date-str [v]
(when v
(cond
(instance? org.joda.time.DateTime v) (atime/unparse-local v atime/normal-date)
(instance? org.joda.time.LocalDate v) (atime/unparse-local v atime/normal-date)
(instance? java.util.Date v) (atime/unparse-local (coerce/to-date-time v) atime/normal-date)
(instance? java.time.LocalDate v) (.toString v)
(string? v) (if (re-find #"^\d{2}/\d{2}/\d{4}" v)
(-> (java.time.LocalDate/parse v (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
.toString)
(if (> (count v) 10) (.substring v 0 10) v))
:else (str v))))
(:processor args)
(merge-query {:query {:in ['?processor]
:where ['[?e :sales-order/charges ?chg]
'[?chg :charge/processor ?processor]]}
:args [(keyword "ccp-processor"
(name (:processor args)))]})
(:type-name args)
(merge-query {:query {:in ['?type-name]
:where ['[?e :sales-order/charges ?chg]
'[?chg :charge/type-name ?type-name]]}
:args [(:type-name args)]})
(defn- get-date [qp k]
(or (extract-date-str (get qp k))
(extract-date-str (get qp (name k)))))
(:total-gte args)
(merge-query {:query {:in ['?total-gte]
:where ['[?e :sales-order/total ?a]
'[(>= ?a ?total-gte)]]}
:args [(:total-gte args)]})
(defn- kw->str [v]
(when (some? v)
(if (keyword? v) (name v) (str v))))
(:total-lte args)
(merge-query {:query {:in ['?total-lte]
:where ['[?e :sales-order/total ?a]
'[(<= ?a ?total-lte)]]}
:args [(:total-lte args)]})
(defn- qp->opts [qp]
(let [sort-params (:sort qp)
sort-key (when (seq sort-params) (-> sort-params first :name))
sort-dir (when (seq sort-params) (-> sort-params first :dir))]
(cond-> {}
(some? (:client-code qp)) (assoc :client (kw->str (:client-code qp)))
(some? (:location qp)) (assoc :location (kw->str (:location qp)))
(not-empty (:payment-method qp)) (assoc :payment-method (:payment-method qp))
(some? (:processor qp)) (assoc :processor (kw->str (:processor qp)))
(not-empty (:category qp)) (assoc :category (:category qp))
(:total-gte qp) (assoc :total-gte (:total-gte qp))
(:total-lte qp) (assoc :total-lte (:total-lte qp))
sort-key (assoc :sort sort-key)
sort-dir (assoc :order (or sort-dir "DESC"))
true (assoc :limit (or (:per-page qp) 25)
:offset (or (:start qp) 0)))))
(:total args)
(merge-query {:query {:in ['?total]
:where ['[?e :sales-order/total ?sales-order-total]
'[(iol-ion.query/dollars= ?sales-order-total ?total)]]}
:args [(:total args)]})
(defn- default-date-range []
(let [today (.toString (java.time.LocalDate/now))
week-ago (.toString (.minusDays (java.time.LocalDate/now) 7))]
[week-ago today]))
true
(merge-query {:query {:find ['?date '?e]
:where ['[?e :sales-order/date ?date]]}}))]
(defn- qp->date-range [qp]
(let [[default-start default-end] (default-date-range)]
[(or (get-date qp :start-date)
(extract-date-str (get-in qp [:date-range :start]))
default-start)
(or (get-date qp :end-date)
(extract-date-str (get-in qp [:date-range :end]))
default-end)]))
(cond->> (query2 query)
true (apply-sort-3 (assoc args :default-asc? false))
true (apply-pagination args))))
(defn- request->client-codes [request]
(let [clients (:clients request)
codes (keep :client/code clients)]
(when (seq codes) codes)))
(defn graphql-results [ids db _]
(let [results (->> (pull-many db default-read ids)
(group-by :db/id))
payments (->> ids
(map results)
(map first)
(mapv <-datomic))]
payments))
(defn fetch-page-ssr
"Fetch sales orders from parquet for the SSR page."
[request]
(let [qp (:query-params request)
raw-qp (some-> (:query-string request)
ring-codec/form-decode
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
[start end] (qp->date-range (merge raw-qp qp))
opts (qp->opts qp)
client-codes (request->client-codes request)
opts (if client-codes (assoc opts :client-codes client-codes) opts)
result (pq/get-sales-orders start end opts)
rows (mapv <-row (:rows result))]
{:rows rows :count (:count result)}))
(defn summarize-orders [ids]
(defn summarize-page-ssr
"Summarize all matching sales orders via parquet."
[request]
(let [qp (:query-params request)
raw-qp (some-> (:query-string request)
ring-codec/form-decode
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
[start end] (qp->date-range (merge raw-qp qp))
opts (dissoc (qp->opts qp) :limit :offset :sort :order)
client-codes (request->client-codes request)
opts (if client-codes (assoc opts :client-codes client-codes) opts)]
(pq/get-sales-orders-summary start end opts)))
(defn summarize-orders [rows]
(when (seq rows)
(let [total (reduce + 0.0 (map #(or (:sales-order/total %) 0.0) rows))
tax (reduce + 0.0 (map #(or (:sales-order/tax %) 0.0) rows))]
{:total total
:tax tax})))
(let [[total tax] (->>
(dc/q {:find ['(sum ?t) '(sum ?tax)]
:with ['?id]
:in ['$ '[?id ...]]
:where ['[?id :sales-order/total ?t]
'[?id :sales-order/tax ?tax]]}
(dc/db conn)
ids)
first)]
{:total total
:tax tax}))
(defn get-graphql [args]
(let [{:keys [ids rows count]} (mu/trace ::get-sales-order-ids [] (raw-graphql-ids args))]
[(mu/trace ::get-results [] (graphql-results rows ids args))
count
(summarize-orders rows)]))
(let [db (dc/db conn)
{ids-to-retrieve :ids matching-count :count} (mu/trace ::get-sales-order-ids [] (raw-graphql-ids db args))]
[(->> (mu/trace ::get-results [] (graphql-results ids-to-retrieve db args)))
matching-count
(summarize-orders ids-to-retrieve)]))
(defn summarize-graphql [args]
(let [{:keys [rows]} (raw-graphql-ids args)]
(summarize-orders rows)))
(let [db (dc/db conn)
{ids-to-retrieve :ids matching-count :count} (mu/trace ::get-sales-order-ids [] (raw-graphql-ids db args))]
(summarize-orders ids-to-retrieve)))

View File

@@ -1,7 +1,6 @@
(ns auto-ap.ezcater.core
(:require
[auto-ap.datomic :refer [conn random-tempid]]
[auto-ap.storage.parquet :as parquet]
[auto-ap.datomic :refer [conn random-tempid]]
[datomic.api :as dc]
[clj-http.client :as client]
[venia.core :as v]
@@ -21,41 +20,42 @@
:body (json/write-str {"query" (v/graphql-query q)})
:as :json})
:body
:data))
:data
))
(defn get-caterers [integration]
(:caterers (query integration {:venia/queries [{:query/data
[:caterers [:name :uuid [:address [:name :street]]]]}]})))
[:caterers [:name :uuid [:address [:name :street]]]]}]} )))
(defn get-subscriptions [integration]
(->> (query integration {:venia/queries [{:query/data
[:subscribers [:id [:subscriptions [:parentId :parentEntity :eventEntity :eventKey]]]]}]})
[:subscribers [:id [:subscriptions [:parentId :parentEntity :eventEntity :eventKey]] ]]}]} )
:subscribers
first
:subscriptions))
(defn get-integrations []
(map first (dc/q '[:find (pull ?i [:ezcater-integration/api-key
:ezcater-integration/subscriber-uuid
:db/id
{:ezcater-integration/integration-status [:db/id]}])
:in $
:where [?i :ezcater-integration/api-key]]
(dc/db conn))))
:ezcater-integration/subscriber-uuid
:db/id
:ezcater-integration/integration-status [:db/id]])
:in $
:where [?i :ezcater-integration/api-key]]
(dc/db conn))))
(defn mark-integration-status [integration integration-status]
@(dc/transact conn
[{:db/id (:db/id integration)
:ezcater-integration/integration-status (assoc integration-status
:db/id (or (-> integration :ezcater-integration/integration-status :db/id)
(random-tempid)))}]))
[{:db/id (:db/id integration)
:ezcater-integration/integration-status (assoc integration-status
:db/id (or (-> integration :ezcater-integration/integration-status :db/id)
(random-tempid)))}]))
(defn upsert-caterers
([integration]
@(dc/transact
conn
(for [caterer (get-caterers integration)]
{:db/id (:db/id integration)
{:db/id (:db/id integration)
:ezcater-integration/caterers [{:ezcater-caterer/name (str (:name caterer) " (" (:street (:address caterer)) ")")
:ezcater-caterer/search-terms (str (:name caterer) " " (:street (:address caterer)))
:ezcater-caterer/uuid (:uuid caterer)}]}))))
@@ -64,14 +64,14 @@
([integration]
(let [extant (get-subscriptions integration)
to-ensure (set (map first (dc/q '[:find ?cu
:in $
:where [_ :client/ezcater-locations ?el]
[?el :ezcater-location/caterer ?c]
[?c :ezcater-caterer/uuid ?cu]]
(dc/db conn))))
:in $
:where [_ :client/ezcater-locations ?el]
[?el :ezcater-location/caterer ?c]
[?c :ezcater-caterer/uuid ?cu]]
(dc/db conn))))
to-create (set/difference
to-ensure
(set (map :parentId extant)))]
to-ensure
(set (map :parentId extant)))]
(doseq [parentId to-create]
(query integration
{:venia/operation {:operation/type :mutation
@@ -94,6 +94,7 @@
:eventKey 'cancelled}}
[[:subscription [:parentId :parentEntity :eventEntity :eventKey]]]]]})))))
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defn upsert-ezcater
([] (upsert-ezcater (get-integrations)))
@@ -114,11 +115,12 @@
(defn get-caterer [caterer-uuid]
(dc/pull (dc/db conn)
'[:ezcater-caterer/name
{:ezcater-integration/_caterers [:ezcater-integration/api-key]}
{:ezcater-location/_caterer [:ezcater-location/location
{:client/_ezcater-locations [:client/code]}]}]
[:ezcater-caterer/uuid caterer-uuid]))
'[:ezcater-caterer/name
{:ezcater-integration/_caterers [:ezcater-integration/api-key]}
{:ezcater-location/_caterer [:ezcater-location/location
{:client/_ezcater-locations [:client/code]}]}]
[:ezcater-caterer/uuid caterer-uuid]))
(defn round-carry-cents [f]
(with-precision 2 (double (.setScale (bigdec f) 2 java.math.RoundingMode/HALF_UP))))
@@ -133,159 +135,126 @@
0.15M
:else
0.07M)]
(round-carry-cents
(* commision%
0.01M
(+
(-> order :totals :subTotal :subunits)
(reduce +
0
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order)))))))))
(round-carry-cents
(* commision%
0.01M
(+
(-> order :totals :subTotal :subunits )
(reduce +
0
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order)))))))))
(defn ccp-fee [order]
(round-carry-cents
(* 0.000299M
(+
(-> order :totals :subTotal :subunits)
(-> order :totals :salesTax :subunits)
(reduce +
0
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order))))))))
(round-carry-cents
(* 0.000299M
(+
(-> order :totals :subTotal :subunits )
(-> order :totals :salesTax :subunits )
(reduce +
0
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order))))))))
(defn order->sales-order [{{:keys [timestamp]} :event {:keys [orderItems]} :catererCart :keys [client-code client-location uuid] :as order}]
(let [adjustment (round-carry-cents (- (+ (-> order :totals :subTotal :subunits (* 0.01))
(-> order :totals :salesTax :subunits (* 0.01)))
(-> order :catererCart :totals :catererTotalDue)
(-> order :catererCart :totals :catererTotalDue )
(commision order)
(ccp-fee order)))
service-charge (+ (commision order) (ccp-fee order))
tax (-> order :totals :salesTax :subunits (* 0.01))
tip (-> order :totals :tip :subunits (* 0.01))]
#:sales-order
{:date (atime/localize (coerce/to-date-time timestamp))
:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid)
:client [:client/code client-code]
:location client-location
:reference-link (str (url/url "https://ezmanage.ezcater.com/orders/" uuid))
:line-items [#:order-line-item
{:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid "-" 0)
:item-name "EZCater Catering"
:category "EZCater Catering"
:discount adjustment
:tax tax
:total (+ (-> order :totals :subTotal :subunits (* 0.01))
tax
tip)}]
:charges [#:charge
{:type-name "CARD"
:date (atime/localize (coerce/to-date-time timestamp))
:client [:client/code client-code]
:location client-location
:external-id (str "ezcater/charge/" uuid)
:processor :ccp-processor/ezcater
:total (+ (-> order :totals :subTotal :subunits (* 0.01))
tax
tip)
:tip tip}]
{:date (atime/localize (coerce/to-date-time timestamp))
:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid)
:client [:client/code client-code]
:location client-location
:reference-link (str (url/url "https://ezmanage.ezcater.com/orders/" uuid ))
:line-items [#:order-line-item
{:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid "-" 0)
:item-name "EZCater Catering"
:category "EZCater Catering"
:discount adjustment
:tax tax
:total (+ (-> order :totals :subTotal :subunits (* 0.01))
tax
tip)}]
:charges [#:charge
{:type-name "CARD"
:date (atime/localize (coerce/to-date-time timestamp))
:client [:client/code client-code]
:location client-location
:external-id (str "ezcater/charge/" uuid)
:processor :ccp-processor/ezcater
:total (+ (-> order :totals :subTotal :subunits (* 0.01))
tax
tip)
:tip tip}]
:total (+ (-> order :totals :subTotal :subunits (* 0.01))
tax
tip)
:discount adjustment
:service-charge service-charge
:tax tax
:tip tip
:returns 0.0
:vendor :vendor/ccp-ezcater}))
:total (+ (-> order :totals :subTotal :subunits (* 0.01))
tax
tip)
:discount adjustment
:service-charge service-charge
:tax tax
:tip tip
:returns 0.0
:vendor :vendor/ccp-ezcater}))
(defn- flatten-order-to-parquet! [order]
"Flatten a sales-order into entity-type tagged maps and buffer to parquet."
(let [so-ext-id (:sales-order/external-id order)
so-date (some-> (:sales-order/date order) .toString)
client (:sales-order/client order)
client-code (if (map? client) (:client/code client) client)]
(parquet/buffer! "sales-order"
{:entity-type "sales-order"
:external-id so-ext-id
:client-code client-code
:location (:sales-order/location order)
:vendor (:sales-order/vendor order)
:total (:sales-order/total order)
:tax (:sales-order/tax order)
:tip (:sales-order/tip order)
:discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order)
:date so-date})
(when-let [charges (:sales-order/charges order)]
(doseq [chg charges]
(parquet/buffer! "charge"
{:entity-type "charge"
:external-id (:charge/external-id chg)
:type-name (:charge/type-name chg)
:total (:charge/total chg)
:tax (:charge/tax chg)
:tip (:charge/tip chg)
:date so-date
:processor (some-> (:charge/processor chg) name)
:sales-order-external-id so-ext-id})))
(when-let [items (:sales-order/line-items order)]
(doseq [li items]
(parquet/buffer! "line-item"
{:entity-type "line-item"
:item-name (:order-line-item/item-name li)
:category (:order-line-item/category li)
:total (:order-line-item/total li)
:tax (:order-line-item/tax li)
:discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id})))))
(defn get-by-id [integration id]
(query
integration
{:venia/queries [[:order {:id id}
[:uuid
:orderNumber
:orderSourceType
[:caterer
[:name
:uuid
[:address [:street]]]]
[:event
[:timestamp
:catererHandoffFoodTime
:orderType]]
[:catererCart [[:orderItems
[:name
:quantity
:posItemId
[:totalInSubunits
[:currency
:subunits]]]]
[:totals
[:catererTotalDue]]
[:feesAndDiscounts
{:type 'DELIVERY_FEE}
[[:cost
[:currency
:subunits]]]]]]
[:totals [[:customerTotalDue
[:currency
:subunits]]
[:pointOfSaleIntegrationFee
[:currency
:subunits]]
[:tip
[:currency
:subunits]]
[:salesTax
[:currency
:subunits]]
[:salesTaxRemittance
[:currency
:subunits]]
[:subTotal
[:currency
:subunits]]]]]]]}))
(query
integration
{:venia/queries [[:order {:id id}
[:uuid
:orderNumber
:orderSourceType
[:caterer
[:name
:uuid
[:address [:street]]]]
[:event
[:timestamp
:catererHandoffFoodTime
:orderType]]
[:catererCart [[:orderItems
[:name
:quantity
:posItemId
[:totalInSubunits
[:currency
:subunits]]]]
[:totals
[:catererTotalDue]]
[:feesAndDiscounts
{:type 'DELIVERY_FEE}
[[:cost
[:currency
:subunits]]]]]]
[:totals [[:customerTotalDue
[
:currency
:subunits
]]
[:pointOfSaleIntegrationFee
[
:currency
:subunits
]]
[:tip
[:currency
:subunits]]
[:salesTax
[
:currency
:subunits
]]
[:salesTaxRemittance
[:currency
:subunits
]]
[:subTotal
[:currency
:subunits]]]]]]]}))
(defn lookup-order [json]
(let [caterer (get-caterer (get json "parent_id"))
@@ -293,31 +262,26 @@
client (-> caterer :ezcater-location/_caterer first :client/_ezcater-locations :client/code)
location (-> caterer :ezcater-location/_caterer first :ezcater-location/location)]
(if (and client location)
(doto
(-> (get-by-id integration (get json "entity_id"))
(:order)
(assoc :client-code client
:client-location location))
(doto
(-> (get-by-id integration (get json "entity_id"))
(:order)
(assoc :client-code client
:client-location location))
(#(alog/info ::order-details :detail %)))
(alog/warn ::caterer-no-longer-has-location :json json))))
(defn import-order [json]
;; {"id" "bf3dcf5c-a68f-42d9-9084-049133e03d3d", "parent_type" "Caterer", "parent_id" "91541331-d7ae-4634-9e8b-ccbbcfb2ce70", "entity_type" "Order", "entity_id" "9ab05fee-a9c5-483b-a7f2-14debde4b7a8", "key" "accepted", "occurred_at" "2022-07-21T19:21:07.549Z"}
(alog/info
::try-import-order
:json json)
(when-let [order (some-> json
(lookup-order)
(order->sales-order)
(update :sales-order/date coerce/to-date)
(update-in [:sales-order/charges 0 :charge/date] coerce/to-date))]
(try
(flatten-order-to-parquet! order)
(alog/info ::order-buffered
:external-id (:sales-order/external-id order))
(catch Exception e
(alog/error ::buffer-failed
:exception e
:order (:sales-order/external-id order))))))
::try-import-order
:json json)
@(dc/transact conn (filter identity
[(some-> json
(lookup-order)
(order->sales-order)
(update :sales-order/date coerce/to-date)
(update-in [:sales-order/charges 0 :charge/date] coerce/to-date))])))
(defn upsert-recent []
(upsert-ezcater)
(let [last-sunday (coerce/to-date (time/plus (second (->> (time/today)
@@ -348,30 +312,30 @@
"key" "accepted",
"occurred_at" "2022-07-21T19:21:07.549Z"}
ezcater-order (lookup-order lookup-map)
extant-order (dc/pull (dc/db conn '[:sales-order/total]
:sales-order/tax
:sales-order/tip
:sales-order/discount
:sales-order/external-id
{:sales-order/charges [:charge/tax
:charge/tip
:charge/total
:charge/external-id]
:sales-order/line-items [:order-line-item/external-id
:order-line-item/total
:order-line-item/tax
:order-line-item/discount]})
[:sales-order/external-id order])
extant-order (dc/pull (dc/db conn) '[:sales-order/total
:sales-order/tax
:sales-order/tip
:sales-order/discount
:sales-order/external-id
{:sales-order/charges [:charge/tax
:charge/tip
:charge/total
:charge/external-id]
:sales-order/line-items [:order-line-item/external-id
:order-line-item/total
:order-line-item/tax
:order-line-item/discount]}]
[:sales-order/external-id order])
updated-order (-> (order->sales-order ezcater-order)
(select-keys
#{:sales-order/total
:sales-order/tax
:sales-order/tip
:sales-order/discount
:sales-order/charges
:sales-order/external-id
:sales-order/line-items})
#{:sales-order/total
:sales-order/tax
:sales-order/tip
:sales-order/discount
:sales-order/charges
:sales-order/external-id
:sales-order/line-items})
(update :sales-order/line-items
(fn [c]
(map #(select-keys % #{:order-line-item/external-id

View File

@@ -3,7 +3,6 @@
[auto-ap.jobs.core :refer [execute]]
[auto-ap.logging :as alog]
[auto-ap.time :as atime]
[auto-ap.storage.parquet :as pq]
[clj-time.coerce :as c]
[clj-time.core :as time]
[clj-time.periodic :as per]
@@ -40,14 +39,17 @@
(dc/db conn)
number)))
(defn delete-all []
@(dc/transact-async conn
(->>
(dc/q '[:find ?ss
:where [?ss :sales-summary/date]]
(dc/db conn))
(map (fn [[ss]]
[:db/retractEntity ss])))))
(->>
(dc/q '[:find ?ss
:where [?ss :sales-summary/date]]
(dc/db conn))
(map (fn [[ ss]]
[:db/retractEntity ss])))))
(defn dirty-sales-summaries [c]
(let [client-id (dc/entid (dc/db conn) c)]
@@ -96,86 +98,101 @@
"card refunds" 41400
"food app refunds" 41400})
(defn- get-payment-items-parquet [c date]
(let [date-str (.toString date)]
(when-let [rows (seq (pq/query-deduped "charge" date-str date-str))]
(let [client-code (if (map? c) (:client/code c) c)
filtered (filter #(= client-code (:client-code %)) rows)]
(reduce
(fn [acc {:keys [processor type-name total]}]
(update acc
(cond
(= type-name "CARD") "Card Payments"
(= type-name "CASH") "Cash Payments"
(#{"SQUARE_GIFT_CARD" "WALLET" "GIFT_CARD"} type-name) "Gift Card Payments"
(#{"doordash" "grubhub" "uber-eats"} processor) "Food App Payments"
:else "Unknown")
(fnil + 0.0)
(or total 0.0)))
{}
filtered)))))
(defn get-payment-items [c date]
(->>
(dc/q '[:find ?processor ?type-name (sum ?total)
:with ?c
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/charges ?c]
[?c :charge/type-name ?type-name]
(or-join [?c ?processor]
(and [?c :charge/processor ?p]
[?p :db/ident ?processor])
(and
(not [?c :charge/processor])
[(ground :ccp-processor/na) ?processor]))
[?c :charge/total ?total]]
(dc/db conn)
[[c] date date])
(reduce
(fn [acc [processor type-name total]]
(update
acc
(cond (= type-name "CARD")
"Card Payments"
(= type-name "CASH")
"Cash Payments"
(#{"SQUARE_GIFT_CARD" "WALLET" "GIFT_CARD"} type-name)
"Gift Card Payments"
(#{:ccp-processor/toast
#_:ccp-processor/ezcater
#_:ccp-processor/koala
:ccp-processor/doordash
:ccp-processor/grubhub
:ccp-processor/uber-eats} processor)
"Food App Payments"
:else
"Unknown")
(fnil + 0.0)
total))
{})
(map (fn [[k v]]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 0
:sales-summary-item/category k
:ledger-mapped/amount (if (= "Card Payments" k)
(- v (get-fee c date))
v)
:ledger-mapped/ledger-side :ledger-side/debit}))))
(defn- get-discounts-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
discount (auto-ap.storage.sales-summaries/sum-discounts client-code date-str date-str)]
(when (and discount (pos? discount))
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 1
:sales-summary-item/category "Discounts"
:ledger-mapped/amount discount
:ledger-mapped/ledger-side :ledger-side/debit})))
(defn- get-refund-items-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
refunds (auto-ap.storage.sales-summaries/sum-refunds-by-type client-code date-str date-str)]
(when (seq refunds)
(map (fn [[type-name total]]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 3
:sales-summary-item/category (cond
(= type-name "CARD") "Card Refunds"
(= type-name "CASH") "Cash Refunds"
:else "Food App Refunds")
:ledger-mapped/amount total
:ledger-mapped/ledger-side :ledger-side/credit})
refunds))))
(defn- get-tax-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
tax (auto-ap.storage.sales-summaries/sum-taxes client-code date-str date-str)]
(defn get-discounts [c date]
(when-let [discount (ffirst (dc/q '[:find (sum ?discount)
:with ?e
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/discount ?discount]]
(dc/db conn)
[[c] date date]))]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category "Tax"
:sales-summary-item/sort-order 1
:ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount (or tax 0.0)}))
:sales-summary-item/category "Discounts"
:ledger-mapped/amount discount
:ledger-mapped/ledger-side :ledger-side/debit}))
(defn get-refund-items [c date]
(->>
(dc/q '[:find ?type-name (sum ?t)
:with ?e
:in $ [?clients ?start-date ?end-date]
:where
:where [(iol-ion.query/scan-sales-refunds $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-refund/type ?type-name]
[?e :sales-refund/total ?t]]
(dc/db conn)
[[c] date date])
(reduce
(fn [acc [type-name total]]
(update
acc
(cond (= type-name "CARD")
"Card Refunds"
(= type-name "CASH")
"Cash Refunds"
:else
"Food App Refunds")
(fnil + 0.0)
total))
{})
(map (fn [[k v]]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 3
:sales-summary-item/category k
:ledger-mapped/amount v
:ledger-mapped/ledger-side :ledger-side/credit}))))
(defn- get-tip-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
tip (auto-ap.storage.sales-summaries/sum-tips client-code date-str date-str)]
{:ledger-mapped/ledger-side :ledger-side/credit
:sales-summary-item/sort-order 2
:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category "Tip"
:ledger-mapped/amount (or tip 0.0)}))
(defn- get-sales-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
sales (auto-ap.storage.sales-summaries/sum-sales-by-category client-code date-str date-str)]
(for [{:keys [category total tax discount]} sales]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category (or category "Unknown")
:sales-summary-item/sort-order 0
:sales-summary-item/total total
:sales-summary-item/net (- (+ total discount) tax)
:sales-summary-item/tax tax
:sales-summary-item/discount discount
:ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount (- (+ total discount) tax)})))
(defn get-fees [c date]
(when-let [fee (get-fee c date)]
@@ -276,17 +293,19 @@
:sales-summary/items
(->>
(get-sales-parquet c date)
(concat (get-payment-items-parquet c date))
(concat (get-refund-items-parquet c date))
(cons (get-discounts-parquet c date))
(get-sales c date)
(concat (get-payment-items c date))
(concat (get-refund-items c date))
(cons (get-discounts c date))
(cons (get-fees c date))
(cons (get-tax-parquet c date))
(cons (get-tip-parquet c date))
(cons (get-tax c date))
(cons (get-tip c date))
(cons (get-returns c date))
(filter identity)
(map (fn [z]
(assoc z :ledger-mapped/account (some-> z :sales-summary-item/category str/lower-case name->number lookup-account)
:sales-summary-item/manual? false))))}]
:sales-summary-item/manual? false))
)) }]
(if (seq (:sales-summary/items result))
(do
(alog/info ::upserting-summaries
@@ -294,11 +313,12 @@
@(dc/transact conn [[:upsert-entity result]]))
@(dc/transact conn [{:db/id id :sales-summary/dirty false}]))))))
(comment
;; TODO: Move to test file or proper location
(let [c (auto-ap.datomic/pull-attr (dc/db @conn) :db/id [:client/code "NGCL"])
date #inst "2024-04-14T00:00:00-07:00"]
(get-payment-items c date)))
(let [c (auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGCL" ])
date #inst "2024-04-14T00:00:00-07:00"]
(get-payment-items c date)
)
(defn reset-summaries []
@(dc/transact conn (->> (dc/q '[:find ?sos
@@ -308,13 +328,16 @@
(map (fn [[sos]]
[:db/retractEntity sos])))))
(comment
(auto-ap.datomic/transact-schema conn)
@(dc/transact conn [{:db/ident :sales-summary/total-unknown-processor-payments
:db/noHistory true,
:db/valueType :db.type/double
:db/cardinality :db.cardinality/one}])
:db/noHistory true,
:db/valueType :db.type/double
:db/cardinality :db.cardinality/one}])
(apply mark-dirty [:client/code "NGCL"] (last-n-days 30))
@@ -333,7 +356,7 @@
[?sos :sales-summary/date ?d]
[(= ?d #inst "2024-04-10T00:00:00-07:00")]]
(dc/db conn))
(dc/q '[:find ?n ?p2 (sum ?total)
:with ?c
:in $ [?clients ?start-date ?end-date]
@@ -346,18 +369,23 @@
(dc/db conn)
[[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGHW"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-11T00:00:00-07:00"])
(dc/q '[:find ?n
(dc/q '[:find ?n
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/line-items ?li]
[?li :order-line-item/item-name ?n]]
[?li :order-line-item/item-name ?n] ]
(dc/db conn)
[[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGCL"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-24T00:00:00-07:00"])
@(dc/transact conn [{:db/id :sales-summary/total-tax :db/ident :sales-summary/total-tax-legacy}
{:db/id :sales-summary/total-tip :db/ident :sales-summary/total-tip-legacy}])
(auto-ap.datomic/transact-schema conn)
)
@(dc/transact conn [{:db/id :sales-summary/total-tax :db/ident :sales-summary/total-tax-legacy}
{:db/id :sales-summary/total-tip :db/ident :sales-summary/total-tip-legacy}])
(auto-ap.datomic/transact-schema conn))
(defn -main [& _]
(execute "sales-summaries" sales-summaries-v2))

View File

@@ -1,220 +0,0 @@
(ns auto-ap.migration.cleanup-sales
(:require [auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as pq]
[amazonica.aws.s3 :as s3]
[datomic.api :as d-api]
[clojure.string :as str]))
(def ^:private BATCH-SIZE 1000)
(def ^:private DRY-RUN? true)
(defn- set-dry-run! [v]
(alter-var-root #'DRY-RUN? (constantly v)))
; -- query helpers
(defn- query-sales-order-ids
"Return all entity IDs that have :sales-order/external-id."
[db]
(->> (d-api/q '[:find ?e
:where [?e :sales-order/external-id]]
db)
(map first)))
(defn- collect-child-ids
"Gather child entity IDs for a batch of sales orders. Returns map with
keys :orders, :charges, :line-items, :refunds — each a vector of
entity IDs eligible for retraction."
[db order-ids]
(let [order-set (set order-ids)
charges (->> (d-api/q '[:find ?c
:in $ [?o ...]
:where [$ ?o :sales-order/charges ?c]]
db order-set)
(map second))
refunds (->> (d-api/q '[:find ?r
:in $ [?o ...]
:where [$ ?o :sales-order/refunds ?r]]
db order-set)
(map second))
line-items (->> (d-api/q '[:find ?li
:in $ [?c ...]
:where [$ ?c :charge/line-items ?li]]
db charges)
(map second))]
{:orders order-ids
:charges (vec charges)
:line-items (vec line-items)
:refunds (vec refunds)}))
; -- transaction batching
(defn- batch-transact
"Issue [:db/retractEntity ...] transactions in batches of BATCH-SIZE.
conn$ is a Datomic connection object.
entity-ids should be a seq of Long entity IDs."
[conn entity-ids]
(let [batches (partition-all BATCH-SIZE entity-ids)
_ (doseq [[idx batch] (map-indexed vector batches)]
(let [n (count batch)
txes (map (fn [eid]
[:db/retractEntity eid])
batch)]
(println " batch" idx ":" n "retracts")
(when-not DRY-RUN?
@(d-api/transact conn txes))))]
:done))
(defn- retract-all-child-ids!
"Retract orders, charges, line-items and refunds from all entity-ID
maps produced by collect-child-ids. Logs progress every batch."
[conn child-entity-map]
(doseq [[type id-seq] child-entity-map]
(when (seq id-seq)
(println "retracting" type ":" (count id-seq) "ids")
(batch-transact conn id-seq))))
; -- month grouping
(defn- group-orders-by-month
"Group sales order entity IDs by [year month] extracted from
:sales-order/day-value. Returns map {{y m} [eid ...]}."
[db order-ids]
(reduce (fn [acc eid]
(when-let [day-val (:sales-order/day-value
(d-api/entity db eid))]
(let [[y m _] (str/split (str day-val) #"-")
k [(Integer/parseInt y)
(Integer/parseInt m)]]
(update acc k conj eid))))
{}
order-ids))
; -- S3 verification (uses amazonica + parquet module)
(def ENTITY-TYPES ["sales-order" "charge"
"line-item" "sales-refund"])
(defn- s3-keys-for-date
"Build S3 parquet keys for all entity types on a given date."
[date-str]
(mapv #(pq/parquet-key % date-str) ENTITY-TYPES))
(defn- days-in-month
"Return seq of YYYY-MM-DD strings for all days in [year month]."
[year month]
(let [start (java.time.LocalDate/of year month 1)
first-of-next (.plusMonths start 1)
diff (.toEpochDay first-of-next)
start-day (.toEpochDay start)]
(for [d (range start-day diff)]
(.toString (java.time.LocalDate/ofEpochDay d)))))
(defn- object-exists?
"Check if an S3 object exists via head-object (no download)."
[key]
(try
(s3/get-object {:bucket-name pq/*bucket*
:key key}
{:request-method :head})
true
(catch com.amazonaws.services.s3.model.AmazonS3Exception _
false)))
(defn- verify-month-in-s3?
"Check that every day in [year month] has at least one backing
Parquet file on S3 across all entity types.
Returns a map {:ok bool :missing vec-of-dates}."
[year month]
(let [dates (days-in-month year month)]
(loop [[d & rest] dates
result []]
(if-not d
{:ok (empty? result)
:missing result}
(let [keys (s3-keys-for-date d)
found? (some object-exists? keys)]
(recur rest
(if found?
result
(conj result d))))))))
; -- public API: delete-by-month
(defn- delete-by-month [conn client-entid year month]
"Retract all sales entities for a specific year+month.
Returns :ok on success, :skipped if S3 verification failed."
(println "=== deleting" year "-" month
"dry-run? =" DRY-RUN?)
(let [db (d-api/db conn)
all-ids (query-sales-order-ids db)
group (group-orders-by-month db all-ids)
target-keys (get group [year month] [])]
(if (zero? (count target-keys))
(do (println " no orders found for" year "-" month)
:skipped)
(do
(let [child-maps (collect-child-ids db target-keys)
total-ids (->> child-maps vals
(reduce into [])
distinct
count)]
(println " " total-ids "total entities to retract")
(when-not DRY-RUN?
(retract-all-child-ids! conn child-maps)))
:ok))))
; -- public API: cleanup-all
(defn cleanup-all []
"Remove ALL sales-order, charge, line-item, sales-refund from
Datomic. Uses d-api/transact to issue [:db/retractEntity ...] for
each entity. Iterates over every month found in DB."
(let [db (d-api/db conn)
all-ids (query-sales-order-ids db)
group (group-orders-by-month db all-ids)
months (sort (keys group))]
(println "found" (count months) "months of data")
(doseq [[y m] months]
(delete-by-month conn nil y m))
(println "cleanup-all complete")))
; -- public API: safe-cleanup-all
(defn- collect-all-months [conn]
"Return sorted vec of [year month] pairs with sales orders in DB."
(let [db (d-api/db conn)
all-ids (query-sales-order-ids db)
grouped (group-orders-by-month db all-ids)]
(sort (keys grouped))))
(defn safe-cleanup-all []
"Same as cleanup-all but verifies S3 data exists first.
Before deleting a month's entities, checks that parquet files
exist in auto-ap.storage.parquet bucket under prefix 'sales-details'."
(let [conn$ conn
months (collect-all-months conn)]
(println "=== safe-cleanup-all"
"months:" (count months)
"dry-run? =" DRY-RUN?)
(doseq [[y m] months]
(when-not DRY-RUN?
(let [result (verify-month-in-s3? y m)
missing (:missing result)]
(cond
(:ok result)
(do (println "verified" y "-" m "S3 OK, deleting...")
(delete-by-month conn$ nil y m))
(> (count missing) 0)
(do (println "ERROR" y "-" m "missing in S3:"
(str/join ", " missing))
(throw
(ex-info
"Missing S3 data — aborting!"
{:year y :month m
:missing missing})))
:else
(println "SKIPPING" y "-" m "no parquet files")))))
(println "safe-cleanup-all complete")))

View File

@@ -1,230 +0,0 @@
(ns auto-ap.migration.sales-to-parquet
"Migrate historical sales data from Datomic to Parquet + S3.
Groups records by business date and writes daily partitions.
Dead-letter records (missing dates) are written separately.
Usage:
(migrate-all) ; full migration earliest → latest
(write-day-by-day \"2024-01-01\" \"2024-03-31\") ; date range
(write-dead-letter [flat]) ; write orphaned records"
(:require [auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as p]
[clojure.string :as str]
[datomic.api :as dc]))
(defn- fetch-all-sales-order-ids []
"Query Datomic for all sales-order external-ids (as entity IDs).
Returns a vector of entitity ids."
(->> (dc/q '[:find ?e
:where [?e :sales-order/external-id _]]
(dc/db conn))
(map first)
vec))
(def ^:private sales-order-read
'[:sales-order/external-id
:sales-order/date
{:sales-order/client [:client/code :client/name]}
:sales-order/location
{:sales-order/vendor [:vendor/name]}
:sales-order/total
:sales-order/tax
:sales-order/tip
:sales-order/discount
:sales-order/service-charge
:sales-order/source
:sales-order/reference-link
{:sales-order/charges
[:charge/external-id
:charge/type-name
:charge/total
:charge/tax
:charge/tip
:charge/date
{:charge/processor [:db/ident]}
:charge/returns
{:charge/client [:client/code]}]}
{:sales-order/line-items
[:order-line-item/item-name
:order-line-item/category
:order-line-item/total
:order-line-item/tax
:order-line-item/discount
:order-line-item/unit-price
:order-line-item/quantity
:order-line-item/note]}])
(defn- pull-sales-order-data [eids]
"Batch pull full sales-order entities plus nested children."
(if (empty? eids)
[]
(dc/pull-many (dc/db conn)
sales-order-read
eids)))
(defn- flatten-order-to-pieces! [order date-str flat]
"Flatten a pulled sales-order into :entity-type tagged maps.
Appends to the existing flat vector, which is returned."
(let [so-ext-id (:sales-order/external-id order)
so-date date-str
client-code (get-in order [:sales-order/client :client/code])
vendor-name (get-in order [:sales-order/vendor :vendor/name])
charges (:sales-order/charges order)
items (:sales-order/line-items order)
payment-methods (->> charges (map :charge/type-name) distinct (str/join ","))
processors (->> charges (map #(get-in % [:charge/processor :db/ident])) (remove nil?) distinct (map name) (str/join ","))
categories (->> items (map :order-line-item/category) (remove nil?) distinct (str/join ","))]
(vswap! flat conj
{:entity-type "sales-order"
:external-id (str so-ext-id)
:client-code client-code
:location (:sales-order/location order)
:vendor vendor-name
:total (:sales-order/total order)
:tax (:sales-order/tax order)
:tip (:sales-order/tip order)
:discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order)
:date so-date
:source (:sales-order/source order)
:reference-link (:sales-order/reference-link order)
:payment-methods payment-methods
:processors processors
:categories categories})
(when-let [charges (:sales-order/charges order)]
(doseq [chg charges]
(vswap! flat conj
{:entity-type "charge"
:external-id (str (get chg :charge/external-id))
:type-name (get chg :charge/type-name)
:total (get chg :charge/total)
:tax (get chg :charge/tax)
:tip (get chg :charge/tip)
:date so-date
:processor (get-in chg [:charge/processor :db/ident])
:sales-order-external-id (str so-ext-id)})
(when-let [returns (:charge/returns chg)]
(doseq [rt returns]
(vswap! flat conj
{:entity-type "sales-refund"
:type-name (get rt :type-name)
:total (get rt :total)
:sales-order-external-id (str so-ext-id)})))))
(when-let [items (:sales-order/line-items order)]
(doseq [li items]
(vswap! flat conj
{:entity-type "line-item"
:item-name (get li :order-line-item/item-name)
:category (get li :order-line-item/category)
:total (get li :order-line-item/total)
:tax (get li :order-line-item/tax)
:discount (get li :order-line-item/discount)
:sales-order-external-id (str so-ext-id)})))))
(defn -fetch-order-ids-for-date
"Query Datomic for all sales-order eids on a given business date."
[db date-str]
(let [ld (java.time.LocalDate/parse date-str)
start (-> ld (.atStartOfDay (java.time.ZoneId/of "America/Los_Angeles")) .toInstant java.util.Date/from)
end (-> ld (.plusDays 1) (.atStartOfDay (java.time.ZoneId/of "America/Los_Angeles")) .toInstant java.util.Date/from)]
(->> (dc/q '[:find ?e
:in $ ?start ?end
:where [?e :sales-order/date ?d]
[(>= ?d ?start)]
[(< ?d ?end)]]
db start end)
(map first)
vec)))
(defn write-day-by-day
([start-date end-date]
(write-day-by-day start-date end-date {}))
([start-date end-date opts]
(let [all-dates (set (or (opts :date-set) []))
date-range (if (empty? all-dates)
(p/date-seq start-date end-date)
(filter all-dates
(p/date-seq start-date end-date)))
batch-size (or (opts :batch-size) 100)]
(doseq [^String day date-range]
(println "[migration] processing" day)
(let [eids (-fetch-order-ids-for-date (dc/db conn) day)
batches (partition-all batch-size eids)]
(doseq [batch batches]
(let [orders (pull-sales-order-data batch)
flat (volatile! [])]
(doseq [o orders]
(flatten-order-to-pieces! o day flat))
(doseq [r @flat]
(p/buffer! (:entity-type r) r)))))
(doseq [etype ["sales-order" "charge"
"line-item" "sales-refund"]]
(p/flush-to-parquet! etype day))
(println "[migration]" day "complete"))
{:status :completed :total-days (count date-range)})))
(defn- write-dead-letter
([flat]
(write-dead-letter "dead" flat))
([prefix flat]
"Write records with missing dates to a parquet file."
(let [dead (filter #(nil? (:date %)) flat)]
(when (seq dead)
(doseq [r dead]
(p/buffer!
(str prefix "-" (:entity-type r))
r))))))
(defn- flush-all-types []
"Flush all entity-type buffers, tracking counts."
(let [etypes ["sales-order" "charge"
"line-item" "sales-refund"]
today (.toString (java.time.LocalDate/now))
start (p/total-buf-count)]
(doseq [et etypes]
(try
(p/flush-to-parquet! et today)
(catch Exception e
(println "[migration/flush]" et "error:" (.getMessage e)))))
{:records-flush (- (p/total-buf-count) start)}))
(defn- get-date-range []
"Get the earliest and latest business dates from Datomic."
(let [dates (->> (dc/q '[:find ?d
:where [_ :sales-order/date ?d]]
(dc/db conn))
(map first)
distinct
sort)]
[(when (seq dates) (.toString (first dates)))
(when (seq dates) (.toString (last dates)))]))
(defn migrate-all []
"Full migration from earliest to latest date: load unflushed,
fetch / buffer / flush day by day. Write dead-records for
sales orders with missing dates."
(println "[migration] starting full migration...")
(p/load-unflushed!)
(let [order-ids (fetch-all-sales-order-ids)
start-date (first (get-date-range))
end-date (second (get-date-range))]
(if-not (seq order-ids)
(do
(println "[migration] no orders found")
:no-orders)
(try
;; pull & buffer any orders missing a business date
(doseq [o (pull-sales-order-data order-ids)
:when (not (:sales-order/date o))]
(let [flat (volatile! [])]
(flatten-order-to-pieces! o "unknown" flat)
(doseq [r @flat]
(p/buffer! "dead" r))))
(write-day-by-day start-date end-date {:batch-size 100})
(flush-all-types)
(println "[migration] done")
:ok
(catch Exception e
(println "[migration/error]" (.getMessage e))
e)))))

View File

@@ -1,6 +1,6 @@
(ns auto-ap.routes.ezcater-xls
(:require
[auto-ap.datomic :refer [conn]]
[auto-ap.datomic :refer [audit-transact conn]]
[auto-ap.logging :as alog]
[clojure.data.json :as json]
[auto-ap.parse.excel :as excel]
@@ -12,7 +12,6 @@
[auto-ap.ssr.ui :refer [base-page]]
[auto-ap.ssr.utils :refer [html-response]]
[auto-ap.time :as atime]
[auto-ap.storage.parquet :as parquet]
[bidi.bidi :as bidi]
[clj-time.coerce :as coerce]
[clojure.java.io :as io]
@@ -55,95 +54,54 @@
event-date (some-> (excel/xls-date->date event-date)
coerce/to-date-time
atime/as-local-time
coerce/to-date)]
(cond (and event-date client-id location)
coerce/to-date )]
(cond (and event-date client-id location )
[:order #:sales-order
{:date event-date
:external-id (str "ezcater/order/" client-id "-" location "-" order-number)
:client client-id
:location location
:reference-link (str order-number)
:line-items [#:order-line-item
{:external-id (str "ezcater/order/" client-id "-" location "-" order-number "-" 0)
:item-name "EZCater Catering"
:category "EZCater Catering"
:discount (fmt-amount (or adjustments 0.0))
:tax (fmt-amount tax)
:total (fmt-amount (+ food-total
tax))}]
{:date event-date
:external-id (str "ezcater/order/" client-id "-" location "-" order-number)
:client client-id
:location location
:reference-link (str order-number)
:line-items [#:order-line-item
{:external-id (str "ezcater/order/" client-id "-" location "-" order-number "-" 0)
:item-name "EZCater Catering"
:category "EZCater Catering"
:discount (fmt-amount (or adjustments 0.0))
:tax (fmt-amount tax)
:total (fmt-amount (+ food-total
tax))}]
:charges [#:charge
{:type-name "CARD"
:date event-date
:client client-id
:location location
:external-id (str "ezcater/charge/" client-id "-" location "-" order-number "-" 0)
:processor :ccp-processor/ezcater
:total (fmt-amount (+ food-total
tax
tip))
:tip (fmt-amount tip)}]
:total (fmt-amount (+ food-total
tax
(or adjustments 0.0)))
:discount (fmt-amount (or adjustments 0.0))
:service-charge (fmt-amount (+ fee commission))
:tax (fmt-amount tax)
:tip (fmt-amount tip)
:returns 0.0
:vendor :vendor/ccp-ezcater}]
:charges [#:charge
{:type-name "CARD"
:date event-date
:client client-id
:location location
:external-id (str "ezcater/charge/" client-id "-" location "-" order-number "-" 0)
:processor :ccp-processor/ezcater
:total (fmt-amount (+ food-total
tax
tip))
:tip (fmt-amount tip)}]
:total (fmt-amount (+ food-total
tax
(or adjustments 0.0)))
:discount (fmt-amount (or adjustments 0.0))
:service-charge (fmt-amount (+ fee commission))
:tax (fmt-amount tax)
:tip (fmt-amount tip)
:returns 0.0
:vendor :vendor/ccp-ezcater}]
caterer-name
(do
(alog/warn ::missing-client
:order order-number
:store-name store-name
:caterer-name caterer-name)
[:missing caterer-name])
caterer-name
(do
(alog/warn ::missing-client
:order order-number
:store-name store-name
:caterer-name caterer-name)
[:missing caterer-name])
:else
nil)))
(defn- flatten-order-to-parquet! [order]
"Flatten a sales-order into entity-type tagged maps and buffer to parquet."
(let [so-ext-id (:sales-order/external-id order)
so-date (some-> (:sales-order/date order) .toString)
client (:sales-order/client order)
client-code (if (map? client) (:client/code client) client)]
(parquet/buffer! "sales-order"
{:entity-type "sales-order"
:external-id so-ext-id
:client-code client-code
:location (:sales-order/location order)
:vendor (:sales-order/vendor order)
:total (:sales-order/total order)
:tax (:sales-order/tax order)
:tip (:sales-order/tip order)
:discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order)
:date so-date})
(when-let [charges (:sales-order/charges order)]
(doseq [chg charges]
(parquet/buffer! "charge"
{:entity-type "charge"
:external-id (:charge/external-id chg)
:type-name (:charge/type-name chg)
:total (:charge/total chg)
:tax (:charge/tax chg)
:tip (:charge/tip chg)
:date so-date
:processor (some-> (:charge/processor chg) name)
:sales-order-external-id so-ext-id})))
(when-let [items (:sales-order/line-items order)]
(doseq [li items]
(parquet/buffer! "line-item"
{:entity-type "line-item"
:item-name (:order-line-item/item-name li)
:category (:order-line-item/category li)
:total (:order-line-item/total li)
:tax (:order-line-item/tax li)
:discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id})))))
:else
nil)))
(defn stream->sales-orders [s]
(let [clients (map first (dc/q '[:find (pull ?c [:client/code
@@ -158,7 +116,7 @@
object (str "/ezcater-xls/" (str (java.util.UUID/randomUUID)))]
(mu/log ::writing-temp-xls
:location object)
(s3/put-object {:bucket-name (:data-bucket env)
(s3/put-object {:bucket-name (:data-bucket env)
:key object
:input-stream s})
(into []
@@ -200,13 +158,13 @@
});")]])])
(defn upload-xls [{:keys [identity] :as request}]
(let [file (or (get (:params request) :file)
(get (:params request) "file"))]
(mu/log ::uploading-file
:file file)
(with-open [s (io/input-stream (:tempfile file))]
(try
(try
(let [parse-results (stream->sales-orders s)
new-orders (->> parse-results
(filter (comp #{:order} first))
@@ -214,20 +172,9 @@
missing-location (->> parse-results
(filter (comp #{:missing} first))
(map last))
buffered-count (loop [orders new-orders
count 0]
(if-let [o (first orders)]
(do
(try
(flatten-order-to-parquet! o)
(catch Exception e
(alog/error ::buffer-failed
:exception e
:order (:sales-order/external-id o))))
(recur (rest orders) (inc count)))
count))]
(html-response [:div (format "Successfully imported %d orders." buffered-count)
(map last))]
(audit-transact new-orders identity)
(html-response [:div (format "Successfully imported %d orders." (count new-orders))
(when (seq missing-location)
[:div "Missing the following locations"
[:ul.ul
@@ -235,7 +182,7 @@
[:li ml])]])]))
(catch Exception e
(alog/error ::import-error
:error e)
:error e)
(html-response [:div (.getMessage e)]))))))
(defn page [{:keys [matched-route request-method] :as request}]

View File

@@ -3,7 +3,6 @@
[auto-ap.datomic :refer [conn remove-nils]]
[auto-ap.logging :as log :refer [capture-context->lc with-context-as]]
[auto-ap.time :as atime]
[auto-ap.storage.parquet :as parquet]
[cemerick.url :as url]
[clj-http.client :as client]
[clj-time.coerce :as coerce]
@@ -28,9 +27,11 @@
"Authorization" (str "Bearer " (:client/square-auth-token client))
"Content-Type" "application/json"}))
(defn ->square-date [d]
(f/unparse (f/formatter "YYYY-MM-dd'T'HH:mm:ssZZ") d))
(def manifold-api-stream
(let [stream (s/stream 100)]
(->> stream
@@ -41,10 +42,10 @@
(de/loop [attempt 0]
(-> (de/chain (de/future-with (ex/execute-pool)
#_(log/info ::request-started
:url (:url request)
:attempt attempt
:source "Square 3"
:background-job "Square 3")
:url (:url request)
:attempt attempt
:source "Square 3"
:background-job "Square 3")
(try
(client/request (assoc request
:socket-timeout 10000
@@ -103,6 +104,7 @@
:exception error))
[]))))
(def item-cache (atom {}))
(defn fetch-catalog [client i v]
@@ -122,11 +124,13 @@
#(do (swap! item-cache assoc i %)
%))))
(defn fetch-catalog-cache [client i version]
(if (get @item-cache i)
(de/success-deferred (get @item-cache i))
(fetch-catalog client i version)))
(defn item->category-name-impl [client item version]
(capture-context->lc
(cond (:item_id (:item_variation_data item))
@@ -157,6 +161,7 @@
:item item)
"Uncategorized"))))
(defn item-id->category-name [client i version]
(capture-context->lc
(-> [client i]
@@ -221,6 +226,7 @@
(concat (:orders result) continued-results))))
(:orders result)))))))
(defn search
([client location start end]
(capture-context->lc
@@ -244,9 +250,11 @@
(concat (:orders result) continued-results))))
(:orders result))))))))
(defn amount->money [amt]
(* 0.01 (or (:amount amt) 0.0)))
;; to get totals:
(comment
(reduce
@@ -272,7 +280,7 @@
:reference-link (str (url/url "https://squareup.com/receipt/preview" (:id t)))
:external-id (when (:id t)
(str "square/charge/" (:id t)))
:processor (cond
:processor (cond
(#{"OTHER" "THIRD_PARTY_CARD"} (:type t))
(condp = (some-> (:note t) str/lower-case)
"doordash" :ccp-processor/doordash
@@ -285,9 +293,7 @@
(condp = (:name (:source order))
"GRUBHUB" :ccp-processor/grubhub
"UBEREATS" :ccp-processor/uber-eats
"Uber Eats" :ccp-processor/uber-eats
"DOORDASH" :ccp-processor/doordash
"DoorDash" :ccp-processor/doordash
"Koala" :ccp-processor/koala
"koala-production" :ccp-processor/koala
:ccp-processor/na))
@@ -343,10 +349,7 @@
(s/reduce conj []))]
[(remove-nils
#:sales-order
{:date (if (= "Invoices" (:name (:source order)))
(when (:closed_at order)
(coerce/to-date (time/to-time-zone (coerce/to-date-time (:closed_at order)) (time/time-zone-for-id "America/Los_Angeles"))))
(coerce/to-date (time/to-time-zone (coerce/to-date-time (:created_at order)) (time/time-zone-for-id "America/Los_Angeles"))))
{:date (coerce/to-date (time/to-time-zone (coerce/to-date-time (:created_at order)) (time/time-zone-for-id "America/Los_Angeles")))
:client (:db/id client)
:location (:square-location/client-location location)
:external-id (str "square/order/" (:client/code client) "-" (:square-location/client-location location) "-" (:id order))
@@ -376,9 +379,6 @@
;; sometimes orders stay open in square. At least one payment
;; is needed to import, in order to avoid importing orders in-progress.
(and
(if (= "Invoices" (:name (:source order)))
(boolean (:closed_at order))
true)
(or (> (count (:tenders order)) 0)
(seq (:returns order)))
(or (= #{} (set (map #(:status (:card_details %)) (:tenders order))))
@@ -407,6 +407,7 @@
:client client
:location location)))))))
(defn get-payment [client p]
(de/chain (manifold-api-call
{:url (str "https://connect.squareup.com/v2/payments/" p)
@@ -415,6 +416,7 @@
:body
:payment))
(defn continue-payout-entry-list [c l poi cursor]
(capture-context->lc lc
(de/chain
@@ -592,57 +594,6 @@
(s/buffer 5)
(s/realize-each)
(s/reduce conj []))))))
(defn- flatten-order-to-parquet! [order]
"Flatten a sales-order into entity-type tagged maps and buffer to parquet.
Returns the sales-order external-id for logging."
(let [so-ext-id (:sales-order/external-id order)
so-date (some-> (:sales-order/date order) .toString)
client (:sales-order/client order)
client-code (when client (if (map? client)
(:client/code client)
client))]
(parquet/buffer! "sales-order"
{:entity-type "sales-order"
:external-id so-ext-id
:client-code (or client-code (:db/id client))
:location (:sales-order/location order)
:vendor (:sales-order/vendor order)
:total (:sales-order/total order)
:tax (:sales-order/tax order)
:tip (:sales-order/tip order)
:discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order)
:date so-date})
(when-let [charges (:sales-order/charges order)]
(doseq [chg charges]
(parquet/buffer! "charge"
{:entity-type "charge"
:external-id (:charge/external-id chg)
:type-name (:charge/type-name chg)
:total (:charge/total chg)
:tax (:charge/tax chg)
:tip (:charge/tip chg)
:date so-date
:processor (some-> (:charge/processor chg) name)
:sales-order-external-id so-ext-id})
(when-let [returns (:charge/returns chg)]
(doseq [rt returns]
(parquet/buffer! "sales-refund"
{:entity-type "sales-refund"
:type-name (:type-name rt)
:total (:total rt)
:sales-order-external-id so-ext-id})))))
(when-let [items (:sales-order/line-items order)]
(doseq [li items]
(parquet/buffer! "line-item"
{:entity-type "line-item"
:item-name (:order-line-item/item-name li)
:category (:order-line-item/category li)
:total (:order-line-item/total li)
:tax (:order-line-item/tax li)
:discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id})))))
(defn upsert
([client]
(apply de/zip
@@ -657,13 +608,8 @@
(doseq [x (partition-all 100 results)]
(log/info ::loading-orders
:count (count x))
(doseq [order x]
(try
(flatten-order-to-parquet! order)
(catch Exception e
(log/error ::buffer-failed
:exception e
:order (:sales-order/external-id order))))))))))))
@(dc/transact-async conn x))))))))
(defn upsert-payouts
([client]
@@ -713,6 +659,7 @@
(log/info ::done-loading-refunds)))))))
(defn get-cash-shift [client id]
(de/chain (manifold-api-call {:url (str (url/url "https://connect.squareup.com/v2/cash-drawers/shifts" id))
:method :get
@@ -871,6 +818,8 @@
d1
d2))
(defn remove-voided-orders
([client]
(apply de/zip
@@ -897,7 +846,7 @@
(:sales-order/external-id o))))))
(s/map (fn [[o]]
[[:db/retractEntity [:sales-order/external-id (:sales-order/external-id o)]]]))
(s/reduce into [])))
(fn [results]
@@ -905,28 +854,28 @@
(doseq [x (partition-all 100 results)]
(log/info ::removing-orders
:count (count x))
@(dc/transact-async conn x)
@(dc/transact-async conn x)))))
(de/catch (fn [e]
(log/warn ::couldnt-remove :error e)
nil)))))))))))
nil) ))))))
#_(comment
(require 'auto-ap.time-reader)
#_(comment
(require 'auto-ap.time-reader)
@(let [[c [l]] (get-square-client-and-location "DBFS")]
(log/peek :x [c l])
(search c l #clj-time/date-time "2026-03-28" #clj-time/date-time "2026-03-29"))
@(let [[c [l]] (get-square-client-and-location "NGAK")]
(log/peek :x [c l])
@(let [[c [l]] (get-square-client-and-location "NGAK") ]
(log/peek :x [ c l])
(remove-voided-orders c l #clj-time/date-time "2024-04-11" #clj-time/date-time "2024-04-15"))
(doseq [c (get-square-clients)]
(try
@(remove-voided-orders c)
(catch Exception e
nil)))
)
(remove-voided-orders c l #clj-time/date-time "2024-04-11" #clj-time/date-time "2024-04-15"))
(doseq [c (get-square-clients)]
(try
@(remove-voided-orders c)
(catch Exception e
nil)))
)
(defn upsert-all [& clients]
(capture-context->lc
@@ -995,6 +944,8 @@
[:clients clients]
@(apply upsert-all clients)))
(comment
(defn refunds-raw-cont
([client l cursor so-far]
@@ -1021,18 +972,19 @@
:headers (client-base-headers client)
:as :json})
:body)))
(->>
@(let [[c [l]] (get-square-client-and-location "NGGG")]
(->>
@(let [[c [l]] (get-square-client-and-location "NGGG")]
(search c l (time/now) (time/plus (time/now) (time/days -1))))
(filter (fn [r]
(str/starts-with? (:created_at r) "2024-03-14"))))
(search c l (time/plus (time/now))))
(filter (fn [r]
(str/starts-with? (:created_at r) "2024-03-14"))))
(def refs
(->>
@(let [[c [l]] (get-square-client-and-location "NGGG")]
(refunds-raw-cont c l nil []))
(filter (fn [r]
(str/starts-with? (:created_at r) "2024-03-14")))))
@@ -1043,31 +995,36 @@
(map (fn [r] @(get-payment c (:payment_id r))) refs))
(get-square-client-and-location "NGGB")
(get-square-client-and-location "NGGB")
(def my-results
(let [[c [l]] (get-square-client-and-location "NGFA")]))
(clojure.data.csv/write-csv *out*
(for [c (get-square-clients)
l (:client/square-locations c)
:when (:square-location/client-location l)
bad-row (try (->> @(search c l (coerce/to-date-time #inst "2024-04-01T00:00:00-07:00") (coerce/to-date-time #inst "2024-04-15T23:59:00-07:00"))
(filter #(not (should-import-order? %)))
(map #(first (deref (order->sales-order c l %))))
(filter (fn already-exists [o]
(when (:sales-order/external-id o)
(seq (dc/q '[:find ?i
:in $ ?ei
:where [?i :sales-order/external-id ?ei]]
(dc/db conn)
(:sales-order/external-id o)))))))
(catch Exception e
[]))]
[(:client/code c) (atime/unparse-local (clj-time.coerce/to-date-time (:sales-order/date bad-row)) atime/normal-date) (:sales-order/total bad-row) (:sales-order/tax bad-row) (:sales-order/tip bad-row) (:db/id bad-row)])
:separator \tab)
(for [c (get-square-clients)
l (:client/square-locations c)
:when (:square-location/client-location l)
bad-row (try (->> @(search c l (coerce/to-date-time #inst "2024-04-01T00:00:00-07:00") (coerce/to-date-time #inst "2024-04-15T23:59:00-07:00"))
(filter #(not (should-import-order? %)))
(map #(first (deref (order->sales-order c l %))))
(filter (fn already-exists [o]
(when (:sales-order/external-id o)
(seq (dc/q '[:find ?i
:in $ ?ei
:where [?i :sales-order/external-id ?ei]]
(dc/db conn)
(:sales-order/external-id o)))))))
(catch Exception e
[]))]
[(:client/code c) (atime/unparse-local (clj-time.coerce/to-date-time (:sales-order/date bad-row)) atime/normal-date) (:sales-order/total bad-row) (:sales-order/tax bad-row) (:sales-order/tip bad-row) (:db/id bad-row)])
:separator \tab)
;; =>
;; =>
(require 'auto-ap.time-reader)
@@ -1076,40 +1033,35 @@
(clojure.pprint/pprint (let [[c [l]] (get-square-client-and-location "NGVT")]
l
(def z @(search c l #clj-time/date-time "2025-02-23T00:00:00-08:00"
#clj-time/date-time "2025-02-28T00:00:00-08:00"))
(take 10 (map #(first (deref (order->sales-order c l %))) z))))
(->> z
(def z @(search c l #clj-time/date-time "2025-02-23T00:00:00-08:00"
#clj-time/date-time "2025-02-28T00:00:00-08:00"))
(take 10 (map #(first (deref (order->sales-order c l %))) z)))
)
(->> z
(filter (fn [o]
(seq (filter (comp #{"OTHER"} :type) (:tenders o)))))
(filter #(not (:name (:source %))))
(count))
(doseq [[code] (seq (dc/q '[:find ?code
:in $
:where [?o :sales-order/date ?d]
[(>= ?d #inst "2026-01-01")]
[?o :sales-order/source "Invoices"]
[?o :sales-order/client ?c]
[?c :client/code ?code]]
(dc/db conn)))
:let [[c [l]] (get-square-client-and-location code)]
order @(search c l #clj-time/date-time "2026-01-01T00:00:00-08:00" (time/now))
:when (= "Invoices" (:name (:source order)))
:let [[sales-order] @(order->sales-order c l order)]]
(when (should-import-order? order)
(println "DATE IS" (:sales-order/date sales-order))
(when (some-> (:sales-order/date sales-order) coerce/to-date-time (time/after? #clj-time/date-time "2026-2-16T00:00:00-08:00"))
(println "WOULD UPDATE" sales-order)
@(dc/transact auto-ap.datomic/conn [sales-order]))
#_@(dc/transact)
(println "DONE")))
(count)
)
(doseq [c (get-square-clients)]
(println "Upserting" (:client/name c))
@(upsert c))
#_(filter (comp #{"OTHER"} :type) (mapcat :tenders z))
@(let [[c [l]] (get-square-client-and-location "NGRY")]
#_(search c l (clj-time.coerce/from-date #inst "2025-02-28") (clj-time.coerce/from-date #inst "2025-03-01"))
(let [[c [l]] (get-square-client-and-location "LFHH")]
(search c l (clj-time.coerce/from-date #inst "2025-02-28") (clj-time.coerce/from-date #inst "2025-03-01"))
(order->sales-order c l (:order (get-order c l "KdvwntmfMNTKBu8NOocbxatOs18YY")))))
(:order (get-order c l "CLjQqkzVfGa82o5hEFUrGtUGO6QZY" ))
)
)

View File

@@ -1,13 +1,11 @@
(ns auto-ap.ssr.components.date-range
(:require [auto-ap.ssr.components :as com]
[auto-ap.ssr.components.buttons :as but]
[auto-ap.ssr.svg :as svg]
[auto-ap.time :as atime]
[clj-time.coerce :as c]
[clj-time.core :as t]
[clj-time.periodic :as per]))
(defn date-range-field [{:keys [value id apply-button?]}]
(defn date-range-field [{:keys [value id]}]
[:div {:id id}
(com/field {:label "Date Range"}
[:div.space-y-4
@@ -23,17 +21,11 @@
(atime/unparse-local atime/normal-date))
:placeholder "Date"
:size :small
:class "shrink date-filter-input"})
:class "shrink"})
(com/date-input {:name "end-date"
:value (some-> (:end value)
(atime/unparse-local atime/normal-date))
:placeholder "Date"
:size :small
:class "shrink date-filter-input"})
(when apply-button?
(but/button- {:color :secondary
:size :small
:type "button"
"x-on:click" "$dispatch('datesApplied')"}
"Apply"))]])])
:class "shrink"})]])])

View File

@@ -37,7 +37,7 @@
[auto-ap.ssr.invoice.common :refer [default-read]]
[auto-ap.ssr.invoice.import :as invoice-import]
[auto-ap.ssr.invoice.new-invoice-wizard :as new-invoice-wizard :refer [location-select*]]
[auto-ap.ssr.components.date-range :as dr]
[auto-ap.ssr.pos.common :refer [date-range-field*]]
[auto-ap.ssr.svg :as svg]
[auto-ap.ssr.utils
:refer [apply-middleware-to-all-handlers assert-schema
@@ -77,7 +77,7 @@
[:div {:id "exact-match-id-tag"}]))
(defn filters [request]
[:form#invoice-filters {"hx-trigger" "datesApplied, change delay:500ms from:.filter-trigger, keyup changed from:.hot-filter delay:1000ms"
[:form#invoice-filters {"hx-trigger" "change delay:500ms, keyup changed from:.hot-filter delay:1000ms"
"hx-get" (bidi/path-for ssr-routes/only-routes
::route/table)
"hx-target" "#entity-table"
@@ -92,8 +92,7 @@
:url (bidi/path-for ssr-routes/only-routes :vendor-search)
:value (:vendor (:query-params request))
:value-fn :db/id
:content-fn :vendor/name
:class "filter-trigger"}))
:content-fn :vendor/name}))
(com/field {:label "Account"}
(com/typeahead {:name "account"
:id "account"
@@ -101,12 +100,8 @@
:value (:account (:query-params request))
:value-fn :db/id
:content-fn #(:account/name (d-accounts/clientize (dc/pull (dc/db conn) d-accounts/default-read (:db/id %))
(:db/id (:client request))))
:class "filter-trigger"}))
(dr/date-range-field {:value {:start (:start-date (:query-params request))
:end (:end-date (:query-params request))}
:id "date-range"
:apply-button? true})
(:db/id (:client request))))}))
(date-range-field* request)
(com/field {:label "Check #"}
(com/text-input {:name "check-number"
:id "check-number"
@@ -491,10 +486,7 @@
:fetch-page fetch-page
:oob-render
(fn [request]
[(assoc-in (dr/date-range-field {:value {:start (:start-date (:query-params request))
:end (:end-date (:query-params request))}
:id "date-range"
:apply-button? true}) [1 :hx-swap-oob] true)
[(assoc-in (date-range-field* request) [1 :hx-swap-oob] true)
(assoc-in (exact-match-id* request) [1 :hx-swap-oob] true)])
:query-schema query-schema
:parse-query-params (fn [p]

View File

@@ -81,7 +81,7 @@
data (into []
(for [client-id client-ids
d date
[client-id account-id location debits credits balance count] (iol-ion.query/detailed-account-snapshot (dc/db conn) client-id (coerce/to-date d))
[client-id account-id location debits credits balance count] (iol-ion.query/detailed-account-snapshot (dc/db conn) client-id (coerce/to-date (time/plus d (time/days 1))))
:let [account ((or (lookup-account client-id) {}) account-id)]]
{:client-id client-id
:account-id account-id

View File

@@ -104,18 +104,19 @@
:size :small})])
(com/field {:label "Payment Type"}
(com/radio-card {:size :small
:name "payment-type"
:value (:payment-type (:query-params request))
:options [{:value ""
:content "All"}
{:value "cash"
:content "Cash"}
{:value "check"
:content "Check"}
{:value "debit"
:content "Debit"}]}))
:name "payment-type"
:value (:payment-type (:query-params request))
:options [{:value ""
:content "All"}
{:value "cash"
:content "Cash"}
{:value "check"
:content "Check"}
{:value "debit"
:content "Debit"}]}))
(exact-match-id* request)]])
(def default-read '[*
[:payment/date :xform clj-time.coerce/from-date]
{:invoice-payment/_payment [* {:invoice-payment/invoice [*]}]}
@@ -211,6 +212,7 @@
'[(iol-ion.query/dollars= ?transaction-amount ?amount)]]}
:args [(:amount query-params)]})
(:status route-params)
(merge-query {:query {:in ['?status]
:where ['[?e :payment/status ?status]]}
@@ -241,30 +243,30 @@
refunds))
(defn sum-visible-pending [ids]
(->>
(dc/q {:find ['?id '?o]
:in ['$ '[?id ...]]
:where ['[?id :payment/amount ?o]
'[?id :payment/status :payment-status/pending]]}
(dc/db conn)
ids)
(->>
(dc/q {:find ['?id '?o]
:in ['$ '[?id ...]]
:where ['[?id :payment/amount ?o]
'[?id :payment/status :payment-status/pending]]}
(dc/db conn)
ids)
(map last)
(reduce
+
0.0)))
(defn sum-client-pending [clients]
(->>
(dc/q {:find '[?e ?a]
:in '[$ [?clients ?start ?end]]
:where '[[(iol-ion.query/scan-payments $ ?clients ?start ?end) [[?e _ ?sort-default] ...]]
[?e :payment/status :payment-status/pending]
[?e :payment/amount ?a]]}
(dc/db conn)
[clients
nil
nil])
(->>
(dc/q {:find '[?e ?a]
:in '[$ [?clients ?start ?end]]
:where '[[(iol-ion.query/scan-payments $ ?clients ?start ?end) [[?e _ ?sort-default] ...]]
[?e :payment/status :payment-status/pending]
[?e :payment/amount ?a]]}
(dc/db conn)
[clients
nil
nil])
(map last)
(reduce
+
@@ -275,14 +277,16 @@
{ids-to-retrieve :ids matching-count :count
all-ids :all-ids} (fetch-ids db request)]
[(->> (hydrate-results ids-to-retrieve db request))
matching-count
(sum-visible-pending all-ids)
(sum-client-pending (extract-client-ids (:clients request)
(:client request)
(:client-id (:query-params request))
(when (:client-code (:query-params request))
[:client/code (:client-code (:query-params request))])))]))
(:client request)
(:client-id (:query-params request))
(when (:client-code (:query-params request))
[:client/code (:client-code (:query-params request))])))
]))
(def query-schema (mc/schema
[:maybe [:map {:date-range [:date-range :start-date :end-date]}
@@ -323,7 +327,7 @@
(assoc-in (exact-match-id* request) [1 :hx-swap-oob] true)])
:query-schema query-schema
:action-buttons (fn [request]
(let [[_ _ visible-in-float total-in-float] (:page-results request)]
(let [[_ _ visible-in-float total-in-float ] (:page-results request)]
[(com/pill {:color :primary} " Visible in float "
(format "$%,.2f" visible-in-float))
(com/pill {:color :secondary} " Total in float "
@@ -350,7 +354,7 @@
(= (-> request :query-params :sort first :name) "Bank account")
(-> entity :payment/bank-account :bank-account/name)
:else nil))
:title (fn [r]
(str
@@ -405,7 +409,7 @@
:render (fn [{:payment/keys [date]}]
(some-> date (atime/unparse-local atime/normal-date)))}
{:key "amount"
:sort-key "amount"
:sort-key "amount"
:name "Amount"
:render (fn [{:payment/keys [amount]}]
(some->> amount (format "$%.2f")))}
@@ -417,10 +421,10 @@
(map :invoice-payment/invoice)
(filter identity)
(map (fn [invoice]
{:link (hu/url (bidi/path-for ssr-routes/only-routes
::invoice-route/all-page)
{:exact-match-id (:db/id invoice)})
:content (str "Inv. " (:invoice/invoice-number invoice))})))
{:link (hu/url (bidi/path-for ssr-routes/only-routes
::invoice-route/all-page)
{:exact-match-id (:db/id invoice)})
:content (str "Inv. " (:invoice/invoice-number invoice))})))
(some-> p :transaction/_payment ((fn [t]
[{:link (hu/url (bidi/path-for client-routes/routes
:transactions)
@@ -430,6 +434,8 @@
(def row* (partial helper/row* grid-page))
(comment
(mc/decode query-schema {"exact-match-id" "123"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
(mc/decode query-schema {} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
@@ -439,6 +445,7 @@
(mc/decode query-schema {"payment-type" "food"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
(mc/decode query-schema {"vendor" "87"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
(mc/decode query-schema {"start-date" #inst "2023-12-21T08:00:00.000-00:00"} (mt/transformer main-transformer mt/strip-extra-keys-transformer)))
(defn delete [{check :entity :as request identity :identity}]
@@ -452,7 +459,7 @@
#(assert-can-see-client identity (:db/id (:payment/client check))))
(notify-if-locked (:db/id (:payment/client check))
(:payment/date check))
(let [removing-payments (mapcat (fn [x]
(let [ removing-payments (mapcat (fn [x]
(let [invoice (:invoice-payment/invoice x)
new-balance (+ (:invoice/outstanding-balance invoice)
(:invoice-payment/amount x))]
@@ -468,9 +475,9 @@
:payment/status :payment-status/voided}]
(audit-transact (cond-> removing-payments
true (conj updated-payment)
(:transaction/_payment check) (conj [:db/retract (:db/id (first (:transaction/_payment check)))
(:transaction/_payment check) (conj [:db/retract (:db/id (first (:transaction/_payment check)))
:transaction/payment
(:db/id check)]))
(:db/id check)]))
identity)
(html-response (row* (:identity request) updated-payment {:delete-after-settle? true :class "live-removed"
@@ -571,6 +578,7 @@
(assoc-in [:query-params :start] 0)
(assoc-in [:query-params :per-page] 250))))
:else
selected)
updated-count (void-payments-internal ids (:identity request))]
@@ -583,7 +591,7 @@
(defn wrap-status-from-source [handler]
(fn [{:keys [matched-current-page-route] :as request}]
(let [request (cond-> request
(let [ request (cond-> request
(= ::route/cleared-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/cleared)
(= ::route/pending-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/pending)
(= ::route/voided-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/voided)
@@ -597,7 +605,7 @@
::route/pending-page (-> (helper/page-route grid-page)
(wrap-implied-route-param :status :payment-status/pending))
::route/voided-page (-> (helper/page-route grid-page)
(wrap-implied-route-param :status :payment-status/voided))
(wrap-implied-route-param :status :payment-status/voided))
::route/all-page (-> (helper/page-route grid-page)
(wrap-implied-route-param :status nil))
@@ -610,6 +618,7 @@
::route/bulk-delete (-> bulk-delete-dialog
(wrap-admin))
::route/table (helper/table-route grid-page)}
(fn [h]
(-> h

View File

@@ -1,7 +1,7 @@
(ns auto-ap.ssr.pos.sales-orders
(:require
[auto-ap.datomic
:refer [add-sorter-fields apply-pagination apply-sort-3 merge-query
:refer [add-sorter-fields apply-pagination apply-sort-3 conn merge-query
pull-many query2]]
[auto-ap.datomic.sales-orders :as d-sales]
[auto-ap.query-params :as query-params :refer [wrap-copy-qp-pqp]]
@@ -17,6 +17,7 @@
[auto-ap.time :as atime]
[bidi.bidi :as bidi]
[clj-time.coerce :as c]
[datomic.api :as dc]
[malli.core :as mc]))
(def query-schema (mc/schema
@@ -171,8 +172,11 @@
charges))
(defn fetch-page [request]
(let [{:keys [rows count]} (d-sales/fetch-page-ssr request)]
[rows count]))
(let [db (dc/db conn)
{ids-to-retrieve :ids matching-count :count} (fetch-ids db request)]
[(->> (hydrate-results ids-to-retrieve db request))
matching-count]))
(def grid-page
@@ -196,13 +200,13 @@
:title "Sales orders"
:entity-name "Sales orders"
:route :pos-sales-table
:action-buttons (fn [request]
(let [{:keys [total tax]} (d-sales/summarize-page-ssr request)]
(when (and total tax)
[(com/pill {:color :primary}
(format "Total $%.2f" total))
(com/pill {:color :secondary}
(format "Tax $%.2f" tax))])))
:action-buttons (fn [request]
(let [{:keys [total tax]} (d-sales/summarize-orders (:ids (fetch-ids (dc/db conn) request)))]
(when (and total tax)
[(com/pill {:color :primary}
(format "Total $%.2f" total))
(com/pill {:color :secondary}
(format "Tax $%.2f" tax))])))
:row-buttons (fn [_ e]
(when (:sales-order/reference-link e)
[(com/a-icon-button {:href (:sales-order/reference-link e)}

View File

@@ -1,432 +0,0 @@
(ns auto-ap.storage.parquet
(:require [config.core :refer [env]]
[amazonica.aws.s3 :as s3]
[clojure.java.io :as io]
[clojure.string :as str]
[clojure.data.json :as json]
[clojure.core.cache :as cache]
[com.brunobonacci.mulog :as mu])
(:import (java.sql DriverManager)
(java.time LocalDate)))
(def ^:dynamic *bucket* (:data-bucket env))
(def parquet-prefix "sales-details")
(defn s3-location [filename]
(str "s3://" *bucket* "/" filename))
(defn parquet-key [entity-type date-str]
(let [month-str (if (= (count date-str) 10)
(subs date-str 0 7)
date-str)]
(str parquet-prefix "/" entity-type "/" month-str ".parquet")))
(def db (atom nil))
(defn connect! []
(let [conn (DriverManager/getConnection "jdbc:duckdb:")
stmt (.createStatement conn)]
(.execute stmt "INSTALL httpfs; LOAD httpfs;")
(when-let [key (:aws-access-key-id env)]
(.execute stmt (str "SET s3_access_key_id='" key "'"))
(.execute stmt (str "SET s3_secret_access_key='" (:aws-secret-access-key env) "'"))
(.execute stmt (str "SET s3_region='" (or (:aws-region env) "us-east-1") "'")))
(.execute stmt "PRAGMA enable_object_cache")
(.execute stmt "SET temp_directory='/tmp/duckdb-temp'")
(.execute stmt "SET memory_limit='2GB'")
(.close stmt)
(.addShutdownHook (Runtime/getRuntime)
(Thread. #(when-let [c @db] (.close ^java.sql.Connection c))))
(reset! db conn)))
(defn disconnect! []
(locking db
(when-let [c @db]
(.close c)
(reset! db nil))))
(defmacro with-duckdb
[& body]
`(let [conn# (or @db (connect!))]
(try
(let [~'conn conn#]
~@body)
(finally
(when (and (not @db) conn#)
(.close conn#))))))
(defn execute! [sql]
(with-duckdb
(let [stmt (.createStatement conn)]
(.execute stmt sql)
nil)))
(defn- sql-snippet [sql] (subs sql 0 (min (count sql) 500)))
(defn query-scalar [sql]
(mu/trace ::query-scalar
[:sql (sql-snippet sql)]
(with-duckdb
(let [stmt (.createStatement conn)
rs (.executeQuery stmt sql)]
(when (.next rs)
(.getObject rs 1))))))
(def ^:private count-cache
(atom (-> (cache/ttl-cache-factory {} :ttl 1800000)
(cache/lru-cache-factory :threshold 256))))
(defn- cached-count [sql]
(if-let [v (find @count-cache sql)]
(do (mu/log ::count-cache :hit true :sql (sql-snippet sql)) (val v))
(do (mu/log ::count-cache :hit false :sql (sql-snippet sql))
(let [result (query-scalar sql)]
(swap! count-cache assoc sql result)
result))))
(defn query-rows [sql]
(mu/trace ::query-rows
[:sql (sql-snippet sql)]
(with-duckdb
(let [stmt (.createStatement conn)
rs (.executeQuery stmt sql)
meta (.getMetaData rs)
col-count (.getColumnCount meta)
cols (vec (for [i (range 1 (inc col-count))]
(keyword (.getColumnLabel meta i))))]
(loop [rows []]
(if (.next rs)
(recur (conj rows
(zipmap cols
(vec (for [i (range 1 (inc col-count))]
(.getObject rs i))))))
rows))))))
(defn execute-to-parquet! [sql ^String parquet-path]
(with-duckdb
(let [stmt (.createStatement conn)]
(.execute stmt
(format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')"
sql parquet-path))
(io/file parquet-path))))
(defn upload-parquet! [local-parquet-file s3-key]
(s3/put-object {:bucket-name *bucket*
:key s3-key
:file local-parquet-file})
(s3-location s3-key))
(defonce *buffers* (atom {}))
(defn- wal-dir []
(io/file (System/getProperty "user.dir" "/tmp")
"parquet-wal"))
(defn- init-wal! []
(let [dir (wal-dir)]
(when-not (.exists dir)
(.mkdirs dir))))
(defn buffer! [entity-type record]
(init-wal!)
(let [seq-no (System/currentTimeMillis)
entry (assoc record :_seq-no seq-no)]
(swap! *buffers* update entity-type (fnil conj []) entry)
(try
(let [wal-file (io/file (wal-dir)
(str entity-type ".jsonl"))]
(io/make-parents wal-file)
(with-open [w (io/writer wal-file :append true)]
(.write w ^String (json/write-str {:seq-no seq-no
:record record}))
(.write w (int \newline))))
(catch Exception e
(println "[parquet/wal]" (.getMessage e))))
entry))
(defn clear-buffer! [entity-type]
(swap! *buffers* dissoc entity-type))
(defn buffer-count [entity-type]
(-> @*buffers* (get entity-type []) count))
(defn total-buf-count []
(->> @*buffers*
vals (mapcat identity) count))
(defn flush-to-parquet! [entity-type date-str]
"Flush buffered records for entity-type to monthly parquet + S3.
Reads existing monthly file (if any), merges with new records, and uploads.
Uses temp table to ensure ROW_GROUP_SIZE is respected (DuckDB ignores it
when reading directly from S3 via COPY)."
(let [records (get @*buffers* entity-type [])]
(if (empty? records)
{:status :no-records}
(let [date-str (or date-str (.toString (LocalDate/now)))
s3-key (parquet-key entity-type date-str)
s3-url (s3-location s3-key)
jsonl-file (io/file "/tmp"
(str entity-type "-" date-str ".jsonl"))
parquet-file (io/file "/tmp"
(str entity-type "-" date-str ".parquet"))
tbl (format "\"_flush_%s_%s\""
(clojure.string/replace entity-type "-" "_")
(subs date-str 0 7))]
(try
(with-open [w (io/writer jsonl-file :append true)]
(doseq [r records]
(.write w ^String (json/write-str (dissoc r :_seq-no)))
(.write w (int \newline))))
(let [existing-sql (format
"SELECT * FROM read_parquet('%s', union_by_name=true)"
s3-url)
new-sql (format
"SELECT * FROM read_json_auto('%s')"
(.getAbsolutePath jsonl-file))]
(execute! (format "CREATE OR REPLACE TABLE %s AS SELECT * FROM (%s UNION ALL %s) ORDER BY \"client-code\", date"
tbl existing-sql new-sql))
(execute! (format "COPY (SELECT * FROM %s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')"
tbl (.getAbsolutePath parquet-file)))
(execute! (format "DROP TABLE IF EXISTS %s" tbl))
(upload-parquet! parquet-file s3-key)
(clear-buffer! entity-type)
(.delete ^java.io.File jsonl-file)
(.delete ^java.io.File parquet-file)
{:key s3-key :status :ok})
(catch Exception e
(execute! (format "DROP TABLE IF EXISTS %s" tbl))
(throw (ex-info "Flush failed"
{:entity-type entity-type
:error (.getMessage e)}))))))))
(defn flush-by-date! []
"Flush all entity types for today."
(let [etypes ["sales-order" "charge"
"line-item" "sales-refund"]
today (.toString (LocalDate/now))
flushed (into #{}
(keep (fn [et]
(let [{:keys [status]}
(flush-to-parquet! et today)]
(when (= status :ok)
et))))
etypes)]
{:flushed flushed}))
(defn load-unflushed! []
"Restore unflushed records from WAL jsonl files into *buffers."
(init-wal!)
(let [etypes ["sales-order" "charge"
"line-item" "sales-refund"]
loaded (reduce-kv
(fn [acc et data]
(if-not (empty? data)
(assoc acc et
(->> (str/split-lines data)
(keep #(try
(let [entry (json/read-str %)]
(when entry
(assoc (:record entry) :_seq-no (:seq-no entry))))
(catch Exception _)))))
acc))
{}
(into {}
(keep (fn [et]
(let [f (io/file
(wal-dir)
(str et ".jsonl"))]
(when (.exists f)
[et (slurp f)]))))
etypes))]
(swap! *buffers* merge loaded)))
(defn get-unflushed-count []
(total-buf-count))
(defn unflushed-records? []
(not= 0 (total-buf-count)))
;;; DuckDB Read Layer
(defn date-seq [start end]
"Seq of YYYY-MM-DD strings between start and end inclusive."
(let [sd (LocalDate/parse start)
ed (LocalDate/parse end)]
(when (.isAfter sd ed)
(throw (ex-info "date-seq: start must be <= end" {:start start :end end})))
(let [days (int (- (.toEpochDay ed)
(.toEpochDay sd)))]
(for [i (range 0 (inc days))]
(.toString (.plusDays sd i))))))
(defn today []
(.toString (LocalDate/now)))
(def ^:private mm-dd-yyyy (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
(defn- normalize-date-str [s]
(when s
(if (re-find #"^\d{2}/\d{2}/\d{4}" s)
(.toString (LocalDate/parse s mm-dd-yyyy))
(if (> (count s) 10) (subs s 0 10) s))))
(defn month-seq [start-date end-date]
"Seq of YYYY-MM strings between start-date and end-date inclusive."
(let [sd (LocalDate/parse (normalize-date-str start-date))
ed (LocalDate/parse (normalize-date-str end-date))]
(loop [months [] cur sd]
(if (.isAfter cur ed)
months
(recur (conj months (.toString (.withDayOfMonth cur 1)))
(.plusMonths cur 1))))))
(defn- parquet-glob [entity-type start-date end-date]
"Build explicit file list for the date range using monthly partitions.
Monthly files mean only 1-3 files for typical queries, 12 for a full year."
(let [prefix (format "s3://%s/sales-details/%s/" *bucket* entity-type)]
(vec
(map (fn [m]
(format "'%s%s.parquet'" prefix m))
(month-seq start-date end-date)))))
(defn parquet-query [entity-type start-date end-date]
"Build SQL to read monthly parquet files in date range.
Uses explicit file list (monthly = few files) + WHERE date filter.
Normalizes date formats (handles MM/dd/yyyy from UI)."
(let [sd (normalize-date-str start-date)
ed (normalize-date-str end-date)
files (parquet-glob entity-type sd ed)
base (format "SELECT * FROM read_parquet([%s], union_by_name=true)"
(str/join ", " files))
sql (format "%s WHERE date >= '%s' AND date <= '%s'"
base sd ed)]
{:sql sql
:count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))
(defn- like-clause [col v]
(str "\"" col "\" LIKE '%" v "%'"))
(defn- build-sales-orders-where [opts]
(let [eq-clauses (keep
(fn [[key col]]
(let [v (get opts key)]
(when v
(str "\"" col "\" = '" v "'"))))
[[:client "client-code"]
[:vendor "vendor"]
[:location "location"]])
in-clauses (keep
(fn [[key col]]
(let [vs (get opts key)]
(when (seq vs)
(str "\"" col "\" IN ("
(str/join ", " (map #(str "'" % "'") vs))
")"))))
[[:client-codes "client-code"]])
like-clauses (keep
(fn [[key col]]
(let [v (get opts key)]
(when v
(like-clause col v))))
[[:payment-method "payment-methods"]
[:processor "processors"]
[:category "categories"]])
range-clauses (keep
(fn [[key col op]]
(let [v (get opts key)]
(when v
(str "\"" col "\" " op " " v))))
[[:total-gte "total" ">="]
[:total-lte "total" "<="]])
all-clauses (concat eq-clauses in-clauses like-clauses range-clauses)]
(when (seq all-clauses)
(str/join " AND " all-clauses))))
(defn get-sales-orders
([start-date end-date]
(get-sales-orders start-date end-date {}))
([start-date end-date opts]
(mu/trace ::get-sales-orders
[:start-date start-date :end-date end-date :opts opts]
(try
(let [q (parquet-query "sales-order" start-date end-date)
base-sql (:sql q)
has-where? (str/includes? base-sql " WHERE ")
sort (get opts :sort "date")
order (get opts :order "DESC")
limit (get opts :limit)
offset (get opts :offset)
extra-clauses (build-sales-orders-where opts)
full-sql (if extra-clauses
(str base-sql (if has-where? " AND " " WHERE ") extra-clauses)
base-sql)
data-sql (cond-> full-sql
sort (str " ORDER BY " sort " " (name order))
limit (str " LIMIT " limit)
offset (str " OFFSET " offset))
count-sql (format "SELECT COUNT(*) FROM (%s) t" full-sql)]
(mu/log ::get-sales-orders :data-sql data-sql :count-sql count-sql)
(let [cnt (cached-count count-sql)
rows (query-rows data-sql)]
{:rows rows
:count (or (int cnt) 0)}))
(catch Exception e
(mu/log ::get-sales-orders :error e :start-date start-date :end-date end-date :opts opts)
(throw e))))))
(def ^:private summary-cache
(atom (-> (cache/ttl-cache-factory {} :ttl 1800000)
(cache/lru-cache-factory :threshold 256))))
(defn- cached-summary [sql]
(if-let [v (find @summary-cache sql)]
(do (mu/log ::summary-cache :hit true :sql (sql-snippet sql)) v)
(do (mu/log ::summary-cache :hit false :sql (sql-snippet sql))
(let [result (let [row (first (query-rows sql))]
{:total (or (:total row) 0.0)
:tax (or (:tax row) 0.0)})]
(swap! summary-cache assoc sql result)
result))))
(defn get-sales-orders-summary
([start-date end-date]
(get-sales-orders-summary start-date end-date {}))
([start-date end-date opts]
(mu/trace ::get-sales-orders-summary
[:start-date start-date :end-date end-date :opts opts]
(try
(let [q (parquet-query "sales-order" start-date end-date)
base-sql (:sql q)
has-where? (str/includes? base-sql " WHERE ")
extra-clauses (build-sales-orders-where opts)
full-sql (if extra-clauses
(str base-sql (if has-where? " AND " " WHERE ") extra-clauses)
base-sql)
sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t" full-sql)]
(mu/log ::get-sales-orders-summary :sum-sql sum-sql)
(cached-summary sum-sql))
(catch Exception e
(mu/log ::get-sales-orders-summary :error e :start-date start-date :end-date end-date :opts opts)
(throw e))))))
(defn query-deduped [entity-type start-date end-date]
"Query records deduplicated by external-id (latest _seq_no wins)."
(let [q (parquet-query entity-type start-date end-date)]
(query-rows
(str (:sql q)
" QUALIFY ROW_NUMBER() OVER"
" (PARTITION BY \"external-id\""
" ORDER BY _seq_no DESC) = 1"))))
(defn query-by-entity-id [entity-type external-id
start-date end-date]
(->> (query-deduped entity-type start-date end-date)
(filter #(= (:external_id %)
(name external-id)))
first))
(defn count-records-in-parquet
[entity-type start-date end-date]
(let [q (parquet-query entity-type
start-date end-date)]
(or (int (query-scalar (:count-sql q))) 0)))

View File

@@ -1,184 +0,0 @@
(ns auto-ap.storage.sales-summaries
"Aggregation functions querying Parquet files on S3 via DuckDB.
Entity types: sales-order | charge | line-item | sales-refund
S3 pattern: s3://<bucket>/sales-details/<entity-type>/<YYYY-MM-DD>.parquet"
(:require [auto-ap.storage.parquet :as p]
[clojure.string :as str]))
(defn- dq [name]
(str "\"" name "\""))
(defn- sum-dbl [val]
(try
(if val (double val) 0.0)
(catch Exception _e
0.0)))
(defn- pq-files [entity-type start-date end-date]
"Vector of S3 parquet file paths for date range (monthly partitions)."
(let [months (p/month-seq start-date end-date)]
(vec
(map #(str "'s3://" p/*bucket*
"/sales-details/" entity-type "/"
% ".parquet") months))))
(defn sum-payments-by-type [client-id start-date end-date]
"Return {processor-key -> {type-name-string -> total-double}}."
(let [files (pq-files "charge" start-date end-date)]
(try
(let [sql (str "SELECT "
(dq "processor")
" AS proc, "
(dq "type-name")
" AS type_name, "
"SUM("
(dq "total")
")::DOUBLE AS total_amount "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "client-code")
" = '" client-id "' "
"GROUP BY "
(dq "processor") ", "
(dq "type-name"))]
(let [rows (p/query-rows sql)]
(reduce (fn [acc row]
(let [proc (:proc row)
tname (str/trim (name (:type_name row)))
total (sum-dbl (:total_amount row))]
(update acc proc
(fn [inner]
(let [b (or inner {})]
(assoc b
tname
(+ (get b tname 0.0) total)))))))
{}
rows)))
(catch Exception e
(println "[sales-summaries]" (.getMessage e))
{}))))
(defn sum-discounts [client-id start-date end-date]
(let [files (pq-files "sales-order" start-date end-date)]
(try
(let [sql (str "SELECT SUM("
(dq "discount")
")::DOUBLE AS discount_total "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "client-code")
" = '" client-id "'")]
(or (some-> (first (p/query-rows sql)) :discount_total sum-dbl) 0.0))
(catch Exception e
(println "[sales-summaries/discounts]" (.getMessage e))
0.0))))
(defn sum-refunds-by-type [client-id start-date end-date]
(let [files (pq-files "sales-refund" start-date end-date)]
(try
(let [sql (str "SELECT "
(dq "type-name")
" AS type_name, "
"SUM("
(dq "total")
")::DOUBLE AS total_amount "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "sales-order-external-id")
" IN (SELECT "
(dq "external-id")
" FROM read_parquet(["
(str/join ", " (pq-files "sales-order" start-date end-date))
"]) WHERE "
(dq "client-code")
" = '" client-id "') "
"GROUP BY " (dq "type-name"))]
(let [rows (p/query-rows sql)]
(reduce (fn [acc row]
(let [tname (str/trim (name (:type_name row)))
total (sum-dbl (:total_amount row))]
(assoc acc tname (+ (get acc tname 0.0) total))))
{}
rows)))
(catch Exception e
(println "[sales-summaries/refunds]" (.getMessage e))
{}))))
(defn sum-taxes [client-id start-date end-date]
(let [files (pq-files "sales-order" start-date end-date)]
(try
(let [sql (str "SELECT SUM("
(dq "tax")
")::DOUBLE AS tax_total "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "client-code")
" = '" client-id "'")]
(or (some-> (first (p/query-rows sql)) :tax_total sum-dbl) 0.0))
(catch Exception e
(println "[sales-summaries/tax]" (.getMessage e))
0.0))))
(defn sum-tips [client-id start-date end-date]
(let [files (pq-files "sales-order" start-date end-date)]
(try
(let [sql (str "SELECT SUM("
(dq "tip")
")::DOUBLE AS tip_total "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "client-code")
" = '" client-id "'")]
(or (some-> (first (p/query-rows sql)) :tip_total sum-dbl) 0.0))
(catch Exception e
(println "[sales-summaries/tip]" (.getMessage e))
0.0))))
(defn sum-sales-by-category [client-id start-date end-date]
(let [files (pq-files "line-item" start-date end-date)]
(try
(let [sql (str "SELECT "
(dq "category")
" AS category, "
"SUM("
(dq "total")
")::DOUBLE AS total_amount, "
"SUM("
(dq "tax")
")::DOUBLE AS tax_amount, "
"SUM("
(dq "discount")
")::DOUBLE AS discount_amount "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "sales-order-external-id")
" IN (SELECT "
(dq "external-id")
" FROM read_parquet(["
(str/join ", " (pq-files "sales-order" start-date end-date))
"]) WHERE "
(dq "client-code")
" = '" client-id "') "
"GROUP BY " (dq "category"))]
(let [rows (p/query-rows sql)]
(mapv (fn [row]
{:category (or (:category row) "Unknown")
:total (sum-dbl (:total_amount row))
:tax (sum-dbl (:tax_amount row))
:discount (sum-dbl (:discount_amount row))})
rows)))
(catch Exception e
(println "[sales-summaries/sales]" (.getMessage e))
[]))))

View File

@@ -51,22 +51,3 @@
(is (= "720.33" (:total (nth results 1))))
(is (= "853.16" (:total (nth results 2))))
(is (= "1066.60" (:total (nth results 3)))))))
(deftest parse-bonanza-produce-invoice-03882095
(testing "Should parse Bonanza Produce invoice 03882095 with customer identifier including address"
(let [pdf-file (io/file "dev-resources/INVOICE - 03882095.pdf")
pdf-text (:out (clojure.java.shell/sh "pdftotext" "-layout" (str pdf-file) "-"))
results (sut/parse pdf-text)
result (first results)]
(is (some? results) "parse should return a result")
(is (some? result) "Template should match and return a result")
(when result
(is (= "Bonanza Produce" (:vendor-code result)))
(is (= "03882095" (:invoice-number result)))
(let [d (:date result)]
(is (= 2026 (time/year d)))
(is (= 1 (time/month d)))
(is (= 23 (time/day d))))
(is (= "NICK THE GREEK" (:customer-identifier result)))
(is (= "600 VISTA WAY" (str/trim (:account-number result))))
(is (= "946.24" (:total result)))))))

View File

@@ -1,30 +0,0 @@
(ns auto-ap.storage.parquet-test
(:require [auto-ap.storage.parquet :as p]
[clojure.test :refer [deftest is testing use-fixtures]]))
(deftest test-query-scalar
(testing "SELECT 1 returns 1"
(is (= 1 (p/query-scalar "SELECT 1")))))
(deftest test-query-scalar-with-expression
(testing "SELECT 2 + 2 returns 4"
(is (= 4 (p/query-scalar "SELECT 2 + 2")))))
(deftest test-buffer
(testing "buffer! adds record to buffer"
(p/clear-buffer! "test-type")
(p/buffer! "test-type" {:id 1 :name "test"})
(is (= 1 (p/buffer-count "test-type")))))
(deftest test-clear-buffer
(testing "clear-buffer! empties buffer"
(p/clear-buffer! "test-type")
(p/buffer! "test-type" {:id 2})
(is (= 1 (p/buffer-count "test-type")))
(p/clear-buffer! "test-type")
(is (= 0 (p/buffer-count "test-type")))))
(deftest test-date-seq
(testing "date-seq generates correct sequence"
(let [result (p/date-seq "2024-04-01" "2024-04-03")]
(is (= ["2024-04-01" "2024-04-02" "2024-04-03"] result)))))

View File

@@ -1,113 +0,0 @@
(ns auto-ap.storage.perf-test
(:require [auto-ap.storage.parquet :as p]
[amazonica.aws.s3 :as s3]
[clojure.java.io :as io]
[clojure.string :as str])
(:import (java.sql DriverManager)
(java.time Instant)))
(defn timestamp []
(System/currentTimeMillis))
(defn timed [label sql-fn]
(let [start (timestamp)
result (sql-fn)
elapsed (- (timestamp) start)]
(println (format "%s: %d ms" label elapsed))
result))
(defn run-perf-tests []
(p/connect!)
(try
(let [bucket "data.dev.app.integreatconsult.com"
prefix "test-duckdb"
local-parquet "/tmp/test_data.parquet"
s3-key (str prefix "/data.parquet")]
;; Create 100k test rows
(println "\n=== Creating 100k test rows ===")
(p/execute! "DROP TABLE IF EXISTS test_data")
(p/execute! (str "
CREATE TABLE test_data AS
SELECT
i AS id,
'order_' || i AS external_id,
CASE (i % 5)
WHEN 0 THEN 'north'
WHEN 1 THEN 'south'
WHEN 2 THEN 'east'
WHEN 3 THEN 'west'
ELSE 'central'
END AS region,
CASE (i % 8)
WHEN 0 THEN 'food'
WHEN 1 THEN 'beverage'
WHEN 2 THEN 'alcohol'
WHEN 3 THEN 'catering'
WHEN 4 THEN 'retail'
WHEN 5 THEN 'dessert'
WHEN 6 THEN 'merch'
ELSE 'other'
END AS category,
ROUND(1 + ABS(RANDOM() % 10000) / 100.0, 2) AS amount,
CAST(DATE '2024-01-01' + (i % 365) * INTERVAL '1 day' AS DATE) AS sale_date,
CASE WHEN i % 20 = 0 THEN 'voided' ELSE 'active' END AS status
FROM generate_series(1, 100000) AS t(i)"))
(println "Row count:" (p/query-scalar "SELECT COUNT(*) FROM test_data"))
(println "Voided count:" (p/query-scalar "SELECT COUNT(*) FROM test_data WHERE status = 'voided'"))
(println "Amount > 3 count:" (p/query-scalar "SELECT COUNT(*) FROM test_data WHERE amount > 3"))
;; Write to local parquet
(println "\n=== Writing local parquet ===")
(timed "Write parquet" #(p/execute-to-parquet! "SELECT * FROM test_data" local-parquet))
(let [f (io/file local-parquet)]
(println "File size:" (format "%.1f MB" (/ (.length f) 1048576.0))))
;; Upload to S3
(println "\n=== Uploading to S3 ===")
(timed "S3 upload" #(p/upload-parquet! (io/file local-parquet) prefix))
(println "S3 URI:" (p/s3-location s3-key))
;; Now test reading from S3
(println "\n=== Performance Tests (reading from S3) ===")
(let [s3-uri (str "s3://" bucket "/" s3-key)]
;; Register S3 parquet as a view/table in DuckDB
(p/execute! (format "CREATE VIEW s3_test AS SELECT * FROM read_parquet('%s')" s3-uri))
(println "Total rows in S3:" (p/query-scalar "SELECT COUNT(*) FROM s3_test"))
;; Test 1: Page 1 - first 25 rows
(println "\n--- Test 1: Page 1 (LIMIT 25 OFFSET 0) ---")
(timed "First page (25 rows)" #(p/query-rows "SELECT * FROM s3_test ORDER BY id LIMIT 25"))
(println "Sample row:" (first (p/query-rows "SELECT * FROM s3_test ORDER BY id LIMIT 1")))
;; Test 2: Page 20 - rows 475-500 (OFFSET 475)
(println "\n--- Test 2: Page 20 (LIMIT 25 OFFSET 475) ---")
(timed "Page 20 (25 rows)" #(p/query-rows "SELECT * FROM s3_test ORDER BY id LIMIT 25 OFFSET 475"))
;; Test 3: Filter amount > 3 (no pagination)
(println "\n--- Test 3: Filter amount > 3 (no limit) ---")
(timed "Filter amount > 3 (all)" #(do (p/query-scalar "SELECT COUNT(*) FROM s3_test WHERE amount > 3") :done))
;; Test 4: Filter + pagination
(println "\n--- Test 4: Filter amount > 3 + LIMIT 25 ---")
(timed "Filter + paginated (25 rows)" #(p/query-rows "SELECT * FROM s3_test WHERE amount > 3 ORDER BY id LIMIT 25"))
;; Test 5: Filter + page 20
(println "\n--- Test 5: Filter amount > 3 + LIMIT 25 OFFSET 475 ---")
(timed "Filter + page 20" #(p/query-rows "SELECT * FROM s3_test WHERE amount > 3 ORDER BY id LIMIT 25 OFFSET 475"))
;; Test 6: Aggregation on S3 data
(println "\n--- Test 6: Aggregation (SUM, AVG on amount) ---")
(timed "Aggregation SUM/AVG" #(p/query-scalar "SELECT SUM(amount), AVG(amount) FROM s3_test WHERE status = 'active'"))
;; Cleanup
(p/execute! "DROP VIEW IF EXISTS s3_test")
(p/execute! "DROP TABLE IF EXISTS test_data"))
(finally
(p/disconnect!))))
(comment
(run-perf-tests)
(println "\n=== Done ==="))