Skip to content
Merged

Beta #78

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions compiler/api/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import json
import re
import shutil
from functools import partial
from pathlib import Path
from typing import NamedTuple

Expand Down Expand Up @@ -45,7 +44,10 @@
# # # # # # # # # # # # # # # # # # # # # # # #
""".strip()

open = partial(open, encoding="utf-8")

def open_utf8(path: str | Path, mode: str = "r", *args, **kwargs):
return open(path, mode, encoding="utf-8", *args, **kwargs)


types_to_constructors: dict[str, list[str]] = {}
types_to_functions: dict[str, list[str]] = {}
Expand All @@ -55,7 +57,7 @@
namespaces_to_functions: dict[str, list[str]] = {}

try:
with open(API_HOME_PATH / "docs.json") as f:
with open_utf8(API_HOME_PATH / "docs.json") as f:
docs = json.load(f)
except FileNotFoundError:
docs = {"type": {}, "constructor": {}, "method": {}}
Expand Down Expand Up @@ -184,465 +186,479 @@
return ("\n ".join(items), len(items)) if items else (None, 0)


def start() -> None: # noqa: C901
shutil.rmtree(DESTINATION_PATH / "types", ignore_errors=True)
shutil.rmtree(DESTINATION_PATH / "functions", ignore_errors=True)
shutil.rmtree(DESTINATION_PATH / "base", ignore_errors=True)

with (
open(API_HOME_PATH / "source/auth_key.tl") as f1,
open(API_HOME_PATH / "source/sys_msgs.tl") as f2,
open(API_HOME_PATH / "source/main_api.tl") as f3,
open_utf8(API_HOME_PATH / "source/auth_key.tl") as f1,
open_utf8(API_HOME_PATH / "source/sys_msgs.tl") as f2,
open_utf8(API_HOME_PATH / "source/main_api.tl") as f3,
):
schema = (f1.read() + f2.read() + f3.read()).splitlines()
schema = (str(f1.read()) + str(f2.read()) + str(f3.read())).splitlines()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The f.read() method on a text file (opened without 'b' in the mode) already returns a string. The explicit str() conversions here are redundant and can be removed for cleaner code.

Suggested change
schema = (str(f1.read()) + str(f2.read()) + str(f3.read())).splitlines()
schema = (f1.read() + f2.read() + f3.read()).splitlines()


with (
open(API_HOME_PATH / "template/type.txt") as f1,
open(API_HOME_PATH / "template/combinator.txt") as f2,
open_utf8(API_HOME_PATH / "template/type.txt") as f1,
open_utf8(API_HOME_PATH / "template/combinator.txt") as f2,
):
type_tmpl = f1.read()
combinator_tmpl = f2.read()
type_tmpl = str(f1.read())
combinator_tmpl = str(f2.read())
Comment on lines +205 to +206
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The f.read() method on a text file (opened without 'b' in the mode) already returns a string. The explicit str() conversions here are redundant and can be removed for cleaner code.

Suggested change
type_tmpl = str(f1.read())
combinator_tmpl = str(f2.read())
type_tmpl = f1.read()
combinator_tmpl = f2.read()


layer = None
combinators: list[Combinator] = []

section = None
section = ""
for line in schema:
if section_match := SECTION_RE.match(line):
section = section_match.group(1)
continue

if layer_match := LAYER_RE.match(line):
layer = layer_match.group(1)
continue

if combinator_match := COMBINATOR_RE.match(line):
qualname, id, qualtype = combinator_match.groups()

namespace, name = (
qualname.split(".") if "." in qualname else ("", qualname)
)
name = camel(name)
qualname = ".".join([namespace, name]).lstrip(".")

typespace, type = (
qualtype.split(".") if "." in qualtype else ("", qualtype)
)
type = camel(type)
qualtype = ".".join([typespace, type]).lstrip(".")

has_flags = bool(FLAGS_RE_3.findall(line))

args = ARGS_RE.findall(line)

# Fix arg name being "self" or "from" (reserved python keywords)
for i, item in enumerate(args):
if item[0] == "self":
args[i] = ("is_self", item[1])
if item[0] == "from":
args[i] = ("from_peer", item[1])

combinator = Combinator(
section=section,
qualname=qualname,
namespace=namespace,
name=name,
id=f"0x{id}",
has_flags=has_flags,
args=args,
qualtype=qualtype,
typespace=typespace,
type=type,
)

combinators.append(combinator)

for c in combinators:
qualtype = c.qualtype

if qualtype.startswith("Vector"):
qualtype = qualtype.split("<")[1][:-1]

d = types_to_constructors if c.section == "types" else types_to_functions

if qualtype not in d:
d[qualtype] = []

d[qualtype].append(c.qualname)

if c.section == "types":
key = c.namespace

if key not in namespaces_to_types:
namespaces_to_types[key] = []

if c.type not in namespaces_to_types[key]:
namespaces_to_types[key].append(c.type)

for k, v in types_to_constructors.items():
for i in v:
with contextlib.suppress(KeyError):
constructors_to_functions[i] = types_to_functions[k]

for qualtype, qualval in types_to_constructors.items():
typespace, type = qualtype.split(".") if "." in qualtype else ("", qualtype)
dir_path = DESTINATION_PATH / "base" / typespace

module = type

if module == "Updates":
module = "UpdatesT"

dir_path.mkdir(parents=True, exist_ok=True)

constructors = sorted(qualval)
constr_count = len(constructors)
items = "\n ".join([f"{c}" for c in constructors])

type_docs = docs["type"].get(qualtype, None)

type_docs = type_docs["desc"] if type_docs else "Telegram API base type."

docstring = type_docs

docstring += (
f"\n\n Constructors:\n"
f" This base type has {constr_count} constructor{'s' if constr_count > 1 else ''} available.\n\n"
f" .. currentmodule:: pyrogram.raw.types\n\n"
f" .. autosummary::\n"
f" :nosignatures:\n\n"
f" {items}"
)

references, ref_count = get_references(qualtype, "types")

if references:
docstring += f"\n\n Functions:\n This object can be returned by {ref_count} function{'s' if ref_count > 1 else ''}.\n\n .. currentmodule:: pyrogram.raw.functions\n\n .. autosummary::\n :nosignatures:\n\n {references}"

with open(dir_path / f"{snake(module)}.py", "w") as f:
with open_utf8(dir_path / f"{snake(module)}.py", "w") as f:
f.write(
type_tmpl.format(
warning=WARNING,
docstring=docstring,
name=type,
qualname=qualtype,
types=", ".join([f'"raw.types.{c}"' for c in constructors]),
doc_name=snake(type).replace("_", "-"),
),
)

for c in combinators:
sorted_args = sort_args(c.args)

arguments = (", *, " if c.args else "") + (
", ".join([f"{i[0]}: {get_type_hint(i[1])}" for i in sorted_args])
if sorted_args
else ""
)

fields = (
"\n ".join(
[f"self.{i[0]} = {i[0]} # {i[1]}" for i in sorted_args],
)
if sorted_args
else "pass"
)

docstring = ""
docstring_args = []

combinator_docs = (
docs["method"] if c.section == "functions" else docs["constructor"]
)

for arg in sorted_args:
arg_name, arg_type = arg
is_optional = FLAGS_RE.match(arg_type)
arg_type = arg_type.split("?")[-1]

arg_docs = combinator_docs.get(c.qualname, None)

arg_docs = arg_docs["params"].get(arg_name, "N/A") if arg_docs else "N/A"

docstring_args.append(
f"{arg_name} ({get_docstring_arg_type(arg_type)}{', *optional*' if is_optional else ''}):\n {arg_docs}\n",
)

if c.section == "types":
constructor_docs = docs["constructor"].get(c.qualname, None)

constructor_docs = (
constructor_docs["desc"]
if constructor_docs
else "Telegram API type."
)
docstring += constructor_docs + "\n"
docstring += (
f"\n Constructor of :obj:`~pyrogram.raw.base.{c.qualtype}`."
)
elif function_docs := docs["method"].get(c.qualname, None):
docstring += function_docs["desc"] + "\n"
else:
docstring += "Telegram API function."

docstring += f"\n\n Details:\n - Layer: ``{layer}``\n - ID: ``{c.id[2:].upper()}``\n\n"
docstring += " Parameters:\n " + (
"\n ".join(docstring_args)
if docstring_args
else "No parameters required.\n"
)

if c.section == "functions":
docstring += "\n Returns:\n " + get_docstring_arg_type(
c.qualtype,
)
else:
references, count = get_references(c.qualname, "constructors")

if references:
docstring += f"\n Functions:\n This object can be returned by {count} function{'s' if count > 1 else ''}.\n\n .. currentmodule:: pyrogram.raw.functions\n\n .. autosummary::\n :nosignatures:\n\n {references}"

write_types = read_types = "" if c.has_flags else "# No flags\n "

for arg_name, arg_type in c.args:
flag = FLAGS_RE_2.match(arg_type)

if re.match(r"flags\d?", arg_name) and arg_type == "#":
write_flags = []

for i in c.args:
flag = FLAGS_RE_2.match(i[1])

if flag:
if arg_name != f"flags{flag.group(1)}":
continue

if flag.group(3) == "true" or flag.group(3).startswith(
"Vector",
):
write_flags.append(
f"{arg_name} |= (1 << {flag.group(2)}) if self.{i[0]} else 0",
)
else:
write_flags.append(
f"{arg_name} |= (1 << {flag.group(2)}) if self.{i[0]} is not None else 0",
)

write_flags = "\n ".join(
[
f"{arg_name} = 0",
"\n ".join(write_flags),
f"b.write(Int({arg_name}))\n ",
],
)

write_types += write_flags
read_types += f"\n {arg_name} = Int.read(b)\n "

continue

if flag:
number, index, flag_type = flag.groups()

if flag_type == "true":
read_types += "\n "
read_types += f"{arg_name} = True if flags{number} & (1 << {index}) else False"
elif flag_type in CORE_TYPES:
write_types += "\n "
write_types += f"if self.{arg_name} is not None:\n "
write_types += (
f"b.write({flag_type.title()}(self.{arg_name}))\n "
)

read_types += "\n "
read_types += f"{arg_name} = {flag_type.title()}.read(b) if flags{number} & (1 << {index}) else None"
elif "vector" in flag_type.lower():
sub_type = arg_type.split("<")[1][:-1]

write_types += "\n "
write_types += f"if self.{arg_name} is not None:\n "
write_types += f"b.write(Vector(self.{arg_name}{f', {sub_type.title()}' if sub_type in CORE_TYPES else ''}))\n "

read_types += "\n "
read_types += f"{arg_name} = TLObject.read(b{f', {sub_type.title()}' if sub_type in CORE_TYPES else ''}) if flags{number} & (1 << {index}) else []\n "
else:
write_types += "\n "
write_types += f"if self.{arg_name} is not None:\n "
write_types += f"b.write(self.{arg_name}.write())\n "

read_types += "\n "
read_types += f"{arg_name} = TLObject.read(b) if flags{number} & (1 << {index}) else None\n "
else:
write_types += "\n "
if arg_type in CORE_TYPES:
write_types += (
f"b.write({arg_type.title()}(self.{arg_name}))\n "
)

read_types += "\n "
read_types += (
f"{arg_name} = {arg_type.title()}.read(b)\n "
)
elif "vector" in arg_type.lower():
sub_type = arg_type.split("<")[1][:-1]

write_types += f"b.write(Vector(self.{arg_name}{f', {sub_type.title()}' if sub_type in CORE_TYPES else ''}))\n "

read_types += "\n "
read_types += f"{arg_name} = TLObject.read(b{f', {sub_type.title()}' if sub_type in CORE_TYPES else ''})\n "
else:
write_types += f"b.write(self.{arg_name}.write())\n "

read_types += "\n "
read_types += f"{arg_name} = TLObject.read(b)\n "

slots = ", ".join([f'"{i[0]}"' for i in sorted_args])
return_arguments = ", ".join([f"{i[0]}={i[0]}" for i in sorted_args])

base_class = (
f"raw.base.{c.qualtype}"
if c.qualtype in types_to_constructors
else "TLObject"
)

if c.section == "functions":
base_class = "TLObject"

compiled_combinator = combinator_tmpl.format(
warning=WARNING,
base=base_class,
name=c.name,
docstring=docstring,
slots=slots,
id=c.id,
qualname=f"{c.section}.{c.qualname}",
arguments=arguments,
fields=fields,
read_types=read_types,
write_types=write_types,
return_arguments=return_arguments,
)

directory = "types" if c.section == "types" else c.section

dir_path = DESTINATION_PATH / directory / c.namespace

dir_path.mkdir(exist_ok=True, parents=True)

module = c.name

if module == "Updates":
module = "UpdatesT"

with open(dir_path / f"{snake(module)}.py", "w") as f:
with open_utf8(dir_path / f"{snake(module)}.py", "w") as f:
f.write(compiled_combinator)

d = (
namespaces_to_constructors
if c.section == "types"
else namespaces_to_functions
)

if c.namespace not in d:
d[c.namespace] = []

d[c.namespace].append(c.name)

for namespace, types in namespaces_to_types.items():
with open(DESTINATION_PATH / "base" / namespace / "__init__.py", "w") as f:
with open_utf8(
DESTINATION_PATH / "base" / namespace / "__init__.py", "w"
) as f:
f.write(f"{WARNING}\n\n")

all = []

for t in types:
module = t

if module == "Updates":
module = "UpdatesT"

all.append(t)

f.write(f"from .{snake(module)} import {t}\n")

if not namespace:
f.write(
f"from . import {', '.join(filter(bool, namespaces_to_types))}",
)

all.extend(filter(bool, namespaces_to_types))

f.write("\n\n__all__ = [\n")
for it in all:
f.write(f' "{it}",\n')
f.write("]\n")

for namespace, types in namespaces_to_constructors.items():
with open(DESTINATION_PATH / "types" / namespace / "__init__.py", "w") as f:
with open_utf8(
DESTINATION_PATH / "types" / namespace / "__init__.py", "w"
) as f:
f.write(f"{WARNING}\n\n")

all = []

for t in types:
module = t

if module == "Updates":
module = "UpdatesT"

all.append(t)

f.write(f"from .{snake(module)} import {t}\n")

if not namespace:
f.write(
f"from . import {', '.join(filter(bool, namespaces_to_constructors))}\n",
)

all.extend(filter(bool, namespaces_to_constructors))

f.write("\n\n__all__ = [\n")
for it in all:
f.write(f' "{it}",\n')
f.write("]\n")

for namespace, types in namespaces_to_functions.items():
with open(
with open_utf8(
DESTINATION_PATH / "functions" / namespace / "__init__.py",
"w",
) as f:
f.write(f"{WARNING}\n\n")

all = []

for t in types:
module = t

if module == "Updates":
module = "UpdatesT"

all.append(t)

f.write(f"from .{snake(module)} import {t}\n")

if not namespace:
f.write(
f"from . import {', '.join(filter(bool, namespaces_to_functions))}",
)

all.extend(filter(bool, namespaces_to_functions))

f.write("\n\n__all__ = [\n")
for it in all:
f.write(f' "{it}",\n')
f.write("]\n")

with open(DESTINATION_PATH / "all.py", "w", encoding="utf-8") as f:
with open_utf8(DESTINATION_PATH / "all.py", "w") as f:
f.write(WARNING + "\n\n")
f.write(f"layer = {layer}\n\n")
f.write("objects = {")

for c in combinators:
f.write(f'\n {c.id}: "pyrogram.raw.{c.section}.{c.qualname}",')

f.write('\n 0xbc799737: "pyrogram.raw.core.BoolFalse",')
f.write('\n 0x997275b5: "pyrogram.raw.core.BoolTrue",')
f.write('\n 0x1cb5c415: "pyrogram.raw.core.Vector",')
f.write('\n 0x73f1f8dc: "pyrogram.raw.core.MsgContainer",')
f.write('\n 0xae500895: "pyrogram.raw.core.FutureSalts",')
f.write('\n 0x0949d9dc: "pyrogram.raw.core.FutureSalt",')
f.write('\n 0x3072cfa1: "pyrogram.raw.core.GzipPacked",')
f.write('\n 0x5bb8e511: "pyrogram.raw.core.Message",')

f.write("\n}\n")

Check warning on line 661 in compiler/api/compiler.py

View check run for this annotation

codefactor.io / CodeFactor

compiler/api/compiler.py#L189-L661

Very Complex Method


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion compiler/api/template/combinator.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ from typing import List, Optional, Any
{warning}


class {name}(TLObject): # type: ignore
class {name}({base}): # type: ignore
"""{docstring}
"""

Expand Down
7 changes: 1 addition & 6 deletions compiler/api/template/type.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
{warning}

from typing import Union
from pyrogram import raw
from pyrogram.raw.core import TLObject

{name} = Union[{types}]


class {name}: # type: ignore
class {name}(TLObject): # type: ignore
"""{docstring}
"""

Expand Down
2 changes: 2 additions & 0 deletions compiler/docs/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from pathlib import Path

# Paths
page_template = ""
toctree = ""
HOME = "compiler/docs"
DESTINATION = "docs/source/telegram"
PYROGRAM_API_DEST = "docs/source/api"
Expand Down Expand Up @@ -70,149 +72,149 @@
return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s).lower()


def generate_raw(source_path, base) -> None:
all_entities = {}

def build(path, level=0) -> None:
path_obj = Path(path)

for i in path_obj.iterdir():
if i.is_dir():
if not i.name.startswith("__"):
build(str(i), level=level + 1)
else:
if i.name.startswith("__"):
continue
with i.open(encoding="utf-8") as f:
p = ast.parse(f.read())

for node in ast.walk(p):
if isinstance(node, ast.ClassDef):
name = node.name
break
else:
continue

full_path = (
Path(path).name + "/" + snake(name).replace("_", "-") + ".rst"
)

if level:
full_path = base + "/" + full_path

namespace = path.split("/")[-1]
if namespace in ["base", "types", "functions"]:
namespace = ""

full_name = f"{(namespace + '.') if namespace else ''}{name}"

Path(DESTINATION, full_path).parent.mkdir(
parents=True,
exist_ok=True,
)

with Path(DESTINATION, full_path).open("w", encoding="utf-8") as f:
f.write(
page_template.format(
title=full_name,
title_markup="=" * len(full_name),
full_class_path="pyrogram.raw.{}".format(
".".join(full_path.split("/")[:-1]) + "." + name,
),
),
)

if path_obj.name not in all_entities:
all_entities[path_obj.name] = []

all_entities[path_obj.name].append(name)

build(source_path)

for k, v in sorted(all_entities.items()):
v = sorted(v)
entities = [f"{i} <{snake(i).replace('_', '-')}>" for i in v]

if k != base:
inner_path = base + "/" + k + "/index" + ".rst"
module = f"pyrogram.raw.{base}.{k}"
else:
for i in sorted(all_entities, reverse=True):
if i != base:
entities.insert(0, f"{i}/index")

inner_path = base + "/index" + ".rst"
module = f"pyrogram.raw.{base}"

with Path(DESTINATION, inner_path).open("w", encoding="utf-8") as f:
if k == base:
f.write(":tocdepth: 1\n\n")
k = "Raw " + k

f.write(
toctree.format(
title=k.title(),
title_markup="=" * len(k),
module=module,
entities="\n ".join(entities),
),
)

f.write("\n")

Check notice on line 163 in compiler/docs/compiler.py

View check run for this annotation

codefactor.io / CodeFactor

compiler/docs/compiler.py#L75-L163

Complex Method


def pyrogram_api() -> None:
# Discovery logic

methods_categories = {}
methods_path = Path(METHODS_PATH)
if not methods_path.exists():
methods_path = Path("../../") / METHODS_PATH

for entry in sorted(methods_path.iterdir()):
if entry.is_dir() and entry.name != "decorators":
category_name = entry.name
methods = []
for file in sorted(entry.glob("*.py")):
if file.name == "__init__.py":
continue

# Check if it defines a class or is special function (idle, compose)
with Path(file).open(encoding="utf-8") as f:
tree = ast.parse(f.read())

has_class = any(
isinstance(node, ast.ClassDef) for node in ast.walk(tree)
)
is_special = file.stem in ["idle", "compose"]

if has_class or is_special:
methods.append(file.stem)

if methods:
title = METHODS_TITLES.get(
category_name,
category_name.replace("_", " ").title(),
)
methods_categories[category_name] = (title, methods)

types_categories = {}
types_lib_path = Path(TYPES_LIB_PATH)
if not types_lib_path.exists():
types_lib_path = Path("../../") / TYPES_LIB_PATH

for entry in sorted(types_lib_path.iterdir()):
if entry.is_dir():
category_name = entry.name
types = []
for file in sorted(entry.glob("*.py")):
if file.name == "__init__.py":
continue
with Path(file).open(encoding="utf-8") as f:
try:
tree = ast.parse(f.read())
except Exception:
continue

Check notice on line 217 in compiler/docs/compiler.py

View check run for this annotation

codefactor.io / CodeFactor

compiler/docs/compiler.py#L216-L217

Try, Except, Continue detected. (B112)
types.extend(
node.name
for node in ast.walk(tree)
Expand All @@ -232,8 +234,8 @@
with Path(file).open(encoding="utf-8") as f:
try:
tree = ast.parse(f.read())
except Exception:
continue

Check notice on line 238 in compiler/docs/compiler.py

View check run for this annotation

codefactor.io / CodeFactor

compiler/docs/compiler.py#L237-L238

Try, Except, Continue detected. (B112)
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
class_name = node.name
Expand Down Expand Up @@ -261,8 +263,8 @@
with Path(file).open(encoding="utf-8") as f:
try:
tree = ast.parse(f.read())
except Exception:
continue

Check notice on line 267 in compiler/docs/compiler.py

View check run for this annotation

codefactor.io / CodeFactor

compiler/docs/compiler.py#L266-L267

Try, Except, Continue detected. (B112)
enums_list.extend(
node.name for node in ast.walk(tree) if isinstance(node, ast.ClassDef)
)
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/errors/rpc_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RPCError(Exception):

def __init__(
self,
value: int | str | raw.types.RpcError = None,
value: int | str | raw.types.RpcError | None = None,
rpc_name: str | None = None,
is_unknown: bool = False,
is_signed: bool = False,
Expand Down
4 changes: 2 additions & 2 deletions pyrogram/file_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ def __init__(
media_id: int | None = None,
access_hash: int | None = None,
volume_id: int | None = None,
thumbnail_source: ThumbnailSource = None,
thumbnail_file_type: FileType = None,
thumbnail_source: ThumbnailSource | None = None,
thumbnail_file_type: FileType | None = None,
thumbnail_size: str = "",
secret: int | None = None,
local_id: int | None = None,
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/handlers/deleted_bot_business_messages_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class DeletedBotBusinessMessagesHandler(Handler):
The deleted messages, as list.
"""

def __init__(self, callback: Callable, filters: Filter = None) -> None:
def __init__(self, callback: Callable, filters: Filter | None = None) -> None:
super().__init__(callback, filters)

async def check(self, client: pyrogram.Client, messages: list[Message]) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/handlers/deleted_messages_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class DeletedMessagesHandler(Handler):
The deleted messages, as list.
"""

def __init__(self, callback: Callable, filters: Filter = None) -> None:
def __init__(self, callback: Callable, filters: Filter | None = None) -> None:
super().__init__(callback, filters)

async def check(self, client: pyrogram.Client, messages: list[Message]) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/handlers/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


class Handler:
def __init__(self, callback: Callable, filters: Filter = None) -> None:
def __init__(self, callback: Callable, filters: Filter | None = None) -> None:
self.callback = callback
self.filters = filters

Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/bots/delete_bot_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
class DeleteBotCommands:
async def delete_bot_commands(
self: pyrogram.Client,
scope: types.BotCommandScope = None,
scope: types.BotCommandScope | None = None,
language_code: str = "",
) -> bool:
"""Delete the list of the bot's commands for the given scope and user language.
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/bots/get_bot_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
class GetBotCommands:
async def get_bot_commands(
self: pyrogram.Client,
scope: types.BotCommandScope = None,
scope: types.BotCommandScope | None = None,
language_code: str = "",
) -> list[types.BotCommand]:
"""Get the current list of the bot's commands for the given scope and user language.
Expand Down
3 changes: 2 additions & 1 deletion pyrogram/methods/bots/send_game.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ async def send_game(
reply_markup: types.InlineKeyboardMarkup
| types.ReplyKeyboardMarkup
| types.ReplyKeyboardRemove
| types.ForceReply = None,
| types.ForceReply
| None = None,
) -> types.Message:
"""Send a game.

Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/bots/set_bot_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class SetBotCommands:
async def set_bot_commands(
self: pyrogram.Client,
commands: list[types.BotCommand],
scope: types.BotCommandScope = None,
scope: types.BotCommandScope | None = None,
language_code: str = "",
) -> bool:
"""Set the list of the bot's commands.
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/bots/set_bot_default_privileges.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
class SetBotDefaultPrivileges:
async def set_bot_default_privileges(
self: pyrogram.Client,
privileges: types.ChatPrivileges = None,
privileges: types.ChatPrivileges | None = None,
for_channels: bool | None = None,
) -> bool:
"""Change the default privileges requested by the bot when it's added as an administrator to groups or channels.
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/bots/set_chat_menu_button.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class SetChatMenuButton:
async def set_chat_menu_button(
self: pyrogram.Client,
chat_id: int | str | None = None,
menu_button: types.MenuButton = None,
menu_button: types.MenuButton | None = None,
) -> bool:
"""Change the bot's menu button in a private chat, or the default menu button.

Expand Down
4 changes: 2 additions & 2 deletions pyrogram/methods/business/send_invoice.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ async def send_invoice(
photo_size: int | None = None,
photo_mime_type: str | None = None,
start_parameter: str | None = None,
extended_media: types.InputMedia = None,
extended_media: types.InputMedia | None = None,
reply_to_message_id: int | None = None,
message_thread_id: int | None = None,
quote_text: str | None = None,
allow_paid_broadcast: bool | None = None,
quote_entities: list[types.MessageEntity] | None = None,
reply_markup: types.InlineKeyboardMarkup = None,
reply_markup: types.InlineKeyboardMarkup | None = None,
):
"""Use this method to send invoices.

Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/chats/get_chat_event_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def get_chat_event_log(
query: str = "",
offset_id: int = 0,
limit: int = 0,
filters: types.ChatEventFilter = None,
filters: types.ChatEventFilter | None = None,
user_ids: list[int | str] | None = None,
) -> AsyncGenerator[types.ChatEvent, None] | None:
"""Get the actions taken by chat members and administrators in the last 48h.
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/chats/promote_chat_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async def promote_chat_member(
self: pyrogram.Client,
chat_id: int | str,
user_id: int | str,
privileges: types.ChatPrivileges = None,
privileges: types.ChatPrivileges | None = None,
title: str | None = None,
) -> bool:
"""Promote or demote a user in a supergroup or a channel.
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/chats/update_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def update_folder(
exclude_muted: bool | None = None,
exclude_read: bool | None = None,
exclude_archived: bool | None = None,
color: enums.FolderColor = None,
color: enums.FolderColor | None = None,
emoji: str | None = None,
) -> bool:
"""Create or update a user's folder.
Expand Down
1 change: 1 addition & 0 deletions pyrogram/methods/messages/copy_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async def copy_message(
allow_paid_broadcast: bool | None = None,
invert_media: bool = False,
reply_markup: types.InlineKeyboardMarkup
| None
| types.ReplyKeyboardMarkup
| types.ReplyKeyboardRemove
| types.ForceReply = None,
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/messages/edit_inline_caption.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def edit_inline_caption(
inline_message_id: str,
caption: str,
parse_mode: enums.ParseMode | None = None,
reply_markup: types.InlineKeyboardMarkup = None,
reply_markup: types.InlineKeyboardMarkup | None = None,
invert_media: bool | None = None,
) -> bool:
"""Edit the caption of inline media messages.
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/messages/edit_inline_media.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,252 +18,252 @@
class EditInlineMedia:
MAX_RETRIES = 3

async def edit_inline_media(
self: pyrogram.Client,
inline_message_id: str,
media: types.InputMedia,
reply_markup: types.InlineKeyboardMarkup = None,
reply_markup: types.InlineKeyboardMarkup | None = None,
) -> bool:
"""Edit animation, audio, document, photo or video messages, or replace text with animation, audio, document, photo or video messages.

When the inline message is edited, a new file can't be uploaded. Use a previously uploaded file via its file_id
or specify a URL.

.. include:: /_includes/usable-by/bots.rst

Parameters:
inline_message_id (``str``):
Required if *chat_id* and *message_id* are not specified.
Identifier of the inline message.

media (:obj:`~pyrogram.types.InputMedia`):
One of the InputMedia objects describing an animation, audio, document, photo or video.

reply_markup (:obj:`~pyrogram.types.InlineKeyboardMarkup`, *optional*):
An InlineKeyboardMarkup object.

Returns:
``bool``: On success, True is returned.

Example:
.. code-block:: python

from pyrogram.types import InputMediaPhoto, InputMediaVideo, InputMediaAudio

# Bots only

# Replace the current media with a local photo
await app.edit_inline_media(inline_message_id, InputMediaPhoto("new_photo.jpg"))

# Replace the current media with a local video
await app.edit_inline_media(inline_message_id, InputMediaVideo("new_video.mp4"))

# Replace the current media with a local audio
await app.edit_inline_media(inline_message_id, InputMediaAudio("new_audio.mp3"))
"""
caption = media.caption
parse_mode = media.parse_mode

is_bytes_io = isinstance(media.media, io.BytesIO)
is_uploaded_file = is_bytes_io or (
isinstance(media.media, str) and await AsyncPath(media.media).is_file()
)

is_external_url = not is_uploaded_file and re.match(
"^https?://",
media.media,
)

if is_bytes_io and not hasattr(media.media, "name"):
media.media.name = "media"

if is_uploaded_file:
filename_attribute = [
raw.types.DocumentAttributeFilename(
file_name=media.media.name
if is_bytes_io
else Path(media.media).name,
),
]
else:
filename_attribute = []

if isinstance(media, types.InputMediaPhoto):
if is_uploaded_file:
media = raw.types.InputMediaUploadedPhoto(
file=await self.save_file(media.media),
spoiler=media.has_spoiler,
)
elif is_external_url:
media = raw.types.InputMediaPhotoExternal(
url=media.media,
spoiler=media.has_spoiler,
)
else:
media = utils.get_input_media_from_file_id(
media.media,
FileType.PHOTO,
)
elif isinstance(media, types.InputMediaVideo):
if is_uploaded_file:
media = raw.types.InputMediaUploadedDocument(
mime_type=(
None if is_bytes_io else self.guess_mime_type(media.media)
)
or "video/mp4",
thumb=await self.save_file(media.thumb),
file=await self.save_file(media.media),
spoiler=media.has_spoiler,
attributes=[
raw.types.DocumentAttributeVideo(
supports_streaming=media.supports_streaming or None,
duration=media.duration,
w=media.width,
h=media.height,
),
*filename_attribute,
],
)
elif is_external_url:
media = raw.types.InputMediaDocumentExternal(
url=media.media,
spoiler=media.has_spoiler,
)
else:
media = utils.get_input_media_from_file_id(
media.media,
FileType.VIDEO,
)
elif isinstance(media, types.InputMediaAudio):
if is_uploaded_file:
media = raw.types.InputMediaUploadedDocument(
mime_type=(
None if is_bytes_io else self.guess_mime_type(media.media)
)
or "audio/mpeg",
thumb=await self.save_file(media.thumb),
file=await self.save_file(media.media),
attributes=[
raw.types.DocumentAttributeAudio(
duration=media.duration,
performer=media.performer,
title=media.title,
),
*filename_attribute,
],
)
elif is_external_url:
media = raw.types.InputMediaDocumentExternal(url=media.media)
else:
media = utils.get_input_media_from_file_id(
media.media,
FileType.AUDIO,
)
elif isinstance(media, types.InputMediaAnimation):
if is_uploaded_file:
media = raw.types.InputMediaUploadedDocument(
mime_type=(
None if is_bytes_io else self.guess_mime_type(media.media)
)
or "video/mp4",
thumb=await self.save_file(media.thumb),
file=await self.save_file(media.media),
spoiler=media.has_spoiler,
attributes=[
raw.types.DocumentAttributeVideo(
supports_streaming=True,
duration=media.duration,
w=media.width,
h=media.height,
),
raw.types.DocumentAttributeAnimated(),
*filename_attribute,
],
nosound_video=True,
)
elif is_external_url:
media = raw.types.InputMediaDocumentExternal(
url=media.media,
spoiler=media.has_spoiler,
)
else:
media = utils.get_input_media_from_file_id(
media.media,
FileType.ANIMATION,
)
elif isinstance(media, types.InputMediaDocument):
if is_uploaded_file:
media = raw.types.InputMediaUploadedDocument(
mime_type=(
None if is_bytes_io else self.guess_mime_type(media.media)
)
or "application/zip",
thumb=await self.save_file(media.thumb),
file=await self.save_file(media.media),
attributes=filename_attribute,
force_file=True,
)
elif is_external_url:
media = raw.types.InputMediaDocumentExternal(url=media.media)
else:
media = utils.get_input_media_from_file_id(
media.media,
FileType.DOCUMENT,
)

unpacked = utils.unpack_inline_message_id(inline_message_id)
dc_id = unpacked.dc_id

session = await get_session(self, dc_id)

if is_uploaded_file:
uploaded_media = await self.invoke(
raw.functions.messages.UploadMedia(
peer=raw.types.InputPeerSelf(),
media=media,
),
)

actual_media = (
raw.types.InputMediaPhoto(
id=raw.types.InputPhoto(
id=uploaded_media.photo.id,
access_hash=uploaded_media.photo.access_hash,
file_reference=uploaded_media.photo.file_reference,
),
spoiler=getattr(media, "has_spoiler", None),
)
if isinstance(media, types.InputMediaPhoto)
else raw.types.InputMediaDocument(
id=raw.types.InputDocument(
id=uploaded_media.document.id,
access_hash=uploaded_media.document.access_hash,
file_reference=uploaded_media.document.file_reference,
),
spoiler=getattr(media, "has_spoiler", None),
)
)
else:
actual_media = media

for i in range(self.MAX_RETRIES):
try:
return await session.invoke(
raw.functions.messages.EditInlineBotMessage(
id=unpacked,
media=actual_media,
reply_markup=await reply_markup.write(self)
if reply_markup
else None,
**await self.parser.parse(caption, parse_mode),
),
sleep_threshold=self.sleep_threshold,
)
except RPCError as e:
if i == self.MAX_RETRIES - 1:
raise

if isinstance(e, MediaEmpty):
# Must wait due to a server race condition
await asyncio.sleep(1)
return None

Check notice on line 269 in pyrogram/methods/messages/edit_inline_media.py

View check run for this annotation

codefactor.io / CodeFactor

pyrogram/methods/messages/edit_inline_media.py#L21-L269

Complex Method
2 changes: 1 addition & 1 deletion pyrogram/methods/messages/edit_inline_reply_markup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class EditInlineReplyMarkup:
async def edit_inline_reply_markup(
self: pyrogram.Client,
inline_message_id: str,
reply_markup: types.InlineKeyboardMarkup = None,
reply_markup: types.InlineKeyboardMarkup | None = None,
) -> bool:
"""Edit only the reply markup of inline messages sent via the bot (for inline bots).

Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/messages/edit_inline_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def edit_inline_text(
text: str,
parse_mode: enums.ParseMode | None = None,
disable_web_page_preview: bool | None = None,
reply_markup: types.InlineKeyboardMarkup = None,
reply_markup: types.InlineKeyboardMarkup | None = None,
invert_media: bool | None = None,
) -> bool:
"""Edit the text of inline messages.
Expand Down
2 changes: 1 addition & 1 deletion pyrogram/methods/messages/edit_message_caption.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def edit_message_caption(
parse_mode: enums.ParseMode | None = None,
caption_entities: list[types.MessageEntity] | None = None,
invert_media: bool = False,
reply_markup: types.InlineKeyboardMarkup = None,
reply_markup: types.InlineKeyboardMarkup | None = None,
business_connection_id: str | None = None,
) -> types.Message:
"""Edit the caption of media messages.
Expand Down
Loading
Loading