Skip to content

Python SDK

A production-ready client using requests. Handles authentication, rate-limit headers, automatic retries with exponential back-off, and typed error responses.

"""ghostflow.py — GhostFlow API client."""
from __future__ import annotations
import os
import time
from dataclasses import dataclass
from typing import Any
import requests
@dataclass
class RateLimitInfo:
limit: int
remaining: int
reset: int
class GhostFlowError(Exception):
"""Raised when the API returns a non-2xx response."""
def __init__(self, code: str, message: str, status: int) -> None:
super().__init__(message)
self.code = code
self.status = status
class GhostFlowClient:
"""Lightweight wrapper around the GhostFlow REST API."""
def __init__(
self,
api_key: str | None = None,
base_url: str = "https://devcore.getghostflow.io/api/v1",
max_retries: int = 2,
timeout: float = 30.0,
) -> None:
self.api_key = api_key or os.environ["GF_API_KEY"]
self.base_url = base_url.rstrip("/")
self.max_retries = max_retries
self.timeout = timeout
self._session = requests.Session()
self._session.headers.update(
{
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
)
# ── Core request method ───────────────────────────────────
def request(
self,
method: str,
path: str,
**kwargs: Any,
) -> tuple[Any, RateLimitInfo]:
"""Send an API request with automatic retry on 429 / 5xx."""
last_error: GhostFlowError | None = None
for attempt in range(self.max_retries + 1):
resp = self._session.request(
method,
f"{self.base_url}{path}",
timeout=self.timeout,
**kwargs,
)
rate_limit = RateLimitInfo(
limit=int(resp.headers.get("x-ratelimit-limit", 0)),
remaining=int(resp.headers.get("x-ratelimit-remaining", 0)),
reset=int(resp.headers.get("x-ratelimit-reset", 0)),
)
if resp.ok:
return resp.json(), rate_limit
body = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {}
last_error = GhostFlowError(
code=body.get("code", "UNKNOWN"),
message=body.get("message", resp.reason or ""),
status=resp.status_code,
)
if resp.status_code in (429, 500, 502, 503) and attempt < self.max_retries:
retry_after = float(resp.headers.get("retry-after", 1))
time.sleep(retry_after)
continue
break
raise last_error # type: ignore[misc]
# ── Convenience shortcuts ─────────────────────────────────
def get(self, path: str, **params: Any) -> tuple[Any, RateLimitInfo]:
return self.request("GET", path, params=params)
def post(self, path: str, json: Any = None) -> tuple[Any, RateLimitInfo]:
return self.request("POST", path, json=json)
def put(self, path: str, json: Any = None) -> tuple[Any, RateLimitInfo]:
return self.request("PUT", path, json=json)
def delete(self, path: str) -> tuple[Any, RateLimitInfo]:
return self.request("DELETE", path)
from ghostflow import GhostFlowClient, GhostFlowError
gf = GhostFlowClient() # picks up GF_API_KEY from env
# List campaigns
campaigns, rl = gf.get("/campaigns")
print(f"Found {len(campaigns)} campaigns, {rl.remaining} requests left")
# Create a campaign
campaign, _ = gf.post("/campaigns", json={
"name": "Summer Promo",
"url": "https://example.com/promo",
})
print(f"Created campaign {campaign['id']}")
# Dashboard stats
stats, _ = gf.get("/reports/dashboard", **{"from": "2025-01-01", "to": "2025-01-31"})
try:
gf.get("/campaigns/non-existent-id")
except GhostFlowError as e:
if e.code == "NOT_FOUND":
print("Campaign does not exist")
elif e.code == "AUTH_INVALID_API_KEY":
print("Check your API key")
elif e.code == "RATE_LIMITED":
print("Slow down — retry after backoff")
else:
print(f"API error {e.status}: {e}")