xds: generic lrs client for load reporting #8250
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8250      +/-   ##
==========================================
+ Coverage   82.15%   82.29%   +0.14%
==========================================
  Files         412      419       +7
  Lines       40562    41787    +1225
==========================================
+ Hits        33322    34390    +1068
- Misses       5875     5949      +74
- Partials     1365     1448      +83
78ab34a
to
ce9ba3d
Compare
e140792
to
2e2674f
Compare
Looks fine overall. Just a few comments inline.
func (ls *LoadStore) ReporterForCluster(clusterName, serviceName string) PerClusterReporter {
	panic("unimplemented")
func (ls *LoadStore) ReporterForCluster(clusterName, serviceName string) *PerClusterReporter {
	if ls == nil {
I think it's fine to panic if a nil LoadStore is used. Why not? It seems like a pretty severe programming error.
Removed nil check. @easwars any reason why this check is there in existing code?
Probably for tests to use a nil load store. If that is not required anymore and tests are happy, we should be good to remove the nil check.
}

// CallStarted records a call started in the LoadStore.
func (p *PerClusterReporter) CallStarted(locality string) {
	panic("unimplemented")
	if p == nil {
As above. And below.
Done
}

func (rcd *rpcCountData) decrInProgress() {
	atomic.AddUint64(rcd.inProgress, negativeOneUInt64) // atomic.Add(x, -1)
Nit: the const doesn't seem to buy us anything, since we're already needing to comment what this means. IMO delete the constant and inline it.
Made it inline
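For readers unfamiliar with the idiom discussed here, the following is a minimal sketch of decrementing an unsigned counter atomically with the value inlined. The sync/atomic documentation describes AddUint64(&x, ^uint64(0)) as the way to decrement x. The rpcCountData type below is a pared-down, hypothetical stand-in, and the incrInProgress and loadInProgress helpers are assumptions added only to make the example runnable; this is not the PR's actual code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// rpcCountData is a hypothetical, trimmed-down stand-in for the type under
// review; only the in-progress counter is modeled here.
type rpcCountData struct {
	inProgress *uint64
}

// incrInProgress adds one to the in-progress counter.
func (rcd *rpcCountData) incrInProgress() {
	atomic.AddUint64(rcd.inProgress, 1)
}

// decrInProgress subtracts one. ^uint64(0) is the all-ones bit pattern, so
// adding it wraps around to the same result as subtracting 1.
func (rcd *rpcCountData) decrInProgress() {
	atomic.AddUint64(rcd.inProgress, ^uint64(0)) // atomic.Add(x, -1)
}

// loadInProgress reads the current counter value.
func (rcd *rpcCountData) loadInProgress() uint64 {
	return atomic.LoadUint64(rcd.inProgress)
}

func main() {
	rcd := &rpcCountData{inProgress: new(uint64)}
	rcd.incrInProgress()
	rcd.incrInProgress()
	rcd.decrInProgress()
	fmt.Println(rcd.loadInProgress()) // prints 1
}
```

The inline comment still earns its place, since the bit trick is not self-explanatory, but the named constant is no longer needed.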
s = rld.sum
rld.sum = 0
c = rld.count
rld.count = 0
You could do something like this, which might(?) be more quickly understood:
- s = rld.sum
- rld.sum = 0
- c = rld.count
- rld.count = 0
+ s, rld.sum = rld.sum, 0
+ c, rld.count = rld.count, 0
Or,
- s = rld.sum
- rld.sum = 0
- c = rld.count
- rld.count = 0
+ s, c = rld.sum, rld.count
+ rld.sum, rld.count = 0, 0
Did the first one
c = rld.count
rld.count = 0
rld.mu.Unlock()
return
Please don't use bare returns that return values. It can be hard to understand what's going on, mainly for longer functions.
- return
+ return s, c
Or pair with the second option above:
func (rld *rpcLoadData) loadAndClear() (float64, int64) {
	rld.mu.Lock()
	defer rld.mu.Unlock()
	s, c := rld.sum, rld.count
	rld.sum, rld.count = 0, 0
	return s, c
}
Yeah, added them to the return statement.
	return c, err
}

/*
Why is this commented out?
I mentioned this in the PR description. The tests had compilation errors because they depend on some things that were added in PR #8183. Now that it's merged, I have rebased on the latest changes and uncommented it.
I mostly skimmed the changes - @easwars may also want to take a quick pass. The commits here aren't quite as easy to review as the last change, since they go file-by-file. It would have been easier if one commit copied all the files, so that we could just skip that one commit when reviewing.
0a8d352
to
aaa667d
Compare
// Note that new entries are added to this map, but never removed. This is
// potentially a memory leak. But the memory is allocated for each new
// (cluster,service) pair, and the memory allocated is just pointers and
// maps. So this shouldn't get too bad.
Could you please add this to the list of things that are being tracked? Thanks.
Yes, I had already added it. Initially, I was thinking of changing the internal code first, but I think it's fine to fix it here after the migration.
	clusters map[string]map[string]*PerClusterReporter
}

// newStore creates a LoadStore.
This comment needs to be updated to match the function name.
Done
// Wait for the provided context to be done (timeout or cancellation).
if ctx != nil {
	<-ctx.Done()
}
How and why would a user use this feature?
The idea is that if it's the last reference to the underlying stream, Stop should wait for the context to be done, which allows any pending load to be flushed during the wait. If not, then just go ahead and close the stream.
I updated the logic to wait only if it's the last reference, just before closing the stream.
// If a cluster's loadData is empty (no load to report), it's not appended to
// the returned slice.
func (ls *LoadStore) stats(clusterNames []string) []*loadData {
	var ret []*loadData
Nit: Can we please move this variable declaration to be on the line above the if len(clusterNames) == 0 { ... }. The code just reads better when it starts with a call to lock and a deferred call to unlock and everything else follows after that.
Done
// appendClusterStats gets Data for the given cluster, append to ret, and return
// the new slice.
This comment needs some updating since there is no single "given cluster", but a collection of clusters.
Done
wg.Wait()

gotStoreData := ls.stats()
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval")); diff != "" {
This diff command is repeated multiple times with the same (or maybe almost same) set of options. Maybe a helper function can be written to do the comparison and the test can simply call if err := verifyStoreData(got, want); err != nil { ... }?
Moved to helper
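A helper of the shape suggested above might look roughly like the sketch below. It is an illustration only: the PR's tests compare with cmp.Diff and cmpopts options, whereas this version substitutes reflect.DeepEqual so that it stays within the standard library. The loadData fields shown are hypothetical, and the real helper's signature and behavior may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// loadData is a hypothetical, trimmed-down stand-in for the type under test.
type loadData struct {
	cluster        string
	totalDrops     uint64
	reportInterval int64 // varies between runs, so the helper ignores it
}

// verifyStoreData returns a non-nil error if got and want differ once
// reportInterval is ignored. Assumption: reflect.DeepEqual is used here in
// place of the cmp.Diff call and options shown in the review.
func verifyStoreData(got, want []*loadData) error {
	if len(got) != len(want) {
		return fmt.Errorf("got %d entries, want %d", len(got), len(want))
	}
	for i := range got {
		// Compare copies so the caller's data is left untouched.
		g, w := *got[i], *want[i]
		g.reportInterval, w.reportInterval = 0, 0
		if !reflect.DeepEqual(g, w) {
			return fmt.Errorf("entry %d: got %+v, want %+v", i, g, w)
		}
	}
	return nil
}

func main() {
	got := []*loadData{{cluster: "c1", totalDrops: 3, reportInterval: 17}}
	want := []*loadData{{cluster: "c1", totalDrops: 3}}
	fmt.Println(verifyStoreData(got, want)) // prints <nil>
}
```

Each test can then reduce its comparison to a single if err := verifyStoreData(got, want); err != nil check, and the comparison options live in one place.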
}
var resp v3lrspb.LoadStatsResponse
if err := proto.Unmarshal(r, &resp); err != nil {
	lrs.logger.Infof("Failed to unmarshal response to LoadStatsResponse: %v", err)
There is a warning log for the marshal error, but an info log for the unmarshal error. Please use a consistent log level, and if you end up choosing info, please add a verbosity check too.
Made both Infof, guarded by a verbosity check.
// The LRSClient owns a bunch of streams to individual LRS servers.
//
// Once all references to a stream are dropped, the stream is closed.
"Once all references to a channel are dropped"
What channel are we referring to here?
The streams that are running for each server. Are you suggesting we refer to them as channels?
if lrs.cancelStream == nil {
	// It is possible that Stop() is called before the cleanup function
	// is called, thereby setting cancelStream to nil. Hence we need a
	// nil check here before invoking the cancel function.
	return
}
I don't quite follow this. The only place that sets lrs.cancelStream to nil is right below in this same closure.
I removed it. Maybe it had a reason in the internal code because it was within the xDS client? Here it's not possible.
func New(config Config) (*LRSClient, error) {
	switch {
	case config.Node.ID == "":
		return nil, errors.New("lrsclient: node ID is empty")
Nit: the error message says "node ID", but this is the complete node configuration. So, please update the error message to reflect that.
The case is checking for the node ID. We shouldn't have a node without an ID. Updated the message to mention that.
@@ -30,6 +33,28 @@ import "context"
//
// It is safe for concurrent use.
type LoadStore struct {
	lrsStream *streamImpl
The only reason we need the streamImpl here is to call stop on it. Why can't we simply have a function pointer here instead?
Yeah, I moved the function to LoadStore instead, and it is now set on the load store directly from the LRS client. Also renamed it to Stop.
ctx, cancel := context.WithCancel(context.Background())
lrs.cancelStream = cancel
lrs.doneCh = make(chan struct{})
lrs.loadStore = newLoadStore(lrs)
go lrs.runner(ctx)
Can all of this be done in newStreamImpl?
Done
	delete(c.lrsStreams, serverIdentifier)
	tr.Close()
}
lrs.cleanup = cleanup
This is how I see the API being used:
- user creates a new LRS client with lrsclient.New()
- when they want to report load to a server, they would call ReportLoad on the returned LRS client from above
  - this would create a new streamImpl if required or reuse an existing one
  - the user is returned a reference to the associated LoadStore
- going forward, all user interactions are with the LoadStore and the PerClusterReporter returned from the store
- Eventually, they would call Stop() on the store to indicate that they no longer wish to report loads
  - this calls streamImpl.stop(), which calls this cleanup function
  - we need a once func somewhere in this call path to ensure that multiple calls to LoadStore.Stop() do not decrement the ref count multiple times
Yeah, that's the flow.
"we need a once func somewhere in this call path to ensure that multiple calls to LoadStore.Stop() does not decrement the ref count multiple times"
A once func is not possible because the LoadStore is shared? The stop function does have a check, though: if the LRS reference count is already 0 and stop is called, it returns early but logs an error.
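To make the trade-off in this exchange concrete, here is a minimal sketch of one way reference counting and a once guard can coexist when the underlying resource is shared: each caller gets its own release function, and the once guard lives in that function rather than on the shared object. This is an assumption-laden illustration, not the design chosen in the PR; sharedStream and acquire are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedStream is a hypothetical ref-counted resource standing in for the
// per-server stream discussed above.
type sharedStream struct {
	mu     sync.Mutex
	refs   int
	closed bool
}

// acquire takes a reference and returns a release function. Each release
// function carries its own sync.Once, so calling it repeatedly decrements
// the shared count at most once, even though the stream itself is shared.
func (s *sharedStream) acquire() (release func()) {
	s.mu.Lock()
	s.refs++
	s.mu.Unlock()

	var once sync.Once
	return func() {
		once.Do(func() {
			s.mu.Lock()
			defer s.mu.Unlock()
			s.refs--
			if s.refs == 0 {
				s.closed = true // last reference gone: close the stream
			}
		})
	}
}

func main() {
	s := &sharedStream{}
	releaseA := s.acquire()
	releaseB := s.acquire()

	releaseA()
	releaseA()                    // no-op: already released
	fmt.Println(s.refs, s.closed) // prints 1 false

	releaseB()
	fmt.Println(s.refs, s.closed) // prints 0 true
}
```

The alternative described in the reply, returning early and logging an error when the count is already zero, catches over-release only after the count reaches zero; a per-caller guard prevents one caller's repeated Stop from consuming another caller's reference.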
@@ -43,38 +68,372 @@ type LoadStore struct {
// attempt to flush any unreported load data to the LRS server. It will either
// wait for this attempt to complete, or for the provided context to be done
// before canceling the LRS stream.
func (ls *LoadStore) Stop(ctx context.Context) error {
	panic("unimplemented")
func (ls *LoadStore) Stop(ctx context.Context) {
I think we can remove this paragraph about reference counting from this docstring.
Also, where do we make the last attempt to flush any unsent loads?
I modified it to just call out the last reference to the stream: only if this is the last reference to the stream do we wait for the context to be done.
"Also, where do we make the last attempt to flush any unsent loads?"
I mentioned this in the other comment. Basically, we wait for the context to complete to allow some time for the load to be flushed based on the load reporting interval, because I think we don't want to make multiple attempts in less than the reporting interval? WDYT?
func (*LRSClient) ReportLoad(_ clients.ServerIdentifier) *LoadStore {
	panic("unimplemented")
// ReportLoad creates and returns a LoadStore for the caller to report loads
// using a LoadReportingStream.
I think we should mention here that the caller should call Stop on the returned LoadStore when they are done reporting load to this server.
Done
9b88d68
to
a263158
Compare
This change adds a generic LRS client for load reporting to an LRS server.
The PR copies the existing xds/internal/xdsclient/load/store.go, xds/internal/xdsclient/transport/lrs/lrs_stream.go, xds/internal/xdsclient/load/store_test.go, and xds/internal/xdsclient/tests/loadreport_test.go from the internal xdsclient code and then modifies them to use the generic client types and interfaces. Each "copy" commit is followed by the "modify" commit for that file. Reviewers can start by reviewing the "modify" commits.
PS: Currently loadreport_test.go is commented out because it has compilation errors: it depends on some of the functions added in #8183.
RELEASE NOTES: None