The Endpoint
GET /orders/{order_id}/images?format=jpeg&quality=80&preview=true&dev_mode=true
Returns a ZIP archive containing all enhanced images for the given order. Params: format (jpeg/png/webp), quality (1-90), preview, dev_mode.
curl example

```shell
curl "https://autoenhance.onrender.com/orders/{order_id}/images?dev_mode=true" -o images.zip
```
How it works — Batch vs standard endpoints

A standard endpoint is fast and all-or-nothing, with a small payload and simple errors. A batch endpoint is different: partial success is valid, and it needs concurrency & throttling.
Design decisions
- ✓ Concurrency — Downloads run in parallel with a semaphore (max 5). Each image streams into the ZIP via `asyncio.as_completed` and is freed immediately — no waiting for the full batch.
- ✓ Partial failure — If some images fail (still processing, timed out), the ZIP includes what succeeded plus a `_download_report.txt`.
- ✓ Timeouts — 60s per image. Response headers report total / downloaded / failed counts.
- ✓ Response format — ZIP with `ZIP_STORED` (no compression) since JPEG/PNG/WebP are already compressed. Saves CPU with no size penalty.
- ✓ Redirects — Autoenhance returns 302 → asset server → S3. The client follows these transparently.
- ✓ Connection reuse — A shared `httpx.AsyncClient` is created once at startup via FastAPI lifespan. All requests reuse the same connection pool instead of creating one per request.
- ✓ Memory — Peak memory is bounded by the semaphore (max 5 in-flight), not the order size. The ZIP uses a `SpooledTemporaryFile` that spills to disk above 10 MB. Orders capped at 100 images as a safety limit.
- ✓ Async job pattern — `POST /orders/{id}/jobs` returns a job ID immediately (202). Poll `GET /jobs/{id}` for status; download via `GET /jobs/{id}/download`. Sidesteps server timeouts on large orders without duplicating any download logic.
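The concurrency and memory bullets above can be sketched as a self-contained example. `fetch` here is a hypothetical stand-in for the real `httpx` download helper, and the filenames are illustrative:

```python
import asyncio
import io
import zipfile

MAX_CONCURRENT = 5  # mirrors the semaphore limit described above

async def build_zip(urls: list[str]) -> bytes:
    sem = asyncio.Semaphore(MAX_CONCURRENT)

    async def fetch(i: int, url: str) -> tuple[str, bytes]:
        async with sem:  # at most 5 downloads in flight
            # Hypothetical download; the real code streams via httpx
            await asyncio.sleep(0)
            return f"image_{i}.jpg", url.encode()

    buf = io.BytesIO()
    # ZIP_STORED: images are already compressed, so skip deflate
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_STORED) as zf:
        tasks = [fetch(i, u) for i, u in enumerate(urls)]
        # as_completed lets each image be written (and freed) as soon as it lands
        for coro in asyncio.as_completed(tasks):
            name, content = await coro
            zf.writestr(name, content)
    return buf.getvalue()

zip_bytes = asyncio.run(build_zip(["https://example.com/a", "https://example.com/b"]))
print(len(zip_bytes) > 0)  # → True
```

Peak memory stays proportional to the semaphore limit rather than the order size, which is the property the Memory bullet relies on.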
Assumptions & open questions
- ? Upload pipeline is external — This endpoint is downstream of whatever upload flow the client uses (web app, SDK, direct API). We accept an `order_id` after images are already uploaded and enhanced.
- ? Enhancement timing is unknown — Images may still be processing when the batch endpoint is called. Handled gracefully via partial success + failure report, but we can't trigger or wait for completion.
- ? Order schema is partially documented — We check for both `image_id`/`id` and `image_name`/`name` to handle field name variations in the API response.
- ? No completion webhook — Without a callback, the caller must wait or poll before requesting the batch download.
- ? Rate limits undocumented — We default to 5 concurrent downloads. The actual Autoenhance limit isn't published, so this can be tuned with production data.
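The field-name fallback mentioned above can be written as a small normalizer. The sample payloads are illustrative, not actual Autoenhance responses:

```python
def normalize_image(raw: dict) -> dict:
    """Accept either image_id/id and image_name/name field variants."""
    image_id = raw.get("image_id") or raw.get("id")
    if image_id is None:
        raise ValueError(f"Image record has no recognizable ID field: {raw}")
    # Fall back to a name derived from the ID when no name field is present
    name = raw.get("image_name") or raw.get("name") or f"{image_id}.jpg"
    return {"id": image_id, "name": name}

# Both shapes normalize to the same structure
print(normalize_image({"image_id": "abc", "image_name": "kitchen.jpg"}))
print(normalize_image({"id": "abc", "name": "kitchen.jpg"}))
# → {'id': 'abc', 'name': 'kitchen.jpg'} both times
```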
Possible extensions
- → Poll-until-ready — A `wait=true` param that retries until all images are processed before downloading.
- → Webhook integration — Auto-trigger the batch download when Autoenhance signals an order is complete.
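A `wait=true` implementation might look like the backoff loop below. `get_status` is a hypothetical status helper, and the delays and attempt counts are placeholder values, not tuned numbers:

```python
import asyncio

async def wait_until_ready(order_id, get_status, max_attempts=10, initial_delay=1.0):
    """Poll a status helper until the order is fully processed."""
    delay = initial_delay
    for _ in range(max_attempts):
        if await get_status(order_id) == "processed":
            return True
        await asyncio.sleep(delay)
        delay = min(delay * 2, 30.0)  # exponential backoff, capped at 30s
    return False  # give up; caller falls back to partial-success download

# Demo with a fake status helper that completes on the third poll
async def demo():
    calls = {"n": 0}
    async def fake_status(_order_id):
        calls["n"] += 1
        return "processed" if calls["n"] >= 3 else "processing"
    return await wait_until_ready("order-123", fake_status, initial_delay=0.01)

print(asyncio.run(demo()))  # → True
```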
Production considerations
Observability
- ● Structured logging — Python `logging` at INFO/WARNING/ERROR. Logs order retrieval, per-image status, and final counts.
- ○ Distributed tracing — OpenTelemetry spans for the order fetch and each image download. Shows where time is spent (upstream latency vs. ZIP creation).
- ● Error tracking — Sentry SDK captures unhandled exceptions with request context. Activated via `SENTRY_DSN` env var; no-op when unset.
Metrics
- ○Request latency — P50/P95/P99 for the batch endpoint, plus per-image latency (scales with order size).
- ○Success/failure rates — Full success, partial success, and total failure. Alert on partial-failure spikes (may signal processing delays).
- ○Upstream health — Track Autoenhance API response times and errors separately. Detect degradation before users report it.
- ○ZIP size distribution — Monitor payload sizes to catch memory pressure early.
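The latency percentiles above can be computed from raw samples with the standard library alone; a metrics backend like Prometheus would normally do this, and the sample values below are synthetic:

```python
import statistics

def latency_percentiles(samples_ms: list[float]) -> dict[str, float]:
    """P50/P95/P99 from raw latency samples using inclusive quantiles."""
    qs = statistics.quantiles(samples_ms, n=100, method="inclusive")
    # qs[k] is the (k+1)/100 quantile cut point
    return {"p50": qs[49], "p95": qs[94], "p99": qs[98]}

# Synthetic sample: latencies of 1..100 ms
print(latency_percentiles([float(i) for i in range(1, 101)]))
# → {'p50': 50.5, 'p95': 95.05, 'p99': 99.01}
```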
Versioning
- ● Autoenhance API — Pinned to `/v3`. A v4 release won't break us until we explicitly migrate.
- ○ Our own API — Currently unversioned. Add a `/v1/` prefix so the response format can evolve without breaking callers.
- ○ Dependency pinning — Exact versions in `requirements.txt`. Add `pip-tools` for reproducible builds with transitive deps.
Security
- ● API key isolation — Stored in env var, never committed. `.gitignore` excludes `.env`.
- ● Input validation — Order ID validated as UUID before any upstream call. Malformed input returns 400 immediately.
- ○ Rate limiting — Each batch call fans out to N upstream requests. Add per-IP throttling to prevent quota exhaustion.
- ● Authentication — Optional `SERVICE_API_KEY` env var. When set, batch and job endpoints require an `X-API-Key` header (timing-safe comparison). Leave unset for open access.
Testing
- ●Manual E2E — Verified with a real 3-image order against the live API, locally and on Render.
- ● Unit tests — 20 tests via `httpx` mock transport. Covers validation, success/partial/total failure, retry logic, network errors, edge cases, health, and UI.
- ○ Integration tests — Real API calls via `x-dev-mode` (no credits). Assert ZIP contents and structure.
- ○ Load testing — Stress-test with large orders (50+ images) and concurrent callers to find memory and timeout limits.
Operational
- ● Health check — `/health` reports status and API key config. Monitored by UptimeRobot.
- ○ Graceful degradation — Return 503 with Retry-After when Autoenhance is down, instead of timing out. Circuit breaker pattern.
- ● Caching — In-process ZIP cache keyed by `(order_id, format, quality, preview, dev_mode)` with a 1-hour TTL. Repeat requests for the same order are served instantly without hitting the upstream API.
- ○ CI/CD — Auto-deploys from `main` via Render. Add a test gate and staging environment.
Production-Hardened Version
Production-grade additions to the batch endpoint: input validation, unit tests, error tracking, and patterns for circuit breaking, caching, and rate limiting. The Interview tab is the primary submission.
UUID Input Validation IMPLEMENTED
app/routes/batch.py
```python
# Validate order_id is a UUID before making upstream calls
import re

uuid_pattern = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
    re.IGNORECASE,
)
if not uuid_pattern.match(order_id):
    raise HTTPException(
        status_code=400,
        detail=f"Invalid order ID format. Expected UUID, got: '{order_id}'",
    )
```
Unit Test Suite IMPLEMENTED
test_app.py — 20 tests, all passing
```python
# Mock strategy: patch the shared client with MockTransport
def make_mock_client(order_response, image_responses):
    transport = httpx.MockTransport(handler)
    return httpx.AsyncClient(transport=transport)

# monkeypatch replaces the module-level shared client
monkeypatch.setattr(state_module, "_http_client", make_mock_client(...))
```
`httpx.MockTransport` intercepts all outgoing HTTP — no real API calls, no credits consumed. Patches the shared `_http_client` instance per-test for full isolation.

Sentry Error Tracking IMPLEMENTED
app/__init__.py
```python
import sentry_sdk

# Only active when DSN is configured (no-op otherwise)
if os.getenv("SENTRY_DSN"):
    sentry_sdk.init(
        dsn=os.getenv("SENTRY_DSN"),
        traces_sample_rate=0.2,
        environment=os.getenv("SENTRY_ENV", "production"),
    )
```
Sentry is a no-op when `SENTRY_DSN` is unset.

Performance Optimizations IMPLEMENTED
app/__init__.py, app/routes/batch.py
```python
# Shared HTTP client — created once at startup
@asynccontextmanager
async def _lifespan(app: FastAPI):
    global _http_client
    _http_client = httpx.AsyncClient(timeout=60.0, follow_redirects=True)
    yield
    await _http_client.aclose()

# ZIP_STORED — no compression on already-compressed images
zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_STORED)

# Free memory incrementally as images are written
zf.writestr(filename, result["content"])
result["content"] = None

# Cap at 100 images to bound memory usage
MAX_IMAGES_PER_ORDER = 100
```
Runtime Stats & Monitoring IMPLEMENTED
app/state.py, app/routes/monitoring.py
```python
# In-memory counters, incremented per request
_stats = {
    "orders_processed": 0,
    "images_downloaded": 0,
    "images_failed": 0,
    "zips_served": 0,
    "errors": [],  # last 20 errors
}

@app.get("/api/stats")
async def runtime_stats():
    return {"uptime_seconds": ..., **_stats}
```
Security Hardening IMPLEMENTED
app/__init__.py, app/auth.py
```python
# CORS — restrict to known origins only
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://autoenhance.onrender.com", ...],
    allow_methods=["GET", "POST", "HEAD", "OPTIONS"],
)

# Security headers on every response
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

# Admin token gates credit-consuming endpoints
_admin_token = os.getenv("ADMIN_TOKEN") or secrets.token_urlsafe(32)

def _require_admin(request):
    token = request.cookies.get("_at") or request.headers.get("X-Admin-Token")
    if not hmac.compare_digest(token, _admin_token):
        ...

# Upstream errors logged, not leaked to clients
logger.error("Upstream error: %d %s", status, text)
raise HTTPException(502, "Failed to retrieve order")
```
The admin token is stored in an `httponly` cookie — not visible in page source or accessible to JavaScript. Timing-safe comparison via `hmac.compare_digest` protects /api/create-order, /api/create-sample-order, and /sentry-debug from unauthorized use. Upstream API error bodies are logged server-side but never exposed to clients.

Response Caching IMPLEMENTED
app/state.py — ZIP-level cache by (order_id, format, quality, preview, dev_mode)
```python
# state.py — TTL dict, bypassed automatically during pytest
_zip_cache: dict[tuple, dict] = {}

def get_cached_zip(key: tuple) -> dict | None:
    entry = _zip_cache.get(key)
    if entry and time.time() - entry["cached_at"] < ZIP_CACHE_TTL_SECONDS:
        return entry
    _zip_cache.pop(key, None)  # evict stale entry
    return None

# batch.py — check before downloading, store after
cache_key = (order_id, image_format, quality, preview, dev_mode)
cached = get_cached_zip(cache_key)
if cached:
    return StreamingResponse(io.BytesIO(cached["zip_bytes"]), ...)
zip_bytes, filename, headers = await _run_batch(...)
set_cached_zip(cache_key, zip_bytes, filename, headers)
```
The cache is bypassed during pytest runs (detected via `PYTEST_CURRENT_TEST`) to prevent test cross-contamination. In production, swap for Redis with a TTL.

Async Job Pattern for Large Orders IMPLEMENTED
app/routes/jobs.py
```python
# 1. POST /orders/{order_id}/jobs — returns 202 immediately
@router.post("/orders/{order_id}/jobs", status_code=202)
async def create_batch_job(order_id, background_tasks, ...):
    job_id = str(uuid.uuid4())
    set_job(job_id, {"status": "processing", ...})
    background_tasks.add_task(_process_job, job_id, order_id, ...)
    return {"job_id": job_id}

# 2. GET /jobs/{job_id} — poll until "complete" or "error"
@router.get("/jobs/{job_id}")
async def get_job_status(job_id):
    job = get_job(job_id)  # returns None if expired (1h TTL)
    return {"job_id": job_id, "status": job["status"], "error": job.get("error")}

# 3. GET /jobs/{job_id}/download — fetch the ZIP
@router.get("/jobs/{job_id}/download")
async def download_job_result(job_id):
    return StreamingResponse(io.BytesIO(job["zip_bytes"]), media_type="application/zip", ...)
```
The background job runs the same `_run_batch()` helper as the synchronous endpoint — no duplicated logic.

Circuit Breaker Pattern NEXT STEP
How it would integrate into batch.py
```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = "closed"  # closed | open | half-open
        self.last_failure_time = 0

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.state = "open"
            self.last_failure_time = time.time()

    def record_success(self):
        # A success (e.g. in half-open state) closes the circuit again
        self.failures = 0
        self.state = "closed"

    def allow_request(self) -> bool:
        if self.state == "closed":
            return True
        if time.time() - self.last_failure_time > self.reset_timeout:
            self.state = "half-open"  # let one probe request through
            return True
        return False

# In the endpoint:
if not circuit_breaker.allow_request():
    raise HTTPException(
        503,
        detail="Autoenhance API temporarily unavailable",
        headers={"Retry-After": "60"},
    )
```
Rate Limiting Our Endpoint NEXT STEP
```python
# Using slowapi (built on the limits library)
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.get("/orders/{order_id}/images")
@limiter.limit("10/minute")
async def batch_download_order_images(request: Request, ...):
    ...