Skip to content

Commit

Permalink
feat: convert back to sts
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Jun 7, 2024
1 parent 543742f commit 4b94f7b
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 51 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

Before deploying:
```shell
kubectl create secret generic source-telegram-tokens-<NUM> \
--from-literal=id=<TG_APP_ID> \
--from-literal=hash=<TG_APP_HASH> \
--from-literal=phone=<TG_PHONE_NUM>
kubectl create secret generic source-telegram-tokens \
--from-literal=ids=<TG_APP_ID_0>,<TG_APP_ID_1>,... \
--from-literal=hashes=<TG_APP_HASH_0>,<TG_APP_HASH_1>,... \
--from-literal=phones=<TG_PHONE_NUM_0>,<TG_PHONE_NUM_1>,...
```

Once deployed in K8s, it requires a manual code input to complete the Telegram authentication.
Expand Down
13 changes: 9 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ type Config struct {
Api struct {
Port uint16 `envconfig:"API_PORT" default:"50051" required:"true"`
Telegram struct {
Id int32 `envconfig:"API_TELEGRAM_ID" required:"true"`
Hash string `envconfig:"API_TELEGRAM_HASH" required:"true"`
Password string `envconfig:"API_TELEGRAM_PASS" default:""`
Phone string `envconfig:"API_TELEGRAM_PHONE" required:"true"`
Ids []int32 `envconfig:"API_TELEGRAM_IDS" required:"true"`
Hashes []string `envconfig:"API_TELEGRAM_HASHES" required:"true"`
Phones []string `envconfig:"API_TELEGRAM_PHONES" required:"true"`
Password string `envconfig:"API_TELEGRAM_PASS" default:""`
}
Writer struct {
Uri string `envconfig:"API_WRITER_URI" default:"resolver:50051" required:"true"`
Expand All @@ -22,6 +22,7 @@ type Config struct {
Log struct {
Level int `envconfig:"LOG_LEVEL" default:"-4" required:"true"`
}
Replica ReplicaConfig
}

type DbConfig struct {
Expand All @@ -40,6 +41,10 @@ type DbConfig struct {
}
}

type ReplicaConfig struct {
Name string `envconfig:"REPLICA_NAME" required:"true"`
}

func NewConfigFromEnv() (cfg Config, err error) {
err = envconfig.Process("", &cfg)
return
Expand Down
8 changes: 6 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ func TestConfig(t *testing.T) {
os.Setenv("API_TELEGRAM_ID", "123456789")
os.Setenv("API_TELEGRAM_HASH", "deadcodecafebeef")
os.Setenv("API_TELEGRAM_FEED_CHAT_IDS", "-1001754252633,-1001260622817,-1001801930101")
os.Setenv("REPLICA_NAME", "replica-1")
os.Setenv("API_TELEGRAM_IDS", "123,456,789")
os.Setenv("API_TELEGRAM_HASHES", "deadcode,cafebeef")
os.Setenv("API_TELEGRAM_PHONES", "1234,5678")
cfg, err := NewConfigFromEnv()
assert.Nil(t, err)
assert.Equal(t, "writer:56789", cfg.Api.Writer.Uri)
assert.Equal(t, slog.LevelWarn, slog.Level(cfg.Log.Level))
assert.Equal(t, int32(123456789), cfg.Api.Telegram.Id)
assert.Equal(t, "deadcodecafebeef", cfg.Api.Telegram.Hash)
assert.Equal(t, []int32{123, 456, 789}, cfg.Api.Telegram.Ids)
assert.Equal(t, []string{"deadcode", "cafebeef"}, cfg.Api.Telegram.Hashes)
}
20 changes: 10 additions & 10 deletions helm/source-telegram/templates/sts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ spec:
containers:
- name: {{ .Chart.Name }}
env:
- name: POD_NAME
- name: REPLICA_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: API_TELEGRAM_ID
- name: API_TELEGRAM_IDS
valueFrom:
secretKeyRef:
name: "{{ include "source-telegram.fullname" . }}-tokens-{{ $(POD_NAME) | split('-') | last}}"
key: id
- name: API_TELEGRAM_HASH
name: "{{ include "source-telegram.fullname" . }}-tokens"
key: ids
- name: API_TELEGRAM_HASHES
valueFrom:
secretKeyRef:
name: "{{ include "source-telegram.fullname" . }}-tokens-{{ $(POD_NAME) | split('-') | last}}"
key: hash
- name: API_TELEGRAM_PHONE
name: "{{ include "source-telegram.fullname" . }}-tokens"
key: hashes
- name: API_TELEGRAM_PHONES
valueFrom:
secretKeyRef:
name: "{{ include "source-telegram.fullname" . }}-tokens-{{ $(POD_NAME) | split('-') | last}}"
key: phone
name: "{{ include "source-telegram.fullname" . }}-tokens"
key: phones
- name: API_PORT
value: "{{ .Values.service.portGrpc }}"
- name: API_WRITER_URI
Expand Down
37 changes: 32 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"log/slog"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -24,7 +26,7 @@ import (
//_ "net/http/pprof"
)

const chanCacheSize = 1024
const chanCacheSize = 1_000
const chanCacheTtl = 1 * time.Minute

func main() {
Expand Down Expand Up @@ -58,14 +60,39 @@ func main() {
}
chCode <- line
}()

// determine the replica index
replicaNameParts := strings.Split(cfg.Replica.Name, "-")
if len(replicaNameParts) < 2 {
panic("unable to parse the replica name: " + cfg.Replica.Name)
}
var replicaIndex int
replicaIndex, err = strconv.Atoi(replicaNameParts[len(replicaNameParts)-1])
if err != nil {
panic(err)
}
if replicaIndex < 0 {
panic(fmt.Sprintf("Negative replica index: %d", replicaIndex))
}
log.Info(fmt.Sprintf("Replica: %d", replicaIndex))

if len(cfg.Api.Telegram.Ids) <= replicaIndex {
panic("Not enough telegram client ids, decrease the replica count or fix the config")
}
if len(cfg.Api.Telegram.Hashes) <= replicaIndex {
panic("Not enough telegram client hashes, decrease the replica count or fix the config")
}
if len(cfg.Api.Telegram.Phones) <= replicaIndex {
panic("Not enough phone numbers, decrease the replica count or fix the config")
}
//
go client.NonInteractiveCredentialsProvider(authorizer, cfg.Api.Telegram.Phone, cfg.Api.Telegram.Password, chCode)
go client.NonInteractiveCredentialsProvider(authorizer, cfg.Api.Telegram.Phones[replicaIndex], cfg.Api.Telegram.Password, chCode)
authorizer.TdlibParameters <- &client.SetTdlibParametersRequest{
//
UseTestDc: false,
UseSecretChats: false,
ApiId: cfg.Api.Telegram.Id,
ApiHash: cfg.Api.Telegram.Hash,
ApiId: cfg.Api.Telegram.Ids[replicaIndex],
ApiHash: cfg.Api.Telegram.Hashes[replicaIndex],
SystemLanguageCode: "en",
DeviceModel: "Awakari",
SystemVersion: "1.0.0",
Expand Down Expand Up @@ -125,7 +152,7 @@ func main() {
chansJoined := map[int64]*model.Channel{}
chansJoinedLock := &sync.Mutex{}

svc := service.NewService(clientTg, stor, chansJoined, chansJoinedLock, log)
svc := service.NewService(clientTg, stor, chansJoined, chansJoinedLock, log, replicaIndex)
svc = service.NewServiceLogging(svc, log)
go func() {
b := backoff.NewExponentialBackOff()
Expand Down
19 changes: 10 additions & 9 deletions model/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package model
import "time"

type Channel struct {
Id int64
GroupId string
UserId string
Name string
Link string
Created time.Time
Last time.Time
SubId string
Terms string
Id int64
GroupId string
UserId string
Name string
Link string
Created time.Time
Last time.Time
SubId string
Terms string
Label string
}
1 change: 1 addition & 0 deletions model/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ type ChannelFilter struct {
UserId string
Pattern string
SubId string
Label string
}
9 changes: 7 additions & 2 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/awakari/source-telegram/model"
"github.com/awakari/source-telegram/storage"
"log/slog"
"strconv"
"sync"
"time"
)
Expand All @@ -28,6 +29,7 @@ type service struct {
chansJoined map[int64]*model.Channel
chansJoinedLock *sync.Mutex
log *slog.Logger
replicaIndex int
}

const ListLimit = 1_000
Expand All @@ -40,13 +42,15 @@ func NewService(
chansJoined map[int64]*model.Channel,
chansJoinedLock *sync.Mutex,
log *slog.Logger,
replicaIndex int,
) Service {
return service{
clientTg: clientTg,
stor: stor,
chansJoined: chansJoined,
chansJoinedLock: chansJoinedLock,
log: log,
replicaIndex: replicaIndex,
}
}

Expand Down Expand Up @@ -105,8 +109,9 @@ func (svc service) refreshJoined(ctx context.Context) (err error) {
if err == nil {
svc.log.Debug(fmt.Sprintf("Refresh joined channels: got %d from the client", len(chatsJoined.ChatIds)))
//
chanFilter := model.ChannelFilter{
// TODO country code
var chanFilter model.ChannelFilter
if svc.replicaIndex > 0 {
chanFilter.Label = strconv.Itoa(svc.replicaIndex)
}
chans, err = svc.stor.GetPage(ctx, chanFilter, ListLimit, "", model.OrderAsc) // it's important to get all at once
}
Expand Down
25 changes: 24 additions & 1 deletion storage/storage_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type recChan struct {
Last time.Time `bson:"last,omitempty"`
SubId string `bson:"subId,omitempty"`
Terms string `bson:"terms,omitempty"`
Label string `bson:"label,omitempty"`
}

const attrId = "id"
Expand All @@ -34,6 +35,7 @@ const attrCreated = "created"
const attrLast = "last"
const attrSubId = "subId"
const attrTerms = "terms"
const attrLabel = "label"

type storageMongo struct {
conn *mongo.Client
Expand Down Expand Up @@ -83,6 +85,10 @@ var projGet = bson.D{
Key: attrTerms,
Value: 1,
},
{
Key: attrLabel,
Value: 1,
},
}
var sortGetBatchAsc = bson.D{
{
Expand Down Expand Up @@ -177,6 +183,18 @@ func (sm storageMongo) ensureIndices(ctx context.Context, retentionPeriod time.D
SetSparse(true).
SetUnique(false),
},
{
Keys: bson.D{
{
Key: attrLabel,
Value: 1,
},
},
Options: options.
Index().
SetSparse(true).
SetUnique(false),
},
})
}

Expand All @@ -195,6 +213,7 @@ func (sm storageMongo) Create(ctx context.Context, ch model.Channel) (err error)
Created: ch.Created,
SubId: ch.SubId,
Terms: ch.Terms,
Label: ch.Label,
}
_, err = sm.coll.InsertOne(ctx, rec)
err = decodeError(err, ch.Link)
Expand Down Expand Up @@ -222,6 +241,7 @@ func (sm storageMongo) Read(ctx context.Context, link string) (ch model.Channel,
ch.Last = rec.Last
ch.SubId = rec.SubId
ch.Terms = rec.Terms
ch.Label = rec.Label
}
err = decodeError(err, link)
return
Expand Down Expand Up @@ -268,7 +288,9 @@ func (sm storageMongo) Delete(ctx context.Context, link string) (err error) {

func (sm storageMongo) GetPage(ctx context.Context, filter model.ChannelFilter, limit uint32, cursor string, order model.Order) (page []model.Channel, err error) {
q := bson.M{}
// TODO filter by country
if filter.Label != "" {
q[attrLabel] = filter.Label
}
if filter.UserId != "" {
q[attrGroupId] = filter.GroupId
q[attrUserId] = filter.UserId
Expand Down Expand Up @@ -326,6 +348,7 @@ func (sm storageMongo) GetPage(ctx context.Context, filter model.ChannelFilter,
UserId: rec.UserId,
Name: rec.Name,
Link: rec.Link,
Label: rec.Label,
})
}
}
Expand Down
27 changes: 13 additions & 14 deletions storage/storage_mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestStorageMongo_Update(t *testing.T) {

func TestStorageMongo_GetPage(t *testing.T) {
//
collName := fmt.Sprintf("feeds-test-%d", time.Now().UnixMicro())
collName := fmt.Sprintf("tgchans-test-%d", time.Now().UnixMicro())
dbCfg := config.DbConfig{
Uri: dbUri,
Name: "sources",
Expand All @@ -277,12 +277,13 @@ func TestStorageMongo_GetPage(t *testing.T) {
-1001754252633,
-1001801930101,
}
for _, id := range ids {
for i, id := range ids {
_, err = sm.coll.InsertOne(ctx, bson.M{
attrId: id,
attrName: strconv.FormatInt(id, 10),
attrLink: fmt.Sprintf("https://t.me/c/%s/123", strconv.FormatInt(-id, 10)),
attrSubId: strconv.FormatInt(-id, 10),
attrLabel: strconv.Itoa(i % 2),
})
require.Nil(t, err)
}
Expand Down Expand Up @@ -310,18 +311,16 @@ func TestStorageMongo_GetPage(t *testing.T) {
ids[3],
},
},
// TODO filter by country
//"filter": {
// filter: model.ChannelFilter{
//
// },
// limit: 10,
// page: []int64{
// ids[2],
// ids[3],
// ids[4],
// },
//},
"filter": {
filter: model.ChannelFilter{
Label: "1",
},
limit: 10,
page: []int64{
ids[1],
ids[3],
},
},
"limit": {
limit: 2,
page: []int64{
Expand Down

0 comments on commit 4b94f7b

Please sign in to comment.