// Package server provides an MQTT 3.1.1 compliant MQTT server.
package server

import (
	"errors"
	"fmt"
	"io"
	"net"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/mochi-co/mqtt/server/events"
	"github.com/mochi-co/mqtt/server/internal/circ"
	"github.com/mochi-co/mqtt/server/internal/clients"
	"github.com/mochi-co/mqtt/server/internal/packets"
	"github.com/mochi-co/mqtt/server/internal/topics"
	"github.com/mochi-co/mqtt/server/internal/utils"
	"github.com/mochi-co/mqtt/server/listeners"
	"github.com/mochi-co/mqtt/server/listeners/auth"
	"github.com/mochi-co/mqtt/server/persistence"
	"github.com/mochi-co/mqtt/server/system"
)

const (
	// Version indicates the current server version.
	Version = "1.1.1"
)

var (
	// ErrListenerIDExists indicates that a listener with the same id already exists.
	ErrListenerIDExists = errors.New("listener id already exists")

	// ErrReadConnectInvalid indicates that the connection packet was invalid.
	// It is returned when the first packet read from a new connection is
	// not a Connect packet.
	ErrReadConnectInvalid = errors.New("connect packet was not valid")

	// ErrConnectNotAuthorized indicates that the connection packet had incorrect auth values.
	ErrConnectNotAuthorized = errors.New("connect packet was not authorized")

	// ErrInvalidTopic indicates that the specified topic was not valid.
	ErrInvalidTopic = errors.New("cannot publish to $ and $SYS topics")

	// ErrRejectPacket indicates that a packet should be dropped instead of processed.
	// An OnProcessMessage hook returns it to abandon an inbound publish.
	ErrRejectPacket = errors.New("packet rejected")

	// ErrClientDisconnect indicates that a client disconnected from the server.
	ErrClientDisconnect = errors.New("client disconnected")

	// ErrClientReconnect indicates that a client attempted to reconnect while still connected.
	ErrClientReconnect = errors.New("client sent connect while connected")

	// ErrServerShutdown is propagated when the server shuts down.
	ErrServerShutdown = errors.New("server is shutting down")

	// ErrSessionReestablished indicates that an existing client was replaced by a newly connected
	// client. The existing client is disconnected.
	ErrSessionReestablished = errors.New("client session re-established")

	// ErrConnectionFailed indicates that a client connection attempt failed for other reasons.
	ErrConnectionFailed = errors.New("connection attempt failed")

	// SysTopicInterval is the number of milliseconds between $SYS topic publishes.
	// NewServer multiplies it by time.Millisecond when creating the ticker, so
	// it holds a plain count rather than a ready-made duration.
	SysTopicInterval time.Duration = 30000

	// inflightResendBackoff is a slice of seconds, which determines the
	// interval between inflight resend attempts. It is indexed by the number
	// of resends already attempted for a message.
	inflightResendBackoff = []int64{0, 1, 2, 10, 60, 120, 600, 3600, 21600}

	// inflightMaxResends is the maximum number of times to try resending QoS promises.
	inflightMaxResends = 6
)

// Server is an MQTT broker server. It should be created with server.NewServer()
// in order to ensure all the internal fields are correctly populated.
type Server struct {
	inline    inlineMessages       // channels for direct publishing.
	Events    events.Events        // overrideable event hooks.
	Store     persistence.Store    // a persistent storage backend if desired.
	Options   *Options             // configurable server options.
	Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections.
	Clients   *clients.Clients     // clients which are known to the broker.
	Topics    *topics.Index        // an index of topic filter subscriptions and retained messages.
	System    *system.Info         // values about the server commonly found in $SYS topics.
	bytepool  *circ.BytesPool      // a byte pool for incoming and outgoing packets.
	sysTicker *time.Ticker         // the interval ticker for sending updating $SYS topics.
	done      chan bool            // indicate that the server is ending.
}

// Options contains configurable options for the server.
type Options struct {
	// BufferSize overrides the default buffer size (circ.DefaultBufferSize) for the client buffers.
	BufferSize int

	// BufferBlockSize overrides the default buffer block size (DefaultBlockSize) for the client buffers.
	BufferBlockSize int
}

// inlineMessages contains channels for handling inline (direct) publishing.
type inlineMessages struct { done chan bool // indicate that the server is ending. pub chan packets.Packet // a channel of packets to publish to clients } // New returns a new instance of MQTT server with no options. // This method has been deprecated and will be removed in a future release. // Please use NewServer instead. func New() *Server { return NewServer(nil) } // NewServer returns a new instance of an MQTT broker with optional values where applicable. func NewServer(opts *Options) *Server { if opts == nil { opts = new(Options) } s := &Server{ done: make(chan bool), bytepool: circ.NewBytesPool(opts.BufferSize), Clients: clients.New(), Topics: topics.New(), System: &system.Info{ Version: Version, Started: time.Now().Unix(), }, sysTicker: time.NewTicker(SysTopicInterval * time.Millisecond), inline: inlineMessages{ done: make(chan bool), pub: make(chan packets.Packet, 1024), }, Events: events.Events{}, Options: opts, } // Expose server stats using the system listener so it can be used in the // dashboard and other more experimental listeners. s.Listeners = listeners.New(s.System) return s } // AddStore assigns a persistent storage backend to the server. This must be // called before calling server.Server(). func (s *Server) AddStore(p persistence.Store) error { s.Store = p err := s.Store.Open() if err != nil { return err } return nil } // AddListener adds a new network listener to the server. func (s *Server) AddListener(listener listeners.Listener, config *listeners.Config) error { if _, ok := s.Listeners.Get(listener.ID()); ok { return ErrListenerIDExists } if config != nil { listener.SetConfig(config) } s.Listeners.Add(listener) err := listener.Listen(s.System) if err != nil { return err } return nil } // Serve starts the event loops responsible for establishing client connections // on all attached listeners, and publishing the system topics. 
func (s *Server) Serve() error { if s.Store != nil { err := s.readStore() if err != nil { return err } } go s.eventLoop() // spin up event loop for issuing $SYS values and closing server. go s.inlineClient() // spin up inline client for direct message publishing. s.Listeners.ServeAll(s.EstablishConnection) // start listening on all listeners. s.publishSysTopics() // begin publishing $SYS system values. return nil } // eventLoop loops forever, running various server processes at different intervals. func (s *Server) eventLoop() { for { select { case <-s.done: s.sysTicker.Stop() close(s.inline.done) return case <-s.sysTicker.C: s.publishSysTopics() } } } // inlineClient loops forever, sending directly-published messages // from the Publish method to subscribers. func (s *Server) inlineClient() { for { select { case <-s.inline.done: close(s.inline.pub) return case pk := <-s.inline.pub: s.publishToSubscribers(pk) } } } // readConnectionPacket reads the first incoming header for a connection, and if // acceptable, returns the valid connection packet. func (s *Server) readConnectionPacket(cl *clients.Client) (pk packets.Packet, err error) { fh := new(packets.FixedHeader) err = cl.ReadFixedHeader(fh) if err != nil { return } pk, err = cl.ReadPacket(fh) if err != nil { return } if pk.FixedHeader.Type != packets.Connect { return pk, ErrReadConnectInvalid } return } // onError is a pass-through method which triggers the OnError // event hook (if applicable), and returns the provided error. func (s *Server) onError(cl events.Client, err error) error { if err == nil { return err } // Note: if the error originates from a real cause, it will // have been captured as the StopCause. The two cases ignored // below are ordinary consequences of closing the connection. // If one of these ordinary conditions stops the connection, // then the client closed or broke the connection. 
if s.Events.OnError != nil && !errors.Is(err, io.EOF) { s.Events.OnError(cl, err) } return err } // onStorage is a pass-through method which delegates errors from // the persistent storage adapter to the onError event hook. func (s *Server) onStorage(cl events.Clientlike, err error) { if err == nil { return } _ = s.onError(cl.Info(), fmt.Errorf("storage: %w", err)) } // EstablishConnection establishes a new client when a listener // accepts a new connection. func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller) error { xbr := s.bytepool.Get() // Get byte buffer from pools for receiving packet data. xbw := s.bytepool.Get() // and for sending. defer s.bytepool.Put(xbr) defer s.bytepool.Put(xbw) cl := clients.NewClient(c, circ.NewReaderFromSlice(s.Options.BufferBlockSize, xbr), circ.NewWriterFromSlice(s.Options.BufferBlockSize, xbw), s.System, ) cl.Start() defer cl.ClearBuffers() defer cl.Stop(nil) pk, err := s.readConnectionPacket(cl) if err != nil { return s.onError(cl.Info(), fmt.Errorf("read connection: %w", err)) } ackCode, err := pk.ConnectValidate() if err != nil { if err := s.ackConnection(cl, ackCode, false); err != nil { return s.onError(cl.Info(), fmt.Errorf("invalid connection send ack: %w", err)) } return s.onError(cl.Info(), fmt.Errorf("validate connection packet: %w", err)) } cl.Identify(lid, pk, ac) // Set client identity values from the connection packet. 
if !ac.Authenticate(pk.Username, pk.Password) { if err := s.ackConnection(cl, packets.CodeConnectBadAuthValues, false); err != nil { return s.onError(cl.Info(), fmt.Errorf("invalid connection send ack: %w", err)) } return s.onError(cl.Info(), ErrConnectionFailed) } atomic.AddInt64(&s.System.ConnectionsTotal, 1) atomic.AddInt64(&s.System.ClientsConnected, 1) defer atomic.AddInt64(&s.System.ClientsConnected, -1) defer atomic.AddInt64(&s.System.ClientsDisconnected, 1) sessionPresent := s.inheritClientSession(pk, cl) s.Clients.Add(cl) err = s.ackConnection(cl, ackCode, sessionPresent) if err != nil { return s.onError(cl.Info(), fmt.Errorf("ack connection packet: %w", err)) } err = s.ResendClientInflight(cl, true) if err != nil { s.onError(cl.Info(), fmt.Errorf("resend in flight: %w", err)) // pass-through, no return. } if s.Store != nil { s.onStorage(cl, s.Store.WriteClient(persistence.Client{ ID: "cl_" + cl.ID, ClientID: cl.ID, T: persistence.KClient, Listener: cl.Listener, Username: cl.Username, LWT: persistence.LWT(cl.LWT), })) } if s.Events.OnConnect != nil { s.Events.OnConnect(cl.Info(), events.Packet(pk)) } if err := cl.Read(s.processPacket); err != nil { s.sendLWT(cl) cl.Stop(err) } err = cl.StopCause() // Determine true cause of stop. if s.Events.OnDisconnect != nil { s.Events.OnDisconnect(cl.Info(), err) } return err } // ackConnection returns a Connack packet to a client. func (s *Server) ackConnection(cl *clients.Client, ack byte, present bool) error { return s.writeClient(cl, packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Connack, }, SessionPresent: present, ReturnCode: ack, }) } // inheritClientSession inherits the state of an existing client sharing the same // connection ID. If cleanSession is true, the state of any previously existing client // session is abandoned. 
func (s *Server) inheritClientSession(pk packets.Packet, cl *clients.Client) bool { if existing, ok := s.Clients.Get(pk.ClientIdentifier); ok { existing.Lock() defer existing.Unlock() existing.Stop(ErrSessionReestablished) // Issue a stop on the old client. // Per [MQTT-3.1.2-6]: // If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. // The state associated with a CleanSession MUST NOT be reused in any subsequent session. if pk.CleanSession || existing.CleanSession { s.unsubscribeClient(existing) return false } cl.Inflight = existing.Inflight // Take address of existing session. cl.Subscriptions = existing.Subscriptions return true } else { atomic.AddInt64(&s.System.ClientsTotal, 1) if atomic.LoadInt64(&s.System.ClientsConnected) > atomic.LoadInt64(&s.System.ClientsMax) { atomic.AddInt64(&s.System.ClientsMax, 1) } return false } } // unsubscribeClient unsubscribes a client from all of their subscriptions. func (s *Server) unsubscribeClient(cl *clients.Client) { for k := range cl.Subscriptions { delete(cl.Subscriptions, k) if s.Topics.Unsubscribe(k, cl.ID) { if s.Events.OnUnsubscribe != nil { s.Events.OnUnsubscribe(k, cl.Info()) } atomic.AddInt64(&s.System.Subscriptions, -1) } } } // writeClient writes packets to a client connection. func (s *Server) writeClient(cl *clients.Client, pk packets.Packet) error { _, err := cl.WritePacket(pk) if err != nil { return fmt.Errorf("write: %w", err) } return nil } // processPacket processes an inbound packet for a client. Since the method is // typically called as a goroutine, errors are primarily for test checking purposes. 
func (s *Server) processPacket(cl *clients.Client, pk packets.Packet) error { switch pk.FixedHeader.Type { case packets.Connect: return s.processConnect(cl, pk) case packets.Disconnect: return s.processDisconnect(cl, pk) case packets.Pingreq: return s.processPingreq(cl, pk) case packets.Publish: r, err := pk.PublishValidate() if r != packets.Accepted { return err } return s.processPublish(cl, pk) case packets.Puback: return s.processPuback(cl, pk) case packets.Pubrec: return s.processPubrec(cl, pk) case packets.Pubrel: return s.processPubrel(cl, pk) case packets.Pubcomp: return s.processPubcomp(cl, pk) case packets.Subscribe: r, err := pk.SubscribeValidate() if r != packets.Accepted { return err } return s.processSubscribe(cl, pk) case packets.Unsubscribe: r, err := pk.UnsubscribeValidate() if r != packets.Accepted { return err } return s.processUnsubscribe(cl, pk) default: return fmt.Errorf("No valid packet available; %v", pk.FixedHeader.Type) } } // processConnect processes a Connect packet. The packet cannot be used to // establish a new connection on an existing connection. See EstablishConnection // instead. func (s *Server) processConnect(cl *clients.Client, pk packets.Packet) error { s.sendLWT(cl) cl.Stop(ErrClientReconnect) return nil } // processDisconnect processes a Disconnect packet. func (s *Server) processDisconnect(cl *clients.Client, pk packets.Packet) error { cl.Stop(ErrClientDisconnect) return nil } // processPingreq processes a Pingreq packet. func (s *Server) processPingreq(cl *clients.Client, pk packets.Packet) error { err := s.writeClient(cl, packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Pingresp, }, }) if err != nil { return err } return nil } // Publish creates a publish packet from a payload and sends it to the inline.pub // channel, where it is written directly to the outgoing byte buffers of any // clients subscribed to the given topic. 
Because the message is written directly // within the server, QoS is inherently 2 (exactly once). func (s *Server) Publish(topic string, payload []byte, retain bool) error { if len(topic) >= 4 && topic[0:4] == "$SYS" { return ErrInvalidTopic } pk := packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Publish, Retain: retain, }, TopicName: topic, Payload: payload, } if retain { s.retainMessage(&s.inline, pk) } // handoff packet to s.inline.pub channel for writing to client buffers // to avoid blocking the calling function. s.inline.pub <- pk return nil } // Info provides pseudo-client information for the inline messages processor. // It provides a 'client' to which inline retained messages can be assigned. func (*inlineMessages) Info() events.Client { return events.Client{ ID: "inline", Remote: "inline", Listener: "inline", } } // processPublish processes a Publish packet. func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error { if len(pk.TopicName) >= 4 && pk.TopicName[0:4] == "$SYS" { return nil // Clients can't publish to $SYS topics, so fail silently as per spec. } if !cl.AC.ACL(cl.Username, pk.TopicName, true) { return nil } // if an OnProcessMessage hook exists, potentially modify the packet. if s.Events.OnProcessMessage != nil { pkx, err := s.Events.OnProcessMessage(cl.Info(), events.Packet(pk)) if err == nil { pk = packets.Packet(pkx) // Only use the new package changes if there's no errors. } else { // If the ErrRejectPacket is return, abandon processing the packet. if err == ErrRejectPacket { return nil } if s.Events.OnError != nil { s.Events.OnError(cl.Info(), err) } } } if pk.FixedHeader.Retain { s.retainMessage(cl, pk) } if pk.FixedHeader.Qos > 0 { ack := packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Puback, }, PacketID: pk.PacketID, } if pk.FixedHeader.Qos == 2 { ack.FixedHeader.Type = packets.Pubrec } // omit errors in case of broken connection / LWT publish. 
ack send failures // will be handled by in-flight resending on next reconnect. s.onError(cl.Info(), s.writeClient(cl, ack)) } // if an OnMessage hook exists, potentially modify the packet. if s.Events.OnMessage != nil { if pkx, err := s.Events.OnMessage(cl.Info(), events.Packet(pk)); err == nil { pk = packets.Packet(pkx) } } // write packet to the byte buffers of any clients with matching topic filters. s.publishToSubscribers(pk) return nil } // retainMessage adds a message to a topic, and if a persistent store is provided, // adds the message to the store so it can be reloaded if necessary. func (s *Server) retainMessage(cl events.Clientlike, pk packets.Packet) { out := pk.PublishCopy() r := s.Topics.RetainMessage(out) atomic.AddInt64(&s.System.Retained, r) if s.Store != nil { id := "ret_" + out.TopicName if r == 1 { s.onStorage(cl, s.Store.WriteRetained(persistence.Message{ ID: id, T: persistence.KRetained, FixedHeader: persistence.FixedHeader(out.FixedHeader), TopicName: out.TopicName, Payload: out.Payload, })) } else { s.onStorage(cl, s.Store.DeleteRetained(id)) } } } // publishToSubscribers publishes a publish packet to all subscribers with // matching topic filters. func (s *Server) publishToSubscribers(pk packets.Packet) { for id, qos := range s.Topics.Subscribers(pk.TopicName) { if client, ok := s.Clients.Get(id); ok { // If the AllowClients value is set, only deliver the packet if the subscribed // client exists in the AllowClients value. For use with the OnMessage event hook // in cases where you want to publish messages to clients selectively. if pk.AllowClients != nil && !utils.InSliceString(pk.AllowClients, id) { continue } out := pk.PublishCopy() if qos > out.FixedHeader.Qos { // Inherit higher desired qos values. out.FixedHeader.Qos = qos } if out.FixedHeader.Qos > 0 { // If QoS required, save to inflight index. 
if out.PacketID == 0 { out.PacketID = uint16(client.NextPacketID()) } // If a message has a QoS, we need to ensure it is delivered to // the client at some point, one way or another. Store the publish // packet in the client's inflight queue and attempt to redeliver // if an appropriate ack is not received (or if the client is offline). sent := time.Now().Unix() q := client.Inflight.Set(out.PacketID, clients.InflightMessage{ Packet: out, Sent: sent, }) if q { atomic.AddInt64(&s.System.Inflight, 1) } if s.Store != nil { s.onStorage(client, s.Store.WriteInflight(persistence.Message{ ID: persistentID(client, out), T: persistence.KInflight, FixedHeader: persistence.FixedHeader(out.FixedHeader), TopicName: out.TopicName, Payload: out.Payload, Sent: sent, })) } } s.onError(client.Info(), s.writeClient(client, out)) } } } // processPuback processes a Puback packet. func (s *Server) processPuback(cl *clients.Client, pk packets.Packet) error { q := cl.Inflight.Delete(pk.PacketID) if q { atomic.AddInt64(&s.System.Inflight, -1) } if s.Store != nil { s.onStorage(cl, s.Store.DeleteInflight(persistentID(cl, pk))) } return nil } // processPubrec processes a Pubrec packet. func (s *Server) processPubrec(cl *clients.Client, pk packets.Packet) error { out := packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Pubrel, Qos: 1, }, PacketID: pk.PacketID, } err := s.writeClient(cl, out) if err != nil { return err } return nil } // processPubrel processes a Pubrel packet. func (s *Server) processPubrel(cl *clients.Client, pk packets.Packet) error { out := packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Pubcomp, }, PacketID: pk.PacketID, } err := s.writeClient(cl, out) if err != nil { return err } q := cl.Inflight.Delete(pk.PacketID) if q { atomic.AddInt64(&s.System.Inflight, -1) } if s.Store != nil { s.onStorage(cl, s.Store.DeleteInflight(persistentID(cl, pk))) } return nil } // processPubcomp processes a Pubcomp packet. 
func (s *Server) processPubcomp(cl *clients.Client, pk packets.Packet) error { q := cl.Inflight.Delete(pk.PacketID) if q { atomic.AddInt64(&s.System.Inflight, -1) } if s.Store != nil { s.onStorage(cl, s.Store.DeleteInflight(persistentID(cl, pk))) } return nil } // processSubscribe processes a Subscribe packet. func (s *Server) processSubscribe(cl *clients.Client, pk packets.Packet) error { retCodes := make([]byte, len(pk.Topics)) for i := 0; i < len(pk.Topics); i++ { if !cl.AC.ACL(cl.Username, pk.Topics[i], false) { retCodes[i] = packets.ErrSubAckNetworkError } else { r := s.Topics.Subscribe(pk.Topics[i], cl.ID, pk.Qoss[i]) if r { if s.Events.OnSubscribe != nil { s.Events.OnSubscribe(pk.Topics[i], cl.Info(), pk.Qoss[i]) } atomic.AddInt64(&s.System.Subscriptions, 1) } cl.NoteSubscription(pk.Topics[i], pk.Qoss[i]) retCodes[i] = pk.Qoss[i] if s.Store != nil { s.onStorage(cl, s.Store.WriteSubscription(persistence.Subscription{ ID: "sub_" + cl.ID + ":" + pk.Topics[i], T: persistence.KSubscription, Filter: pk.Topics[i], Client: cl.ID, QoS: pk.Qoss[i], })) } } } err := s.writeClient(cl, packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Suback, }, PacketID: pk.PacketID, ReturnCodes: retCodes, }) if err != nil { return err } // Publish out any retained messages matching the subscription filter and the user has // been allowed to subscribe to. for i := 0; i < len(pk.Topics); i++ { if retCodes[i] == packets.ErrSubAckNetworkError { continue } for _, pkv := range s.Topics.Messages(pk.Topics[i]) { s.onError(cl.Info(), s.writeClient(cl, pkv)) } } return nil } // processUnsubscribe processes an unsubscribe packet. 
func (s *Server) processUnsubscribe(cl *clients.Client, pk packets.Packet) error { for i := 0; i < len(pk.Topics); i++ { q := s.Topics.Unsubscribe(pk.Topics[i], cl.ID) if q { if s.Events.OnUnsubscribe != nil { s.Events.OnUnsubscribe(pk.Topics[i], cl.Info()) } atomic.AddInt64(&s.System.Subscriptions, -1) } cl.ForgetSubscription(pk.Topics[i]) } err := s.writeClient(cl, packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Unsuback, }, PacketID: pk.PacketID, }) if err != nil { return err } return nil } // atomicItoa reads an *int64 and formats a decimal string. func atomicItoa(ptr *int64) string { return strconv.FormatInt(atomic.LoadInt64(ptr), 10) } // persistentID return a string combining the client and packet // identifiers for use with the persistence layer. func persistentID(client *clients.Client, pk packets.Packet) string { return "if_" + client.ID + "_" + pk.FormatID() } // publishSysTopics publishes the current values to the server $SYS topics. // Due to the int to string conversions this method is not as cheap as // some of the others so the publishing interval should be set appropriately. 
func (s *Server) publishSysTopics() { pk := packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Publish, Retain: true, }, } uptime := time.Now().Unix() - atomic.LoadInt64(&s.System.Started) atomic.StoreInt64(&s.System.Uptime, uptime) topics := map[string]string{ "$SYS/broker/version": s.System.Version, "$SYS/broker/uptime": atomicItoa(&s.System.Uptime), "$SYS/broker/timestamp": atomicItoa(&s.System.Started), "$SYS/broker/load/bytes/received": atomicItoa(&s.System.BytesRecv), "$SYS/broker/load/bytes/sent": atomicItoa(&s.System.BytesSent), "$SYS/broker/clients/connected": atomicItoa(&s.System.ClientsConnected), "$SYS/broker/clients/disconnected": atomicItoa(&s.System.ClientsDisconnected), "$SYS/broker/clients/maximum": atomicItoa(&s.System.ClientsMax), "$SYS/broker/clients/total": atomicItoa(&s.System.ClientsTotal), "$SYS/broker/connections/total": atomicItoa(&s.System.ConnectionsTotal), "$SYS/broker/messages/received": atomicItoa(&s.System.MessagesRecv), "$SYS/broker/messages/sent": atomicItoa(&s.System.MessagesSent), "$SYS/broker/messages/publish/dropped": atomicItoa(&s.System.PublishDropped), "$SYS/broker/messages/publish/received": atomicItoa(&s.System.PublishRecv), "$SYS/broker/messages/publish/sent": atomicItoa(&s.System.PublishSent), "$SYS/broker/messages/retained/count": atomicItoa(&s.System.Retained), "$SYS/broker/messages/inflight": atomicItoa(&s.System.Inflight), "$SYS/broker/subscriptions/count": atomicItoa(&s.System.Subscriptions), } for topic, payload := range topics { pk.TopicName = topic pk.Payload = []byte(payload) q := s.Topics.RetainMessage(pk.PublishCopy()) atomic.AddInt64(&s.System.Retained, q) s.publishToSubscribers(pk) } if s.Store != nil { s.onStorage(&s.inline, s.Store.WriteServerInfo(persistence.ServerInfo{ Info: *s.System, ID: persistence.KServerInfo, })) } } // ResendClientInflight attempts to resend all undelivered inflight messages // to a client. 
func (s *Server) ResendClientInflight(cl *clients.Client, force bool) error { if cl.Inflight.Len() == 0 { return nil } nt := time.Now().Unix() for _, tk := range cl.Inflight.GetAll() { if tk.Resends >= inflightMaxResends { // After a reasonable time, drop inflight packets. cl.Inflight.Delete(tk.Packet.PacketID) if tk.Packet.FixedHeader.Type == packets.Publish { atomic.AddInt64(&s.System.PublishDropped, 1) } if s.Store != nil { s.onStorage(cl, s.Store.DeleteInflight(persistentID(cl, tk.Packet))) } continue } // Only continue if the resend backoff time has passed and there's a backoff time. if !force && (nt-tk.Sent < inflightResendBackoff[tk.Resends] || len(inflightResendBackoff) < tk.Resends) { continue } if tk.Packet.FixedHeader.Type == packets.Publish { tk.Packet.FixedHeader.Dup = true } tk.Resends++ tk.Sent = nt cl.Inflight.Set(tk.Packet.PacketID, tk) _, err := cl.WritePacket(tk.Packet) if err != nil { return err } if s.Store != nil { s.onStorage(cl, s.Store.WriteInflight(persistence.Message{ ID: persistentID(cl, tk.Packet), T: persistence.KInflight, FixedHeader: persistence.FixedHeader(tk.Packet.FixedHeader), TopicName: tk.Packet.TopicName, Payload: tk.Packet.Payload, Sent: tk.Sent, Resends: tk.Resends, })) } } return nil } // Close attempts to gracefully shutdown the server, all listeners, clients, and stores. func (s *Server) Close() error { close(s.done) s.Listeners.CloseAll(s.closeListenerClients) if s.Store != nil { s.Store.Close() } return nil } // closeListenerClients closes all clients on the specified listener. func (s *Server) closeListenerClients(listener string) { clients := s.Clients.GetByListener(listener) for _, cl := range clients { cl.Stop(ErrServerShutdown) } } // sendLWT issues an LWT message to a topic when a client disconnects. 
func (s *Server) sendLWT(cl *clients.Client) error { if cl.LWT.Topic != "" { err := s.processPublish(cl, packets.Packet{ FixedHeader: packets.FixedHeader{ Type: packets.Publish, Retain: cl.LWT.Retain, Qos: cl.LWT.Qos, }, TopicName: cl.LWT.Topic, Payload: cl.LWT.Message, }) if err != nil { return s.onError(cl.Info(), fmt.Errorf("send lwt: %s %w; %+v", cl.ID, err, cl.LWT)) } } return nil } // readStore reads in any data from the persistent datastore (if applicable). func (s *Server) readStore() error { info, err := s.Store.ReadServerInfo() if err != nil { return fmt.Errorf("load server info; %w", err) } s.loadServerInfo(info) clients, err := s.Store.ReadClients() if err != nil { return fmt.Errorf("load clients; %w", err) } s.loadClients(clients) subs, err := s.Store.ReadSubscriptions() if err != nil { return fmt.Errorf("load subscriptions; %w", err) } s.loadSubscriptions(subs) inflight, err := s.Store.ReadInflight() if err != nil { return fmt.Errorf("load inflight; %w", err) } s.loadInflight(inflight) retained, err := s.Store.ReadRetained() if err != nil { return fmt.Errorf("load retained; %w", err) } s.loadRetained(retained) return nil } // loadServerInfo restores server info from the datastore. func (s *Server) loadServerInfo(v persistence.ServerInfo) { version := s.System.Version s.System = &v.Info s.System.Version = version } // loadSubscriptions restores subscriptions from the datastore. func (s *Server) loadSubscriptions(v []persistence.Subscription) { for _, sub := range v { if s.Topics.Subscribe(sub.Filter, sub.Client, sub.QoS) { if cl, ok := s.Clients.Get(sub.Client); ok { cl.NoteSubscription(sub.Filter, sub.QoS) if s.Events.OnSubscribe != nil { s.Events.OnSubscribe(sub.Filter, cl.Info(), sub.QoS) } } } } } // loadClients restores clients from the datastore. 
func (s *Server) loadClients(v []persistence.Client) { for _, c := range v { cl := clients.NewClientStub(s.System) cl.ID = c.ClientID cl.Listener = c.Listener cl.Username = c.Username cl.LWT = clients.LWT(c.LWT) s.Clients.Add(cl) } } // loadInflight restores inflight messages from the datastore. func (s *Server) loadInflight(v []persistence.Message) { for _, msg := range v { if client, ok := s.Clients.Get(msg.Client); ok { client.Inflight.Set(msg.PacketID, clients.InflightMessage{ Packet: packets.Packet{ FixedHeader: packets.FixedHeader(msg.FixedHeader), PacketID: msg.PacketID, TopicName: msg.TopicName, Payload: msg.Payload, }, Sent: msg.Sent, Resends: msg.Resends, }) } } } // loadRetained restores retained messages from the datastore. func (s *Server) loadRetained(v []persistence.Message) { for _, msg := range v { s.Topics.RetainMessage(packets.Packet{ FixedHeader: packets.FixedHeader(msg.FixedHeader), TopicName: msg.TopicName, Payload: msg.Payload, }) } }