1
0
Fork 0
mirror of https://github.com/iiab/iiab.git synced 2025-03-09 15:40:17 +00:00
iiab/roles/xsce-admin/files/cmdsrv/xsce-cmdsrv
2017-05-27 11:09:50 -07:00

2075 lines
69 KiB
Python

#!/usr/bin/python
"""
XSCE Multi-threaded Polling Command server
Author: Tim Moody <tim(at)timmoody(dot)com>
Contributions: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
Felipe Cruz <felipecruz@loogica.net>
Accepts commands in the form COMMAND <optional json-encoded arguments>
Returns json-encoded results
"""
import os, sys, syslog, signal
import pwd, grp
import time
from datetime import date, datetime, timedelta
import threading, subprocess
import shlex
import zmq
import sqlite3
import json
import xml.etree.ElementTree as ET
import yaml
import ConfigParser
import re
import urllib2
import string
import crypt
import spwd
import cracklib
# Config Files
xsce_config_file = "/etc/xsce/xsce.env"
xsce_cmdsrv_config_file = "/etc/xsce/cmdsrv.conf"
xsce_ini_file = "/etc/xsce/xsce.ini"
# Variables that should be read from config file
# All of these variables will be read from config files and recomputed in init()
xsce_git_path = "/root/xsce"
xsce_base_path = "/opt/schoolserver"
xsce_cmdsrv_dbname = "xsce_cmdsrv.0.2.db"
cmdsrv_no_workers = 5
cmdsrv_job_poll_sleep_interval = 1
cmdsrv_max_concurrent_jobs = 7
# Derived Variables
xsce_cmdsrv_path = xsce_base_path + "/xsce_cmdsrv"
xsce_cmdsrv_pid_file = "/var/run/xsce-cmdsrv.pid"
xsce_cmdsrv_ready_file = "/var/run/xsce-cmdsrv-ready"
xsce_cmdsrv_dbpath = xsce_cmdsrv_path + "/" + xsce_cmdsrv_dbname
# Constants - ToDo add to xsce_cmdsrv_config_file
# for kiwix zims
kiwix_catalog_file = "/etc/xsce/kiwix_catalog.json"
zim_downloads_dir = "/library/downloads/zims/"
zim_download_prefix = "kiwix-0.9+"
zim_working_dir = "/library/working/zims/"
zim_dir = "/library/zims"
zim_content_dir = "/library/zims/content/"
zim_index_dir = "/library/zims/index/"
rachel_downloads_dir = "/library/downloads/"
rachel_working_dir = "/library/working/"
rachel_version = 'rachelusb_32EN_3_1_5'
# Global Variables
last_command_rowid = 0
last_job_rowid = 0
active_commands = {}
zims_wip = {}
jobs_requested = {}
jobs_to_restart = {}
jobs_to_cancel = {}
prereq_jobs = {}
jobs_running = {}
running_job_count = 0
ansible_running_flag = False
daemon_mode = False
init_error = False
# Locking
lock = threading.Lock() # for updating global variables
db_lock = threading.Lock() # for sqlite db concurrency
# vars read from ansible vars directory
# effective is composite where local takes precedence
default_vars = {}
local_vars = {}
effective_vars = {}
ansible_facts = {}
ansible_tags = {}
xsce_ini = {}
kiwix_catalog = {}
# vars set by admin-console
config_vars = {}
# available commands are in cmd_handler
def main():
"""Server routine"""
#global daemon_mode
#daemon_mode = True # for testing
global init_error
# if not in daemon mode don't trap errors
if daemon_mode == False:
init()
else:
try:
init()
except:
init_error = True
log(syslog.LOG_INFO, 'XSCE Command Server Initialization Failed' )
worker_data_url = "inproc://worker_data"
worker_control_url = "inproc://worker_control"
ipc_sock = "/run/cmdsrv_sock"
client_url = "ipc://" + ipc_sock
owner = pwd.getpwnam(apache_data)
group = grp.getgrnam("xsce-admin")
# Prepare our context and sockets
context = zmq.Context.instance()
# Socket to talk to clients
clients = context.socket(zmq.ROUTER)
clients.bind(client_url)
os.chown(ipc_sock, owner.pw_uid, group.gr_gid)
os.chmod(ipc_sock, 0770)
# Socket to talk to workers
workers_data = context.socket(zmq.DEALER)
workers_data.bind(worker_data_url)
workers_control = context.socket(zmq.PUB)
workers_control.bind(worker_control_url)
# Launch thread to monitor jobs
thread = threading.Thread(target=job_minder_thread, args=(client_url, worker_control_url,))
thread.start()
# Launch pool of worker threads
for i in range(cmdsrv_no_workers):
thread = threading.Thread(target=cmd_proc_thread, args=(worker_data_url, worker_control_url,))
thread.start()
poll = zmq.Poller()
poll.register(clients, zmq.POLLIN)
poll.register(workers_data, zmq.POLLIN)
server_run = True
set_ready_flag("ON")
while server_run == True:
sockets = dict(poll.poll())
if clients in sockets:
ident, msg = clients.recv_multipart()
tprint('sending message server received from client to worker %s id %s' % (msg, ident))
if msg == "STOP":
# Tell the worker threads to shut down
set_ready_flag("OFF")
tprint('sending control message server received from client to worker %s id %s' % (msg, ident))
workers_control.send("EXIT")
clients.send_multipart([ident, '{"Status": "Stopping"}'])
log(syslog.LOG_INFO, 'Stopping XSCE Command Server')
time.sleep(3)
server_run = False
else:
# if in daemon mode report and init failed alwasy return same error message
if daemon_mode == True and init_error == True:
msg = '{"Error": "XSCE-CMDSRV failed to initialize","Alert": "True"}'
clients.send_multipart([ident, msg])
else:
tprint('sending data message server received from client to worker %s id %s' % (msg, ident))
log(syslog.LOG_INFO, 'Received CMD Message %s.' % msg )
workers_data.send_multipart([ident, msg])
if workers_data in sockets:
ident, msg = workers_data.recv_multipart()
tprint('Sending worker message to client %s id %s' % (msg[:60], ident))
clients.send_multipart([ident, msg])
# Clean up if server is stopped
clients.close()
workers_data.close()
workers_control.close()
context.term()
# Delete any pid file to keep systemd happy
try:
os.remove(xsce_cmdsrv_pid_file)
except OSError:
pass
sys.exit()
def job_minder_thread(client_url, worker_control_url, context=None):
"""Job Monitoring Worker Routine"""
global jobs_requested # queue from command processor
global jobs_to_restart # queue from incomplete jobs in init
global jobs_to_cancel # queue of scheduled or running jobs to be cancellled from cancel command
global prereq_jobs # status of predecessor job = depend_on_job_id; used to cascade job steps; we do not support multiple descendents
global jobs_running # queue of jobs running, started from jobs requested
global ansible_running_flag
global running_job_count
tprint ("in job_minder_thread")
log(syslog.LOG_INFO, 'Job Minder Thread Started')
# working lists
jobs_requested_list = [] # just the job_id from jobs_requested
jobs_requested_done = [] # list of processed job_id from jobs_requested
prereq_jobs_to_clear = [] # prerequisite jobs whose descendent has been processed
jobs_to_close = [] # list of running jobs that have completed or been cancelled
prereq_info = {}
# control signals from main thread
context = context or zmq.Context.instance()
control_socket = context.socket(zmq.SUB)
control_socket.connect(worker_control_url)
control_socket.setsockopt(zmq.SUBSCRIBE,"")
# Socket to send job status back to main
resp_socket = context.socket(zmq.DEALER)
resp_socket.connect(client_url)
poll = zmq.Poller()
poll.register(control_socket, zmq.POLLIN)
# Restart any incomplete jobs found during init
for job_id in jobs_to_restart:
job_info = jobs_to_restart[job_id]
if job_info['has_dependent'] == "Y":
prereq_info ['status'] = 'STARTED' # for both 'STARTED' and 'RESTARTED'
prereq_jobs[job_id] = prereq_info
job_info = start_job(job_id, job_info, status='RESTARTED')
jobs_running[job_id] = job_info
jobs_to_restart = {}
thread_run = True
# Main loop of XSCE-CMDSRV
while thread_run == True:
lock.acquire() # will block if lock is already held
try:
jobs_requested_list = jobs_requested.keys() # create copy of keys so we can update global dictionary in loop
finally:
lock.release() # release lock, no matter what
# Go through each job in requested queue and conditionally start or cancel
# These are marked Scheduled
jobs_requested_done = [] # list of processed job_ids from jobs_requested to be cleared later
jobs_requested_list.sort()
for job_id in jobs_requested_list:
job_info = jobs_requested[job_id]
#print "starting cancel of requested"
# Remove cancelled jobs
if job_id in jobs_to_cancel:
job_info = cancel_req_job(job_id, job_info)
jobs_requested[job_id] = job_info
jobs_requested_done.append(job_id)
# we will pop from jobs_to_cancel later when clearing jobs_requested
continue
# don't create ansible job if one is running
if job_info['cmd'] in ["RUN-ANSIBLE", "RESET-NETWORK"] and ansible_running_flag == True:
continue
# don't start job if at max allowed
if running_job_count >= cmdsrv_max_concurrent_jobs:
continue
#print "starting prereq check"
# don't start job if it depends on another job that is not finished
depend_on_job_id = job_info['depend_on_job_id']
if depend_on_job_id in prereq_jobs:
prereq_status = prereq_jobs[depend_on_job_id]['status']
if prereq_status == 'SCHEDULED' or prereq_status == 'STARTED':
continue # successor step can't start yet
else:
if prereq_status == 'SUCCEEDED':
job_info = start_job(job_id, job_info) # Create running job
jobs_running[job_id] = job_info
if job_id in prereq_jobs:
prereq_jobs[job_id]['status'] = 'STARTED'
jobs_requested_done.append(job_id)
prereq_jobs_to_clear.append(depend_on_job_id) # mark for deletion
else: # predecessor failed or was cancelled
job_info = cancel_req_job(job_id, job_info)
jobs_requested_done.append(job_id)
prereq_jobs_to_clear.append(depend_on_job_id) # mark for deletion
else: # not a multi-step job or first step of multi
job_info = start_job(job_id, job_info) # Create running job
jobs_running[job_id] = job_info
jobs_requested_done.append(job_id)
#print 'starting clear'
# Clear started or cancelled jobs from requested queue and prereq dict
for job_id in jobs_requested_done:
jobs_requested.pop(job_id, None)
if job_id in jobs_to_cancel:
jobs_to_cancel.pop(job_id, None)
jobs_requested_done = []
# clear prerequisites for started or failed jobs from prereq_jobs
for depend_on_job_id in prereq_jobs_to_clear:
prereq_jobs.pop (depend_on_job_id, None)
prereq_jobs_to_clear = []
#print 'starting poll'
# poll jobs_running for completed jobs
for job_id in jobs_running:
jobs_running[job_id]['subproc'].poll()
returncode = jobs_running[job_id]['subproc'].returncode
if returncode == None:
tprint (str(job_id) + ' still running.')
# Cancel job if requested
if job_id in jobs_to_cancel:
tprint (str(job_id) + ' cancelled.')
p = jobs_running[job_id]['subproc']
p.send_signal(signal.SIGINT)
t = 0
while t < 5:
rc = p.poll()
if rc != None:
break
time.sleep(1)
t += 1
if rc == None:
p.kill()
job_info = end_job(job_id, job_info, 'CANCELLED')
jobs_to_close.append(job_id)
upd_job_cancelled(job_id)
else:
tprint (str(job_id) + ' terminated.')
# job_info['status_datetime'] = str(datetime.now()) ?
if returncode == 0:
status = 'SUCCEEDED'
else:
status = 'FAILED'
job_info = end_job(job_id, job_info, status)
# flag job for removal
jobs_to_close.append(job_id)
#print 'starting close jobs'
# now remove closed jobs from running list
for key in jobs_to_close:
if jobs_running[key]['has_dependent'] == "N": # delete from command list if last step of command
active_commands.pop(jobs_running[key]['cmd_rowid'], None)
if jobs_running[key]['cmd'] == "INST-ZIMS":
zims_wip.pop(jobs_running[key]['cmd_args']['zim_id'], None)
jobs_running.pop(key, None)
if key in jobs_to_cancel:
jobs_to_cancel.pop(key, None)
jobs_to_close = []
#print 'starting socket poll'
sockets = dict(poll.poll(1000))
if control_socket in sockets:
ctl_msg = control_socket.recv()
tprint('got ctl msg %s in monitor' % ctl_msg)
if ctl_msg == "EXIT":
# stop loop in order to terminate thread
log(syslog.LOG_INFO, 'Stopping XSCE Command Server Monitor Thread')
thread_run = False
#print 'starting sleep'
time.sleep(cmdsrv_job_poll_sleep_interval)
#print 'ready to loop'
# Clean up if thread is stopped
resp_socket.close()
control_socket.close()
#context.term()
#sys.exit()
def start_job(job_id, job_info, status='STARTED'):
global running_job_count
global ansible_running_flag
global prereq_jobs
job_info['output_file'] = '/tmp/job-' + str(job_id)
job_info['file'] = open(job_info['output_file'], 'w')
args = shlex.split(job_info['job_command'])
job_info['subproc'] = subprocess.Popen(args, stdout=job_info['file'], stderr=subprocess.STDOUT)
job_info['job_pid'] = job_info['subproc'].pid
job_info['status'] = status
job_info['status_datetime'] = str(datetime.now())
job_info['job_output'] = ""
if job_id in prereq_jobs:
prereq_jobs[job_id]['status'] = 'STARTED'
if job_info['cmd'] in ["RUN-ANSIBLE", "RESET-NETWORK"]:
ansible_running_flag = True
if status == 'STARTED':
log_msg = "Starting"
else:
log_msg = "Restarting"
log(syslog.LOG_INFO, "%s Job: %s, job_id: %s, pid: %s" % (log_msg, job_info['cmd'], job_id, job_info['job_pid']))
# update jobs table
upd_job_started(job_id, job_info['job_pid'], status)
running_job_count += 1
return (job_info)
def end_job(job_id, job_info, status):
global prereq_jobs
global jobs_running
global ansible_running_flag
global running_job_count
jobs_running[job_id]['file'].close()
# load output from tmp file
output_file = jobs_running[job_id]['output_file']
file = open(output_file, 'r')
job_output = file.read()
# remove non-printing chars
job_output = filter(lambda x: x in string.printable, job_output)
jobs_running[job_id]['job_output'] = job_output
file.close()
# job_info['status_datetime'] = str(datetime.now()) ?
jobs_running[job_id]['status'] = status
if job_id in prereq_jobs:
prereq_jobs[job_id]['status'] = status
log(syslog.LOG_INFO, "Job: %s, job_id: %s, pid: %s, status:%s" % (jobs_running[job_id]['cmd'], job_id, jobs_running[job_id]['job_pid'], status))
# update jobs table and remove tmp output file
upd_job_finished(job_id, job_output, status)
os.remove(output_file)
if job_info['cmd'] in ["RUN-ANSIBLE", "RESET-NETWORK"]:
ansible_running_flag = False
#if status == "SUCCEEDED":
read_xsce_ini_file() # reread ini file after running ansible
running_job_count -= 1
if running_job_count < 0:
running_job_count = 0
return (job_info)
def cancel_req_job(job_id, job_info):
global active_commands
job_info['output_file'] = None
job_info['file'] = None
job_info['subproc'] = None
#job_info['job_pid'] = None
job_info['status'] = 'CANCELLED'
job_info['status_datetime'] = str(datetime.now())
#job_info['job_output'] = ""
if job_id in prereq_jobs:
prereq_jobs[job_id]['status'] = job_info['status']
log(syslog.LOG_INFO, "Cancelling Job: %s, job_id: %s" % (job_info['cmd'], job_id))
# update jobs table
upd_job_cancelled(job_id)
# Remove Active Command and Zim WIP
if job_info['has_dependent'] == "N": # delete from command list if last step of command
active_commands.pop(job_info['cmd_rowid'], None)
if job_info['cmd'] == "INST-ZIMS":
zims_wip.pop(job_info['cmd_args']['zim_id'], None)
return (job_info)
def cmd_proc_thread(worker_data_url, worker_control_url, context=None):
"""Command Processing Worker Routine"""
context = context or zmq.Context.instance()
# Socket to talk to dispatcher
data_socket = context.socket(zmq.DEALER)
data_socket.connect(worker_data_url)
# control signals from main thread
control_socket = context.socket(zmq.SUB)
control_socket.connect(worker_control_url)
control_socket.setsockopt(zmq.SUBSCRIBE,"")
poll = zmq.Poller()
poll.register(data_socket, zmq.POLLIN)
poll.register(control_socket, zmq.POLLIN)
thread_run = True
while thread_run == True:
sockets = dict(poll.poll())
# process command
if data_socket in sockets:
ident, cmd_msg = data_socket.recv_multipart()
tprint('sending message server received from client to worker %s id %s' % (cmd_msg, ident))
cmd_resp = cmd_handler(cmd_msg)
#print cmd_resp
# 8/23/2015 added .encode() to response as list_library was giving unicode errors
data_socket.send_multipart([ident, cmd_resp.encode()])
if control_socket in sockets:
ctl_msg = control_socket.recv()
tprint('got ctl msg %s in worker' % ctl_msg)
if ctl_msg == "EXIT":
# stop loop in order to terminate thread
log(syslog.LOG_INFO, 'Stopping XSCE Command Server Worker Thread')
thread_run = False
# Clean up if thread is stopped
data_socket.close()
control_socket.close()
#context.term()
#sys.exit()
def cmd_handler(cmd_msg):
# List of recognized commands and corresponding routine
# Don't do anything else
avail_cmds = {
"TEST": do_test,
"LIST-LIBR": list_library,
"WGET": wget_file,
"GET-ANS": get_ans_facts,
"GET-ANS-TAGS": get_ans_tags,
"GET-VARS": get_install_vars,
"GET-XSCE-INI": get_xsce_ini,
"GET-CONF": get_config_vars,
"SET-CONF": set_config_vars,
"GET-MEM-INFO": get_mem_info,
"GET-SPACE-AVAIL": get_space_avail,
"GET-STORAGE-INFO": get_storage_info_lite,
"RUN-ANSIBLE": run_ansible,
"RESET-NETWORK": run_ansible,
"GET-JOB-STAT": get_last_jobs_stat,
"CANCEL-JOB": cancel_job,
"GET-WHLIST": get_white_list,
"SET-WHLIST": set_white_list,
"GET-INET-SPEED": get_inet_speed,
"GET-INET-SPEED2": get_inet_speed2,
"GET-KIWIX-CAT": get_kiwix_catalog,
"GET-ZIM-STAT": get_zim_stat,
"INST-ZIMS": install_zims,
"GET-RACHEL-STAT": get_rachel_stat,
"INST-RACHEL": install_rachel,
"DEL-DOWNLOADS": del_downloads,
"RESTART-KIWIX": restart_kiwix,
"REBOOT": reboot_server,
"POWEROFF": poweroff_server,
"REMOTE-ADMIN-CTL": remote_admin_ctl, #true/false
"GET-REMOTE-ADMIN-STATUS": get_remote_admin_status, #returns activated
"CHGPW": change_password
}
# Check for Duplicate Command
dup_cmd = next((job_id for job_id, active_cmd_msg in active_commands.items() if active_cmd_msg == cmd_msg), None)
if dup_cmd != None:
strip_cmd_msg = cmd_msg.replace('\"','')
log(syslog.LOG_ERR, "Error: %s duplicates an Active Command." % strip_cmd_msg)
resp = '{"Error": "' + strip_cmd_msg + ' duplicates an Active Command"}'
return (resp)
# store the command in database
cmd_rowid = insert_command(cmd_msg)
# process the command
cmd_info = {}
# parse for arguments
cmd_parts = cmd_msg.split(' ',1)
cmd = cmd_parts[0]
if len(cmd_parts)>1:
try:
cmd_args = json.loads(cmd_parts[1])
except:
return cmd_malformed(cmd)
else:
cmd_args = {}
cmd_info['cmd_rowid'] = cmd_rowid
cmd_info['cmd_msg'] = cmd_msg
cmd_info['cmd'] = cmd
cmd_info['cmd_args'] = cmd_args
#print (cmd_info)
# commands that run scripts should check for malicious characters in cmd_args and return error if found
# bad_command = validate_command(cmd_args)
# if not in daemon mode don't trap errors
if daemon_mode == False:
resp = avail_cmds[cmd](cmd_info)
else:
if cmd in avail_cmds:
try:
resp = avail_cmds[cmd](cmd_info)
except:
log(syslog.LOG_ERR, "Error: Unexpected error in Command %s." % cmd)
resp = '{"Error": "Unexpected error in Command ' + cmd + '"}'
else:
log(syslog.LOG_ERR, "Error: Unknown Command %s." % cmd)
resp = '{"Error": "Unknown Command"}'
return (resp)
#
# Functions to process Commands
#
def do_test(cmd_info):
#resp = cmd_success("TEST")
#return (resp)
outp = subprocess.check_output(["scripts/test.sh"])
json_outp = json_array("TEST",outp)
return (json_outp)
def list_library(cmd_info):
libr_list = {}
file_list = []
target_dir = "/library/"
try:
target_dir += cmd_info['cmd_args']['sub_dir']
except:
return cmd_malformed(cmd_info['cmd'])
libr_list['path'] = target_dir
cmdstr = "/usr/bin/du -ah " + target_dir
if not os.path.exists(target_dir):
resp = cmd_error(cmd='LIST-LIBR', msg='Path not Found.')
return (resp)
try:
outp = subproc_cmd(cmdstr)
file_arr = outp.split('\n')
for file in file_arr:
if file == "":
continue
tmp_arr = file.split("\t")
if tmp_arr[1] == target_dir:
continue
size = tmp_arr[0]
filename = tmp_arr[1].split("/")[-1]
file_attr = {}
file_attr['size'] = size
file_attr['filename'] = filename
file_list.append(file_attr)
libr_list['file_list'] = file_list
#str_json = json.dumps(library_list)
#json_resp = '{ "' + target_dir + '":' + str_json + '}'
json_resp = json.dumps(libr_list)
#print json_resp
except:
return cmd_malformed(cmd_info['cmd'])
return (json_resp)
def del_downloads(cmd_info):
target_dir = "/library/downloads/"
error_flag = False
try:
target_dir += cmd_info['cmd_args']['sub_dir']
if '..' in cmd_info['cmd_args']['sub_dir']: # could be legit, but ../ and /.. are not
return cmd_malformed(cmd_info['cmd'])
file_list = cmd_info['cmd_args']['file_list']
except:
return cmd_malformed(cmd_info['cmd'])
cmdstr = "rm " + target_dir + "/"
for file in file_list:
try:
outp = subproc_cmd(cmdstr + file)
except:
# ignore for now but report
error_flag = True
pass
if error_flag:
resp = cmd_success_msg(cmd_info['cmd'], "- Some errors occurred")
else:
resp = cmd_success(cmd_info['cmd'])
return (resp)
def subproc_cmd(cmdstr):
args = shlex.split(cmdstr)
outp = subprocess.check_output(args)
return (outp)
def get_ansible_version():
outp = subprocess.check_output(ansible_program + " --version | head -n1 | cut -f2 -d' '",shell=True)
return outp
def wget_file(cmd_info):
resp = cmd_info['cmd'] + " done."
return (resp)
def get_ans_facts(cmd_info):
resp = json.dumps(ansible_facts)
return (resp)
def get_ans_tags(cmd_info):
resp = json.dumps(ansible_tags)
return (resp)
def get_install_vars(cmd_info):
resp = json.dumps(effective_vars)
return (resp)
def get_xsce_ini(cmd_info):
read_xsce_ini_file()
resp = json.dumps(xsce_ini)
return (resp)
def get_mem_info(cmd_info):
outp = subprocess.check_output(["/usr/bin/free", "-h"])
json_outp = json_array("system_memory", outp)
return (json_outp)
def get_space_avail(cmd_info):
space_avail = {}
space_avail['library_on_root'] = True
libr_attr = {}
cmd = df_program + " -m"
cmd_args = shlex.split(cmd)
outp = subprocess.check_output(cmd_args)
dev_arr = outp.split('\n')
for dev_str in dev_arr[1:-1]:
dev_attr = dev_str.split()
if dev_attr[5] == '/':
space_avail['root'] = parse_df_str(dev_str)
if dev_attr[5] == '/library':
space_avail['library'] = parse_df_str(dev_str)
space_avail['library_on_root'] = False
if ('root' in space_avail):
resp = json.dumps(space_avail)
else:
resp = cmd_error(cmd_info['cmd'], "No root partition found")
return (resp)
def get_storage_info_lite(cmd_info):
outp = subprocess.check_output([df_program, "-lh"])
json_outp = json_array("system_fs", outp)
return (json_outp)
# the following does not work on lvm or raid or iso usb drives
# we wil fix later
def get_storage_info(cmd_info):
system_storage = []
cmd = "lsblk -aP -o NAME,FSTYPE,TYPE,SIZE,MOUNTPOINT,LABEL,UUID,PARTLABEL,PARTUUID,MODEL"
cmd_args = shlex.split(cmd)
outp = subprocess.check_output(cmd_args)
dev_arr = outp.split('\n')
system_storage_idx = -1
for item in dev_arr:
if len(item) == 0: # last element is blank
continue
item_attr = get_storage_info_str2dict(item)
if item_attr['TYPE'] == "disk":
cur_dev = '/dev/'+ item_attr['NAME']
dev_info = get_storage_info_parted(cur_dev)
dev_info['lsblk'] = item_attr
system_storage.append(dev_info)
system_storage_idx += 1
elif item_attr['TYPE'] == "part":
part_dev = '/dev/'+ item_attr['NAME']
# lsblk includes extended partition that holds other partition(s)
if item_attr['FSTYPE'] != "" and \
part_dev in system_storage[system_storage_idx]['index']:
blkidx = system_storage[system_storage_idx]['index'][part_dev]
system_storage[system_storage_idx]['blocks'][blkidx]['lsblk'] = item_attr
part_prop = get_storage_info_df(part_dev)
system_storage[system_storage_idx]['blocks'][blkidx]['part_prop'] = part_prop
#print dev_info
resp = json.dumps(system_storage)
return (resp)
def get_storage_info_str2dict(str):
s2 = str.replace("=",'":')
s3 = s2.replace('" ', '","')
s4 = '{"' + s3 + '}'
sd = json.loads(s4)
return (sd)
def get_storage_info_parted(dev):
dev_info = {'device':dev, 'desc':'', 'log_sect_size':'', 'part_tbl':'', 'phys_sect_size':'', 'size':'', 'type':'', 'blocks':[]}
try:
parts = subprocess.check_output(["parted", dev, "-ms", "print", "free"])
except subprocess.CalledProcessError as e:
#skip devices that cause problems
pass
else:
blkstr = parts.split("BYT;\n")[1]
#print "blkstr is " + blkstr
dev_info = {}
dev_blk = blkstr.split(';\n') # dev_blk[0] is the device itself, some can be free space
dev_char = dev_blk[0]
dev_attr = dev_char.split(':')
dev_info['device'] = dev_attr[0]
dev_info['size'] = dev_attr[1]
dev_info['type'] = dev_attr[2]
dev_info['log_sect_size'] = dev_attr[3]
dev_info['phys_sect_size'] = dev_attr[4]
dev_info['part_tbl'] = dev_attr[5]
dev_info['desc'] = dev_attr[6]
dev_info['blocks'] = []
dev_blk = dev_blk[1:]
dev_blk = [a for a in dev_blk if a not in ['\n']]
blk_idx = {}
blk_idx_num = 0
for blk in dev_blk: # blocks can be free or partitioned
#print "blk is " + blk
if blk == '':
continue
blk_info = {}
blk_attr = blk.split(':')
blk_info['part_no'] = blk_attr[0] # is 1 for free
blk_info['start'] = blk_attr[1]
blk_info['end'] = blk_attr[2]
blk_info['size'] = blk_attr[3]
blk_info['type'] = blk_attr[4]
if blk_info['type'] == 'free':
blk_info['part_dev'] = 'unallocated'
elif blk_info['type'] == '': # this is an extension partition, so ignore
continue
else:
# this is a kluge
if "mmc" in dev_info['device']:
blk_info['part_dev'] = dev_info['device'] + 'p' + blk_info['part_no']
else:
blk_info['part_dev'] = dev_info['device'] + blk_info['part_no']
#print blk_info['part_dev']
if len(blk_attr) == 7:
blk_info['flag'] = blk_attr[6]
else:
blk_info['flag'] = ""
dev_info['blocks'].append(blk_info)
if blk_info['part_dev'] != 'unallocated': # create an index to actual partitions in blocks array
blk_idx[blk_info['part_dev']] = blk_idx_num
blk_idx_num += 1
dev_info['index'] = blk_idx
return (dev_info)
def get_storage_info_df(part_dev):
part_prop = {}
outp = subprocess.check_output([df_program, part_dev, "-m"])
str_array = outp.split('\n')
part_prop = parse_df_str(str_array[1])
#print part_prop
return (part_prop)
def parse_df_str(df_str):
dev_attr = {}
dev_attr_array = df_str.split()
dev_attr_array = [a for a in dev_attr_array if a not in ['']]
dev_attr['dev'] = dev_attr_array[0]
dev_attr['size_in_megs'] = dev_attr_array[1]
dev_attr['avail_in_megs'] = dev_attr_array[3]
dev_attr['mount_point'] = dev_attr_array[5]
return (dev_attr)
def get_inet_speed(cmd_info):
outp = subprocess.check_output(["scripts/get_inet_speed"])
json_outp = json_array("internet_speed", outp)
return (json_outp)
def get_inet_speed2(cmd_info):
outp = subprocess.check_output(["/usr/bin/speedtest-cli","--simple"])
json_outp = json_array("internet_speed", outp)
return (json_outp)
def get_white_list(cmd_info):
whlist = []
try:
stream = open(squid_whitelist, 'r')
whlist = stream.read()
stream.close()
except IOError:
whlist = "[]"
#resp = '{"xsce_whitelist": "' + whlist + '"}'
resp = json_array("xsce_whitelist", whlist)
return (resp)
def set_white_list(cmd_info):
whlist = cmd_info['cmd_args']['xsce_whitelist']
whlist = filter(None, whlist) # remove blank lines
resp = cmd_success(cmd_info['cmd'])
try:
stream = open(squid_whitelist, 'w')
for item in whlist:
stream.write(item + '\n')
except IOError:
resp = cmd_error
finally:
stream.close()
return (resp)
def get_kiwix_catalog(cmd_info):
outp = subprocess.check_output(["scripts/get_kiwix_catalog"])
if outp == "SUCCESS":
read_kiwix_catalog()
resp = cmd_success("GET-KIWIX-CAT")
return (resp)
else:
return ('{"Error": "' + outp + '."}')
def get_zim_stat(cmd_info):
all_zims = {}
all_zims['WIP'] = zims_wip
all_zims['INSTALLED'] = {}
lib_xml_file = zim_dir + "/library.xml"
try:
tree = ET.parse(lib_xml_file)
root = tree.getroot()
for child in root:
attributes = child.attrib
if 'id' in attributes:
id = attributes['id']
all_zims['INSTALLED'][id] = attributes
except IOError:
all_zims['INSTALLED'] = {}
resp = json.dumps(all_zims)
return (resp)
def get_config_vars(cmd_info):
read_config_vars()
resp = json.dumps(config_vars)
return (resp)
def set_config_vars(cmd_info):
global config_vars
lock.acquire() # will block if lock is already held
try:
config_vars = cmd_info['cmd_args']['config_vars']
finally:
lock.release() # release lock, no matter what
write_config_vars()
#print config_vars
resp = cmd_success(cmd_info['cmd'])
return (resp)
def run_ansible(cmd_info): # create multiple jobs to run in succession
global ansible_running_flag
global jobs_requested
if ansible_running_flag:
return (cmd_error(msg="Ansible Command already Running."))
#return ('{"Error": "Ansible Command already Running."}')
if cmd_info['cmd'] == "RUN-ANSIBLE":
job_command = ansible_playbook_program + " -i " + xsce_git_path + "/ansible_hosts " + xsce_git_path + "/xsce-from-console.yml --connection=local"
if 'cmd_args' in cmd_info:
tags = cmd_info['cmd_args']['tags']
if tags != "ALL-TAGS":
job_command += ' --tags="' + tags +'"'
else:
return cmd_malformed(cmd_info['cmd'])
else: # cmd is "RESET-NETWORK"
job_command = xsce_git_path + "/xsce-network"
resp = request_job(cmd_info, job_command)
return resp
def get_rachel_stat(cmd_info):
# see if rachel installed from xsce_ini
# see if rachel content installed from xsce_ini.rachel_content_path
# get list of modules and modules.out
global xsce_ini
rachel_stat = {}
rachel_stat['status'] = 'NOT-INSTALLED'
rachel_stat['content_installed'] = False
if 'rachel' in xsce_ini:
rachel_stat['status'] = 'INSTALLED'
#print xsce_ini['rachel']['enabled']
if xsce_ini['rachel']['enabled'] == "True": # For some reason True/False from ini is string not boolean
rachel_stat['status'] = 'ENABLED'
rachel_index = xsce_ini['rachel']['rachel_content_path'] + "index.php"
if os.path.isfile(rachel_index):
rachel_stat['content_installed'] = True
mods_enabled = get_rachel_modules(xsce_ini['rachel']['rachel_content_path'] + "modules", True)
mods_disabled = get_rachel_modules(xsce_ini['rachel']['rachel_content_path'] + "modules.out", False)
rachel_stat['enabled'] = {}
rachel_stat['enabled'].update (mods_enabled)
rachel_stat['enabled'].update (mods_disabled)
resp = json.dumps(rachel_stat)
return (resp)
def get_rachel_modules(module_dir, state):
modules = {}
dir_list = [os.path.join(module_dir,o) for o in os.listdir(module_dir) if os.path.isdir(os.path.join(module_dir,o))]
for dir in dir_list:
f = open(dir + '/index.htmlf')
html = f.read()
nophp = re.sub(r'<\?(.*)\?>', r'', html)
# assume these html fragments remain constant - based on 3.1.5
start = nophp.find('<h2>') + len('<h2>')
end = nophp[start:].find('</h2>')
h2 = nophp[start:start+end]
title = re.sub(r'<(.*?)>', r'', h2)
module = {}
module['path'] = dir
module['enabled'] = state
modules[title] = module
return (modules)
def install_zims(cmd_info):
global ansible_running_flag
global jobs_requested
global kiwix_catalog
if 'cmd_args' in cmd_info:
zimId = cmd_info['cmd_args']['zim_id']
if zimId in kiwix_catalog:
zimFileRef = kiwix_catalog[zimId]['file_ref']
else:
resp = cmd_error(cmd='INST-ZIMS', msg='Zim ID not found in Command')
return resp
downloadSrcFile = kiwix_catalog[zimId]['download_url']
try:
rc = urllib2.urlopen(downloadSrcFile)
rc.close()
except (urllib2.URLError) as exc:
errmsg = str("Zim File " + zimFileRef + " not found in Cmd")
resp = cmd_error(cmd='INST-ZIMS', msg=errmsg)
return resp
targetDir = zim_working_dir + zimFileRef
if not os.path.exists(targetDir):
try:
os.makedirs(targetDir)
except OSError:
resp = cmd_error(cmd='INST-ZIMS', msg='Error creating directory in Command')
return resp
else:
return cmd_malformed(cmd_info['cmd'])
# at this point we can create all the jobs
downloadSrcFile = kiwix_catalog[zimId]['download_url']
downloadFile = zim_downloads_dir + zim_download_prefix + zimFileRef + ".zip"
# download zip file
job_command = "/usr/bin/wget -c --progress=dot:giga " + downloadSrcFile + " -O " + downloadFile
job_id = request_one_job(cmd_info, job_command, 1, -1, "Y")
#print job_command
# unzip
job_command = "/usr/bin/unzip -uo " + downloadFile + " -d " + targetDir
job_id = request_one_job(cmd_info, job_command, 2, job_id, "Y")
#print job_command
# move to location and clean up
job_command = "/opt/schoolserver/xsce_cmdsrv/scripts/zim_install_step3.sh " + zimFileRef
#print job_command
resp = request_job(cmd_info=cmd_info, job_command=job_command, cmd_step_no=3, depend_on_job_id=job_id, has_dependent="N")
#resp = cmd_error(cmd='INST-ZIMS', msg='Just testing')
return resp
def install_rachel(cmd_info):
global ansible_running_flag
global jobs_requested
global xsce_ini
# make sure rachel is installed
if 'rachel' not in xsce_ini:
resp = cmd_error(cmd='INST-RACHEL', msg='RACHEL is not installed')
return resp
downloadSrcFile = xsce_ini['rachel']['rachel_src_url']
rachel_version = xsce_ini['rachel']['rachel_version']
try:
rc = urllib2.urlopen(downloadSrcFile)
rc.close()
except (urllib2.URLError) as exc:
errmsg = str("Can't access " + downloadSrcFile + " Cmd")
resp = cmd_error(cmd='INST-RACHEL', msg=errmsg)
return resp
rachelName = rachel_version
targetDir = rachel_working_dir + rachelName
if not os.path.exists(targetDir):
try:
os.makedirs(targetDir)
except OSError:
resp = cmd_error(cmd='INST-RACHEL', msg='Error creating directory in Command')
return resp
# at this point we can create all the jobs
downloadFile = rachel_downloads_dir + rachelName + '.zip'
# download zip file
job_command = "/usr/bin/wget -c --progress=dot:giga " + downloadSrcFile + " -O " + downloadFile
job_id = request_one_job(cmd_info, job_command, 1, -1, "Y")
#print job_command
# unzip
#job_command = "/usr/bin/unzip -uo " + downloadFile + " -d " + targetDir
job_command = "/opt/schoolserver/xsce_cmdsrv/scripts/rachel_install_step2.sh " + rachelName
job_id = request_one_job(cmd_info, job_command, 2, job_id, "Y")
#print job_command
# move to location and clean up
job_command = "/opt/schoolserver/xsce_cmdsrv/scripts/rachel_install_step3.sh " + rachelName
#print job_command
resp = request_job(cmd_info=cmd_info, job_command=job_command, cmd_step_no=3, depend_on_job_id=job_id, has_dependent="N")
#resp = cmd_error(cmd='INST-ZIMS', msg='Just testing')
return resp
def restart_kiwix(cmd_info):
rc = subprocess.call(["/usr/bin/xsce-make-kiwix-lib"])
#print rc
if rc == 0:
resp = cmd_success(cmd_info['cmd'])
else:
resp = cmd_error(cmd_info['cmd'])
return (resp)
def reboot_server(cmd_info):
resp = cmd_success_msg(cmd_info['cmd'], 'Reboot Initiated')
outp = subprocess.Popen(["scripts/reboot.sh"])
return (resp)
def poweroff_server(cmd_info):
resp = cmd_success_msg(cmd_info['cmd'], 'Power Off Initiated')
outp = subprocess.Popen(["scripts/poweroff.sh"])
return (resp)
def remote_admin_ctl(cmd_info):
try:
bool_activate = cmd_info['cmd_args']['activate']
outp = subprocess.Popen(["scripts/remote_admin_ctl.sh",bool_activate])
resp = get_remote_admin_status(cmd_info)
return (resp)
except:
return cmd_malformed(cmd_info['cmd'])
def get_remote_admin_status(cmd_info):
outp = subprocess.check_output(["scripts/get_remote_admin_status.sh"])
#resp = json_array("remote",outp)
return (outp.strip())
def change_password(cmd_info):
#print cmd_info['cmd_args']
try:
user = cmd_info['cmd_args']['user']
oldpasswd = cmd_info['cmd_args']['oldpasswd']
newpasswd = cmd_info['cmd_args']['newpasswd']
except:
return cmd_malformed(cmd_info['cmd'])
# Prevent shell injection on arguments - will disallow some legal characters from password
# Assume this is not needed as shell is false on call to chpasswd
#for key in cmd_info['cmd_args']:
# match = re.search('[ ;,|<>()`=&\r\n]', cmd_info['cmd_args'][key])
# if match != None:
# return cmd_malformed(cmd_info['cmd'])
# May not set root password
if user == "root":
resp = cmd_error(cmd=cmd_info['cmd'], msg='May not change root password.')
return resp
# see if user exists
try:
spwddb = spwd.getspnam(user)
except:
resp = cmd_error(cmd=cmd_info['cmd'], msg='User not found or system error.')
return resp
# check old password - N.B. allows password guessing
readpasswd = spwddb[1]
pwparts = readpasswd.split('$')
salt = '$' + pwparts[1] + '$' + pwparts[2] +'$'
calcpasswd = crypt.crypt(oldpasswd, salt)
if calcpasswd != readpasswd:
resp = cmd_error(cmd=cmd_info['cmd'], msg='Old Password Incorrect.')
return resp
# check password for valid characters and min length of 8 - need better regex as all characters are legal
#match = re.search('^[A-Za-z0-9#$+*]{8,}$', newpasswd)
#if match != None:
# resp = cmd_error(cmd=cmd_info['cmd'], msg='Illegal Characters in Password.')
# return resp
# check password strength
is_valid, message = isStrongPassword(newpasswd)
if not is_valid:
resp = cmd_error(cmd=cmd_info['cmd'], msg='Password Strength: ' + message + '.')
return resp
# create new password hash
newhash = crypt.crypt(newpasswd, salt)
pwinput = user + ':' + newhash + '\n'
# finally change password
p = subprocess.Popen(['chpasswd', '-e'], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
out = p.communicate(input=pwinput)[0]
rc = p.returncode
if rc != 0:
resp = cmd_error(cmd=cmd_info['cmd'], msg='System Error: Failure to change password')
else:
resp = cmd_success_msg(cmd=cmd_info['cmd'], msg='Password Changed')
return resp
def isStrongPassword(password):
message = ""
is_valid = True
try:
cracklib.VeryFascistCheck(password)
except ValueError, e:
is_valid = False
message = e.message
return is_valid, message
def request_job(cmd_info, job_command, cmd_step_no=1, depend_on_job_id=-1, has_dependent="N"):
global jobs_requested
active_commands[cmd_info['cmd_rowid']] = cmd_info['cmd_msg']
job_id = request_one_job(cmd_info, job_command, cmd_step_no, depend_on_job_id, has_dependent)
return ('{"Success": "Job ' + str(job_id) + ' Scheduled."}')
def request_one_job(cmd_info, job_command, cmd_step_no, depend_on_job_id, has_dependent):
global jobs_requested
global prereq_jobs
prereq_info = {}
job_id = get_job_id()
job_info = {}
job_info['cmd_rowid'] = cmd_info['cmd_rowid']
job_info['cmd'] = cmd_info['cmd']
job_info['cmd_args'] = cmd_info['cmd_args']
job_info['cmd_step_no'] = cmd_step_no
job_info['depend_on_job_id'] = depend_on_job_id
job_info['has_dependent'] = has_dependent
job_info['job_command'] = job_command
job_info['status'] = 'SCHEDULED'
job_info['status_datetime'] = str(datetime.now())
jobs_requested[job_id] = job_info
insert_job(job_id, cmd_info['cmd_rowid'], job_command, cmd_step_no, depend_on_job_id, has_dependent)
if has_dependent == "Y":
prereq_info ['status'] = job_info['status']
prereq_jobs[job_id] = prereq_info
else:
if cmd_info['cmd'] == "INST-ZIMS":
id = cmd_info['cmd_args']['zim_id']
if id in kiwix_catalog:
zims_wip[id] = kiwix_catalog[id]
else:
log(syslog.LOG_ERR, "Error: Unknown kiwix zim - %s." % id)
#print "unknown kiwix zim - ", id
return job_id
def cancel_job(cmd_info):
global jobs_to_cancel
global jobs_requested
try:
job_id = int(cmd_info['cmd_args']['job_id'])
except ValueError:
return cmd_malformed(cmd_info['cmd'])
if job_id in jobs_requested:
jobs_to_cancel[job_id] = True
return ('{"Success": "Scheduled Job ' + str(job_id) + ' Cancelled."}')
elif job_id in jobs_running :
jobs_to_cancel[job_id] = True
return ('{"Success": "Running Job ' + str(job_id) + ' Cancelled."}')
else:
return ('{"Error": "Job ' + str(job_id) + ' Not Running or Scheduled."}')
def get_last_jobs_stat(cmd_info):
db_lock.acquire() # will block if lock is already held
try:
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
cur = conn.execute ("SELECT jobs.rowid, job_command, job_output, job_status, last_update_datetime, cmd_msg FROM jobs, commands where cmd_rowid = commands.rowid ORDER BY jobs.rowid DESC LIMIT 30")
last_jobs = cur.fetchall()
conn.close()
except sqlite3.Error, e:
tprint ("Error %s:" % e.args[0])
log(syslog.LOG_ERR, "Sql Lite3 Error %s:" % e.args[0])
finally:
if conn:
conn.close()
db_lock.release()
#print "job running"
#print jobs_running
# get status output for incomplete ansible jobs
last_jobs_cc = []
for job in last_jobs:
job_id = job[0]
if job_id in jobs_running:
job_cc = []
job_cc.append(job[0])
job_cc.append(job[1])
job_output = job[2]
#if jobs_running[job_id]['cmd'] == "RUN-ANSIBLE" and jobs_running[job_id]['status'] == "STARTED":
if jobs_running[job_id]['status'] == "STARTED" or jobs_running[job_id]['status'] == "RESTARTED":
# load output from tmp file
output_file = jobs_running[job_id]['output_file']
file = open(output_file, 'r')
job_output = file.read()
file.close()
#print "job output" + job_output
job_cc.append(job_output)
job_cc.append(job[3])
job_cc.append(job[4])
job_cc.append(job[5])
last_jobs_cc.append(job_cc)
else:
last_jobs_cc.append(job)
resp = json.dumps(last_jobs_cc)
return resp
def get_jobs_running(cmd_info):
global jobs_running
today = str(date.today())
job_stat = {}
cur_jobs = {}
for job, job_info in jobs_running.iteritems():
if today in job_info['status_datetime'] or jobinfo['status'] in ['SCHEDULED','STARTED']:
job_stat['status'] = job_info['status']
job_stat['job_command'] = job_info['job_command']
job_stat['status_datetime'] = job_info['status_datetime']
job_stat['status_datetime'] = job_info['status_datetime']
job_stat['job_output'] = job_info['job_output']
cur_jobs[job] = job_stat
resp = json.dumps(cur_jobs)
return resp
def json_array(name, str):
try:
str_array = str.split('\n')
str_json = json.dumps(str_array)
json_resp = '{ "' + name + '":' + str_json + '}'
except StandardError:
json_resp = cmd_error()
return (json_resp)
def validate_command(cmd):
match = re.search('[;,|<>()=&\r\n]', cmd, flags=0)
if match != None:
log(syslog.LOG_ERR, "Error: Malformed Command")
return ('{"Error": "Malformed Command."}')
else:
return None
def get_cmd_info_key(cmd_info, key):
if key in cmd_info:
return (cmd_info[key])
else:
return None
def tprint(msg):
"""like print, but won't get newlines confused with multiple threads DELETE AFTER TESTING"""
if daemon_mode == False:
sys.stdout.write(msg + '\n')
sys.stdout.flush()
def cmd_success(cmd):
return (cmd_success_msg(cmd, ""))
def cmd_success_msg(cmd, msg):
return ('{"Success": "' + cmd + " " + msg + '."}')
def cmd_error(cmd="", msg="Internal Server Error processing Command"):
log(syslog.LOG_ERR, "Error: %s %s." % (msg, cmd))
return ('{"Error": "' + msg + ' ' + cmd + '."}')
def cmd_malformed(cmd=None):
log(syslog.LOG_ERR, "Error: Malformed Command %s." % cmd)
return ('{"Error": "Malformed Command ' + cmd + '."}')
def insert_command(cmd_msg):
global last_command_rowid
lock.acquire() # will block if lock is already held
try:
cmd_id = last_command_rowid + 1
last_command_rowid = cmd_id
finally:
lock.release() # release lock, no matter what
now = datetime.now()
db_lock.acquire()
try:
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
conn.execute ("INSERT INTO commands (rowid, cmd_msg, create_datetime) VALUES (?,?,?)", (cmd_id, cmd_msg, now))
conn.commit()
conn.close()
except sqlite3.Error, e:
tprint ("Error %s:" % e.args[0])
log(syslog.LOG_ERR, "Sql Lite3 Error %s:" % e.args[0])
finally:
if conn:
conn.close()
db_lock.release()
return (cmd_id)
def insert_job(job_id, cmd_rowid, job_command, cmd_step_no, depend_on_job_id, has_dependent):
#print "in insert job"
now = datetime.now()
job_pid=0
job_output=""
job_status="SCHEDULED"
db_lock.acquire()
try:
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
conn.execute ("INSERT INTO jobs (rowid, cmd_rowid, cmd_step_no, depend_on_job_id, has_dependent, job_command, job_pid, job_output, job_status, create_datetime, last_update_datetime) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
(job_id, cmd_rowid, cmd_step_no, depend_on_job_id, has_dependent, job_command, job_pid, job_output, job_status, now, now))
conn.commit()
conn.close()
except sqlite3.Error, e:
tprint ("Error %s:" % e.args[0])
log(syslog.LOG_ERR, "Sql Lite3 Error %s:" % e.args[0])
finally:
if conn:
conn.close()
db_lock.release()
def upd_job_started(job_id, job_pid, job_status="STARTED"):
now = datetime.now()
db_lock.acquire()
try:
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
conn.execute ("UPDATE jobs SET job_pid = ?, job_status = ?, last_update_datetime = ? WHERE rowid = ?", (job_pid, job_status, now, job_id))
conn.commit()
conn.close()
except sqlite3.Error, e:
tprint ("Error %s:" % e.args[0])
log(syslog.LOG_ERR, "Sql Lite3 Error %s:" % e.args[0])
finally:
if conn:
conn.close()
db_lock.release()
def upd_job_finished(job_id, job_output, job_status="FINISHED"):
now = datetime.now()
db_lock.acquire()
try:
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
conn.execute ("UPDATE jobs SET job_status = ?, job_output = ?, last_update_datetime = ? WHERE rowid = ?", (job_status, job_output, now, job_id))
conn.commit()
conn.close()
except sqlite3.Error, e:
tprint ("Error %s:" % e.args[0])
log(syslog.LOG_ERR, "Sql Lite3 Error %s:" % e.args[0])
finally:
if conn:
conn.close()
db_lock.release()
def upd_job_cancelled(job_id, job_status="CANCELLED"):
now = datetime.now()
db_lock.acquire()
try:
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
conn.execute ("UPDATE jobs SET job_status = ?, last_update_datetime = ? WHERE rowid = ?", (job_status, now, job_id))
conn.commit()
conn.close()
except sqlite3.Error, e:
tprint ("Error %s:" % e.args[0])
log(syslog.LOG_ERR, "Sql Lite3 Error %s:" % e.args[0])
finally:
if conn:
conn.close()
db_lock.release()
def get_job_id():
global last_job_rowid
lock.acquire() # will block if lock is already held
try:
job_id = last_job_rowid + 1
last_job_rowid = job_id
finally:
lock.release() # release lock, no matter what
return(job_id)
def init():
global last_command_rowid
global last_job_rowid
# Read application variables from config files
app_config()
# Read vars from ansible file into global vars
read_xsce_vars()
read_xsce_ini_file()
# Get ansible facts and tags for localhost
get_ansible_facts()
get_ansible_tags()
read_kiwix_catalog()
# Compute variables derived from all of the above
compute_vars()
# See if queue.db exists and create if not
# Opening a connection creates if not exist
# No DB locking done in init as is single threaded
statinfo = os.stat(xsce_cmdsrv_dbpath)
if statinfo.st_size == 0:
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
conn.execute ("CREATE TABLE commands (cmd_msg text, create_datetime text)")
conn.commit()
conn.execute ("CREATE TABLE jobs (cmd_rowid integer, cmd_step_no integer, depend_on_job_id integer, has_dependent text, job_command text, job_pid integer, job_output text, job_status text, create_datetime text, last_update_datetime text)")
conn.commit()
conn.close()
else:
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
cur = conn.execute("SELECT max (rowid) from commands")
row = cur.fetchone()
if row[0] is not None:
last_command_rowid = row[0]
cur = conn.execute("SELECT max (rowid) from jobs")
row = cur.fetchone()
if row[0] is not None:
last_job_rowid = row[0]
cur.close()
conn.close()
get_incomplete_jobs()
def read_config_vars():
global config_vars
stream = open("/etc/xsce/config_vars.yml", 'r')
config_vars = yaml.load(stream)
stream.close()
if config_vars == None:
config_vars = {} # try to make the ajax call happy
def read_xsce_ini_file():
global xsce_ini
xsce_ini_tmp = {}
config = ConfigParser.ConfigParser()
config.read(xsce_ini_file)
for section in config.sections():
xsce_ini_sec = {}
opts = config.options(section)
for opt in opts:
attr = config.get(section, opt)
xsce_ini_sec[opt] = attr
xsce_ini_tmp[section] = xsce_ini_sec
xsce_ini = xsce_ini_tmp
def write_config_vars():
global config_vars
lock.acquire() # will block if lock is already held
try:
stream = open("/etc/xsce/config_vars.yml", 'w')
stream.write('# DO NOT MODIFY THIS FILE.\n')
stream.write('# IT IS AUTOMATICALLY GENERATED.\n')
for key in config_vars:
if isinstance(config_vars[key], (int, float)): # boolean is int
value = str(config_vars[key])
else:
value = '"' + config_vars[key] + '"'
entry = key + ': ' + value
stream.write(entry + '\n')
stream.close()
finally:
lock.release() # release lock, no matter what
def read_xsce_vars():
global default_vars
global local_vars
global effective_vars
stream = open(xsce_git_path + "/vars/default_vars.yml", 'r')
default_vars = yaml.load(stream)
stream.close()
stream = open(xsce_git_path + "/vars/local_vars.yml", 'r')
local_vars = yaml.load(stream)
stream.close()
if local_vars == None:
local_vars = {}
# combine vars with local taking precedence
# exclude derived vars marked by {
for key in default_vars:
if isinstance(default_vars[key], str):
findpos = default_vars[key].find("{")
if findpos == -1:
effective_vars[key] = default_vars[key]
else:
effective_vars[key] = default_vars[key]
for key in local_vars:
if isinstance(local_vars[key], str):
findpos = local_vars[key].find("{")
if findpos == -1:
effective_vars[key] = local_vars[key]
else:
effective_vars[key] = local_vars[key]
def get_ansible_facts():
global ansible_facts
command = ansible_program + " localhost -i " + xsce_git_path + "/ansible_hosts -m setup -o --connection=local"
args = shlex.split(command)
outp = subprocess.check_output(args)
if (get_ansible_version() < '2'):
splitter = 'success >> '
else:
splitter = 'SUCCESS => '
ans_str = outp.split(splitter)
ans = json.loads(ans_str[1])
ansible_facts = ans['ansible_facts']
def get_ansible_tags():
global ansible_tags
if (get_ansible_version() < '2'):
command = ansible_playbook_program + " -i " + xsce_git_path + "/ansible_hosts " + xsce_git_path + "/xsce-from-console.yml --connection=local --tags=???"
splitter = 'values: '
else:
command = ansible_playbook_program + " -i " + xsce_git_path + "/ansible_hosts " + xsce_git_path + "/xsce-from-console.yml --connection=local --list-tags"
splitter = 'TASK TAGS: '
args = shlex.split(command)
try:
outp = subprocess.check_output(args)
except subprocess.CalledProcessError as e:
outp = e.output
# get just the tag list and remove final newline
split = outp.split(splitter)
ans_tags_str = split[1].split('\n')[0]
ansible_tags['ansible_tags'] = ans_tags_str
def read_kiwix_catalog():
global kiwix_catalog
stream = open (kiwix_catalog_file,"r")
kiwix_catalog_with_date = json.load(stream)
kiwix_catalog = kiwix_catalog_with_date['zims']
stream.close()
def get_incomplete_jobs():
global jobs_requested
global jobs_to_restart
jobs_to_cancel = {}
prereq_info = {}
# calculate boot time so we can tell if pid is ours
with open('/proc/uptime', 'r') as f:
uptime_str = f.readline()
now = datetime.now()
seconds_since_boot = float(uptime_str.split()[0])
boot_delta = timedelta(seconds=seconds_since_boot)
boot_time = now - boot_delta
# get jobs from database that didn't finish, group by command in desc order so we only done the last one
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
sql = "SELECT jobs.rowid, cmd_rowid, cmd_step_no, depend_on_job_id, has_dependent, job_command, job_pid, job_output, job_status, jobs.create_datetime, cmd_msg from commands, jobs "
sql += "WHERE commands.rowid = jobs.cmd_rowid and job_status IN ('STARTED', 'RESTARTED', 'SCHEDULED') ORDER BY job_command, jobs.rowid DESC"
cur = conn.execute(sql)
last_command = ""
for row in cur.fetchall():
job_id, cmd_rowid, cmd_step_no, depend_on_job_id, has_dependent, job_command, job_pid, job_output, job_status, create_datetime, cmd_msg = row
job_created_time = datetime.strptime(create_datetime, "%Y-%m-%d %H:%M:%S.%f") # create_datetime to datetime type
# Kill any jobs hanging around. In future we might try to monitor them since we have the job_output.
# But make sure they are since last reboot or might not be ours
if job_created_time > boot_time:
if job_pid > 0:
try:
tprint ("Removing pid %s if still running" % job_pid)
log(syslog.LOG_INFO, "Removing pid %s if still running" % job_pid)
os.kill(job_pid, signal.SIGKILL)
except OSError:
pass
# rm job_output file
try:
output_file = '/tmp/job-' + str(job_id)
os.remove(output_file)
except OSError:
pass
job_info = {}
parse = cmd_msg.split(' ')
job_info['cmd'] = parse[0]
try:
job_info['cmd_args'] = json.loads(parse[1])
except IndexError:
job_info['cmd_args'] = {}
job_info['cmd_rowid'] = cmd_rowid
job_info['job_command'] = job_command
job_info['cmd_step_no'] = cmd_step_no
job_info['depend_on_job_id'] = depend_on_job_id
job_info['has_dependent'] = has_dependent
job_info['job_command'] = job_command
job_info['status'] = job_status
job_info['create_datetime'] = create_datetime
job_info['status_datetime'] = str(datetime.now())
# only restart if we haven't already seen this command
# we assume that we can always use the highest numbered job for a given command
if job_command != last_command:
if job_status == 'SCHEDULED':
jobs_requested[job_id] = job_info
if has_dependent == "Y":
prereq_info ['status'] = 'SCHEDULED'
prereq_jobs[job_id] = prereq_info
else:
jobs_to_restart[job_id] = job_info
# Add to active_commands
active_commands[cmd_rowid] = cmd_msg
if job_info['cmd'] == "INST-ZIMS":
id = job_info['cmd_args']['zim_id']
if id in kiwix_catalog:
zims_wip[id] = kiwix_catalog[id]
else:
log(syslog.LOG_ERR, "Error: Unknown kiwix zim - %s." % id)
#print "unknown kiwix zim - ", id
else: # cancel duplicate
jobs_to_cancel[job_id] = job_info
last_command = job_command
cur.close()
conn.close()
for job_id in jobs_to_cancel:
upd_job_cancelled(job_id)
if jobs_to_cancel[job_id]['has_dependent'] == "Y":
prereq_info ['status'] = 'CANCELLED'
prereq_jobs[job_id] = prereq_info
# fix up prereq_jobs with status of completed prereq jobs, not selected in the previous query
conn = sqlite3.connect(xsce_cmdsrv_dbpath)
sql = "SELECT rowid, job_status from jobs WHERE rowid IN (SELECT depend_on_job_id FROM jobs where job_status IN ('STARTED', 'RESTARTED', 'SCHEDULED'))"
cur = conn.execute(sql)
for row in cur.fetchall():
job_id, job_status = row
if not job_id in prereq_jobs:
prereq_info ['status'] = job_status
prereq_jobs[job_id] = prereq_info
cur.close()
conn.close()
def app_config():
global xsce_cmdsrv_path
global xsce_cmdsrv_pid_file
global xsce_cmdsrv_dbpath
# Read variables from xsce config file
read_xsce_conf_file()
# Read variables from xsce-cmdsrv config file
read_xsce_cmdsrv_conf_file()
# Compute app variables from variables in these files
xsce_cmdsrv_path = xsce_base_path + "/xsce_cmdsrv"
xsce_cmdsrv_pid_file = "/var/run/xsce-cmdsrv.pid"
xsce_cmdsrv_dbpath = xsce_cmdsrv_path + "/" + xsce_cmdsrv_dbname
def read_xsce_cmdsrv_conf_file():
global cmdsrv_no_workers
global cmdsrv_job_poll_sleep_interval
global cmdsrv_max_concurrent_jobs
global kiwix_catalog_file
global zim_downloads_dir
global zim_download_prefix
global rachel_downloads_dir
global rachel_working_dir
global rachel_version
global ansible_playbook_program
global ansible_program
global apache_data
global squid_service
global df_program
global squid_whitelist
stream = open (xsce_cmdsrv_config_file,"r")
inp = json.load(stream)
stream.close()
conf = inp['xsce-cmdsrv_conf']
cmdsrv_no_workers = conf['cmdsrv_no_workers']
cmdsrv_job_poll_sleep_interval = conf['cmdsrv_job_poll_sleep_interval']
cmdsrv_max_concurrent_jobs = conf['cmdsrv_max_concurrent_jobs']
xsce_cmdsrv_pid_file = conf['xsce_cmdsrv_pid_file']
kiwix_catalog_file = conf['kiwix_catalog_file']
zim_downloads_dir = conf['zim_downloads_dir']
zim_download_prefix = conf['zim_download_prefix']
rachel_downloads_dir = conf['rachel_downloads_dir']
rachel_working_dir = conf['rachel_working_dir']
squid_service = conf['squid_service']
squid_whitelist = "/etc/%s/sites.whitelist.txt" % squid_service
ansible_playbook_program = conf['ansible_playbook_program']
ansible_program = conf['ansible_program']
apache_data = conf['apache_data']
df_program = conf['df_program']
# These two were taken from the OLPC idmgr application
def read_xsce_conf_file():
global xsce_base_path
global xsce_git_path
load_config = {}
try:
f = open(xsce_config_file, "r")
try:
for line in f:
line = line.lstrip()
if line == '' or line.startswith('#'):
continue
k, v = [x.strip() for x in line.split('=', 1)]
try:
v = float(v)
v = int(v)
except ValueError:
pass
if isinstance(v, str):
#Maybe it's quoted.
for quote in '\'"':
if (v.startswith(quote) and v.endswith(quote)
and quote not in v[1:-1]):
v = v[1:-1]
break
load_config[k] = v
except IOError:
load_config = {}
finally:
f.close()
except IOError:
log(syslog.LOG_ERR, 'Error reading XSCE Command Server Config')
else:
xsce_base_path = load_config['XSCE_BASE_PATH']
xsce_git_path = load_config['XSCE_DIR']
def compute_vars():
global zim_dir
global zim_content_dir
global zim_index_dir
zim_dir = effective_vars['xsce_zim_path']
zim_content_dir = zim_dir + "/content/"
zim_index_dir = zim_dir + "/index/"
def set_ready_flag(on_off):
if on_off == "ON":
try:
ready_file = open( xsce_cmdsrv_ready_file, "w" );
ready_file.write( "ready" );
ready_file.write( "\n" );
ready_file.close();
except OSError, e:
syslog.openlog( 'xsce_cmdsrv', 0, syslog.LOG_USER )
syslog.syslog( syslog.LOG_ALERT, "Writing Ready file: %s [%d]" % (e.strerror, e.errno) )
syslog.closelog()
raise
else:
try:
os.remove(xsce_cmdsrv_ready_file)
except OSError, e:
syslog.openlog( 'xsce_cmdsrv', 0, syslog.LOG_USER )
syslog.syslog( syslog.LOG_ALERT, "Removing Ready file: %s [%d]" % (e.strerror, e.errno) )
syslog.closelog()
pass # log and keep going; we're just trying to shut down
def createDaemon(pidfilename):
"""Detach a process from the controlling terminal and run it in the
background as a daemon.
"""
pid = os.fork()
if (pid == 0): # The first child.
pid = os.fork()
if (pid == 0): # The second child.
# Since the current working directory may be a mounted filesystem,
# we avoid the issue of not being able to unmount the filesystem at
# shutdown time by changing it to the /opt directory where the cmdsrv is installed.
os.chdir(xsce_cmdsrv_path)
#os.umask(config.UMASK)
else:
# write out pid of child
try:
pidfile = open( pidfilename, "w" );
pidfile.write( str( pid ) );
pidfile.write( "\n" );
pidfile.close();
except OSError, e:
syslog.openlog( 'xsce_cmdsrv', 0, syslog.LOG_USER )
syslog.syslog( syslog.LOG_ALERT, "Writing PID file: %s [%d]" % (e.strerror, e.errno) )
syslog.closelog()
raise
os._exit(0) # Exit parent (the first child) of the second child.
else:
os._exit(0)# Exit parent of the first child.
# Close all open file descriptors.
# Use the getrlimit method to retrieve the maximum file descriptor
# number that can be opened by this process. If there is not limit
# on the resource, use the default value.
#
import resource # Resource usage information.
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if (maxfd == resource.RLIM_INFINITY):
maxfd = MAXFD
# Iterate through and close all file descriptors.
for fd in range(0, maxfd):
try:
os.close(fd)
except OSError: # ERROR, fd wasn't open to begin with (ignored)
pass
# Redirect the standard I/O file descriptors to the specified file. Since
# the daemon has no controlling terminal, most daemons redirect stdin,
# stdout, and stderr to /dev/null. This is done to prevent side-effects
# from reads and writes to the standard I/O file descriptors.
# This call to open is guaranteed to return the lowest file descriptor,
# which will be 0 (stdin), since it was closed above.
os.open(os.devnull, os.O_RDWR) # standard input (0)
# Duplicate standard input to standard output and standard error.
os.dup2(0, 1)# standard output (1)
os.dup2(0, 2)# standard error (2)
return(0)
# Now start the application
if __name__ == "__main__":
# do the UNIX double-fork magic if --daemon passed as argument
if len(sys.argv) > 1:
if sys.argv[1] in ('--daemon'):
retCode = createDaemon( xsce_cmdsrv_pid_file )
#retCode = createDaemon( sys.argv[1] ) # pass pid file
daemon_mode = True
if len(sys.argv) == 1: # create pseudo pid file so php client can test for it
try:
pidfile = open( xsce_cmdsrv_pid_file, "w" );
pidfile.write( str( 0 ) );
pidfile.write( "\n" );
pidfile.close();
except OSError, e:
syslog.openlog( 'xsce_cmdsrv', 0, syslog.LOG_USER )
syslog.syslog( syslog.LOG_ALERT, "Writing PID file: %s [%d]" % (e.strerror, e.errno) )
syslog.closelog()
raise
syslog.openlog( 'xsce_cmdsrv', 0, syslog.LOG_USER )
log = syslog.syslog
log(syslog.LOG_INFO, 'Starting XSCE Command Server')
# Now run the main routine
main()