autoenhance.ai BATCH API
Charlton Smith — Technical Assessment | The Interview Submission tab is the deliverable. The rest is extra.
View Source on GitHub

The Endpoint (OpenAPI Docs →)

GET /orders/{order_id}/images?format=jpeg&quality=80&preview=true&dev_mode=true

Returns a ZIP archive containing all enhanced images for the given order. Params: format (jpeg/png/webp), quality (1-90), preview, dev_mode.

curl example
curl "https://autoenhance.onrender.com/orders/{order_id}/images?dev_mode=true" -o images.zip

Try It

Paste an order ID to download its enhanced images instantly

Advanced options
Don't have an order? Create one here

Uploads 3 bundled real-estate photos to Autoenhance. No files needed.


or upload your own
Drop images here or click to browse
JPG, PNG, or WebP — multiple files supported
How it works — Batch vs standard endpoints

Standard endpoint — 1 request → 1 image
  • Fast, all-or-nothing
  • Small payload, simple errors

Batch endpoint (this) — 1 request → N images
  • Partial success is valid
  • Needs concurrency & throttling

Design decisions

  • Concurrency — Downloads run in parallel with a semaphore (max 5). Each image streams into the ZIP via asyncio.as_completed and is freed immediately — no waiting for the full batch.
  • Partial failure — If some images fail (still processing, timed out), the ZIP includes what succeeded plus a _download_report.txt.
  • Timeouts — 60s per image. Response headers report total / downloaded / failed counts.
  • Response format — ZIP with ZIP_STORED (no compression) since JPEG/PNG/WebP are already compressed. Saves CPU with no size penalty.
  • Redirects — Autoenhance returns 302 → asset server → S3. The client follows these transparently.
  • Connection reuse — A shared httpx.AsyncClient is created once at startup via FastAPI lifespan. All requests reuse the same connection pool instead of creating one per request.
  • Memory — Peak memory is bounded by the semaphore (max 5 in-flight), not the order size. The ZIP uses a SpooledTemporaryFile that spills to disk above 10 MB. Orders capped at 100 images as a safety limit.
  • Async job pattern — POST /orders/{id}/jobs returns a job ID immediately (202). Poll GET /jobs/{id} for status; download via GET /jobs/{id}/download. Sidesteps server timeouts on large orders without duplicating any download logic.
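
Taken together, the concurrency, streaming, and ZIP_STORED decisions can be sketched in miniature. This is an illustration only — `fetch_image` is a hypothetical stand-in for the real httpx download logic:

```python
import asyncio
import io
import zipfile

MAX_CONCURRENT = 5  # mirrors the semaphore limit described above

async def fetch_image(name: str) -> bytes:
    # hypothetical stand-in for the real per-image httpx download
    await asyncio.sleep(0)
    return b"\xff\xd8" + name.encode()  # fake JPEG bytes

async def build_zip(names: list[str]) -> bytes:
    sem = asyncio.Semaphore(MAX_CONCURRENT)

    async def bounded(name: str) -> tuple[str, bytes]:
        async with sem:  # at most MAX_CONCURRENT downloads in flight
            return name, await fetch_image(name)

    buf = io.BytesIO()
    # ZIP_STORED: JPEG/PNG/WebP are already compressed, so skip deflate
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_STORED) as zf:
        tasks = [asyncio.create_task(bounded(n)) for n in names]
        for fut in asyncio.as_completed(tasks):
            name, content = await fut  # write each image as it finishes
            zf.writestr(name, content)
    return buf.getvalue()

zip_bytes = asyncio.run(build_zip(["a.jpg", "b.jpg", "c.jpg"]))
```

The real endpoint layers partial-failure tracking and the spooled buffer on top of this skeleton.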
Assumptions & open questions
  • Upload pipeline is external — This endpoint is downstream of whatever upload flow the client uses (web app, SDK, direct API). We accept an order_id after images are already uploaded and enhanced.
  • Enhancement timing is unknown — Images may still be processing when the batch endpoint is called. Handled gracefully via partial success + failure report, but we can't trigger or wait for completion.
  • Order schema is partially documented — We check for both image_id/id and image_name/name to handle field name variations in the API response.
  • No completion webhook — Without a callback, the caller must wait or poll before requesting the batch download.
  • Rate limits undocumented — We default to 5 concurrent downloads. The actual Autoenhance limit isn't published, so this can be tuned with production data.

Possible extensions

  • Poll-until-ready — A wait=true param that retries until all images are processed before downloading.
  • Webhook integration — Auto-trigger the batch download when Autoenhance signals an order is complete.
Production considerations

Observability

  • Structured logging — Python logging at INFO/WARNING/ERROR. Logs order retrieval, per-image status, and final counts.
  • Distributed tracing — OpenTelemetry spans for the order fetch and each image download. Shows where time is spent (upstream latency vs. ZIP creation).
  • Error tracking — Sentry SDK captures unhandled exceptions with request context. Activated via SENTRY_DSN env var; no-op when unset.

Metrics

  • Request latency — P50/P95/P99 for the batch endpoint, plus per-image latency (scales with order size).
  • Success/failure rates — Full success, partial success, and total failure. Alert on partial-failure spikes (may signal processing delays).
  • Upstream health — Track Autoenhance API response times and errors separately. Detect degradation before users report it.
  • ZIP size distribution — Monitor payload sizes to catch memory pressure early.

Versioning

  • Autoenhance API — Pinned to /v3. A v4 release won't break us until we explicitly migrate.
  • Our own API — Currently unversioned. Add a /v1/ prefix so the response format can evolve without breaking callers.
  • Dependency pinning — Exact versions in requirements.txt. Add pip-tools for reproducible builds with transitive deps.

Security

  • API key isolation — Stored in env var, never committed. .gitignore excludes .env.
  • Input validation — Order ID validated as UUID before any upstream call. Malformed input returns 400 immediately.
  • Rate limiting — Each batch call fans out to N upstream requests. Add per-IP throttling to prevent quota exhaustion.
  • Authentication — Optional SERVICE_API_KEY env var. When set, batch and job endpoints require an X-API-Key header (timing-safe comparison). Leave unset for open access.

Testing

  • Manual E2E — Verified with a real 3-image order against the live API, locally and on Render.
  • Unit tests — 20 tests via httpx mock transport. Covers validation, success/partial/total failure, retry logic, network errors, edge cases, health, and UI.
  • Integration tests — Real API calls via x-dev-mode (no credits). Assert ZIP contents and structure.
  • Load testing — Stress-test with large orders (50+ images) and concurrent callers to find memory and timeout limits.

Operational

  • Health check — /health reports status and API key config. Monitored by UptimeRobot.
  • Graceful degradation — Return 503 with Retry-After when Autoenhance is down, instead of timing out. Circuit breaker pattern.
  • Caching — In-process ZIP cache keyed by (order_id, format, quality, preview, dev_mode) with a 1-hour TTL. Repeat requests for the same order are served instantly without hitting the upstream API.
  • CI/CD — Auto-deploys from main via Render. Add a test gate and staging environment.

Live Server Stats

Orders · Images · ZIPs · Failed

Production-Hardened Version

Production-grade additions to the batch endpoint: input validation, unit tests, error tracking, and patterns for circuit breaking, caching, and rate limiting. The Interview tab is the primary submission.

UUID Input Validation IMPLEMENTED

app/routes/batch.py

# Validate order_id is a UUID before making upstream calls
import re
uuid_pattern = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
    re.IGNORECASE,
)
if not uuid_pattern.match(order_id):
    raise HTTPException(
        status_code=400,
        detail=f"Invalid order ID format. Expected UUID, got: '{order_id}'",
    )
Rejects bad input before any upstream call. Prevents wasted round-trips and injection vectors.

Unit Test Suite IMPLEMENTED

test_app.py — 20 tests, all passing

  • Invalid UUID rejected
  • SQL injection blocked
  • Empty order ID → 400
  • Valid UUID passes through
  • Full success → ZIP
  • Correct filenames in ZIP
  • Partial failure + report
  • All fail → 422
  • Empty order → 404
  • Order not found
  • Duplicate names deduped
  • Quality param passthrough
  • Retry on 5xx succeeds
  • No retry on 4xx
  • Network error → 502
  • Upstream 401 → 401
  • Upstream 500 → 502
  • Too many images → 413
  • Health endpoint
  • UI returns HTML
# Mock strategy: patch the shared client with MockTransport
def make_mock_client(order_response, image_responses):
    def handler(request: httpx.Request) -> httpx.Response:
        # route on request.url.path: order metadata vs. per-image downloads
        ...
    transport = httpx.MockTransport(handler)
    return httpx.AsyncClient(transport=transport)

# monkeypatch replaces the module-level shared client
monkeypatch.setattr(state_module, "_http_client", make_mock_client(...))
httpx.MockTransport intercepts all outgoing HTTP — no real API calls, no credits consumed. Patches the shared _http_client instance per-test for full isolation.

Sentry Error Tracking IMPLEMENTED

app/__init__.py

import sentry_sdk

# Only active when DSN is configured (no-op otherwise)
if os.getenv("SENTRY_DSN"):
    sentry_sdk.init(
        dsn=os.getenv("SENTRY_DSN"),
        traces_sample_rate=0.2,
        environment=os.getenv("SENTRY_ENV", "production"),
    )
FastAPI integration is automatic — Sentry captures unhandled exceptions with full request context. 20% trace sampling keeps costs low. No-op when SENTRY_DSN is unset.

Live Error Dashboard LIVE

Open Sentry →

Performance Optimizations IMPLEMENTED

app/__init__.py, app/routes/batch.py

# Shared HTTP client — created once at startup
@asynccontextmanager
async def _lifespan(app: FastAPI):
    global _http_client
    _http_client = httpx.AsyncClient(timeout=60.0, follow_redirects=True)
    yield
    await _http_client.aclose()

# ZIP_STORED — no compression on already-compressed images
zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_STORED)

# Free memory incrementally as images are written
zf.writestr(filename, result["content"])
result["content"] = None

# Cap at 100 images to bound memory usage
MAX_IMAGES_PER_ORDER = 100
Connection reuse avoids per-request handshakes. ZIP_STORED saves CPU since JPEG/PNG/WebP are already compressed. Incremental freeing prevents 2x peak memory. 100-image cap bounds worst-case allocation.
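
The 10 MB disk-spill behavior relies on the standard library's SpooledTemporaryFile. A minimal sketch of the mechanism — the `_rolled` flag is a CPython implementation detail, used here only to show when the spill happens:

```python
import tempfile
import zipfile

SPOOL_LIMIT = 10 * 1024 * 1024  # stay in RAM below 10 MB, spill to disk above

# small ZIP: everything stays in memory
buf = tempfile.SpooledTemporaryFile(max_size=SPOOL_LIMIT)
with zipfile.ZipFile(buf, "w", zipfile.ZIP_STORED) as zf:
    zf.writestr("small.jpg", b"x" * 1024)
in_memory = not buf._rolled  # CPython detail: False until rollover

# large ZIP: a write past max_size triggers rollover to a real temp file
buf2 = tempfile.SpooledTemporaryFile(max_size=SPOOL_LIMIT)
with zipfile.ZipFile(buf2, "w", zipfile.ZIP_STORED) as zf:
    zf.writestr("big.bin", b"x" * (SPOOL_LIMIT + 1))
spilled = buf2._rolled
```

This is why peak memory stays bounded even when an order's ZIP exceeds 10 MB.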

Runtime Stats & Monitoring IMPLEMENTED

app/state.py, app/routes/monitoring.py

# In-memory counters, incremented per request
_stats = {
    "orders_processed": 0,
    "images_downloaded": 0,
    "images_failed": 0,
    "zips_served": 0,
    "errors": [],  # last 20 errors
}

@app.get("/api/stats")
async def runtime_stats():
    return {"uptime_seconds": ..., **_stats}
Live stats bar on the Interview tab auto-refreshes every 30s. Stats are also injected into the Azoni AI chatbot context so it can answer questions like “How many images have you processed?” with real numbers.

Security Hardening IMPLEMENTED

app/__init__.py, app/auth.py

# CORS — restrict to known origins only
app.add_middleware(CORSMiddleware,
    allow_origins=["https://autoenhance.onrender.com", ...],
    allow_methods=["GET", "POST", "HEAD", "OPTIONS"])

# Security headers on every response
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

# Admin token gates credit-consuming endpoints
_admin_token = os.getenv("ADMIN_TOKEN") or secrets.token_urlsafe(32)
def _require_admin(request):
    token = request.cookies.get("_at") or request.headers.get("X-Admin-Token")
    if not hmac.compare_digest(token, _admin_token): ...

# Upstream errors logged, not leaked to clients
logger.error("Upstream error: %d %s", status, text)
raise HTTPException(502, "Failed to retrieve order")
CORS blocks cross-origin abuse. Security headers prevent clickjacking, MIME sniffing, and referrer leaks. Admin token is set as an httponly cookie — not visible in page source or accessible to JavaScript. Timing-safe comparison via hmac.compare_digest protects /api/create-order, /api/create-sample-order, and /sentry-debug from unauthorized use. Upstream API error bodies are logged server-side but never exposed to clients.

Response Caching IMPLEMENTED

app/state.py — ZIP-level cache by (order_id, format, quality, preview, dev_mode)

# state.py — TTL dict, bypassed automatically during pytest
_zip_cache: dict[tuple, dict] = {}

def get_cached_zip(key: tuple) -> dict | None:
    entry = _zip_cache.get(key)
    if entry and time.time() - entry["cached_at"] < ZIP_CACHE_TTL_SECONDS:
        return entry
    _zip_cache.pop(key, None)
    return None

# batch.py — check before downloading, store after
cache_key = (order_id, image_format, quality, preview, dev_mode)
cached = get_cached_zip(cache_key)
if cached:
    return StreamingResponse(io.BytesIO(cached["zip_bytes"]), ...)
zip_bytes, filename, headers = await _run_batch(...)
set_cached_zip(cache_key, zip_bytes, filename, headers)
ZIP responses are cached in-process for 1 hour. A repeat request for the same order + params is served instantly without touching the upstream API. Cache is disabled during pytest (via PYTEST_CURRENT_TEST) to prevent test cross-contamination. In production, swap for Redis with a TTL.

Async Job Pattern for Large Orders IMPLEMENTED

app/routes/jobs.py

# 1. POST /orders/{order_id}/jobs — returns 202 immediately
@router.post("/orders/{order_id}/jobs", status_code=202)
async def create_batch_job(order_id, background_tasks, ...):
    job_id = str(uuid.uuid4())
    set_job(job_id, {"status": "processing", ...})
    background_tasks.add_task(_process_job, job_id, order_id, ...)
    return {"job_id": job_id}

# 2. GET /jobs/{job_id} — poll until "complete" or "error"
@router.get("/jobs/{job_id}")
async def get_job_status(job_id):
    job = get_job(job_id)  # returns None if expired (1h TTL)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found or expired")
    return {"job_id": job_id, "status": job["status"], "error": job.get("error")}

# 3. GET /jobs/{job_id}/download — fetch the ZIP
@router.get("/jobs/{job_id}/download")
async def download_job_result(job_id):
    return StreamingResponse(io.BytesIO(job["zip_bytes"]), media_type="application/zip", ...)
Sidesteps Render's 30s response timeout for large orders. The client gets a job ID in milliseconds and polls until the background download completes. Jobs expire after 1 hour. The background task reuses the same _run_batch() helper as the synchronous endpoint — no duplicated logic.

Circuit Breaker Pattern NEXT STEP

How it would integrate into batch.py

import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = "closed"  # closed | open | half-open
        self.last_failure_time = 0

    def record_success(self):
        # a successful probe closes the breaker again
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.state = "open"
            self.last_failure_time = time.time()

    def allow_request(self) -> bool:
        if self.state == "closed":
            return True
        if time.time() - self.last_failure_time > self.reset_timeout:
            self.state = "half-open"  # let one probe request through
            return True
        return False

# In the endpoint:
if not circuit_breaker.allow_request():
    raise HTTPException(503, detail="Autoenhance API temporarily unavailable",
        headers={"Retry-After": "60"})
Prevents cascading failures. After 5 consecutive errors, returns 503 immediately for 60s instead of letting every request time out.

Rate Limiting Our Endpoint NEXT STEP

# Using slowapi (built on limits library)
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.get("/orders/{order_id}/images")
@limiter.limit("10/minute")
async def batch_download_order_images(request: Request, ...):
    ...
Each batch call fans out to N upstream requests. Without throttling, one caller could exhaust API quotas. 10 req/min per IP is a reasonable default.

Azoni AI CHATBOT

Ask me anything about this batch endpoint, the design decisions, or Charlton's background.

Hi! I'm Azoni AI, Charlton's portfolio chatbot. I know all about this batch image downloader — the design decisions, error handling, tech stack, and more. Ask me anything!