diff --git a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
index 3372fea..7842b74 100644
--- a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
+++ b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
@@ -34,7 +34,9 @@
#include <hoot/core/util/ConfigOptions.h>
#include <hoot/core/util/FileUtils.h>
#include <hoot/core/util/HootException.h>
+#include <hoot/core/util/HootNetworkUtils.h>
#include <hoot/core/util/Log.h>
+#include <hoot/core/util/OsmApiUtils.h>
// Tgs
#include <tgs/System/Timer.h>
@@ -48,55 +50,6 @@ using namespace Tgs;
namespace hoot
{
-const char* OsmApiWriter::API_PATH_CAPABILITIES = "/api/capabilities";
-const char* OsmApiWriter::API_PATH_PERMISSIONS = "/api/0.6/permissions";
-const char* OsmApiWriter::API_PATH_CREATE_CHANGESET = "/api/0.6/changeset/create";
-const char* OsmApiWriter::API_PATH_CLOSE_CHANGESET = "/api/0.6/changeset/%1/close";
-const char* OsmApiWriter::API_PATH_UPLOAD_CHANGESET = "/api/0.6/changeset/%1/upload";
-const char* OsmApiWriter::API_PATH_GET_ELEMENT = "/api/0.6/%1/%2";
-
-const char* OsmApiWriter::CONTENT_TYPE_XML = "text/xml; charset=UTF-8";
-
-OsmApiWriter::OsmApiWriter(const QString& output_file, const QList<QString>& changesets)
- : _changesets(changesets),
- _description(ConfigOptions().getChangesetDescription()),
- _source(ConfigOptions().getChangesetSource()),
- _hashtags(ConfigOptions().getChangesetHashtags()),
- _maxWriters(1), // Not used in test apply
- _maxPushSize(ConfigOptions().getChangesetApidbSizeMax()),
- _maxChangesetSize(ConfigOptions().getChangesetMaxSize()),
- _throttleWriters(false), // Not used in test apply
- _throttleTime(0), // Not used in test apply
- _showProgress(false),
- _consumerKey(""), // Not used in test apply
- _consumerSecret(""), // Not used in test apply
- _accessToken(""), // Not used in test apply
- _secretToken(""), // Not used in test apply
- _changesetCount(0),
- _testApplyPathname(output_file)
-{
-}
-
-OsmApiWriter::OsmApiWriter(const QString& output_file, const QString& changeset)
- : _description(ConfigOptions().getChangesetDescription()),
- _source(ConfigOptions().getChangesetSource()),
- _hashtags(ConfigOptions().getChangesetHashtags()),
- _maxWriters(1), // Not used in test apply
- _maxPushSize(ConfigOptions().getChangesetApidbSizeMax()),
- _maxChangesetSize(ConfigOptions().getChangesetMaxSize()),
- _throttleWriters(false), // Not used in test apply
- _throttleTime(0), // Not used in test apply
- _showProgress(false),
- _consumerKey(""), // Not used in test apply
- _consumerSecret(""), // Not used in test apply
- _accessToken(""), // Not used in test apply
- _secretToken(""), // Not used in test apply
- _changesetCount(0),
- _testApplyPathname(output_file)
-{
- _changesets.push_back(changeset);
-}
-
OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
: _description(ConfigOptions().getChangesetDescription()),
_source(ConfigOptions().getChangesetSource()),
@@ -112,7 +65,9 @@ OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
_accessToken(ConfigOptions().getHootOsmAuthAccessToken()),
_secretToken(ConfigOptions().getHootOsmAuthAccessTokenSecret()),
_changesetCount(0),
- _testApplyPathname("")
+ _debugOutput(ConfigOptions().getChangesetApidbWriterDebugOutput()),
+ _debugOutputPath(ConfigOptions().getChangesetApidbWriterDebugOutputPath()),
+ _apiId(0)
{
_changesets.push_back(changeset);
if (isSupported(url))
@@ -135,7 +90,9 @@ OsmApiWriter::OsmApiWriter(const QUrl& url, const QList<QString>& changesets)
_accessToken(ConfigOptions().getHootOsmAuthAccessToken()),
_secretToken(ConfigOptions().getHootOsmAuthAccessTokenSecret()),
_changesetCount(0),
- _testApplyPathname("")
+ _debugOutput(ConfigOptions().getChangesetApidbWriterDebugOutput()),
+ _debugOutputPath(ConfigOptions().getChangesetApidbWriterDebugOutputPath()),
+ _apiId(0)
{
if (isSupported(url))
_url = url;
@@ -188,6 +145,7 @@ bool OsmApiWriter::apply()
_threadStatus.push_back(ThreadStatus::Working);
_threadPool.push_back(thread(&OsmApiWriter::_changesetThreadFunc, this, i));
}
+ _threadIdle.reserve(_maxWriters);
// Setup the progress indicators
long total = _changeset.getTotalElementCount();
float progress = 0.0f;
@@ -280,85 +238,6 @@ bool OsmApiWriter::apply()
return success;
}
-QStringList OsmApiWriter::testApply()
-{
- Timer timer;
- QStringList output_paths;
- // Load all of the changesets into memory
- _changeset.setMaxPushSize(_maxPushSize);
- for (int i = 0; i < _changesets.size(); ++i)
- {
- LOG_INFO("Loading changeset: " << _changesets[i]);
- _changeset.loadChangeset(_changesets[i]);
- _stats.append(SingleStat(QString("Changeset (%1) Load Time (sec)").arg(_changesets[i]), timer.getElapsedAndRestart()));
- }
- // Split any ways that need splitting
- _changeset.splitLongWays(_capabilities.getWayNodes());
- // Setup the progress indicators
- long total = _changeset.getTotalElementCount();
- float progress = 0.0f;
- float increment = 0.01f;
- long id = 1;
- long changesetSize = 0;
- int file_id = 1;
- // Setup the increment
- if (total < 10000)
- increment = 0.1f;
- else if (total < 100000)
- increment = 0.05f;
- QFileInfo file(_testApplyPathname);
- // Iterate all changes until there are no more elements to send
- while (_changeset.hasElementsToSend())
- {
- // Divide up the changes into atomic changesets
- ChangesetInfoPtr changeset_info(new ChangesetInfo());
- // Repeat divide until all changes have been committed
- _changeset.calculateChangeset(changeset_info);
- // Write out the changeset to disk
- changesetSize += changeset_info->size();
- // Write out the changeset file
- QString output = QString("%1/%2-%3.%4")
- .arg(file.absolutePath())
- .arg(file.baseName())
- .arg(QString::number(file_id), 4, '0')
- .arg(file.completeSuffix());
- FileUtils::writeFully(output, _changeset.getChangesetString(changeset_info, id));
- _changeset.updateChangeset(changeset_info);
- changesetSize += changeset_info->size();
- // Keep the output pathname
- output_paths.push_back(output);
- // When the current changeset is nearing the 50k max (or the specified max), close the changeset
- // otherwise keep it open and go again
- if (changesetSize > _maxChangesetSize - (int)(_maxPushSize * 1.5))
- id++;
- // Show the progress
- if (_showProgress)
- {
- float percent_complete = _changeset.getProcessedCount() / (float)total;
- // Actual progress is calculated and once it passes the next increment it is reported
- if (percent_complete >= progress + increment)
- {
- progress = percent_complete - fmod(percent_complete, increment);
- _progress.set(percent_complete, "Apply changeset test...");
- }
- }
- file_id++;
- }
- LOG_INFO("Apply test progress: 100%");
- // Keep some stats
- _stats.append(SingleStat("API Upload Time (sec)", timer.getElapsedAndRestart()));
- _stats.append(SingleStat("Total OSM Changesets Uploaded", _changesetCount));
- _stats.append(SingleStat("Total Nodes in Changeset", _changeset.getTotalNodeCount()));
- _stats.append(SingleStat("Total Ways in Changeset", _changeset.getTotalWayCount()));
- _stats.append(SingleStat("Total Relations in Changeset", _changeset.getTotalRelationCount()));
- _stats.append(SingleStat("Total Elements Created", _changeset.getTotalCreateCount()));
- _stats.append(SingleStat("Total Elements Modified", _changeset.getTotalModifyCount()));
- _stats.append(SingleStat("Total Elements Deleted", _changeset.getTotalDeleteCount()));
- _stats.append(SingleStat("Total Errors", _changeset.getFailedCount()));
- // Return the output paths
- return output_paths;
-}
-
void OsmApiWriter::_changesetThreadFunc(int index)
{
// Set the status to working
@@ -409,17 +288,32 @@ void OsmApiWriter::_changesetThreadFunc(int index)
// Try a new create changeset request
continue;
}
+ // Make sure that the changeset is valid and isn't empty
+ if (workInfo->size() < 1)
+ {
+ LOG_DEBUG("Empty changeset created.");
+ // Jump back to the beginning and release the empty workInfo object
+ continue;
+ }
+ QString changeset = _changeset.getChangesetString(workInfo, id);
// Display the changeset in TRACE mode
- LOG_TRACE("Thread: " << std::this_thread::get_id() << "\n" << _changeset.getChangesetString(workInfo, id));
+ LOG_TRACE("Thread: " << std::this_thread::get_id() << "\n" << changeset);
+ // Output the debug request if requested
+ int apiId = _getNextApiId();
+ if (_debugOutput)
+ _writeDebugFile("Request-", changeset, apiId, id);
// Upload the changeset
- OsmApiFailureInfoPtr info = _uploadChangeset(request, id, _changeset.getChangesetString(workInfo, id));
+ OsmApiFailureInfoPtr info = _uploadChangeset(request, id, changeset);
+ // Output the debug response if requested
+ if (_debugOutput)
+ _writeDebugFile("Response", info->response, apiId, id, info->status);
if (info->success)
{
// Display the upload response in TRACE mode
- LOG_TRACE("Thread: " << std::this_thread::get_id() << "\n" << QString(request->getResponseContent()));
+ LOG_TRACE("Thread: " << std::this_thread::get_id() << "\n" << info->response);
// Update the changeset with the response
_changesetMutex.lock();
- _changeset.updateChangeset(QString(request->getResponseContent()));
+ _changeset.updateChangeset(info->response);
_changesetMutex.unlock();
// Update the size of the current changeset that is open
changesetSize += workInfo->size();
@@ -451,16 +345,28 @@ void OsmApiWriter::_changesetThreadFunc(int index)
// Split the changeset on conflict errors
switch (info->status)
{
- case 409: // Conflict, check for version conflicts and fix, or split and continue
+ case HttpResponseCode::HTTP_CONFLICT: // Conflict, check for version conflicts and fix, or split and continue
{
if (_changesetClosed(info->response))
{
- // The changeset was closed already so set the ID to -1 and reprocess
- id = -1;
- // Push the changeset back on the queue
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
+ if ((int)workInfo->size() > _maxChangesetSize / 2)
+ {
+ // Split the changeset into half so that it is smaller and won't fail
+ ChangesetInfoPtr split = _changeset.splitChangeset(workInfo);
+ _workQueueMutex.lock();
+ _workQueue.push(workInfo);
+ _workQueue.push(split);
+ _workQueueMutex.unlock();
+ }
+ else
+ {
+ // The changeset was closed already so set the ID to -1 and reprocess
+ id = -1;
+ // Push the changeset back on the queue
+ _workQueueMutex.lock();
+ _workQueue.push(workInfo);
+ _workQueueMutex.unlock();
+ }
// Loop back around to work on the next changeset
continue;
}
@@ -476,9 +382,9 @@ void OsmApiWriter::_changesetThreadFunc(int index)
// This includes when the changeset is too big, i.e.:
// The changeset <id> was closed at <dtg> UTC
}
- case 400: // Placeholder ID is missing or not unique
- case 404: // Diff contains elements where the given ID could not be found
- case 412: // Precondition Failed, Relation with id cannot be saved due to other member
+ case HttpResponseCode::HTTP_BAD_REQUEST: // Placeholder ID is missing or not unique
+ case HttpResponseCode::HTTP_NOT_FOUND: // Diff contains elements where the given ID could not be found
+ case HttpResponseCode::HTTP_PRECONDITION_FAILED: // Precondition Failed, Relation with id cannot be saved due to other member
if (!_splitChangeset(workInfo, info->response))
{
if (!workInfo->getAttemptedResolveChangesetIssues())
@@ -500,9 +406,9 @@ void OsmApiWriter::_changesetThreadFunc(int index)
}
}
break;
- case 500: // Internal Server Error, could be caused by the database being saturated
- case 502: // Bad Gateway, there are issues with the gateway, split and retry
- case 504: // Gateway Timeout, server is taking too long, split and retry
+ case HttpResponseCode::HTTP_INTERNAL_SERVER_ERROR: // Internal Server Error, could be caused by the database being saturated
+ case HttpResponseCode::HTTP_BAD_GATEWAY: // Bad Gateway, there are issues with the gateway, split and retry
+ case HttpResponseCode::HTTP_GATEWAY_TIMEOUT: // Gateway Timeout, server is taking too long, split and retry
if (!_splitChangeset(workInfo, info->response))
{
// Splitting failed which means that the changeset only has one element in it,
@@ -517,7 +423,8 @@ void OsmApiWriter::_changesetThreadFunc(int index)
default:
// This is a big problem, report it and try again
LOG_ERROR("Changeset upload responded with HTTP status response: " << request->getHttpStatus());
- case 405:
+ // Fall through
+ case HttpResponseCode::HTTP_METHOD_NOT_ALLOWED:
// This shouldn't ever happen, push back on the queue, only process a certain amount of times
workInfo->retry();
if (workInfo->canRetry())
@@ -534,11 +441,43 @@ void OsmApiWriter::_changesetThreadFunc(int index)
}
else
{
- if (!_changeset.hasElementsToSend() && queueSize == 0 && _threadsAreIdle())
+ if (!_changeset.hasElementsToSend() && !_changeset.isDone() && queueSize == 0)
{
- // In this case there are elements that have been sent and not reported back
- // BUT there are no threads that are waiting for them either
- stop_thread = true;
+ // This is a bad state where the producer thread says all elements are sent and
+ // waits for all threads to join but the changeset isn't "done".
+ // Set the status to idle and start idle timer
+ _threadStatusMutex.lock();
+ if (_threadStatus[index] != ThreadStatus::Idle)
+ {
+ _threadStatus[index] = ThreadStatus::Idle;
+ _threadIdle[index].reset();
+ }
+ else if (id > 0 && _threadIdle[index].getElapsed() > 10 * 1000)
+ {
+ // Close the current changeset so all data is "committed"
+ _closeChangeset(request, id);
+ id = -1;
+ }
+ _threadStatusMutex.unlock();
+ if (_threadsAreIdle())
+ {
+ // In this case there are elements that have been sent and not reported back
+ // BUT there are no threads that are waiting for them either. Every thread
+ // except the "first" worker thread will exit here. The first worker thread
+ // Tries to calculate the remaining changeset and push in on the queue. It then
+ // loops around and picks up the remaining changeset to process. Next time around
+ // calculateRemainingChangeset() fails and this thread is stopped.
+ if (index != 0)
+ stop_thread = true;
+ else if (_changeset.calculateRemainingChangeset(workInfo))
+ {
+ _workQueueMutex.lock();
+ _workQueue.push(workInfo);
+ _workQueueMutex.unlock();
+ }
+ else
+ stop_thread = true;
+ }
}
else
{
@@ -571,6 +510,8 @@ void OsmApiWriter::setConfiguration(const Settings& conf)
_consumerSecret = options.getHootOsmAuthConsumerSecret();
_accessToken = options.getHootOsmAuthAccessToken();
_secretToken = options.getHootOsmAuthAccessTokenSecret();
+ _debugOutput = options.getChangesetApidbWriterDebugOutput();
+ _debugOutputPath = options.getChangesetApidbWriterDebugOutputPath();
}
bool OsmApiWriter::isSupported(const QUrl &url)
@@ -595,7 +536,7 @@ bool OsmApiWriter::queryCapabilities(HootNetworkRequestPtr request)
try
{
QUrl capabilities = _url;
- capabilities.setPath(API_PATH_CAPABILITIES);
+ capabilities.setPath(OsmApiEndpoints::API_PATH_CAPABILITIES);
request->networkRequest(capabilities);
QString responseXml = QString::fromUtf8(request->getResponseContent().data());
QString printableUrl = capabilities.toString(QUrl::RemoveUserInfo);
@@ -619,7 +560,7 @@ bool OsmApiWriter::validatePermissions(HootNetworkRequestPtr request)
try
{
QUrl permissions = _url;
- permissions.setPath(API_PATH_PERMISSIONS);
+ permissions.setPath(OsmApiEndpoints::API_PATH_PERMISSIONS);
request->networkRequest(permissions);
QString responseXml = QString::fromUtf8(request->getResponseContent().data());
success = _parsePermissions(responseXml);
@@ -713,7 +654,7 @@ long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request,
try
{
QUrl changeset = _url;
- changeset.setPath(API_PATH_CREATE_CHANGESET);
+ changeset.setPath(OsmApiEndpoints::API_PATH_CREATE_CHANGESET);
QString xml = QString(
"<osm>"
" <changeset>"
@@ -727,7 +668,7 @@ long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request,
QByteArray content = xml.toUtf8();
QMap<QNetworkRequest::KnownHeaders, QVariant> headers;
- headers[QNetworkRequest::ContentTypeHeader] = CONTENT_TYPE_XML;
+ headers[QNetworkRequest::ContentTypeHeader] = HootNetworkUtils::CONTENT_TYPE_XML;
headers[QNetworkRequest::ContentLengthHeader] = content.length();
request->networkRequest(changeset, QNetworkAccessManager::Operation::PutOperation, content);
@@ -749,18 +690,18 @@ void OsmApiWriter::_closeChangeset(HootNetworkRequestPtr request, long id)
try
{
QUrl changeset = _url;
- changeset.setPath(QString(API_PATH_CLOSE_CHANGESET).arg(id));
+ changeset.setPath(QString(OsmApiEndpoints::API_PATH_CLOSE_CHANGESET).arg(id));
request->networkRequest(changeset, QNetworkAccessManager::Operation::PutOperation);
QString responseXml = QString::fromUtf8(request->getResponseContent().data());
switch (request->getHttpStatus())
{
- case 404:
+ case HttpResponseCode::HTTP_NOT_FOUND:
LOG_WARN("Unknown changeset");
break;
- case 409:
+ case HttpResponseCode::HTTP_CONFLICT:
LOG_WARN("Changeset conflict: " << responseXml);
break;
- case 200:
+ case HttpResponseCode::HTTP_OK:
// Changeset closed successfully
_changesetCountMutex.lock();
_changesetCount++;
@@ -808,11 +749,11 @@ OsmApiWriter::OsmApiFailureInfoPtr OsmApiWriter::_uploadChangeset(HootNetworkReq
try
{
QUrl change = _url;
- change.setPath(QString(API_PATH_UPLOAD_CHANGESET).arg(id));
+ change.setPath(QString(OsmApiEndpoints::API_PATH_UPLOAD_CHANGESET).arg(id));
QByteArray content = changeset.toUtf8();
QMap<QNetworkRequest::KnownHeaders, QVariant> headers;
- headers[QNetworkRequest::ContentTypeHeader] = CONTENT_TYPE_XML;
+ headers[QNetworkRequest::ContentTypeHeader] = HootNetworkUtils::CONTENT_TYPE_XML;
headers[QNetworkRequest::ContentLengthHeader] = content.length();
request->networkRequest(change, headers, QNetworkAccessManager::Operation::PostOperation, content);
@@ -822,19 +763,19 @@ OsmApiWriter::OsmApiFailureInfoPtr OsmApiWriter::_uploadChangeset(HootNetworkReq
switch (info->status)
{
- case 200:
+ case HttpResponseCode::HTTP_OK:
info->success = true;
break;
- case 400:
+ case HttpResponseCode::HTTP_BAD_REQUEST:
LOG_WARN("Changeset Upload Error: Error parsing XML changeset - " << info->response);
break;
- case 404:
+ case HttpResponseCode::HTTP_NOT_FOUND:
LOG_WARN("Unknown changeset or elements don't exist");
break;
- case 409:
+ case HttpResponseCode::HTTP_CONFLICT:
LOG_WARN("Changeset conflict: " << info->response);
break;
- case 412:
+ case HttpResponseCode::HTTP_PRECONDITION_FAILED:
LOG_WARN("Changeset precondition failed: " << info->response);
break;
default:
@@ -921,7 +862,7 @@ QString OsmApiWriter::_getNode(HootNetworkRequestPtr request, long id)
if (id < 1)
return "";
// Get the node by ID
- return _getElement(request, QString(API_PATH_GET_ELEMENT).arg("node").arg(id));
+ return _getElement(request, QString(OsmApiEndpoints::API_PATH_GET_ELEMENT).arg("node").arg(id));
}
QString OsmApiWriter::_getWay(HootNetworkRequestPtr request, long id)
@@ -930,7 +871,7 @@ QString OsmApiWriter::_getWay(HootNetworkRequestPtr request, long id)
if (id < 1)
return "";
// Get the way by ID
- return _getElement(request, QString(API_PATH_GET_ELEMENT).arg("way").arg(id));
+ return _getElement(request, QString(OsmApiEndpoints::API_PATH_GET_ELEMENT).arg("way").arg(id));
}
QString OsmApiWriter::_getRelation(HootNetworkRequestPtr request, long id)
@@ -939,20 +880,20 @@ QString OsmApiWriter::_getRelation(HootNetworkRequestPtr request, long id)
if (id < 1)
return "";
// Get the relation by ID
- return _getElement(request, QString(API_PATH_GET_ELEMENT).arg("relation").arg(id));
+ return _getElement(request, QString(OsmApiEndpoints::API_PATH_GET_ELEMENT).arg("relation").arg(id));
}
QString OsmApiWriter::_getElement(HootNetworkRequestPtr request, const QString& endpoint)
{
// Don't follow an uninitialized URL or empty endpoint
- if (endpoint == API_PATH_GET_ELEMENT || endpoint == "")
+ if (endpoint == OsmApiEndpoints::API_PATH_GET_ELEMENT || endpoint == "")
return "";
try
{
QUrl get = _url;
get.setPath(endpoint);
request->networkRequest(get);
- if (request->getHttpStatus() == 200)
+ if (request->getHttpStatus() == HttpResponseCode::HTTP_OK)
return QString::fromUtf8(request->getResponseContent().data());
else
LOG_WARN("GET error: " << QString::fromUtf8(request->getResponseContent().data()));
@@ -1026,5 +967,24 @@ bool OsmApiWriter::_splitChangeset(const ChangesetInfoPtr& workInfo, const QStri
return false;
}
+void OsmApiWriter::_writeDebugFile(const QString& type, const QString& data, int file_id, long changeset_id, int status)
+{
+ // Setup the path including the changeset and file IDs, type and HTTP status
+ QString path = QString("%1/OsmApiWriter-%2-%3-%4-%5.osc")
+ .arg(_debugOutputPath)
+ .arg(QString::number(file_id), 6, '0')
+ .arg(QString::number(changeset_id), 5, '0')
+ .arg(type)
+ .arg(QString::number(status), 3, '0');
+ // Write the data to the file
+ FileUtils::writeFully(path, data);
+}
+
+int OsmApiWriter::_getNextApiId()
+{
+ // Lock the mutex and increment the API ID counter
+ std::lock_guard<std::mutex> lock(_apiIdMutex);
+ return ++_apiId;
+}
}