diff --git a/flexus_client_kit/ckit_ask_model.py b/flexus_client_kit/ckit_ask_model.py index 1bf99c15..783befa0 100644 --- a/flexus_client_kit/ckit_ask_model.py +++ b/flexus_client_kit/ckit_ask_model.py @@ -188,7 +188,7 @@ async def bot_subchat_create_multiple( variable_values={ "who_is_asking": who_is_asking, "persona_id": persona_id, - "first_question": first_question, + "first_question": [json.dumps(q) for q in first_question], "first_calls": first_calls, "title": title, "fcall_id": fcall_id, diff --git a/flexus_simple_bots/avatar_from_idea_imagine.py b/flexus_simple_bots/avatar_from_idea_imagine.py index cd766ddf..8eb6897a 100644 --- a/flexus_simple_bots/avatar_from_idea_imagine.py +++ b/flexus_simple_bots/avatar_from_idea_imagine.py @@ -1,8 +1,185 @@ -import os, sys, asyncio, base64, io +import os, sys, asyncio, base64, io, json +from pathlib import Path from PIL import Image import xai_sdk -client = xai_sdk.Client() +_default_client = None + + +DEFAULT_MODEL = "grok-imagine-image" +DEFAULT_RESOLUTION = "1k" +_STYLE_BANK_MANIFEST = Path(__file__).parent / "bot_pictures" / "style_bank" / "manifest.json" + + +def create_xai_client(api_key: str | None = None): + if api_key: + return xai_sdk.Client(api_key=api_key) + return xai_sdk.Client() + + +def _get_default_client(): + global _default_client + if _default_client is None: + _default_client = create_xai_client() + return _default_client + + +def _image_to_data_url(image_bytes: bytes, mime: str = "image/png") -> str: + return f"data:{mime};base64,{base64.b64encode(image_bytes).decode('utf-8')}" + + +def style_bank_manifest() -> list[dict]: + if not _STYLE_BANK_MANIFEST.exists(): + return [] + with open(_STYLE_BANK_MANIFEST, "r", encoding="utf-8") as f: + rows = json.load(f) + if not isinstance(rows, list): + raise ValueError(f"Bad style-bank manifest: {_STYLE_BANK_MANIFEST}") + return rows + + +def default_style_bank_files() -> dict[str, bytes]: + root = Path(__file__).parent + files = {} + for row in style_bank_manifest(): + rel = str(row.get("source_path", "")).strip() + target_name = str(row.get("target_name", "")).strip() + if not rel or not target_name: + continue + path = root / rel + if not path.exists(): + continue + files[target_name] = path.read_bytes() + return files + + +async def _sample_image( + xclient, + *, + prompt: str, + image_urls: list[str], + resolution: str = DEFAULT_RESOLUTION, +) -> bytes: + kwargs = { + "prompt": prompt, + "model": DEFAULT_MODEL, + "aspect_ratio": None, + "resolution": resolution, + "image_format": "base64", + } + image_urls = image_urls[:5] + if len(image_urls) == 1: + kwargs["image_url"] = image_urls[0] + else: + kwargs["image_urls"] = image_urls + + def _api_call(): + return xclient.image.sample(**kwargs) + + rsp = await asyncio.to_thread(_api_call) + return rsp.image + + +_FULLSIZE_TARGET = (1024, 1536) +_MAX_IMAGE_BYTES = 250_000 + + +def _encode_webp_within_limit( + im: Image.Image, + quality: int = 100, + max_bytes: int = _MAX_IMAGE_BYTES, + min_quality: int = 40, +) -> bytes: + for q in range(quality, min_quality - 1, -5): + out = io.BytesIO() + im.save(out, "WEBP", quality=q, method=6) + data = out.getvalue() + if len(data) <= max_bytes: + return data + out = io.BytesIO() + im.save(out, "WEBP", quality=min_quality, method=6) + return out.getvalue() + + +def _save_fullsize_webp_bytes( + png_bytes: bytes, + quality: int = 100, + target_size: tuple[int, int] = _FULLSIZE_TARGET, +) -> tuple[bytes, tuple[int, int]]: + tw, th = target_size + with Image.open(io.BytesIO(png_bytes)) as im: + im = make_transparent(im) + iw, ih = im.size + if (iw, ih) != (tw, th): + scale = min(tw / iw, th / ih) + new_w, new_h = int(iw * scale), int(ih * scale) + im = im.resize((new_w, new_h), Image.LANCZOS) + canvas = Image.new("RGBA", (tw, th), (0, 0, 0, 0)) + canvas.paste(im, ((tw - new_w) // 2, (th - new_h) // 2)) + im = canvas + data = _encode_webp_within_limit(im, quality) + size = im.size + return data, size + + +def _save_avatar_256_webp_bytes(avatar_png_bytes: bytes, quality: int = 100) -> tuple[bytes, tuple[int, int]]: + with Image.open(io.BytesIO(avatar_png_bytes)) as im: + im = make_transparent(im) + s = min(im.size) + cx, cy = im.size[0] // 2, im.size[1] // 2 + im = im.crop((cx - s // 2, cy - s // 2, cx + s // 2, cy + s // 2)).resize((256, 256), Image.LANCZOS) + data = _encode_webp_within_limit(im, quality) + size = im.size + return data, size + + +async def generate_avatar_assets_from_idea( + *, + input_image_bytes: bytes, + description: str, + style_reference_images: list[bytes], + api_key: str, + count: int = 5, +) -> list[dict]: + if not description or not description.strip(): + raise ValueError("description is required") + if count < 1 or count > 10: + raise ValueError("count must be in range [1, 10]") + + xclient = create_xai_client(api_key) + refs = [_image_to_data_url(input_image_bytes)] + refs += [_image_to_data_url(x) for x in style_reference_images] + refs = refs[:5] + + fullsize_prompt = ( + f"{description.strip()}. " + "Create a full-size variation of the character on pure solid bright green background (#00FF00)." + ) + avatar_prompt = ( + f"{description.strip()}. " + "Make avatar suitable for small pictures, face much bigger exactly in the center, " + "use a pure solid bright green background (#00FF00)." + ) + + async def _one(i: int): + fullsize_png = await _sample_image(xclient, prompt=fullsize_prompt, image_urls=refs) + fullsize_webp, fullsize_size = _save_fullsize_webp_bytes(fullsize_png) + + avatar_png = await _sample_image( + xclient, + prompt=avatar_prompt, + image_urls=[_image_to_data_url(fullsize_png)], + ) + avatar_webp_256, avatar_size_256 = _save_avatar_256_webp_bytes(avatar_png) + return { + "index": i, + "fullsize_webp": fullsize_webp, + "fullsize_size": fullsize_size, + "avatar_webp_256": avatar_webp_256, + "avatar_size_256": avatar_size_256, + } + + return await asyncio.gather(*[_one(i) for i in range(count)]) def make_transparent(im, fringe_fight: float = 0.30, defringe_radius: int = 4): @@ -64,19 +241,28 @@ async def make_fullsize_variations(input_path: str, base_name: str, out_dir: str async def generate_one(i): def api_call(): - return client.image.sample( + return _get_default_client().image.sample( prompt="Make variations of the charactor on solid bright green background (#00FF00).", - model="grok-imagine-image", + model=DEFAULT_MODEL, image_url=image_url, aspect_ratio=None, # does not work for image edit - resolution="1k", + resolution=DEFAULT_RESOLUTION, image_format="base64" ) rsp = await asyncio.to_thread(api_call) png_bytes = rsp.image + tw, th = _FULLSIZE_TARGET with Image.open(io.BytesIO(png_bytes)) as im: im = make_transparent(im) + iw, ih = im.size + if (iw, ih) != (tw, th): + scale = min(tw / iw, th / ih) + new_w, new_h = int(iw * scale), int(ih * scale) + im = im.resize((new_w, new_h), Image.LANCZOS) + canvas = Image.new("RGBA", (tw, th), (0, 0, 0, 0)) + canvas.paste(im, ((tw - new_w) // 2, (th - new_h) // 2)) + im = canvas fn = os.path.join(out_dir, f"i{i:02d}-{base_name}-{im.size[0]}x{im.size[1]}.webp") im.save(fn, 'WEBP', quality=85, method=6) print(f"Saved {fn}") @@ -90,12 +276,12 @@ async def make_avatar(i: int, png_bytes: bytes, base_name: str, out_dir: str): image_url = f"data:image/png;base64,{base64.b64encode(png_bytes).decode('utf-8')}" def api_call(): - return client.image.sample( + return _get_default_client().image.sample( prompt="Make avatar suitable for small pictures, face much bigger exactly in the center, use a pure solid bright green background (#00FF00).", - model="grok-imagine-image", + model=DEFAULT_MODEL, image_url=image_url, aspect_ratio=None, # does not work for image edit - resolution="1k", + resolution=DEFAULT_RESOLUTION, image_format="base64" ) rsp = await asyncio.to_thread(api_call) diff --git a/flexus_simple_bots/bot_pictures/__init__.py b/flexus_simple_bots/bot_pictures/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/flexus_simple_bots/bot_pictures/__init__.py @@ -0,0 +1 @@ + diff --git a/flexus_simple_bots/bot_pictures/style_bank/__init__.py b/flexus_simple_bots/bot_pictures/style_bank/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/flexus_simple_bots/bot_pictures/style_bank/__init__.py @@ -0,0 +1 @@ + diff --git a/flexus_simple_bots/bot_pictures/style_bank/manifest.json b/flexus_simple_bots/bot_pictures/style_bank/manifest.json new file mode 100644 index 00000000..775907e1 --- /dev/null +++ b/flexus_simple_bots/bot_pictures/style_bank/manifest.json @@ -0,0 +1,27 @@ +[ + { + "target_name": "frog.webp", + "source_path": "frog/frog-256x256.webp", + "label": "Cute mascot style" + }, + { + "target_name": "strategist.webp", + "source_path": "strategist/strategist-256x256.webp", + "label": "Professional portrait style" + }, + { + "target_name": "ad_monster.webp", + "source_path": "admonster/ad_monster-256x256.webp", + "label": "Playful monster style" + }, + { + "target_name": "karen.webp", + "source_path": "karen/karen-256x256.webp", + "label": "Clean assistant style" + }, + { + "target_name": "boss.webp", + "source_path": "boss/boss-256x256.webp", + "label": "Founder portrait style" + } +] diff --git a/setup.py b/setup.py index b18ba449..dde6751c 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,7 @@ def run(self): "pandas", "playwright", "openai", + "xai-sdk", "mcp", "python-telegram-bot>=20.0", ],