Internal subscription on "$JS.API.CONSUMER.CREATE.mystream" took too long: #5534

Open · nuttert opened this issue Jun 13, 2024 · 7 comments

nuttert commented Jun 13, 2024

Hey, I got this error in my Go app:

2024-06-13T21:41:57.808742Z  Can't publish snapshot part 0 for snapshot.mystream, err=nats: timeout

At the same time, I see this in the nats-server logs:

[1437208] 2024/06/13 21:41:51.952237 [WRN] Internal subscription on "$JS.API.CONSUMER.CREATE.addresses.bHxuj0CQ.mystream" took too long: 3.050367636s
[1437208] 2024/06/13 21:41:53.645097 [WRN] 10.150.97.133:57038 - cid:127571 - Readloop processing time: 4.845843252s

Configuration:

nats-server: v2.10.14

I have thousands of subscriptions at the same time, and I don't mind if processing takes a few seconds; in this context that's normal. I just want a guarantee that messages will be delivered without errors, even after 30 seconds.

I tried different settings, even something abnormal like this:

ConsumerLimits: nats.StreamConsumerLimits{
	InactiveThreshold: time.Hour * 24,
	MaxAckPending:     100000000, // Max messages in-flight before consumer is considered slow
},

or added this to the NATS config:

write_deadline: 200s
max_pending: 5000MB

But it didn't help.

Here is how I create the stream:

natsCfg := &nats.StreamConfig{
	Name:     name,
	Subjects: config.Subjects,
	Storage:  nats.MemoryStorage,
	ConsumerLimits: nats.StreamConsumerLimits{
		InactiveThreshold: time.Hour * 24,
		MaxAckPending:     100000000, // Max messages in-flight before consumer is considered slow
	},
}
_, err := c.jetStream.AddStream(natsCfg)

(By the way, FileStorage fails completely on this setup: under production load it crashes after a few seconds with context deadline exceeded.)

And here is how I consume the stream:

jetSub, err = c.jetStream.Subscribe(channel, func(msg *nats.Msg) {
	c.HandleMessage(ctx, msg, sub)
}, opt...)

What I expect

I expect there to be a deadline parameter so that, after for example 10-40 seconds, the queue is fully processed and all clients receive their messages.
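
The closest knob I have found so far is the client-side JetStream request timeout; a minimal sketch, assuming nats.go's MaxWait option (this only extends how long the client waits for the $JS.API responses, it does not make the server process them faster):

	// Sketch (assumption): extend the client-side timeout for JetStream
	// API requests (e.g. $JS.API.CONSUMER.CREATE.*) beyond the 5s default.
	js, err := nc.JetStream(nats.MaxWait(40 * time.Second))
	if err != nil {
		log.Fatal(err)
	}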

@kozlovic kozlovic transferred this issue from nats-io/nats-streaming-server Jun 13, 2024
wallyqs (Member) commented Jun 13, 2024

What is the stream info for this stream?

nuttert (Author) commented Jun 14, 2024

@wallyqs 👋

Information for Stream addresses created 2024-06-14 06:41:44

              Subjects: addresses, snapshot.addresses, meta.addresses
              Replicas: 1
               Storage: Memory

Options:

             Retention: Limits
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 1m0s
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: 1m0s
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 12
                 Bytes: 46 KiB
        First Sequence: 25,887 @ 2024-06-14 09:19:13 UTC
         Last Sequence: 25,898 @ 2024-06-14 09:20:03 UTC
      Active Consumers: 12,859
    Number of Subjects: 2
neilalexander (Member) commented

Can you please provide some information about the hardware you are running on?

Are you creating/deleting consumers at a high rate?

nuttert (Author) commented Jun 14, 2024

By the way, I do not know why it shows Active Consumers: 12,859. I stopped the consumers, and the connection info shows only 11 connections remain:

$ curl http://localhost:8222/connz
{
  "server_id": "",
  "now": "2024-06-14T09:23:58.415476377Z",
  "num_connections": 11,
  "total": 11,
  "offset": 0,
  "limit": 1024,
  "connections": [
    {
      "cid": 15,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T06:37:30.858537226Z",
      "last_activity": "2024-06-14T09:23:58.012855323Z",
      "rtt": "34.743631ms",
      "uptime": "2h46m27s",
      "idle": "0s",
      "pending_bytes": 0,
      "in_msgs": 9564,
      "out_msgs": 9404,
      "in_bytes": 1284174,
      "out_bytes": 281204,
      "subscriptions": 31,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 51654,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:10:42.677638Z",
      "last_activity": "2024-06-14T09:23:58.257782137Z",
      "rtt": "104µs",
      "uptime": "1h13m15s",
      "idle": "0s",
      "pending_bytes": 0,
      "in_msgs": 38372,
      "out_msgs": 38372,
      "in_bytes": 797843,
      "out_bytes": 12701112,
      "subscriptions": 13,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 51665,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:10:42.868960442Z",
      "last_activity": "2024-06-14T09:23:58.335321427Z",
      "rtt": "129µs",
      "uptime": "1h13m15s",
      "idle": "0s",
      "pending_bytes": 0,
      "in_msgs": 217851,
      "out_msgs": 217877,
      "in_bytes": 7043586,
      "out_bytes": 99753737,
      "subscriptions": 17,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 51692,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:10:43.019711244Z",
      "last_activity": "2024-06-14T09:23:57.408833285Z",
      "rtt": "67.704936ms",
      "uptime": "1h13m15s",
      "idle": "1s",
      "pending_bytes": 0,
      "in_msgs": 26772,
      "out_msgs": 26772,
      "in_bytes": 648731,
      "out_bytes": 8500641,
      "subscriptions": 11,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 51823,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:10:44.43207149Z",
      "last_activity": "2024-06-14T09:23:58.238796291Z",
      "rtt": "172µs",
      "uptime": "1h13m13s",
      "idle": "0s",
      "pending_bytes": 0,
      "in_msgs": 4416,
      "out_msgs": 4416,
      "in_bytes": 685037,
      "out_bytes": 143576,
      "subscriptions": 7,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 51840,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:10:45.38545206Z",
      "last_activity": "2024-06-14T09:23:57.578974071Z",
      "rtt": "89µs",
      "uptime": "1h13m13s",
      "idle": "0s",
      "pending_bytes": 0,
      "in_msgs": 49505,
      "out_msgs": 49505,
      "in_bytes": 791000,
      "out_bytes": 13002297,
      "subscriptions": 17,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 51883,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:10:46.387022365Z",
      "last_activity": "2024-06-14T09:23:57.577615768Z",
      "rtt": "178µs",
      "uptime": "1h13m12s",
      "idle": "0s",
      "pending_bytes": 0,
      "in_msgs": 1386066,
      "out_msgs": 1386073,
      "in_bytes": 12108424,
      "out_bytes": 421515781,
      "subscriptions": 207,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 51894,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:10:46.62979798Z",
      "last_activity": "2024-06-14T09:23:58.055923948Z",
      "rtt": "51.709789ms",
      "uptime": "1h13m11s",
      "idle": "0s",
      "pending_bytes": 0,
      "in_msgs": 357653,
      "out_msgs": 357726,
      "in_bytes": 5349335,
      "out_bytes": 119162746,
      "subscriptions": 27,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 51937,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:10:48.203870986Z",
      "last_activity": "2024-06-14T09:23:57.677567078Z",
      "rtt": "143µs",
      "uptime": "1h13m10s",
      "idle": "0s",
      "pending_bytes": 0,
      "in_msgs": 11986913,
      "out_msgs": 13248943,
      "in_bytes": 1292928436,
      "out_bytes": 11349122177,
      "subscriptions": 45,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 51952,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:10:50.390251299Z",
      "last_activity": "2024-06-14T09:23:58.335435258Z",
      "rtt": "84µs",
      "uptime": "1h13m8s",
      "idle": "0s",
      "pending_bytes": 0,
      "in_msgs": 111348,
      "out_msgs": 111167,
      "in_bytes": 24700337784,
      "out_bytes": 1386505298,
      "subscriptions": 9,
      "lang": "go",
      "version": "1.31.0"
    },
    {
      "cid": 80219,
      "kind": "Client",
      "type": "nats",
      "start": "2024-06-14T08:29:35.731346021Z",
      "last_activity": "2024-06-14T09:23:51.307329375Z",
      "rtt": "65µs",
      "uptime": "54m22s",
      "idle": "7s",
      "pending_bytes": 0,
      "in_msgs": 8215,
      "out_msgs": 8215,
      "in_bytes": 137200,
      "out_bytes": 5154807841,
      "subscriptions": 71,
      "lang": "go",
      "version": "1.31.0"
    }
  ]
}

Short info about the hardware configuration of the NATS server:

Mem: 64 GB
8 cores

Are you creating/deleting consumers at a high rate?

Yes. When I start my system, thousands of consumers try to connect to the server at the same time (the server holds one connection to NATS and creates subscriptions on behalf of the clients).

neilalexander (Member) commented

How are you creating the consumers?

Do your clients re-bind to existing named consumers to pick up where they left off, or are they creating new consumers each time they connect? (If so, you might want to set an inactive threshold to clean up old consumers when they go idle.)
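
For illustration, a sketch of both patterns with the nats.go subscribe API (the stream and durable names below are hypothetical, not from this thread):

	// Re-bind to an existing durable consumer so the client resumes
	// where it left off instead of creating a new consumer each time.
	jetSub, err := c.jetStream.Subscribe(channel, handler,
		nats.Bind("addresses", "my-durable"), // hypothetical stream/consumer names
	)

	// Or, if consumers must be ephemeral, let the server clean them
	// up shortly after the client goes away.
	jetSub, err = c.jetStream.Subscribe(channel, handler,
		nats.InactiveThreshold(5*time.Minute),
	)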

nuttert (Author) commented Jun 14, 2024

natsCfg := &nats.StreamConfig{
	Name:     name,
	Subjects: config.Subjects,
	Storage:  nats.MemoryStorage,
	ConsumerLimits: nats.StreamConsumerLimits{
		InactiveThreshold: time.Hour * 24,
		MaxAckPending:     100000000, // Max messages in-flight before consumer is considered slow
	},
}
_, err := c.jetStream.AddStream(natsCfg)

I tried to use InactiveThreshold to fix the initial problem with the timeout error and context deadline exceeded when I used the file storage option. It probably does not help, and I can remove it.

However, I do explicitly close all subscriptions:

sub.WaitForCancellationAsync(func() {
	log.Info().Msgf("Unsubscribed %s", channel)
	jetSub.Unsubscribe()
})
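
A variant I could try instead (assuming nats.go's Subscription.Drain, which processes buffered in-flight messages before removing interest, unlike Unsubscribe, which stops immediately):

	sub.WaitForCancellationAsync(func() {
		log.Info().Msgf("Draining %s", channel)
		jetSub.Drain() // flushes in-flight messages before unsubscribing
	})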
nuttert (Author) commented Jun 14, 2024

Here is how I create a consumer:

jetSub, err = c.jetStream.Subscribe(channel, func(msg *nats.Msg) {
	go c.HandleMessage(ctx, msg, sub)
}, opt...)

(I already showed this above in the description.)
