import struct import asyncio from typing import Annotated import redis.asyncio as aioredis from fastapi import FastAPI, Path from fastapi.responses import Response from config import Config config = Config('config.hjson') redis = None app = FastAPI() @app.get( '/counter/{name}', responses = { 200: { 'content': {'image/png': {}} } }, response_class=Response ) async def get_counter( name: Annotated[ str, Path( min_length=1, max_length=50, pattern=r'^[a-zA-Z0-9А-Яа-я\-\.,_@#:]+$' ) ] ): if await redis.exists(name): number = await redis.get(name) number = int(number) + 1 else: number = 1 await redis.set(name, str(number)) proc = await asyncio.create_subprocess_exec( config.CounterPath, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE ) stdout, _ = await proc.communicate( input=struct.pack('L', number) ) return Response( content=stdout, media_type='image/png' ) @app.on_event('startup') async def startup_event(): global redis redis = await aioredis.from_url( config.RedisURL ) @app.on_event('shutdown') async def shutdown_event(): if redis is not None: await redis.close()