parsebyparts.cpp 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // Example of parsing JSON to document by parts.
  2. // Using C++11 threads
  3. // Temporarily disable for clang (older version) due to incompatibility with libstdc++
  4. #if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__)
  5. #include "rapidjson/document.h"
  6. #include "rapidjson/error/en.h"
  7. #include "rapidjson/writer.h"
  8. #include "rapidjson/ostreamwrapper.h"
  9. #include <condition_variable>
  10. #include <iostream>
  11. #include <mutex>
  12. #include <thread>
  13. using namespace rapidjson;
  14. template<unsigned parseFlags = kParseDefaultFlags>
  15. class AsyncDocumentParser {
  16. public:
  17. AsyncDocumentParser(Document& d)
  18. : stream_(*this)
  19. , d_(d)
  20. , parseThread_()
  21. , mutex_()
  22. , notEmpty_()
  23. , finish_()
  24. , completed_()
  25. {
  26. // Create and execute thread after all member variables are initialized.
  27. parseThread_ = std::thread(&AsyncDocumentParser::Parse, this);
  28. }
  29. ~AsyncDocumentParser() {
  30. if (!parseThread_.joinable())
  31. return;
  32. {
  33. std::unique_lock<std::mutex> lock(mutex_);
  34. // Wait until the buffer is read up (or parsing is completed)
  35. while (!stream_.Empty() && !completed_)
  36. finish_.wait(lock);
  37. // Automatically append '\0' as the terminator in the stream.
  38. static const char terminator[] = "";
  39. stream_.src_ = terminator;
  40. stream_.end_ = terminator + 1;
  41. notEmpty_.notify_one(); // unblock the AsyncStringStream
  42. }
  43. parseThread_.join();
  44. }
  45. void ParsePart(const char* buffer, size_t length) {
  46. std::unique_lock<std::mutex> lock(mutex_);
  47. // Wait until the buffer is read up (or parsing is completed)
  48. while (!stream_.Empty() && !completed_)
  49. finish_.wait(lock);
  50. // Stop further parsing if the parsing process is completed.
  51. if (completed_)
  52. return;
  53. // Set the buffer to stream and unblock the AsyncStringStream
  54. stream_.src_ = buffer;
  55. stream_.end_ = buffer + length;
  56. notEmpty_.notify_one();
  57. }
  58. private:
  59. void Parse() {
  60. d_.ParseStream<parseFlags>(stream_);
  61. // The stream may not be fully read, notify finish anyway to unblock ParsePart()
  62. std::unique_lock<std::mutex> lock(mutex_);
  63. completed_ = true; // Parsing process is completed
  64. finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
  65. }
  66. struct AsyncStringStream {
  67. typedef char Ch;
  68. AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
  69. char Peek() const {
  70. std::unique_lock<std::mutex> lock(parser_.mutex_);
  71. // If nothing in stream, block to wait.
  72. while (Empty())
  73. parser_.notEmpty_.wait(lock);
  74. return *src_;
  75. }
  76. char Take() {
  77. std::unique_lock<std::mutex> lock(parser_.mutex_);
  78. // If nothing in stream, block to wait.
  79. while (Empty())
  80. parser_.notEmpty_.wait(lock);
  81. count_++;
  82. char c = *src_++;
  83. // If all stream is read up, notify that the stream is finish.
  84. if (Empty())
  85. parser_.finish_.notify_one();
  86. return c;
  87. }
  88. size_t Tell() const { return count_; }
  89. // Not implemented
  90. char* PutBegin() { return 0; }
  91. void Put(char) {}
  92. void Flush() {}
  93. size_t PutEnd(char*) { return 0; }
  94. bool Empty() const { return src_ == end_; }
  95. AsyncDocumentParser& parser_;
  96. const char* src_; //!< Current read position.
  97. const char* end_; //!< End of buffer
  98. size_t count_; //!< Number of characters taken so far.
  99. };
  100. AsyncStringStream stream_;
  101. Document& d_;
  102. std::thread parseThread_;
  103. std::mutex mutex_;
  104. std::condition_variable notEmpty_;
  105. std::condition_variable finish_;
  106. bool completed_;
  107. };
  108. int main() {
  109. Document d;
  110. {
  111. AsyncDocumentParser<> parser(d);
  112. const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
  113. //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // Fot test parsing error
  114. const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
  115. const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
  116. parser.ParsePart(json1, sizeof(json1) - 1);
  117. parser.ParsePart(json2, sizeof(json2) - 1);
  118. parser.ParsePart(json3, sizeof(json3) - 1);
  119. }
  120. if (d.HasParseError()) {
  121. std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
  122. return EXIT_FAILURE;
  123. }
  124. // Stringify the JSON to cout
  125. OStreamWrapper os(std::cout);
  126. Writer<OStreamWrapper> writer(os);
  127. d.Accept(writer);
  128. std::cout << std::endl;
  129. return EXIT_SUCCESS;
  130. }
  131. #else // Not supporting C++11
  132. #include <iostream>
  133. int main() {
  134. std::cout << "This example requires C++11 compiler" << std::endl;
  135. }
  136. #endif