Modify session handling (#568)

* Change thread unsafe interfaces to safe.

* Fix some compiler warnings
This commit is contained in:
An Tao 2020-09-15 08:28:04 +08:00 committed by GitHub
parent 6f7a062221
commit 4c9463eeb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 189 additions and 86 deletions

View File

@ -1,6 +1,6 @@
/** /**
* *
* CacheMap.h * @file CacheMap.h
* An Tao * An Tao
* *
* Copyright 2018, An Tao. All rights reserved. * Copyright 2018, An Tao. All rights reserved.
@ -99,7 +99,7 @@ class CacheMap
} }
if (tickInterval_ > 0 && wheelsNumber_ > 0 && bucketsNumPerWheel_ > 0) if (tickInterval_ > 0 && wheelsNumber_ > 0 && bucketsNumPerWheel_ > 0)
{ {
timerId_ = loop_->runEvery(tickInterval_, [=]() { timerId_ = loop_->runEvery(tickInterval_, [this]() {
size_t t = ++ticksCounter_; size_t t = ++ticksCounter_;
size_t pow = 1; size_t pow = 1;
for (size_t i = 0; i < wheelsNumber_; ++i) for (size_t i = 0; i < wheelsNumber_; ++i)
@ -127,23 +127,46 @@ class CacheMap
}; };
~CacheMap() ~CacheMap()
{ {
map_.clear();
for (auto iter = wheels_.rbegin(); iter != wheels_.rend(); ++iter)
{ {
std::lock_guard<std::mutex> guard(mtx_); iter->clear();
map_.clear();
}
{
std::lock_guard<std::mutex> lock(bucketMutex_);
for (auto iter = wheels_.rbegin(); iter != wheels_.rend(); ++iter)
{
iter->clear();
}
} }
LOG_TRACE << "CacheMap destruct!"; LOG_TRACE << "CacheMap destruct!";
} }
struct MapValue struct MapValue
{ {
size_t timeout = 0; MapValue(const T2 &value,
T2 value; size_t timeout,
std::function<void()> &&callback)
: value_(value),
timeout_(timeout),
timeoutCallback_(std::move(callback))
{
}
MapValue(T2 &&value, size_t timeout, std::function<void()> &&callback)
: value_(std::move(value)),
timeout_(timeout),
timeoutCallback_(std::move(callback))
{
}
MapValue(T2 &&value, size_t timeout)
: value_(std::move(value)), timeout_(timeout)
{
}
MapValue(const T2 &value, size_t timeout)
: value_(value), timeout_(timeout)
{
}
MapValue(T2 &&value) : value_(std::move(value))
{
}
MapValue(const T2 &value) : value_(value)
{
}
MapValue() = default;
T2 value_;
size_t timeout_{0};
std::function<void()> timeoutCallback_; std::function<void()> timeoutCallback_;
WeakCallbackEntryPtr weakEntryPtr_; WeakCallbackEntryPtr weakEntryPtr_;
}; };
@ -165,23 +188,16 @@ class CacheMap
{ {
if (timeout > 0) if (timeout > 0)
{ {
MapValue v; MapValue v{std::move(value), timeout, std::move(timeoutCallback)};
v.value = std::move(value);
v.timeout = timeout;
v.timeoutCallback_ = std::move(timeoutCallback);
std::lock_guard<std::mutex> lock(mtx_); std::lock_guard<std::mutex> lock(mtx_);
map_[key] = std::move(v); map_.insert(std::make_pair(key, std::move(v)));
eraseAfter(timeout, key); eraseAfter(timeout, key);
} }
else else
{ {
MapValue v; MapValue v{std::move(value)};
v.value = std::move(value);
v.timeout = timeout;
v.timeoutCallback_ = std::function<void()>();
v.weakEntryPtr_ = WeakCallbackEntryPtr();
std::lock_guard<std::mutex> lock(mtx_); std::lock_guard<std::mutex> lock(mtx_);
map_[key] = std::move(v); map_.insert(std::make_pair(key, std::move(v)));
} }
} }
/** /**
@ -201,43 +217,79 @@ class CacheMap
{ {
if (timeout > 0) if (timeout > 0)
{ {
MapValue v; MapValue v{value, timeout, std::move(timeoutCallback)};
v.value = value;
v.timeout = timeout;
v.timeoutCallback_ = std::move(timeoutCallback);
std::lock_guard<std::mutex> lock(mtx_); std::lock_guard<std::mutex> lock(mtx_);
map_[key] = std::move(v); map_.insert(std::make_pair(key, std::move(v)));
eraseAfter(timeout, key); eraseAfter(timeout, key);
} }
else else
{ {
MapValue v; MapValue v{value};
v.value = value;
v.timeout = timeout;
v.timeoutCallback_ = std::function<void()>();
v.weakEntryPtr_ = WeakCallbackEntryPtr();
std::lock_guard<std::mutex> lock(mtx_); std::lock_guard<std::mutex> lock(mtx_);
map_[key] = std::move(v); map_.insert(std::make_pair(key, std::move(v)));
} }
} }
/// Return the reference to the value of the keyword. /**
T2 &operator[](const T1 &key) * @brief Return the value of the keyword.
*
* @param key
* @return T2
* @note This function returns a copy of the data in the cache. If the data
* is not found, a default T2 type value is returned and nothing is inserted
* into the cache.
*/
T2 operator[](const T1 &key)
{ {
int timeout = 0; int timeout = 0;
std::lock_guard<std::mutex> lock(mtx_); std::lock_guard<std::mutex> lock(mtx_);
auto iter = map_.find(key); auto iter = map_.find(key);
if (iter != map_.end()) if (iter != map_.end())
{ {
timeout = iter->second.timeout; timeout = iter->second.timeout_;
if (timeout > 0) if (timeout > 0)
eraseAfter(timeout, key); eraseAfter(timeout, key);
return iter->second.value; return iter->second.value_;
} }
return map_[key].value; return T2();
} }
/**
* @brief Modify or visit the data identified by the key parameter.
*
* @tparam Callable the type of the handler.
* @param key
* @param handler A callable that can modify or visit the data. The
* signature of the handler should be equivalent to 'void(T2&)' or
* 'void(const T2&)'
* @param timeout In seconds.
*
* @note This function is multiple-thread safe. if the data identified by
* the key doesn't exist, a new one is created and passed to the handler and
* stored in the cache with the timeout parameter. The changing of the data
* is protected by the mutex of the cache.
*/
template <typename Callable>
void modify(const T1 &key, Callable &&handler, size_t timeout = 0)
{
std::lock_guard<std::mutex> lock(mtx_);
auto iter = map_.find(key);
if (iter != map_.end())
{
timeout = iter->second.timeout_;
handler(iter->second.value_);
if (timeout > 0)
eraseAfter(timeout, key);
return;
}
MapValue v{T2(), timeout};
handler(v.value_);
map_.insert(std::make_pair(key, std::move(v)));
if (timeout > 0)
{
eraseAfter(timeout, key);
}
}
/// Check if the value of the keyword exists /// Check if the value of the keyword exists
bool find(const T1 &key) bool find(const T1 &key)
{ {
@ -248,7 +300,7 @@ class CacheMap
auto iter = map_.find(key); auto iter = map_.find(key);
if (iter != map_.end()) if (iter != map_.end())
{ {
timeout = iter->second.timeout; timeout = iter->second.timeout_;
flag = true; flag = true;
} }
@ -271,9 +323,9 @@ class CacheMap
auto iter = map_.find(key); auto iter = map_.find(key);
if (iter != map_.end()) if (iter != map_.end())
{ {
timeout = iter->second.timeout; timeout = iter->second.timeout_;
flag = true; flag = true;
value = iter->second.value; value = iter->second.value_;
} }
if (timeout > 0) if (timeout > 0)
@ -357,15 +409,16 @@ class CacheMap
} }
if (i < (wheelsNumber_ - 1)) if (i < (wheelsNumber_ - 1))
{ {
entryPtr = std::make_shared<CallbackEntry>([=]() { entryPtr = std::make_shared<CallbackEntry>(
if (delay > 0) [this, delay, i, t, entryPtr]() {
{ if (delay > 0)
std::lock_guard<std::mutex> lock(bucketMutex_); {
wheels_[i][(delay + (t % bucketsNumPerWheel_) - 1) % std::lock_guard<std::mutex> lock(bucketMutex_);
bucketsNumPerWheel_] wheels_[i][(delay + (t % bucketsNumPerWheel_) - 1) %
.insert(entryPtr); bucketsNumPerWheel_]
} .insert(entryPtr);
}); }
});
} }
else else
{ {
@ -397,14 +450,14 @@ class CacheMap
} }
else else
{ {
std::function<void()> cb = [=]() { std::function<void()> cb = [this, key]() {
std::lock_guard<std::mutex> lock(mtx_); std::lock_guard<std::mutex> lock(mtx_);
if (map_.find(key) != map_.end()) if (map_.find(key) != map_.end())
{ {
auto &value = map_[key]; auto &value = map_[key];
auto entryPtr = value.weakEntryPtr_.lock(); auto entryPtr = value.weakEntryPtr_.lock();
// entryPtr is used to avoid race conditions // entryPtr is used to avoid race conditions
if (value.timeout > 0 && !entryPtr) if (value.timeout_ > 0 && !entryPtr)
{ {
if (value.timeoutCallback_) if (value.timeoutCallback_)
{ {

View File

@ -236,10 +236,10 @@ class HttpRequest
} }
/// Get the session to which the request belongs. /// Get the session to which the request belongs.
virtual SessionPtr session() const = 0; virtual const SessionPtr &session() const = 0;
/// Get the session to which the request belongs. /// Get the session to which the request belongs.
SessionPtr getSession() const const SessionPtr &getSession() const
{ {
return session(); return session();
} }

View File

@ -1,6 +1,6 @@
/** /**
* *
* Session.h * @file Session.h
* An Tao * An Tao
* *
* Copyright 2018, An Tao. All rights reserved. * Copyright 2018, An Tao. All rights reserved.
@ -30,41 +30,85 @@ namespace drogon
class Session class Session
{ {
public: public:
using SessionMap = std::map<std::string, any>;
/** /**
* @brief Get the data identified by the key parameter. * @brief Get the data identified by the key parameter.
* @note if the data is not found, a default value is returned. * @note if the data is not found, a default value is returned.
* For example: * For example:
* @code * @code
auto &userName = sessionPtr->get<std::string>("user name"); auto userName = sessionPtr->get<std::string>("user name");
@endcode @endcode
*/ */
template <typename T> template <typename T>
const T &get(const std::string &key) const T get(const std::string &key) const
{
{
std::lock_guard<std::mutex> lck(mutex_);
auto it = sessionMap_.find(key);
if (it != sessionMap_.end())
{
if (typeid(T) == it->second.type())
{
return *(any_cast<T>(&(it->second)));
}
else
{
LOG_ERROR << "Bad type";
}
}
}
return T();
}
/**
* @brief Modify or visit the data identified by the key parameter.
*
* @tparam T the type of the data.
* @param key
* @param handler A callable that can modify or visit the data. The
* signature of the handler should be equivalent to 'void(T&)' or
* 'void(const T&)'
*
* @note This function is multiple-thread safe. if the data identified by
* the key doesn't exist, a new one is created and passed to the handler.
* The changing of the data is protected by the mutex of the session.
*/
template <typename T, typename Callable>
void modify(const std::string &key, Callable &&handler)
{ {
const static T nullVal = T();
std::lock_guard<std::mutex> lck(mutex_); std::lock_guard<std::mutex> lck(mutex_);
auto it = sessionMap_.find(key); auto it = sessionMap_.find(key);
if (it != sessionMap_.end()) if (it != sessionMap_.end())
{ {
if (typeid(T) == it->second.type()) if (typeid(T) == it->second.type())
{ {
return *(any_cast<T>(&(it->second))); handler(*(any_cast<T>(&(it->second))));
} }
else else
{ {
LOG_ERROR << "Bad type"; LOG_ERROR << "Bad type";
} }
} }
return nullVal; else
{
auto item = T();
handler(item);
sessionMap_.insert(std::make_pair(key, any(std::move(item))));
}
} }
/** /**
* @brief Get the 'any' object identified by the given key * @brief Modify or visit the session data.
*
* @tparam Callable: The signature of the callable should be equivalent to
* `void (Session::SessionMap &)` or `void (const Session::SessionMap &)`
* @param handler A callable that can modify the sessionMap_ inside the
* session.
* @note This function is multiple-thread safe.
*/ */
any &operator[](const std::string &key) template <typename Callable>
void modify(Callable &&handler)
{ {
std::lock_guard<std::mutex> lck(mutex_); std::lock_guard<std::mutex> lck(mutex_);
return sessionMap_[key]; handler(sessionMap_);
} }
/** /**
@ -77,7 +121,7 @@ class Session
void insert(const std::string &key, const any &obj) void insert(const std::string &key, const any &obj)
{ {
std::lock_guard<std::mutex> lck(mutex_); std::lock_guard<std::mutex> lck(mutex_);
sessionMap_[key] = obj; sessionMap_.insert(std::make_pair(key, obj));
} }
/** /**
@ -90,7 +134,7 @@ class Session
void insert(const std::string &key, any &&obj) void insert(const std::string &key, any &&obj)
{ {
std::lock_guard<std::mutex> lck(mutex_); std::lock_guard<std::mutex> lck(mutex_);
sessionMap_[key] = std::move(obj); sessionMap_.insert(std::make_pair(key, std::move(obj)));
} }
/** /**
@ -146,7 +190,6 @@ class Session
Session() = delete; Session() = delete;
private: private:
using SessionMap = std::map<std::string, any>;
SessionMap sessionMap_; SessionMap sessionMap_;
mutable std::mutex mutex_; mutable std::mutex mutex_;
std::string sessionId_; std::string sessionId_;

View File

@ -684,7 +684,7 @@ void HttpAppFrameworkImpl::callCallback(
{ {
if (useSession_) if (useSession_)
{ {
auto sessionPtr = req->getSession(); auto &sessionPtr = req->getSession();
assert(sessionPtr); assert(sessionPtr);
if (sessionPtr->needToChangeSessionId()) if (sessionPtr->needToChangeSessionId())
{ {

View File

@ -340,7 +340,7 @@ class HttpRequestImpl : public HttpRequest
void appendToBuffer(trantor::MsgBuffer *output) const; void appendToBuffer(trantor::MsgBuffer *output) const;
virtual SessionPtr session() const override virtual const SessionPtr &session() const override
{ {
return sessionPtr_; return sessionPtr_;
} }

View File

@ -1,6 +1,6 @@
/** /**
* *
* SessionManager.cc * @file SessionManager.cc
* An Tao * An Tao
* *
* Copyright 2018, An Tao. All rights reserved. * Copyright 2018, An Tao. All rights reserved.
@ -55,14 +55,21 @@ SessionPtr SessionManager::getSession(const std::string &sessionID,
{ {
assert(!sessionID.empty()); assert(!sessionID.empty());
SessionPtr sessionPtr; SessionPtr sessionPtr;
std::lock_guard<std::mutex> lock(mapMutex_); sessionMapPtr_->modify(
if (sessionMapPtr_->findAndFetch(sessionID, sessionPtr) == false) sessionID,
{ [&sessionPtr, &sessionID, needToSet](SessionPtr &sessionInCache) {
sessionPtr = if (sessionInCache)
std::shared_ptr<Session>(new Session(sessionID, needToSet)); {
sessionMapPtr_->insert(sessionID, sessionPtr, timeout_); sessionPtr = sessionInCache;
return sessionPtr; }
} else
{
sessionPtr =
std::shared_ptr<Session>(new Session(sessionID, needToSet));
sessionInCache = sessionPtr;
}
},
timeout_);
return sessionPtr; return sessionPtr;
} }

View File

@ -1,6 +1,6 @@
/** /**
* *
* SessionManager.h * @file SessionManager.h
* An Tao * An Tao
* *
* Copyright 2018, An Tao. All rights reserved. * Copyright 2018, An Tao. All rights reserved.
@ -37,7 +37,6 @@ class SessionManager : public trantor::NonCopyable
private: private:
std::unique_ptr<CacheMap<std::string, SessionPtr>> sessionMapPtr_; std::unique_ptr<CacheMap<std::string, SessionPtr>> sessionMapPtr_;
std::mutex mapMutex_;
trantor::EventLoop *loop_; trantor::EventLoop *loop_;
size_t timeout_; size_t timeout_;
}; };

View File

@ -30,12 +30,13 @@ int main()
cachePtr->insert("2", "2", 10, []() { LOG_DEBUG << "2 timeout"; }); cachePtr->insert("2", "2", 10, []() { LOG_DEBUG << "2 timeout"; });
}); });
trantor::EventLoop mainLoop; trantor::EventLoop mainLoop;
mainLoop.runAt(now.after(3).roundSecond().after(0.0013), [&]() { mainLoop.runAt(now.after(4).roundSecond().after(0.1013), [&]() {
(*main_cachePtr)["new"] = "new"; main_cachePtr->insert("new", "new");
if (main_cachePtr->find("1")) if (main_cachePtr->find("1"))
{ {
LOG_DEBUG << "find item 1:" << (*main_cachePtr)["1"]; LOG_DEBUG << "find item 1:" << (*main_cachePtr)["1"];
(*main_cachePtr)["1"] = "22"; //(*main_cachePtr)["1"] = "22";
main_cachePtr->modify("1", [](std::string &item) { item = "22"; });
LOG_DEBUG << (*main_cachePtr)["1"]; LOG_DEBUG << (*main_cachePtr)["1"];
} }
else else