forked from zstackio/zstack
-
Notifications
You must be signed in to change notification settings - Fork 0
<fix>[ceph]: cap & parallelize host conn check #4145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
zstack-robot-2
wants to merge
1
commit into
5.5.22
Choose a base branch
from
sync/jin.ma/fix/ZSTAC-85421
base: 5.5.22
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+197
−1
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
182 changes: 182 additions & 0 deletions
182
.../src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,182 @@ | ||
| package org.zstack.test.integration.storage.ceph | ||
|
|
||
| import org.springframework.http.HttpEntity | ||
| import org.zstack.core.cloudbus.CloudBus | ||
| import org.zstack.core.cloudbus.CloudBusCallBack | ||
| import org.zstack.header.host.HostConstant | ||
| import org.zstack.header.message.MessageReply | ||
| import org.zstack.header.storage.primary.PrimaryStorageConstant | ||
| import org.zstack.kvm.KVMAgentCommands | ||
| import org.zstack.kvm.KVMHostAsyncHttpCallMsg | ||
| import org.zstack.sdk.HostInventory | ||
| import org.zstack.sdk.PrimaryStorageInventory | ||
| import org.zstack.storage.ceph.primary.CephPrimaryStorageBase | ||
| import org.zstack.storage.primary.CheckHostStorageConnectionMsg | ||
| import org.zstack.test.integration.storage.StorageTest | ||
| import org.zstack.testlib.EnvSpec | ||
| import org.zstack.testlib.SubCase | ||
| import org.zstack.utils.data.SizeUnit | ||
| import org.zstack.utils.gson.JSONObjectUtil | ||
|
|
||
| import java.util.concurrent.CountDownLatch | ||
| import java.util.concurrent.TimeUnit | ||
| import java.util.concurrent.atomic.AtomicReference | ||
|
|
||
| class CephHostStorageCheckCase extends SubCase { | ||
| EnvSpec env | ||
| CloudBus bus | ||
|
|
||
| // Runs this case on the Spring spec exposed by StorageTest. | |
| @Override | |
| void setup() { | |
| useSpring(StorageTest.springSpec) | |
| } | |
|
|
||
| // Declares the test environment: one zone containing a KVM cluster with two hosts | |
| // ("host1" and "host2") and a Ceph primary storage "ceph-pri" attached to that cluster. | |
| @Override | |
| void environment() { | |
| env = makeEnv { | |
| zone { | |
| name = "zone" | |
| cluster { | |
| name = "test-cluster" | |
| hypervisorType = "KVM" | |
|
|
||
| // Two hosts are needed so one check can be stalled while the other proceeds. | |
| kvm { | |
| name = "host1" | |
| managementIp = "127.0.0.1" | |
| username = "root" | |
| password = "password" | |
| usedMem = 1000 | |
| totalCpu = 10 | |
| } | |
|
|
||
| kvm { | |
| name = "host2" | |
| managementIp = "127.0.0.2" | |
| username = "root" | |
| password = "password" | |
| usedMem = 1000 | |
| totalCpu = 10 | |
| } | |
|
|
||
| // Refers to the cephPrimaryStorage declared below by its name. | |
| attachPrimaryStorage("ceph-pri") | |
| } | |
|
|
||
| cephPrimaryStorage { | |
| name = "ceph-pri" | |
| description = "Test" | |
| totalCapacity = SizeUnit.GIGABYTE.toByte(100) | |
| availableCapacity = SizeUnit.GIGABYTE.toByte(100) | |
| url = "ceph://pri" | |
| fsid = "7ff218d9-f525-435f-8a40-3618d1772a64" | |
| monUrls = ["root:password@localhost/?monPort=7777"] | |
| } | |
| } | |
| } | |
| } | |
|
|
||
| // Creates the environment, resolves the CloudBus bean used to send messages, | |
| // then runs the two ZSTAC-85421 scenarios in order. | |
| @Override | |
| void test() { | |
| env.create { | |
| bus = bean(CloudBus.class) | |
| testCheckNotSerializedAcrossHosts() | |
| testPerMessageTimeoutHonored() | |
| } | |
| } | |
|
|
||
| // Deletes the environment built for this case. | |
| @Override | |
| void clean() { | |
| env.delete() | |
| } | |
|
|
||
| // ZSTAC-85421: a stuck per-host check must not block other hosts' check on the | |
| // same primary storage. The chain syncLevel is raised from 1 to 10 so the second | |
| // host's check runs concurrently instead of queueing behind the stuck one. | |
| // | |
| // Flow: host1's simulated check is parked on a latch; host2's check is then sent and | |
| // must complete successfully while host1 is still pending. After release, host1's | |
| // check must also finish successfully. | |
| void testCheckNotSerializedAcrossHosts() { | |
| def ps = env.inventoryByName("ceph-pri") as PrimaryStorageInventory | |
| def host1 = env.inventoryByName("host1") as HostInventory | |
| def host2 = env.inventoryByName("host2") as HostInventory | |
|
|
||
| CountDownLatch host1Entered = new CountDownLatch(1) | |
| CountDownLatch release = new CountDownLatch(1) | |
|
|
||
| // Only host1's request is held; every other host gets an immediate response. | |
| env.simulator(CephPrimaryStorageBase.CHECK_HOST_STORAGE_CONNECTION_PATH) { HttpEntity<String> e -> | |
| def cmd = JSONObjectUtil.toObject(e.body, CephPrimaryStorageBase.CheckHostStorageConnectionCmd) | |
| if (cmd.hostUuid == host1.uuid) { | |
| host1Entered.countDown() | |
| release.await(60, TimeUnit.SECONDS) | |
| } | |
| return new KVMAgentCommands.AgentResponse() | |
| } | |
|
|
||
| try { | |
| AtomicReference<MessageReply> reply1 = new AtomicReference<>() | |
| CountDownLatch reply1Done = new CountDownLatch(1) | |
| sendCheckMsg(ps.uuid, host1.uuid, { MessageReply r -> reply1.set(r); reply1Done.countDown() }) | |
| // Make sure host1's check is really in flight before sending host2's. | |
| assert host1Entered.await(10, TimeUnit.SECONDS) | |
|
|
||
| AtomicReference<MessageReply> reply2 = new AtomicReference<>() | |
| CountDownLatch reply2Done = new CountDownLatch(1) | |
| sendCheckMsg(ps.uuid, host2.uuid, { MessageReply r -> reply2.set(r); reply2Done.countDown() }) | |
|
|
||
| assert reply2Done.await(15, TimeUnit.SECONDS) | |
| assert reply2.get().isSuccess() | |
| // host1 must still be pending, otherwise nothing was proven about concurrency. | |
| assert reply1Done.getCount() == 1 | |
|
|
||
| release.countDown() | |
| assert reply1Done.await(15, TimeUnit.SECONDS) | |
| // Running concurrently is not enough: the released check must also end successfully. | |
| assert reply1.get().isSuccess() | |
| } finally { | |
| // Unblock the simulator even when an assertion above fails, so its thread is not | |
| // left waiting for the full 60 seconds. Counting down an already-zero latch is a no-op. | |
| release.countDown() | |
| } | |
| } | |
|
|
||
| // ZSTAC-85421: a KVMHostAsyncHttpCallMsg carrying an explicit timeout must fail at | |
| // that timeout instead of riding the default 1800s. The ceph check relies on this to | |
| // cap its blast radius at 5 minutes. | |
| // | |
| // Flow: a simulator path that blocks on a latch is called with a 3-second message | |
| // timeout; the reply must be a failure that arrives after roughly that timeout. | |
| void testPerMessageTimeoutHonored() { | |
| def host1 = env.inventoryByName("host1") as HostInventory | |
| def stuckPath = "/test/zstac85421/stuck" | |
| CountDownLatch release = new CountDownLatch(1) | |
|
|
||
| // The handler never answers on its own within the message timeout. | |
| env.simulator(stuckPath) { HttpEntity<String> e -> | |
| release.await(60, TimeUnit.SECONDS) | |
| return new KVMAgentCommands.AgentResponse() | |
| } | |
|
|
||
| try { | |
| KVMHostAsyncHttpCallMsg msg = new KVMHostAsyncHttpCallMsg() | |
| msg.setHostUuid(host1.uuid) | |
| msg.setPath(stuckPath) | |
| msg.setCommand(new KVMAgentCommands.AgentCommand()) | |
| msg.setNoStatusCheck(true) | |
| msg.setTimeout(TimeUnit.SECONDS.toMillis(3)) | |
| bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, host1.uuid) | |
|
|
||
| AtomicReference<MessageReply> reply = new AtomicReference<>() | |
| CountDownLatch done = new CountDownLatch(1) | |
| long start = System.currentTimeMillis() | |
| bus.send(msg, new CloudBusCallBack(null) { | |
| @Override | |
| void run(MessageReply r) { | |
| reply.set(r) | |
| done.countDown() | |
| } | |
| }) | |
|
|
||
| assert done.await(20, TimeUnit.SECONDS) | |
| long elapsed = System.currentTimeMillis() - start | |
| assert !reply.get().isSuccess() | |
| // NOTE(review): only "failed within the time window" is asserted here; a non-timeout | |
| // failure would also pass. Consider asserting a timeout-specific error code as well - | |
| // TODO confirm which error code this path reports. | |
| assert elapsed >= 2000 | |
| assert elapsed < 20000 | |
|
Comment on lines
+159
to
+163
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 把失败类型收紧到 timeout。 当前只断言“失败 + 耗时区间”,这会把路由错误、模拟器异常等非超时失败也算成通过,不能直接证明 🤖 Prompt for AI Agents |
||
|
|
||
| } finally { | |
| // Unblock the simulator even when an assertion above fails, so its thread is not | |
| // left waiting for the full 60 seconds. | |
| release.countDown() | |
| } | |
| } | |
|
|
||
| // Sends a CheckHostStorageConnectionMsg for a single host to the given primary storage | |
| // and passes the reply to cb; a null cb means the reply is ignored. | |
| private void sendCheckMsg(String psUuid, String hostUuid, Closure cb) { | |
| CheckHostStorageConnectionMsg checkMsg = new CheckHostStorageConnectionMsg() | |
| checkMsg.setPrimaryStorageUuid(psUuid) | |
| checkMsg.setHostUuids([hostUuid]) | |
| bus.makeTargetServiceIdByResourceUuid(checkMsg, PrimaryStorageConstant.SERVICE_ID, psUuid) | |
|
|
||
| CloudBusCallBack onReply = new CloudBusCallBack(null) { | |
| @Override | |
| void run(MessageReply reply) { | |
| // Safe navigation keeps the "no callback supplied" case a no-op. | |
| cb?.call(reply) | |
| } | |
| } | |
| bus.send(checkMsg, onReply) | |
| } | |
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
补上 host1 最终结果断言。
这里现在只证明了 `host2` 没被串行化,但没有验证 `host1` 在放行后是否成功完成;如果实现变成“并发了但最终失败”,这个用例仍然会通过。建议像 `reply2` 一样保存 `reply1` 并断言成功。🤖 Prompt for AI Agents