How To Implement a Platform That Allows Quick Plugin Development: Part 1

GitHub repository

In this post, I will provide a comprehensive overview of the implementation of a streamlined, plugin-based analytics platform. This platform is designed to enable users with basic Python proficiency to easily create and deploy new plugins for data retrieval and analysis from various sources. Utilizing YAML, plugin developers can define necessary parameters, specify an icon for display on the plugin overview page, and determine the rendering format of the results (e.g., raw JSON or a Leaflet Map). Plugins are dynamically loaded and presented to end users through a web-based plugin overview interface, where users can select a plugin, execute it, and asynchronously retrieve the results.

This type of workflow is characteristic of intelligence organizations, where analysts must query vast amounts of data. These analysts are often experts in specialized fields such as foreign policy, chemical warfare, or counterterrorism, but may lack the quantitative skills required to perform complex SQL queries or run sophisticated algorithms necessary to extract insights and complete their reports. Their daily tasks often involve running repetitive queries, such as requesting location updates on specific targets.

The concept behind this platform is to streamline such processes. Once an analyst has outlined the requirements and steps for a recurring query, a data scientist with Python expertise can develop a plugin to automate this task. This plugin is then made accessible to the analyst through a standardized, user-friendly web interface.

The plugin developer does not need to engage in full-stack development to deploy their solution. Instead, they can define the plugin’s meta details—such as its name, icon, and required input parameters—declaratively. The platform handles the rest, including deploying and rendering the input form, ensuring the plugin is executable with a simple button click, and passing any inputs to the plugin’s logic. The developer’s primary focus remains on implementing the core business logic, with the platform managing the execution and presentation of the results.

Overview

As previously mentioned, the primary objective is to streamline the plugin development process, enabling developers to create plugins swiftly without being burdened by architectural decisions or other cross-cutting concerns such as logging, while ensuring that the plugins are readily available to end users. From a plugin developer’s perspective, the goal is to offer a framework where the developer only needs to supply a Python script implementing a single method containing the desired business logic, along with a YAML file specifying metadata about the plugin. This metadata includes the plugin’s name, description, icon, required input parameters, and output format. The combination of the implemented Python method and the metadata file constitutes the plugin, simplifying the developer’s responsibilities to just these two components.

The platform will read the available plugins from the plugin directory and present them to the end user through a web-based user interface. This UI will display all plugins along with their icons, names, and descriptions. Users can click on a plugin icon to be prompted for input values required to execute the plugin’s business logic. Upon execution, the results may not be immediately available, as some plugins may require more time to compute than others. Regardless of execution time, users will be directed to an overview page where all completed runs are listed.

On this overview page, users can view the results of plugin executions by clicking on the corresponding list item. Results will be presented either as raw JSON or in an alternative format specified by the plugin developer. The input arguments used during execution will also be displayed, allowing users to re-run the plugin if desired. Additionally, users have the option to bookmark results or save the query in a favorites list for future execution.

Architectural Landscape

The platform's core component, written in C++, listens for incoming queries. Each query comprises a query name and a dictionary of parameters, where each key represents a parameter name and each value the corresponding parameter value. Queries are sent to the platform via a REST API. Although gRPC could be used, I opted for a straightforward HTTP REST service. This service listens in a separate thread and receives JSON objects representing the queries.

Logging in the platform is also handled in a separate thread to avoid saturating the critical path with logging operations, which can significantly impact performance. Any log messages generated are published to a queue and subsequently picked up by the logger thread, which is responsible for writing them to a file.
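The full implementation lives in Logging/logging.h in the repository; as a rough sketch of the idea (names and details below are illustrative, not the repository's), such an asynchronous logger boils down to a queue, a condition variable, and a dedicated writer thread:

#include <condition_variable>
#include <fstream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

class AsyncLogger
{
public:
    AsyncLogger(const std::string &path) : m_out(path), m_worker([this]
                                                                 { Drain(); }) {}

    ~AsyncLogger()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_cv.notify_one();
        m_worker.join(); // flush what is left, then stop
    }

    // Cheap for the caller: enqueue the message and return immediately.
    void Info(const std::string &msg)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(msg);
        }
        m_cv.notify_one();
    }

private:
    // Runs on the logger thread, off the critical path.
    void Drain()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (!m_done || !m_queue.empty())
        {
            m_cv.wait(lock, [this]
                      { return m_done || !m_queue.empty(); });
            while (!m_queue.empty())
            {
                m_out << m_queue.front() << '\n';
                m_queue.pop();
            }
        }
    }

    std::ofstream m_out;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::string> m_queue;
    bool m_done{false};
    std::thread m_worker; // declared last so everything else is initialized first
};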

Once the API receives a new incoming query, it places it into a queue. A dispatcher process, running in a separate thread, reads new queries from this queue and submits them to a thread pool, where they are executed as soon as a worker thread becomes available. When a thread from the pool picks up a new task, it forwards the query name along with its arguments to the Python interpreter. The interpreter calls the Python query wrapper with the query, which then searches the plugin directory for a Python script named <QUERY_NAME>Strategy.py.
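For example, a request to run the Location plugin shown at the end of this post could look like the following JSON body (the forwards field drives plugin chaining and is explained in the Python Wrapper section):

{
    "name": "Location",
    "parameters": {"user": "fizzbuzzer", "caseId": "1", "term": "Lorem Ipsum"},
    "forwards": []
}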

As mentioned earlier, these *Strategy.py scripts are the actual plugins that implement the business logic. Each script must define a class named <QUERY_NAME>Strategy with a single method that encapsulates the desired functionality.

from typing import Dict, List

class <QUERY_NAME>Strategy:
  def run(self, parameters: Dict[str, List[str]]) -> tuple[bool, dict]:
    '''
    parameters:
      parameters: dictionary containing the parameter values from the original request
    returns:
      a tuple (success flag, result dictionary): True on success, False otherwise
    '''
    # Logic here

The Python wrapper loads this class and calls its run() method. The run() method processes the query, and the wrapper then persists any results as a JSON string in an SQLite database, along with the strategy name and the input arguments for future reference. Additionally, the username of the user who called the query API is stored, allowing results to be fetched individually for each platform user.

The key idea here is that the dispatcher does not wait for any plugin results, as this could disrupt the system’s asynchronicity, especially since some plugins might take a considerable amount of time to compute due to their complexity. Instead, when a plugin completes its computation, it simply persists the result in the database. These results can then be fetched from our straightforward web UI.

In essence, the platform component is responsible only for listening for queries and ensuring the execution of the corresponding plugins. By confining the platform’s responsibilities to this singular task, we can ensure that even if the platform encounters a failure, end users can still retrieve finished results without relying on the platform. Retrieval is managed by a separate component—our web UI. This decouples the various landscape components from each other, enhancing maintainability and reliability.

Plugin Development

As previously outlined, the plugin development process in Python involves creating a new Python file named <QUERY_NAME>Strategy.py and placing it in the plugins directory. Additionally, the developer must provide a <QUERY_NAME>Strategy.yaml configuration file detailing the plugin and its required parameters. This YAML file is also stored in the plugin directory. The Python wrapper selects the appropriate Python script to call based on the query name, while the web application parses the .yaml file to ensure proper rendering of the plugin in the UI. This entails making the plugin available for user selection and providing input fields for parameter values.

Below is an example of such a plugin accompanied by its YAML file:

GeofencingStrategy.py

from typing import Dict, List

class GeofencingStrategy:
    def run(self, parameters: Dict[str, List[str]]) -> tuple[bool, dict]:
        # ...
        # Business Logic
        # ...
        result = {'targets': ['Jack', 'Joe', 'Walter']}
        return True, result

GeofencingStrategy.yaml

name: Geofencing
icon: earth # Font Awesome icon name
description: Find all active mobile devices within a particular geo-fence area.
parameters:
  - latitude
  - longitude
  - radius

Platform

Let us now explore the platform's core: the component that forms the foundation of the entire architecture. It assumes the critical responsibility of receiving queries, dispatching them, and ensuring their execution.

Let’s outline some essential helper classes and structures that we’ll need as we progress further.

ThreadPool

This component initiates a defined number of threads, each tasked with monitoring a queue of pending tasks. Upon the availability of a new task, the first available thread capable of executing it will proceed to do so.

Dispatcher/thread_pool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>
#include "tasks_queue.h"

// https://www.sobyte.net/post/2022-05/design-a-thread-pool/
class Threadpool
{
public:
    using wlock = std::unique_lock<std::shared_mutex>;
    using rlock = std::shared_lock<std::shared_mutex>;

public:
    Threadpool() = default;
    ~Threadpool();
    Threadpool(const Threadpool &) = delete;
    Threadpool(Threadpool &&) = delete;
    Threadpool &operator=(const Threadpool &) = delete;
    Threadpool &operator=(Threadpool &&) = delete;

public:
    void Init(int num);
    void Terminate(); // stop and process all delegated tasks
    void Cancel();    // stop and drop all tasks remained in queue

public:
    bool Initialized() const;
    bool IsRunning() const;
    int Size() const;

public:
    template <class F, class... Args>
    auto Async(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
    {
        using return_t = decltype(f(args...));
        using future_t = std::future<return_t>;
        using task_t = std::packaged_task<return_t()>;

        {
            rlock lock(m_mutex);
            if (m_stop || m_cancel)
            {
                throw std::runtime_error("Delegating task to a threadpool that has been terminated or canceled.");
            }
        }

        auto bind_func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        std::shared_ptr<task_t> task = std::make_shared<task_t>(std::move(bind_func));
        future_t fut = task->get_future();
        m_tasks.Emplace([task]() -> void
                        { (*task)(); });
        m_cond.notify_one();
        return fut;
    }

    void Spawn();

private:
    std::vector<std::thread> m_workers;
    mutable TasksQueue<std::function<void()>> m_tasks;
    bool m_stop{false};
    bool m_cancel{false};
    bool m_inited{false};
    mutable std::shared_mutex m_mutex;
    std::condition_variable_any m_cond;
    mutable std::once_flag m_once;
};

#endif

Dispatcher/thread_pool.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include "thread_pool.h"

#include <mutex>
#include <thread>

void Threadpool::Init(int num)
{
    std::call_once(m_once, [this, num]()
                   {
        wlock lock(m_mutex);
        m_stop = false;
        m_cancel = false;
        m_workers.reserve(num);
        for (int i = 0; i < num; ++i) {
            m_workers.emplace_back(std::bind(&Threadpool::Spawn, this));
        }
        m_inited = true; });
}

void Threadpool::Spawn()
{
    for (;;)
    {
        bool pop = false;
        std::function<void()> task;
        {
            wlock lock(m_mutex);

            /**
             Why do we need to hold a mutex while using our condition variable?

             If we didn't lock, the scheduler could switch away from this thread
             right after we checked the predicate and decided it is not yet time
             to stop waiting. The other thread might then make the changes that
             would satisfy the predicate and call notify. Switching back to the
             first thread, that notification would go unnoticed (the predicate
             had already been evaluated), and the thread would go back to
             sleeping, possibly forever.
             */
            m_cond.wait(lock, [this, &pop, &task]
                        {
                pop = m_tasks.Pop(task);
                bool should_stop_waiting = m_cancel || m_stop || pop;
                return should_stop_waiting; });
        }
        if (m_cancel || (m_stop && !pop))
        {
            return;
        }

        task();
    }
}

void Threadpool::Terminate()
{
    {
        wlock lock(m_mutex);
        if (IsRunning())
        {
            m_stop = true;
        }
        else
        {
            return;
        }
    }
    m_cond.notify_all();
    for (auto &worker : m_workers)
    {
        worker.join();
    }
}

void Threadpool::Cancel()
{
    {
        wlock lock(m_mutex);
        if (IsRunning())
        {
            m_cancel = true;
        }
        else
        {
            return;
        }
    }
    m_tasks.Clear();
    m_cond.notify_all();
    for (auto &worker : m_workers)
    {
        worker.join();
    }
}
bool Threadpool::Initialized() const
{
    rlock lock(m_mutex);
    return m_inited;
}

bool Threadpool::IsRunning() const
{
    return m_inited && !m_stop && !m_cancel;
}

int Threadpool::Size() const
{
    rlock lock(m_mutex);
    return m_workers.size();
}

// Join the workers on destruction; destroying joinable std::threads would
// otherwise call std::terminate.
Threadpool::~Threadpool() { Terminate(); }
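Before moving on, here is a minimal usage sketch (my own illustration, not part of the repository): Init() spawns the workers, Async() enqueues a task and hands back a future, and Terminate() processes the remaining tasks and joins the workers.

#include <iostream>
#include "thread_pool.h"

int main()
{
    Threadpool pool;
    pool.Init(4); // spawn four worker threads

    // Async() wraps the callable in a packaged_task and returns its future.
    auto fut = pool.Async([](int a, int b)
                          { return a + b; }, 40, 2);
    std::cout << "result: " << fut.get() << std::endl;

    pool.Terminate(); // process remaining tasks, then join the workers
    return 0;
}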

TasksQueue

This thread-safe queue serves as the conduit utilized by our thread pool to store any queued tasks awaiting execution.

Dispatcher/tasks_queue.h

#ifndef TASKS_QUEUE_H
#define TASKS_QUEUE_H

#include <mutex>
#include <queue>
#include <shared_mutex>

template <typename T>
class TasksQueue : protected std::queue<T>
{
public:
    using wlock = std::unique_lock<std::shared_mutex>;
    using rlock = std::shared_lock<std::shared_mutex>;

public:
    TasksQueue() = default;
    ~TasksQueue() { Clear(); }
    TasksQueue(const TasksQueue &) = delete;
    TasksQueue(TasksQueue &&) = delete;
    TasksQueue &operator=(const TasksQueue &) = delete;
    TasksQueue &operator=(TasksQueue &&) = delete;

public:
    bool IsEmpty() const
    {
        rlock lock(m_mutex);
        return std::queue<T>::empty();
    }

    size_t Size() const
    {
        rlock lock(m_mutex);
        return std::queue<T>::size();
    }

public:
    void Clear()
    {
        wlock lock(m_mutex);
        while (!std::queue<T>::empty())
        {
            std::queue<T>::pop();
        }
    }

    void Push(const T &obj)
    {
        wlock lock(m_mutex);
        std::queue<T>::push(obj);
    }

    template <typename... Args>
    void Emplace(Args &&...args)
    {
        wlock lock(m_mutex);
        std::queue<T>::emplace(std::forward<Args>(args)...);
    }

    bool Pop(T &holder)
    {
        wlock lock(m_mutex);
        if (std::queue<T>::empty())
        {
            return false;
        }
        else
        {
            holder = std::move(std::queue<T>::front());
            std::queue<T>::pop();
            return true;
        }
    }

private:
    mutable std::shared_mutex m_mutex;
};

#endif

LFQueue

This lock-free queue is more efficient than the previously defined TasksQueue, but it supports only a single producer and a single consumer. It is shared between exactly two threads: the API, which writes new query requests into the queue, and the Dispatcher, which polls them out of it.

Dispatcher/lf_queue.h

#pragma once

#include <iostream>
#include <vector>
#include <atomic>

template <typename T>
class LFQueue final
{
public:
    LFQueue(std::size_t num_elems) : m_store(num_elems, T()) /* pre-allocation of vector storage. */
    {
    }

    auto GetNextToWriteTo() noexcept
    {
        return &m_store[m_next_write_index];
    }

    auto UpdateWriteIndex() noexcept
    {
        m_next_write_index = (m_next_write_index + 1) % m_store.size();
        m_num_elements++;
    }

    auto GetNextToRead() const noexcept -> const T *
    {
        return (Size() ? &m_store[m_next_read_index] : nullptr);
    }

    auto UpdateReadIndex() noexcept
    {
        m_next_read_index = (m_next_read_index + 1) % m_store.size();
        m_num_elements--;
    }

    auto Size() const noexcept
    {
        return m_num_elements.load();
    }

    // Deleted default, copy & move constructors and assignment-operators.
    LFQueue() = delete;

    LFQueue(const LFQueue &) = delete;

    LFQueue(const LFQueue &&) = delete;

    LFQueue &operator=(const LFQueue &) = delete;

    LFQueue &operator=(const LFQueue &&) = delete;

private:
    std::vector<T> m_store;
    std::atomic<size_t> m_next_write_index = {0};
    std::atomic<size_t> m_next_read_index = {0};
    std::atomic<size_t> m_num_elements = {0};
};
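The intended write/read protocol, as used later by the API and the dispatcher, looks like this (a small sketch of mine):

#include <iostream>
#include "lf_queue.h"

int main()
{
    LFQueue<int> queue(1024);

    // Producer side (the API thread): claim the next slot, fill it, publish it.
    *(queue.GetNextToWriteTo()) = 42;
    queue.UpdateWriteIndex();

    // Consumer side (the dispatcher thread): poll, read, release the slot.
    if (const int *next = queue.GetNextToRead())
    {
        std::cout << "read: " << *next << std::endl;
        queue.UpdateReadIndex();
    }
    return 0;
}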

ThreadUtils

This is simply a collection of helpers for spawning and running new threads.

Common/thread_utils.h

#pragma once

#include <atomic>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <unistd.h>
#include <sys/syscall.h>

template <typename T, typename... A>
inline auto CreateAndStartThread(int core_id, const std::string &name, T &&func, A &&...args) noexcept
{
  // func and args are captured by reference; the sleep below keeps the
  // caller's stack frame alive long enough for the new thread to start.
  auto t = new std::thread([&]()
                           { std::forward<T>(func)((std::forward<A>(args))...); });

  using namespace std::literals::chrono_literals;
  std::this_thread::sleep_for(1s);

  return t;
}
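Usage is then a one-liner (a small illustrative sketch; note that the helper allocates the thread with new, so the caller owns it and must join and delete it):

#include <iostream>
#include "thread_utils.h"

int main()
{
    auto *t = CreateAndStartThread(-1, "printer", [](int x)
                                   { std::cout << "got " << x << std::endl; }, 42);
    t->join();
    delete t; // the helper allocates with new, so the caller owns the thread
    return 0;
}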

SignalChannel

An object shared among different threads to check whether they should stop running. The condition variable is notified when an interrupt signal is received.

Common/signal_channel.h

#ifndef SIGNAL_CHANNEL_H
#define SIGNAL_CHANNEL_H

#include <atomic>
#include <mutex>
#include <condition_variable>

class SignalChannel
{
public:
    std::atomic<bool> m_shutdown_requested = false;
    std::mutex m_cv_mutex;
    std::condition_variable m_cv;
};

#endif
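A worker thread that wants to be shutdown-aware can wait on the channel's condition variable with a timeout, using the atomic flag as its predicate; a minimal illustrative sketch:

#include <chrono>
#include <iostream>
#include "signal_channel.h"

void WorkerLoop(SignalChannel &channel)
{
    while (!channel.m_shutdown_requested.load())
    {
        // ... do one unit of work ...

        // Sleep until notify_all() fires or the timeout elapses.
        std::unique_lock<std::mutex> lock(channel.m_cv_mutex);
        channel.m_cv.wait_for(lock, std::chrono::milliseconds(100), [&channel]
                              { return channel.m_shutdown_requested.load(); });
    }
    std::cout << "worker shutting down" << std::endl;
}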

REST API

The platform must be capable of handling query execution requests. Each request should include the name of the plugin to be executed, along with the necessary parameter values. To facilitate integration with external components, such as a web UI, the platform provides an interface for invoking plugin logic. For simplicity, I am using REST over HTTP to achieve this.

The API implementation is straightforward. I am using cpprestsdk to build the service. The API component accepts a pointer to our dispatcher and listens for incoming HTTP POST requests formatted as JSON. When a new request is received, it extracts the query name and forwards it, along with the parameters as a JSON string, to the Dispatcher. We will explore the Dispatcher in the next section.

Api/api.h

#ifndef API_H
#define API_H

#include "../Dispatcher/dispatcher.h"

#include <atomic>
#include <thread>
#include <memory>
#include <cpprest/http_listener.h>

class Api
{
public:
    Api(const std::string &name, const std::string &host, const int port, std::shared_ptr<Dispatcher> dispatcher);
    void Run();
    ~Api();

    Api() = delete;
    Api(const Api &) = delete;
    Api(const Api &&) = delete;
    Api &operator=(const Api &) = delete;
    Api &operator=(const Api &&) = delete;

private:
    std::string m_name;
    std::string m_host;
    int m_port = -1;
    std::shared_ptr<Dispatcher> m_dispatcher;
    std::atomic<bool> m_running = {true};
    std::thread *m_thread = nullptr;

    void HandlePost(web::http::http_request request);
};

#endif

Api/api.cpp

#include "api.h"
#include "../Common/thread_utils.h"

#include <cpprest/json.h>

#include <iostream>
#include <map>
#include <set>
#include <string>
#include <functional>

Api::Api(const std::string &name, const std::string &host, const int port, std::shared_ptr<Dispatcher> dispatcher) : m_name(name), m_host(host), m_port(port), m_dispatcher(dispatcher)
{
    m_thread = CreateAndStartThread(-1, m_name, [this]()
                                    {
                                        Run();
                                    });
}

void HandleRequest(
    web::http::http_request request,
    std::function<void(web::json::value const &, web::json::value &)> action)
{
    auto answer = web::json::value::object();

    request
        .extract_json()
        .then([&answer, &action](pplx::task<web::json::value> task)
              {
         try
         {
            auto const & jvalue = task.get();

            if (!jvalue.is_null())
            {
               action(jvalue, answer);
            }
         }
         catch (web::http::http_exception const &e)
         {
            std::cout << e.what() << std::endl;
         } })
        .wait();

    request.reply(web::http::status_codes::OK, answer);
}

void Api::HandlePost(web::http::http_request request)
{
    HandleRequest(
        request,
        [this](web::json::value const &jvalue, web::json::value &answer)
        {
            Logging::INFO("Got json: " + jvalue.serialize(), m_name);
            std::string strategy_name = jvalue.at(U("name")).as_string();
            std::string parameters = jvalue.serialize();
            m_dispatcher->Dispatch(strategy_name, parameters);
        });
}

void Api::Run()
{
    std::string listenAddress = m_host + ":" + std::to_string(m_port) + "/api";
    web::http::experimental::listener::http_listener listener(listenAddress);

    listener.support(web::http::methods::POST, std::bind(&Api::HandlePost, this, std::placeholders::_1));

    try
    {
        listener
            .open()
            .then([&listener, &listenAddress]()
                  { std::cout << "Started listening on: " << listenAddress << std::endl; })
            .wait();

        while (m_running)
        {
            // The listener serves requests on its own threads; avoid burning
            // a core while we merely wait for shutdown.
            using namespace std::literals::chrono_literals;
            std::this_thread::sleep_for(100ms);
        }
    }
    catch (std::exception const &e)
    {
        std::cout << e.what() << std::endl;
    }
}

Api::~Api()
{
    m_running = false;
    m_thread->join();
}

Dispatcher

The dispatcher owns a thread that starts running as soon as the dispatcher is created. This thread continuously polls a queue containing incoming query requests. Each request is a tuple comprising the name of the plugin to be executed and the necessary arguments. When a new request is available in the queue, the dispatcher thread retrieves it, instantiates a new generic Python strategy object, wraps it as a new task using a lambda construct, and submits it to the thread pool for execution.

The dispatcher is also responsible for instantiating the Python interpreter. In the future, I will consider instantiating a separate Python interpreter for each plugin execution, as this feature is supported by Python 3.12 and later versions. For now, a single interpreter should suffice to avoid making this post overly complex.

The dispatcher utilizes the lock-free queue, which is specifically designed to be shared between only two threads: a publisher and a subscriber. This design is perfectly suitable for our needs. It is important to note that this differs from our thread pool task queue, which is shared among more than two threads.

Dispatcher/dispatcher.h

#pragma once

#include <string>
#include <fstream>
#include <cstdio>
#include <utility>
#include <Python.h>
#include <thread>
#include "lf_queue.h"
#include "Playground.h"
#include "thread_pool.h"
#include "../Common/thread_utils.h"
#include "../Logging/logging.h"
#include "../Strategies/strategy_factory.h"

constexpr size_t DISPATCHER_QUEUE_SIZE = 8 * 1024 * 1024;

class Dispatcher final
{

public:
    void FlushQueue() noexcept
    {

        while (m_running)
        {
            if (m_queue.Size())
            {
                auto next = m_queue.GetNextToRead();
                m_queue.UpdateReadIndex();

                auto query_name = std::get<0>(*next);
                auto parameters = std::get<1>(*next);

                Logging::INFO("Dispatching " + query_name, "Dispatcher");

                Strategy *strategy = StrategyFactory::GetInstance("python");

                auto task = [strategy, query_name](std::string params)
                {
                    strategy->Run(query_name, params);
                };

                m_tpool.Async(task, parameters);
            }

            using namespace std::literals::chrono_literals;
            std::this_thread::sleep_for(10ms);
        }

        std::cout << "Dispatcher beak" << std::endl;
    }

    Dispatcher(const std::string name) : m_name(name), m_queue(DISPATCHER_QUEUE_SIZE)
    {
        PyImport_AppendInittab("Playground", PyInit_Playground);
        Py_Initialize();
        PyImport_ImportModule("Playground");
        m_thread = CreateAndStartThread(-1, "Dispatcher", [this]()
                                        { FlushQueue(); });
    }

    ~Dispatcher()
    {
        Logging::INFO("Shutting down", "Dispatcher");
        while (m_queue.Size())
        {
            using namespace std::literals::chrono_literals;
            std::this_thread::sleep_for(1s);
        }
        m_running = false;
        m_thread->join();
        m_tpool.Terminate(); // let in-flight plugin runs finish before tearing down Python
        Py_Finalize();
    }

    auto Dispatch(const std::string &name, std::string &params) noexcept
    {
        *(m_queue.GetNextToWriteTo()) = std::make_pair(name, params);
        m_queue.UpdateWriteIndex();
    }

    Dispatcher() = delete;
    Dispatcher(const Dispatcher &) = delete;
    Dispatcher(const Dispatcher &&) = delete;
    Dispatcher &operator=(const Dispatcher &) = delete;
    Dispatcher &operator=(const Dispatcher &&) = delete;

private:
    LFQueue<std::pair<std::string, std::string>> m_queue;
    std::atomic<bool> m_running = {true};
    std::thread *m_thread = nullptr;
    std::string m_name;
    Threadpool m_tpool;
};

PythonStrategy

In the previous section, I discussed how the dispatcher creates a new generic Python strategy object. This strategy object, instantiated via a static factory, acts as an interface between the C++ and Python plugin environments. While I will not delve into the details of the factory’s implementation—since it follows a standard and straightforward pattern that can be referenced in the source code repository—it is worth reviewing the implementation of the PythonStrategy class. Despite its simplicity, understanding how this class facilitates the invocation of Python code from the platform’s C++ layer provides valuable insight.
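For readers who have not seen the pattern, a self-registering factory conceptually boils down to the sketch below. This is my own illustration; the actual REGISTER_DEF_TYPE macro and StrategyFactory live in Strategies/strategy_factory.h in the repository and may differ in detail.

#include <functional>
#include <map>
#include <string>

class Strategy
{
public:
    virtual void Run(const std::string &query, const std::string &parameters) const = 0;
    virtual ~Strategy() = default;
};

class StrategyFactory
{
public:
    // Returns a newly allocated instance for the given key; ownership is the caller's.
    static Strategy *GetInstance(const std::string &key)
    {
        auto it = Registry().find(key);
        return it == Registry().end() ? nullptr : it->second();
    }

    static bool Register(const std::string &key, std::function<Strategy *()> maker)
    {
        Registry()[key] = std::move(maker);
        return true;
    }

private:
    static std::map<std::string, std::function<Strategy *()>> &Registry()
    {
        static std::map<std::string, std::function<Strategy *()>> registry;
        return registry;
    }
};

// A REGISTER_DEF_TYPE(T, key)-style macro can then expand to a static bool whose
// initializer registers the type before main() runs:
#define REGISTER_DEF_TYPE(T, key) \
    static bool s_##T##_registered = StrategyFactory::Register(#key, [] { return static_cast<Strategy *>(new T()); })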

Strategies/PythonStrategy.cpp

#include "python_strategy.h"
#include "../Logging/logging.h"
#include <Python.h>
#include "Playground.h"

REGISTER_DEF_TYPE(PythonStrategy, python);

void PythonStrategy::Run(const std::string &query, const std::string &parameters) const
{
    Logging::INFO("Running:" + query, "PythonStrategy");

    char *pResult = call_do_work(query.c_str(), parameters.c_str());

    Logging::INFO("Result from Python call: " + std::string(pResult), "PythonStrategy");
}

The glue is the invocation of call_do_work(...). To understand this, we need to examine the Playground.pyx source code, which is written in Cython. Our CMake file invokes the Cython executable to generate Playground.h (C code) from the .pyx file, which we then include in our PythonStrategy implementation in order to be able to actually call call_do_work(...).

// ...
#include "Playground.h"
// ...
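For reference, such a Cython code-generation step could be wired into CMake roughly as follows (a sketch under my own assumptions about paths and flags; the repository's CMakeLists.txt is the authoritative version). With cdef public declarations in the .pyx, Cython emits the header next to the generated .c file.

# Sketch: run Cython at build time to produce Playground.c and Playground.h
find_program(CYTHON_EXECUTABLE cython REQUIRED)

add_custom_command(
    OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/Playground.c
           ${CMAKE_CURRENT_BINARY_DIR}/Playground.h
    COMMAND ${CYTHON_EXECUTABLE} -3
            ${CMAKE_CURRENT_SOURCE_DIR}/PythonFunctions/Playground.pyx
            -o ${CMAKE_CURRENT_BINARY_DIR}/Playground.c
    DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/PythonFunctions/Playground.pyx
    COMMENT "Generating Playground.c/.h from Playground.pyx")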

Here is the Cython code:

PythonFunctions/Playground.pyx

# cython: language_level=3
import sys
import os
dir_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(f"{dir_path}/PythonPlugins") # Update list of directories that the Python interpreter will search for when it tries to resolve a module name

from wrapper import do_work

_last_result = b""  # module-level reference keeps the returned buffer alive

cdef public char* call_do_work(const char* strategy_name, const char* params):
    global _last_result
    try:
        success, msg = do_work(strategy_name.decode("UTF-8"), params.decode("UTF-8"))
    except Exception as e:
        msg = f"Error: {e}"
    # Returning a pointer into a temporary bytes object would dangle once the
    # temporary is garbage collected, so pin the bytes in a module-level name.
    _last_result = msg.encode("UTF-8")
    return _last_result

Python Wrapper

Now that we have transitioned into the Python environment, there are just a few additional components to implement to allow the user to call the appropriate Python plugin. As demonstrated in the code snippet above, we invoke a function called do_work(...), which accepts two parameters: the query name and the plugin arguments. Using the query name, we iterate over all Python files in the plugin directory that follow the naming convention <QUERY_NAME>Strategy.py. This process is handled within the find_and_run_strategy(...) function. Once the corresponding Python file is located, we instantiate the class of the same name and return it.

Next, we convert the plugin arguments from a JSON string into a Python dictionary and invoke the run(...) method on the newly instantiated strategy object. If the execution is successful, the result is persisted in a SQLite database. The code below also includes helper functions to forward the results to additional strategies, as specified by the user’s query. I will not delve into the details here, as the code is self-explanatory. In essence, the platform supports plugin chaining, where multiple strategies can be executed sequentially, with the result of one strategy being passed to the next. The final result is then persisted in the database. For example, a Geofencing strategy might return all individuals within a specified area, and this result could then be forwarded to a Contacts plugin, which would return the direct contacts of each target identified in the Geofencing query.
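For instance, the Geofencing-to-Contacts chain just described could be requested with a payload like the one below (hypothetical; a persons parameter on the Contacts plugin is assumed purely for illustration). Each entry in forwards names the next strategy and maps a field of the previous result (from) to a parameter of the next strategy (to):

{
    "name": "Geofencing",
    "parameters": {"user": "fizzbuzzer", "caseId": "1", "latitude": "51.5", "longitude": "-0.12", "radius": "500"},
    "forwards": [
        {"name": "Contacts", "parameters": [{"from": "targets", "to": "persons"}]}
    ]
}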

PythonPlugins/wrapper.py

from strategy_finder import find_and_run_strategy


def do_work(strategy_name: str, parameters: str) -> tuple[bool, str]:
    return find_and_run_strategy(strategy_name, parameters)

PythonPlugins/strategy_finder.py

import importlib
from pathlib import Path
import sqlite3
from datetime import datetime, timezone
import json

def get_strategy(strategy_name: str):
    result = []
    path_list = Path(".").glob("**/*Strategy.py")
    for path in path_list:
        module_name = str(path).rsplit(".", 1)[0]
        class_name = module_name.rsplit("/",1)[-1]
        if class_name == strategy_name:
            try:
                module = importlib.import_module(class_name)
                importlib.reload(module)
            except Exception as e:
                print(f"Error: could not import module '{class_name}': {e}")
                return []
            try:
                clazz = getattr(module, class_name)
                i = clazz()
                result.append(i)
            except Exception as e:
                print(f"Error: could not instantiate class '{class_name}': {e}")
                return []
    return result

def persist(params, result_json: str, strategy_name: str) -> None:
    user = params["parameters"]["user"][0]
    case_id = params["parameters"]["caseId"][0]
    con = sqlite3.connect("result.db")
    cur = con.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS result (id INTEGER PRIMARY KEY AUTOINCREMENT, user TEXT, strategy TEXT, user_id TEXT, timestamp DATETIME, result_json TEXT, parameters_json TEXT)")
    con.commit()
    del params["parameters"]["user"]
    stmt = "INSERT INTO result VALUES (?,?,?,?,?,?,?)"
    cur.execute(stmt, (None, user, strategy_name, case_id, datetime.now(timezone.utc), result_json, json.dumps(params)))
    con.commit()

def do_work_recursive(strategy_name, params, original_strategy_name, original_forwards) -> tuple[bool, str]:
    is_final_recursive_call = False
    next_strategy = f"{strategy_name}Strategy"

    forwards = params["forwards"]
    del params["forwards"]

    user = params["parameters"]["user"][0]
    case_id = params["parameters"]["caseId"][0]

    strategies = get_strategy(next_strategy)
    if len(strategies) == 0:
        msg = f"Error: no strategy '{next_strategy}' found!"
        return False, msg

    strategy_to_run = strategies[0]
    strategy_success, result = strategy_to_run.run(params["parameters"])
    if not strategy_success:
        msg = f"Error: something went wrong running strategy '{next_strategy}'"
        return False, msg
    elif len(forwards) > 0:
        next_strategy = forwards[0]["name"]
        next_params = {"forwards": forwards[1:], "parameters": {"user": [user], "caseId": [case_id]}}
        forward_mappings = forwards[0]["parameters"]
        for mapping in forward_mappings:
            from_param = mapping["from"]
            to_param = mapping["to"]
            if len(to_param) > 0:
                next_params["parameters"][to_param] = [result[from_param]]

        strategy_success, msg = do_work_recursive(next_strategy, next_params, original_strategy_name, original_forwards)
        if not strategy_success:
            msg = f"Error: something went wrong running strategy '{next_strategy}': {msg}\n"
            return False, msg
    else:
        is_final_recursive_call = True

    if is_final_recursive_call:
        params["forwards"] = original_forwards
        persist(params, json.dumps(result), strategy_name)
    return True, "All Good!"

def find_and_run_strategy(strategy_name: str, params: str) -> tuple[bool, str]:
    params_dict = json.loads(params)
    params_as_list = {}
    for k, v in params_dict["parameters"].items():
        params_as_list[k] = [v]
    if "repeated" in params_dict:
        repeated = params_dict["repeated"]
        for k, v in repeated.items():
            params_as_list[k] = v
        del params_dict["repeated"]
    params_dict["parameters"] = params_as_list

    if "forwards" in params_dict:
        forwards = params_dict["forwards"]
    else:
        return False, "No forwards provided!"

    return do_work_recursive(strategy_name = strategy_name,
                             params= params_dict,
                             original_strategy_name = strategy_name,
                             original_forwards = forwards)
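To make the parameter handling at the top of find_and_run_strategy(...) concrete: every scalar value is wrapped in a list, because plugin run(...) methods always receive lists of values, and the optional repeated block lets callers pass multi-valued parameters directly. For example:

# Incoming payload, after json.loads(params):
# {"parameters": {"user": "fizzbuzzer", "caseId": "1", "term": "Lorem Ipsum"}, "forwards": []}
#
# What the strategy's run() method receives after the scalar-to-list conversion:
# {"user": ["fizzbuzzer"], "caseId": ["1"], "term": ["Lorem Ipsum"]}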

Gluing It All Together

We have now implemented all our components. It is time to integrate them and provide a main method as the entry point to our platform, which will instantiate and start all the components that constitute the platform.

src/main.cpp

#include <iostream>
#include <string>
#include <unordered_map>
#include <signal.h>
#if __APPLE__
#include <sys/event.h> // for kqueue() etc.
#endif
#include "config.h"
#include "Dispatcher/dispatcher.h"
#include "Dispatcher/dispatcher_builder.h"
#include "Strategies/python_strategy.cpp"
#include "Common/signal_channel.h"
#include "Logging/logging.h"
#include "Api/api_builder.h"

/**
 * Create and return a shared channel for SIGINT signals.
 *
 */
std::shared_ptr<SignalChannel> listen_for_sigint(sigset_t &sigset)
{
    std::shared_ptr<SignalChannel> sig_channel = std::make_shared<SignalChannel>();

#ifdef __linux__
    // Listen for sigint events like Ctrl-C
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGINT);
    sigaddset(&sigset, SIGTERM);
    pthread_sigmask(SIG_BLOCK, &sigset, nullptr);

    std::thread signal_handler{[&sig_channel, &sigset]()
                               {
                                   int signum = 0;

                                   // wait until a signal is delivered
                                   sigwait(&sigset, &signum);
                                   sig_channel->m_shutdown_requested.store(true);

                                   // notify all waiting workers to check their predicate
                                   sig_channel->m_cv.notify_all();
                                   std::cout << "Received signal " << signum << "\n";
                                   return signum;
                               }};
    signal_handler.detach();
#elif __APPLE__
    std::thread signal_handler{[&sig_channel]()
                               {
                                   int kq = kqueue();

                                   /* A kevent struct for registering and receiving the signal event */
                                   struct kevent *ke = (struct kevent *)malloc(sizeof(struct kevent));

                                   /* Initialise struct for SIGINT */
                                   signal(SIGINT, SIG_IGN);
                                   EV_SET(ke, SIGINT, EVFILT_SIGNAL, EV_ADD, 0, 0, NULL);

                                   /* Register for the events */
                                   if (kevent(kq, ke, 1, NULL, 0, NULL) < 0)
                                   {
                                       perror("kevent");
                                       return false;
                                   }

                                   memset(ke, 0x00, sizeof(struct kevent));

                                   // Camp here for event
                                   if (kevent(kq, NULL, 0, ke, 1, NULL) < 0)
                                   {
                                       perror("kevent");
                                   }

                                   switch (ke->filter)
                                   {
                                   case EVFILT_SIGNAL:
                                       std::cout << "Received signal " << strsignal(ke->ident) << "\n";
                                       sig_channel->m_shutdown_requested.store(true);
                                       sig_channel->m_cv.notify_all();
                                       break;
                                   default:
                                       break;
                                   }

                                   return true;
                               }};
    signal_handler.detach();
#endif

    return sig_channel;
}

int main(int argc, char *argv[])
{

    std::cout << argv[0]
              << " Version "
              << MyApp_VERSION_MAJOR
              << "."
              << MyApp_VERSION_MINOR
              << std::endl;

    /*************************************************************************
     *
     * SIGINT CHANNEL
     *
     *************************************************************************/
    sigset_t sigset;
    std::shared_ptr<SignalChannel> signal_channel = listen_for_sigint(sigset);

    /*************************************************************************
     *
     * LOGGER
     *
     *************************************************************************/
    Logging::LogProcessor log_processor("Logger");

    /*************************************************************************
     *
     * DISPATCHER
     *
     *************************************************************************/
    std::shared_ptr<Dispatcher> dispatcher = DispatcherBuilder()
                                                 .WithName("Dispatcher")
                                                 .Build();

    /*************************************************************************
     *
     * API
     *
     *************************************************************************/
    std::unique_ptr<Api> api = ApiBuilder()
                                   .WithName("Api")
                                   .WithHost("http://localhost")
                                   .WithPort(8080)
                                   .WithDispatcher(dispatcher)
                                   .Build();

    while (true)
    {
        using namespace std::literals::chrono_literals;
        std::this_thread::sleep_for(1s);

        if (signal_channel->m_shutdown_requested.load())
        {
            break;
        }
    }

    Logging::INFO("Stopping", "Main");

    return 0;
}

Example Plugin

We are nearing completion of the core platform. Instead of presenting a fully developed plugin, I will provide skeleton code for a simple “hello-world”-style plugin, devoid of any business logic.

PythonPlugins/LocationStrategy.py

from typing import Dict, List

class LocationStrategy:
    def run(self, parameters: Dict[str, List[str]]) -> tuple[bool, dict]:
        print(f"Location with parameters={parameters}")
        result = {'loc': 'London', 'rating': 3.5}
        return True, result

Finally, once the platform is running, executing a strategy is as simple as issuing a curl command:

curl --header "Content-Type: application/json" --request POST --data '{"name":"Location", "parameters": {"user":"fizzbuzzer", "caseId": "1", "term": "Lorem Ipsum"}, "forwards": [{"name": "Location", "parameters": [{"from": "loc", "to": "term"}]}]}' http://localhost:8080/api

In the next part of this article, I will introduce a front-end for the plugin platform. This interface will list and render the available plugins and allow for their execution through a web-based interface.

This post is licensed under CC BY 4.0 by the author.