123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- // Example of parsing JSON to document by parts.
- // Using C++11 threads
- // Temporarily disable for clang (older version) due to incompatibility with libstdc++
- #if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__)
- #include "rapidjson/document.h"
- #include "rapidjson/error/en.h"
- #include "rapidjson/writer.h"
- #include "rapidjson/ostreamwrapper.h"
- #include <condition_variable>
- #include <iostream>
- #include <mutex>
- #include <thread>
- using namespace rapidjson;
- template<unsigned parseFlags = kParseDefaultFlags>
- class AsyncDocumentParser {
- public:
- AsyncDocumentParser(Document& d)
- : stream_(*this)
- , d_(d)
- , parseThread_()
- , mutex_()
- , notEmpty_()
- , finish_()
- , completed_()
- {
- // Create and execute thread after all member variables are initialized.
- parseThread_ = std::thread(&AsyncDocumentParser::Parse, this);
- }
- ~AsyncDocumentParser() {
- if (!parseThread_.joinable())
- return;
- {
- std::unique_lock<std::mutex> lock(mutex_);
- // Wait until the buffer is read up (or parsing is completed)
- while (!stream_.Empty() && !completed_)
- finish_.wait(lock);
- // Automatically append '\0' as the terminator in the stream.
- static const char terminator[] = "";
- stream_.src_ = terminator;
- stream_.end_ = terminator + 1;
- notEmpty_.notify_one(); // unblock the AsyncStringStream
- }
- parseThread_.join();
- }
- void ParsePart(const char* buffer, size_t length) {
- std::unique_lock<std::mutex> lock(mutex_);
-
- // Wait until the buffer is read up (or parsing is completed)
- while (!stream_.Empty() && !completed_)
- finish_.wait(lock);
- // Stop further parsing if the parsing process is completed.
- if (completed_)
- return;
- // Set the buffer to stream and unblock the AsyncStringStream
- stream_.src_ = buffer;
- stream_.end_ = buffer + length;
- notEmpty_.notify_one();
- }
- private:
- void Parse() {
- d_.ParseStream<parseFlags>(stream_);
- // The stream may not be fully read, notify finish anyway to unblock ParsePart()
- std::unique_lock<std::mutex> lock(mutex_);
- completed_ = true; // Parsing process is completed
- finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
- }
- struct AsyncStringStream {
- typedef char Ch;
- AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
- char Peek() const {
- std::unique_lock<std::mutex> lock(parser_.mutex_);
- // If nothing in stream, block to wait.
- while (Empty())
- parser_.notEmpty_.wait(lock);
- return *src_;
- }
- char Take() {
- std::unique_lock<std::mutex> lock(parser_.mutex_);
- // If nothing in stream, block to wait.
- while (Empty())
- parser_.notEmpty_.wait(lock);
- count_++;
- char c = *src_++;
- // If all stream is read up, notify that the stream is finish.
- if (Empty())
- parser_.finish_.notify_one();
- return c;
- }
- size_t Tell() const { return count_; }
- // Not implemented
- char* PutBegin() { return 0; }
- void Put(char) {}
- void Flush() {}
- size_t PutEnd(char*) { return 0; }
- bool Empty() const { return src_ == end_; }
- AsyncDocumentParser& parser_;
- const char* src_; //!< Current read position.
- const char* end_; //!< End of buffer
- size_t count_; //!< Number of characters taken so far.
- };
- AsyncStringStream stream_;
- Document& d_;
- std::thread parseThread_;
- std::mutex mutex_;
- std::condition_variable notEmpty_;
- std::condition_variable finish_;
- bool completed_;
- };
- int main() {
- Document d;
- {
- AsyncDocumentParser<> parser(d);
- const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
- //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // Fot test parsing error
- const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
- const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
- parser.ParsePart(json1, sizeof(json1) - 1);
- parser.ParsePart(json2, sizeof(json2) - 1);
- parser.ParsePart(json3, sizeof(json3) - 1);
- }
- if (d.HasParseError()) {
- std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
- return EXIT_FAILURE;
- }
-
- // Stringify the JSON to cout
- OStreamWrapper os(std::cout);
- Writer<OStreamWrapper> writer(os);
- d.Accept(writer);
- std::cout << std::endl;
- return EXIT_SUCCESS;
- }
- #else // Not supporting C++11
- #include <iostream>
- int main() {
- std::cout << "This example requires C++11 compiler" << std::endl;
- }
- #endif
|