diff --git a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
index 7842b74..9a06144 100644
--- a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
+++ b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
@@ -51,7 +51,8 @@ namespace hoot
{
OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
- : _description(ConfigOptions().getChangesetDescription()),
+ : _startFlag(false),
+ _description(ConfigOptions().getChangesetDescription()),
_source(ConfigOptions().getChangesetSource()),
_hashtags(ConfigOptions().getChangesetHashtags()),
_maxWriters(ConfigOptions().getChangesetApidbWritersMax()),
@@ -67,7 +68,8 @@ OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
_changesetCount(0),
_debugOutput(ConfigOptions().getChangesetApidbWriterDebugOutput()),
_debugOutputPath(ConfigOptions().getChangesetApidbWriterDebugOutputPath()),
- _apiId(0)
+ _apiId(0),
+ _threadsCanExit(false)
{
_changesets.push_back(changeset);
if (isSupported(url))
@@ -75,7 +77,8 @@ OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
}
OsmApiWriter::OsmApiWriter(const QUrl& url, const QList<QString>& changesets)
- : _changesets(changesets),
+ : _startFlag(false),
+ _changesets(changesets),
_description(ConfigOptions().getChangesetDescription()),
_source(ConfigOptions().getChangesetSource()),
_hashtags(ConfigOptions().getChangesetHashtags()),
@@ -92,7 +95,8 @@ OsmApiWriter::OsmApiWriter(const QUrl& url, const QList<QString>& changesets)
_changesetCount(0),
_debugOutput(ConfigOptions().getChangesetApidbWriterDebugOutput()),
_debugOutputPath(ConfigOptions().getChangesetApidbWriterDebugOutputPath()),
- _apiId(0)
+ _apiId(0),
+ _threadsCanExit(false)
{
if (isSupported(url))
_url = url;
@@ -140,12 +144,12 @@ bool OsmApiWriter::apply()
_changeset.fixMalformedInput();
// Start the writer threads
LOG_INFO("Starting " << _maxWriters << " processing threads.");
+ _threadIdle.reserve(_maxWriters);
for (int i = 0; i < _maxWriters; ++i)
{
_threadStatus.push_back(ThreadStatus::Working);
_threadPool.push_back(thread(&OsmApiWriter::_changesetThreadFunc, this, i));
}
- _threadIdle.reserve(_maxWriters);
// Setup the progress indicators
long total = _changeset.getTotalElementCount();
float progress = 0.0f;
@@ -162,6 +166,13 @@ bool OsmApiWriter::apply()
_workQueueMutex.lock();
int queueSize = (int)_workQueue.size();
_workQueueMutex.unlock();
+ // If all threads have failed, fail the rest of the changeset and exit
+ if (_allThreadsFailed())
+ {
+ _changeset.failRemainingChangeset();
+ _threadsCanExit = true;
+ break;
+ }
// Only queue up enough work to keep all the threads busy with times QUEUE_SIZE_MULTIPLIER
// so that results can come back and update the changeset for more atomic changesets instead
// of a big list of nodes, then ways, then relations. This will give us fuller more atomic
@@ -177,9 +188,8 @@ bool OsmApiWriter::apply()
// Add the new work to the queue if there is any
if (newChangeset)
{
- _workQueueMutex.lock();
- _workQueue.push(changeset_info);
- _workQueueMutex.unlock();
+ // Push the new changeset onto the work queue
+ _pushChangesets(changeset_info);
}
else if (queueSize == 0 && !newChangeset && _changeset.hasElementsToSend() && _threadsAreIdle())
{
@@ -187,26 +197,39 @@ bool OsmApiWriter::apply()
// all of the threads are idle and not waiting for something to come back
// There are two things that can be done here, first is to put everything that is
// "ready to send" in a changeset and send it OR move everything to the error state
-
+/*
// Option #1: Get all of the remaining elements as a single changeset
_changesetMutex.lock();
_changeset.calculateRemainingChangeset(changeset_info);
_changesetMutex.unlock();
// Push that changeset
- _workQueueMutex.lock();
- _workQueue.push(changeset_info);
- _workQueueMutex.unlock();
+ _pushChangesets(changeset_info);
+ // Let the threads know that the remaining changeset is the "remaining" changeset
+ _threadsCanExit = true;
+*/
+ LOG_STATUS("Apply Changeset: Remaining elements unsendable...");
+ // Option #2: Move everything to the error state and exit
+ _changesetMutex.lock();
+ _changeset.failRemainingChangeset();
+ _changesetMutex.unlock();
+ // Let the threads know that the remaining changeset has failed
+ _threadsCanExit = true;
+ break;
}
else
{
+ // Indicate to the worker threads that there is work to be done
+ _startWork();
// Allow time for the worker threads to complete some work
- this_thread::sleep_for(chrono::milliseconds(10));
+ this_thread::yield();
}
}
else
{
+ // Indicate to the worker threads that there is work to be done
+ _startWork();
// Allow time for the worker threads to complete some work
- this_thread::sleep_for(chrono::milliseconds(10));
+ this_thread::yield();
}
// Show the progress
if (_showProgress)
@@ -220,10 +243,19 @@ bool OsmApiWriter::apply()
}
}
}
+ // Indicate to the worker threads that there is work to be done, if they haven't started already
+ _startWork();
// Wait for the threads to shutdown
for (int i = 0; i < _maxWriters; ++i)
_threadPool[i].join();
- LOG_INFO("Upload progress: 100%");
+ // Check for failed threads
+ if (_hasFailedThread())
+ {
+ LOG_ERROR("Multiple bad changeset ID errors in a row, is the API functioning correctly?");
+ _changeset.failRemainingChangeset();
+ }
+ // Final write for the error file
+ _changeset.writeErrorFile();
// Keep some stats
_stats.append(SingleStat("API Upload Time (sec)", timer.getElapsedAndRestart()));
_stats.append(SingleStat("Total OSM Changesets Uploaded", _changesetCount));
@@ -241,14 +273,15 @@ bool OsmApiWriter::apply()
void OsmApiWriter::_changesetThreadFunc(int index)
{
// Set the status to working
- _threadStatusMutex.lock();
- _threadStatus[index] = ThreadStatus::Working;
- _threadStatusMutex.unlock();
+ _updateThreadStatus(index, ThreadStatus::Working);
// Setup the network request object with OAuth or with username/password authentication
HootNetworkRequestPtr request = createNetworkRequest(true);
long id = -1;
long changesetSize = 0;
bool stop_thread = false;
+ int changeset_failures = 0;
+ // Before working, wait for the signal
+ _waitForStart();
// Iterate until all elements are sent and updated
while (!_changeset.isDone() && !stop_thread)
{
@@ -266,9 +299,7 @@ void OsmApiWriter::_changesetThreadFunc(int index)
if (workInfo)
{
// Set the status to working
- _threadStatusMutex.lock();
- _threadStatus[index] = ThreadStatus::Working;
- _threadStatusMutex.unlock();
+ _updateThreadStatus(index, ThreadStatus::Working);
// Create the changeset ID if required
if (id < 1)
{
@@ -278,16 +309,30 @@ void OsmApiWriter::_changesetThreadFunc(int index)
// An ID of less than 1 isn't valid, try to fix it
if (id < 1)
{
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
- // Reset the network request object and sleep it off
- request = createNetworkRequest(true);
- LOG_WARN("Bad changeset ID. Resetting network request object.");
- this_thread::sleep_for(chrono::milliseconds(100));
+ _pushChangesets(workInfo);
+ // Multiple changeset failures
+ changeset_failures++;
+ if (changeset_failures >= 3)
+ {
+ // Set the thread status to failed and report the error message in the main thread
+ _updateThreadStatus(index, ThreadStatus::Failed);
+ stop_thread = true;
+ }
+ else
+ {
+ // Reset the network request object and yield before retrying
+ request = createNetworkRequest(true);
+ LOG_DEBUG("Bad changeset ID. Resetting network request object.");
+ this_thread::yield();
+ }
// Try a new create changeset request
continue;
}
+ else
+ {
+ // Reset the changeset failure count when one is successful
+ changeset_failures = 0;
+ }
// Make sure that the changeset is valid and isn't empty
if (workInfo->size() < 1)
{
@@ -339,7 +384,10 @@ void OsmApiWriter::_changesetThreadFunc(int index)
{
// Fail the entire changeset
_changeset.updateFailedChangeset(workInfo, true);
+ // Let the other threads know that they can exit, the entire changeset has been failed
+ _threadsCanExit = true;
// Looping should end the thread because all of the remaining elements have now been set to the failed state
+ stop_thread = true;
continue;
}
// Split the changeset on conflict errors
@@ -352,29 +400,21 @@ void OsmApiWriter::_changesetThreadFunc(int index)
if ((int)workInfo->size() > _maxChangesetSize / 2)
{
// Split the changeset into half so that it is smaller and won't fail
- ChangesetInfoPtr split = _changeset.splitChangeset(workInfo);
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueue.push(split);
- _workQueueMutex.unlock();
+ _splitChangeset(workInfo);
}
else
{
// The changeset was closed already so set the ID to -1 and reprocess
id = -1;
// Push the changeset back on the queue
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
+ _pushChangesets(workInfo);
}
// Loop back around to work on the next changeset
continue;
}
else if (_fixConflict(request, workInfo, info->response))
{
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
+ _pushChangesets(workInfo);
// Loop back around to work on the next changeset
continue;
}
@@ -394,9 +434,7 @@ void OsmApiWriter::_changesetThreadFunc(int index)
// Try to automatically resolve certain issues, like out of date version
if (_resolveIssues(request, workInfo))
{
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
+ _pushChangesets(workInfo);
}
else
{
@@ -413,11 +451,9 @@ void OsmApiWriter::_changesetThreadFunc(int index)
{
// Splitting failed which means that the changeset only has one element in it,
// push it back on the queue and give the API a break
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
+ _pushChangesets(workInfo);
// Sleep the thread
- this_thread::sleep_for(chrono::milliseconds(10));
+ this_thread::sleep_for(chrono::milliseconds(100));
}
break;
default:
@@ -425,14 +461,11 @@ void OsmApiWriter::_changesetThreadFunc(int index)
LOG_ERROR("Changeset upload responded with HTTP status response: " << request->getHttpStatus());
// Fall through
case HttpResponseCode::HTTP_METHOD_NOT_ALLOWED:
+ case HttpResponseCode::HTTP_UNAUTHORIZED:
// This shouldn't ever happen, push back on the queue, only process a certain amount of times
workInfo->retry();
if (workInfo->canRetry())
- {
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
- }
+ _pushChangesets(workInfo);
else
_changeset.updateFailedChangeset(workInfo, true);
break;
@@ -441,7 +474,12 @@ void OsmApiWriter::_changesetThreadFunc(int index)
}
else
{
- if (!_changeset.hasElementsToSend() && !_changeset.isDone() && queueSize == 0)
+ if (_threadsCanExit)
+ {
+ stop_thread = true;
+ _updateThreadStatus(index, ThreadStatus::Completed);
+ }
+ else if (!_changeset.isDone() && queueSize == 0)
{
// This is a bad state where the producer thread says all elements are sent and
// waits for all threads to join but the changeset isn't "done".
@@ -459,40 +497,23 @@ void OsmApiWriter::_changesetThreadFunc(int index)
id = -1;
}
_threadStatusMutex.unlock();
- if (_threadsAreIdle())
- {
- // In this case there are elements that have been sent and not reported back
- // BUT there are no threads that are waiting for them either. Every thread
- // except the "first" worker thread will exit here. The first worker thread
- // Tries to calculate the remaining changeset and push in on the queue. It then
- // loops around and picks up the remaining changeset to process. Next time around
- // calculateRemainingChangeset() fails and this thread is stopped.
- if (index != 0)
- stop_thread = true;
- else if (_changeset.calculateRemainingChangeset(workInfo))
- {
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
- }
- else
- stop_thread = true;
- }
}
else
{
// Set the status to idle
- _threadStatusMutex.lock();
- _threadStatus[index] = ThreadStatus::Idle;
- _threadStatusMutex.unlock();
- // Sleep the thread
- this_thread::sleep_for(chrono::milliseconds(10));
+ _updateThreadStatus(index, ThreadStatus::Idle);
+ // Yield the thread
+ this_thread::yield();
}
}
}
// Close the changeset if one is still open
if (id != -1)
_closeChangeset(request, id);
+ // Update the thread to complete if it didn't fail
+ ThreadStatus status = _getThreadStatus(index);
+ if (status != ThreadStatus::Failed && status != ThreadStatus::Unknown)
+ _updateThreadStatus(index, ThreadStatus::Completed);
}
void OsmApiWriter::setConfiguration(const Settings& conf)
@@ -675,7 +696,9 @@ long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request,
QString responseXml = QString::fromUtf8(request->getResponseContent().data());
- return responseXml.toLong();
+ // Only return the parsed response from HTTP 200 OK
+ if (request->getHttpStatus() == HttpResponseCode::HTTP_OK)
+ return responseXml.toLong();
}
catch (const HootException& ex)
{
@@ -956,12 +979,19 @@ bool OsmApiWriter::_splitChangeset(const ChangesetInfoPtr& workInfo, const QStri
_changesetMutex.unlock();
if (split->size() > 0)
{
- // Push both of the changesets onto the queue
- _workQueueMutex.lock();
- _workQueue.push(split);
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
- return true;
+ // Fail the split changeset if the error flag is set
+ if (split->getError())
+ {
+ _changeset.failChangeset(split);
+ _pushChangesets(workInfo);
+ return true;
+ }
+ else
+ {
+ // Push both of the changesets onto the queue
+ _pushChangesets(workInfo, split);
+ return true;
+ }
}
// Nothing was split out, return false
return false;
@@ -987,4 +1017,74 @@ int OsmApiWriter::_getNextApiId()
return ++_apiId;
}
+// Queue up to two changesets for the worker threads while holding the work queue mutex once.
+// An empty pointer is skipped; `changeset` is queued ahead of `changeset2` when both are set.
+void OsmApiWriter::_pushChangesets(ChangesetInfoPtr changeset, ChangesetInfoPtr changeset2)
+{
+  // Keep the queue locked for the whole call so both pushes land together
+  std::lock_guard<std::mutex> queueGuard(_workQueueMutex);
+  // Local helper: only non-empty changeset pointers are placed on the queue
+  auto enqueue = [this](const ChangesetInfoPtr& info)
+  {
+    if (info)
+      _workQueue.push(info);
+  };
+  enqueue(changeset);
+  enqueue(changeset2);
+}
+
+// Release every worker thread that is blocked in _waitForStart().
+// Safe to call repeatedly: only the first call sets the start flag and notifies; later calls
+// return after a single flag check.
+void OsmApiWriter::_startWork()
+{
+  {
+    std::lock_guard<std::mutex> lock(_startMutex);
+    // apply() calls this on every polling pass; once the flag is set the wait predicate in
+    // _waitForStart() is already satisfied, so there is nothing left to wake up
+    if (_startFlag)
+      return;
+    _startFlag = true;
+  }
+  // Notify after the mutex is released so that woken threads don't immediately block on it
+  _start.notify_all();
+}
+
+// Block the calling thread until the start flag has been set by _startWork().
+// Returns immediately when the flag is already set.
+void OsmApiWriter::_waitForStart()
+{
+  std::unique_lock<std::mutex> startGuard(_startMutex);
+  // Re-check the flag after every wake-up so that a wake-up without the flag set keeps waiting
+  while (!_startFlag)
+    _start.wait(startGuard);
+}
+
+// Look up the recorded status of one worker thread.
+// thread_index: position of the thread in the status list
+// Returns the stored status, or ThreadStatus::Unknown when the index is out of range.
+OsmApiWriter::ThreadStatus OsmApiWriter::_getThreadStatus(int thread_index)
+{
+  // Take the lock first so that the size check and the read see the same state of the list
+  std::lock_guard<std::mutex> lock(_threadStatusMutex);
+  // Validate the index
+  if (thread_index < 0 || thread_index >= static_cast<int>(_threadStatus.size()))
+    return ThreadStatus::Unknown;
+  return _threadStatus[thread_index];
+}
+
+
+// Record a new status for one worker thread.
+// thread_index: position of the thread in the status list
+// status: value to store for that thread
+// An out-of-range index is ignored and nothing is modified.
+void OsmApiWriter::_updateThreadStatus(int thread_index, ThreadStatus status)
+{
+  // Take the lock first so that the size check and the write see the same state of the list
+  std::lock_guard<std::mutex> lock(_threadStatusMutex);
+  // Validate the index
+  if (thread_index < 0 || thread_index >= static_cast<int>(_threadStatus.size()))
+    return;
+  _threadStatus[thread_index] = status;
+}
+
+// Report whether every entry in the thread status list is ThreadStatus::Failed.
+// The list is inspected while holding the thread status mutex.
+// Note that an empty status list also yields true, since no entry contradicts the check.
+bool OsmApiWriter::_allThreadsFailed()
+{
+  std::lock_guard<std::mutex> statusGuard(_threadStatusMutex);
+  bool everyThreadFailed = true;
+  for (const auto& status : _threadStatus)
+  {
+    // The first thread that hasn't failed settles the answer
+    if (status != ThreadStatus::Failed)
+    {
+      everyThreadFailed = false;
+      break;
+    }
+  }
+  return everyThreadFailed;
+}
+
+// Report whether at least one entry in the thread status list is ThreadStatus::Failed.
+// The list is inspected while holding the thread status mutex.
+bool OsmApiWriter::_hasFailedThread()
+{
+  std::lock_guard<std::mutex> statusGuard(_threadStatusMutex);
+  bool foundFailure = false;
+  for (const auto& status : _threadStatus)
+  {
+    // One failed thread is enough, no need to look at the rest
+    if (status == ThreadStatus::Failed)
+    {
+      foundFailure = true;
+      break;
+    }
+  }
+  return foundFailure;
+}
+
+
}