"""Submit a job to a remote server and handle logging.
This module can be used either with Celery, in which case it will run in a
background thread, or as a normal function call, in which case it will block
the current execution thread.
"""
from __future__ import absolute_import, print_function, unicode_literals
import collections
import fnmatch
import io
import os
import os.path
import select
import socket
import sys
import time
from threading import Thread
from django.utils import timezone
import six
from django.core.files import File
from paramiko import AuthenticationException, BadHostKeyException
from paramiko.client import AutoAddPolicy, SSHClient
from celery.utils.log import get_task_logger
from .models import Interpreter, Job, Log, Result
from .wrapper.local import LocalWrapper
from .wrapper.remote import RemoteWrapper
logger = get_task_logger(__name__)
try:
    from celery import shared_task
except ImportError:
    logger.warning('Could not import Celery. '
                   'Tasks will not be implemented by Celery\'s queue.')

    def shared_task(func):
        """Minimal stand-in for Celery's ``shared_task`` decorator.

        Attaches a ``delay`` attribute that just runs the function
        synchronously, so callers can use ``task.delay(...)`` whether or
        not Celery is installed.
        """
        func.delay = lambda *args, **kwargs: func(*args, **kwargs)
        return func
class LogPolicy(object):
    """Enumerate the strategies for capturing output from a running job."""

    #: Discard all output produced by the running job.
    LOG_NONE = 0

    #: Persist a Log object immediately as each chunk of output arrives.
    LOG_LIVE = 1

    #: Buffer stdout/stderr and persist them once, when the job finishes.
    LOG_TOTAL = 2
def is_matching(filename, patterns=None):
    """Check if a filename matches the list of positive and negative patterns.

    Positive patterns are strings like ``"1.txt"``, ``"[23].txt"``, or
    ``"*.txt"``.

    Negative patterns are strings like ``"!1.txt"``, ``"![23].txt"``, or
    ``"!*.txt"``.

    Each pattern is checked in turn, so the list of patterns ``["!*.txt",
    "1.txt"]`` will still match ``"1.txt"``.

    :param str filename: the filename to test
    :param list(str) patterns: the shell-style patterns to test against;
        defaults to ``['*']`` (match everything) when omitted
    :return: ``True`` if the filename matches after all patterns are applied

    >>> from django_remote_submission.tasks import is_matching
    >>> is_matching("1.txt", patterns=["1.txt"])
    True
    >>> is_matching("1.txt", patterns=["[23].txt"])
    False
    >>> is_matching("1.txt", patterns=["*.txt"])
    True
    >>> is_matching("1.txt", patterns=["1.txt", "!*.txt"])
    False
    >>> is_matching("1.txt", patterns=["!*.txt", "[12].txt"])
    True
    """
    if patterns is None:
        patterns = ['*']

    # Don't shadow the function's own name with the accumulator (the
    # original used a local also called ``is_matching``).
    matched = False
    for pattern in patterns:
        if pattern.startswith('!'):
            # Negative pattern: revoke a match made by an earlier pattern.
            if fnmatch.fnmatch(filename, pattern[1:]):
                matched = False
        elif fnmatch.fnmatch(filename, pattern):
            matched = True
    return matched
class LogContainer(object):
    """Buffer output from a running job and persist it as :class:`Log` rows.

    The :class:`LogPolicy` given at construction decides when (or whether)
    buffered lines are written to the database:

    - ``LOG_NONE``: output is discarded on arrival.
    - ``LOG_LIVE``: every line is persisted as soon as it is written.
    - ``LOG_TOTAL``: lines accumulate until :meth:`flush` is called.
    """

    #: One captured line of output together with its timestamp.
    LogLine = collections.namedtuple('LogLine', ['now', 'output'])

    def __init__(self, job, log_policy):
        """Instantiate a log container.

        :param models.Job job: the job these logs are coming from
        :param LogPolicy log_policy: the policy to use for logging
        """
        # The job that these logs are coming from.
        self.job = job
        # The policy to use when logging.
        self.log_policy = log_policy
        # Buffered lines from the job's stdout stream.
        self._stdout = []
        # Buffered lines from the job's stderr stream.
        self._stderr = []

    def _write(self, lst, now, output):
        """Record one line of output and flush immediately if live-logging.

        :param lst: either :attr:`_stdout` or :attr:`_stderr`
        :param datetime.datetime now: the time this line was produced
        :param str output: the line of output from the job
        """
        if self.log_policy == LogPolicy.LOG_NONE:
            return
        lst.append(LogContainer.LogLine(now=now, output=output))
        if self.log_policy == LogPolicy.LOG_LIVE:
            self.flush()

    def write_stdout(self, now, output):
        """Write some output from a job's stdout stream.

        :param datetime.datetime now: the time this output was produced
        :param str output: the output that was produced
        """
        self._write(self._stdout, now, output)

    def write_stderr(self, now, output):
        """Write some output from a job's stderr stream.

        :param datetime.datetime now: the time this output was produced
        :param str output: the output that was produced
        """
        self._write(self._stderr, now, output)

    def flush(self):
        """Flush any buffered stdout/stderr lines to Django models.

        With :const:`LogPolicy.LOG_TOTAL` this must be called at the end of
        the job to write the accumulated output. Calling it repeatedly is
        harmless, so it can always be called when the job finishes.
        """
        self._flush_stream(self._stdout, 'stdout')
        self._flush_stream(self._stderr, 'stderr')

    def _flush_stream(self, lines, stream):
        """Persist one buffered stream as a single Log row, then clear it.

        :param list lines: the buffered :class:`LogLine` entries
        :param str stream: the stream label (``'stdout'`` or ``'stderr'``)
        """
        if not lines:
            return
        Log.objects.create(
            time=lines[-1].now,
            content=''.join(line.output for line in lines),
            stream=stream,
            job=self.job,
        )
        # Clear in place so callers holding a reference see the empty list.
        del lines[:]
@shared_task
def submit_job_to_server(job_pk, password=None, public_key_filename=None, username=None,
                         timeout=None, log_policy=LogPolicy.LOG_LIVE,
                         store_results=None, remote=True):
    """Submit a job to the remote server and collect its result files.

    This can be used as a Celery task, if the library is installed and running.

    :param int job_pk: the primary key of the :class:`models.Job` to submit
    :param str password: the password of the user submitting the job
    :param public_key_filename: the path where it is.
    :param str username: the username of the user submitting, if it is
        different from the owner of the job
    :param datetime.timedelta timeout: the timeout for running the job
    :param LogPolicy log_policy: the policy to use for logging
    :param list(str) store_results: the patterns to use for the results to store
    :param bool remote: Either runs this task locally on the host or in a remote server.
    :return: a dict mapping each stored result's remote filename to the
        primary key of its :class:`models.Result`
    """
    logger.debug("submit_job_to_server: %s", locals().keys())
    wrapper_cls = RemoteWrapper if remote else LocalWrapper
    job = Job.objects.get(pk=job_pk)
    if username is None:
        username = job.owner.username

    wrapper = wrapper_cls(
        hostname=job.server.hostname,
        username=username,
        port=job.server.port,
    )

    logs = LogContainer(
        job=job,
        log_policy=log_policy,
    )

    with wrapper.connect(password, public_key_filename):
        wrapper.chdir(job.remote_directory)
        with wrapper.open(job.remote_filename, 'wt') as f:
            f.write(job.program)
        # NOTE(review): presumably this pause keeps the script's mtime
        # strictly older than any result files, which the mtime comparison
        # below relies on -- confirm before removing.
        time.sleep(1)

        job.status = Job.STATUS.submitted
        job.save()

        job_status = wrapper.exec_command(
            [job.interpreter.path] + job.interpreter.arguments + [job.remote_filename],
            job.remote_directory,
            timeout=timeout,
            stdout_handler=logs.write_stdout,
            stderr_handler=logs.write_stderr,
        )
        # Persist any output still buffered (needed for LOG_TOTAL policy).
        logs.flush()

        job.status = Job.STATUS.success if job_status else Job.STATUS.failure
        job.save()

        file_attrs = wrapper.listdir_attr()
        file_map = {attr.filename: attr for attr in file_attrs}
        script_attr = file_map[job.remote_filename]
        script_mtime = script_attr.st_mtime

        results = []
        for attr in file_attrs:
            # The uploaded script itself is never a result.
            if attr is script_attr:
                continue
            # Files older than the script predate this run; skip them.
            if attr.st_mtime < script_mtime:
                continue
            if not is_matching(attr.filename, store_results):
                continue

            result = Result.objects.create(
                remote_filename=attr.filename,
                job=job,
            )
            with wrapper.open(attr.filename, 'rb') as f:
                result.local_file.save(attr.filename, File(f), save=True)
            results.append(result)

    return {r.remote_filename: r.pk for r in results}
@shared_task
def copy_job_to_server(job_pk, password=None, public_key_filename=None, username=None,
                       timeout=None, log_policy=LogPolicy.LOG_LIVE,
                       store_results=None, remote=True):
    """Copy a job's program file to the remote server without running it.

    This can be used as a Celery task, if the library is installed and running.

    :param int job_pk: the primary key of the :class:`models.Job` to submit
    :param str password: the password of the user submitting the job
    :param public_key_filename: the path where it is.
    :param str username: the username of the user submitting, if it is
        different from the owner of the job
    :param datetime.timedelta timeout: the timeout for running the job
    :param LogPolicy log_policy: the policy to use for logging
    :param list(str) store_results: the patterns to use for the results to store
    :param bool remote: Either runs this task locally on the host or in a remote server.
    :return: an empty dict
    """
    logger.debug("copy_job_to_server: %s", locals().keys())
    job = Job.objects.get(pk=job_pk)
    wrapper_cls = RemoteWrapper if remote else LocalWrapper
    wrapper = wrapper_cls(
        hostname=job.server.hostname,
        username=job.owner.username if username is None else username,
        port=job.server.port,
    )

    with wrapper.connect(password, public_key_filename):
        wrapper.chdir(job.remote_directory)
        with wrapper.open(job.remote_filename, 'wt') as remote_file:
            remote_file.write(job.program)
        time.sleep(1)
        job.status = Job.STATUS.submitted
        job.save()

    Log(
        time=timezone.now(),
        content='File {} successfully copied to {}.'.format(
            job.remote_filename, job.remote_directory,
        ),
        stream='stdout',
        job=job,
    ).save()

    job.status = Job.STATUS.success
    job.save()
    return {}
@shared_task
def copy_key_to_server(username, password, hostname, port=22,
                       public_key_filename=None,
                       remote=True):
    """Copy the client key to the remote server so the next connections
    do not need the password any further.

    This can be used as a Celery task, if the library is installed and running.

    :param str username: the username of the user submitting
    :param str password: the password of the user submitting the job
    :param str hostname: The hostname used to connect to the server
    :param int port: The port to connect to for SSH (usually 22)
    :param public_key_filename: the path where it is.
    :param bool remote: Either runs this task locally on the host or in a remote server.
    :return: None
    """
    wrapper_cls = RemoteWrapper if remote else LocalWrapper
    wrapper = wrapper_cls(
        hostname=hostname,
        username=username,
        port=port,
    )
    with wrapper.connect(password, public_key_filename):
        # Only deploys when the key is absent, so repeat calls are harmless.
        wrapper.deploy_key_if_it_does_not_exist()
    return None
@shared_task
def delete_key_from_server(username, password, hostname, port=22,
                           public_key_filename=None,
                           remote=True):
    """Delete the client key from the remote server so the next connections
    will need password. This can be used at the logout of the session.

    This can be used as a Celery task, if the library is installed and running.

    :param str username: the username of the user submitting
    :param str password: the password of the user submitting the job
    :param str hostname: The hostname used to connect to the server
    :param int port: The port to connect to for SSH (usually 22)
    :param public_key_filename: the path where it is.
    :param bool remote: Either runs this task locally on the host or in a remote server.
    :return: None
    """
    klass = LocalWrapper if not remote else RemoteWrapper
    connection = klass(hostname=hostname, username=username, port=port)
    with connection.connect(password, public_key_filename):
        connection.delete_key()
    return None