Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add iter->status() check for loop iterators #2395

Merged
merged 5 commits into from
Jul 7, 2024
16 changes: 16 additions & 0 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,12 @@ Status SlotMigrator::sendSnapshotByCmd() {
}
}

if (auto s = iter->status(); !s.ok()) {
auto err_str = s.ToString();
LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << slot << ": " << err_str;
return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: {}", slot, err_str)};
}

// It's necessary to send commands that are still in the pipeline since the final pipeline may not be sent
// while iterating keys because its size could be less than max_pipeline_size_
auto s = sendCmdsPipelineIfNeed(&restore_cmds, true);
Expand Down Expand Up @@ -820,6 +826,11 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
}
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK,
fmt::format("failed to iterate values of the complex key {}: {}", key.ToString(), s.ToString())};
}

// Have to check the item count of the last command list
if (item_count % kMaxItemsInCommand != 0) {
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
Expand Down Expand Up @@ -880,6 +891,11 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
}
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK,
fmt::format("failed to iterate values of the stream key {}: {}", key.ToString(), s.ToString())};
}

// commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration
// XSETID is used to adjust stream's info on the destination node according to the current values on the source
*restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), metadata.last_generated_id.ToString(),
Expand Down
4 changes: 4 additions & 0 deletions src/search/index_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ struct IndexManager {
index_map.Insert(std::move(info));
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK, fmt::format("fail to load index metadata: {}", s.ToString())};
}

return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ Status IndexUpdater::Build() const {
if (s.Is<Status::TypeMismatched>()) continue;
if (!s.OK()) return s;
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

return Status::OK();
Expand Down
11 changes: 10 additions & 1 deletion src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ rocksdb::Status Database::Keys(const std::string &prefix, std::vector<std::strin
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

if (!storage_->IsSlotIdEncoded()) break;
if (prefix.empty()) break;
if (++slot_id >= HASH_SLOTS_SIZE) break;
Expand Down Expand Up @@ -368,6 +372,11 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
keys->emplace_back(user_key);
cnt++;
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

if (!storage_->IsSlotIdEncoded() || prefix.empty()) {
if (!keys->empty() && cnt >= limit) {
end_cursor->append(user_key);
Expand Down Expand Up @@ -587,7 +596,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
break;
}
}
return rocksdb::Status::OK();
return iter->status();
}

RedisType WriteBatchLogData::GetRedisType() const { return type_; }
Expand Down
8 changes: 8 additions & 0 deletions src/storage/scripting.cc
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ Status FunctionList(Server *srv, const redis::Connection *conn, const std::strin
result.emplace_back(lib.ToString(), iter->value().ToString());
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK, s.ToString()};
}

output->append(redis::MultiLen(result.size()));
for (const auto &[lib, code] : result) {
output->append(conn->HeaderOfMap(with_code ? 2 : 1));
Expand Down Expand Up @@ -488,6 +492,10 @@ Status FunctionListFunc(Server *srv, const redis::Connection *conn, const std::s
result.emplace_back(func.ToString(), iter->value().ToString());
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK, s.ToString()};
}

output->append(redis::MultiLen(result.size()));
for (const auto &[func, lib] : result) {
output->append(conn->HeaderOfMap(2));
Expand Down
29 changes: 27 additions & 2 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &g
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

if (has_next_entry) {
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
Expand Down Expand Up @@ -763,6 +767,10 @@ rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const std::string
*delete_cnt += 1;
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

if (*delete_cnt != 0) {
metadata.group_number -= 1;
std::string metadata_bytes;
Expand Down Expand Up @@ -895,6 +903,11 @@ rocksdb::Status Stream::DestroyConsumer(const Slice &stream_name, const std::str
}
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

batch->Delete(stream_cf_handle_, consumer_key);
StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
group_metadata.consumer_number -= 1;
Expand Down Expand Up @@ -1101,6 +1114,10 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

return rocksdb::Status::OK();
}

Expand Down Expand Up @@ -1178,6 +1195,10 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

return rocksdb::Status::OK();
}

Expand Down Expand Up @@ -1341,7 +1362,7 @@ rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
group_metadata.push_back(tmp_item);
}
}
return rocksdb::Status::OK();
return iter->status();
}

rocksdb::Status Stream::GetConsumerInfo(
Expand Down Expand Up @@ -1375,7 +1396,7 @@ rocksdb::Status Stream::GetConsumerInfo(
consumer_metadata.push_back(tmp_item);
}
}
return rocksdb::Status::OK();
return iter->status();
}

rocksdb::Status Stream::Range(const Slice &stream_name, const StreamRangeOptions &options,
Expand Down Expand Up @@ -1539,6 +1560,10 @@ rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOp
if (count >= options.count) break;
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}
}
batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
Expand Down
Loading