forked from oscar.krause/fastapi-dls
code styling
This commit is contained in:
parent
c83130f138
commit
92fe6154e6
50
app/main.py
50
app/main.py
@ -3,7 +3,7 @@ from base64 import b64encode as b64enc
|
|||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from os.path import join, dirname
|
from os.path import join, dirname
|
||||||
from os import getenv
|
from os import getenv as env
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from fastapi import FastAPI, HTTPException
|
from fastapi import FastAPI, HTTPException
|
||||||
@ -20,52 +20,28 @@ from starlette.responses import StreamingResponse, JSONResponse, HTMLResponse
|
|||||||
from sqlalchemy import create_engine
|
from sqlalchemy import create_engine
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
|
||||||
try:
|
from app.util import load_key, load_file
|
||||||
# Crypto | Cryptodome on Debian
|
|
||||||
from Crypto.PublicKey import RSA
|
|
||||||
from Crypto.PublicKey.RSA import RsaKey
|
|
||||||
except ModuleNotFoundError:
|
|
||||||
from Cryptodome.PublicKey import RSA
|
|
||||||
from Cryptodome.PublicKey.RSA import RsaKey
|
|
||||||
from orm import Origin, Lease, init as db_init
|
from orm import Origin, Lease, init as db_init
|
||||||
|
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
load_dotenv('../version.env')
|
load_dotenv('../version.env')
|
||||||
|
|
||||||
VERSION, COMMIT, DEBUG = getenv('VERSION', 'unknown'), getenv('COMMIT', 'unknown'), bool(getenv('DEBUG', False))
|
VERSION, COMMIT, DEBUG = env('VERSION', 'unknown'), env('COMMIT', 'unknown'), bool(env('DEBUG', False))
|
||||||
|
|
||||||
|
app = FastAPI(title='FastAPI-DLS', description='Minimal Delegated License Service (DLS).', version=VERSION)
|
||||||
def load_file(filename) -> bytes:
|
db = create_engine(str(env('DATABASE', 'sqlite:///db.sqlite')))
|
||||||
with open(filename, 'rb') as file:
|
|
||||||
content = file.read()
|
|
||||||
return content
|
|
||||||
|
|
||||||
|
|
||||||
def load_key(filename) -> RsaKey:
|
|
||||||
return RSA.import_key(extern_key=load_file(filename), passphrase=None)
|
|
||||||
|
|
||||||
|
|
||||||
# todo: initialize certificate (or should be done by user, and passed through "volumes"?)
|
|
||||||
|
|
||||||
__details = dict(
|
|
||||||
title='FastAPI-DLS',
|
|
||||||
description='Minimal Delegated License Service (DLS).',
|
|
||||||
version=VERSION,
|
|
||||||
)
|
|
||||||
|
|
||||||
app, db = FastAPI(**__details), create_engine(str(getenv('DATABASE', 'sqlite:///db.sqlite')))
|
|
||||||
db_init(db)
|
db_init(db)
|
||||||
|
|
||||||
DLS_URL = str(getenv('DLS_URL', 'localhost'))
|
DLS_URL = str(env('DLS_URL', 'localhost'))
|
||||||
DLS_PORT = int(getenv('DLS_PORT', '443'))
|
DLS_PORT = int(env('DLS_PORT', '443'))
|
||||||
SITE_KEY_XID = str(getenv('SITE_KEY_XID', '00000000-0000-0000-0000-000000000000'))
|
SITE_KEY_XID = str(env('SITE_KEY_XID', '00000000-0000-0000-0000-000000000000'))
|
||||||
INSTANCE_REF = str(getenv('INSTANCE_REF', '00000000-0000-0000-0000-000000000000'))
|
INSTANCE_REF = str(env('INSTANCE_REF', '00000000-0000-0000-0000-000000000000'))
|
||||||
INSTANCE_KEY_RSA = load_key(str(getenv('INSTANCE_KEY_RSA', join(dirname(__file__), 'cert/instance.private.pem'))))
|
INSTANCE_KEY_RSA = load_key(str(env('INSTANCE_KEY_RSA', join(dirname(__file__), 'cert/instance.private.pem'))))
|
||||||
INSTANCE_KEY_PUB = load_key(str(getenv('INSTANCE_KEY_PUB', join(dirname(__file__), 'cert/instance.public.pem'))))
|
INSTANCE_KEY_PUB = load_key(str(env('INSTANCE_KEY_PUB', join(dirname(__file__), 'cert/instance.public.pem'))))
|
||||||
TOKEN_EXPIRE_DELTA = relativedelta(hours=1) # days=1
|
TOKEN_EXPIRE_DELTA = relativedelta(hours=1) # days=1
|
||||||
LEASE_EXPIRE_DELTA = relativedelta(days=int(getenv('LEASE_EXPIRE_DAYS', 90)))
|
LEASE_EXPIRE_DELTA = relativedelta(days=int(env('LEASE_EXPIRE_DAYS', 90)))
|
||||||
|
|
||||||
CORS_ORIGINS = getenv('CORS_ORIGINS').split(',') if (getenv('CORS_ORIGINS')) else f'https://{DLS_URL}' # todo: prevent static https
|
CORS_ORIGINS = env('CORS_ORIGINS').split(',') if (env('CORS_ORIGINS')) else f'https://{DLS_URL}' # todo: prevent static https
|
||||||
|
|
||||||
jwt_encode_key = jwk.construct(INSTANCE_KEY_RSA.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS256)
|
jwt_encode_key = jwk.construct(INSTANCE_KEY_RSA.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS256)
|
||||||
jwt_decode_key = jwk.construct(INSTANCE_KEY_PUB.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS256)
|
jwt_decode_key = jwk.construct(INSTANCE_KEY_PUB.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS256)
|
||||||
|
17
app/util.py
Normal file
17
app/util.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
try:
|
||||||
|
# Crypto | Cryptodome on Debian
|
||||||
|
from Crypto.PublicKey import RSA
|
||||||
|
from Crypto.PublicKey.RSA import RsaKey
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
from Cryptodome.PublicKey import RSA
|
||||||
|
from Cryptodome.PublicKey.RSA import RsaKey
|
||||||
|
|
||||||
|
|
||||||
|
def load_file(filename) -> bytes:
|
||||||
|
with open(filename, 'rb') as file:
|
||||||
|
content = file.read()
|
||||||
|
return content
|
||||||
|
|
||||||
|
|
||||||
|
def load_key(filename) -> RsaKey:
|
||||||
|
return RSA.import_key(extern_key=load_file(filename), passphrase=None)
|
Loading…
Reference in New Issue
Block a user