Commit b689357f authored by Alok Saldanha's avatar Alok Saldanha
Browse files

Applied "Formatting and new enumeration for Cellxgene-gateway" patch

parent df812f32
......@@ -13,7 +13,7 @@ from threading import Thread
from flask_api import status
from cellxgene_gateway import env
from cellxgene_gateway.cache_entry import CacheEntry
from cellxgene_gateway.cache_entry import CacheEntry, CacheEntryStatus
from cellxgene_gateway.cellxgene_exception import CellxgeneException
from cellxgene_gateway.subprocess_backend import SubprocessBackend
......@@ -22,8 +22,10 @@ process_backend = SubprocessBackend()
def is_port_in_use(port):
    """Return True when a TCP connection to ``localhost:port`` succeeds.

    Args:
        port: TCP port number to probe on the local host.

    Returns:
        bool: True if something accepted the connection, False otherwise.
    """
    import socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # connect_ex reports failure through its return code instead of
        # raising, so 0 means the connection was accepted.
        return s.connect_ex(("localhost", port)) == 0
class BackendCache:
def __init__(self):
......@@ -38,7 +40,9 @@ class BackendCache:
matches = [
c
for c in contents
if c.key.dataset == key.dataset and c.key.annotation_file == key.annotation_file and c.status != "terminated"
if c.key.dataset == key.dataset
and c.key.annotation_file == key.annotation_file
and c.status != CacheEntryStatus.terminated
]
if len(matches) == 0:
......
......@@ -6,17 +6,25 @@
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the License for
# the specific language governing permissions and limitations under the License.
import psutil
import logging
import datetime
import logging
from flask import make_response, request, render_template
import psutil
from enum import Enum
from flask import make_response, render_template, request
from requests import get, post, put
from cellxgene_gateway import env
from cellxgene_gateway.cellxgene_exception import CellxgeneException
from cellxgene_gateway.util import current_time_stamp
from cellxgene_gateway.flask_util import querystring
from cellxgene_gateway.util import current_time_stamp
class CacheEntryStatus(str, Enum):
    """Lifecycle states of a cache entry.

    The ``str`` mixin keeps every member equal to its legacy string value
    (``CacheEntryStatus.loaded == "loaded"``) and lets ``json.dumps``
    serialize a member as that string, so code that still compares against
    or emits the plain strings keeps working.
    """

    loaded = "loaded"
    loading = "loading"
    error = "error"
    terminated = "terminated"
class CacheEntry:
def __init__(
......@@ -26,7 +34,7 @@ class CacheEntry:
port,
launchtime,
timestamp,
status,
status: CacheEntryStatus,
message,
all_output,
stderr,
......@@ -52,7 +60,7 @@ class CacheEntry:
port,
current_time_stamp(),
current_time_stamp(),
"loading",
CacheEntryStatus.loading,
None,
None,
None,
......@@ -61,13 +69,13 @@ class CacheEntry:
def set_loaded(self, pid):
    """Record the backend process id and mark this entry as loaded.

    Args:
        pid: process id stored on the entry as ``self.pid``.
    """
    self.pid = pid
    # Single assignment: the earlier plain-string "loaded" store was dead,
    # being overwritten immediately by the enum member.
    self.status = CacheEntryStatus.loaded
def set_error(self, message, stderr, http_status):
    """Store failure details on this entry and mark it as errored.

    Args:
        message: error description stored as ``self.message``.
        stderr: captured error output stored as ``self.stderr``.
        http_status: status code stored as ``self.http_status``.
    """
    self.message = message
    self.stderr = stderr
    self.http_status = http_status
    # Single assignment: the earlier plain-string "error" store was dead,
    # being overwritten immediately by the enum member.
    self.status = CacheEntryStatus.error
def append_output(self, output):
if self.all_output == None:
......@@ -77,10 +85,12 @@ class CacheEntry:
def terminate(self):
pid = self.pid
if pid != None and self.status != "terminated":
if pid != None and self.status != CacheEntryStatus.terminated:
terminated = []
def on_terminate(p):
terminated.append(p.pid)
p = psutil.Process(pid)
children = p.children()
for child in children:
......@@ -89,44 +99,46 @@ class CacheEntry:
terminated.append(p.pid)
p.terminate()
psutil.wait_procs([p], callback=on_terminate)
logging.getLogger("cellxgene_gateway").info(f"terminated {terminated}")
self.status = "terminated"
logging.getLogger("cellxgene_gateway").info(
f"terminated {terminated}"
)
self.status = CacheEntryStatus.terminated
def serve_content(self, path):
gateway_basepath = (
f"{env.external_protocol}://{env.external_host}/view/{self.key.pathpart}/"
)
gateway_basepath = f"{env.external_protocol}://{env.external_host}/view/{self.key.pathpart}/"
subpath = path[len(self.key.pathpart) :] # noqa: E203
if len(subpath) == 0:
r = make_response(f"Redirect to {gateway_basepath}\n", 301)
r.headers["location"] = gateway_basepath+querystring()
r.headers["location"] = gateway_basepath + querystring()
return r
elif self.status == "loading":
elif self.status == CacheEntryStatus.loading:
launch_time = datetime.datetime.fromtimestamp(self.launchtime)
return render_template(
"loading.html", launchtime=launch_time, all_output=self.all_output
"loading.html",
launchtime=launch_time,
all_output=self.all_output,
)
port = self.port
cellxgene_basepath = f"http://127.0.0.1:{port}"
headers = {}
copy_headers = [
'accept',
'accept-encoding',
'accept-language',
'cache-control',
'connection',
'content-length',
'content-type',
'cookie',
'host',
'origin',
'pragma',
'referer',
'sec-fetch-mode',
'sec-fetch-site',
'user-agent'
"accept",
"accept-encoding",
"accept-language",
"cache-control",
"connection",
"content-length",
"content-type",
"cookie",
"host",
"origin",
"pragma",
"referer",
"sec-fetch-mode",
"sec-fetch-site",
"user-agent",
]
for h in copy_headers:
if h in request.headers:
......@@ -135,20 +147,14 @@ class CacheEntry:
full_path = cellxgene_basepath + subpath + querystring()
if request.method in ["GET", "HEAD", "OPTIONS"]:
cellxgene_response = get(
full_path, headers=headers
)
cellxgene_response = get(full_path, headers=headers)
elif request.method == "PUT":
cellxgene_response = put(
full_path,
headers=headers,
data=request.data,
full_path, headers=headers, data=request.data,
)
elif request.method == "POST":
cellxgene_response = post(
full_path,
headers=headers,
data=request.data,
full_path, headers=headers, data=request.data,
)
else:
raise CellxgeneException(
......@@ -169,9 +175,7 @@ class CacheEntry:
resp_headers[h] = cellxgene_response.headers[h]
gateway_response = make_response(
gateway_content,
cellxgene_response.status_code,
resp_headers,
gateway_content, cellxgene_response.status_code, resp_headers,
)
return gateway_response
......@@ -17,11 +17,12 @@ from cellxgene_gateway.cellxgene_exception import CellxgeneException
# There are three kinds of CacheKey:
# 1) somedir/dataset.h5ad: a dataset
# in this case, pathpart == dataset == 'somedir/dataset.h5ad'
# 2) somedir/dataset_annotations/saldaal1-T5HMVBNV.csv : an actual annotations file.
# in this case, pathpart == 'dataset_annotations/saldaal1-T5HMVBNV.csv', dataset == 'somedir/dataset.h5ad'
# 2) somedir/dataset_annotations/my_annotations.csv : an actual annotations file.
# in this case, pathpart == 'dataset_annotations/my_annotations.csv', dataset == 'somedir/dataset.h5ad'
# 3) somedir/dataset_annotations: an annotation directory. The corresponding h5ad must exist, but the directory may not.
# in this case, pathpart == 'dataset_annotations', dataset == 'somedir/dataset.h5ad'
class CacheKey:
def __init__(self, pathpart, dataset, annotation_file):
self.pathpart = pathpart
......
......@@ -51,8 +51,13 @@ def create_dir(parent_path, dir_name):
else:
os.mkdir(full_path)
# Suffix that turns a dataset stem into the name of its annotations directory.
annotations_suffix = "_annotations"


def make_h5ad(el):
    """Map an annotations directory name to its dataset file name.

    Drops the last ``len(annotations_suffix)`` characters of ``el`` and
    appends ``".h5ad"``. The suffix is not checked, so callers must pass a
    name that ends with ``annotations_suffix``.
    """
    return el[: -len(annotations_suffix)] + ".h5ad"


def make_annotations(el):
    """Map a dataset file name to its annotations directory name.

    Drops the last ``len(".h5ad")`` characters of ``el`` and appends
    ``annotations_suffix``. The extension is not checked, so callers must
    pass a name that ends with ``".h5ad"``.
    """
    return el[: -len(".h5ad")] + annotations_suffix
......@@ -7,21 +7,32 @@
# OR CONDITIONS OF ANY KIND, either express or implied. See the License for
# the specific language governing permissions and limitations under the License.
import os
import logging
import os
import socket
def _env_flag(name):
    """Return True when environment variable ``name`` is "true" or "1".

    The comparison is case-insensitive; an unset variable counts as False.
    """
    return os.environ.get(name, "").lower() in ["true", "1"]


# Settings read from the environment; None when unset and no default is given.
cellxgene_location = os.environ.get("CELLXGENE_LOCATION")
cellxgene_data = os.environ.get("CELLXGENE_DATA")
gateway_port = int(os.environ.get("GATEWAY_PORT", "5005"))
# EXTERNAL_* wins over the GATEWAY_* name, which wins over the built-in default.
external_host = os.environ.get(
    "EXTERNAL_HOST",
    os.environ.get("GATEWAY_HOST", f"localhost:{gateway_port}"),
)
external_protocol = os.environ.get(
    "EXTERNAL_PROTOCOL", os.environ.get("GATEWAY_PROTOCOL", "http")
)
ip = os.environ.get("GATEWAY_IP")
extra_scripts = os.environ.get("GATEWAY_EXTRA_SCRIPTS")
ttl = os.environ.get("GATEWAY_TTL")  # kept as the raw string (or None)
enable_upload = _env_flag("GATEWAY_ENABLE_UPLOAD")
enable_annotations = _env_flag("GATEWAY_ENABLE_ANNOTATIONS")
enable_backed_mode = _env_flag("GATEWAY_ENABLE_BACKED_MODE")
env_vars = {
"CELLXGENE_LOCATION": cellxgene_location,
......@@ -40,6 +51,7 @@ optional_env_vars = {
"GATEWAY_ENABLE_BACKED_MODE": enable_backed_mode,
}
def validate():
if not all(env_vars.values()):
raise ValueError(
......@@ -58,5 +70,9 @@ def validate():
"""
)
else:
logging.getLogger("cellxgene_gateway").info(f"Got required env: {env_vars}", )
logging.getLogger("cellxgene_gateway").info(f"Got optional env: {optional_env_vars}")
logging.getLogger("cellxgene_gateway").info(
f"Got required env: {env_vars}",
)
logging.getLogger("cellxgene_gateway").info(
f"Got optional env: {optional_env_vars}"
)
......@@ -7,9 +7,10 @@
# OR CONDITIONS OF ANY KIND, either express or implied. See the License for
# the specific language governing permissions and limitations under the License.
from cellxgene_gateway import env
from json import loads
from cellxgene_gateway import env
def get_extra_scripts():
# can be array of script tags to inject on every page, e.g. for google analytics could be
......
......@@ -19,20 +19,40 @@ def recurse_dir(path):
all_entries = sorted(os.listdir(path))
def is_h5ad(el):
return el.endswith('.h5ad') and os.path.isfile(os.path.join(path, el))
return el.endswith(".h5ad") and os.path.isfile(os.path.join(path, el))
h5ad_entries = [x for x in all_entries if is_h5ad(x)]
annotation_dir_entries = [x for x in all_entries if x.endswith(annotations_suffix) and make_h5ad(x) in h5ad_entries]
annotation_dir_entries = [
x
for x in all_entries
if x.endswith(annotations_suffix) and make_h5ad(x) in h5ad_entries
]
def list_annotations(el):
full_path = os.path.join(path, el)
if not os.path.isdir(full_path):
entries = []
else:
entries = [{
"name": x[:-13] if (len(x) > 13 and x[-13] in ['-','_']) else (
x[:-4] if x.endswith('.csv') else x),
"path": os.path.join(full_path, x).replace(env.cellxgene_data, ""),
} for x in sorted(os.listdir(full_path)) if x.endswith('.csv') and os.path.isfile(os.path.join(full_path, x))]
return [{"name":'new', "class":'new', "path":full_path.replace(env.cellxgene_data, "")}] + entries
entries = [
{
"name": x[:-13]
if (len(x) > 13 and x[-13] in ["-", "_"])
else (x[:-4] if x.endswith(".csv") else x),
"path": os.path.join(full_path, x).replace(
env.cellxgene_data, ""
),
}
for x in sorted(os.listdir(full_path))
if x.endswith(".csv")
and os.path.isfile(os.path.join(full_path, x))
]
return [
{
"name": "new",
"class": "new",
"path": full_path.replace(env.cellxgene_data, ""),
}
] + entries
def make_entry(el):
full_path = os.path.join(path, el)
......@@ -63,16 +83,26 @@ def recurse_dir(path):
def render_entries(entries):
    """Render ``entries`` as one ``<ul>`` element.

    Each entry is rendered by ``render_entry``; the fragments are joined
    with newlines and wrapped in ``<ul>``/``</ul>``.
    """
    items = "\n".join(render_entry(entry) for entry in entries)
    return f"<ul>{items}</ul>"
def get_url(entry):
    """Build the ``/view/...`` URL for an entry.

    Args:
        entry: mapping with a ``"path"`` key; leading slashes are stripped.

    Returns:
        str: ``/view/`` followed by the percent-encoded path. ``/`` is kept
        as the separator; characters such as spaces, quotes, ``#``, ``?``
        and ``%`` are encoded so they cannot truncate or corrupt the link.
    """
    from urllib.parse import quote

    return f"/view/{quote(entry['path'].lstrip('/'))}"
def get_class(entry):
    """Return an HTML ``class`` attribute for ``entry``, or an empty string.

    Args:
        entry: mapping that may contain a ``"class"`` key.

    Returns:
        str: ``" class='<value>'"`` with a leading space when the key is
        present, otherwise ``""``.
    """
    return f" class='{entry['class']}'" if "class" in entry else ""
def render_annotations(entry):
    """Render the annotation links of an entry as an HTML fragment.

    Args:
        entry: mapping whose ``"annotations"`` value is a list of mappings
            with ``"name"`` and ``"path"`` keys and an optional ``"class"``.

    Returns:
        str: ``" | annotations: "`` followed by comma-separated ``<a>``
        links, or ``""`` when there are no annotations.
    """
    from html import escape

    annotations = entry["annotations"]
    if not annotations:
        return ""
    # The name and href are escaped before being placed in markup so that
    # characters such as quotes, "<" or "&" cannot break out of the
    # attribute or the link text.
    links = [
        f"<a href='{escape(get_url(a))}'{get_class(a)}>{escape(a['name'])}</a>"
        for a in annotations
    ]
    return " | annotations: " + ", ".join(links)
def render_entry(entry):
if entry["type"] == "file":
......
......@@ -7,16 +7,17 @@
# OR CONDITIONS OF ANY KIND, either express or implied. See the License for
# the specific language governing permissions and limitations under the License.
import json
import logging
# import BaseHTTPServer
import os
import logging
from threading import Thread, Lock
import json
from threading import Lock, Thread
from flask import (
Flask,
redirect,
make_response,
redirect,
render_template,
request,
send_from_directory,
......@@ -27,22 +28,27 @@ from werkzeug.utils import secure_filename
from cellxgene_gateway import env
from cellxgene_gateway.backend_cache import BackendCache
from cellxgene_gateway.cache_entry import CacheEntryStatus
from cellxgene_gateway.cellxgene_exception import CellxgeneException
from cellxgene_gateway.dir_util import create_dir, is_subdir
from cellxgene_gateway.filecrawl import recurse_dir, render_entries
from cellxgene_gateway.extra_scripts import get_extra_scripts
from cellxgene_gateway.filecrawl import recurse_dir, render_entries
from cellxgene_gateway.path_util import get_key
from cellxgene_gateway.process_exception import ProcessException
from cellxgene_gateway.prune_process_cache import PruneProcessCache
from cellxgene_gateway.util import current_time_stamp
from cellxgene_gateway.path_util import get_key
app = Flask(__name__)
def _force_https(app):
    """Wrap a WSGI app so each request reports the configured URL scheme.

    Despite the name, the scheme is whatever ``env.external_protocol``
    holds, not necessarily https.

    Args:
        app: the WSGI callable to wrap.

    Returns:
        A WSGI callable that sets ``environ["wsgi.url_scheme"]`` and then
        delegates to ``app``.
    """

    def wrapper(environ, start_response):
        environ["wsgi.url_scheme"] = env.external_protocol
        return app(environ, start_response)

    return wrapper
app.wsgi_app = _force_https(app.wsgi_app)
cache = BackendCache()
......@@ -114,6 +120,7 @@ def index():
enable_upload=env.enable_upload,
)
def make_user():
dir_name = request.form["directory"]
......@@ -135,13 +142,17 @@ def upload_file():
upload_dir = request.form["path"]
full_upload_path = os.path.join(env.cellxgene_data, upload_dir)
if is_subdir(full_upload_path, env.cellxgene_data) and os.path.isdir(full_upload_path):
if is_subdir(full_upload_path, env.cellxgene_data) and os.path.isdir(
full_upload_path
):
if request.method == "POST":
if "file" in request.files:
f = request.files["file"]
if f and f.filename.endswith(".h5ad"):
f.save(
os.path.join(full_upload_path, secure_filename(f.filename))
os.path.join(
full_upload_path, secure_filename(f.filename)
)
)
return redirect("/filecrawl.html", code=302)
else:
......@@ -163,31 +174,40 @@ def upload_file():
if env.enable_upload:
    # The upload endpoints are registered only when uploads are enabled.
    # Each one is POST-only and its URL matches its endpoint name.
    for endpoint, view in (
        ("make_user", make_user),
        ("make_subdir", make_subdir),
        ("upload_file", upload_file),
    ):
        app.add_url_rule(f"/{endpoint}", endpoint, view, methods=["POST"])
@app.route("/filecrawl.html")
def filecrawl():
    """Serve the file listing page for the whole data directory.

    Crawls ``env.cellxgene_data``, renders the entries to HTML, and returns
    the ``filecrawl.html`` template with caching headers set.
    """
    entries = recurse_dir(env.cellxgene_data)
    rendered_html = render_entries(entries)
    resp = make_response(
        render_template(
            "filecrawl.html",
            extra_scripts=get_extra_scripts(),
            rendered_html=rendered_html,
        )
    )
    # Cache-Control used to be set to "no-cache, no-store, must-revalidate"
    # and then overwritten a few lines later; only this final value was ever
    # sent, so it is now assigned once.
    # NOTE(review): confirm "public, max-age=0" is the intended policy
    # alongside the no-cache Pragma/Expires headers below.
    resp.headers["Cache-Control"] = "public, max-age=0"
    resp.headers["Pragma"] = "no-cache"
    resp.headers["Expires"] = "0"
    return resp
@app.route("/filecrawl/<path:path>")
def do_filecrawl(path):
filecrawl_path = os.path.join(env.cellxgene_data, path)
if not os.path.isdir(filecrawl_path):
raise CellxgeneException(
"Path is not directory: " + filecrawl_path, status.HTTP_400_BAD_REQUEST
"Path is not directory: " + filecrawl_path,
status.HTTP_400_BAD_REQUEST,
)
entries = recurse_dir(filecrawl_path)
rendered_html = render_entries(entries)
......@@ -198,11 +218,16 @@ def do_filecrawl(path):
path=path,
)
entry_lock = Lock()
@app.route("/view/<path:path>", methods=["GET", "PUT", "POST"])
def do_view(path):
key = get_key(path)
print(f"view path={path}, dataset={key.dataset}, annotation_file= {key.annotation_file}, key={key.pathpart}")
print(
f"view path={path}, dataset={key.dataset}, annotation_file= {key.annotation_file}, key={key.pathpart}"
)
with entry_lock:
match = cache.check_entry(key)
if match is None:
......@@ -211,9 +236,9 @@ def do_view(path):
match.timestamp = current_time_stamp()
if match.status == "loaded" or match.status == "loading":
if match.status == CacheEntryStatus.loaded or match.status == CacheEntryStatus.loading:
return match.serve_content(path)
elif match.status == "error":
elif match.status == CacheEntryStatus.error:
raise ProcessException.from_cache_entry(match)
......@@ -221,16 +246,25 @@ def do_view(path):
def do_GET_status():
return render_template("cache_status.html", entry_list=cache.entry_list)
@app.route("/cache_status.json", methods=["GET"])
def do_GET_status_json():
    """Return the cache status as a JSON string.

    The payload holds the app ``launchtime`` and an ``entry_list`` with one
    object per cache entry: dataset, annotation file, launch time, last
    access time and status.
    """
    return json.dumps(
        {
            "launchtime": app.launchtime,
            "entry_list": [
                {
                    "dataset": entry.key.dataset,
                    "annotation_file": entry.key.annotation_file,
                    "launchtime": entry.launchtime,
                    "last_access": entry.timestamp,
                    # status is a CacheEntryStatus member; json.dumps cannot
                    # serialize a plain Enum, so emit its string value.
                    "status": entry.status.value,
                }
                for entry in cache.entry_list
            ],
        }
    )
@app.route("/relaunch/<path:path>", methods=["GET"])
def do_relaunch(path):
......@@ -239,7 +273,11 @@ def do_relaunch(path):
if not match is None:
match.terminate()
qs = request.query_string.decode()
return redirect(url_for("do_view", path=path) + (f'?{qs}' if len(qs) > 0 else ''), code=302)
return redirect(
url_for("do_view", path=path) + (f"?{qs}" if len(qs) > 0 else ""),
code=302,
)
@app.route("/terminate/<path:path>", methods=["GET"])
def do_terminate(path):
......@@ -251,7 +289,10 @@ def do_terminate(path):