
k3wsjobd


Gevent-based WebSocket server for async job processing. Receives job descriptions from clients, runs them asynchronously, and reports progress back periodically.

k3wsjobd is a component of the pykit3 project, a set of Python 3 toolkits.

Installation

pip install k3wsjobd

Quick Start

import k3wsjobd

# Start WebSocket job server
k3wsjobd.run(ip='127.0.0.1', port=33445)

API Reference

k3wsjobd

This module is a gevent-based WebSocket server. When the server receives a job description from a client, it runs the job asynchronously in a thread and periodically reports the job's progress back to the client.
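The run-in-a-thread, report-periodically pattern can be sketched with the standard library alone. This is not the module's code. It replaces gevent and the WebSocket connection with a plain thread, a polling loop, and a `send` callable, and the names `SketchJob`, `set_progress`, `run_and_report`, and `count_to_three` are hypothetical.

```python
import threading
import time


class SketchJob:
    """Hypothetical stand-in for a job: a worker thread updates shared
    progress while the caller reports it at a fixed interval."""

    def __init__(self, worker, interval=0.05):
        self.worker = worker          # callable that receives this job object
        self.interval = interval      # seconds between progress reports
        self.progress = None          # latest progress written by the worker
        self.done = threading.Event()
        self.lock = threading.Lock()

    def set_progress(self, value):
        with self.lock:
            self.progress = value

    def _work(self):
        try:
            self.worker(self)
        finally:
            self.done.set()           # tell the reporting loop to stop

    def run_and_report(self, send):
        thread = threading.Thread(target=self._work, daemon=True)
        thread.start()
        # Send the current progress every `interval` seconds until the
        # worker finishes, then send one final report.
        while not self.done.wait(self.interval):
            with self.lock:
                send(self.progress)
        thread.join()
        with self.lock:
            send(self.progress)


def count_to_three(job):
    # Hypothetical job function: publishes a progress dict after each step.
    for i in range(1, 4):
        time.sleep(0.03)
        job.set_progress({"step": i, "total": 3})


reports = []
SketchJob(count_to_three, interval=0.02).run_and_report(reports.append)
print(reports[-1])
```

In the real server, `send` would correspond to writing a message to the client's WebSocket connection. Early reports may be `None` if the worker has not published anything yet; the final report always reflects the finished state.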

Job

Bases: object

Source code in k3wsjobd/wsjobd.py
import logging
import threading

import k3thread

# Module-level logger used by Job (assumed setup; wsjobd.py defines its own logger earlier in the file).
logger = logging.getLogger(__name__)


class Job(object):
    lock = threading.RLock()
    sessions = {}

    def __init__(self, channel, msg, func):
        """
        Create a job from a job description and start it in a daemon thread.

        :param channel: the channel that submitted the job.
        :param msg: the job description, a JSON string sent by the client and
            decoded into a dict. It can contain the following fields:
            - `ident`: required. The identifier of the job, used to prevent the
              same job from being created repeatedly.
            - `progress`: a dict of progress-reporting settings:
                - `interval`: the interval between progress reports, default 5 seconds.
                - `key`: the sub-field in which the progress info is located.
            - `report_system_load`: a boolean indicating whether to report system
              load. If true and the progress info is a dict, the system load info
              is added to the progress dict under the key `system_load`; its value
              is a dict with three fields: `mem_available`, `cpu_idle_percent`,
              `client_number`.
            - `cpu_sample_interval`: the interval used by `psutil.cpu_times_percent`,
              default 0.02.
            - `check_load`: a dict that enables the system load check and sets
              custom thresholds.
        :param func: required. The job function, called with this Job as its only
            argument. It is specified as a module name and a function name
            separated by a dot; the module should be in the `jobs` directory.
        """
        self.ident = msg["ident"]
        self.channel = channel
        self.data = msg
        self.worker = func
        self.ctx = {}
        self.err = None
        self.progress_available = threading.Event()

        if self.ident in self.sessions:
            # A job with this ident is already running: do not start another one.
            logger.info(
                "job: %s already exists, created by channel %s" % (self.ident, repr(self.sessions[self.ident].channel))
            )
            return
        else:
            self.sessions[self.ident] = self
            logger.info(
                ("inserted job: %s to sessions by channel %s, " + "there are %d jobs in sessions now")
                % (self.ident, repr(self.channel), len(self.sessions))
            )

        # Run the job function in a background daemon thread.
        self.thread = k3thread.start(target=self.work, args=(), daemon=True)

    def work(self):
        logger.info("job %s started, the data is: %s" % (self.ident, self.data))

        try:
            self.worker(self)
        except Exception as e:
            # Keep the exception on the job so it can be reported to the client.
            logger.exception("job %s got exception: %s" % (self.ident, repr(e)))
            self.err = e
        finally:
            logger.info("job %s ended" % self.ident)
            self.close()

    def close(self):
        # Remove this job from the shared registry so its ident can be reused.
        with self.lock:
            del self.sessions[self.ident]
            logger.info(
                ("removed job: %s from sessions, there are %d " + "jobs in sessions now")
                % (self.ident, len(self.sessions))
            )
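The class-level `sessions` dict and `close()` keep at most one running job per `ident`. Below is a minimal standalone sketch of the same idea, using plain `threading` instead of `k3thread`; `JobRegistry`, `start`, and `running` are hypothetical names, not part of k3wsjobd. Unlike the listing above, it performs the membership check and the insert under a single lock acquisition, so two concurrent submissions with the same ident cannot both start a thread.

```python
import threading


class JobRegistry:
    """Hypothetical registry mirroring Job.sessions: one running job per ident."""

    def __init__(self):
        self._lock = threading.RLock()
        self._sessions = {}

    def start(self, ident, func):
        """Run func(ident) in a daemon thread unless ident is already running.

        Returns True if a new job was started, False for a duplicate."""
        with self._lock:
            # Check and insert atomically: a second request with the same
            # ident sees the entry and is rejected.
            if ident in self._sessions:
                return False
            thread = threading.Thread(target=self._work, args=(ident, func), daemon=True)
            self._sessions[ident] = thread
        thread.start()  # started outside the lock so other submissions are not blocked
        return True

    def _work(self, ident, func):
        try:
            func(ident)
        finally:
            # Mirrors Job.close(): drop the session once the job ends.
            with self._lock:
                del self._sessions[ident]

    def running(self):
        with self._lock:
            return sorted(self._sessions)


release = threading.Event()
registry = JobRegistry()
first = registry.start("job-1", lambda ident: release.wait())
second = registry.start("job-1", lambda ident: release.wait())
snapshot = registry.running()
print(first, second, snapshot)
release.set()  # let the job finish; it then removes itself from the registry
```

The first call starts the job, the duplicate is rejected while the first is still blocked, and once `release` is set the worker thread removes `job-1` so the same ident could be submitted again.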

__init__(channel, msg, func)

Creates a job from a job description and starts it in a daemon thread. If a job with the same ident already exists, the constructor logs that fact and returns without starting a new thread.

Parameters:

- channel: the channel that submitted the job; it is stored on the job and used in log messages.
- msg: the job description, a JSON string sent by the client and decoded into a dict. It can contain the following fields:
  - ident: required. The identifier of the job, used to prevent the same job from being created repeatedly.
  - progress: a dict of progress-reporting settings, which can contain:
    - interval: the interval between progress reports, default 5 seconds.
    - key: the sub-field in which the progress info is located.
  - report_system_load: a boolean indicating whether to report system load. If true and the progress info is a dict, the system load info is added to the progress dict under the key system_load. Its value is also a dict with three fields: mem_available, cpu_idle_percent, client_number.
  - cpu_sample_interval: the interval used by psutil.cpu_times_percent, default 0.02.
  - check_load: a dict that enables the system load check and sets custom thresholds.
- func: required. The job function, called with the Job instance as its only argument. It is specified as a module name and a function name separated by a dot, and the module should be in the jobs directory.
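A client has to send a job description as a JSON string. The helper below is a hypothetical sketch that assembles one from the fields listed above. It assumes the client names the job function in a `func` field as `"module.function"`; that field name, the helper name `make_job_description`, and the sample values `backup-42`, `backup.run`, and `stat` are illustrative assumptions, not part of k3wsjobd.

```python
import json


def make_job_description(ident, func, interval=5, key=None,
                         report_system_load=None, cpu_sample_interval=None):
    """Build a job description JSON string (hypothetical helper).

    Assumes the job function is named in a "func" field as "module.function";
    optional fields are omitted unless given, so the server's defaults apply.
    """
    if not ident:
        raise ValueError("ident is required")
    module, sep, function = func.rpartition(".")
    if not (module and sep and function):
        raise ValueError("func must look like 'module.function'")

    progress = {"interval": interval}  # seconds between progress reports
    if key is not None:
        progress["key"] = key          # sub-field holding the progress info

    desc = {"ident": ident, "func": func, "progress": progress}
    if report_system_load is not None:
        desc["report_system_load"] = bool(report_system_load)
    if cpu_sample_interval is not None:
        desc["cpu_sample_interval"] = cpu_sample_interval
    return json.dumps(desc)


desc = make_job_description("backup-42", "backup.run", interval=2, key="stat")
print(desc)
```

The resulting string would be sent as a single text message to the server started in Quick Start; reusing the same `ident` while that job is still running does not start a second job.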

License

The MIT License (MIT) - Copyright (c) 2015 Zhang Yanpo (张炎泼)