Рубрики
Uncategorized

2 модули в Графике туманности: планировщик и исполнитель

Возможно, вы узнали оптимизатор двигателя запросов Demange Graph в последней статье. В этом … Tagged с базой данных, DevOps, OpenSource, программированием.

Возможно, вы узнали оптимизатор двигателя запросов Demange Graph в последней статье. В этой статье мы представим, как реализованы планировщик и исполнитель, два последних модуля двигателя запроса.

Обзор

На этапе выполнения механизм выполнения использует планировщик для преобразования плана физического выполнения, сгенерированного планировщиком, в серию исполнителей для управления их выполнением. У каждого планода в плане физического выполнения есть соответствующий исполнитель.

Структура исходных файлов

Исходной код планировщика находится в каталоге SRC/планировщика.

src/scheduler
├── AsyncMsgNotifyBasedScheduler.cpp
├── AsyncMsgNotifyBasedScheduler.h
├── CMakeLists.txt
├── Scheduler.cpp
└── Scheduler.h

Абстрактный класс планировщика определяет общие интерфейсы планировщиков, которые могут наследовать функции от класса для реализации различных типов планировщиков. Был реализован планировщик AsyncMSGNOTIFYBADESCHEDULER. Используя асинхронное сообщение об сообщении и алгоритм поиска в ширину, его можно предотвратить от ошибок переполнения стека. Исходной код исполнителя находится в рамках каталога SRC/Executor.

src/executor
├── admin
├── algo
├── CMakeLists.txt
├── ExecutionError.h
├── Executor.cpp
├── Executor.h
├── logic
├── maintain
├── mutate
├── query
├── StorageAccessExecutor.cpp
├── StorageAccessExecutor.h
└── test

Процесс

Во-первых, планировщик запускает обход всего плана выполнения из своего корневого узла, используя алгоритм поиска в ширине и создает свой механизм уведомления в соответствии с зависимостью между узлами. На этапе выполнения каждый узел должен быть выполнен после того, как будет уведомлен о том, что все узлы, от которых он зависит, были успешно выполнены. Для узла, после выполнения, он уведомит свои зависимые узлы до тех пор, пока весь план не будет выполнен успешно.

void AsyncMsgNotifyBasedScheduler::runExecutor(
    std::vector>&& futures,
    Executor* exe,
    folly::Executor* runner,
    std::vector>&& promises) const {
    folly::collect(futures).via(runner).thenTry(
        [exe, pros = std::move(promises), this](auto&& t) mutable {
            if (t.hasException()) {
                return notifyError(pros, Status::Error(t.exception().what()));
            }
            auto status = std::move(t).value();
            auto depStatus = checkStatus(std::move(status));
            if (!depStatus.ok()) {
                return notifyError(pros, depStatus);
            }
            // Execute in current thread.
            std::move(execute(exe)).thenTry(
                [pros = std::move(pros), this](auto&& exeTry) mutable {
                    if (exeTry.hasException()) {
                        return notifyError(pros, Status::Error(exeTry.exception().what()));
                    }
                    auto exeStatus = std::move(exeTry).value();
                    if (!exeStatus.ok()) {
                        return notifyError(pros, exeStatus);
                    }
                    return notifyOK(pros);
                });
        });
}

Каждый исполнитель проходит четыре этапа: «Создать», «Открыть», «Выполнить», а затем «закрыть».

Создайте

На фазе «Создать» соответствующий исполнитель будет генерироваться в соответствии с типом узла.

открытым

На фазе «открыть» до начала выполнения исполнитель инициализируется, медленные запросы прекращаются, а водяной знак памяти проверяется. При использовании графика туманности вы можете использовать Kill для прекращения запроса, поэтому статус текущего плана выполнения должен быть проверен перед выполнением каждого исполнителя. Если план находится в статусе убитого, исполнение будет прекращено. Перед выполнением каждого исполнителя запроса необходимо проверить, упало ли количество свободной памяти ниже водяного знака. Если водяной знак достигнут, выполнение будет прекращено, что может избежать OOM.

Status Executor::open() {
    if (qctx_->isKilled()) {
        VLOG(1) << "Execution is being killed. session: " << qctx()->rctx()->session()->id()
            << "ep: " << qctx()->plan()->id()
            << "query: " << qctx()->rctx()->query();
        return Status::Error("Execution had been killed");
    }
    auto status = MemInfo::make();
    NG_RETURN_IF_ERROR(status);
    auto mem = std::move(status).value();
    if (node_->isQueryNode() && mem->hitsHighWatermark(FLAGS_system_memory_high_watermark_ratio)) {
        return Status::Error(
            "Used memory(%ldKB) hits the high watermark(%lf) of total system memory(%ldKB).",
            mem->usedInKB(),
            FLAGS_system_memory_high_watermark_ratio,
            mem->totalInKB());
    }
    numRows_ = 0;
    execTime_ = 0;
    totalDuration_.reset();
    return Status::OK();
}

выполнять

Ввод и вывод исполнителя запроса в форме таблиц (набор данных). Выполнение исполнителя основано на модели итератора, что означает, что для каждого вычисления метод итератора входной таблицы вызывается для извлечения строки данных, а затем выполняется расчет. Такой процесс повторяется до тех пор, пока не будет выполнено обход всей входной таблицы. Результаты расчетов построены в новую таблицу и вывод следующему исполнителю в качестве его ввода.

folly::Future ProjectExecutor::execute() {
    SCOPED_TIMER(&execTime_);
    auto* project = asNode(node());
    auto columns = project->columns()->columns();
    auto iter = ectx_->getResult(project->inputVar()).iter();
    DCHECK(!!iter);
    QueryExpressionContext ctx(ectx_);

    VLOG(1) << "input: " << project->inputVar();
    DataSet ds;
    ds.colNames = project->colNames();
    ds.rows.reserve(iter->size());
    for (; iter->valid(); iter->next()) {
        Row row;
        for (auto& col : columns) {
            Value val = col->expr()->eval(ctx(iter.get()));
            row.values.emplace_back(std::move(val));
        }
        ds.rows.emplace_back(std::move(row));
    }
    VLOG(1) << node()->outputVar() << ":" << ds;
    return finish(ResultBuilder().value(Value(std::move(ds))).finish());
}

Если входная таблица текущего исполнителя не может использоваться другими исполнителями в качестве их ввода, память, занятая таблицей, будет отброшена на этапе выполнения, чтобы уменьшить использование памяти.

void Executor::drop() {
    for (const auto &inputVar : node()->inputVars()) {
        if (inputVar != nullptr) {
            // Make sure use the variable happened-before decrement count
            if (inputVar->userCount.fetch_sub(1, std::memory_order_release) == 1) {
                // Make sure drop happened-after count decrement
                CHECK_EQ(inputVar->userCount.load(std::memory_order_acquire), 0);
                ectx_->dropResult(inputVar->name);
                VLOG(1) << "Drop variable " << node()->outputVar();
            }
        }
    }
}

Закрыть

После того, как выполнение исполнителя будет выполнено, некоторая собранная информация о выполнении, такая как время выполнения и количество строк в таблице вывода, добавляется к статистике профилирования. Вы можете запустить оператор профиля, а затем просмотреть статистику в возвращенном результате.

Execution Plan (optimize time 141 us)

-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
| id | name             | dependencies | profiling data                                      | operator info                       |
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
|  2 | Project          | 3            | ver: 0, rows: 56, execTime: 147us, totalTime: 160us | outputVar: [                        |
|    |                  |              |                                                     |   {                                 |
|    |                  |              |                                                     |     "colNames": [                   |
|    |                  |              |                                                     |       "VertexID",                   |
|    |                  |              |                                                     |       "player.age"                  |
|    |                  |              |                                                     |     ],                              |
|    |                  |              |                                                     |     "name": "__Project_2",          |
|    |                  |              |                                                     |     "type": "DATASET"               |
|    |                  |              |                                                     |   }                                 |
|    |                  |              |                                                     | ]                                   |
|    |                  |              |                                                     | inputVar: __TagIndexFullScan_1      |
|    |                  |              |                                                     | columns: [                          |
|    |                  |              |                                                     |   "$-.VertexID AS VertexID",        |
|    |                  |              |                                                     |   "player.age"                      |
|    |                  |              |                                                     | ]                                   |
----------+------------------+--------------+-----------------------------------------------------+--------------------------------------
|  3 | TagIndexFullScan | 0            | ver: 0, rows: 56, execTime: 0us, totalTime: 6863us  | outputVar: [                        |
|    |                  |              |                                                     |   {                                 |
|    |                  |              |                                                     |     "colNames": [                   |
|    |                  |              |                                                     |       "VertexID",                   |
|    |                  |              |                                                     |       "player.age"                  |
|    |                  |              |                                                     |     ],                              |
|    |                  |              |                                                     |     "name": "__TagIndexFullScan_1", |
|    |                  |              |                                                     |     "type": "DATASET"               |
|    |                  |              |                                                     |   }                                 |
|    |                  |              |                                                     | ]                                   |
|    |                  |              |                                                     | inputVar:                           |
|    |                  |              |                                                     | space: 318                          |
|    |                  |              |                                                     | dedup: false                        |
|    |                  |              |                                                     | limit: 9223372036854775807          |
|    |                  |              |                                                     | filter:                             |
|    |                  |              |                                                     | orderBy: []                         |
|    |                  |              |                                                     | schemaId: 319                       |
|    |                  |              |                                                     | isEdge: false                       |
|    |                  |              |                                                     | returnCols: [                       |
|    |                  |              |                                                     |   "_vid",                           |
|    |                  |              |                                                     |   "age"                             |
|    |                  |              |                                                     | ]                                   |
|    |                  |              |                                                     | indexCtx: [                         |
|    |                  |              |                                                     |   {                                 |
|    |                  |              |                                                     |     "columnHints": [],              |
|    |                  |              |                                                     |     "index_id": 325,                |
|    |                  |              |                                                     |     "filter": ""                    |
|    |                  |              |                                                     |   }                                 |
|    |                  |              |                                                     | ]                                   |
----------+------------------+--------------+-----------------------------------------------------+--------------------------------------
|  0 | Start            |              | ver: 0, rows: 0, execTime: 1us, totalTime: 19us     | outputVar: [                        |
|    |                  |              |                                                     |   {                                 |
|    |                  |              |                                                     |     "colNames": [],                 |
|    |                  |              |                                                     |     "type": "DATASET",              |
|    |                  |              |                                                     |     "name": "__Start_0"             |
|    |                  |              |                                                     |   }                                 |
|    |                  |              |                                                     | ]                                   |
----------+------------------+--------------+-----------------------------------------------------+--------------------------------------  

До сих пор объяснение исходного кода двигателя запроса было завершено. В следующий раз мы объясним реализацию некоторых функций графика туманности.

Если вы столкнетесь с какими -либо проблемами в процессе использования графика туманности, пожалуйста, обратитесь к Руководство по базе данных графика туманности Чтобы устранить проблему. Он подробно записывает точки знаний и конкретное использование базы данных графиков и график туманности базы данных графиков.

Оригинал: «https://dev.to/lisahui/2-modules-in-nebula-graph-scheduler-executor-41g»