mirror of
https://github.com/d3vyce/fastapi-toolsets.git
synced 2026-03-02 01:10:47 +01:00
feat: simplify CLI feature (#23)
* chore: cleanup + add tests * chore: remove graph and show fixtures commands * feat: add async_command wrapper
This commit is contained in:
27
src/fastapi_toolsets/cli/utils.py
Normal file
27
src/fastapi_toolsets/cli/utils.py
Normal file
@@ -0,0 +1,27 @@
|
||||
"""CLI utility functions."""
|
||||
|
||||
import asyncio
|
||||
import functools
|
||||
from collections.abc import Callable, Coroutine
|
||||
from typing import Any, ParamSpec, TypeVar
|
||||
|
||||
P = ParamSpec("P")
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def async_command(func: Callable[P, Coroutine[Any, Any, T]]) -> Callable[P, T]:
|
||||
"""Decorator to run an async function as a sync CLI command.
|
||||
|
||||
Example:
|
||||
@fixture_cli.command("load")
|
||||
@async_command
|
||||
async def load(ctx: typer.Context) -> None:
|
||||
async with get_db_context() as session:
|
||||
await load_fixtures(session, registry)
|
||||
"""
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
|
||||
return asyncio.run(func(*args, **kwargs))
|
||||
|
||||
return wrapper
|
||||
Reference in New Issue
Block a user