asyncpg/src/SqlConnect.cpp

372 lines
9.6 KiB
C++

#include "SqlConnect.h"
#include "SqlError.h"
#include "SqlValue.h"
#include <event.h>
#include <event2/event.h>
#include <libpq-fe.h>
#include <iostream>
#ifdef _WIN32
#include <io.h>
#else
#include <unistd.h>
#endif
namespace AsyncPg {
static void ev_connecting(evutil_socket_t /*fd*/, short /*what*/, void *arg)
{
auto *sqlConnect = reinterpret_cast<SqlConnect *>(arg);
sqlConnect->connecting();
}
static void ev_preparing(evutil_socket_t /*fd*/, short /*what*/, void *arg)
{
auto *sqlConnect = reinterpret_cast<SqlConnect *>(arg);
sqlConnect->preparing();
}
static void ev_executing(evutil_socket_t /*fd*/, short /*what*/, void *arg)
{
auto *sqlConnect = reinterpret_cast<SqlConnect *>(arg);
sqlConnect->executing();
}
SqlConnect::SqlConnect(std::string_view connInfo, event_base *evbase)
{
_evbase = evbase;
_connInfo = connInfo;
_connect = PQconnectStart(connInfo.data());
if (PQstatus(_connect) == CONNECTION_BAD) {
_error = SqlError(ErrorCode::ConnectionFailed, PQerrorMessage(_connect));
return;
}
#ifdef _WIN32
_socket = _dup(PQsocket(_connect));
#else
_socket = dup(PQsocket(_connect));
#endif
connecting();
}
SqlConnect::SqlConnect(SqlConnect &&other) noexcept
{
_evbase = other._evbase;
_callbackQueue = std::move(other._callbackQueue);
_connect = other._connect;
_connInfo = std::move(other._connInfo);
_error = std::move(other._error);
_result = std::move(other._result);
_isExec = other._isExec;
_socket = other._socket;
other._evbase = nullptr;
other._connect = nullptr;
}
SqlConnect &SqlConnect::operator=(SqlConnect &&other) noexcept
{
_evbase = other._evbase;
_callbackQueue = std::move(other._callbackQueue);
_connect = other._connect;
_connInfo = std::move(other._connInfo);
_error = std::move(other._error);
_result = std::move(other._result);
_isExec = other._isExec;
_socket = other._socket;
other._evbase = nullptr;
other._connect = nullptr;
return *this;
}
SqlConnect::~SqlConnect()
{
if (_connect)
PQfinish(_connect);
}
void SqlConnect::execute(std::string_view sql)
{
auto callback = [sql](SqlConnect *self) {
auto result =
PQsendQueryParams(self->connect(), sql.data(), 0, nullptr, nullptr, nullptr, nullptr, 1);
if (result != 1) {
self->_error = SqlError(ErrorCode::ExecutionFailed, PQerrorMessage(self->connect()));
self->pop();
return;
}
self->_error.clear();
auto event = event_new(self->_evbase, self->_socket, EV_READ, ev_executing, self);
event_add(event, nullptr);
};
push(callback);
}
void SqlConnect::execute(std::string_view sql, std::vector<SqlValue> params)
{
auto callback = [sql, params = std::move(params)](SqlConnect *self) {
const auto nParams = params.size();
auto *types = new unsigned int[nParams];
char **values = new char* [nParams];
int *lengths = new int[nParams];
int *formats = new int[nParams];
for (std::size_t i = 0, e = params.size(); i < e; ++i) {
const auto &[oid, length, value] = asPgValue(params[i]);
types[i] = oid;
values[i] = value;
lengths[i] = static_cast<int>(length);
formats[i] = 1;
}
auto result = PQsendQueryParams(
self->connect(), sql.data(), static_cast<int>(nParams),
types, values, lengths, formats, 1);
for (std::size_t i = 0, e = params.size(); i < e; ++i)
delete[] values[i];
delete[] types;
delete[] values;
delete[] lengths;
delete[] formats;
if (result != 1) {
self->_error = SqlError(ErrorCode::ExecutionFailed, PQerrorMessage(self->connect()));
self->pop();
return;
}
self->_error.clear();
auto event = event_new(self->_evbase, self->_socket, EV_READ, ev_executing, self);
event_add(event, nullptr);
};
push(callback);
}
void SqlConnect::prepare(std::string_view sql, std::vector<SqlType> sqlTypes)
{
auto callback = [sql, sqlTypes = std::move(sqlTypes)](SqlConnect *self) {
const auto nTypes = sqlTypes.size();
auto *types = new unsigned int[nTypes];
for (std::size_t i = 0, e = nTypes; i < e; ++i) {
types[i] = toPgType(sqlTypes[i]);
}
auto result = PQsendPrepare(
self->connect(), "", sql.data(), static_cast<int>(nTypes), types);
delete[] types;
if (result != 1) {
self->_error = SqlError(ErrorCode::PreparationFailed, PQerrorMessage(self->connect()));
self->pop();
return;
}
self->_error.clear();
auto event = event_new(self->_evbase, self->_socket, EV_READ, ev_preparing, self);
event_add(event, nullptr);
};
push(callback);
}
void SqlConnect::execute(std::vector<SqlValue> params)
{
auto callback = [params = std::move(params)](SqlConnect *self) {
const auto nParams = params.size();
char **values = new char* [nParams];
int *lengths = new int[nParams];
int *formats = new int[nParams];
for (std::size_t i = 0, e = params.size(); i < e; ++i) {
const auto &[oid, length, value] = asPgValue(params[i]);
values[i] = value;
lengths[i] = static_cast<int>(length);
formats[i] = 1;
}
auto result = PQsendQueryPrepared(
self->connect(), "", static_cast<int>(nParams), values, lengths, formats, 1);
for (std::size_t i = 0, e = params.size(); i < e; ++i)
delete[] values[i];
delete[] values;
delete[] lengths;
delete[] formats;
if (result != 1) {
self->_error = SqlError(ErrorCode::ExecutionFailed, PQerrorMessage(self->connect()));
self->pop();
return;
}
self->_error.clear();
auto event = event_new(self->_evbase, self->_socket, EV_READ, ev_executing, self);
event_add(event, nullptr);
};
push(callback);
}
bool SqlConnect::cancel()
{
while (!_callbackQueue.empty())
_callbackQueue.pop();
char errorBuffer[256];
auto cancelObject = PQgetCancel(_connect);
bool canceled = (PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)) != 0);
if (!canceled)
_error = SqlError(ErrorCode::CancelFailed, PQerrorMessage(_connect));
PQfreeCancel(cancelObject);
return canceled;
}
void SqlConnect::post(Callback func)
{
auto callback = [func = std::move(func)](SqlConnect *self) {
func(self);
self->pop();
};
push(callback);
}
SqlConnect SqlConnect::clone()
{
return SqlConnect(_connInfo, _evbase);
}
void SqlConnect::connecting()
{
auto ret = PQconnectPoll(_connect);
switch (ret) {
case PGRES_POLLING_READING: {
auto event = event_new(_evbase, _socket, EV_READ, ev_connecting, this);
event_add(event, nullptr);
} break;
case PGRES_POLLING_WRITING: {
auto event = event_new(_evbase, _socket, EV_WRITE, ev_connecting, this);
event_add(event, nullptr);
} break;
case PGRES_POLLING_OK:
pop();
break;
case PGRES_POLLING_FAILED:
_error = SqlError(ErrorCode::ConnectionFailed, PQerrorMessage(_connect));
pop();
break;
default:
break;
}
}
void SqlConnect::preparing()
{
auto pgconn = connect();
if (PQconsumeInput(pgconn) != 1) {
_error = SqlError(ErrorCode::PreparationFailed, PQerrorMessage(pgconn));
pop();
return;
}
if (PQisBusy(pgconn) == 1) {
auto event = event_new(_evbase, _socket, EV_READ, ev_preparing, this);
event_add(event, nullptr);
return;
}
if (auto pgResult = PQgetResult(pgconn)) {
if (PQresultStatus(pgResult) != PGRES_COMMAND_OK)
_error = SqlError(ErrorCode::PreparationFailed, PQerrorMessage(pgconn));
PQclear(pgResult);
}
while (auto pgResult = PQgetResult(pgconn))
PQclear(pgResult);
pop();
}
void SqlConnect::executing()
{
auto pgconn = connect();
if (PQconsumeInput(pgconn) != 1) {
_error = SqlError(ErrorCode::ExecutionFailed, PQerrorMessage(pgconn));
pop();
return;
}
if (PQisBusy(pgconn) == 1) {
auto event = event_new(_evbase, _socket, EV_READ, ev_executing, this);
event_add(event, nullptr);
return;
}
if (auto pgResult = PQgetResult(pgconn)) {
if (PQresultStatus(pgResult) == PGRES_TUPLES_OK) {
_result = SqlResult(pgResult);
} else {
_error = SqlError(ErrorCode::ExecutionFailed, PQerrorMessage(pgconn));
PQclear(pgResult);
}
}
while (auto pgResult = PQgetResult(pgconn))
PQclear(pgResult);
pop();
}
void SqlConnect::pop()
{
if (!_callbackQueue.empty()) {
auto callback = _callbackQueue.front();
_callbackQueue.pop();
callback(this);
} else {
_isExec = false;
}
}
const SqlResult &SqlConnect::result() const
{
return _result;
}
bool SqlConnect::isBusy() const
{
return _isExec;
}
const SqlError &SqlConnect::error() const
{
return _error;
}
bool SqlConnect::push(const SqlConnect::Callback &callback)
{
bool isCall = false;
if (_isExec) {
_callbackQueue.push(callback);
} else {
callback(this);
_isExec = true;
isCall = true;
}
return isCall;
}
struct pg_conn *SqlConnect::connect()
{
return _connect;
}
}