From 0aa49221051085c8e7cc4f712dd95b209eca9177 Mon Sep 17 00:00:00 2001 From: Andrey Belvedersky Date: Thu, 3 Jun 2021 09:30:25 +0300 Subject: [PATCH] add NSQ --- cmd/main.go | 6 ++++- go.mod | 1 + go.sum | 4 ++++ main.go | 54 ++++++++++++++++++++++++++++++++++++++++++ pkg/bot/bot.go | 16 ++++++++++++- pkg/message/message.go | 44 ++++++++++++++++++++++++++++++++++ pkg/utils/errors.go | 10 +++++--- pkg/utils/nsqd.go | 17 +++++++++++++ pkg/utils/redis.go | 1 - 9 files changed, 147 insertions(+), 6 deletions(-) create mode 100644 main.go create mode 100644 pkg/message/message.go create mode 100644 pkg/utils/nsqd.go diff --git a/cmd/main.go b/cmd/main.go index 480975b..00956bf 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,7 +1,11 @@ package main -import "git/ecom/jira-bot/pkg/bot" +import ( + "git/ecom/jira-bot/pkg/bot" + "git/ecom/jira-bot/pkg/message" +) func main() { + message.Message() bot.JiraBot() } diff --git a/go.mod b/go.mod index fe59cac..be6ab9b 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/go-redis/redis/v8 v8.9.0 // indirect github.com/google/go-cmp v0.5.5 // indirect github.com/klauspost/cpuid v1.2.0 // indirect + github.com/nsqio/go-nsq v1.0.8 // indirect github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.7.0 // indirect github.com/valyala/quicktemplate v1.6.3 diff --git a/go.sum b/go.sum index bfdeaf1..2d05a75 100644 --- a/go.sum +++ b/go.sum @@ -113,6 +113,8 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -246,6 +248,8 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223 h1:F9x/1yl3T2AeKLr2AMdilSD8+f9bvMnNN8VS5iDtovc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk= +github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= diff --git a/main.go b/main.go new file mode 100644 index 0000000..69b9dbb --- /dev/null +++ b/main.go @@ -0,0 +1,54 @@ +package main + +import ( + "log" + + "github.com/nsqio/go-nsq" +) + +type myMessageHandler struct{} + +func processMessage(b []byte) error { + log.Println(string(b)) + return nil +} + +// HandleMessage implements the Handler interface. +func (h *myMessageHandler) HandleMessage(m *nsq.Message) error { + log.Println("test") + if len(m.Body) == 0 { + return nil + } + + err := processMessage(m.Body) + + return err +} + +func main() { + config := nsq.NewConfig() + + // // .... + // producer, err := nsq.NewProducer("ekb-app01.myway.local:4150", config) + // if err != nil { + // log.Fatal(err) + // } + // if producer.Publish("grpc", []byte("Случилась какая-то ботва 1")); err != nil { + // log.Fatal(err) + // } + // producer.Stop() + + // ... + consumer, err := nsq.NewConsumer("grpc", "grpc", config) + if err != nil { + log.Fatal(err) + } + + consumer.AddHandler(&myMessageHandler{}) + + if consumer.ConnectToNSQLookupd("ekb-app01.myway.local:4161"); err != nil { + log.Fatal(err) + } + + //consumer.Stop() +} diff --git a/pkg/bot/bot.go b/pkg/bot/bot.go index 0b61ee6..f0576cb 100644 --- a/pkg/bot/bot.go +++ b/pkg/bot/bot.go @@ -14,12 +14,16 @@ import ( telegram "gopkg.in/tucnak/telebot.v2" ) + + // Чатбот func JiraBot() { + cfg, _ := config.GetConfig() redis := utils.Redis(cfg.Redis) ctx := context.Background() + pnsq := utils.GetProducer() jiraClient, _ := utils.GetClient(&cfg.Jira) b, err := telegram.NewBot(telegram.Settings{ URL: cfg.Telegram.Url, @@ -41,10 +45,20 @@ func JiraBot() { b.Handle("/start", scenes.Start(b, cfg)) b.Handle("/exit", scenes.Exit(b, redis, ctx)) + b.Handle("/hello", func(m *telegram.Message) { b.Send(m.Sender, "Hello World!") + messageBody := []byte("hello") + topicName := "error" + + // Synchronously publish a single message to the specified topic. + // Messages can also be sent asynchronously and/or in batches. + err = pnsq.Publish(topicName, messageBody) + if err != nil { + log.Fatal(err) + } }) - fmt.Println(templates.Title(b.Me, cfg.BotVersion,jiraClient.GetBaseURL().Host)) + fmt.Println(templates.Title(b.Me, cfg.BotVersion, jiraClient.GetBaseURL().Host)) b.Start() } diff --git a/pkg/message/message.go b/pkg/message/message.go new file mode 100644 index 0000000..b847c13 --- /dev/null +++ b/pkg/message/message.go @@ -0,0 +1,44 @@ +package message + +import ( + "log" + + "github.com/nsqio/go-nsq" +) + +type myMessageHandler struct{} + +func processMessage(b []byte) error { + log.Println(string(b)) + return nil +} + +// HandleMessage implements the Handler interface. +func (h *myMessageHandler) HandleMessage(m *nsq.Message) error { + if len(m.Body) == 0 { + return nil + } + err := processMessage(m.Body) + return err +} + +func Message() { + cf := nsq.NewConfig() + + consumer, err := nsq.NewConsumer("error", "error", cf) + if err != nil { + log.Fatal(err) + } + + // Set the Handler for messages received by this Consumer. Can be called multiple times. + // See also AddConcurrentHandlers. + consumer.AddHandler(&myMessageHandler{}) + + // Use nsqlookupd to discover nsqd instances. + // See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds. + err = consumer.ConnectToNSQLookupd("ekb-app01.myway.local:4161") + if err != nil { + log.Fatal(err) + } + +} diff --git a/pkg/utils/errors.go b/pkg/utils/errors.go index daca56b..183edbc 100644 --- a/pkg/utils/errors.go +++ b/pkg/utils/errors.go @@ -2,10 +2,14 @@ package utils import "log" -func SendError(devs []int) { +func CheckErr(err error) { + if err != nil { + sendError([]int{132, 31213, 213}) + } +} + +func sendError(devs []int) { for _, dev := range devs { log.Println(dev) } } - -// 169837862 diff --git a/pkg/utils/nsqd.go b/pkg/utils/nsqd.go new file mode 100644 index 0000000..06066c7 --- /dev/null +++ b/pkg/utils/nsqd.go @@ -0,0 +1,17 @@ +package utils + +import ( + "log" + + "github.com/nsqio/go-nsq" +) + +func GetProducer() *nsq.Producer { + config := nsq.NewConfig() + + producer, err := nsq.NewProducer("ekb-app01.myway.local:4150", config) + if err != nil { + log.Fatal(err) + } + return producer +} diff --git a/pkg/utils/redis.go b/pkg/utils/redis.go index 2ee664a..d6f498c 100644 --- a/pkg/utils/redis.go +++ b/pkg/utils/redis.go @@ -2,7 +2,6 @@ package utils import ( "git/ecom/jira-bot/pkg/config" - "github.com/go-redis/redis/v8" )