diff --git a/CHANGELOG.md b/CHANGELOG.md index ae8fa0b3d..e41d64bf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Correct FAPI header to `x-fapi-interaction-id` [PR #1557](https://github.com/3scale/APIcast/pull/1557) [THREESCALE-11957](https://issues.redhat.com/browse/THREESCALE-11957) - Only validate oidc setting if authentication method is set to oidc [PR #1568](https://github.com/3scale/APIcast/pull/1568) [THREESCALE-11441](https://issues.redhat.com/browse/THREESCALE-11441) - Reduce memory consumption when returning large response that has been routed through a proxy server. [PR #1572](https://github.com/3scale/APIcast/pull/1572) [THREESCALE-12258](https://issues.redhat.com/browse/THREESCALE-12258) +- Ensure the synchronization key is released correctly. [PR #1591](https://github.com/3scale/APIcast/pull/1590) ### Added - Update APIcast schema manifest [PR #1550](https://github.com/3scale/APIcast/pull/1550) diff --git a/gateway/src/resty/resolver.lua b/gateway/src/resty/resolver.lua index d2214a79d..d51d59632 100644 --- a/gateway/src/resty/resolver.lua +++ b/gateway/src/resty/resolver.lua @@ -398,7 +398,11 @@ function _M.get_servers(self, qname, opts) -- TODO: pass proper options to dns resolver (like SRV query type) - local sema, key = synchronization:acquire(format('qname:%s:qtype:%s', qname, 'A')) + local key = format('qname:%s:qtype:%s', qname, 'A') + local sema, err = synchronization:acquire(key) + if not sema then + return nil, "failed to acquire lock on key: " .. key .. 'error: ' .. err + end local ok = sema:wait(0) local answers, err = self:lookup(qname, not ok) diff --git a/gateway/src/resty/synchronization.lua b/gateway/src/resty/synchronization.lua index 67f13f161..a1a283377 100644 --- a/gateway/src/resty/synchronization.lua +++ b/gateway/src/resty/synchronization.lua @@ -39,7 +39,7 @@ function _M:acquire(key) if not semaphores then return nil, 'not initialized' end - return semaphores[key], key + return semaphores[key], nil end --- release semaphore @@ -60,7 +60,7 @@ end -- @param ...: the variable number of arguments that are going to be send to the callback function. function _M:run(key, timeout, callback, ...) local sema, err = self:acquire(key) - if err ~= key then + if not sema then ngx.log(ngx.WARN, 'failed to acquire lock on key: ', key, ' error: ', err) return false end @@ -75,7 +75,7 @@ function _M:run(key, timeout, callback, ...) local ret, result, execute_error = task:execute(...) if lock_acquired then - self.release(key) + self:release(key) sema:post() end diff --git a/spec/synchronization_spec.lua b/spec/synchronization_spec.lua index 2813c1f20..e5bdda3ee 100644 --- a/spec/synchronization_spec.lua +++ b/spec/synchronization_spec.lua @@ -60,6 +60,37 @@ describe("Synchronization", function() assert.same(err, "not initialized") end) + it("run releases the lock so a subsequent acquire succeeds", function() + local callback = function() return 1 end + + local ret, result = synchronization:run(key, 4, callback) + assert.same(ret, true) + assert.same(result, {1}) + + -- If run properly released the lock, a non-blocking acquire+wait should succeed + local sema = synchronization:acquire(key) + local ok = sema:wait(0) + assert.is_true(ok, "expected lock to be available after run completed") + + synchronization:release(key) + sema:post() + end) + + it("run releases the lock even when callback throws", function() + local callback = function() error("boom") end + + local ret, _, execute_error = synchronization:run(key, 4, callback) + assert.same(ret, false) + assert.is_truthy(execute_error) + + local sema = synchronization:acquire(key) + local ok = sema:wait(0) + assert.is_true(ok, "expected lock to be available after failed run") + + synchronization:release(key) + sema:post() + end) + it("validate that acquire/release workflow", function() local sema, _ = synchronization:acquire(key)