Python SDK

Install from PyPI (when published) or the Gateway repo.

pip install asg-gateway
# or from the asg-gateway repository:
# cd sdk/python && pip install -e .

Quick example

import os
import time
from asg_gateway import ASGGatewayClient, ASGGatewayError, ASGGatewayHTTPError

client = ASGGatewayClient(
    api_key=os.environ["ASG_API_KEY"],
    base_url="https://api.asgrefinery.io",
)

# Submit one labeling task.
resp = client.submit_task(
    task_type="image_classification",
    data_url="https://example.com/image.jpg",
    label_spec={"question": "Animal?", "options": ["cat", "dog"]},
)
task_id = resp["task_id"]

# Block until the result is ready (up to 300 s, polling every 5 s).
result = client.poll_result(task_id, timeout=300, interval=5)
print(result["consensus_label"], result["settlement"]["bsv_txid"])

ASGGatewayClient

Constructor

Param      Default                       Description
api_key    -                             Bearer token (sk_sandbox_... / sk_live_...)
base_url   https://api.asgrefinery.io    Gateway origin (no trailing slash)

The client uses a single shared requests.Session with the Authorization header set.

submit_task(...)

Param                 Type          Description
task_type             str           image_classification, etc.
data_url              str           Input URL
label_spec            dict          question + options
consensus_threshold   int           default 3
callback_url          str | None    optional
idempotency_key       str | None    optional

Returns: dict — e.g. task_id, status, created_at.
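One way to fill idempotency_key is to derive it from the task contents, so that a retried submission carries the same key. The sketch below is illustrative only: make_idempotency_key is a hypothetical helper, not part of the SDK, and it assumes the gateway accepts an arbitrary string key and deduplicates on it, which this page does not confirm.

```python
import hashlib
import json


def make_idempotency_key(task_type: str, data_url: str, label_spec: dict) -> str:
    """Derive a stable key from the task contents (hypothetical helper, not in the SDK)."""
    # sort_keys and compact separators give the same JSON string for equal
    # inputs, regardless of dict insertion order.
    canonical = json.dumps(
        {"task_type": task_type, "data_url": data_url, "label_spec": label_spec},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


key = make_idempotency_key(
    "image_classification",
    "https://example.com/image.jpg",
    {"question": "Animal?", "options": ["cat", "dog"]},
)
# Then pass it along: client.submit_task(..., idempotency_key=key)
```

The key is a 64-character hex digest. If the gateway limits key length or format, adjust accordingly.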

submit_batch(tasks, callback_url=None)

Param          Type          Description
tasks          list[dict]    List of task bodies (same shape as REST)
callback_url   str | None    Applied to each task

Returns: dict with batch_id, task_count, status.
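A minimal sketch of building the tasks list from a set of URLs. It assumes each task body uses the same field names as the submit_task parameters (task_type, data_url, label_spec). Check the REST reference for the authoritative shape. build_batch_tasks is a hypothetical helper, not an SDK function.

```python
def build_batch_tasks(data_urls, label_spec, task_type="image_classification"):
    """Build one task body per URL (field names assumed to mirror submit_task)."""
    return [
        # dict(label_spec) makes a shallow copy so tasks do not share one top-level dict.
        {"task_type": task_type, "data_url": url, "label_spec": dict(label_spec)}
        for url in data_urls
    ]


tasks = build_batch_tasks(
    ["https://example.com/a.jpg", "https://example.com/b.jpg"],
    {"question": "Animal?", "options": ["cat", "dog"]},
)
# Then: batch = client.submit_batch(tasks, callback_url=None)
```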

get_task(task_id: str) -> dict

Task summary (status, task_type, data_url, …).

get_result(task_id: str) -> dict

Note: Raises ASGGatewayHTTPError on 404 / 500. A 202 (task still being labeled) is not an error: the underlying requests call simply returns status 202. Use poll_result if you need blocking behavior.

poll_result(task_id, timeout=300, interval=5) -> dict

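Polls the result endpoint until it returns 200 or timeout seconds have elapsed, sleeping interval seconds between attempts. Raises ASGGatewayError if the timeout is reached.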
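The polling behavior described here can be pictured with a loop like the one below. This is a sketch of the general pattern, not the SDK's actual implementation: poll_until_ready and its fetch callable (returning a (status_code, body) pair) are hypothetical, and the sketch raises the built-in TimeoutError where the SDK raises ASGGatewayError.

```python
import time


def poll_until_ready(fetch, timeout=300, interval=5, sleep=time.sleep, clock=time.monotonic):
    """Call fetch() until it reports HTTP 200 or `timeout` seconds pass.

    `fetch` is a hypothetical callable returning (status_code, body).
    """
    deadline = clock() + timeout
    while True:
        status_code, body = fetch()
        if status_code == 200:
            return body
        if clock() >= deadline:
            raise TimeoutError(f"no result after {timeout} seconds")
        sleep(interval)


# Demo with canned responses: two 202s, then a 200.
responses = iter([(202, None), (202, None), (200, {"consensus_label": "cat"})])
result = poll_until_ready(lambda: next(responses), timeout=10, interval=0)
print(result["consensus_label"])  # prints: cat
```

Using a monotonic clock for the deadline keeps the timeout unaffected by system clock adjustments.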

register_webhook(callback_url, secret, events=None) -> dict

Returns: webhook_id, callback_url, events, active.

list_webhooks() -> list[dict]

Returns: list of {webhook_id, callback_url, events, active} (no secrets).

delete_webhook(webhook_id: str) -> None

Raises on non-2xx.
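The list and delete calls can be combined, for example to clear every webhook registered for a callback URL before registering it again with a new secret. The helper below is a hypothetical sketch: it relies only on the documented webhook_id and callback_url fields, and client can be any object exposing list_webhooks and delete_webhook.

```python
def delete_webhooks_for_url(client, callback_url):
    """Delete every registered webhook whose callback_url matches; return the deleted ids."""
    removed = []
    for hook in client.list_webhooks():
        if hook["callback_url"] == callback_url:
            client.delete_webhook(hook["webhook_id"])
            removed.append(hook["webhook_id"])
    return removed


# Usage, assuming `client` is an ASGGatewayClient:
# delete_webhooks_for_url(client, "https://example.com/hooks/asg")
```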

Error handling

Exception              When
ASGGatewayHTTPError    Any HTTP 4xx/5xx from requests (includes 401, 403, 429)
ASGGatewayError        Base class; also raised when poll_result times out

from asg_gateway import ASGGatewayHTTPError

try:
    client.submit_task(...)
except ASGGatewayHTTPError as e:
    if e.status_code == 429:
        # rate limited: back off and retry
        ...
    elif e.status_code == 403:
        # quota exceeded
        ...
Note: ASGRateLimitError and ASGAuthError are not separate classes in the current SDK; branch on status_code as above.

Batch jobs (manifest)

The published SDK may not yet wrap POST /v1/jobs; use requests directly for manifest-based batch runs.

S3 manifest — pass AWS keys in credentials:

import os

import requests

# Same values as passed to the client constructor above.
base_url = "https://api.asgrefinery.io"
api_key = os.environ["ASG_API_KEY"]

r = requests.post(
    f"{base_url}/v1/jobs",
    headers={"Authorization": f"Bearer {api_key}"},
    json={
        "manifest_url": "s3://my-bucket/batch/manifest.jsonl",
        "label_spec": {"question": "Animal?", "options": ["cat", "dog"]},
        "credentials": {
            "access_key_id": "...",
            "secret_access_key": "...",
            "region": "us-west-2",
        },
    },
    timeout=60,
)
r.raise_for_status()  # raises requests.HTTPError on 4xx/5xx

GCS manifest — use gs:// and a service account JSON string:

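# Assumes requests is imported and base_url / api_key are defined.
# Read the service account file and close it promptly.
with open("sa.json") as f:
    service_account_json = f.read()

r = requests.post(
    f"{base_url}/v1/jobs",
    headers={"Authorization": f"Bearer {api_key}"},
    json={
        "manifest_url": "gs://my-bucket/batch/manifest.jsonl",
        "label_spec": {"question": "Animal?", "options": ["cat", "dog"]},
        "credentials": {"service_account_json": service_account_json},
    },
    timeout=60,
)
r.raise_for_status()  # raises requests.HTTPError on 4xx/5xx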

Credentials are encrypted at rest on the server and are never returned on GET /v1/jobs/{id}.

Configuration tips

  • base_url: for local development, use http://localhost:8081.
  • timeout: if you need non-default timeouts, patch the client's Session or wrap its requests calls so that a timeout is passed on each call.
  • Retries: implement exponential backoff around submit_task / get_result for 5xx and 429 responses.
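The retry tip above could be implemented along these lines. This is a sketch under stated assumptions, not SDK code: call_with_backoff is a hypothetical helper, and to stay self-contained it catches any exception and inspects its status_code attribute. In real use you would likely catch ASGGatewayHTTPError specifically.

```python
import random
import time


def call_with_backoff(call, max_attempts=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Retry `call` when it raises an error whose status_code is 429 or 5xx."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception as exc:
            status = getattr(exc, "status_code", None)
            retryable = status == 429 or (isinstance(status, int) and 500 <= status < 600)
            if not retryable or attempt == max_attempts:
                raise
            # Exponential cap (base, 2x, 4x, ...) limited to max_delay, with
            # full jitter: sleep a random time between 0 and the cap.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, delay))


# Usage, assuming `client` is an ASGGatewayClient and `task_id` is known:
# result = call_with_backoff(lambda: client.get_result(task_id))
```

Errors such as 401 or 403 are re-raised immediately, since retrying them is unlikely to help. When wrapping submit_task, passing an idempotency_key may reduce the risk of duplicate tasks, assuming the gateway deduplicates on it.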