PBAP: remove transfer via pipe

Using a pipe was never fully supported by obexd (blocks
obexd). Transfering in suitably sized chunks (FDO #77272) will be a
more obexd friendly solution with a similar effect (not having to
buffer the entire address book in memory).
This commit is contained in:
Patrick Ohly 2014-07-02 13:19:01 +02:00
parent 4f16f586f1
commit 8142fefe91

View file

@ -75,17 +75,6 @@ class PullAll
// refers to chunks of m_buffer or m_tmpFile without copying them via the contact number. // refers to chunks of m_buffer or m_tmpFile without copying them via the contact number.
Content m_content; Content m_content;
// When using pipe:
// - split into queue of std::strings, read from start to finish
// - discard contact strings that are no longer needed
int m_firstContactInQueue;
ContactQueue m_queue;
// - buffer for reading from pipe
char *m_pipeBuffer;
size_t m_pipeBufferSize;
size_t m_pipeBufferUsed;
size_t m_pipeBufferTotal;
int m_numContacts; // Number of existing contacts, according to GetSize() or after downloading. int m_numContacts; // Number of existing contacts, according to GetSize() or after downloading.
int m_currentContact; // Numbered starting with zero according to discovery in addVCards. int m_currentContact; // Numbered starting with zero according to discovery in addVCards.
boost::shared_ptr<PbapSession> m_session; // Only set when there is a transfer ongoing. boost::shared_ptr<PbapSession> m_session; // Only set when there is a transfer ongoing.
@ -102,11 +91,6 @@ public:
}; };
PullAll::PullAll() : PullAll::PullAll() :
m_firstContactInQueue(0),
m_pipeBuffer(NULL),
m_pipeBufferSize(0),
m_pipeBufferUsed(0),
m_pipeBufferTotal(0),
m_numContacts(0), m_numContacts(0),
m_currentContact(0), m_currentContact(0),
m_tmpFileOffset(0) m_tmpFileOffset(0)
@ -114,7 +98,6 @@ PullAll::PullAll() :
PullAll::~PullAll() PullAll::~PullAll()
{ {
free(m_pipeBuffer);
} }
enum PullData enum PullData
@ -588,8 +571,7 @@ boost::shared_ptr<PullAll> PbapSession::startPullAll(PullData pullData)
state->m_numContacts = GDBusCXX::DBusClientCall1<uint16_t>(*m_session, "GetSize")(); state->m_numContacts = GDBusCXX::DBusClientCall1<uint16_t>(*m_session, "GetSize")();
SE_LOG_DEBUG(NULL, "Expecting %d contacts.", state->m_numContacts); SE_LOG_DEBUG(NULL, "Expecting %d contacts.", state->m_numContacts);
TmpFile::Type type = getenv("SYNCEVOLUTION_PBAP_PIPE") ? TmpFile::PIPE : TmpFile::FILE; state->m_tmpFile.create(TmpFile::FILE);
state->m_tmpFile.create(type);
SE_LOG_DEBUG(NULL, "Created temporary file for PullAll %s", state->m_tmpFile.filename().c_str()); SE_LOG_DEBUG(NULL, "Created temporary file for PullAll %s", state->m_tmpFile.filename().c_str());
GDBusCXX::DBusClientCall1<std::pair<GDBusCXX::DBusObject_t, Params> > pullall(*m_session, "PullAll"); GDBusCXX::DBusClientCall1<std::pair<GDBusCXX::DBusObject_t, Params> > pullall(*m_session, "PullAll");
std::pair<GDBusCXX::DBusObject_t, Params> tuple = std::pair<GDBusCXX::DBusObject_t, Params> tuple =
@ -638,14 +620,7 @@ const char *PullAll::addVCards(int startIndex, const pcrecpp::StringPiece &vcard
pcrecpp::RE re("[\\r\\n]*(^BEGIN:VCARD.*?^END:VCARD)", pcrecpp::RE re("[\\r\\n]*(^BEGIN:VCARD.*?^END:VCARD)",
pcrecpp::RE_Options().set_dotall(true).set_multiline(true)); pcrecpp::RE_Options().set_dotall(true).set_multiline(true));
while (re.Consume(&tmp, &vcarddata)) { while (re.Consume(&tmp, &vcarddata)) {
if (m_tmpFile.getType() == TmpFile::PIPE) { m_content[count] = vcarddata;
// Must copy into queue.
m_queue.push_back(std::string());
m_queue.back().assign(vcarddata.data(), vcarddata.size());
} else {
// Can continue using the memory-mapped file.
m_content[count] = vcarddata;
}
++count; ++count;
} }
@ -699,122 +674,61 @@ bool PullAll::getContact(int contactNumber, pcrecpp::StringPiece &vcard)
return false; return false;
} }
if (m_tmpFile.getType() == TmpFile::PIPE) { Content::iterator it;
// Delete old contacts. while ((it = m_content.find(contactNumber)) == m_content.end() &&
ContactQueue::iterator it = m_queue.begin(); m_session &&
while (m_firstContactInQueue < contactNumber) { (!m_session->transferComplete() ||
++it; m_tmpFile.moreData())) {
++m_firstContactInQueue; // Wait? We rely on regular propgress signals to wake us up.
// obex 0.47 sends them every 64KB, at least in combination
// with a Samsung Galaxy SIII. This may depend on both obexd
// and the phone, so better check ourselves and perhaps do it
// less often - unmap/map can be expensive and invalidates
// some of the unread data (at least how it is implemented
// now).
while (!m_session->transferComplete() && m_tmpFile.moreData() < 128 * 1024) {
g_main_context_iteration(NULL, true);
} }
m_queue.erase(m_queue.begin(), it); m_session->checkForError();
if (m_tmpFile.moreData()) {
bool eof = false; // Remap. This shifts all addresses already stored in
while (m_queue.empty() && // m_content, so beware and update those.
(!m_session->transferComplete() || !eof)) { pcrecpp::StringPiece oldMem = m_tmpFile.stringPiece();
// Read at least 64KB, increase buffer if too m_tmpFile.unmap();
// small. Happens at least once (initial read) and may m_tmpFile.map();
// happen again when a contact is larger than the current pcrecpp::StringPiece newMem = m_tmpFile.stringPiece();
// buffer size. ssize_t delta = newMem.data() - oldMem.data();
static const size_t chunkSize = 64 * 1024; BOOST_FOREACH (Content::value_type &entry, m_content) {
if (m_pipeBufferSize - m_pipeBufferUsed < chunkSize) { pcrecpp::StringPiece &vcard = entry.second;
size_t newSize = m_pipeBufferSize + chunkSize; vcard.set(vcard.data() + delta, vcard.size());
char *newBuffer = (char *)realloc(m_pipeBuffer, newSize);
if (!newBuffer) {
// Nothing changed, but we can't proceed.
SE_THROW("getContact(): out of memory");
}
m_pipeBuffer = newBuffer;
m_pipeBufferSize = newSize;
}
// Try reading. Blocks until at least one byte becomes available.
ssize_t newData = read(m_tmpFile.getFD(), m_pipeBuffer + m_pipeBufferUsed, m_pipeBufferSize - m_pipeBufferUsed);
SE_LOG_DEBUG(NULL, "PBAP content: next chunk %ld, total %ld, %s",
(long)newData, (long)m_pipeBufferTotal,
newData < 0 ? strerror(errno) : "<<okay>>");
if (newData == 0) {
eof = true;
} else if (newData < 0) {
SE_THROW(StringPrintf("reading PBAP data from pipe: %s", strerror(errno)));
} else {
m_pipeBufferUsed += newData;
m_pipeBufferTotal += newData;
} }
// File exists and obexd has written into it, so now we // File exists and obexd has written into it, so now we
// can unlink it to avoid leaking it if we crash. // can unlink it to avoid leaking it if we crash.
m_tmpFile.remove(); m_tmpFile.remove();
// Parse next chunk, shift remaining data that couldn't // Continue parsing where we stopped before.
// be parsed yet to beginning of buffer and continue; pcrecpp::StringPiece next(newMem.data() + m_tmpFileOffset,
pcrecpp::StringPiece next(m_pipeBuffer, m_pipeBufferUsed); newMem.size() - m_tmpFileOffset);
const char *end = addVCards(m_content.size(), next); const char *end = addVCards(m_content.size(), next);
size_t remaining = m_pipeBuffer + m_pipeBufferUsed - end; size_t newTmpFileOffset = end - newMem.data();
memmove(m_pipeBuffer, end, remaining); SE_LOG_DEBUG(NULL, "PBAP content parsed: %ld out of %d (total), %d out of %d (last update)",
m_pipeBufferUsed = remaining; (long)newTmpFileOffset,
newMem.size(),
(int)(end - next.data()),
next.size());
m_tmpFileOffset = newTmpFileOffset;
} }
if (m_queue.empty()) {
SE_LOG_DEBUG(NULL, "did not get the expected contact #%d, perhaps some contacts were deleted?", contactNumber);
return false;
}
const std::string &next = m_queue.front();
vcard.set(next.c_str(), next.size());
} else {
Content::iterator it;
while ((it = m_content.find(contactNumber)) == m_content.end() &&
m_session &&
(!m_session->transferComplete() ||
m_tmpFile.moreData())) {
// Wait? We rely on regular propgress signals to wake us up.
// obex 0.47 sends them every 64KB, at least in combination
// with a Samsung Galaxy SIII. This may depend on both obexd
// and the phone, so better check ourselves and perhaps do it
// less often - unmap/map can be expensive and invalidates
// some of the unread data (at least how it is implemented
// now).
while (!m_session->transferComplete() && m_tmpFile.moreData() < 128 * 1024) {
g_main_context_iteration(NULL, true);
}
m_session->checkForError();
if (m_tmpFile.moreData()) {
// Remap. This shifts all addresses already stored in
// m_content, so beware and update those.
pcrecpp::StringPiece oldMem = m_tmpFile.stringPiece();
m_tmpFile.unmap();
m_tmpFile.map();
pcrecpp::StringPiece newMem = m_tmpFile.stringPiece();
ssize_t delta = newMem.data() - oldMem.data();
BOOST_FOREACH (Content::value_type &entry, m_content) {
pcrecpp::StringPiece &vcard = entry.second;
vcard.set(vcard.data() + delta, vcard.size());
}
// File exists and obexd has written into it, so now we
// can unlink it to avoid leaking it if we crash.
m_tmpFile.remove();
// Continue parsing where we stopped before.
pcrecpp::StringPiece next(newMem.data() + m_tmpFileOffset,
newMem.size() - m_tmpFileOffset);
const char *end = addVCards(m_content.size(), next);
size_t newTmpFileOffset = end - newMem.data();
SE_LOG_DEBUG(NULL, "PBAP content parsed: %ld out of %d (total), %d out of %d (last update)",
(long)newTmpFileOffset,
newMem.size(),
(int)(end - next.data()),
next.size());
m_tmpFileOffset = newTmpFileOffset;
}
}
if (it == m_content.end()) {
SE_LOG_DEBUG(NULL, "did not get the expected contact #%d, perhaps some contacts were deleted?", contactNumber);
return false;
}
vcard = it->second;
} }
if (it == m_content.end()) {
SE_LOG_DEBUG(NULL, "did not get the expected contact #%d, perhaps some contacts were deleted?",
contactNumber);
return false;
}
vcard = it->second;
return true; return true;
} }