Post

Efficient Directory Polling Implementation in C++ for Mac OS X

Efficient Directory Polling Implementation in C++ for Mac OS X

In this sample project I demonstrate a straightforward directory poller in C++. On Linux, the inotify API is the usual tool for this purpose. However, inotify is Linux-specific and not available on macOS, so I use an alternative approach based on the kqueue/kevent API.

The DirectoryPoller Class

Let us first define the interface of our DirectoryPoller class:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#ifdef __APPLE__

#ifndef DIRECTORY_POLLER_MAC_OS_H
#define DIRECTORY_POLLER_MAC_OS_H
#include <memory>
#include <string>
#include <vector>
#include <sys/event.h> // for kqueue() etc.
#include <set>
#include "AbstractPoller.h"
#include "impl/PollResult.h"

class DirectoryPollerBuilder;

/**
 * Watches a single directory on macOS with kqueue/kevent and hands out the
 * paths of files found in it, one path per poll() call.
 *
 * Entries present at construction time are queued immediately (only names
 * ending in "_done" are skipped); entries that appear later are queued unless
 * their name ends in "_inprogress" or "_done".
 *
 * NOTE(review): the implicitly generated copy constructor (used by clone() and
 * possibly by the builder) copies m_kq, m_fd and m_ke verbatim, so copies share
 * them. Copies are only independent if made before the first poll(), which is
 * when the watch is set up.
 */
class DirectoryPoller : public AbstractPoller
{
public:
  /// @param name         name used as the logging context
  /// @param dir_to_watch directory whose entries are reported
  /// @param sig_channel  shared shutdown channel, forwarded to AbstractPoller
  DirectoryPoller(std::string name, std::string dir_to_watch, std::shared_ptr<SignalChannel> sig_channel);
  ~DirectoryPoller() override;
  AbstractPoller *clone() const override;

  friend class DirectoryPollerBuilder;
  static DirectoryPollerBuilder builder(std::string name);

private:
  int m_kq = -1;                  // kqueue descriptor; -1 until init_dir_watch() succeeds in creating it
  int m_fd = -1;                  // descriptor of the watched directory; -1 until opened
  struct kevent *m_ke = nullptr;  // kevent buffer allocated by init_dir_watch(); null before that
  std::string m_dir_to_watch;     // path passed to open() and std::filesystem::directory_iterator
  std::vector<std::string> m_file_paths;  // paths queued for the next poll() calls
  std::set<std::string> m_last_files;     // last directory listing, used to detect new entries
  bool init_dir_watch();
  std::set<std::string> list_files();
  PollResult poll() override;
  bool should_add_file(const std::string &f, bool starting_up);
  void clean() override;
};

#endif
#endif

And here is the associated implementation:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#ifdef __APPLE__
#include "DirectoryPollerMacOS.h"
#include "DirectoryPollerBuilder.h"
#include "logging/Logging.h"
#include "Util.h"
#include <stdlib.h> // for srand(), rand()
#include <thread>
#include <sstream>
#include <string> // for strerror()
#include <chrono>
#include <iostream>
#include <stdio.h>
#include <errno.h>  // for errno
#include <fcntl.h>  // for O_RDONLY
#include <stdio.h>  // for fprintf()
#include <stdlib.h> // for EXIT_SUCCESS
#include <string.h> // for strerror()
#include <unistd.h> // for close()
#include <filesystem>
#include <algorithm> // for set_difference()
#include <signal.h>

DirectoryPoller::DirectoryPoller(std::string name, std::string dir_to_watch, std::shared_ptr<SignalChannel> sig_channel)
    : AbstractPoller(name, sig_channel),
      m_dir_to_watch(dir_to_watch)
{
  // Take the initial snapshot of the directory. poll() later diffs new listings
  // against it to find entries that appeared afterwards. list_files() throws if
  // the directory cannot be iterated.
  m_last_files = list_files();

  // Queue everything that is already there. starting_up == true, so only names
  // ending in "_done" are rejected at this point.
  for (const std::string &existing : m_last_files)
  {
    if (!should_add_file(existing, true))
    {
      continue;
    }
    Logging::INFO("Adding file:'" + existing + "'", m_name);
    m_file_paths.emplace_back(existing);
  }
}

bool DirectoryPoller::should_add_file(const std::string &f, bool starting_up)
{
  // Names ending in "_inprogress" are accepted only during the initial scan
  // (starting_up == true); names ending in "_done" are never accepted.
  if (!starting_up && Util::str_ends_with(f.c_str(), "_inprogress"))
  {
    return false;
  }
  return !Util::str_ends_with(f.c_str(), "_done");
}

std::set<std::string> DirectoryPoller::list_files()
{
  // Full paths of the immediate (non-recursive) entries of the watched directory,
  // in sorted order. The throwing directory_iterator constructor is used, so a
  // missing or unreadable directory raises std::filesystem::filesystem_error.
  std::set<std::string> entries;
  const std::filesystem::directory_iterator listing(m_dir_to_watch);
  for (const std::filesystem::directory_entry &entry : listing)
  {
    entries.insert(entry.path().string());
  }
  return entries;
}

bool DirectoryPoller::init_dir_watch()
{
  Logging::INFO("Watching '" + m_dir_to_watch + "'", m_name);

  /* A single kqueue */
  // https://www.freebsd.org/cgi/man.cgi?kqueue
  m_kq = kqueue();

  /* Two kevent structs */
  m_ke = (struct kevent *)malloc(sizeof(struct kevent));

  /* Initialise the struct for the file descriptor */
  m_fd = open(m_dir_to_watch.c_str(), O_RDONLY);
  EV_SET(m_ke, m_fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_DELETE | NOTE_RENAME | NOTE_WRITE, 0, NULL);

  /* Register for the event */
  if (kevent(m_kq, m_ke, 1, NULL, 0, NULL) < 0)
  {
    perror("kevent");
    return false;
  }

  return true;
}

/**
 * Returns the next queued file path, or PollResult("") when none is available.
 *
 * Queued paths are served first, without touching the kqueue. Otherwise the
 * watch is set up lazily on first use (or after a failed attempt), and a
 * non-blocking kevent() check is made. On a directory write, the listing is
 * diffed against the previous one and new entries are queued.
 */
PollResult DirectoryPoller::poll()
{
  // Removes and returns the most recently queued path. Only call it when
  // m_file_paths is non-empty.
  auto take_queued = [this]() {
    std::string p = m_file_paths.back();
    m_file_paths.pop_back();
    return PollResult(p);
  };

  // Re-lists the directory and queues every entry that was not in the previous
  // listing (subject to should_add_file). Logs entries that disappeared and keeps
  // the new listing for the next comparison. list_files() may throw.
  auto rescan_directory = [this]() {
    std::set<std::string> current = list_files();

    // 'added' = current - m_last_files
    std::set<std::string> added;
    std::set_difference(current.begin(), current.end(), m_last_files.begin(), m_last_files.end(), std::inserter(added, added.end()));
    for (const std::string &f : added)
    {
      if (should_add_file(f, false))
      {
        Logging::INFO("Adding file '" + f + "'", m_name);
        m_file_paths.emplace_back(f);
      }
    }

    // 'removed' = m_last_files - current
    std::set<std::string> removed;
    std::set_difference(m_last_files.begin(), m_last_files.end(), current.begin(), current.end(), std::inserter(removed, removed.end()));
    for (const std::string &f : removed)
    {
      Logging::INFO("File '" + f + "' removed from directory", m_name);
    }

    m_last_files.swap(current);
  };

  // Do not wait for events if we already have files
  if (!m_file_paths.empty())
  {
    return take_queued();
  }

  if (m_fd == -1 || m_kq == -1)
  {
    if (!init_dir_watch())
    {
      Logging::ERROR("Unable to init directory watch for '" + m_dir_to_watch + "'", m_name);
      kill(getpid(), SIGINT);
      // There is nothing valid to wait on, so report "no file" for this round.
      return PollResult("");
    }
    // Entries created between the constructor's listing and the registration
    // above produced no kqueue event, so pick them up explicitly now.
    rescan_directory();
  }

  // Receive buffer for at most one event. A local is used so this path does not
  // depend on the heap buffer owned by the object.
  struct kevent event;
  memset(&event, 0x00, sizeof(event));

  // A 1 ns timeout makes kevent() effectively non-blocking. Pacing comes from
  // the caller's loop (AbstractWorker::run() waits between steps).
  struct timespec timeout;
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;
  const int return_value = kevent(m_kq, NULL, 0, &event, 1, &timeout);
  if (return_value < 0)
  {
    perror("kevent");
  }
  else if (return_value > 0)
  {
    switch (event.filter)
    {
    /* File descriptor event: let's examine what happened to the file */
    case EVFILT_VNODE:
      Logging::DEBUG("Events " + std::to_string(event.fflags) + " on file descriptor " + std::to_string(event.ident), m_name);

      // NOTE(review): after NOTE_DELETE/NOTE_RENAME the registration still refers to
      // the old directory vnode, while list_files() uses the original path.
      if (event.fflags & NOTE_DELETE)
      {
        Logging::DEBUG("The unlink() system call was called on the file referenced by the descriptor", m_name);
      }
      if (event.fflags & NOTE_WRITE)
      {
        Logging::DEBUG("A write occurred on the file referenced by the descriptor", m_name);
        rescan_directory();
      }
      // The flags below are only delivered if init_dir_watch() registers them;
      // it currently registers NOTE_DELETE | NOTE_RENAME | NOTE_WRITE only.
      if (event.fflags & NOTE_EXTEND)
      {
        Logging::INFO("The file referenced by the descriptor was extended", m_name);
      }
      if (event.fflags & NOTE_ATTRIB)
      {
        Logging::INFO("The file referenced by the descriptor had its attributes changed", m_name);
      }
      if (event.fflags & NOTE_LINK)
      {
        Logging::INFO("The link count on the file changed", m_name);
      }
      if (event.fflags & NOTE_RENAME)
      {
        Logging::INFO("The file referenced by the descriptor was renamed", m_name);
      }
      if (event.fflags & NOTE_REVOKE)
      {
        Logging::INFO("Access to the file was revoked via revoke(2) or the underlying fileystem was unmounted", m_name);
      }
      break;

    /* This should never happen */
    default:
      Logging::ERROR("Unknown filter", m_name);
    }
  }
  // return_value == 0: timeout, nothing happened.

  if (m_file_paths.empty())
  {
    return PollResult("");
  }
  return take_queued();
}

AbstractPoller *DirectoryPoller::clone() const
{
  // Member-wise copy through the implicitly generated copy constructor; the
  // caller owns the returned object.
  // NOTE(review): m_kq, m_fd and m_ke are copied verbatim, so the clone shares
  // them with this object once poll() has set up the watch.
  DirectoryPoller *const duplicate = new DirectoryPoller(*this);
  return duplicate;
}

DirectoryPollerBuilder DirectoryPoller::builder(std::string name)
{
  // Entry point for fluent construction, e.g.
  //   DirectoryPoller::builder("x").with_directory(d).with_sig_channel(c).build()
  return DirectoryPollerBuilder{name};
}

/// Shutdown hook invoked by AbstractWorker::run() after its loop exits.
/// Currently it only logs; it does not close m_kq or m_fd.
void DirectoryPoller::clean()
{
  Logging::INFO("Cleaning up", m_name);
}

/// Releases the kqueue and the descriptor of the watched directory.
/// Previously only the kqueue was closed, so the directory fd leaked.
DirectoryPoller::~DirectoryPoller()
{
  // The fcntl(F_GETFD) probe skips descriptors that are not open, e.g. still -1
  // because poll() never ran.
  if (fcntl(m_kq, F_GETFD) != -1)
  {
    close(m_kq);
  }
  if (fcntl(m_fd, F_GETFD) != -1)
  {
    close(m_fd);
  }
  // NOTE(review): m_ke is deliberately not freed. Copies produced by clone()
  // share the same pointer, so freeing it here could double-free.
  // TODO: move m_kq/m_fd/m_ke into RAII members with proper copy semantics.
}
#endif

The core idea is that poll() is invoked repeatedly (by AbstractPoller::step(), on each iteration of the worker loop). Each call returns one new file path wrapped in a PollResult, or an empty PollResult when no new file is available.

DirectoryPollerBuilder

If you’re curious about the implementation of the DirectoryPollerBuilder, here is the associated code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#ifndef DIRECTORY_POLLER_BUILDER_H
#define DIRECTORY_POLLER_BUILDER_H

#ifdef __linux__
#include "DirectoryPoller.h"
#elif __APPLE__
#include "DirectoryPollerMacOS.h"
#endif

#include <string>

/// Fluent builder for DirectoryPoller. build() checks that both a directory and
/// a signal channel were supplied before constructing the poller.
class DirectoryPollerBuilder
{
public:
    /// @param name name passed through to the DirectoryPoller
    DirectoryPollerBuilder(std::string name);

    /// Sets the directory to watch (required).
    DirectoryPollerBuilder &with_directory(std::string p);

    /// Sets the shared shutdown channel (required).
    DirectoryPollerBuilder &with_sig_channel(std::shared_ptr<SignalChannel> sc);

    /// Validates the collected settings and constructs the poller.
    /// @throws std::runtime_error if the directory or the signal channel is missing
    DirectoryPoller build();

private:
    std::string m_name;                            // poller name / logging context
    std::string m_dir_to_watch;                    // empty until with_directory()
    std::shared_ptr<SignalChannel> m_sig_channel;  // null until with_sig_channel()
};

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include "DirectoryPollerBuilder.h"
#include "DirectoryPoller.h"

// Only the name is known up front; the directory and the signal channel start
// empty and must be set before build().
DirectoryPollerBuilder::DirectoryPollerBuilder(std::string name)
    : m_name(name)
{
}

// Records the directory to watch; returns *this so calls can be chained.
DirectoryPollerBuilder &DirectoryPollerBuilder::with_directory(std::string p)
{
    this->m_dir_to_watch = p;
    DirectoryPollerBuilder &self = *this;
    return self;
}

// Records the shared shutdown channel; returns *this so calls can be chained.
DirectoryPollerBuilder &DirectoryPollerBuilder::with_sig_channel(std::shared_ptr<SignalChannel> sc)
{
    this->m_sig_channel = sc;
    DirectoryPollerBuilder &self = *this;
    return self;
}

/// Validates the collected settings and constructs the poller.
/// @throws std::runtime_error if no directory or no signal channel was provided.
/// The DirectoryPoller constructor lists the directory, so it may also throw
/// std::filesystem::filesystem_error if the directory cannot be read.
DirectoryPoller DirectoryPollerBuilder::build()
{
    if (m_dir_to_watch.empty())
    {
        throw std::runtime_error("No directory provided");
    }

    if (!m_sig_channel)
    {
        throw std::runtime_error("No signal channel provided");
    }

    // Returning the prvalue directly guarantees copy elision (C++17). No copy of
    // the poller is made, unlike with the previous named local.
    return DirectoryPoller(m_name, m_dir_to_watch, m_sig_channel);
}

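The poller can then be used as follows. Note that a result queue must be attached with set_queue() before run() is started, because AbstractPoller::step() pushes every non-empty PollResult into that queue: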

1
2
3
4
5
6
7
8
9
  // build() throws std::runtime_error if the directory or signal channel is missing.
  DirectoryPoller poller = DirectoryPoller::builder("DirectoryPoller")
                               .with_directory(dir_to_watch)
                               .with_sig_channel(sig_channel)
                               .build();

...
// Before starting the thread, attach the result queue: poller.set_queue(&queue);
// AbstractPoller::step() enqueues into it. The thread keeps a raw pointer to
// 'poller', so 'poller' must outlive the thread.
std::unique_ptr<std::thread> m_t = std::make_unique<std::thread>(&AbstractPoller::run, &poller);
...
// Possibly join()

AbstractPoller & AbstractWorker

AbstractPoller is an abstract class that defines the basic polling interface for the poller:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#ifndef ABSTRACT_POLLER_H
#define ABSTRACT_POLLER_H

#include "AbstractWorker.h"
#include "impl/PollResult.h"

/// Worker whose step() consists of one poll() call. Non-empty results are
/// pushed to the worker's queue; empty ones are dropped.
class AbstractPoller : public AbstractWorker
{
private:
  /// Calls poll() once and enqueues the result unless it is empty.
  void step() override;

  /// Produces the next result; an empty PollResult means "nothing this round".
  virtual PollResult poll() = 0;

  /// Invoked once by AbstractWorker::run() after its loop exits.
  virtual void clean() = 0;

public:
  AbstractPoller(std::string name, std::shared_ptr<SignalChannel> sig_channel);
  virtual ~AbstractPoller() = default;

  /// Returns a heap-allocated copy; the caller owns it.
  virtual AbstractPoller *clone() const = 0;
};

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include "AbstractPoller.h"

AbstractPoller::AbstractPoller(std::string name, std::shared_ptr<SignalChannel> sig_channel) : AbstractWorker(name, sig_channel)
{
}

void AbstractPoller::step()
{
  PollResult r = poll();
  if (!r.empty())
  {
    m_queue->enqueue(r);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#ifndef ABSTRACT_WORKER_H
#define ABSTRACT_WORKER_H

#include "SafeQueue.h"
#include "impl/PollResult.h"
#include "SignalChannel.h"
#include <string>

/// Base class for a worker loop. run() repeatedly calls step() until the shared
/// SignalChannel requests shutdown, then calls clean() once.
class AbstractWorker
{
public:
  /// @param name        identifier used as the logging context
  /// @param sig_channel channel whose m_shutdown_requested flag stops run()
  AbstractWorker(std::string name, std::shared_ptr<SignalChannel> sig_channel) : m_name(name), m_sig_channel(sig_channel) {}

  /// Sets the (non-owning) queue that results are pushed to; call before run().
  void set_queue(SafeQueue<PollResult> *queue_);

  /// Worker loop; returns after shutdown has been requested and clean() has run.
  void run();

  // Virtual because the class is polymorphic (pure virtual step()/clean()).
  // Deleting a derived worker through an AbstractWorker* is otherwise undefined behaviour.
  virtual ~AbstractWorker() = default;

protected:
  // Non-owning; null until set_queue() is called (was previously left uninitialised).
  SafeQueue<PollResult> *m_queue = nullptr;
  const std::string m_name;

private:
  std::shared_ptr<SignalChannel> m_sig_channel;
  virtual void step() = 0;
  virtual void clean() = 0;
};

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include "AbstractWorker.h"
#include "logging/Logging.h"

// Stores the queue pointer as-is; the worker never takes ownership of it.
void AbstractWorker::set_queue(SafeQueue<PollResult> *queue)
{
  this->m_queue = queue;
}

/// Worker loop. Before each step() it waits up to 10 ms on the signal channel's
/// condition variable. Once shutdown is requested it stops *without* running
/// another step(), then logs and calls clean(). Previously, a shutdown signalled
/// during the wait still triggered one extra step().
void AbstractWorker::run()
{
  for (;;)
  {
    {
      // wait_for with a predicate checks the predicate before waiting and returns
      // its final value, so an already-requested shutdown exits immediately.
      std::unique_lock shutdown_lock(m_sig_channel->m_cv_mutex);
      const bool shutdown = m_sig_channel->m_cv.wait_for(shutdown_lock, std::chrono::milliseconds(10), [this]()
                                                         { return m_sig_channel->m_shutdown_requested.load(); });
      if (shutdown)
      {
        break;
      }
    }

    // The lock is released before doing the actual work.
    step();
  }
  Logging::INFO("Shutting down", m_name);
  clean();
}

SignalChannel

SignalChannel is an object shared between threads. It is used to tell the poller to stop polling:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#ifndef SIGNAL_CHANNEL_H
#define SIGNAL_CHANNEL_H

#include <atomic>
#include <condition_variable>

/// Shared shutdown signal. Workers (AbstractWorker::run) wait on m_cv under
/// m_cv_mutex for at most 10 ms at a time, and stop once m_shutdown_requested
/// is true. Notifying m_cv after setting the flag lets them stop without waiting
/// out the timeout.
class SignalChannel
{
public:
    std::atomic<bool> m_shutdown_requested{false}; // becomes true when shutdown is requested
    std::mutex m_cv_mutex;                         // mutex paired with m_cv
    std::condition_variable m_cv;                  // waited on by workers between steps
};

#endif

Logging

Finally, Logging is a small logging utility I use in my personal projects. It will be discussed in detail in a subsequent post.

This post is licensed under CC BY 4.0 by the author.