Add libevent

This commit is contained in:
Andrey Rodionov 2023-05-28 01:25:05 +03:00
parent 73e1a752cc
commit c38bf77a89
10 changed files with 159 additions and 141 deletions

View File

@ -1,3 +1,2 @@
project(subprojects)
add_subdirectory(asio)

View File

@ -1,15 +0,0 @@
cmake_minimum_required(VERSION 3.5)
include(FetchContent)
FetchContent_Declare(asio
GIT_REPOSITORY git@github.com:chriskohlhoff/asio.git
GIT_TAG master
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
)
FetchContent_GetProperties(asio)
if(NOT asio_POPULATED)
FetchContent_Populate(asio)
endif()
add_library(asio INTERFACE)
target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include)

31
cmake/FindLibEvent.cmake Normal file
View File

@ -0,0 +1,31 @@
set(LibEvent_EXTRA_PREFIXES /usr /usr/local /opt/local "$ENV{HOME}")
foreach(prefix ${LibEvent_EXTRA_PREFIXES})
list(APPEND LibEvent_INCLUDE_PATHS "${prefix}/include")
list(APPEND LibEvent_LIB_PATHS "${prefix}/lib")
endforeach()
find_path(LibEvent_INCLUDE_DIR event.h PATHS ${LibEvent_INCLUDE_PATHS})
find_library(LibEvent_LIBRARIES NAMES event PATHS ${LibEvent_LIB_PATHS})
if (LibEvent_LIBRARIES AND LibEvent_INCLUDE_DIR)
set(LibEvent_FOUND TRUE)
set(LibEvent_LIBRARIES ${LIBEVENT_LIB})
else ()
set(LibEvent_FOUND FALSE)
endif ()
if (LibEvent_FOUND)
if (NOT LibEvent_FIND_QUIETLY)
message(STATUS "Found LibEvent: ${LIBEVENT_LIB}")
endif ()
else ()
if (LibEvent_FIND_REQUIRED)
message(FATAL_ERROR "Could NOT find LibEvent.")
endif ()
message(STATUS "LibEvent NOT found.")
endif ()
mark_as_advanced(
LIBEVENT_LIB
LibEvent_INCLUDE_DIR
)

View File

@ -1,7 +1,9 @@
# Файлы для сборки
# Файлы для сборки
file(GLOB HEADERS "*.h")
file(GLOB SOURCES "*.cpp")
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/")
# Пути к бинарным файлам
set(BINARY_DIR "${CMAKE_BINARY_DIR}")
set(APPPATH "bin")

View File

@ -5,11 +5,9 @@ project(asyncpg LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
file(GLOB HEADERS "*.h")
file(GLOB SOURCES "*.cpp")
include("${CMAKE_SOURCE_DIR}/cmake/libs.cmake")
find_package(PostgreSQL)
find_package(LibEvent)
target_link_libraries(asyncpg PRIVATE ${PostgreSQL_LIBRARIES} asio)
target_link_libraries(asyncpg PRIVATE ${PostgreSQL_LIBRARIES} ${LibEvent_LIBRARIES})

View File

@ -1,25 +1,37 @@
#include "SqlConnect.h"
#include "SqlValue.h"
#include "asio/io_service.hpp"
#include "asio/local/stream_protocol.hpp"
#include <event.h>
#include <event2/event.h>
#include <libpq-fe.h>
#include <iostream>
#include <libpq-fe.h>
#include <thread>
#include <unistd.h>
namespace AsyncPg {
static asio::local::stream_protocol::socket socket(asio::io_context *service, PGconn *connect)
static void ev_connecting(evutil_socket_t fd, short what, void *arg)
{
asio::local::stream_protocol stream_protocol;
auto *serv = service ? service : SqlConnect::service();
return asio::local::stream_protocol::socket(*serv, stream_protocol, dup(PQsocket(connect)));
auto *sqlConnect = reinterpret_cast<SqlConnect *>(arg);
sqlConnect->connecting();
}
SqlConnect::SqlConnect(std::string_view connInfo, asio::io_context *service)
static void ev_preparing(evutil_socket_t fd, short what, void *arg)
{
_service = service;
auto *sqlConnect = reinterpret_cast<SqlConnect *>(arg);
sqlConnect->preparing();
}
static void ev_executing(evutil_socket_t fd, short what, void *arg)
{
auto *sqlConnect = reinterpret_cast<SqlConnect *>(arg);
sqlConnect->executing();
}
SqlConnect::SqlConnect(std::string_view connInfo, event_base *evbase)
{
_evbase = evbase;
_connInfo = connInfo;
_connect = PQconnectStart(connInfo.data());
if (PQstatus(_connect) == CONNECTION_BAD) {
@ -27,6 +39,7 @@ SqlConnect::SqlConnect(std::string_view connInfo, asio::io_context *service)
return;
}
_socket = dup(PQsocket(_connect));
connecting();
}
@ -47,7 +60,9 @@ void SqlConnect::execute(std::string_view sql)
return;
}
self->_error.clear();
self->executing();
auto event = event_new(self->_evbase, self->_socket, EV_READ, ev_executing, self);
event_add(event, nullptr);
};
push(callback);
}
@ -81,7 +96,9 @@ void SqlConnect::execute(std::string_view sql, std::vector<SqlValue> params)
return;
}
self->_error.clear();
self->executing();
auto event = event_new(self->_evbase, self->_socket, EV_READ, ev_executing, self);
event_add(event, nullptr);
};
push(callback);
}
@ -104,7 +121,9 @@ void SqlConnect::prepare(std::string_view sql, std::vector<SqlType> sqlTypes)
return;
}
self->_error.clear();
self->preparing();
auto event = event_new(self->_evbase, self->_socket, EV_READ, ev_preparing, self);
event_add(event, nullptr);
};
push(callback);
}
@ -136,7 +155,9 @@ void SqlConnect::execute(std::vector<SqlValue> params)
return;
}
self->_error.clear();
self->executing();
auto event = event_new(self->_evbase, self->_socket, EV_READ, ev_executing, self);
event_add(event, nullptr);
};
push(callback);
}
@ -168,53 +189,27 @@ void SqlConnect::post(const Callback &func)
SqlConnect SqlConnect::clone()
{
return SqlConnect(_connInfo, _service);
}
asio::io_context *SqlConnect::service()
{
static bool started = false;
static asio::io_context service{static_cast<int>(std::thread::hardware_concurrency())};
if (!started) {
std::thread thread([](){
::asio::io_context::work work(*SqlConnect::service());
SqlConnect::service()->run();
});
thread.detach();
started = true;
}
return &service;
return SqlConnect(_connInfo, _evbase);
}
void SqlConnect::connecting()
{
auto ret = PQconnectPoll(_connect);
switch (ret) {
case PGRES_POLLING_READING:
socket(_service, _connect).async_read_some(
::asio::null_buffers(),
[this]([[maybe_unused]] std::error_code code, [[maybe_unused]] std::size_t size) {
connecting();
});
break;
case PGRES_POLLING_WRITING:
socket(_service, _connect).async_write_some(
::asio::null_buffers(),
[this]([[maybe_unused]] std::error_code code, [[maybe_unused]] std::size_t size) {
connecting();
});
break;
case PGRES_POLLING_READING: {
auto event = event_new(_evbase, _socket, EV_READ, ev_connecting, this);
event_add(event, nullptr);
} break;
case PGRES_POLLING_WRITING: {
auto event = event_new(_evbase, _socket, EV_WRITE, ev_connecting, this);
event_add(event, nullptr);
} break;
case PGRES_POLLING_OK:
pop();
break;
case PGRES_POLLING_FAILED:
_error = SqlError("Connection to database failed.", PQerrorMessage(_connect));
break;
default:
break;
}
@ -222,63 +217,57 @@ void SqlConnect::connecting()
void SqlConnect::preparing()
{
socket(_service, _connect).async_read_some(
::asio::null_buffers(),
[this]([[maybe_unused]] std::error_code code, [[maybe_unused]] std::size_t size) {
auto pgconn = connect();
if (PQconsumeInput(pgconn) != 1) {
_error = SqlError("Preparation sql query failed.", PQerrorMessage(pgconn));
pop();
return;
}
auto pgconn = connect();
if (PQconsumeInput(pgconn) != 1) {
_error = SqlError("Preparation sql query failed.", PQerrorMessage(pgconn));
pop();
return;
}
if (PQisBusy(pgconn) != 1) {
executing();
return;
}
if (PQisBusy(pgconn) == 1) {
auto event = event_new(_evbase, _socket, EV_READ, ev_preparing, this);
event_add(event, nullptr);
return;
}
if (auto pgResult = PQgetResult(pgconn)) {
if (PQresultStatus(pgResult) != PGRES_COMMAND_OK)
_error = SqlError("Preparation sql query failed.", PQerrorMessage(pgconn));
PQclear(pgResult);
}
if (auto pgResult = PQgetResult(pgconn)) {
if (PQresultStatus(pgResult) != PGRES_COMMAND_OK)
_error = SqlError("Preparation sql query failed.", PQerrorMessage(pgconn));
PQclear(pgResult);
}
while (auto pgResult = PQgetResult(pgconn))
PQclear(pgResult);
pop();
});
while (auto pgResult = PQgetResult(pgconn))
PQclear(pgResult);
pop();
}
void SqlConnect::executing()
{
socket(_service, _connect).async_read_some(
::asio::null_buffers(),
[this]([[maybe_unused]] std::error_code code, [[maybe_unused]] std::size_t size) {
auto pgconn = connect();
if (PQconsumeInput(pgconn) != 1) {
_error = SqlError("Execution sql query failed.", PQerrorMessage(pgconn));
pop();
return;
}
auto pgconn = connect();
if (PQconsumeInput(pgconn) != 1) {
_error = SqlError("Execution sql query failed.", PQerrorMessage(pgconn));
pop();
return;
}
if (PQisBusy(pgconn) != 1) {
executing();
return;
}
if (PQisBusy(pgconn) == 1) {
auto event = event_new(_evbase, _socket, EV_READ, ev_executing, this);
event_add(event, nullptr);
return;
}
if (auto pgResult = PQgetResult(pgconn)) {
if (PQresultStatus(pgResult) == PGRES_TUPLES_OK) {
_result = SqlResult(pgResult);
} else {
_error = SqlError("Execution sql query failed.", PQerrorMessage(pgconn));
PQclear(pgResult);
}
}
if (auto pgResult = PQgetResult(pgconn)) {
if (PQresultStatus(pgResult) == PGRES_TUPLES_OK) {
_result = SqlResult(pgResult);
} else {
_error = SqlError("Execution sql query failed.", PQerrorMessage(pgconn));
PQclear(pgResult);
}
}
while (auto pgResult = PQgetResult(pgconn))
PQclear(pgResult);
pop();
});
while (auto pgResult = PQgetResult(pgconn))
PQclear(pgResult);
pop();
}
void SqlConnect::pop()

View File

@ -8,8 +8,7 @@
#include <functional>
typedef struct pg_conn PGconn;
namespace asio { class io_context; }
struct event_base;
namespace AsyncPg {
@ -23,7 +22,7 @@ public:
/// Конструктор класса
/// @param connInfo Строка соединения с базой данных в URI формате
/// @param service Сервис ввода-вывода
explicit SqlConnect(std::string_view connInfo, asio::io_context *service = nullptr);
explicit SqlConnect(std::string_view connInfo, struct event_base *evbase = nullptr);
/// Деструктор класса
~SqlConnect();
@ -66,15 +65,6 @@ public:
/// @return Результат выполнения запроса
const SqlResult &result() const;
/// Возвращает сервис ввода-вывода
/// @return Сервис ввода-вывода
static asio::io_context *service();
protected:
/// Возвращает соединение PostgreSql
/// @return Соединение PostgreSql
PGconn *connect();
/// Производит соединение с PostgreSql
void connecting();
@ -84,6 +74,11 @@ protected:
/// Производит запуск SQL запроса
void executing();
protected:
/// Возвращает соединение PostgreSql
/// @return Соединение PostgreSql
PGconn *connect();
/// Добавляет обработчик результата SQL запроса в очередь
/// @param callback Функция обратного вызова
/// @return Была ли вызван callback
@ -93,13 +88,14 @@ protected:
void pop();
private:
asio::io_context *_service = nullptr;
struct event_base *_evbase = nullptr;
std::queue<Callback> _callbackQueue;
PGconn *_connect = nullptr;
std::string _connInfo;
SqlError _error;
SqlResult _result;
bool _isExec = true;
int _socket = -1;
};
}

View File

@ -1,4 +1,4 @@
include("${CMAKE_SOURCE_DIR}/cmake/main.cmake")
include("${CMAKE_SOURCE_DIR}/cmake/main.cmake")
# Места нахождения бинарных файлов
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${BINARY_DIR}/${TESTPATH}")

View File

@ -1,9 +1,8 @@
cmake_minimum_required(VERSION 3.10)
project(tst_main_mnl VERSION 1.0.0)
set(LIBRARIES
asyncpg asio
)
set(LIBRARIES asyncpg)
include(../manual.cmake)
#target_link_libraries(tst_main_mnl PRIVATE asio)
find_package(LibEvent)
target_link_libraries(${PROJECT_NAME} ${LibEvent_LIBRARIES})

View File

@ -4,7 +4,7 @@
#include <asyncpg/SqlRecord.h>
#include <asyncpg/SqlValue.h>
#include <asio/io_service.hpp>
#include <event.h>
#include <cstdint>
#include <memory>
@ -15,15 +15,13 @@
int main(int argc, char* argv[])
{
asio::io_service service{static_cast<int>(std::thread::hardware_concurrency())};
AsyncPg::SqlConnect connect("postgresql://postgres:1@localhost/RPLMCAPP", &service);
auto *evBase = event_base_new();
AsyncPg::SqlConnect connect("postgresql://postgres:1@localhost/RPLMCAPP", evBase);
connect.prepare(R"(SELECT * FROM "dbo"."test" WHERE "a" >= $1)", {AsyncPg::SqlType::Decimal});
connect.execute({AsyncPg::makeSqlValue<AsyncPg::SqlType::Decimal>("1.001")});
// connect.execute(
// R"(SELECT * FROM "dbo"."test" WHERE "a" >= $1)",
// {AsyncPg::makeSqlValue<AsyncPg::SqlType::Decimal>("1.001")});
connect.post([](AsyncPg::SqlConnect *self) {
if (self->error())
return;
@ -38,5 +36,26 @@ int main(int argc, char* argv[])
}
}
});
service.run();
connect.execute(
R"(SELECT * FROM "dbo"."test" WHERE "a" >= $1)",
{AsyncPg::makeSqlValue<AsyncPg::SqlType::Decimal>("1.001")});
connect.post([](AsyncPg::SqlConnect *self) {
if (self->error())
return;
for (const auto &record : self->result()) {
for (const auto &field : record) {
if (auto value = field.value<AsyncPg::SqlType::Decimal>())
std::cout << *value;
else
std::cout << "NULL";
std::cout << std::endl;
}
}
});
event_base_dispatch(evBase);
event_base_free(evBase);
}