The Amp Admin Client provides Python bindings for the Amp Admin API, enabling you to register datasets, deploy jobs, and manage your Amp infrastructure programmatically.
- Installation
- Quick Start
- Core Concepts
- Client Configuration
- Dataset Operations
- Job Management
- Schema Validation
- Manifest Generation
- Deployment Workflows
- Revision Management
- Error Handling
The admin client is included in the amp package:
pip install amp

Or with uv:

uv add amp

from amp import Client
# Initialize client with both query and admin capabilities
client = Client(
query_url="grpc://localhost:1602", # Flight SQL endpoint
admin_url="http://localhost:8080", # Admin API endpoint
auth=True # Load auth from ~/.amp/cache
)

# Define your dataset manifest
manifest = {
'kind': 'manifest',
'dependencies': {
'eth': '_/eth_firehose@1.0.0'
},
'tables': {
'blocks': {
'input': {'sql': 'SELECT * FROM eth.blocks'},
'schema': {'arrow': {'fields': [...]}},
'network': 'mainnet'
}
},
'functions': {}
}
# Register the dataset
client.datasets.register(
namespace='_',
name='my_dataset',
version='1.0.0',
manifest=manifest
)

# Deploy the dataset
deploy_response = client.datasets.deploy(
namespace='_',
name='my_dataset',
revision='1.0.0',
parallelism=4,
end_block='latest'
)
# Wait for completion
job = client.jobs.wait_for_completion(
deploy_response.job_id,
poll_interval=5,
timeout=3600
)
print(f"Job completed with status: {job.status}")

A manifest is a JSON document that defines a dataset's structure, dependencies, tables, and functions. Manifests include:
- dependencies: References to other datasets this dataset depends on
- tables: SQL transformations and output schemas
- functions: Custom Python/SQL functions (optional)
- network: Blockchain network identifier
Datasets are versioned using semantic versioning (e.g., 1.0.0). Each version has:
- A unique manifest
- Immutable registration
- Independent deployment history
Special tags provide convenient references:
- latest: Points to the highest semantic version
- dev: Points to a development manifest hash
Jobs represent long-running operations like dataset deployments. Jobs have states:
- Scheduled: Queued for execution
- Running: Currently executing
- Completed: Successfully finished
- Failed: Encountered an error
- Stopped: Stopped by user
Revisions represent physical table storage locations. Each table in a dataset can have multiple revisions, with one active at a time. Revisions track:
- Storage path (e.g., S3 location)
- Active/inactive status
- Writer job association
- File metadata (Parquet files with block ranges)
The Client class provides both query and admin functionality:
from amp import Client
# Full configuration with auth file
client = Client(
query_url="grpc://localhost:1602",
admin_url="http://localhost:8080",
auth=True
)
# Query operations (Flight SQL)
df = client.sql("SELECT * FROM eth.blocks LIMIT 10").to_pandas()
# Admin operations (HTTP API)
datasets = client.datasets.list_all()

If you only need admin functionality:
from amp.admin import AdminClient
admin = AdminClient(
base_url="http://localhost:8080",
auth=True
)
# Access admin operations
admin.datasets.list_all()
admin.jobs.get(123)
admin.revisions.list(active=True)

The client supports three authentication methods (highest to lowest priority):
- Explicit token: Pass auth_token="your-token" directly
- Environment variable: Set AMP_AUTH_TOKEN in your environment
- Auth file: Pass auth=True to load from ~/.amp/cache/amp_cli_auth (auto-refreshing, shared with TS CLI)
# Option 1: Explicit token
client = Client(query_url="...", admin_url="...", auth_token="your-token")
# Option 2: Environment variable
# export AMP_AUTH_TOKEN="eyJhbGci..."
client = Client(query_url="...", admin_url="...")
# Option 3: Auth file (recommended)
client = Client(query_url="...", admin_url="...", auth=True)

Note: You cannot combine auth=True with auth_token - choose one method.
The legacy url parameter still works for Flight SQL:
# This still works
client = Client(url="grpc://localhost:1602")
client.sql("SELECT * FROM eth.blocks")

# Simple registration
client.datasets.register(
namespace='_',
name='eth_blocks',
version='1.0.0',
manifest=manifest
)

# List all datasets
for dataset in client.datasets.list_all():
print(f"{dataset.namespace}/{dataset.name}@{dataset.latest_version}")
print(f" Available versions: {dataset.versions}")

# Pretty-print dataset structure
client.datasets.inspect('_', 'eth_firehose')
# Output:
# Dataset: _/eth_firehose@latest
# Kind: evm-rpc
#
# blocks (21 columns)
# block_num UInt64 NOT NULL
# timestamp Timestamp NOT NULL
# hash FixedSizeBinary(32) NOT NULL
# ...
# Get structured schema data programmatically
schema = client.datasets.describe('_', 'eth_firehose')
for table_name, columns in schema.items():
for col in columns:
print(f"{col['name']}: {col['type']} ({'NULL' if col['nullable'] else 'NOT NULL'})")

# Get all versions of a dataset
versions = client.datasets.get_versions('_', 'eth_blocks')
print(f"Latest: {versions.special_tags.latest}")
print(f"Dev: {versions.special_tags.dev}")
for version_info in versions:
print(f" {version_info.version} - {version_info.manifest_hash}")

# Get specific version info
info = client.datasets.get_version('_', 'eth_blocks', '1.0.0')
print(f"Kind: {info.kind}")
print(f"Manifest hash: {info.manifest_hash}")
print(f"Tables: {info.tables}")
print(f"Start block: {info.start_block}")

# Retrieve the manifest for a version
manifest = client.datasets.get_manifest('_', 'eth_blocks', '1.0.0')
print(f"Tables: {list(manifest['tables'].keys())}")
print(f"Dependencies: {manifest['dependencies']}")

# Deploy with all options
deploy_response = client.datasets.deploy(
namespace='_',
name='eth_blocks',
revision='1.0.0',
parallelism=8, # Number of parallel workers (raw datasets only)
end_block='latest', # Stop at latest block (vs continuous)
verify=True, # Cryptographic verification (EVM only)
worker_id='worker-eth-*', # Worker selector (exact ID or glob)
retry_strategy={ # Retry behavior
'strategy': 'bounded',
'max_attempts': 3
}
)
print(f"Started job: {deploy_response.job_id}")

The end_block parameter controls when processing stops:
- None (default): Continuous mode - never stops, processes new blocks as they arrive
- "latest": Stop at the latest block when the job starts
- "1000000": Stop at absolute block number 1,000,000
- "-100": Stop at 100 blocks before the latest block
# Get jobs for a specific dataset version
for job in client.datasets.list_jobs('_', 'eth_firehose', 'latest'):
print(f'{job.id}: {job.status} (node: {job.node_id})')

# Restore all tables for a dataset version
result = client.datasets.restore('_', 'eth_firehose', 'latest')
print(f'Restored {result.total_files} files')
# Restore a specific table
client.datasets.restore_table('_', 'eth_firehose', 'latest', 'blocks')
# Restore with specific location
client.datasets.restore_table('_', 'eth_firehose', 'latest', 'blocks', location_id=42)

# Delete all versions of a dataset
client.datasets.delete('_', 'old_dataset')
# Delete a specific version
client.datasets.delete_version('_', 'my_dataset', '1.0.0')

# Get job by ID
job = client.jobs.get(123)
print(f"Status: {job.status}")
print(f"Node: {job.node_id}")
print(f"Created: {job.created_at}")
print(f"Updated: {job.updated_at}")

# List jobs with pagination
response = client.jobs.list(limit=50)
for job in response:
print(f"Job {job.id}: {job.status}")
# Continue pagination if needed
if response.next_cursor:
next_page = client.jobs.list(
limit=50,
last_job_id=response.next_cursor
)

from amp.admin.models import CreateJobRequest1, CreateJobRequest2, Kind, Kind1
# Create a garbage collection job
gc_request = CreateJobRequest1(kind=Kind.gc)
response = client.jobs.create(gc_request)
print(f"GC job: {response.job_id}")
# Create a raw materialization job with options
raw_request = CreateJobRequest2(
kind=Kind1.materialize_raw,
worker_id='worker-node-0',
retry_strategy={'strategy': 'bounded', 'max_attempts': 3}
)
response = client.jobs.create(raw_request)

# Block until job completes or times out
try:
final_job = client.jobs.wait_for_completion(
job_id=123,
poll_interval=5, # Check every 5 seconds
timeout=3600 # Give up after 1 hour (None = infinite)
)
if final_job.status == 'Completed':
print("Job succeeded!")
elif final_job.status == 'Failed':
print("Job failed!")
except TimeoutError as e:
    print(f"Job did not complete in time: {e}")

# Get per-table progress
progress = client.jobs.get_progress(123)
print(f"Job {progress.job_id}: {progress.job_status}")
for table, info in progress.tables.items():
print(f" {table}:")
print(f" Blocks: {info.start_block} -> {info.current_block}")
print(f" Files: {info.files_count}")
print(f" Size: {info.total_size_bytes} bytes")
# Check for error details
if progress.detail:
    print(f"Error detail: {progress.detail}")

# List all events for a job
for event in client.jobs.get_events(123):
print(f"{event.created_at}: {event.event_type} (node: {event.node_id})")
# Get detailed info for a specific event
event_detail = client.jobs.get_event(123, event_id=1)
print(f"Type: {event_detail.event_type}")
print(f"Detail: {event_detail.detail}")

# Stop a running job
client.jobs.stop(123)

# Delete a single job (must be in terminal state)
client.jobs.delete(123)
# Delete all completed jobs
client.jobs.delete_by_status('Completed')
# Delete all terminal jobs (completed, stopped, and errored)
client.jobs.delete_by_status('Terminal')
# Other status filters: 'Stopped', 'Error'

The schema client validates SQL queries and returns their output schemas without execution:
# Analyze a query with dependencies
response = client.schema.analyze(
dependencies={'eth': '_/eth_firehose@0.0.0'},
tables={
'blocks': 'SELECT block_num, hash, timestamp FROM eth.blocks WHERE block_num > 1000000'
}
)
# Inspect the schemas (one per table)
for table_name, schema in response.schemas.items():
    print(f"{table_name}: {schema}")

Note: The dependencies parameter requires exact version references or manifest hashes. Symbolic references like "latest" or "dev" are not allowed.
This is particularly useful for:
- Validating queries before registration
- Understanding output structure
- Generating correct Arrow schemas for manifests
The QueryBuilder provides a fluent API for generating manifests from SQL queries:
# Build a query
query = client.sql("SELECT block_num, hash FROM eth.blocks")
# Add dependencies
query = query.with_dependency('eth', '_/eth_firehose@1.0.0')
# Generate manifest
manifest = query.to_manifest(
table_name='blocks',
network='mainnet'
)

The most powerful pattern combines query building, manifest generation, registration, and deployment:
# Build, register, and deploy in one chain
job = (
client.sql("SELECT block_num, hash FROM eth.blocks")
.with_dependency('eth', '_/eth_firehose@1.0.0')
.register_as(
namespace='_',
name='eth_blocks_simple',
version='1.0.0',
table_name='blocks',
network='mainnet'
)
.deploy(
end_block='latest',
parallelism=4,
wait=True # Block until completion
)
)
print(f"Deployment completed: {job.status}")

manifest = (
client.sql("""
SELECT
t.token_address,
t.amount,
m.name,
m.symbol
FROM erc20_transfers t
JOIN token_metadata m ON t.token_address = m.address
""")
.with_dependency('erc20_transfers', '_/erc20_transfers@1.0.0')
.with_dependency('token_metadata', '_/token_metadata@1.0.0')
.to_manifest('enriched_transfers', 'mainnet')
)

# 1. Develop query locally
query = client.sql("""
SELECT
block_num,
COUNT(*) as tx_count
FROM eth.transactions
GROUP BY block_num
""")
# Test the query
df = query.to_pandas()
print(df.head())
# 2. Register as dataset
query = query.with_dependency('eth', '_/eth_firehose@1.0.0')
client.datasets.register(
namespace='_',
name='tx_counts',
version='0.1.0',
manifest=query.to_manifest('tx_counts', 'mainnet')
)
# 3. Deploy to limited range for testing
deploy_resp = client.datasets.deploy(
namespace='_',
name='tx_counts',
revision='0.1.0',
end_block='10000', # Test on first 10k blocks
parallelism=2
)
# 4. Monitor progress
progress = client.jobs.get_progress(deploy_resp.job_id)
for table, info in progress.tables.items():
print(f"{table}: block {info.current_block}")
# 5. Wait for completion
job = client.jobs.wait_for_completion(deploy_resp.job_id, timeout=600)
if job.status == 'Completed':
print("Test deployment successful!")
# 6. Deploy full version
prod_deploy = client.datasets.deploy(
namespace='_',
name='tx_counts',
revision='0.1.0',
end_block='latest',
parallelism=8
)

# Register production version
context = (
client.sql("SELECT * FROM processed_data")
.with_dependency('raw', '_/raw_data@2.0.0')
.register_as('_', 'processed_data', '2.0.0', 'data', 'mainnet')
)
# Deploy without waiting
job = context.deploy(
end_block='latest',
parallelism=16,
wait=False
)
print(f"Started production deployment: job {job.id}")
# Monitor progress separately
import time
while True:
progress = client.jobs.get_progress(job.id)
if progress.job_status in ('Completed', 'Failed', 'Stopped'):
print(f"Job finished: {progress.job_status}")
break
for table, info in progress.tables.items():
print(f" {table}: block {info.current_block}, {info.files_count} files")
    time.sleep(30)

# Deploy continuous processing (no end_block)
deploy_resp = client.datasets.deploy(
namespace='_',
name='realtime_data',
revision='1.0.0',
parallelism=4
# end_block=None means continuous
)
# Job will run indefinitely, processing new blocks as they arrive
print(f"Continuous deployment started: {deploy_resp.job_id}")
# Stop later when needed
client.jobs.stop(deploy_resp.job_id)

Revisions manage physical table storage locations. This is typically used for advanced operations like manually managing table data.

Note: Revision operations are only available through the AdminClient directly, not the unified Client.

from amp.admin import AdminClient
admin = AdminClient('http://localhost:8080', auth=True)

# List all active revisions
revisions = admin.revisions.list(active=True)
for rev in revisions:
print(f"ID: {rev.id}")
print(f" Path: {rev.path}")
print(f" Active: {rev.active}")
print(f" Writer job: {rev.writer}")
if rev.metadata.dataset_name:
print(f" Dataset: {rev.metadata.dataset_namespace}/{rev.metadata.dataset_name}")
print(f" Table: {rev.metadata.table_name}")
# Pagination
page1 = admin.revisions.list(limit=50)
if page1:
    page2 = admin.revisions.list(limit=50, last_id=page1[-1].id)

rev = admin.revisions.get(42)
print(f"Path: {rev.path}")
print(f"Active: {rev.active}")
print(f"Writer: {rev.writer}")

result = admin.revisions.create(
dataset='_/eth_firehose@0.0.0',
table_name='blocks',
storage_path='s3://my-bucket/revisions/blocks-v2'
)
print(f"Created revision with location ID: {result.location_id}")

# Activate a revision (makes it the live data source for queries)
admin.revisions.activate(42, '_/eth_firehose@0.0.0', 'blocks')
# Deactivate the current revision for a table
admin.revisions.deactivate('_/eth_firehose@0.0.0', 'blocks')

# Prune non-canonical files (schedule for garbage collection)
result = admin.revisions.prune(42)
print(f"Scheduled {result.files_scheduled} files for GC (delay: {result.gc_delay_secs}s)")
# Prune only files before a specific block
result = admin.revisions.prune(42, before_block=1000000)
# Truncate (delete all files immediately)
result = admin.revisions.truncate(42)
print(f"Deleted {result.files_deleted} files")

# Restore files for a revision
result = admin.revisions.restore(42)
print(f"Restored {result.total_files} files")

admin.revisions.delete(42)

The admin client provides typed exceptions for different error scenarios:
from amp.admin.errors import (
AdminAPIError, # Base exception
DatasetNotFoundError,
InvalidManifestError,
JobNotFoundError,
DependencyValidationError,
SchedulerError,
RevisionNotFoundError,
InternalServerError,
)

try:
client.datasets.register('_', 'my_dataset', '1.0.0', manifest)
except InvalidManifestError as e:
print(f"Manifest validation failed: {e.message}")
print(f"Error code: {e.error_code}")
except DependencyValidationError as e:
print(f"Dependency issue: {e.message}")
except AdminAPIError as e:
print(f"API error: {e.error_code} - {e.message}")
    print(f"HTTP status: {e.status_code}")

def robust_deploy(client, namespace, name, version, **deploy_options):
"""Deploy with comprehensive error handling."""
try:
# Check if dataset exists
try:
version_info = client.datasets.get_version(namespace, name, version)
print(f"Found dataset: {version_info.kind}, hash: {version_info.manifest_hash}")
except DatasetNotFoundError:
raise ValueError(f"Dataset {namespace}/{name}@{version} not registered")
# Deploy
deploy_resp = client.datasets.deploy(
namespace, name, version, **deploy_options
)
# Wait for completion with progress monitoring
import time
while True:
progress = client.jobs.get_progress(deploy_resp.job_id)
if progress.job_status in ('Completed', 'Failed', 'Stopped'):
job = client.jobs.get(deploy_resp.job_id)
if job.status == 'Completed':
print(f"Deployment successful: job {job.id}")
return job
else:
raise RuntimeError(f"Job failed with status: {job.status}")
for table, info in progress.tables.items():
print(f" {table}: block {info.current_block}")
time.sleep(5)
except AdminAPIError as e:
print(f"API error during deployment: {e.message}")
raise
# Usage
job = robust_deploy(
client,
namespace='_',
name='my_dataset',
version='1.0.0',
parallelism=4,
end_block='latest'
)

# Recommended: shared with TS CLI, auto-refreshing
client = Client(query_url=..., admin_url=..., auth=True)

with Client(query_url=..., admin_url=..., auth=True) as client:
client.datasets.register(...)
# Connection automatically closed

# Validate before registration
response = client.schema.analyze(
dependencies={'eth': '_/eth_firehose@0.0.0'},
tables={'my_table': 'SELECT block_num, hash FROM eth.blocks'}
)
print(f"Columns: {response.schemas}")

# Use semantic versioning
# - Major: Breaking schema changes
# - Minor: Backward-compatible additions
# - Patch: Bug fixes
client.datasets.register('_', 'my_dataset', '1.0.0', manifest_v1)
client.datasets.register('_', 'my_dataset', '1.1.0', manifest_v1_1) # Added columns
client.datasets.register('_', 'my_dataset', '2.0.0', manifest_v2) # Breaking change

# Use get_progress for detailed monitoring
deploy_resp = client.datasets.deploy(...)
progress = client.jobs.get_progress(deploy_resp.job_id)
for table, info in progress.tables.items():
    print(f"{table}: block {info.current_block}, {info.files_count} files")

# Always specify full dependency references with exact versions
query = (
client.sql("SELECT * FROM base.data")
.with_dependency('base', '_/base_dataset@1.0.0') # Include version!
)
# Not: .with_dependency('base', 'base_dataset') # Missing namespace/version

- See API Reference for complete API documentation
- Check examples/admin/ for more code samples