From 4b94f7b91de4b2473bbfae5980fbff241562b771 Mon Sep 17 00:00:00 2001 From: Andrei Kurilov <18027129+akurilov@users.noreply.github.com> Date: Fri, 7 Jun 2024 17:55:39 +0300 Subject: [PATCH] feat: convert back to sts --- README.md | 8 +++--- config/config.go | 13 ++++++--- config/config_test.go | 8 ++++-- helm/source-telegram/templates/sts.yaml | 20 ++++++------- main.go | 37 +++++++++++++++++++++---- model/channel.go | 19 +++++++------ model/filter.go | 1 + service/service.go | 9 ++++-- storage/storage_mongo.go | 25 ++++++++++++++++- storage/storage_mongo_test.go | 27 +++++++++--------- 10 files changed, 116 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index e32732a..3f213e0 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,10 @@ Before deploying: ```shell -kubectl create secret generic source-telegram-tokens- \ - --from-literal=id= \ - --from-literal=hash= \ - --from-literal=phone= +kubectl create secret generic source-telegram-tokens \ + --from-literal=ids=,,... \ + --from-literal=hashes=,,... \ + --from-literal=phones=,,... ``` Once deployed in K8s, it requires a manual code input to complete the Telegram authentication. diff --git a/config/config.go b/config/config.go index d265fa4..56367ce 100644 --- a/config/config.go +++ b/config/config.go @@ -9,10 +9,10 @@ type Config struct { Api struct { Port uint16 `envconfig:"API_PORT" default:"50051" required:"true"` Telegram struct { - Id int32 `envconfig:"API_TELEGRAM_ID" required:"true"` - Hash string `envconfig:"API_TELEGRAM_HASH" required:"true"` - Password string `envconfig:"API_TELEGRAM_PASS" default:""` - Phone string `envconfig:"API_TELEGRAM_PHONE" required:"true"` + Ids []int32 `envconfig:"API_TELEGRAM_IDS" required:"true"` + Hashes []string `envconfig:"API_TELEGRAM_HASHES" required:"true"` + Phones []string `envconfig:"API_TELEGRAM_PHONES" required:"true"` + Password string `envconfig:"API_TELEGRAM_PASS" default:""` } Writer struct { Uri string `envconfig:"API_WRITER_URI" default:"resolver:50051" required:"true"` @@ -22,6 +22,7 @@ type Config struct { Log struct { Level int `envconfig:"LOG_LEVEL" default:"-4" required:"true"` } + Replica ReplicaConfig } type DbConfig struct { @@ -40,6 +41,10 @@ type DbConfig struct { } } +type ReplicaConfig struct { + Name string `envconfig:"REPLICA_NAME" required:"true"` +} + func NewConfigFromEnv() (cfg Config, err error) { err = envconfig.Process("", &cfg) return diff --git a/config/config_test.go b/config/config_test.go index 314cd26..de56852 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -15,10 +15,14 @@ func TestConfig(t *testing.T) { os.Setenv("API_TELEGRAM_ID", "123456789") os.Setenv("API_TELEGRAM_HASH", "deadcodecafebeef") os.Setenv("API_TELEGRAM_FEED_CHAT_IDS", "-1001754252633,-1001260622817,-1001801930101") + os.Setenv("REPLICA_NAME", "replica-1") + os.Setenv("API_TELEGRAM_IDS", "123,456,789") + os.Setenv("API_TELEGRAM_HASHES", "deadcode,cafebeef") + os.Setenv("API_TELEGRAM_PHONES", "1234,5678") cfg, err := NewConfigFromEnv() assert.Nil(t, err) assert.Equal(t, "writer:56789", cfg.Api.Writer.Uri) assert.Equal(t, slog.LevelWarn, slog.Level(cfg.Log.Level)) - assert.Equal(t, int32(123456789), cfg.Api.Telegram.Id) - assert.Equal(t, "deadcodecafebeef", cfg.Api.Telegram.Hash) + assert.Equal(t, []int32{123, 456, 789}, cfg.Api.Telegram.Ids) + assert.Equal(t, []string{"deadcode", "cafebeef"}, cfg.Api.Telegram.Hashes) } diff --git a/helm/source-telegram/templates/sts.yaml b/helm/source-telegram/templates/sts.yaml index d6749a4..b6f46d2 100644 --- a/helm/source-telegram/templates/sts.yaml +++ b/helm/source-telegram/templates/sts.yaml @@ -30,25 +30,25 @@ spec: containers: - name: {{ .Chart.Name }} env: - - name: POD_NAME + - name: REPLICA_NAME valueFrom: fieldRef: fieldPath: metadata.name - - name: API_TELEGRAM_ID + - name: API_TELEGRAM_IDS valueFrom: secretKeyRef: - name: "{{ include "source-telegram.fullname" . }}-tokens-{{ $(POD_NAME) | split('-') | last}}" - key: id - - name: API_TELEGRAM_HASH + name: "{{ include "source-telegram.fullname" . }}-tokens" + key: ids + - name: API_TELEGRAM_HASHES valueFrom: secretKeyRef: - name: "{{ include "source-telegram.fullname" . }}-tokens-{{ $(POD_NAME) | split('-') | last}}" - key: hash - - name: API_TELEGRAM_PHONE + name: "{{ include "source-telegram.fullname" . }}-tokens" + key: hashes + - name: API_TELEGRAM_PHONES valueFrom: secretKeyRef: - name: "{{ include "source-telegram.fullname" . }}-tokens-{{ $(POD_NAME) | split('-') | last}}" - key: phone + name: "{{ include "source-telegram.fullname" . }}-tokens" + key: phones - name: API_PORT value: "{{ .Values.service.portGrpc }}" - name: API_WRITER_URI diff --git a/main.go b/main.go index a5c0168..13e5fa9 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,8 @@ import ( "log/slog" "os" "os/signal" + "strconv" + "strings" "sync" "syscall" "time" @@ -24,7 +26,7 @@ import ( //_ "net/http/pprof" ) -const chanCacheSize = 1024 +const chanCacheSize = 1_000 const chanCacheTtl = 1 * time.Minute func main() { @@ -58,14 +60,39 @@ func main() { } chCode <- line }() + + // determine the replica index + replicaNameParts := strings.Split(cfg.Replica.Name, "-") + if len(replicaNameParts) < 2 { + panic("unable to parse the replica name: " + cfg.Replica.Name) + } + var replicaIndex int + replicaIndex, err = strconv.Atoi(replicaNameParts[len(replicaNameParts)-1]) + if err != nil { + panic(err) + } + if replicaIndex < 0 { + panic(fmt.Sprintf("Negative replica index: %d", replicaIndex)) + } + log.Info(fmt.Sprintf("Replica: %d", replicaIndex)) + + if len(cfg.Api.Telegram.Ids) <= replicaIndex { + panic("Not enough telegram client ids, decrease the replica count or fix the config") + } + if len(cfg.Api.Telegram.Hashes) <= replicaIndex { + panic("Not enough telegram client hashes, decrease the replica count or fix the config") + } + if len(cfg.Api.Telegram.Phones) <= replicaIndex { + panic("Not enough phone numbers, decrease the replica count or fix the config") + } // - go client.NonInteractiveCredentialsProvider(authorizer, cfg.Api.Telegram.Phone, cfg.Api.Telegram.Password, chCode) + go client.NonInteractiveCredentialsProvider(authorizer, cfg.Api.Telegram.Phones[replicaIndex], cfg.Api.Telegram.Password, chCode) authorizer.TdlibParameters <- &client.SetTdlibParametersRequest{ // UseTestDc: false, UseSecretChats: false, - ApiId: cfg.Api.Telegram.Id, - ApiHash: cfg.Api.Telegram.Hash, + ApiId: cfg.Api.Telegram.Ids[replicaIndex], + ApiHash: cfg.Api.Telegram.Hashes[replicaIndex], SystemLanguageCode: "en", DeviceModel: "Awakari", SystemVersion: "1.0.0", @@ -125,7 +152,7 @@ func main() { chansJoined := map[int64]*model.Channel{} chansJoinedLock := &sync.Mutex{} - svc := service.NewService(clientTg, stor, chansJoined, chansJoinedLock, log) + svc := service.NewService(clientTg, stor, chansJoined, chansJoinedLock, log, replicaIndex) svc = service.NewServiceLogging(svc, log) go func() { b := backoff.NewExponentialBackOff() diff --git a/model/channel.go b/model/channel.go index 0571fee..9852315 100644 --- a/model/channel.go +++ b/model/channel.go @@ -3,13 +3,14 @@ package model import "time" type Channel struct { - Id int64 - GroupId string - UserId string - Name string - Link string - Created time.Time - Last time.Time - SubId string - Terms string + Id int64 + GroupId string + UserId string + Name string + Link string + Created time.Time + Last time.Time + SubId string + Terms string + Label string } diff --git a/model/filter.go b/model/filter.go index c8b9afa..d6423f9 100644 --- a/model/filter.go +++ b/model/filter.go @@ -5,4 +5,5 @@ type ChannelFilter struct { UserId string Pattern string SubId string + Label string } diff --git a/service/service.go b/service/service.go index e7a3a1d..24520e2 100644 --- a/service/service.go +++ b/service/service.go @@ -8,6 +8,7 @@ import ( "github.com/awakari/source-telegram/model" "github.com/awakari/source-telegram/storage" "log/slog" + "strconv" "sync" "time" ) @@ -28,6 +29,7 @@ type service struct { chansJoined map[int64]*model.Channel chansJoinedLock *sync.Mutex log *slog.Logger + replicaIndex int } const ListLimit = 1_000 @@ -40,6 +42,7 @@ func NewService( chansJoined map[int64]*model.Channel, chansJoinedLock *sync.Mutex, log *slog.Logger, + replicaIndex int, ) Service { return service{ clientTg: clientTg, @@ -47,6 +50,7 @@ func NewService( chansJoined: chansJoined, chansJoinedLock: chansJoinedLock, log: log, + replicaIndex: replicaIndex, } } @@ -105,8 +109,9 @@ func (svc service) refreshJoined(ctx context.Context) (err error) { if err == nil { svc.log.Debug(fmt.Sprintf("Refresh joined channels: got %d from the client", len(chatsJoined.ChatIds))) // - chanFilter := model.ChannelFilter{ - // TODO country code + var chanFilter model.ChannelFilter + if svc.replicaIndex > 0 { + chanFilter.Label = strconv.Itoa(svc.replicaIndex) } chans, err = svc.stor.GetPage(ctx, chanFilter, ListLimit, "", model.OrderAsc) // it's important to get all at once } diff --git a/storage/storage_mongo.go b/storage/storage_mongo.go index 8ac215a..6724638 100644 --- a/storage/storage_mongo.go +++ b/storage/storage_mongo.go @@ -23,6 +23,7 @@ type recChan struct { Last time.Time `bson:"last,omitempty"` SubId string `bson:"subId,omitempty"` Terms string `bson:"terms,omitempty"` + Label string `bson:"label,omitempty"` } const attrId = "id" @@ -34,6 +35,7 @@ const attrCreated = "created" const attrLast = "last" const attrSubId = "subId" const attrTerms = "terms" +const attrLabel = "label" type storageMongo struct { conn *mongo.Client @@ -83,6 +85,10 @@ var projGet = bson.D{ Key: attrTerms, Value: 1, }, + { + Key: attrLabel, + Value: 1, + }, } var sortGetBatchAsc = bson.D{ { @@ -177,6 +183,18 @@ func (sm storageMongo) ensureIndices(ctx context.Context, retentionPeriod time.D SetSparse(true). SetUnique(false), }, + { + Keys: bson.D{ + { + Key: attrLabel, + Value: 1, + }, + }, + Options: options. + Index(). + SetSparse(true). + SetUnique(false), + }, }) } @@ -195,6 +213,7 @@ func (sm storageMongo) Create(ctx context.Context, ch model.Channel) (err error) Created: ch.Created, SubId: ch.SubId, Terms: ch.Terms, + Label: ch.Label, } _, err = sm.coll.InsertOne(ctx, rec) err = decodeError(err, ch.Link) @@ -222,6 +241,7 @@ func (sm storageMongo) Read(ctx context.Context, link string) (ch model.Channel, ch.Last = rec.Last ch.SubId = rec.SubId ch.Terms = rec.Terms + ch.Label = rec.Label } err = decodeError(err, link) return @@ -268,7 +288,9 @@ func (sm storageMongo) Delete(ctx context.Context, link string) (err error) { func (sm storageMongo) GetPage(ctx context.Context, filter model.ChannelFilter, limit uint32, cursor string, order model.Order) (page []model.Channel, err error) { q := bson.M{} - // TODO filter by country + if filter.Label != "" { + q[attrLabel] = filter.Label + } if filter.UserId != "" { q[attrGroupId] = filter.GroupId q[attrUserId] = filter.UserId @@ -326,6 +348,7 @@ func (sm storageMongo) GetPage(ctx context.Context, filter model.ChannelFilter, UserId: rec.UserId, Name: rec.Name, Link: rec.Link, + Label: rec.Label, }) } } diff --git a/storage/storage_mongo_test.go b/storage/storage_mongo_test.go index 1caf712..fb2a84d 100644 --- a/storage/storage_mongo_test.go +++ b/storage/storage_mongo_test.go @@ -253,7 +253,7 @@ func TestStorageMongo_Update(t *testing.T) { func TestStorageMongo_GetPage(t *testing.T) { // - collName := fmt.Sprintf("feeds-test-%d", time.Now().UnixMicro()) + collName := fmt.Sprintf("tgchans-test-%d", time.Now().UnixMicro()) dbCfg := config.DbConfig{ Uri: dbUri, Name: "sources", @@ -277,12 +277,13 @@ func TestStorageMongo_GetPage(t *testing.T) { -1001754252633, -1001801930101, } - for _, id := range ids { + for i, id := range ids { _, err = sm.coll.InsertOne(ctx, bson.M{ attrId: id, attrName: strconv.FormatInt(id, 10), attrLink: fmt.Sprintf("https://t.me/c/%s/123", strconv.FormatInt(-id, 10)), attrSubId: strconv.FormatInt(-id, 10), + attrLabel: strconv.Itoa(i % 2), }) require.Nil(t, err) } @@ -310,18 +311,16 @@ func TestStorageMongo_GetPage(t *testing.T) { ids[3], }, }, - // TODO filter by country - //"filter": { - // filter: model.ChannelFilter{ - // - // }, - // limit: 10, - // page: []int64{ - // ids[2], - // ids[3], - // ids[4], - // }, - //}, + "filter": { + filter: model.ChannelFilter{ + Label: "1", + }, + limit: 10, + page: []int64{ + ids[1], + ids[3], + }, + }, "limit": { limit: 2, page: []int64{