1
0
Fork 0
mirror of https://github.com/Ysurac/openmptcprouter-vps-admin.git synced 2025-03-09 15:40:05 +00:00

Fix for firewall rules deletion and small changes

This commit is contained in:
Ycarus 2020-11-23 16:54:46 +01:00
parent 580cf442b6
commit 3f7c4ebb95

View file

@ -148,6 +148,31 @@ def get_username_from_userid(userid):
return user return user
return '' return ''
def check_username_serial(username, serial):
with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json') as f:
content = f.read()
content = re.sub(",\s*}", "}", content) # pylint: disable=W1401
try:
data = json.loads(content)
except ValueError as e:
return {'error': 'Config file not readable', 'route': 'check_serial'}
if 'serial_enforce' not in content or content['serial_enforce'] == False:
return True
if 'serial' not in content['users'][0][username]:
content['users'][0][username]['serial'] = serial
with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json', 'w') as outfile:
json.dump(content, outfile, indent=4)
return True
if content['users'][0][username]['serial'] == serial:
return True
if 'serial_error' not in content['users'][0][username]:
content['users'][0][username]['serial_error'] = 0
else:
content['users'][0][username]['serial_error'] = content['users'][0][username]['serial_error'] + 1
with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json', 'w') as outfile:
json.dump(content, outfile, indent=4)
return False
def set_global_param(key, value): def set_global_param(key, value):
with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json') as f: with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json') as f:
content = f.read() content = f.read()
@ -241,6 +266,7 @@ def v2ray_add_user(user, restart=1):
final_md5 = hashlib.md5(file_as_bytes(open('/etc/v2ray/v2ray-server.json', 'rb'))).hexdigest() final_md5 = hashlib.md5(file_as_bytes(open('/etc/v2ray/v2ray-server.json', 'rb'))).hexdigest()
if initial_md5 != final_md5 and restart == 1: if initial_md5 != final_md5 and restart == 1:
os.system("systemctl -q restart v2ray") os.system("systemctl -q restart v2ray")
return v2rayuuid
def v2ray_del_user(user, restart=1): def v2ray_del_user(user, restart=1):
v2rayuuid = str(uuid.uuid1()) v2rayuuid = str(uuid.uuid1())
@ -626,6 +652,7 @@ def shorewall_del_port(username, port, proto, name, fwtype='ACCEPT', source_dip=
elif fwtype == 'DNAT' and not port + ' # OMR redirect ' + name + ' port ' + proto in line and not port + ' # OMR ' + username + ' redirect ' + name + ' port ' + proto in line: elif fwtype == 'DNAT' and not port + ' # OMR redirect ' + name + ' port ' + proto in line and not port + ' # OMR ' + username + ' redirect ' + name + ' port ' + proto in line:
n.write(line) n.write(line)
else: else:
comment = ''
if source_dip != '': if source_dip != '':
comment = ' to ' + source_dip comment = ' to ' + source_dip
if dest_ip != '': if dest_ip != '':
@ -967,13 +994,16 @@ async def status(request: Request):
# Get VPS status # Get VPS status
@app.get('/status', summary="Get current server load average, uptime and release") @app.get('/status', summary="Get current server load average, uptime and release")
async def status(userid: Optional[int] = Query(None), current_user: User = Depends(get_current_user)): async def status(userid: Optional[int] = Query(None), serial: Optional[str] = Query(None), current_user: User = Depends(get_current_user)):
LOG.debug('Get status...') LOG.debug('Get status...')
if not current_user.permissions == "admin": if not current_user.permissions == "admin":
userid = current_user.userid userid = current_user.userid
if userid is None: if userid is None:
userid = 0 userid = 0
username = get_username_from_userid(userid) username = get_username_from_userid(userid)
if not current_user.permissions == "admin" and serial is not None:
if not check_username_serial(username, serial):
return {'error': 'False serial number'}
vps_loadavg = os.popen("cat /proc/loadavg | awk '{print $1\" \"$2\" \"$3}'").read().rstrip() vps_loadavg = os.popen("cat /proc/loadavg | awk '{print $1\" \"$2\" \"$3}'").read().rstrip()
vps_uptime = os.popen("cat /proc/uptime | awk '{print $1}'").read().rstrip() vps_uptime = os.popen("cat /proc/uptime | awk '{print $1}'").read().rstrip()
vps_hostname = socket.gethostname() vps_hostname = socket.gethostname()
@ -1021,13 +1051,16 @@ async def status(userid: Optional[int] = Query(None), current_user: User = Depen
# Get VPS config # Get VPS config
@app.get('/config', summary="Get full server configuration for current user") @app.get('/config', summary="Get full server configuration for current user")
async def config(userid: Optional[int] = Query(None), current_user: User = Depends(get_current_user)): async def config(userid: Optional[int] = Query(None), serial: Optional[str] = Query(None), current_user: User = Depends(get_current_user)):
LOG.debug('Get config...') LOG.debug('Get config...')
if not current_user.permissions == "admin": if not current_user.permissions == "admin":
userid = current_user.userid userid = current_user.userid
if userid is None: if userid is None:
userid = 0 userid = 0
username = get_username_from_userid(userid) username = get_username_from_userid(userid)
if not current_user.permissions == "admin" and serial is not None:
if not check_username_serial(username, serial):
return {'error': 'False serial number'}
with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json') as f: with open('/etc/openmptcprouter-vps-admin/omr-admin-config.json') as f:
try: try:
omr_config_data = json.load(f) omr_config_data = json.load(f)
@ -2330,6 +2363,16 @@ def client2client(*, params: ClienttoClient, current_user: User = Depends(get_cu
os.system("systemctl -q reload shorewall") os.system("systemctl -q reload shorewall")
return {'result': 'done'} return {'result': 'done'}
class SerialEnforce(BaseModel):
enable: bool = False
@app.post('/serialenforce', summary="Enable client serial number control")
def serialenforce(*, params: SerialEnforce, current_user: User = Depends(get_current_user)):
if not current_user.permissions == "admin":
return {'result': 'permission', 'reason': 'Need admin user', 'route': 'serialenforce'}
set_global_param('serial_enforce', params.enable)
return {'result': 'done'}
@app.get('/list_users', summary="List all users") @app.get('/list_users', summary="List all users")
async def list_users(current_user: User = Depends(get_current_user)): async def list_users(current_user: User = Depends(get_current_user)):
if not current_user.permissions == "admin": if not current_user.permissions == "admin":