1
0
Fork 0
mirror of https://github.com/Ysurac/openmptcprouter-vps-admin.git synced 2025-02-12 10:31:52 +00:00

Migrate from flask to fastapi

This commit is contained in:
Ysurac 2019-11-02 17:10:10 +00:00
parent 9ac484ebfd
commit 53fd153bad
2 changed files with 224 additions and 141 deletions

View file

@ -1,5 +1,12 @@
{
"user":"openmptcprouter",
"pass":"MySecretKey",
"port":65500
"port": 65500,
"users": [
{
"openmptcprouter": {
"username": "openmptcprouter",
"hashed_password": "MySecretKey",
"disabled": false
}
}
]
}

352
omr-admin.py Normal file → Executable file
View file

@ -1,5 +1,5 @@
#!/usr/bin/env python3
# Copyright (C) 2018 Ycarus (Yannick Chabanois) <ycarus@zugaina.org>
# Copyright (C) 2018-2019 Ycarus (Yannick Chabanois) <ycarus@zugaina.org>
#
# This is free software, licensed under the GNU General Public License v3.0.
# See /LICENSE for more information.
@ -15,29 +15,38 @@ import socket
import re
import hashlib
import time
from datetime import timedelta
import uvicorn
import jwt
from pprint import pprint
from datetime import datetime,timedelta
from tempfile import mkstemp
from typing import List
from shutil import move
from pprint import pprint
from netjsonconfig import OpenWrt
from flask import Flask, jsonify, request, session
from flask_jwt_simple import (
JWTManager, jwt_required, create_jwt, get_jwt_identity
)
from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm, SecurityScopes
from jwt import PyJWTError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
from starlette.status import HTTP_401_UNAUTHORIZED
#from flask import Flask, jsonify, request, session
#from flask_jwt_simple import (
# JWTManager, jwt_required, create_jwt, get_jwt_identity
#)
app = Flask(__name__)
import logging
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
# Setup the Flask-JWT-Simple extension
log = logging.getLogger('api')
#log.setLevel(logging.ERROR)
log.setLevel(logging.DEBUG)
# Generate a random secret key
app.config['SECRET_KEY'] = uuid.uuid4().hex
app.config['JWT_SECRET_KEY'] = uuid.uuid4().hex
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=24)
jwt = JWTManager(app)
SECRET_KEY = uuid.uuid4().hex
JWT_SECRET_KEY = uuid.uuid4().hex
PERMANENT_SESSION_LIFETIME = timedelta(hours=24)
ACCESS_TOKEN_EXPIRE_MINUTES = 60
ALGORITHM = "HS256"
# Get main net interface
file = open('/etc/shorewall/params.net', "r")
@ -111,38 +120,115 @@ def set_lastchange():
with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json','w') as outfile:
json.dump(data,outfile,indent=4)
with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json') as f:
omr_config_data = json.load(f)
fake_users_db = omr_config_data['users'][0]
def verify_password(plain_password, hashed_password):
#return pwd_context.verify(plain_password, hashed_password)
log.debug("plain_password: " + plain_password + " - hashed_password: " + hashed_password)
if plain_password == hashed_password:
log.debug("password true")
return True
return False
def get_password_hash(password):
#return pwd_context.hash(password)
return password
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
log.debug("user doesn't exist")
return False
if not verify_password(password, user.hashed_password):
log.debug("wrong password")
return False
return user
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str = None
class User(BaseModel):
username: str
# email: str = None
# full_name: str = None
disabled: bool = None
class UserInDB(User):
hashed_password: str
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
def create_access_token(*, data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# Provide a method to create access tokens. The create_jwt()
# function is used to actually generate the token
@app.route('/login', methods=['POST'])
def login():
if not request.is_json:
return jsonify({"msg": "Missing JSON in request"}), 400
session.permanent = True
with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json') as f:
omr_config_data = json.load(f)
params = request.get_json()
username = params.get('username', None)
password = params.get('password', None)
if not username:
return jsonify({"msg": "Missing username parameter"}), 400
if not password:
return jsonify({"msg": "Missing password parameter"}), 400
if username != omr_config_data["user"] or password != omr_config_data["pass"]:
return jsonify({"msg": "Bad username or password"}), 401
@app.post('/token', response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
# Identity can be any data that is json serializable
ret = {'token': create_jwt(identity=username)}
return jsonify(ret), 200
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# Get VPS status
@app.route('/status', methods=['GET'])
@jwt_required
@app.get('/status')
def status():
vps_loadavg = os.popen("cat /proc/loadavg | awk '{print $1\" \"$2\" \"$3}'").read().rstrip()
vps_uptime = os.popen("cat /proc/uptime | awk '{print $1}'").read().rstrip()
@ -150,14 +236,13 @@ def status():
mptcp_enabled = os.popen('sysctl -n net.mptcp.mptcp_enabled').read().rstrip()
if iface:
return jsonify({'vps': {'loadavg': vps_loadavg,'uptime': vps_uptime,'mptcp': mptcp_enabled,'hostname': vps_hostname}, 'network': {'tx': get_bytes('tx',iface),'rx': get_bytes('rx',iface)}}), 200
return {'vps': {'loadavg': vps_loadavg,'uptime': vps_uptime,'mptcp': mptcp_enabled,'hostname': vps_hostname}, 'network': {'tx': get_bytes('tx',iface),'rx': get_bytes('rx',iface)}}
else:
return jsonify({'error': 'No iface defined','route': 'status'}), 200
return {'error': 'No iface defined','route': 'status'}
# Get VPS config
@app.route('/config', methods=['GET'])
@jwt_required
def config():
@app.get('/config')
def config(current_user: User = Depends(get_current_user)):
with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json') as f:
try:
omr_config_data = json.load(f)
@ -350,12 +435,11 @@ def config():
if '#DNAT net vpn:$OMR_ADDR tcp 1-64999' in line:
shorewall_redirect = "disable"
return jsonify({'vps': {'kernel': vps_kernel,'machine': vps_machine,'omr_version': vps_omr_version,'loadavg': vps_loadavg,'uptime': vps_uptime,'aes': vps_aes},'shadowsocks': {'key': shadowsocks_key,'port': shadowsocks_port,'method': shadowsocks_method,'fast_open': shadowsocks_fast_open,'reuse_port': shadowsocks_reuse_port,'no_delay': shadowsocks_no_delay,'mptcp': shadowsocks_mptcp,'ebpf': shadowsocks_ebpf,'obfs': shadowsocks_obfs,'obfs_plugin': shadowsocks_obfs_plugin,'obfs_type': shadowsocks_obfs_type},'glorytun': {'key': glorytun_key,'udp': {'host_ip': glorytun_udp_host_ip,'client_ip': glorytun_udp_client_ip},'tcp': {'host_ip': glorytun_tcp_host_ip,'client_ip': glorytun_tcp_client_ip},'port': glorytun_port,'chacha': glorytun_chacha},'dsvpn': {'key': dsvpn_key, 'host_ip': dsvpn_host_ip, 'client_ip': dsvpn_client_ip, 'port': dsvpn_port},'openvpn': {'key': openvpn_key,'client_key': openvpn_client_key,'client_crt': openvpn_client_crt,'client_ca': openvpn_client_ca,'host_ip': openvpn_host_ip, 'client_ip': openvpn_client_ip, 'port': openvpn_port},'mlvpn': {'key': mlvpn_key, 'host_ip': mlvpn_host_ip, 'client_ip': mlvpn_client_ip},'shorewall': {'redirect_ports': shorewall_redirect},'mptcp': {'enabled': mptcp_enabled,'checksum': mptcp_checksum,'path_manager': mptcp_path_manager,'scheduler': mptcp_scheduler, 'syn_retries': mptcp_syn_retries},'network': {'congestion_control': congestion_control,'ipv6_network': ipv6_network,'ipv6': ipv6_addr,'ipv4': ipv4_addr,'domain': vps_domain},'vpn': {'available': available_vpn,'current': vpn},'iperf': {'user': 'openmptcprouter','password': 'openmptcprouter', 'key': iperf3_key},'pihole': {'state': pihole}}), 200
return {'vps': {'kernel': vps_kernel,'machine': vps_machine,'omr_version': vps_omr_version,'loadavg': vps_loadavg,'uptime': vps_uptime,'aes': vps_aes},'shadowsocks': {'key': shadowsocks_key,'port': shadowsocks_port,'method': shadowsocks_method,'fast_open': shadowsocks_fast_open,'reuse_port': shadowsocks_reuse_port,'no_delay': shadowsocks_no_delay,'mptcp': shadowsocks_mptcp,'ebpf': shadowsocks_ebpf,'obfs': shadowsocks_obfs,'obfs_plugin': shadowsocks_obfs_plugin,'obfs_type': shadowsocks_obfs_type},'glorytun': {'key': glorytun_key,'udp': {'host_ip': glorytun_udp_host_ip,'client_ip': glorytun_udp_client_ip},'tcp': {'host_ip': glorytun_tcp_host_ip,'client_ip': glorytun_tcp_client_ip},'port': glorytun_port,'chacha': glorytun_chacha},'dsvpn': {'key': dsvpn_key, 'host_ip': dsvpn_host_ip, 'client_ip': dsvpn_client_ip, 'port': dsvpn_port},'openvpn': {'key': openvpn_key,'client_key': openvpn_client_key,'client_crt': openvpn_client_crt,'client_ca': openvpn_client_ca,'host_ip': openvpn_host_ip, 'client_ip': openvpn_client_ip, 'port': openvpn_port},'mlvpn': {'key': mlvpn_key, 'host_ip': mlvpn_host_ip, 'client_ip': mlvpn_client_ip},'shorewall': {'redirect_ports': shorewall_redirect},'mptcp': {'enabled': mptcp_enabled,'checksum': mptcp_checksum,'path_manager': mptcp_path_manager,'scheduler': mptcp_scheduler, 'syn_retries': mptcp_syn_retries},'network': {'congestion_control': congestion_control,'ipv6_network': ipv6_network,'ipv6': ipv6_addr,'ipv4': ipv4_addr,'domain': vps_domain},'vpn': {'available': available_vpn,'current': vpn},'iperf': {'user': 'openmptcprouter','password': 'openmptcprouter', 'key': iperf3_key},'pihole': {'state': pihole}}
# Set shadowsocks config
@app.route('/shadowsocks', methods=['POST'])
@jwt_required
def shadowsocks():
@app.get('/shadowsocks')
def shadowsocks(current_user: User = Depends(get_current_user)):
with open('/etc/shadowsocks-libev/config.json') as f:
content = f.read()
content = re.sub(",\s*}","}",content)
@ -390,7 +474,7 @@ def shadowsocks():
vps_domain = os.popen('wget -4 -qO- -T 2 http://hostname.openmptcprouter.com').read().rstrip()
if port is None or method is None or fast_open is None or reuse_port is None or no_delay is None or key is None:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'shadowsocks'})
return {'result': 'error','reason': 'Invalid parameters','route': 'shadowsocks'}
if obfs:
if obfs_plugin == 'v2ray':
if obfs_type == 'tls':
@ -420,18 +504,17 @@ def shadowsocks():
shorewall_add_port(str(port),'tcp','shadowsocks')
shorewall_add_port(str(port),'udp','shadowsocks')
set_lastchange()
return jsonify({'result': 'done','reason': 'changes applied','route': 'shadowsocks'})
return {'result': 'done','reason': 'changes applied','route': 'shadowsocks'}
else:
return jsonify({'result': 'done','reason': 'no changes','route': 'shadowsocks'})
return {'result': 'done','reason': 'no changes','route': 'shadowsocks'}
# Set shorewall config
@app.route('/shorewall', methods=['POST'])
@jwt_required
def shorewall():
@app.post('/shorewall')
def shorewall(current_user: User = Depends(get_current_user)):
params = request.get_json()
state = params.get('redirect_ports', None)
if state is None:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'shorewall'})
return {'result': 'error','reason': 'Invalid parameters','route': 'shorewall'}
initial_md5 = hashlib.md5(file_as_bytes(open('/etc/shorewall/rules', 'rb'))).hexdigest()
fd, tmpfile = mkstemp()
with open('/etc/shorewall/rules','r') as f, open(tmpfile,'a+') as n:
@ -452,52 +535,48 @@ def shorewall():
if not initial_md5 == final_md5:
os.system("systemctl -q reload shorewall")
# Need to do the same for IPv6...
return jsonify({'result': 'done','reason': 'changes applied'})
return {'result': 'done','reason': 'changes applied'}
@app.route('/shorewalllist', methods=['POST'])
@jwt_required
def shorewall_list():
@app.post('/shorewalllist')
def shorewall_list(current_user: User = Depends(get_current_user)):
params = request.get_json()
name = params.get('name', None)
if name is None:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'shorewalllist'})
return {'result': 'error','reason': 'Invalid parameters','route': 'shorewalllist'}
fwlist = []
with open('/etc/shorewall/rules','r') as f:
for line in f:
if '# OMR ' + name in line:
fwlist.append(line)
return jsonify({'list': fwlist})
return {'list': fwlist}
@app.route('/shorewallopen', methods=['POST'])
@jwt_required
def shorewall_open():
@app.post('/shorewallopen')
def shorewall_open(current_user: User = Depends(get_current_user)):
params = request.get_json()
name = params.get('name', None)
port = params.get('port', None)
proto = params.get('proto', None)
fwtype = params.get('fwtype', None)
if name is None:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'shorewalllist'})
return {'result': 'error','reason': 'Invalid parameters','route': 'shorewalllist'}
shorewall_add_port(str(port),proto,name,fwtype)
return jsonify({'result': 'done','reason': 'changes applied'})
return {'result': 'done','reason': 'changes applied'}
@app.route('/shorewallclose', methods=['POST'])
@jwt_required
def shorewall_close():
@app.post('/shorewallclose')
def shorewall_close(current_user: User = Depends(get_current_user)):
params = request.get_json()
name = params.get('name', None)
port = params.get('port', None)
proto = params.get('proto', None)
fwtype = params.get('fwtype', None)
if name is None:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'shorewalllist'})
return {'result': 'error','reason': 'Invalid parameters','route': 'shorewalllist'}
shorewall_del_port(str(port),proto,name,fwtype)
return jsonify({'result': 'done','reason': 'changes applied'})
return {'result': 'done','reason': 'changes applied'}
# Set MPTCP config
@app.route('/mptcp', methods=['POST'])
@jwt_required
def mptcp():
@app.post('/mptcp')
def mptcp(current_user: User = Depends(get_current_user)):
params = request.get_json()
checksum = params.get('checksum', None)
path_manager = params.get('path_manager', None)
@ -505,38 +584,41 @@ def mptcp():
syn_retries = params.get('syn_retries', None)
congestion_control = params.get('congestion_control', None)
if not checksum or not path_manager or not scheduler or not syn_retries or not congestion_control:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'mptcp'})
return {'result': 'error','reason': 'Invalid parameters','route': 'mptcp'}
os.system('sysctl -qw net.mptcp.mptcp_checksum=' + checksum)
os.system('sysctl -qw net.mptcp.mptcp_path_manager=' + path_manager)
os.system('sysctl -qw net.mptcp.mptcp_scheduler=' + scheduler)
os.system('sysctl -qw net.mptcp.mptcp_syn_retries=' + syn_retries)
os.system('sysctl -qw net.ipv4.tcp_congestion_control=' + congestion_control)
set_lastchange()
return jsonify({'result': 'done','reason': 'changes applied'})
return {'result': 'done','reason': 'changes applied'}
class Vpn(BaseModel):
vpn: str
# Set global VPN config
@app.route('/vpn', methods=['POST'])
@jwt_required
def vpn():
params = request.get_json()
vpn = params.get('vpn', None)
@app.post('/vpn')
def vpn(*,vpnconfig: Vpn,current_user: User = Depends(get_current_user)):
vpn = vpnconfig.vpn
log.debug("VPN !")
if not vpn:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'vpn'})
return {'result': 'error','reason': 'Invalid parameters','route': 'vpn'}
os.system('echo ' + vpn + ' > /etc/openmptcprouter-vps-admin/current-vpn')
set_lastchange()
return jsonify({'result': 'done','reason': 'changes applied'})
return {'result': 'done','reason': 'changes applied'}
class GlorytunConfig(BaseModel):
key: str
port: int
chacha: bool
# Set Glorytun config
@app.route('/glorytun', methods=['POST'])
@jwt_required
def glorytun():
params = request.get_json()
key = params.get('key', None)
port = params.get('port', None)
chacha = params.get('chacha', True)
if not key or port is None:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'glorytun'})
@app.post('/glorytun')
def glorytun(*, glorytunconfig: GlorytunConfig,current_user: User = Depends(get_current_user)):
key = glorytunconfig.key
port = glorytunconfig.port
chacha = glorytunconfig.chacha
initial_md5 = hashlib.md5(file_as_bytes(open('/etc/glorytun-tcp/tun0', 'rb'))).hexdigest()
with open('/etc/glorytun-tcp/tun0.key','w') as outfile:
outfile.write(key)
@ -579,17 +661,16 @@ def glorytun():
os.system("systemctl -q restart glorytun-udp@tun0")
shorewall_add_port(str(port),'tcp','glorytun')
set_lastchange()
return jsonify({'result': 'done'})
return {'result': 'done'}
# Set A Dead Simple VPN config
@app.route('/dsvpn', methods=['POST'])
@jwt_required
def dsvpn():
@app.post('/dsvpn')
def dsvpn(current_user: User = Depends(get_current_user)):
params = request.get_json()
key = params.get('key', None)
port = params.get('port', None)
if not key or port is None:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'dsvpn'})
return {'result': 'error','reason': 'Invalid parameters','route': 'dsvpn'}
initial_md5 = hashlib.md5(file_as_bytes(open('/etc/dsvpn/dsvpn.key', 'rb'))).hexdigest()
with open('/etc/dsvpn/dsvpn.key','w') as outfile:
outfile.write(key)
@ -598,16 +679,15 @@ def dsvpn():
os.system("systemctl -q restart dsvpn-server")
shorewall_add_port(str(port),'tcp','dsvpn')
set_lastchange()
return jsonify({'result': 'done'})
return {'result': 'done'}
# Set OpenVPN config
@app.route('/openvpn', methods=['POST'])
@jwt_required
def openvpn():
@app.post('/openvpn')
def openvpn(current_user: User = Depends(get_current_user)):
params = request.get_json()
key = params.get('key', None)
if not key:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'openvpn'})
return {'result': 'error','reason': 'Invalid parameters','route': 'openvpn'}
initial_md5 = hashlib.md5(file_as_bytes(open('/etc/openvpn/server/static.key', 'rb'))).hexdigest()
with open('/etc/openvpn/server/static.key','w') as outfile:
outfile.write(base64.b64decode(key))
@ -615,16 +695,17 @@ def openvpn():
if not initial_md5 == final_md5:
os.system("systemctl -q restart openvpn@tun0")
set_lastchange()
return jsonify({'result': 'done'})
return {'result': 'done'}
class Wanips(BaseModel):
ips: str
# Set WANIP
@app.route('/wan', methods=['POST'])
@jwt_required
def wan():
params = request.get_json()
ips = params.get('ips', None)
@app.post('/wan')
def wan(*, wanips: Wanips,current_user: User = Depends(get_current_user)):
ips = wanips.ips
if not ips:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'wan'})
return {'result': 'error','reason': 'Invalid parameters','route': 'wan'}
initial_md5 = hashlib.md5(file_as_bytes(open('/etc/shadowsocks-libev/local.acl', 'rb'))).hexdigest()
with open('/etc/shadowsocks-libev/local.acl','w') as outfile:
outfile.write('[white_list]\n')
@ -635,61 +716,55 @@ def wan():
#for x in range (1,os.cpu_count()):
#os.system("systemctl restart shadowsocks-libev-server@config" + str(x) + ".service")
return jsonify({'result': 'done'})
return {'result': 'done'}
# Update VPS
@app.route('/update', methods=['GET'])
@jwt_required
def update():
@app.get('/update')
def update(current_user: User = Depends(get_current_user)):
os.system("wget -O - http://www.openmptcprouter.com/server/debian9-x86_64.sh | sh")
# Need to reboot if kernel change
return jsonify({'result': 'done'})
return {'result': 'done'}
# Backup
@app.route('/backuppost', methods=['POST'])
@jwt_required
def backuppost():
@app.post('/backuppost')
def backuppost(current_user: User = Depends(get_current_user)):
params = request.get_json()
backup_file = params.get('data', None)
if not backup_file:
return jsonify({'result': 'error','reason': 'Invalid parameters','route': 'backuppost'})
return {'result': 'error','reason': 'Invalid parameters','route': 'backuppost'}
with open('/var/opt/openmptcprouter/backup.tar.gz','wb') as f:
f.write(base64.b64decode(backup_file))
return jsonify({'result': 'done'})
return {'result': 'done'}
@app.route('/backupget', methods=['GET'])
@jwt_required
def send_backup():
@app.get('/backupget')
def send_backup(current_user: User = Depends(get_current_user)):
with open('/var/opt/openmptcprouter/backup.tar.gz',"rb") as backup_file:
file_base64 = base64.b64encode(backup_file.read())
file_base64utf = file_base64.decode('utf-8')
return jsonify({'data': file_base64utf})
return {'data': file_base64utf}
@app.route('/backuplist', methods=['GET'])
@jwt_required
def list_backup():
@app.get('/backuplist')
def list_backup(current_user: User = Depends(get_current_user)):
if os.path.isfile('/var/opt/openmptcprouter/backup.tar.gz'):
modiftime = os.path.getmtime('/var/opt/openmptcprouter/backup.tar.gz')
return jsonify({'backup': True, 'modif': modiftime})
return {'backup': True, 'modif': modiftime}
else:
return jsonify({'backup': False})
return {'backup': False}
@app.route('/backupshow', methods=['GET'])
@jwt_required
def show_backup():
@app.get('/backupshow')
def show_backup(current_user: User = Depends(get_current_user)):
if os.path.isfile('/var/opt/openmptcprouter/backup.tar.gz'):
router = OpenWrt(native=open('/var/opt/openmptcprouter/backup.tar.gz'))
return jsonify({'backup': True,'data': router})
return {'backup': True,'data': router}
else:
return jsonify({'backup': False})
return {'backup': False}
@app.route('/backupedit', methods=['POST'])
@jwt_required
def edit_backup():
@app.post('/backupedit')
def edit_backup(current_user: User = Depends(get_current_user)):
params = request.get_data()
o = OpenWrt(params)
o.write('backup',path='/var/opt/openmptcprouter/')
return jsonify({'result': 'done'})
return {'result': 'done'}
if __name__ == '__main__':
@ -698,4 +773,5 @@ if __name__ == '__main__':
omrport=65500
if 'port' in omr_config_data:
omrport = omr_config_data["port"]
app.run(host='0.0.0.0',port=omrport,ssl_context=('/etc/openmptcprouter-vps-admin/cert.pem','/etc/openmptcprouter-vps-admin/key.pem'),threaded=True)
uvicorn.run(app,host='0.0.0.0',port=omrport,log_level='debug',ssl_certfile='/etc/openmptcprouter-vps-admin/cert.pem',ssl_keyfile='/etc/openmptcprouter-vps-admin/key.pem')
# uvicorn.run(app,host='0.0.0.0',port=omrport,ssl_context=('/etc/openmptcprouter-vps-admin/cert.pem','/etc/openmptcprouter-vps-admin/key.pem'),threaded=True)