External Disk-Based Sorting and Resampling of CSV Files in C++
In this post, I’ll walk you through a small project I built to solve a problem at work. The starting point was a large CSV file in which each row represents an event with an associated timestamp. The events could be anything from user location updates to stock ticks. The problem was the sheer density of events within short time intervals. To manage this, I needed to resample the CSV so that it retains only one event per time window, say one every 5 minutes. This significantly reduces the number of events and the file size, which makes the data easier and faster to process.
To make the project complete, I’ve incorporated several supporting features: logging, multithreading, and helper classes such as file pollers that pick up files as soon as they appear in a specified directory. I won’t delve into each of these components here; the C++ code is readable and largely self-explanatory, and some of the components are discussed in detail in previous posts. Here, I’ll focus on the sorting and resampling logic. The full project, including tests and benchmarking functions, is available in the GitHub repository linked above.
Let’s begin with the main() function, which serves as the central hub: it initializes the various components and watches for new files. As soon as a file is placed in the specified directory, the program detects it and queues it. Once a batch of files has arrived, the batch is sorted, resampled, and written out as a new file.
To keep this post concise and focused on the core functionality, I’ve omitted some instantiation methods, include statements, and helper functions. For those interested, the complete code is available in the GitHub repository.
int main(int argc, char **argv)
{
    /*************************************************************************
     *
     * DIRECTORY WATCHER
     *
     *************************************************************************/
    // `directory` and `sig_channel` are created by the setup code omitted here.
    std::shared_ptr<SafeQueue<PollResult>> files_to_sort_queue = std::make_shared<SafeQueue<PollResult>>();
    DirectoryPoller dir_poller = DirectoryPoller::builder("DirectoryPoller")
                                     .with_directory(directory[0])
                                     .with_sig_channel(sig_channel)
                                     .build();
    dir_poller.set_queue(files_to_sort_queue);
    PollerBridge dir_poller_bridge(dir_poller);
    dir_poller_bridge.start();

    /*************************************************************************
     *
     * MAIN LOOP
     *
     *************************************************************************/
    std::vector<std::string> files;
    while (true)
    {
        // Wait for the next file path; `r` stays empty if the wait times out.
        PollResult r;
        files_to_sort_queue->dequeue_with_timeout(1000, r);
        if (!r.empty())
        {
            std::string file_path = r.get();
            files.emplace_back(file_path);
            // Process the collected files in batches of 10.
            if (files.size() == 10)
            {
                Sorter s(sig_channel);
                const std::string sorted_file_path = s.sort(files);
                files.clear();
                const std::string resampled_file_path = s.resample_and_write(15, sorted_file_path);
                // The sorted intermediate file is no longer needed.
                std::remove(sorted_file_path.c_str());
                std::string new_resampled_file_path = resampled_file_path + ".csv";
                std::rename(resampled_file_path.c_str(), new_resampled_file_path.c_str());
                std::cout << "Resampled to: " << new_resampled_file_path << std::endl;
            }
        }
        if (should_exit(sig_channel))
        {
            break;
        }
    }
    dir_poller_bridge.join();
    return 0;
}
The DirectoryPoller is available both as a macOS version that uses kevent and a Linux version that uses inotify. The macOS implementation of the directory poller is described in great detail in this previous post. The term “polling” is somewhat misleading: the module doesn’t repeatedly scan the directory for new files, which would waste CPU cycles. Instead, it waits for the operating system to report changes. Repeated scanning can still be a valid implementation in certain scenarios, for example when inotify is not supported, as in some mounted directories, but this was not a concern in my use case.
The code above essentially performs the following steps. It starts a file poller in a separate thread that monitors a specified directory for new files. When a new file becomes available, its path is added to a queue. The main loop reads from this queue and processes the file paths in batches of 10. The first step in processing a batch is to sort the rows externally by ID and timestamp. This is necessary because the files may be too large to load entirely into memory, which is why an external merge sort is employed. Once the data is sorted, we iterate over it and keep only one row per ID and time window.
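The SafeQueue class itself isn’t shown in this post, so here is a minimal sketch of how a queue with a timed dequeue could look. The class name TimedQueue, the enqueue() method, the bool return value, and the assumption that the timeout is given in milliseconds are all mine; the real SafeQueue may differ.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for the project's SafeQueue; names are illustrative.
template <typename T>
class TimedQueue
{
public:
    void enqueue(T value)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(std::move(value));
        }
        // Wake up one consumer that may be waiting in dequeue_with_timeout().
        m_ready.notify_one();
    }

    // Waits up to timeout_ms for an item. Returns true and fills `out`
    // if an item arrived, false if the wait timed out.
    bool dequeue_with_timeout(int timeout_ms, T &out)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        const bool has_item = m_ready.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                                               [this] { return !m_items.empty(); });
        if (!has_item)
        {
            return false;
        }
        out = std::move(m_items.front());
        m_items.pop();
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<T> m_items;
};
```

The timeout matters for the main loop: without it, the consumer would block indefinitely on an empty queue and never get a chance to check whether it should exit.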
Let’s move on to the core sorting and resampling process, starting with the sorter interface.
#ifndef WORKER_H
#define WORKER_H

#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "SafeQueue.h"
#include "SignalChannel.h"
#include "CSV.h"

// Comparator used by the priority queue during the merge step.
class Compare
{
public:
    bool is_smaller(const Row &ra, const Row &rb, const std::vector<std::string> &attr, size_t i);
    // Ascending order sort; each pair holds (stream index, row).
    bool operator()(std::pair<int, std::string> line_a, std::pair<int, std::string> line_b);
};

class Sorter
{
private:
    std::string m_name;
    std::shared_ptr<SignalChannel> m_sig_channel;

private:
    void write_chunk(std::string file_path, const std::vector<std::string> &lines);
    std::vector<std::string> split_file_in_chunks_of_size(int mb, std::string file_path);
    void merge_sort(const std::vector<std::string> &sorted_chunk_paths, const std::string &result_path);
    std::string external_sort(const std::string &file_path);
    std::string process(const std::string &file_path);
    bool check_exit();

public:
    Sorter(std::shared_ptr<SignalChannel> sig_channel);
    std::string sort(std::vector<std::string> files);
    std::string resample_and_write(size_t minutes, const std::string &file_path);
};

#endif
There are several method declarations, but I won’t cover every detail. Let’s go through the main methods one by one to explore what’s implemented under the hood.
std::string Sorter::sort(std::vector<std::string> files)
{
    // Sort every file of the batch on its own first.
    std::vector<std::string> sorted_files;
    for (const auto &f : files)
    {
        // Read the column names once, from the first file of the batch.
        if (CSV_COLUMNS.empty())
        {
            CSV_COLUMNS = CSV::read_header(f);
        }
        auto f_sorted = external_sort(f);
        sorted_files.emplace_back(f_sorted);
    }

    // Name the result after the lexicographically smallest and largest input files.
    std::string min = *std::min_element(std::begin(files), std::end(files));
    std::string max = *std::max_element(std::begin(files), std::end(files));
    std::string result_path = Util::remove_extension(min) + "-" + Util::base_name(max);

    // Merge the individually sorted files into a single sorted file.
    merge_sort(sorted_files, result_path);
    return result_path;
}
This method takes a batch of files to sort and merge, assuming they follow a specific naming convention. Each file is first sorted individually, then merged into a single output file. The output file is named using the lexicographically smallest and largest names in the batch, separated by a dash. This approach provides a clear view of the data range and original files included in the result.
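To make the naming scheme concrete, here is a small sketch. The two helpers are hypothetical stand-ins for Util::remove_extension and Util::base_name, whose real implementations aren’t shown here; in particular, I’m assuming that base_name keeps the file extension and that paths use '/' as the separator.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical helper: returns the part of the path after the last '/'.
std::string base_name(const std::string &path)
{
    const auto slash = path.find_last_of('/');
    return slash == std::string::npos ? path : path.substr(slash + 1);
}

// Hypothetical helper: strips a trailing ".ext" from the file name.
std::string remove_extension(const std::string &path)
{
    const auto dot = path.find_last_of('.');
    const auto slash = path.find_last_of('/');
    // Only strip a dot that belongs to the file name, not to a directory.
    if (dot == std::string::npos || (slash != std::string::npos && dot < slash))
    {
        return path;
    }
    return path.substr(0, dot);
}

// Builds "<smallest without extension>-<largest base name>".
// Expects a non-empty list of paths.
std::string merged_name(const std::vector<std::string> &files)
{
    const auto range = std::minmax_element(files.begin(), files.end());
    return remove_extension(*range.first) + "-" + base_name(*range.second);
}
```

With date-based file names such as /data/2024-01-01.csv through /data/2024-01-03.csv, this sketch produces /data/2024-01-01-2024-01-03.csv, which is why the naming convention only works well if lexicographic order matches chronological order.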
Each file is sorted using an external sorting algorithm, which is designed for handling large datasets that exceed main memory capacity. External sorting is essential when data must be stored in slower external memory, such as disk drives, as it allows efficient sorting without loading all data into RAM.
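The splitting step (split_file_in_chunks_of_size) isn’t shown in this post. As a rough sketch of the idea, the function below groups rows into chunks that stay within a byte budget. The name split_rows, its signature, and the choice to keep the chunks in memory instead of writing them to temporary files are simplifications of mine.

```cpp
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Groups the data rows of `in` into chunks whose combined line length stays
// within max_bytes (a single oversized row still gets a chunk of its own).
// The header row is consumed and returned through `header`.
std::vector<std::vector<std::string>> split_rows(std::istream &in, std::size_t max_bytes, std::string &header)
{
    std::vector<std::vector<std::string>> chunks;
    std::vector<std::string> current;
    std::size_t current_bytes = 0;

    std::getline(in, header);
    std::string line;
    while (std::getline(in, line))
    {
        // Close the current chunk before it would exceed the budget.
        if (!current.empty() && current_bytes + line.size() > max_bytes)
        {
            chunks.push_back(std::move(current));
            current.clear();
            current_bytes = 0;
        }
        current_bytes += line.size();
        current.push_back(line);
    }
    if (!current.empty())
    {
        chunks.push_back(std::move(current));
    }
    return chunks;
}
```

A std::istringstream is enough to try this out. In a disk-based version, each chunk would be written to its own temporary file together with the header, so that it can later be sorted independently.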
// Sorts a single chunk in memory and writes the result next to it.
std::string Sorter::process(const std::string &file_path)
{
    std::string sorted_chunk_path = file_path + "_s";
    std::stringstream ss;
    ss << "Sorting chunk file '"
       << file_path
       << "' and writing to '"
       << sorted_chunk_path
       << "'"
       << std::endl;
    Logging::INFO(ss.str(), m_name);
    CSV csv(file_path);
    csv.sort_in_memory_and_write(COLUMNS_TO_SORT, sorted_chunk_path);
    return sorted_chunk_path;
}

std::string Sorter::external_sort(const std::string &file_path)
{
    /*
        1. Split the file into chunks small enough to sort in memory
        2. Sort each chunk in its own asynchronous task
        3. Merge the sorted chunks into one sorted file
    */
    Logging::INFO("External sorting file '" + file_path + "'", m_name);
    auto tmp_file_paths = split_file_in_chunks_of_size(MAX_CHUNK_SIZE_MB, file_path);
    Logging::INFO("Split into " + std::to_string(tmp_file_paths.size()) + " chunks", m_name);

    std::vector<std::future<std::string>> sort_results;
    for (const auto &chunk_path : tmp_file_paths)
    {
        std::future<std::string> chunk_fut = std::async(std::launch::async,
                                                        [chunk_path, this]()
                                                        {
                                                            return this->process(chunk_path);
                                                        });
        sort_results.push_back(std::move(chunk_fut));
    }

    // Wait for the results; get() blocks until the corresponding task is done
    std::vector<std::string> sorted_chunk_file_paths;
    for (auto &f : sort_results)
    {
        sorted_chunk_file_paths.emplace_back(f.get());
    }

    std::string result_path = file_path + "_s";
    if (sorted_chunk_file_paths.size() == tmp_file_paths.size())
    {
        merge_sort(sorted_chunk_file_paths, result_path);
    }

    // Clean up the unsorted tmp chunk files
    for (const auto &tmp_file_path : tmp_file_paths)
    {
        std::remove(tmp_file_path.c_str());
    }
    return result_path;
}
The code above processes a single file by first dividing it into smaller chunks, each of which is sorted in memory by its own asynchronous task. Because each chunk fits in memory, sorting it is fast. The code collects the results through std::future, whose get() blocks until the corresponding task has finished. Once all chunks are sorted, they are combined by merge_sort(), and the path to the resulting file is returned.
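The in-memory sort itself happens inside CSV::sort_in_memory_and_write, which isn’t shown here. The following is only a sketch of what sorting one chunk by ID and timestamp could look like. It assumes a simplified row layout of "id,timestamp,..." with an integer timestamp and no quoted fields; Key, parse_key, and sort_chunk are illustrative names, not part of the project.

```cpp
#include <algorithm>
#include <sstream>
#include <string>
#include <tuple>
#include <vector>

// Illustrative sort key: the first two fields of a row.
struct Key
{
    std::string id;
    long timestamp;
};

// Extracts the key from a row such as "a,20,payload".
Key parse_key(const std::string &line)
{
    std::istringstream fields(line);
    std::string id, ts;
    std::getline(fields, id, ',');
    std::getline(fields, ts, ',');
    return Key{id, std::stol(ts)};
}

// Sorts the rows of one chunk by id, then by timestamp, both ascending.
void sort_chunk(std::vector<std::string> &rows)
{
    std::stable_sort(rows.begin(), rows.end(),
                     [](const std::string &a, const std::string &b)
                     {
                         const Key ka = parse_key(a);
                         const Key kb = parse_key(b);
                         return std::tie(ka.id, ka.timestamp) < std::tie(kb.id, kb.timestamp);
                     });
}
```

Note that the timestamp is compared as a number. Comparing it as a string would place "20" before "3". Parsing the key on every comparison keeps the sketch short; a real implementation would more likely parse each row once up front.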
void Sorter::merge_sort(const std::vector<std::string> &sorted_chunk_paths, const std::string &result_path)
{
    Logging::INFO("Merge sorting " + std::to_string(sorted_chunk_paths.size()) + " chunks to '" + result_path + "'", m_name);

    // Open one input stream per sorted chunk
    std::vector<std::shared_ptr<std::ifstream>> streams;
    for (const auto &file_path : sorted_chunk_paths)
    {
        streams.push_back(std::make_shared<std::ifstream>(file_path));
    }

    // Skip header first so we don't mess up our sort
    for (size_t i = 0; i < streams.size(); ++i)
    {
        std::string line;
        std::getline(*streams[i], line);
    }

    // Seed the heap with the first row of every chunk; each entry is (stream index, row)
    std::priority_queue<std::pair<int, std::string>, std::vector<std::pair<int, std::string>>, Compare> min_heap;
    for (size_t i = 0; i < streams.size(); ++i)
    {
        std::string line;
        if (std::getline(*streams[i], line))
        {
            min_heap.push(std::pair<int, std::string>(static_cast<int>(i), line));
        }
    }

    std::ofstream file(result_path);

    // Write header
    std::string sep = "";
    for (const auto &c : COLUMNS_TO_SORT)
    {
        file << sep << c;
        sep.assign(",");
    }
    file << '\n';

    while (!min_heap.empty())
    {
        // Write the smallest row ...
        std::pair<int, std::string> min_pair = min_heap.top();
        min_heap.pop();
        if (file.is_open())
        {
            file << min_pair.second << '\n';
        }

        // ... and refill the heap from the stream that row came from
        std::string next_line;
        if (std::getline(*streams[min_pair.first], next_line))
        {
            min_heap.push(std::pair<int, std::string>(min_pair.first, next_line));
        }
    }

    // Clean up: close the inputs and delete the merged chunk files
    for (size_t i = 0; i < streams.size(); ++i)
    {
        streams[i]->close();
        std::remove(sorted_chunk_paths[i].c_str());
    }
    file.close();
}
merge_sort() takes a list of file paths and merges the files row by row. It opens an input stream for each file, skips each file’s header, reads a single row from each stream, and pushes these rows into a priority queue. The priority queue uses the Compare class (declared in the header above) to order rows by the sort columns, here id and timestamp, so the smallest row is always on top.
In each iteration, the minimum row is popped from the queue, appended to the target file, and then a new row is read from the corresponding file and added back into the queue. This process repeats until the priority queue is empty and no more rows are left to be read from the individual streams. At that point, all streams and the destination file are closed.
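The implementation of Compare isn’t shown in this post. Below is a minimal sketch of a comparator that turns std::priority_queue into a min-heap, together with the same merge loop running over in-memory streams. RowGreater, merge_sorted, and the simplified "id,timestamp,..." row layout are my own assumptions, not the project’s actual code.

```cpp
#include <cstddef>
#include <istream>
#include <queue>
#include <sstream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

using Entry = std::pair<int, std::string>; // (stream index, row)

// Hypothetical comparator; assumes rows look like "id,timestamp,...".
struct RowGreater
{
    static std::tuple<std::string, long> key(const std::string &line)
    {
        std::istringstream fields(line);
        std::string id, ts;
        std::getline(fields, id, ',');
        std::getline(fields, ts, ',');
        return std::make_tuple(id, std::stol(ts));
    }

    // std::priority_queue keeps the "largest" element on top, so returning
    // a > b here turns it into a min-heap.
    bool operator()(const Entry &a, const Entry &b) const
    {
        return key(a.second) > key(b.second);
    }
};

// Merges already sorted, header-less inputs into one sorted list of rows.
std::vector<std::string> merge_sorted(const std::vector<std::istream *> &inputs)
{
    std::priority_queue<Entry, std::vector<Entry>, RowGreater> heap;
    std::string line;
    for (std::size_t i = 0; i < inputs.size(); ++i)
    {
        if (std::getline(*inputs[i], line))
        {
            heap.emplace(static_cast<int>(i), line);
        }
    }

    std::vector<std::string> merged;
    while (!heap.empty())
    {
        const Entry smallest = heap.top();
        heap.pop();
        merged.push_back(smallest.second);
        // Refill from the stream that just gave up its smallest row.
        if (std::getline(*inputs[smallest.first], line))
        {
            heap.emplace(smallest.first, line);
        }
    }
    return merged;
}
```

The heap never holds more than one row per input, so memory use grows with the number of chunks and not with the size of the data. Passing std::istringstream objects as inputs is an easy way to experiment with it.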
Now that we’ve sorted and merged the files, it’s time to resample the resulting file based on the timestamp value in each row.
/**
 * @brief Keeps one row per ID and time window. Assumes the file is already
 *        sorted by ID and timestamp.
 *
 * @param minutes   Size of the time window
 * @param file_path Path of the sorted input file
 * @return Path of the resampled output file
 */
std::string Sorter::resample_and_write(size_t minutes, const std::string &file_path)
{
    std::string result_path = file_path + "_r";
    std::ifstream in(file_path);
    std::ofstream out(result_path);
    std::string line;

    // Copy the header
    std::getline(in, line);
    out << line << '\n';

    // The first data row is always kept
    if (std::getline(in, line))
    {
        out << line << '\n';
        Row first_row = CSV::convert_to_row(line, COLUMNS_TO_SORT);
        std::string current_id = first_row.at("id");
        long current_timestamp = std::stol(first_row.at("timestamp"));

        while (std::getline(in, line))
        {
            Row row = CSV::convert_to_row(line, COLUMNS_TO_SORT);
            std::string id = row.at("id");
            long timestamp = std::stol(row.at("timestamp"));
            if (id != current_id)
            {
                // New ID: always keep its first row
                out << line << '\n';
                current_id = id;
                current_timestamp = timestamp;
            }
            // NOTE: `minutes` is added to the raw timestamp value, so this assumes
            // the timestamp column uses the same unit as the window.
            else if (timestamp > current_timestamp + static_cast<long>(minutes))
            {
                // Same ID, but outside the window of the last kept row
                out << line << '\n';
                current_timestamp = timestamp;
            }
        }
    }
    return result_path;
}
The code above is straightforward. Here’s a quick summary: We read the file one row at a time, keeping in mind that the rows are sorted by ID and event time. If a row represents a new event (i.e., its event ID differs from the previously seen ID), we append it to the new destination file. If the ID is the same as the previous one, we compare the event time to the timestamp of the last persisted row. If the time difference exceeds the specified window, we append the row to the target file; otherwise, we ignore it. Once processing is complete, we simply return the file path of the target file to the caller.
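To see the rule in action on a tiny input, here is a self-contained sketch of the same logic that works on any pair of streams. It assumes a simplified "id,timestamp,..." row layout and a window given in the same unit as the timestamps; the function name resample and its signature are mine.

```cpp
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

// Copies the header, then keeps the first row of every id and any later row
// of the same id whose timestamp is more than `window` past the last kept row.
// Assumes rows look like "id,timestamp,..." and are sorted by id and timestamp.
void resample(std::istream &in, std::ostream &out, long window)
{
    std::string line;
    if (!std::getline(in, line))
    {
        return; // empty input, nothing to copy
    }
    out << line << '\n';

    bool have_previous = false;
    std::string kept_id;
    long kept_timestamp = 0;
    while (std::getline(in, line))
    {
        std::istringstream fields(line);
        std::string id, ts;
        std::getline(fields, id, ',');
        std::getline(fields, ts, ',');
        const long timestamp = std::stol(ts);

        if (!have_previous || id != kept_id || timestamp > kept_timestamp + window)
        {
            out << line << '\n';
            have_previous = true;
            kept_id = id;
            kept_timestamp = timestamp;
        }
    }
}
```

For example, with a window of 300 and the rows a,0 / a,100 / a,301 / a,500 / b,10 / b,20, the rows a,0, a,301, and b,10 are kept. Row a,500 is dropped because it is measured against the last kept row (a,301), not against the first row of the ID.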
That’s all for now. Be sure to check out the repository for the full code.