Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/Makefile.am =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxhistory/Makefile.am +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/Makefile.am @@ -5,4 +5,5 @@ noinst_LIBRARIES = libzbxhistory.a libzbxhistory_a_SOURCES = \ history.c history.h \ history_sql.c \ - history_elastic.c + history_elastic.c \ + history_clickhouse.c Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/Makefile.in =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxhistory/Makefile.in +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/Makefile.in @@ -120,7 +120,8 @@ am__v_AR_1 = libzbxhistory_a_AR = $(AR) $(ARFLAGS) libzbxhistory_a_LIBADD = am_libzbxhistory_a_OBJECTS = history.$(OBJEXT) history_sql.$(OBJEXT) \ - history_elastic.$(OBJEXT) + history_elastic.$(OBJEXT) \ + history_clickhouse.$(OBJEXT) libzbxhistory_a_OBJECTS = $(am_libzbxhistory_a_OBJECTS) AM_V_P = $(am__v_P_@AM_V@) am__v_P_ = $(am__v_P_@AM_DEFAULT_V@) @@ -366,7 +367,8 @@ noinst_LIBRARIES = libzbxhistory.a libzbxhistory_a_SOURCES = \ history.c history.h \ history_sql.c \ - history_elastic.c + history_elastic.c \ + history_clickhouse.c all: all-am @@ -417,6 +419,7 @@ distclean-compile: -rm -f *.tab.c @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/history.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/history_clickhouse.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/history_elastic.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/history_sql.Po@am__quote@ Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.c =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxhistory/history.c +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.c @@ -50,6 +50,7 @@ int zbx_history_init(char **error) { int i, ret; char *elastic_url; + char *clickhouse_url; const char *opts[] = { CONFIG_HISTORY_STORAGE, CONFIG_HISTORY_STR_STORAGE, @@ -62,8 +63,8 @@ int zbx_history_init(char **error) { if (elastic_url = zbx_strstartswith(opts[i], "elastic,")) ret = zbx_history_elastic_init(&history_ifaces[i], i, elastic_url, error); - /*else if (clickhouse_url = zbx_strstartswith(opts[i], "clickhouse,")) - ret = zbx_history_clickhouse_init(&history_ifaces[i], i, clickhouse_url, error);*/ + else if (clickhouse_url = zbx_strstartswith(opts[i], "clickhouse,")) + ret = zbx_history_clickhouse_init(&history_ifaces[i], i, clickhouse_url, error); else ret = zbx_history_sql_init(&history_ifaces[i], i, error); Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.h =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxhistory/history.h +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/history.h @@ -49,4 +49,7 @@ int zbx_history_sql_init(zbx_history_ifa /* elastic hist */ int zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type, const char *url, char **error); +/* ClickHouse hist */ +int zbx_history_clickhouse_init(zbx_history_iface_t *hist, unsigned char value_type, const char *url, char **error); + #endif Index: zabbix-3.4.12-1+buster/src/libs/zbxhistory/history_clickhouse.c =================================================================== --- /dev/null +++ zabbix-3.4.12-1+buster/src/libs/zbxhistory/history_clickhouse.c @@ -0,0 +1,742 @@ +/* +** Zabbix +** Copyright (C) 2001-2018 Zabbix SIA +** Copyright (C) 2019 Vladimir Stupin +** +** This program is free software; you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation; either version 2 of the License, or +** (at your option) any later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software +** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +**/ + +#include "common.h" +#include "log.h" +#include "zbxjson.h" +#include "zbxalgo.h" +#include "dbcache.h" +#include "zbxhistory.h" +#include "zbxself.h" +#include "history.h" + +/* curl_multi_wait() is supported starting with version 7.28.0 (0x071c00) */ +#if defined(HAVE_LIBCURL) && LIBCURL_VERSION_NUM >= 0x071c00 + +#define ZBX_HISTORY_STORAGE_DOWN 10000 /* Timeout in milliseconds */ + +#define ZBX_JSON_ALLOCATE 2048 + + +const char *value_type_table[] = {"history", "history_str", "history_log", "history_uint", "history_text"}; + +typedef struct +{ + char *base_url; + char *buf; + size_t buf_alloc; + CURL *handle; +} +zbx_clickhouse_data_t; + +typedef struct +{ + unsigned char initialized; + zbx_vector_ptr_t ifaces; + + CURLM *handle; +} +zbx_clickhouse_writer_t; + +static zbx_clickhouse_writer_t writer; + +typedef struct +{ + char *data; + size_t alloc; + size_t offset; +} +zbx_httppage_t; + +static zbx_httppage_t page_r; + +typedef struct +{ + zbx_httppage_t page; + char errbuf[CURL_ERROR_SIZE]; +} +zbx_curlpage_t; + +static zbx_curlpage_t page_w[ITEM_VALUE_TYPE_MAX]; + +static size_t curl_write_cb(void *ptr, size_t size, size_t nmemb, void *userdata) +{ + size_t r_size = size * nmemb; + + zbx_httppage_t *page = (zbx_httppage_t *)userdata; + + zbx_strncpy_alloc(&page->data, &page->alloc, &page->offset, ptr, r_size); + + return r_size; +} + +static history_value_t history_str2value(char *str, unsigned char value_type) +{ + history_value_t value; + + switch (value_type) + { + case ITEM_VALUE_TYPE_LOG: + value.log = zbx_malloc(NULL, sizeof(zbx_log_value_t)); + memset(value.log, 0, sizeof(zbx_log_value_t)); + value.log->value = zbx_strdup(NULL, str); + break; + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + value.str = zbx_strdup(NULL, str); + break; + case ITEM_VALUE_TYPE_FLOAT: + value.dbl = atof(str); + break; + case ITEM_VALUE_TYPE_UINT64: + ZBX_STR2UINT64(value.ui64, str); + break; + } + + return value; +} + +static int history_parse_value(struct zbx_json_parse *jp, unsigned char value_type, zbx_history_record_t *hr) +{ + char *value = NULL; + size_t value_alloc = 0; + int ret = FAIL; + + if (SUCCEED != zbx_json_value_by_name_dyn(jp, "clock", &value, &value_alloc)) + goto out; + + hr->timestamp.sec = atoi(value); + + if (SUCCEED != zbx_json_value_by_name_dyn(jp, "ns", &value, &value_alloc)) + goto out; + + hr->timestamp.ns = atoi(value); + + if (SUCCEED != zbx_json_value_by_name_dyn(jp, "value", &value, &value_alloc)) + goto out; + + hr->value = history_str2value(value, value_type); + + if (ITEM_VALUE_TYPE_LOG == value_type) + { + + if (SUCCEED != zbx_json_value_by_name_dyn(jp, "timestamp", &value, &value_alloc)) + goto out; + + hr->value.log->timestamp = atoi(value); + + if (SUCCEED != zbx_json_value_by_name_dyn(jp, "logeventid", &value, &value_alloc)) + goto out; + + hr->value.log->logeventid = atoi(value); + + if (SUCCEED != zbx_json_value_by_name_dyn(jp, "severity", &value, &value_alloc)) + goto out; + + hr->value.log->severity = atoi(value); + + if (SUCCEED != zbx_json_value_by_name_dyn(jp, "source", &value, &value_alloc)) + goto out; + + hr->value.log->source = zbx_strdup(NULL, value); + } + + ret = SUCCEED; + +out: + zbx_free(value); + + return ret; +} + +static void clickhouse_log_error(CURL *handle, CURLcode error, const char *errbuf) +{ + long http_code; + + if (CURLE_HTTP_RETURNED_ERROR == error) + { + curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &http_code); + + if (0 != page_r.offset) + { + zabbix_log(LOG_LEVEL_ERR, "cannot get values from ClickHouse, HTTP error: %ld,", + " message: %s", http_code, page_r.data); + } + else + zabbix_log(LOG_LEVEL_ERR, "cannot get values from ClickHouse, HTTP error: %ld", http_code); + } + else + { + zabbix_log(LOG_LEVEL_ERR, "cannot get values from ClickHouse: %s", + '\0' != *errbuf ? errbuf : curl_easy_strerror(error)); + } +} + +/************************************************************************************ + * * + * Function: clickhouse_close * + * * + * Purpose: closes connection and releases allocated resources * + * * + * Parameters: hist - [IN] the history storage interface * + * * + ************************************************************************************/ +static void clickhouse_close(zbx_history_iface_t *hist) +{ + zbx_clickhouse_data_t *data = hist->data; + + zbx_free(data->buf); + data->buf_alloc = 0; + + if (NULL != data->handle) + { + if (NULL != writer.handle) + curl_multi_remove_handle(writer.handle, data->handle); + + curl_easy_cleanup(data->handle); + data->handle = NULL; + } +} + +/************************************************************************************ + * * + * Function: clickhouse_writer_init * + * * + * Purpose: initializes ClickHouse writer for a new batch of history values * + * * + ************************************************************************************/ +static void clickhouse_writer_init(void) +{ + if (0 != writer.initialized) + return; + + zbx_vector_ptr_create(&writer.ifaces); + + if (NULL == (writer.handle = curl_multi_init())) + { + zbx_error("Cannot initialize cURL multi session"); + exit(EXIT_FAILURE); + } + + writer.initialized = 1; +} + +/************************************************************************************ + * * + * Function: clickhouse_writer_release * + * * + * Purpose: releases initialized ClickHouse writer by freeing allocated resources * + * and setting its state to uninitialized. * + * * + ************************************************************************************/ +static void clickhouse_writer_release(void) +{ + int i; + + for (i = 0; i < writer.ifaces.values_num; i++) + clickhouse_close(writer.ifaces.values[i]); + + curl_multi_cleanup(writer.handle); + writer.handle = NULL; + + zbx_vector_ptr_destroy(&writer.ifaces); + + writer.initialized = 0; +} + +/************************************************************************************ + * * + * Function: clickhouse_writer_add_iface * + * * + * Purpose: adds history storage interface to be flushed later * + * * + * Parameters: db_insert - [IN] bulk insert data * + * * + ************************************************************************************/ +static void clickhouse_writer_add_iface(zbx_history_iface_t *hist) +{ + zbx_clickhouse_data_t *data = hist->data; + + clickhouse_writer_init(); + + if (NULL == (data->handle = curl_easy_init())) + { + zabbix_log(LOG_LEVEL_ERR, "cannot initialize cURL session"); + return; + } + curl_easy_setopt(data->handle, CURLOPT_URL, data->base_url); + curl_easy_setopt(data->handle, CURLOPT_POST, 1L); + curl_easy_setopt(data->handle, CURLOPT_POSTFIELDS, data->buf); + curl_easy_setopt(data->handle, CURLOPT_WRITEFUNCTION, curl_write_cb); + curl_easy_setopt(data->handle, CURLOPT_WRITEDATA, &page_w[hist->value_type].page); + curl_easy_setopt(data->handle, CURLOPT_FAILONERROR, 1L); + curl_easy_setopt(data->handle, CURLOPT_ERRORBUFFER, page_w[hist->value_type].errbuf); + *page_w[hist->value_type].errbuf = '\0'; + curl_easy_setopt(data->handle, CURLOPT_PRIVATE, &page_w[hist->value_type]); + page_w[hist->value_type].page.offset = 0; + if (0 < page_w[hist->value_type].page.alloc) + *page_w[hist->value_type].page.data = '\0'; + + curl_multi_add_handle(writer.handle, data->handle); + + zbx_vector_ptr_append(&writer.ifaces, hist); +} + +/************************************************************************************ + * * + * Function: clickhouse_writer_flush * + * * + * Purpose: posts historical data to ClickHouse storage * + * * + ************************************************************************************/ +static void clickhouse_writer_flush(void) +{ + const char *__function_name = "clickhouse_writer_flush"; + + struct curl_slist *curl_headers = NULL; + int i, running, previous, msgnum; + CURLMsg *msg; + zbx_vector_ptr_t retries; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name); + + if (0 == writer.initialized) + return; + + zbx_vector_ptr_create(&retries); + + curl_headers = curl_slist_append(curl_headers, "Content-Type: application/x-ndjson"); + + for (i = 0; i < writer.ifaces.values_num; i++) + { + zbx_history_iface_t *hist = (zbx_history_iface_t *)writer.ifaces.values[i]; + zbx_clickhouse_data_t *data = hist->data; + + curl_easy_setopt(data->handle, CURLOPT_HTTPHEADER, curl_headers); + + zabbix_log(LOG_LEVEL_DEBUG, "sending %s", data->buf); + } + +try_again: + previous = 0; + + do + { + int fds; + CURLMcode code; + char *error; + zbx_curlpage_t *curl_page; + + if (CURLM_OK != (code = curl_multi_perform(writer.handle, &running))) + { + zabbix_log(LOG_LEVEL_ERR, "cannot perform on curl multi handle: %s", curl_multi_strerror(code)); + break; + } + + if (CURLM_OK != (code = curl_multi_wait(writer.handle, NULL, 0, ZBX_HISTORY_STORAGE_DOWN, &fds))) + { + zabbix_log(LOG_LEVEL_ERR, "cannot wait on curl multi handle: %s", curl_multi_strerror(code)); + break; + } + + if (previous == running) + continue; + + while (NULL != (msg = curl_multi_info_read(writer.handle, &msgnum))) + { + /* If the error is due to malformed data, there is no sense on re-trying to send. */ + /* That's why we actually check for transport and curl errors separately */ + if (CURLE_HTTP_RETURNED_ERROR == msg->data.result) + { + if (CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, + (char **)&curl_page) && '\0' != *curl_page->errbuf) + { + zabbix_log(LOG_LEVEL_ERR, "%s: %s", + "cannot send data to ClickHouse, HTTP error message", + curl_page->errbuf); + } + else + { + long int err; + + curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &err); + zabbix_log(LOG_LEVEL_ERR, "%s: %d", + "cannot send data to ClickHouse, HTTP error code", err); + } + } + else if (CURLE_OK != msg->data.result) + { + if (CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, + (char **)&curl_page) && '\0' != *curl_page->errbuf) + { + zabbix_log(LOG_LEVEL_WARNING, "%s: %s", "cannot send data to ClickHouse", + curl_page->errbuf); + } + else + { + zabbix_log(LOG_LEVEL_WARNING, "%s: %s", "cannot send data to ClickHouse", + curl_easy_strerror(msg->data.result)); + } + + /* If the error is due to curl internal problems or unrelated */ + /* problems with HTTP, we put the handle in a retry list and */ + /* remove it from the current execution loop */ + zbx_vector_ptr_append(&retries, msg->easy_handle); + curl_multi_remove_handle(writer.handle, msg->easy_handle); + } + } + + previous = running; + } + while (running); + + /* We check if we have handles to retry. If yes, we put them back in the multi */ + /* handle and go to the beginning of the do while() for try sending the data again */ + /* after sleeping for ZBX_HISTORY_STORAGE_DOWN / 1000 (seconds) */ + if (0 < retries.values_num) + { + for (i = 0; i < retries.values_num; i++) + curl_multi_add_handle(writer.handle, retries.values[i]); + + zbx_vector_ptr_clear(&retries); + + sleep(ZBX_HISTORY_STORAGE_DOWN / 1000); + goto try_again; + } + + curl_slist_free_all(curl_headers); + + zbx_vector_ptr_destroy(&retries); + + clickhouse_writer_release(); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); +} + +/****************************************************************************************************************** + * * + * history interface support * + * * + ******************************************************************************************************************/ + +/************************************************************************************ + * * + * Function: clickhouse_destroy * + * * + * Purpose: destroys history storage interface * + * * + * Parameters: hist - [IN] the history storage interface * + * * + ************************************************************************************/ +static void clickhouse_destroy(zbx_history_iface_t *hist) +{ + zbx_clickhouse_data_t *data = hist->data; + + clickhouse_close(hist); + + zbx_free(data->base_url); + zbx_free(data); +} + +/************************************************************************************ + * * + * Function: clickhouse_get_values * + * * + * Purpose: gets item history data from history storage * + * * + * Parameters: hist - [IN] the history storage interface * + * itemid - [IN] the itemid * + * start - [IN] the period start timestamp * + * count - [IN] the number of values to read * + * end - [IN] the period end timestamp * + * values - [OUT] the item history data values * + * * + * Return value: SUCCEED - the history data were read successfully * + * FAIL - otherwise * + * * + * Comments: This function reads values from [,] interval or * + * all values from the specified interval if count is zero. * + * * + ************************************************************************************/ +static int clickhouse_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, int start, int count, int end, + zbx_vector_history_record_t *values) +{ + const char *__function_name = "clickhouse_get_values"; + + zbx_clickhouse_data_t *data = hist->data; + size_t sql_alloc = 0, sql_offset; + int ret = SUCCEED; + CURLcode err; + struct curl_slist *curl_headers = NULL; + char *sql = NULL, errbuf[CURL_ERROR_SIZE]; + const char *p = NULL; + struct zbx_json_parse jp, jp_sub, jp_data, jp_item; + zbx_history_record_t hr; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name); + + if (NULL == (data->handle = curl_easy_init())) + { + zabbix_log(LOG_LEVEL_ERR, "cannot initialize cURL session"); + + return FAIL; + } + + if (ITEM_VALUE_TYPE_LOG == hist->value_type) + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, + "SELECT clock, ns, value, timestamp, source, severity, logeventid" + " FROM %s" + " WHERE itemid=" ZBX_FS_UI64, + value_type_table[hist->value_type], itemid); + else + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, + "SELECT clock, ns, value" + " FROM %s" + " WHERE itemid=" ZBX_FS_UI64, + value_type_table[hist->value_type], itemid); + + if (0 < start) + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " AND clock>%d", start); + + if (0 < end) + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " AND clock<=%d", end); + + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " ORDER BY clock DESC"); + + if (0 < count) + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " LIMIT %d", count); + + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " FORMAT JSON"); + + curl_headers = curl_slist_append(curl_headers, "Content-Type: application/json"); + + curl_easy_setopt(data->handle, CURLOPT_URL, data->base_url); + curl_easy_setopt(data->handle, CURLOPT_POSTFIELDS, sql); + curl_easy_setopt(data->handle, CURLOPT_WRITEFUNCTION, curl_write_cb); + curl_easy_setopt(data->handle, CURLOPT_WRITEDATA, &page_r); + curl_easy_setopt(data->handle, CURLOPT_HTTPHEADER, curl_headers); + curl_easy_setopt(data->handle, CURLOPT_FAILONERROR, 1L); + curl_easy_setopt(data->handle, CURLOPT_ERRORBUFFER, errbuf); + + zabbix_log(LOG_LEVEL_DEBUG, "sending query to %s; post data: %s", data->base_url, sql); + + page_r.offset = 0; + *errbuf = '\0'; + if (CURLE_OK != (err = curl_easy_perform(data->handle))) + { + clickhouse_log_error(data->handle, err, errbuf); + ret = FAIL; + goto out; + } + + zabbix_log(LOG_LEVEL_DEBUG, "received from ClickHouse: %s", page_r.data); + + zbx_json_open(page_r.data, &jp); + zbx_json_brackets_open(jp.start, &jp_sub); + zbx_json_brackets_by_name(&jp_sub, "data", &jp_data); + + while (NULL != (p = zbx_json_next(&jp_data, p))) + { + if (SUCCEED != zbx_json_brackets_open(p, &jp_item)) + continue; + + if (SUCCEED != history_parse_value(&jp_item, hist->value_type, &hr)) + continue; + + zbx_vector_history_record_append_ptr(values, &hr); + } + +out: + clickhouse_close(hist); + + curl_slist_free_all(curl_headers); + + zbx_free(sql); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); + + return ret; +} + +/************************************************************************************ + * * + * Function: clickhouse_add_values * + * * + * Purpose: sends history data to the storage * + * * + * Parameters: hist - [IN] the history storage interface * + * history - [IN] the history data vector (may have mixed value types) * + * * + ************************************************************************************/ +static int clickhouse_add_values(zbx_history_iface_t *hist, const zbx_vector_ptr_t *history) +{ + const char *__function_name = "clickhouse_add_values"; + + zbx_clickhouse_data_t *data = hist->data; + int i, num = 0; + ZBX_DC_HISTORY *h; + struct zbx_json json; + size_t buf_offset = 0; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name); + + if (ITEM_VALUE_TYPE_LOG == hist->value_type) + zbx_snprintf_alloc(&data->buf, &data->buf_alloc, &buf_offset, + "INSERT INTO %s(itemid, value, timestamp, source, severity, logeventid, clock, ns)" + " FORMAT JSONEachRow\n", value_type_table[hist->value_type]); + else + zbx_snprintf_alloc(&data->buf, &data->buf_alloc, &buf_offset, + "INSERT INTO %s(itemid, value, clock, ns) FORMAT JSONEachRow\n", + value_type_table[hist->value_type]); + + for (i = 0; i < history->values_num; i++) + { + h = (ZBX_DC_HISTORY *)history->values[i]; + + if (hist->value_type != h->value_type) + continue; + + zbx_json_init(&json, ZBX_JSON_ALLOCATE); + + zbx_json_adduint64(&json, "itemid", h->itemid); + + switch (h->value_type) + { + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + zbx_json_addstring(&json, "value", h->value.str, ZBX_JSON_TYPE_STRING); + break; + case ITEM_VALUE_TYPE_LOG: + zbx_json_addstring(&json, "value", h->value.log->value, ZBX_JSON_TYPE_STRING); + break; + case ITEM_VALUE_TYPE_FLOAT: + zbx_json_adddbl(&json, "value", h->value.dbl); + break; + case ITEM_VALUE_TYPE_UINT64: + zbx_json_adduint64(&json, "value", h->value.ui64); + break; + } + + if (ITEM_VALUE_TYPE_LOG == h->value_type) + { + const zbx_log_value_t *log; + + log = h->value.log; + + zbx_json_adduint64(&json, "timestamp", log->timestamp); + zbx_json_addstring(&json, "source", ZBX_NULL2EMPTY_STR(log->source), ZBX_JSON_TYPE_STRING); + zbx_json_adduint64(&json, "severity", log->severity); + zbx_json_adduint64(&json, "logeventid", log->logeventid); + } + + zbx_json_adduint64(&json, "clock", h->ts.sec); + zbx_json_adduint64(&json, "ns", h->ts.ns); + + zbx_json_close(&json); + + zbx_snprintf_alloc(&data->buf, &data->buf_alloc, &buf_offset, "%s\n", json.buffer); + + zbx_json_free(&json); + + num++; + } + + if (num > 0) + clickhouse_writer_add_iface(hist); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); + + return num; +} + +/************************************************************************************ + * * + * Function: clickhouse_flush * + * * + * Purpose: flushes the history data to storage * + * * + * Parameters: hist - [IN] the history storage interface * + * * + * Comments: This function will try to flush the data until it succeeds or * + * unrecoverable error occurs * + * * + ************************************************************************************/ +static void clickhouse_flush(zbx_history_iface_t *hist) +{ + ZBX_UNUSED(hist); + + clickhouse_writer_flush(); +} + +/************************************************************************************ + * * + * Function: zbx_history_clickhouse_init * + * * + * Purpose: initializes history storage interface * + * * + * Parameters: hist - [IN] the history storage interface * + * value_type - [IN] the target value type * + * error - [OUT] the error message * + * * + * Return value: SUCCEED - the history storage interface was initialized * + * FAIL - otherwise * + * * + ************************************************************************************/ +int zbx_history_clickhouse_init(zbx_history_iface_t *hist, unsigned char value_type, const char *url, char **error) +{ + zbx_clickhouse_data_t *data; + + if (0 != curl_global_init(CURL_GLOBAL_ALL)) + { + *error = zbx_strdup(*error, "Cannot initialize cURL library"); + return FAIL; + } + + data = zbx_malloc(NULL, sizeof(zbx_clickhouse_data_t)); + memset(data, 0, sizeof(zbx_clickhouse_data_t)); + data->base_url = zbx_strdup(NULL, url); + zbx_rtrim(data->base_url, "/"); + data->buf = NULL; + data->buf_alloc = 0; + data->handle = NULL; + + hist->value_type = value_type; + hist->data = data; + hist->destroy = clickhouse_destroy; + hist->add_values = clickhouse_add_values; + hist->flush = clickhouse_flush; + hist->get_values = clickhouse_get_values; + hist->requires_trends = 0; + + return SUCCEED; +} + +#else + +int zbx_history_clickhouse_init(zbx_history_iface_t *hist, unsigned char value_type, char **error) +{ + ZBX_UNUSED(hist); + ZBX_UNUSED(value_type); + + *error = zbx_strdup(*error, "cURL library support >= 7.28.0 is required for ClickHouse history backend"); + return FAIL; +} + +#endif Index: zabbix-3.4.12-1+buster/include/zbxjson.h =================================================================== --- zabbix-3.4.12-1+buster.orig/include/zbxjson.h +++ zabbix-3.4.12-1+buster/include/zbxjson.h @@ -160,6 +160,7 @@ void zbx_json_addarray(struct zbx_json * void zbx_json_addstring(struct zbx_json *j, const char *name, const char *string, zbx_json_type_t type); void zbx_json_adduint64(struct zbx_json *j, const char *name, zbx_uint64_t value); void zbx_json_addint64(struct zbx_json *j, const char *name, zbx_int64_t value); +void zbx_json_adddbl(struct zbx_json *j, const char *name, double value); int zbx_json_close(struct zbx_json *j); int zbx_json_open(const char *buffer, struct zbx_json_parse *jp); Index: zabbix-3.4.12-1+buster/src/libs/zbxjson/json.c =================================================================== --- zabbix-3.4.12-1+buster.orig/src/libs/zbxjson/json.c +++ zabbix-3.4.12-1+buster/src/libs/zbxjson/json.c @@ -385,6 +385,14 @@ void zbx_json_addint64(struct zbx_json * zbx_json_addstring(j, name, buffer, ZBX_JSON_TYPE_INT); } +void zbx_json_adddbl(struct zbx_json *j, const char *name, double value) +{ + char buffer[MAX_ID_LEN]; + + zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_DBL, value); + zbx_json_addstring(j, name, buffer, ZBX_JSON_TYPE_INT); +} + int zbx_json_close(struct zbx_json *j) { if (1 == j->level)