Продолжаем доработку сервера Zabbix. В прошлый раз мы добавили в библиотеку zbxhistory поддержку возможности использования хранилищ разного типа. На этот раз нужно добавить поддержку хранилища нового типа, которую будем реализовывать на базе поддержки хранилища Elasticsearch, частично заимствуя фрагменты из кода поддержки SQL-хранилищ.
Готовую заплатку для сервера Zabbix с реализацией поддержки хранения исторических данных в ClickHouse можно найти по ссылке zabbix3_4_12_server_storage_per_table.patch.
Ниже описаны внесённые заплаткой доработки и объяснение их логики.
У меня ушло некоторое время на изучение функций для работы со структурами данных в формате JSON. Чтобы не пришлось вспоминать их снова, опишу те из них, которыми я пользовался непосредственно в описываемой заплатке.
Заголовочный файл include/zbxjson.h, файл с реализацией функций - src/libs/zbxjson/json.c
Создание JSON, в котором корневым элементом является словарь. Фактически, в пустой буфер будут добавлены фигурные скобки {}, текущий указатель будет указывать на закрывающую скобку, уровень вложенности увеличится с 0 до 1, а в статусе будет ZBX_JSON_EMPTY.
Работает аналогично zbx_json_init, но корневым элементом структуры JSON будет массив, а в буфер вместо фигурных скобок {} будут вставлены квадратные скобки [].
Очищает буфер от хранящейся в нём структуры JSON. Сама память при этом не освобождается.
Освобождает буфер, который был занят сформированной структурой JSON.
Вставляет в то место буфера, куда указывает текущий указатель:
Текущий указатель передвигается на вставленную закрывающую фигурную скобку, уровень вложенности увеличивается на единицу, в статус записывается ZBX_JSON_EMPTY.
Весь текст, который ранее лежал в буфере после указателя, сдвигается так, что оказывается позади вставленного текста.
Работает аналогично zbx_json_addobject, но вместо фигурных скобок вставляются квадратные [].
Вставляет в то место буфера, куда указывает текущий указатель:
Текущий указатель передвигается на вставленную закрывающую квадратную скобку, уровень вложенности увеличивается на единицу, в статус записывается ZBX_JSON_EMPTY.
Весь текст, который ранее лежал в буфере после указателя, сдвигается так, что оказывается позади вставленного текста.
Вставляет в то место буфера, куда указывает текущий указатель:
Если тип строки type равен ZBX_JSON_TYPE_STRING, то строка string заключается в двойные кавычки. Если string равен NULL, то добавляется строка null без кавычек.
Текущий указатель передвигается на символ, следующий за последним вставленным, уровень вложенности не меняется, в статус записывается ZBX_JSON_COMMA.
Весь текст, который ранее лежал в буфере после указателя, сдвигается так, что оказывается позади вставленного текста.
Вставляет в то место буфера, куда указывает текущий указатель:
Текущий указатель передвигается на символ, следующий за последним вставленным, уровень вложенности не меняется, в статус записывается ZBX_JSON_COMMA.
Весь текст, который ранее лежал в буфере после указателя, сдвигается так, что оказывается позади вставленного текста.
Вставляет в то место буфера, куда указывает текущий указатель:
Текущий указатель передвигается на символ, следующий за последним вставленным, уровень вложенности не меняется, в статус записывается ZBX_JSON_COMMA.
Весь текст, который ранее лежал в буфере после указателя, сдвигается так, что оказывается позади вставленного текста.
Передвигает текущий указатель так, что пропускается закрывающая фигурная } или квадратная скобка ].
Уровень вложенности уменьшается на единицу, статус меняется на ZBX_JSON_COMMA.
buffer - строка с завершающим нулём, содержащая текст JSON.
json - структура с указателями на начало и конец фрагмента JSON в буфере buffer.
Проверяет, что текст в буфере buffer является правильным JSON, инициализирует структуру json. При ошибках возвращает FAIL, в случае успеха - SUCCEED.
json - структура с указателями на начало и конец фрагмента JSON, являющегося массивом.
p - указатель внутри фрагмента json, указывающий на начальный символ элемента массива или словаря.
Ищет следующий элемент массива или словаря и возвращает указатель на его начальный символ. Если указанный в p элемент был последним, возвращает NULL.
json - структура с указателями на начало и конец фрагмента JSON, являющегося словарём.
name - имя ключа в словаре, значение которого нужно найти.
Возвращает указатель на первый символ значения или NULL, если указанного ключа нет в словаре.
p - указатель на открывающую скобку, указывающую на начало фрагмента JSON, который нужно найти.
json - структура с указателями на начало и конец фрагмента JSON, являющегося массивом или словарём.
Ищет в указанном фрагменте JSON открывающую скобку, затем находит парную ей закрывающую скобку и записывает указатели на начало и конец найденного фрагмента в структуру json.
При ошибках возвращает FAIL, при успешном завершении - SUCCEED.
json - структура с указателями на начало и конец фрагмента JSON, являющегося словарём.
name - имя ключа в словаре, значение которого нужно найти.
json_out - фрагмент JSON, являющийся значением ключа name.
Ищет в указанном фрагменте JSON указанный ключ, находит открывающую и закрывающую скобки, в структуру json_out записывает указатели на начало и конец найденного фрагмента.
При ошибках возвращает FAIL, при успешном завершении - SUCCEED.
json - структура с указателями на начало и конец фрагмента JSON, являющегося словарём.
name - имя ключа в словаре, значение которого нужно найти.
string - указатель на указатель на буфер, в который будет помещено найденное значение.
string_alloc - указатель на переменную с размером буфера.
Ищет в указанном фрагменте JSON, являющемся словарём, значение элемента с указанным ключом name. Найденное значение записывается в буфер. Если в буфере не было достаточно места для сохранения найденного значения, функция выделяет под буфер другой фрагмент памяти, обновляет указатель на буфер и его размер.
Немного опережая события, заранее добавим в код Zabbix дополнительную функцию zbx_json_adddbl, которая позже понадобится нам для формирования JSON с данными, вставляемыми в таблицу history с числами с плавающей запятой. Объявление функции добавим в файл include/zbxjson.h, а реализацию функции добавим в файл src/libs/zbxjson/json.c следующим образом:
Index: zabbix-3.4.12-1+buster/include/zbxjson.h =================================================================== --- zabbix-3.4.12-1+buster.orig/include/zbxjson.h +++ zabbix-3.4.12-1+buster/include/zbxjson.h @@ -160,6 +160,7 @@ void zbx_json_addarray(struct zbx_json * void zbx_json_addstring(struct zbx_json *j, const char *name, const char *string, zbx_json_type_t type); void zbx_json_adduint64(struct zbx_json *j, const char *name, zbx_uint64_t value); void zbx_json_addint64(struct zbx_json *j, const char *name, zbx_int64_t value); +void zbx_json_adddbl(struct zbx_json *j, const char *name, double value); int zbx_json_close(struct zbx_json *j); int zbx_json_open(const char *buffer, struct zbx_json_parse *jp); Index: zabbix-3.4.12-1+buster/src/libs/zbxjson/json.c =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxjson/json.c +++ zabbix-3.4.12-1+buster/src/libs/zbxjson/json.c @@ -385,6 +385,14 @@ void zbx_json_addint64(struct zbx_json * zbx_json_addstring(j, name, buffer, ZBX_JSON_TYPE_INT); } +void zbx_json_adddbl(struct zbx_json *j, const char *name, double value) +{ + char buffer[MAX_ID_LEN]; + + zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_DBL, value); + zbx_json_addstring(j, name, buffer, ZBX_JSON_TYPE_INT); +} + int zbx_json_close(struct zbx_json *j) { if (1 == j->level)
В файле src/libs/zbxhistory/history.c раскомментируем ранее добавленный нами комментарий с намёком на поддержку ClickHouse:
Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.c =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxhistory/history.c +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.c @@ -62,8 +63,8 @@ int zbx_history_init(char **error) { if (elastic_url = zbx_strstartswith(opts[i], "elastic,")) ret = zbx_history_elastic_init(&history_ifaces[i], i, elastic_url, error); - /*else if (clickhouse_url = zbx_strstartswith(opts[i], "clickhouse,")) - ret = zbx_history_clickhouse_init(&history_ifaces[i], i, clickhouse_url, error);*/ + else if (clickhouse_url = zbx_strstartswith(opts[i], "clickhouse,")) + ret = zbx_history_clickhouse_init(&history_ifaces[i], i, clickhouse_url, error); else ret = zbx_history_sql_init(&history_ifaces[i], i, error);
В добавленном коде используется указатель на строку clickhouse_url, добавим его объявление:
Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.c =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxhistory/history.c +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.c @@ -50,6 +50,7 @@ int zbx_history_init(char **error) { int i, ret; char *elastic_url; + char *clickhouse_url; const char *opts[] = { CONFIG_HISTORY_STORAGE, CONFIG_HISTORY_STR_STORAGE,
Объявление функции zbx_history_clickhouse_init нужно добавить в заголовочный файл src/libs/zbxhistory/history.h:
Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.h =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxhistory/history.h +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.h @@ -49,4 +49,7 @@ int zbx_history_sql_init(zbx_history_ifa /* elastic hist */ int zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type, const char *url, char **error); +/* ClickHouse hist */ +int zbx_history_clickhouse_init(zbx_history_iface_t *hist, unsigned char value_type, const char *url, char **error); + #endif
Перед дальнейшими действиями скопируем файл src/libs/zbxhistory/history_elastic.c в файл src/libs/zbxhistory/history_clickhouse.c и заменим все упоминания Elasticsearch на ClickHouse, в том числе в отладочных сообщениях, комментариях и именах функций.
Теперь нужно прописать новый файл history_clickhouse.c в Make-файлы src/libs/zbxhistory/Makefile.am и src/libs/zbxhistory/Makefile.in, чтобы они участвовали в процессе сборки:
Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/Makefile.am =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxhistory/Makefile.am +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/Makefile.am @@ -5,4 +5,5 @@ noinst_LIBRARIES = libzbxhistory.a libzbxhistory_a_SOURCES = \ history.c history.h \ history_sql.c \ - history_elastic.c + history_elastic.c \ + history_clickhouse.c Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/Makefile.in =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxhistory/Makefile.in +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/Makefile.in @@ -120,7 +120,8 @@ am__v_AR_1 = libzbxhistory_a_AR = $(AR) $(ARFLAGS) libzbxhistory_a_LIBADD = am_libzbxhistory_a_OBJECTS = history.$(OBJEXT) history_sql.$(OBJEXT) \ - history_elastic.$(OBJEXT) + history_elastic.$(OBJEXT) \ + history_clickhouse.$(OBJEXT) libzbxhistory_a_OBJECTS = $(am_libzbxhistory_a_OBJECTS) AM_V_P = $(am__v_P_@AM_V@) am__v_P_ = $(am__v_P_@AM_DEFAULT_V@) @@ -366,7 +367,8 @@ noinst_LIBRARIES = libzbxhistory.a libzbxhistory_a_SOURCES = \ history.c history.h \ history_sql.c \ - history_elastic.c + history_elastic.c \ + history_clickhouse.c all: all-am @@ -417,6 +419,7 @@ distclean-compile: -rm -f *.tab.c @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/history.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/history_clickhouse.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/history_elastic.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/history_sql.Po@am__quote@
Получилось два полностью аналогичных по сути модуля поддержки хранилищ с разными именами. Продолжим переделку нового модуля. Сначала пройдусь кратко по мелким изменениям.
В функции clickhouse_writer_flush была удалена обработка сообщений об ошибках Elasticsearch при успешном коде ответа HTTP, т.к. ClickHouse о любых ошибках выполнения запросов всегда сообщает соответствующим кодом статуса HTTP:
@@ -402,19 +401,6 @@ zbx_vector_ptr_append(&retries, msg->easy_handle); curl_multi_remove_handle(writer.handle, msg->easy_handle); } - else if (CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, (char **)&curl_page) - && SUCCEED == clickhouse_is_error_present(&curl_page->page, &error)) - { - zabbix_log(LOG_LEVEL_WARNING, "%s() %s: %s", __function_name, - "cannot send data to ClickHouse", error); - zbx_free(error); - - /* If the error is due to ClickHouse internal problems (for example an index */ - /* became read-only), we put the handle in a retry list and */ - /* remove it from the current execution loop */ - zbx_vector_ptr_append(&retries, msg->easy_handle); - curl_multi_remove_handle(writer.handle, msg->easy_handle); - } } previous = running;
Было удалено макроопределение константы ZBX_IDX_JSON_ALLOCATE, т.к. в коде поддержки ClickHouse оно не использовалось.
В структуре zbx_clickhouse_data_t было удалено поле post_url, т.к. оказалось достаточно уже имеющегося в структуре поля base_url.
Были удалены функции history_value2str и clickhouse_is_error_present (бывшая elastic_is_error_present), т.к. они больше не используются.
Что касается доработок по существу, то они затрагивают функции clickhouse_get_values и clickhouse_add_values. Приведу обе функции полностью в окончательном виде:
/************************************************************************************ * * * Function: clickhouse_get_values * * * * Purpose: gets item history data from history storage * * * * Parameters: hist - [IN] the history storage interface * * itemid - [IN] the itemid * * start - [IN] the period start timestamp * * count - [IN] the number of values to read * * end - [IN] the period end timestamp * * values - [OUT] the item history data values * * * * Return value: SUCCEED - the history data were read successfully * * FAIL - otherwise * * * * Comments: This function reads <count> values from [<start>,<end>] interval or * * all values from the specified interval if count is zero. * * * ************************************************************************************/ static int clickhouse_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, int start, int count, int end, zbx_vector_history_record_t *values) { const char *__function_name = "clickhouse_get_values"; zbx_clickhouse_data_t *data = hist->data; size_t sql_alloc = 0, sql_offset; int ret = SUCCEED; CURLcode err; struct curl_slist *curl_headers = NULL; char *sql = NULL, errbuf[CURL_ERROR_SIZE]; const char *p = NULL; struct zbx_json_parse jp, jp_sub, jp_data, jp_item; zbx_history_record_t hr; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name); if (NULL == (data->handle = curl_easy_init())) { zabbix_log(LOG_LEVEL_ERR, "cannot initialize cURL session"); return FAIL; } if (ITEM_VALUE_TYPE_LOG == hist->value_type) zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "SELECT clock, ns, value, timestamp, source, severity, logeventid" " FROM %s" " WHERE itemid=" ZBX_FS_UI64, value_type_table[hist->value_type], itemid); else zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "SELECT clock, ns, value" " FROM %s" " WHERE itemid=" ZBX_FS_UI64, value_type_table[hist->value_type], itemid); if (0 < start) zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " AND clock>%d", start); if (0 < end) zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " AND clock<=%d", end); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " ORDER BY clock DESC"); if (0 < count) zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " LIMIT %d", count); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " FORMAT JSON"); curl_headers = curl_slist_append(curl_headers, "Content-Type: application/json"); curl_easy_setopt(data->handle, CURLOPT_URL, data->base_url); curl_easy_setopt(data->handle, CURLOPT_POSTFIELDS, sql); curl_easy_setopt(data->handle, CURLOPT_WRITEFUNCTION, curl_write_cb); curl_easy_setopt(data->handle, CURLOPT_WRITEDATA, &page_r); curl_easy_setopt(data->handle, CURLOPT_HTTPHEADER, curl_headers); curl_easy_setopt(data->handle, CURLOPT_FAILONERROR, 1L); curl_easy_setopt(data->handle, CURLOPT_ERRORBUFFER, errbuf); zabbix_log(LOG_LEVEL_DEBUG, "sending query to %s; post data: %s", data->base_url, sql); page_r.offset = 0; *errbuf = '\0'; if (CURLE_OK != (err = curl_easy_perform(data->handle))) { clickhouse_log_error(data->handle, err, errbuf); ret = FAIL; goto out; } zabbix_log(LOG_LEVEL_DEBUG, "received from ClickHouse: %s", page_r.data); zbx_json_open(page_r.data, &jp); zbx_json_brackets_open(jp.start, &jp_sub); zbx_json_brackets_by_name(&jp_sub, "data", &jp_data); while (NULL != (p = zbx_json_next(&jp_data, p))) { if (SUCCEED != zbx_json_brackets_open(p, &jp_item)) continue; if (SUCCEED != history_parse_value(&jp_item, hist->value_type, &hr)) continue; zbx_vector_history_record_append_ptr(values, &hr); } out: clickhouse_close(hist); curl_slist_free_all(curl_headers); zbx_free(sql); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); return ret; } /************************************************************************************ * * * Function: clickhouse_add_values * * * * Purpose: sends history data to the storage * * * * Parameters: hist - [IN] the history storage interface * * history - [IN] the history data vector (may have mixed value types) * * * ************************************************************************************/ static int clickhouse_add_values(zbx_history_iface_t *hist, const zbx_vector_ptr_t *history) { const char *__function_name = "clickhouse_add_values"; zbx_clickhouse_data_t *data = hist->data; int i, num = 0; ZBX_DC_HISTORY *h; struct zbx_json json; size_t buf_offset = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name); if (ITEM_VALUE_TYPE_LOG == hist->value_type) zbx_snprintf_alloc(&data->buf, &data->buf_alloc, &buf_offset, "INSERT INTO %s(itemid, value, timestamp, source, severity, logeventid, clock, ns)" " FORMAT JSONEachRow\n", value_type_table[hist->value_type]); else zbx_snprintf_alloc(&data->buf, &data->buf_alloc, &buf_offset, "INSERT INTO %s(itemid, value, clock, ns) FORMAT JSONEachRow\n", value_type_table[hist->value_type]); for (i = 0; i < history->values_num; i++) { h = (ZBX_DC_HISTORY *)history->values[i]; if (hist->value_type != h->value_type) continue; zbx_json_init(&json, ZBX_JSON_ALLOCATE); zbx_json_adduint64(&json, "itemid", h->itemid); switch (h->value_type) { case ITEM_VALUE_TYPE_STR: case ITEM_VALUE_TYPE_TEXT: zbx_json_addstring(&json, "value", h->value.str, ZBX_JSON_TYPE_STRING); break; case ITEM_VALUE_TYPE_LOG: zbx_json_addstring(&json, "value", h->value.log->value, ZBX_JSON_TYPE_STRING); break; case ITEM_VALUE_TYPE_FLOAT: zbx_json_adddbl(&json, "value", h->value.dbl); break; case ITEM_VALUE_TYPE_UINT64: zbx_json_adduint64(&json, "value", h->value.ui64); break; } if (ITEM_VALUE_TYPE_LOG == h->value_type) { const zbx_log_value_t *log; log = h->value.log; zbx_json_adduint64(&json, "timestamp", log->timestamp); zbx_json_addstring(&json, "source", ZBX_NULL2EMPTY_STR(log->source), ZBX_JSON_TYPE_STRING); zbx_json_adduint64(&json, "severity", log->severity); zbx_json_adduint64(&json, "logeventid", log->logeventid); } zbx_json_adduint64(&json, "clock", h->ts.sec); zbx_json_adduint64(&json, "ns", h->ts.ns); zbx_json_close(&json); zbx_snprintf_alloc(&data->buf, &data->buf_alloc, &buf_offset, "%s\n", json.buffer); zbx_json_free(&json); num++; } if (num > 0) clickhouse_writer_add_iface(hist); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); return num; }
При доработке функции clickhouse_get_values массив строковых констант value_type_str был заменён на массив строковых констант value_type_table:
-const char *value_type_str[] = {"dbl", "str", "log", "uint", "text"}; +const char *value_type_table[] = {"history", "history_str", "history_log", "history_uint", "history_text"};
Из всех сделанных изменений отдельно остановлюсь на исправлении одной из ошибок, которая перекочевала в файл history_clickhouse.c из файла history_elastic.c. Не могу скзать, является ли это ошибкой в исходном файле, но в коде поддержки ClickHouse эта проблема проявлялась следующим образом: в журнале сервера Zabbix при попытках вставки новых данных в таблицы истории в файле /var/log/zabbix/zabbix_server.log появлялись ошибки "400 Bad Request", хотя на первый взгляд данные в таблицы всё-таки записывались.
Оказалось, что часть запросов к ClickHouse были попросту пустыми POST-запросами. Более пристальное изучение причин проблемы позволило обнаружить ошибку: при формировании запроса к ClickHouse на вставку данных иногда, при попытке добавить в него очередное значение, данные в буфере попросту очищались. Получившийся пустой запрос и выполнялся, из-за чего ClickHouse периодически сообщал об ошибках, а графики в веб-интерфейсе прерывались.
В функции добавки значений использовался уже распределённый ранее буфер hist->data->buf, но при каждом вызове этой функции считалось, что его размер buf_alloc равен нулю. Вот как это выглядит в исходном модуле history_elastic.c:
static int elastic_add_values(zbx_history_iface_t *hist, const zbx_vector_ptr_t *history) { const char *__function_name = "elastic_add_values"; zbx_elastic_data_t *data = hist->data; int i, num = 0; ZBX_DC_HISTORY *h; struct zbx_json json_idx, json; size_t buf_alloc = 0, buf_offset = 0;
Чтобы устранить ошибку, я решил вынести переменную с размером буфера из тела функции в структуру, содержащую указатель на буфер:
typedef struct { char *base_url; char *buf; + size_t buf_alloc; CURL *handle; } zbx_clickhouse_data_t;
После этой доработки сервер Zabbix, наконец, начал исправно писать данные в ClickHouse.