From 580cbb84a9d897c305612c883b56ad697f271d92 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 23 Apr 2014 12:17:22 +0800 Subject: [PATCH] add threading.Lock for cherrypy, or performance suffer. --- trunk/research/api-server/server.py | 369 +++++++++++++++------------- 1 file changed, 200 insertions(+), 169 deletions(-) diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index 8a6893a67..12dab3189 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -368,7 +368,8 @@ class RESTServers(object): self.__nodes = [] self.__last_update = datetime.datetime.now(); - server_ip = "192.168.1.142"; + + self.__lock = threading.Lock() def __get_node(self, device_id): for node in self.__nodes: @@ -415,28 +416,33 @@ class RESTServers(object): ''' def POST(self): enable_crossdomain() - - req = cherrypy.request.body.read() - trace("post to nodes, req=%s"%(req)) - try: - json_req = json.loads(req) - except Exception, ex: - code = Error.system_parse_json - trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) - return json.dumps({"code":code, "data": None}) - - device_id = json_req["device_id"] - node = self.__get_node(device_id) - if node is None: - node = ArmServer() - self.__nodes.append(node) - - node.ip = json_req["ip"] - node.device_id = device_id - node.public_ip = cherrypy.request.remote.ip - node.heartbeat = time.time() - return json.dumps({"code":Error.success, "data": {"id":node.id}}) + try: + self.__lock.acquire() + + req = cherrypy.request.body.read() + trace("post to nodes, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return json.dumps({"code":code, "data": None}) + + device_id = json_req["device_id"] + node = self.__get_node(device_id) + if node is None: + node = ArmServer() + self.__nodes.append(node) + + node.ip = json_req["ip"] + node.device_id = device_id + node.public_ip = cherrypy.request.remote.ip + node.heartbeat = time.time() + + return json.dumps({"code":Error.success, "data": {"id":node.id}}) + finally: + self.__lock.release() ''' id canbe: @@ -455,58 +461,63 @@ class RESTServers(object): def GET(self, id=None, action="play", stream="live/livestream", index=None, local="false", device_id=None): enable_crossdomain() - self.__refresh_nodes() - data = self.__json_dump_nodes(self.__nodes) + try: + self.__lock.acquire() - server_ip = "demo.chnvideo.com" - ip = cherrypy.request.remote.ip - if type is not None: - peers = self.__get_peers_for_play(device_id) - if len(peers) > 0: - server_ip = self.__select_peer(peers, device_id) - - # demo, srs meeting urls. - if id == "meeting": - if index is None: - url = "http://%s:8085"%(server_ip) - elif local == "true": - url = "http://%s:8085/api/v1/servers?id=%s&index=%s&local=false"%(server_ip, id, index) + self.__refresh_nodes() + data = self.__json_dump_nodes(self.__nodes) + + server_ip = "demo.chnvideo.com" + ip = cherrypy.request.remote.ip + if type is not None: + peers = self.__get_peers_for_play(device_id) + if len(peers) > 0: + server_ip = self.__select_peer(peers, device_id) + + # demo, srs meeting urls. + if id == "meeting": + if index is None: + url = "http://%s:8085"%(server_ip) + elif local == "true": + url = "http://%s:8085/api/v1/servers?id=%s&index=%s&local=false"%(server_ip, id, index) + else: + rtmp_url = root.api.v1.chats.get_url_by_index(index) + if rtmp_url is None: + return "meeting stream not found" + urls = rtmp_url.replace("...vhost...", "?vhost=").replace("rtmp://", "").split("/") + hls_url = "http://%s:8080/%s/%s.m3u8"%(urls[0].strip(":19350").strip(":1935"), urls[1].split("?")[0], urls[2]) + return self.__generate_hls(hls_url) + # raspberry-pi urls. + elif id == "ingest" or id == "pi": + if action == "play": + url = "http://%s:8080/%s.html"%(server_ip, stream) + elif action == "rtmp": + url = "../../players/srs_player.html?server=%s&vhost=%s&app=%s&stream=%s&autostart=true"%(server_ip, server_ip, stream.split("/")[0], stream.split("/")[1]) + elif action == "hls": + hls_url = "http://%s:8080/%s.m3u8"%(server_ip, stream); + if stream.startswith("http://"): + hls_url = stream; + return self.__generate_hls(hls_url.replace(".m3u8.m3u8", ".m3u8")) + else: + url = "http://%s:8080/api/v1/versions"%(server_ip) + elif id == "gslb": + return json.dumps({"code":Error.success, "data": { + "edge":server_ip, "client":ip, + "peers":self.__json_dump_nodes(peers), + "streams": { + "hls-livestream-sales": "http://demo.chnvideo.com:8085/api/v1/servers?id=ingest&action=hls&device_id=chnvideo-sales-arm&stream=live/livestream", + "hls-cztv-sales": "http://demo.chnvideo.com:8085/api/v1/servers?id=ingest&action=hls&device_id=chnvideo-sales-arm&stream=live/rtmp_cztv01-sd", + "hls-livestream-dev": "http://demo.chnvideo.com:8085/api/v1/servers?id=ingest&action=hls&device_id=chnvideo-dev-arm&stream=live/livestream", + "hls-cztv-dev": "http://demo.chnvideo.com:8085/api/v1/servers?id=ingest&action=hls&device_id=chnvideo-dev-arm&stream=live/rtmp_cztv01-sd" + } + }}) + # others, default. else: - rtmp_url = root.api.v1.chats.get_url_by_index(index) - if rtmp_url is None: - return "meeting stream not found" - urls = rtmp_url.replace("...vhost...", "?vhost=").replace("rtmp://", "").split("/") - hls_url = "http://%s:8080/%s/%s.m3u8"%(urls[0].strip(":19350").strip(":1935"), urls[1].split("?")[0], urls[2]) - return self.__generate_hls(hls_url) - # raspberry-pi urls. - elif id == "ingest" or id == "pi": - if action == "play": - url = "http://%s:8080/%s.html"%(server_ip, stream) - elif action == "rtmp": - url = "../../players/srs_player.html?server=%s&vhost=%s&app=%s&stream=%s&autostart=true"%(server_ip, server_ip, stream.split("/")[0], stream.split("/")[1]) - elif action == "hls": - hls_url = "http://%s:8080/%s.m3u8"%(server_ip, stream); - if stream.startswith("http://"): - hls_url = stream; - return self.__generate_hls(hls_url.replace(".m3u8.m3u8", ".m3u8")) - else: - url = "http://%s:8080/api/v1/versions"%(server_ip) - elif id == "gslb": - return json.dumps({"code":Error.success, "data": { - "edge":server_ip, "client":ip, - "peers":self.__json_dump_nodes(peers), - "streams": { - "hls-livestream-sales": "http://demo.chnvideo.com:8085/api/v1/servers?id=ingest&action=hls&device_id=chnvideo-sales-arm&stream=live/livestream", - "hls-cztv-sales": "http://demo.chnvideo.com:8085/api/v1/servers?id=ingest&action=hls&device_id=chnvideo-sales-arm&stream=live/rtmp_cztv01-sd", - "hls-livestream-dev": "http://demo.chnvideo.com:8085/api/v1/servers?id=ingest&action=hls&device_id=chnvideo-dev-arm&stream=live/livestream", - "hls-cztv-dev": "http://demo.chnvideo.com:8085/api/v1/servers?id=ingest&action=hls&device_id=chnvideo-dev-arm&stream=live/rtmp_cztv01-sd" - } - }}) - # others, default. - else: - return json.dumps(data) - #return "id=%s, action=%s, stream=%s, url=%s, index=%s, local=%s"%(id, action, stream, url, index, local) - raise cherrypy.HTTPRedirect(url) + return json.dumps(data) + #return "id=%s, action=%s, stream=%s, url=%s, index=%s, local=%s"%(id, action, stream, url, index, local) + raise cherrypy.HTTPRedirect(url) + finally: + self.__lock.release() def DELETE(self, id): enable_crossdomain() @@ -577,6 +588,10 @@ class RESTNodes(object): def __init__(self): self.__nodes = [] + # @remark, if there is shared data, such as the self.__nodes, + # we must use lock for cherrypy, or the cpu of cherrypy will high + # and performance suffer. + self.__lock = threading.Lock() def __get_node(self, id): for node in self.__nodes: @@ -629,113 +644,128 @@ class RESTNodes(object): def GET(self, type=None, format=None, origin=None, vhost=None, port=None, stream=None, node_id=None): enable_crossdomain() - self.__refresh_nodes() - data = self.__json_dump_nodes(self.__nodes) + try: + self.__lock.acquire() - ip = cherrypy.request.remote.ip - if type is not None: - server = origin - peers = self.__get_peers_for_play(ip) - if len(peers) > 0: - server = self.__select_peer(peers, ip) - if type == "hls": - hls_url = "http://%s:%s/%s.m3u8"%(server, port, stream) - hls_url = hls_url.replace(".m3u8.m3u8", ".m3u8") - if format == "html": - return SrsUtility().hls_html(hls_url) - else: - #return hls_url - raise cherrypy.HTTPRedirect(hls_url) - elif type == "rtmp": - rtmp_url = "rtmp://%s:%s/%s?vhost=%s/%s"%(server, port, stream.split("/")[0], vhost, stream.split("/")[1]) - if format == "html": - html = "%s?server=%s&port=%s&vhost=%s&app=%s&stream=%s&autostart=true"%( - "http://demo.chnvideo.com:8085/srs/trunk/research/players/srs_player.html", - server, port, vhost, stream.split("/")[0], stream.split("/")[1]) - #return html - raise cherrypy.HTTPRedirect(html) - return rtmp_url - elif type == "gslb": - return json.dumps({"code":Error.success, "data": { - "edge":server, "client":ip, - "peers":self.__json_dump_nodes(peers), - "streams": { - "hls-cztv-html": "http://demo.chnvideo.com:8085/api/v1/nodes?type=hls&format=html&origin=demo.chnvideo.com&port=8080&stream=live/rtmp_cztv01-sd", - "hls-cztv-m3u8": "http://demo.chnvideo.com:8085/api/v1/nodes?type=hls&format=m3u8&origin=demo.chnvideo.com&port=8080&stream=live/rtmp_cztv01-sd", - "rtmp-cztv-html": "http://demo.chnvideo.com:8085/api/v1/nodes?type=rtmp&format=html&origin=demo.chnvideo.com&vhost=__defaultVhost__&port=1935&stream=live/rtmp_cztv01-sd", - "hls-livestream-html": "http://demo.chnvideo.com:8085/api/v1/nodes?type=hls&format=html&origin=demo.chnvideo.com&port=8080&stream=live/livestream", - "hls-livestream-m3u8": "http://demo.chnvideo.com:8085/api/v1/nodes?type=hls&format=m3u8&origin=demo.chnvideo.com&port=8080&stream=live/livestream", - "rtmp-livestream-html": "http://demo.chnvideo.com:8085/api/v1/nodes?type=rtmp&format=html&origin=demo.chnvideo.com&vhost=demo.srs.com&port=1935&stream=live/livestream", - "apk": "http://demo.chnvideo.com/android.srs.apk" - } - }}) - - return json.dumps({"code":Error.success, "data": data}) + self.__refresh_nodes() + data = self.__json_dump_nodes(self.__nodes) + + ip = cherrypy.request.remote.ip + if type is not None: + server = origin + peers = self.__get_peers_for_play(ip) + if len(peers) > 0: + server = self.__select_peer(peers, ip) + if type == "hls": + hls_url = "http://%s:%s/%s.m3u8"%(server, port, stream) + hls_url = hls_url.replace(".m3u8.m3u8", ".m3u8") + if format == "html": + return SrsUtility().hls_html(hls_url) + else: + #return hls_url + raise cherrypy.HTTPRedirect(hls_url) + elif type == "rtmp": + rtmp_url = "rtmp://%s:%s/%s?vhost=%s/%s"%(server, port, stream.split("/")[0], vhost, stream.split("/")[1]) + if format == "html": + html = "%s?server=%s&port=%s&vhost=%s&app=%s&stream=%s&autostart=true"%( + "http://demo.chnvideo.com:8085/srs/trunk/research/players/srs_player.html", + server, port, vhost, stream.split("/")[0], stream.split("/")[1]) + #return html + raise cherrypy.HTTPRedirect(html) + return rtmp_url + elif type == "gslb": + return json.dumps({"code":Error.success, "data": { + "edge":server, "client":ip, + "peers":self.__json_dump_nodes(peers), + "streams": { + "hls-cztv-html": "http://demo.chnvideo.com:8085/api/v1/nodes?type=hls&format=html&origin=demo.chnvideo.com&port=8080&stream=live/rtmp_cztv01-sd", + "hls-cztv-m3u8": "http://demo.chnvideo.com:8085/api/v1/nodes?type=hls&format=m3u8&origin=demo.chnvideo.com&port=8080&stream=live/rtmp_cztv01-sd", + "rtmp-cztv-html": "http://demo.chnvideo.com:8085/api/v1/nodes?type=rtmp&format=html&origin=demo.chnvideo.com&vhost=__defaultVhost__&port=1935&stream=live/rtmp_cztv01-sd", + "hls-livestream-html": "http://demo.chnvideo.com:8085/api/v1/nodes?type=hls&format=html&origin=demo.chnvideo.com&port=8080&stream=live/livestream", + "hls-livestream-m3u8": "http://demo.chnvideo.com:8085/api/v1/nodes?type=hls&format=m3u8&origin=demo.chnvideo.com&port=8080&stream=live/livestream", + "rtmp-livestream-html": "http://demo.chnvideo.com:8085/api/v1/nodes?type=rtmp&format=html&origin=demo.chnvideo.com&vhost=demo.srs.com&port=1935&stream=live/livestream", + "apk": "http://demo.chnvideo.com/android.srs.apk" + } + }}) + + return json.dumps({"code":Error.success, "data": data}) + finally: + self.__lock.release() def PUT(self): enable_crossdomain() - - req = cherrypy.request.body.read() - trace("put to nodes, req=%s"%(req)) + try: - json_req = json.loads(req) - except Exception, ex: - code = Error.system_parse_json - trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) - return json.dumps({"code":code, "data": None}) - - id = str(json_req["id"]) - node = self.__get_node(id) - if node is None: - code = Error.cdn_node_not_exists - trace("cdn node not exists, req=%s, id=%s, code=%s"%(req, id, code)) - return json.dumps({"code":code, "data": None}) - - node.heartbeat = time.time() - node.srs_status = str(json_req["srs_status"]) - node.ip = str(json_req["ip"]) - if "origin" in json_req: - node.origin = str(json_req["origin"]); - node.public_ip = cherrypy.request.remote.ip - # reset if restart. - if node.srs_status != "running": - node.clients = 0 + self.__lock.acquire() + + req = cherrypy.request.body.read() + trace("put to nodes, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return json.dumps({"code":code, "data": None}) - self.__refresh_nodes() - peers = self.__get_peers(node) - peers_data = self.__json_dump_nodes(peers) + id = str(json_req["id"]) + node = self.__get_node(id) + if node is None: + code = Error.cdn_node_not_exists + trace("cdn node not exists, req=%s, id=%s, code=%s"%(req, id, code)) + return json.dumps({"code":code, "data": None}) - res = json.dumps({"code":Error.success, "data": {"id":node.id, "peers":peers_data}}) - trace(res) - return res + node.heartbeat = time.time() + node.srs_status = str(json_req["srs_status"]) + node.ip = str(json_req["ip"]) + if "origin" in json_req: + node.origin = str(json_req["origin"]); + node.public_ip = cherrypy.request.remote.ip + # reset if restart. + if node.srs_status != "running": + node.clients = 0 + + self.__refresh_nodes() + peers = self.__get_peers(node) + peers_data = self.__json_dump_nodes(peers) + + res = json.dumps({"code":Error.success, "data": {"id":node.id, "peers":peers_data}}) + trace(res) + return res + finally: + self.__lock.release() def POST(self): enable_crossdomain() - - req = cherrypy.request.body.read() - trace("post to nodes, req=%s"%(req)) - try: - json_req = json.loads(req) - except Exception, ex: - code = Error.system_parse_json - trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) - return json.dumps({"code":code, "data": None}) - - node = CdnNode() - node.ip = str(json_req["ip"]); - node.os = str(json_req["os"]); - if "origin" in json_req: - node.origin = str(json_req["origin"]); - node.srs_status = str(json_req["srs_status"]) - self.__nodes.append(node) - - self.__refresh_nodes() - peers = self.__get_peers(node) - peers_data = self.__json_dump_nodes(peers) - res = json.dumps({"code":Error.success, "data": {"id":node.id, "peers":peers_data}}) - trace(res) - return res + try: + self.__lock.acquire() + + req = cherrypy.request.body.read() + trace("post to nodes, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return json.dumps({"code":code, "data": None}) + + node = CdnNode() + node.ip = str(json_req["ip"]); + node.os = str(json_req["os"]); + if "origin" in json_req: + node.origin = str(json_req["origin"]); + node.srs_status = str(json_req["srs_status"]) + self.__nodes.append(node) + + self.__refresh_nodes() + peers = self.__get_peers(node) + peers_data = self.__json_dump_nodes(peers) + + res = json.dumps({"code":Error.success, "data": {"id":node.id, "peers":peers_data}}) + trace(res) + return res + finally: + self.__lock.release() def OPTIONS(self, *args, **kwargs): enable_crossdomain() @@ -966,7 +996,8 @@ conf = { 'server.socket_port': port, 'tools.encode.on': True, 'tools.staticdir.on': True, - 'tools.encode.encoding': "utf-8" + 'tools.encode.encoding': "utf-8", + 'server.thread_pool': 2, # single thread server. }, '/': { 'tools.staticdir.dir': static_dir,