import re
import simplejson as json
import subprocess
import shutil
import logging
from pathlib import Path
# Logger shared by everything in this module. Calling ``getLogger()`` without
# a name returns the *root* logger, so records emitted here are handled by
# whatever handlers/levels the application configured on the root logger.
_LOGGER = logging.getLogger()
class RScript:
    """
    A generic interface to executing an R script.

    It uses the ``Rscript`` executable found in the PATH. It expects to be
    given a root directory (likely a temporary one) where it'll look for its
    input files and produce its outputs.
    """

    def __init__(self, root_path):
        """
        :param root_path: Directory used as the working directory of every
            ``Rscript`` call and as the base for all file names given to the
            read/write helpers.
        :raises ValueError: If no ``Rscript`` executable is found in the PATH.
        """
        # Base directory for input/output files and the child process' cwd.
        self.root_path = Path(root_path)

        # Full path of the ``Rscript`` executable as resolved by the PATH.
        self.rscript_exe = shutil.which('Rscript')

        if self.rscript_exe is None:
            raise ValueError("Could not find `Rscript` executable in PATH")

    def run(self, script_path, *args):
        """
        Execute a script with ``Rscript``, using ``root_path`` as the cwd.

        :param script_path: Path to the script. It is made absolute before
            the call because the child process runs in ``root_path``, not in
            the caller's working directory.
        :param args: Extra command-line arguments passed to the script.
        :return: The decoded standard output of the process.
        :raises ValueError: If the process exits with a non-zero status; its
            stdout and stderr are logged as errors first.
        """
        script_path = Path(script_path).absolute()

        result = subprocess.run(
            [self.rscript_exe, str(script_path), *args],
            cwd=self.root_path,
            capture_output=True,
        )

        if result.returncode != 0:
            self._log_failure(result)
            raise ValueError(f"Failed RScript call: {script_path}")

        # Anything written to stderr by a successful run is surfaced as a
        # warning. Blank lines are skipped so that an empty stderr does not
        # produce an empty log record.
        for line in self._decode(result.stderr).splitlines():
            if line.strip():
                _LOGGER.warning(line)

        return self._decode(result.stdout)

    def write_csv(self, filename, df):
        """
        Write ``df`` as CSV (without the index) to ``root_path / filename``.

        :param filename: File name relative to ``root_path``.
        :param df: Object exposing ``to_csv(path, index=False)``
            (presumably a pandas DataFrame -- confirm against callers).
        """
        df.to_csv(self.root_path / filename, index=False)

    def write_json(self, filename, data):
        """
        Serialize ``data`` as JSON into ``root_path / filename``.

        :param filename: File name relative to ``root_path``.
        :param data: Any JSON-serializable object.
        """
        # Explicit UTF-8 so the file does not depend on the locale's default
        # encoding.
        with open(self.root_path / filename, 'w', encoding='utf-8') as f:
            json.dump(data, f)

    def read_key_value_json(self, filename, key_name, value_name):
        """
        Read a JSON list of records and turn it into a single dictionary.

        :param filename: File name relative to ``root_path``.
        :param key_name: Field of each record used as the dictionary key.
        :param value_name: Field of each record used as the dictionary value.
        :return: ``{record[key_name]: record[value_name]}`` over all records,
            or ``None`` if the file does not exist.
        """
        raw_data = self._read_raw_json(filename)
        if raw_data is None:
            return None

        return {entry[key_name]: entry[value_name] for entry in raw_data}

    def read_flat_json(self, filename, discard_keys=()):
        """
        Read the first record of a JSON list, optionally dropping some keys.

        :param filename: File name relative to ``root_path``.
        :param discard_keys: Container of keys to leave out of the result.
        :return: The first record as a dictionary without the discarded keys,
            or ``None`` if the file does not exist or holds no records.
        """
        raw_data = self._read_raw_json(filename)

        # Covers both "file missing" (None) and "no records" (empty).
        if not raw_data:
            return None

        return {k: v for k, v in raw_data[0].items() if k not in discard_keys}

    def get_r_version(self):
        """
        Ask ``Rscript --version`` for the version of R.

        :return: The output with everything up to and including the first
            ``"version "`` removed, or ``None`` if the call fails (the
            process output is logged as errors in that case).
        """
        result = subprocess.run(
            [self.rscript_exe, '--version'],
            cwd=self.root_path,
            capture_output=True,
        )

        if result.returncode != 0:
            self._log_failure(result)
            return None

        output = self._decode(result.stdout).strip()
        if not output:
            # NOTE(review): some R releases reportedly print the version
            # banner on stderr instead of stdout -- fall back to it rather
            # than returning an empty string.
            output = self._decode(result.stderr).strip()

        return re.sub(r'^.*version ', '', output)

    def get_growthrates_version(self):
        """
        Ask R for the installed version of the ``growthrates`` package.

        :return: The last whitespace-separated token printed by R with the
            double quotes removed, or ``None`` if the call fails or prints
            nothing (the process output is logged as errors in that case).
        """
        result = subprocess.run(
            [self.rscript_exe, '-e', 'library(growthrates); getNamespaceVersion("growthrates")'],
            cwd=self.root_path,
            capture_output=True,
        )

        if result.returncode != 0:
            self._log_failure(result)
            return None

        tokens = self._decode(result.stdout).split()
        if not tokens:
            # Nothing to parse: treat it like a failed call instead of
            # raising IndexError on ``tokens[-1]``.
            self._log_failure(result)
            return None

        return tokens[-1].replace('"', '').strip()

    def _read_raw_json(self, filename):
        """
        Load and log the JSON document at ``root_path / filename``.

        :return: The parsed document, or ``None`` if the file does not exist.
        """
        path = self.root_path / filename
        if not path.exists():
            return None

        text = path.read_text(encoding='utf-8')
        _LOGGER.info("%s: %s", filename, text)

        # ``use_decimal`` is a simplejson option (this module imports
        # simplejson as ``json``): floats are parsed as ``decimal.Decimal``.
        return json.loads(text, use_decimal=True)

    def _log_failure(self, result):
        """Log the captured stdout and stderr of ``result`` as errors."""
        _LOGGER.error("STDOUT:")
        for line in self._decode(result.stdout).splitlines():
            _LOGGER.error(line)

        _LOGGER.error("STDERR:")
        for line in self._decode(result.stderr).splitlines():
            _LOGGER.error(line)

    @staticmethod
    def _decode(raw):
        """
        Decode process output as UTF-8, replacing undecodable bytes.

        Replacement is used so that unexpected bytes in the child's output
        cannot raise ``UnicodeDecodeError`` and hide the actual result or
        failure being reported.
        """
        return raw.decode('utf-8', errors='replace')