-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path03_translation_service.py
More file actions
160 lines (127 loc) · 4.25 KB
/
03_translation_service.py
File metadata and controls
160 lines (127 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python3
"""
Basic API Example 03: Translation Service
A real-world service example with structured input/output and progress reporting.
This example demonstrates:
- Structured input validation
- Progress callbacks during processing
- Timeout configuration
- Rich result objects
Run: python basic/03_translation_service.py
"""
import asyncio
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.utils.helpers import clear_mock_state, format_usdc
# Simulated translations (in production, use a real API)
TRANSLATIONS = {
"de": {
"Hello": "Hallo",
"World": "Welt",
"AGIRAILS": "AGIRAILS",
"is": "ist",
"awesome": "fantastisch",
},
"es": {
"Hello": "Hola",
"World": "Mundo",
"AGIRAILS": "AGIRAILS",
"is": "es",
"awesome": "increíble",
},
"fr": {
"Hello": "Bonjour",
"World": "Monde",
"AGIRAILS": "AGIRAILS",
"is": "est",
"awesome": "génial",
},
}
async def main() -> None:
print("AGIRAILS Basic API - Translation Service\n")
clear_mock_state()
from agirails import ACTPClient
from agirails.level0 import (
provide,
request,
set_provider_client,
start_provider,
stop_provider,
)
REQUESTER = "0x1111111111111111111111111111111111111111"
PROVIDER = "0x2222222222222222222222222222222222222222"
# 1. Create translation service
async def translate_handler(data: dict) -> dict:
"""Translate text to target language."""
text = data.get("text", "")
target_lang = data.get("targetLang", "de")
source_lang = data.get("sourceLang", "en")
# Get translation dictionary
translations = TRANSLATIONS.get(target_lang, {})
# Simulate word-by-word translation
words = text.split()
translated_words = []
for i, word in enumerate(words):
# Simulate processing time
await asyncio.sleep(0.1)
# Translate or keep original
translated = translations.get(word, word)
translated_words.append(translated)
translated_text = " ".join(translated_words)
return {
"original": text,
"translated": translated_text,
"sourceLang": source_lang,
"targetLang": target_lang,
"wordCount": len(words),
}
provider = provide(
"translate",
translate_handler,
description="AI-powered translation service",
)
# Set up client and provider
client = await ACTPClient.create(mode="mock", requester_address=REQUESTER)
await client.mint_tokens(PROVIDER, 10_000_000)
set_provider_client(client, address=PROVIDER)
await start_provider()
print(f"Translation provider started: {PROVIDER[:10]}...")
print()
try:
# 2. Test translations to different languages
test_cases = [
{"text": "Hello World", "targetLang": "de"},
{"text": "AGIRAILS is awesome", "targetLang": "es"},
{"text": "Hello AGIRAILS", "targetLang": "fr"},
]
for test in test_cases:
print(f"Translating to {test['targetLang'].upper()}: \"{test['text']}\"")
# Progress callback
def on_progress(status):
if status.progress:
print(f" Progress: {status.progress}%")
result = await request(
"translate",
input=test,
budget=2.00, # $2 per translation
timeout=30000, # 30 second timeout
on_progress=on_progress,
client=client,
provider=PROVIDER,
)
output = result.result
print(f" Result: \"{output['translated']}\"")
print(f" Words: {output['wordCount']}")
print(f" Cost: {format_usdc(result.transaction.amount)}")
print()
# 3. Summary
print("=" * 40)
print("Translation Service Demo Complete!")
print(f"Total translations: {len(test_cases)}")
print("=" * 40)
finally:
await stop_provider()
print("\nDone!")
if __name__ == "__main__":
asyncio.run(main())