0.0.0.0:80 python-varlink-rpc / master varlink_rpc.py
master

Tree @master (Download .tar.gz)

varlink_rpc.py @master · raw · history · blame

import os
import sys
import time
import subprocess
import signal
import selectors
from pathlib import Path
import varlink

# Directory containing the task files; defaults to /etc/varlink-rpc and can be
# overridden through the TARU_TASK_DIR environment variable.
task_dir = Path(os.getenv('TARU_TASK_DIR', '/etc/varlink-rpc'))

# Single varlink service object for this module; the generated interface and
# its handler instance are registered on it further down in this file.
service = varlink.Service()

class RequestHandler(varlink.RequestHandler):
    """Request handler class passed to the varlink server.

    It only sets the ``service`` class attribute to the module-level service.
    NOTE(review): presumably the varlink base class dispatches incoming calls
    through this attribute -- confirm against the python-varlink docs.
    """
    service = service

# Maps task name (the file name) to the parameter list declared by that task.
# A task declares its parameters on its SECOND line, in the form
#     # Parameters: <parameter list>
# and the text after the first colon is used verbatim in the generated
# interface description.
tasks = {}
# Sorted so the generated interface lists methods in a stable order
# (glob order is arbitrary).
for task in sorted(task_dir.glob('*')):
    # Sub-directories and other non-regular entries are not tasks; trying to
    # read them would raise and abort start-up.
    if not task.is_file():
        continue
    # Only the first two lines are needed, so avoid loading the whole file.
    # readline() returns '' at EOF, which makes files with fewer than two
    # lines fall through to the warning instead of raising IndexError.
    # errors='replace' keeps a non-UTF-8 (e.g. binary) file from crashing
    # the scan; such a file simply will not match the marker.
    with task.open(errors='replace') as task_file:
        task_file.readline()
        second_line = task_file.readline()
    if second_line.startswith('# Parameters:'):
        tasks[task.name] = second_line.split(':', 1)[1].strip()
    else:
        print(f"Task {task.name} doesn't have an interface definition!", file=sys.stderr)

# One varlink method declaration per discovered task. Every method shares the
# same reply shape: a `stream` tag (stdout, stderr or returncode) and a `msg`.
task_descriptions = '\n'.join(
    f"method {task_name}({task_params}) -> (stream: (stdout, stderr, returncode), msg: string)"
    for task_name, task_params in tasks.items()
)

# Interface description text assembled from the generated method declarations.
interface = varlink.Interface(f"""
interface pl.mewp.varlink-rpc

{task_descriptions}
""")

class TaruRunner:
    """Handler class for the generated interface.

    It starts out empty: one method per discovered task is attached to it
    with ``setattr`` later in this module.
    """

def _make_task_method(name):
    """Build the streaming handler method that runs the task called *name*.

    A factory is required so that each generated method closes over its own
    *name*. A function defined directly in the loop body looks the loop
    variable up when it is *called*, so every method would end up launching
    whichever task the loop visited last.
    """
    import codecs

    # Incremental decoders keep a multi-byte UTF-8 sequence that is split
    # across two reads from raising or being garbled.
    new_decoder = codecs.getincrementaldecoder('utf-8')

    def run(self, *args, _more):
        """Run the task in a transient systemd user scope and stream its output.

        Positional *args* are stringified and passed to the task as command
        line arguments.

        Yields ``{'stream': 'stdout'|'stderr', 'msg': text, '_continues': True}``
        for each chunk of output, then a final
        ``{'stream': 'returncode', 'msg': <exit code as string>}``.

        If the generator is closed before the task has finished, the scope is
        stopped with ``systemctl --user stop``.
        """
        if not _more:
            # Yielded (not raised), exactly as before.
            # NOTE(review): presumably the python-varlink dispatcher raises
            # exceptions yielded by a handler -- confirm.
            yield varlink.InvalidParameter('more')

        task = subprocess.Popen(
            ['systemd-run', '--user', '--scope', '--unit', name,
             str(task_dir / name), *map(str, args)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            with selectors.DefaultSelector() as selector:
                selector.register(task.stdout, selectors.EVENT_READ,
                                  ('stdout', new_decoder(errors='replace')))
                selector.register(task.stderr, selectors.EVENT_READ,
                                  ('stderr', new_decoder(errors='replace')))
                # Read until BOTH pipes reach EOF rather than until the
                # process exits: output still buffered in the pipes when the
                # process ends (or all output of a very fast task) would
                # otherwise be silently dropped.
                while selector.get_map():
                    for key, _ in selector.select():
                        stream, decoder = key.data
                        chunk = key.fileobj.read1()
                        if chunk:
                            text = decoder.decode(chunk)
                        else:
                            # EOF: stop watching this pipe, otherwise select()
                            # reports it as readable forever (busy loop).
                            selector.unregister(key.fileobj)
                            text = decoder.decode(b'', final=True)
                        if text:
                            yield {'stream': stream, 'msg': text, '_continues': True}

            # Both pipes are closed on the task's side; reap it so that
            # returncode is populated.
            task.wait()
            yield {'stream': 'returncode', 'msg': str(task.returncode)}
        finally:
            try:
                if task.poll() is None:
                    subprocess.run(['systemctl', '--user', 'stop', name + '.scope'], check=True)
            finally:
                # Release the pipe file descriptors even if stopping failed.
                task.stdout.close()
                task.stderr.close()

    return run


for name in tasks:
    setattr(TaruRunner, name, _make_task_method(name))

# Register the generated interface on the service under its own name, and
# register a TaruRunner instance as the handler object for that interface.
service.interfaces[interface.name] = interface
service.interfaces_handlers[interface.name] = TaruRunner()

if __name__ == '__main__':
    # Run as a script: create a threading varlink server on the
    # 'unix:varlink.socket' address and serve requests indefinitely.
    varlink.ThreadingServer('unix:varlink.socket', RequestHandler).serve_forever()