Python SDK
Install from PyPI (when published) or the Gateway repo.
pip install asg-gateway
# or from the asg-gateway repository:
# cd sdk/python && pip install -e .
Quick example
import os
import time
from asg_gateway import ASGGatewayClient, ASGGatewayError, ASGGatewayHTTPError
client = ASGGatewayClient(
    api_key=os.environ["ASG_API_KEY"],
    base_url="https://api.asgrefinery.io",
)
resp = client.submit_task(
    task_type="image_classification",
    data_url="https://example.com/image.jpg",
    label_spec={"question": "Animal?", "options": ["cat", "dog"]},
)
task_id = resp["task_id"]
result = client.poll_result(task_id, timeout=300, interval=5)
print(result["consensus_label"], result["settlement"]["bsv_txid"])
ASGGatewayClient
Constructor
| Param | Default | Description |
|---|---|---|
| api_key | none (required) | Bearer token (sk_sandbox_... / sk_live_...) |
| base_url | https://api.asgrefinery.io | Gateway origin (no trailing slash) |
Uses a shared requests.Session with Authorization header set.
submit_task(...)
| Param | Type | Description |
|---|---|---|
| task_type | str | image_classification, etc. |
| data_url | str | Input URL |
| label_spec | dict | question + options |
| consensus_threshold | int | default 3 |
| callback_url | str \| None | optional |
| idempotency_key | str \| None | optional |
Returns: dict — e.g. task_id, status, created_at.
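One way to choose an idempotency_key is to derive it from the task inputs, so that a retried submission reuses the same key. The sketch below assumes the Gateway treats a repeated key as the same request, which this page does not spell out. make_idempotency_key is a hypothetical helper, not part of the SDK.

```python
import hashlib
import json


def make_idempotency_key(task_type: str, data_url: str, label_spec: dict) -> str:
    """Hypothetical helper: derive a stable key from the task inputs."""
    # sort_keys gives a canonical JSON form, so dict ordering does not change the key.
    canonical = json.dumps(
        {"task_type": task_type, "data_url": data_url, "label_spec": label_spec},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

The returned hex string can then be passed as idempotency_key to submit_task.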
submit_batch(tasks, callback_url=None)
| Param | Type | Description |
|---|---|---|
| tasks | list[dict] | List of task bodies (same shape as REST) |
| callback_url | str \| None | Applied to each task |
Returns: dict with batch_id, task_count, status.
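As a rough illustration of preparing the tasks argument, the helper below builds one task body per input URL. It assumes the REST task body uses the same field names as the submit_task parameters (task_type, data_url, label_spec); build_batch_tasks is a hypothetical name, not an SDK function.

```python
from typing import Iterable, List


def build_batch_tasks(
    data_urls: Iterable[str],
    label_spec: dict,
    task_type: str = "image_classification",
) -> List[dict]:
    """Hypothetical helper: one task body per URL, sharing a label_spec."""
    # Assumption: task bodies mirror the submit_task parameter names.
    # Each task gets its own copy of label_spec so later edits do not leak across tasks.
    return [
        {"task_type": task_type, "data_url": url, "label_spec": dict(label_spec)}
        for url in data_urls
    ]
```

The resulting list could be passed as client.submit_batch(tasks), optionally with a callback_url.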
get_task(task_id: str) -> dict
Task summary (status, task_type, data_url, …).
get_result(task_id: str) -> dict
Note: Raises ASGGatewayHTTPError on 404 / 500. A 202 (still labeling) is a success status, so no exception is raised for it; use poll_result when you need to block until the result is ready.
poll_result(task_id, timeout=300, interval=5) -> dict
Polls until 200 or timeout. Sleeps interval seconds between attempts.
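The polling behavior described above can be sketched as a plain loop. This is not the SDK's implementation: poll_until_ready, PollTimeout, and the fetch callable (returning a status code and a parsed body) are hypothetical names used only to show the control flow.

```python
import time
from typing import Callable, Tuple


class PollTimeout(Exception):
    """Hypothetical stand-in for the ASGGatewayError raised on timeout."""


def poll_until_ready(
    fetch: Callable[[], Tuple[int, dict]],
    timeout: float = 300,
    interval: float = 5,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Call fetch() until it reports HTTP 200 or the timeout would be exceeded."""
    deadline = clock() + timeout
    while True:
        status, body = fetch()
        if status == 200:
            return body
        # Give up if another sleep would carry us past the deadline.
        if clock() + interval > deadline:
            raise PollTimeout(f"no result after {timeout}s (last status {status})")
        sleep(interval)
```

Injecting sleep and clock keeps the loop testable without real waiting; in normal use the defaults apply.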
register_webhook(callback_url, secret, events=None) -> dict
Returns: webhook_id, callback_url, events, active.
list_webhooks() -> list[dict]
Returns: list of {webhook_id, callback_url, events, active} (no secrets).
delete_webhook(webhook_id: str) -> None
Raises on non-2xx.
Error handling
| Exception | When |
|---|---|
| ASGGatewayHTTPError | Any HTTP 4xx/5xx from requests (includes 401, 403, 429) |
| ASGGatewayError | Base class; also raised when poll_result times out |
from asg_gateway import ASGGatewayHTTPError
try:
    client.submit_task(...)
except ASGGatewayHTTPError as e:
    if e.status_code == 429:
        # rate limited: back off and retry
        ...
    elif e.status_code == 403:
        # quota exceeded
        ...
ASGRateLimitError / ASGAuthError are not separate classes in the current SDK — branch on status_code as above.
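Since the SDK exposes a single HTTP error class, callers may find it convenient to centralize the status_code branching. The sketch below is one possible mapping: classify_status and its category names are hypothetical, and treating 401 as an authentication failure is an assumption based on standard HTTP semantics rather than anything stated here.

```python
def classify_status(status_code: int) -> str:
    """Hypothetical helper: map an HTTP status code to a coarse error category."""
    if status_code == 429:
        return "rate_limited"  # back off and retry
    if status_code == 403:
        return "quota"  # per the example above
    if status_code == 401:
        return "auth"  # assumption: invalid or missing API key
    if 500 <= status_code < 600:
        return "server_error"  # usually worth retrying
    return "client_error"  # other 4xx: fix the request before retrying
```

Inside an except ASGGatewayHTTPError block, classify_status(e.status_code) can then drive the retry or abort decision.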
Batch jobs (manifest)
The published SDK may not yet wrap POST /v1/jobs; use requests directly for manifest-based batch runs.
S3 manifest — pass AWS keys in credentials:
import requests
r = requests.post(
    f"{base_url}/v1/jobs",
    headers={"Authorization": f"Bearer {api_key}"},
    json={
        "manifest_url": "s3://my-bucket/batch/manifest.jsonl",
        "label_spec": {"question": "Animal?", "options": ["cat", "dog"]},
        "credentials": {
            "access_key_id": "...",
            "secret_access_key": "...",
            "region": "us-west-2",
        },
    },
    timeout=60,
)
r.raise_for_status()
GCS manifest — use gs:// and a service account JSON string:
r = requests.post(
    f"{base_url}/v1/jobs",
    headers={"Authorization": f"Bearer {api_key}"},
    json={
        "manifest_url": "gs://my-bucket/batch/manifest.jsonl",
        "label_spec": {"question": "Animal?", "options": ["cat", "dog"]},
        "credentials": {"service_account_json": open("sa.json").read()},
    },
    timeout=60,
)
Credentials are encrypted at rest on the server and are never returned on GET /v1/jobs/{id}.
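To avoid hardcoding secrets in the request body, the payload can be assembled from environment variables. The sketch below reuses the credential field names from the two examples above; build_job_payload is a hypothetical helper, and the environment variable names (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION, GOOGLE_APPLICATION_CREDENTIALS) are the conventional AWS and Google ones, chosen here as an assumption rather than something the Gateway requires.

```python
import os
from typing import Mapping


def build_job_payload(
    manifest_url: str,
    label_spec: dict,
    env: Mapping[str, str] = os.environ,
) -> dict:
    """Hypothetical helper: build a POST /v1/jobs body with credentials from env."""
    if manifest_url.startswith("s3://"):
        credentials = {
            "access_key_id": env["AWS_ACCESS_KEY_ID"],
            "secret_access_key": env["AWS_SECRET_ACCESS_KEY"],
            "region": env["AWS_DEFAULT_REGION"],
        }
    elif manifest_url.startswith("gs://"):
        # Assumption: the variable holds a path to the service account JSON file.
        with open(env["GOOGLE_APPLICATION_CREDENTIALS"], encoding="utf-8") as fh:
            credentials = {"service_account_json": fh.read()}
    else:
        raise ValueError(f"unsupported manifest scheme: {manifest_url!r}")
    return {
        "manifest_url": manifest_url,
        "label_spec": label_spec,
        "credentials": credentials,
    }
```

The returned dict can be passed as the json argument of requests.post in either example above.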
Configuration tips
- base_url: for local development, use http://localhost:8081.
- timeout: pass per call by patching the Session or wrapping requests if you need non-default timeouts.
- Retries: implement exponential backoff around submit_task / get_result for 5xx and 429.
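A minimal sketch of the retry advice above, assuming the raised exception carries a status_code attribute as ASGGatewayHTTPError does in the error-handling example. call_with_backoff and is_retryable are hypothetical helpers, and the delay, cap, and jitter values are illustrative defaults rather than Gateway recommendations.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def is_retryable(status_code: Optional[int]) -> bool:
    """Retry on 429 and on any 5xx status."""
    return status_code == 429 or (status_code is not None and 500 <= status_code < 600)


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    jitter: Callable[[], float] = lambda: random.uniform(0.5, 1.0),
) -> T:
    """Call fn(), retrying retryable failures with capped exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            status = getattr(exc, "status_code", None)
            # Re-raise non-retryable errors and the final failed attempt.
            if not is_retryable(status) or attempt == max_attempts - 1:
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter spreads out retries from clients that failed at the same moment.
            sleep(delay * jitter())
```

A call such as call_with_backoff(lambda: client.get_result(task_id)) would wrap a single SDK method; combining this with an idempotency_key on submit_task helps keep retried submissions from creating duplicates, if the server deduplicates on that key.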