app.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. import asyncio
  2. from typing import Annotated
  3. import redis.asyncio as aioredis
  4. from aiofiles.tempfile import NamedTemporaryFile
  5. from fastapi import FastAPI, Path
  6. from fastapi.responses import Response
  7. from config import Config
  8. config = Config('config.hjson')
  9. redis = None
  10. app = FastAPI()
  11. @app.get(
  12. '/counter/{name}',
  13. responses = {
  14. 200: {
  15. 'content': {'image/png': {}}
  16. }
  17. },
  18. response_class=Response
  19. )
  20. async def get_counter(
  21. name: Annotated[
  22. str,
  23. Path(
  24. min_length=1,
  25. max_length=50,
  26. pattern=r'^[a-zA-Z0-9_]+$'
  27. )
  28. ]
  29. ):
  30. if await redis.exists(name):
  31. number = await redis.get(name)
  32. number = int(number) + 1
  33. else:
  34. number = 1
  35. await redis.set(name, str(number))
  36. async with NamedTemporaryFile('rb') as f:
  37. proc = await asyncio.create_subprocess_shell(
  38. f'{config.CounterPath} {number} {f.name}'
  39. )
  40. await proc.communicate()
  41. return Response(
  42. content=await f.read(),
  43. media_type='image/png'
  44. )
  45. @app.on_event('startup')
  46. async def startup_event():
  47. global redis
  48. redis = await aioredis.from_url(
  49. config.RedisURL
  50. )
  51. @app.on_event('shutdown')
  52. async def shutdown_event():
  53. if redis is not None:
  54. await redis.close()