Skip to content

Commit

Permalink
Refactor locator storage initialization and migration logic
Browse files Browse the repository at this point in the history
- Introduce command constants for locator-related database operations.
- Add a map for locator queries to support both SQLite and PostgreSQL.
- Remove redundant schema and migration functions by incorporating them into the Locator's init and migrate methods.
- Ensure GID columns are added and updated properly during migration.
- Update locator tests to use the new query architecture.
  • Loading branch information
umputun committed Jan 11, 2025
1 parent 99babde commit fc5505a
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 85 deletions.
224 changes: 140 additions & 84 deletions app/storage/locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,70 @@ import (
"github.com/umputun/tg-spam/lib/spamcheck"
)

// locator-related command constants
const (
CmdCreateLocatorTables engine.DBCmd = iota + 400
CmdCreateLocatorIndexes
CmdAddGIDColumnMessages
CmdAddGIDColumnSpam
)

// locatorQueries holds all locator-related queries
var locatorQueries = engine.QueryMap{
engine.Sqlite: {
CmdCreateLocatorTables: `
CREATE TABLE IF NOT EXISTS messages (
hash TEXT PRIMARY KEY,
gid TEXT NOT NULL DEFAULT '',
time TIMESTAMP,
chat_id INTEGER,
user_id INTEGER,
user_name TEXT,
msg_id INTEGER
);
CREATE TABLE IF NOT EXISTS spam (
user_id INTEGER PRIMARY KEY,
gid TEXT NOT NULL DEFAULT '',
time TIMESTAMP,
checks TEXT
)`,
CmdCreateLocatorIndexes: `
CREATE INDEX IF NOT EXISTS idx_messages_user_id ON messages(user_id);
CREATE INDEX IF NOT EXISTS idx_messages_user_name ON messages(user_name);
CREATE INDEX IF NOT EXISTS idx_spam_time ON spam(time);
CREATE INDEX IF NOT EXISTS idx_messages_gid ON messages(gid);
CREATE INDEX IF NOT EXISTS idx_spam_gid ON spam(gid)`,
CmdAddGIDColumnMessages: "ALTER TABLE messages ADD COLUMN gid TEXT DEFAULT ''",
CmdAddGIDColumnSpam: "ALTER TABLE spam ADD COLUMN gid TEXT DEFAULT ''",
},
engine.Postgres: {
CmdCreateLocatorTables: `
CREATE TABLE IF NOT EXISTS messages (
hash TEXT PRIMARY KEY,
gid TEXT NOT NULL DEFAULT '',
time TIMESTAMP,
chat_id BIGINT,
user_id BIGINT,
user_name TEXT,
msg_id INTEGER
);
CREATE TABLE IF NOT EXISTS spam (
user_id BIGINT PRIMARY KEY,
gid TEXT NOT NULL DEFAULT '',
time TIMESTAMP,
checks TEXT
)`,
CmdCreateLocatorIndexes: `
CREATE INDEX IF NOT EXISTS idx_messages_user_id ON messages(user_id);
CREATE INDEX IF NOT EXISTS idx_messages_user_name ON messages(user_name);
CREATE INDEX IF NOT EXISTS idx_spam_time ON spam(time);
CREATE INDEX IF NOT EXISTS idx_messages_gid ON messages(gid);
CREATE INDEX IF NOT EXISTS idx_spam_gid ON spam(gid)`,
CmdAddGIDColumnMessages: "ALTER TABLE messages ADD COLUMN IF NOT EXISTS gid TEXT DEFAULT ''",
CmdAddGIDColumnSpam: "ALTER TABLE spam ADD COLUMN IF NOT EXISTS gid TEXT DEFAULT ''",
},
}

// Locator stores messages metadata and spam results for a given ttl period.
// It is used to locate the message in the chat by its hash and to retrieve spam check results by userID.
// Useful to match messages from admin chat (only text available) to the original message and to get spam results using UserID.
Expand All @@ -41,64 +105,97 @@ type SpamData struct {
Checks []spamcheck.Response
}

var locatorSchema = `
CREATE TABLE IF NOT EXISTS messages (
hash TEXT PRIMARY KEY,
gid TEXT NOT NULL DEFAULT '',
time TIMESTAMP,
chat_id INTEGER,
user_id INTEGER,
user_name TEXT,
msg_id INTEGER
);
CREATE TABLE IF NOT EXISTS spam (
user_id INTEGER PRIMARY KEY,
gid TEXT NOT NULL DEFAULT '',
time TIMESTAMP,
checks TEXT
);
CREATE INDEX IF NOT EXISTS idx_messages_user_id ON messages(user_id);
CREATE INDEX IF NOT EXISTS idx_messages_user_name ON messages(user_name);
CREATE INDEX IF NOT EXISTS idx_spam_time ON spam(time);
CREATE INDEX IF NOT EXISTS idx_messages_gid ON messages(gid);
CREATE INDEX IF NOT EXISTS idx_spam_gid ON spam(gid);
`

// NewLocator creates new Locator. ttl defines how long to keep messages in db, minSize defines the minimum number of messages to keep
func NewLocator(ctx context.Context, ttl time.Duration, minSize int, db *engine.SQL) (*Locator, error) {
if db == nil {
return nil, fmt.Errorf("db connection is nil")
}
res := &Locator{ttl: ttl, minSize: minSize, db: db, RWLocker: db.MakeLock()}
if err := res.init(ctx); err != nil {
return nil, fmt.Errorf("failed to init locator storage: %w", err)
}
return res, nil
}

//nolint:dupl // it's ok to have similar code for different tables
func (l *Locator) init(ctx context.Context) error {
tx, err := l.db.Beginx()
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.Rollback()

// first check if tables exist, we can't do this in a transaction because ALTER TABLE will fail
var exists int
err := db.GetContext(ctx, &exists, "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='messages'")
// create tables first
createSchema, err := engine.PickQuery(locatorQueries, l.db.Type(), CmdCreateLocatorTables)
if err != nil {
return nil, fmt.Errorf("failed to check for messages table existence: %w", err)
return fmt.Errorf("failed to get create tables query: %w", err)
}
if _, err = tx.ExecContext(ctx, createSchema); err != nil {
return fmt.Errorf("failed to create schema: %w", err)
}

// try to migrate if needed
if err = l.migrate(ctx, tx, l.db.GID()); err != nil {
return fmt.Errorf("failed to migrate tables: %w", err)
}

// create indices after migration when all columns exist
createIndexes, err := engine.PickQuery(locatorQueries, l.db.Type(), CmdCreateLocatorIndexes)
if err != nil {
return fmt.Errorf("failed to get create indexes query: %w", err)
}
if _, err = tx.ExecContext(ctx, createIndexes); err != nil {
return fmt.Errorf("failed to create indexes: %w", err)
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}

func (l *Locator) migrate(ctx context.Context, tx *sqlx.Tx, gid string) error {
// try to select with new structure, if works - already migrated
var count int
err := tx.GetContext(ctx, &count, "SELECT COUNT(*) FROM messages WHERE gid = ''")
if err == nil {
log.Printf("[DEBUG] locator tables already migrated")
return nil
}

// add gid column to messages
addGIDMessagesQuery, err := engine.PickQuery(locatorQueries, l.db.Type(), CmdAddGIDColumnMessages)
if err != nil {
return fmt.Errorf("failed to get add messages GID query: %w", err)
}

_, err = tx.ExecContext(ctx, addGIDMessagesQuery)
if err != nil && !strings.Contains(err.Error(), "duplicate column") {
return fmt.Errorf("failed to add gid column to messages: %w", err)
}

if exists == 0 { // tables do not exist, create them
tx, err := db.Begin()
if err != nil {
return nil, fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.Rollback()
// add gid column to spam
addGIDSpamQuery, err := engine.PickQuery(locatorQueries, l.db.Type(), CmdAddGIDColumnSpam)
if err != nil {
return fmt.Errorf("failed to get add spam GID query: %w", err)
}

if _, err = tx.ExecContext(ctx, locatorSchema); err != nil {
return nil, fmt.Errorf("failed to create schema: %w", err)
}
_, err = tx.ExecContext(ctx, addGIDSpamQuery)
if err != nil && !strings.Contains(err.Error(), "duplicate column") {
return fmt.Errorf("failed to add gid column to spam: %w", err)
}

if err = tx.Commit(); err != nil {
return nil, fmt.Errorf("failed to commit transaction: %w", err)
}
// update existing records with provided gid
if _, err = tx.ExecContext(ctx, "UPDATE messages SET gid = ? WHERE gid = ''", gid); err != nil {
return fmt.Errorf("failed to update gid for existing messages: %w", err)
}

// migrate tables
if err := migrateLocator(&db.DB, db.GID()); err != nil {
return nil, fmt.Errorf("failed to migrate locator: %w", err)
if _, err = tx.ExecContext(ctx, "UPDATE spam SET gid = ? WHERE gid = ''", gid); err != nil {
return fmt.Errorf("failed to update gid for existing spam: %w", err)
}

return &Locator{ttl: ttl, minSize: minSize, db: db, RWLocker: db.MakeLock()}, nil
log.Printf("[DEBUG] locator tables migrated")
return nil
}

// Close closes the database
Expand Down Expand Up @@ -257,44 +354,3 @@ func (m MsgMeta) String() string {
func (s SpamData) String() string {
return fmt.Sprintf("{time: %s, checks: %+v}", s.Time.Format(time.RFC3339), s.Checks)
}

// migration function
func migrateLocator(db *sqlx.DB, gid string) error {
// add gid column to messages if it doesn't exist
if _, err := db.Exec(`ALTER TABLE messages ADD COLUMN gid TEXT DEFAULT ''`); err != nil {
if !strings.Contains(err.Error(), "duplicate column name") {
return fmt.Errorf("failed to alter messages table: %w", err)
}
}

// add gid column to spam if it doesn't exist
if _, err := db.Exec(`ALTER TABLE spam ADD COLUMN gid TEXT DEFAULT ''`); err != nil {
if !strings.Contains(err.Error(), "duplicate column name") {
return fmt.Errorf("failed to alter spam table: %w", err)
}
}

// update existing records with the provided gid
res1, err := db.Exec("UPDATE messages SET gid = ? WHERE gid = ''", gid)
if err != nil {
return fmt.Errorf("failed to update gid for existing messages: %w", err)
}
messagesAffected, err := res1.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get messages affected rows: %w", err)
}

res2, err := db.Exec("UPDATE spam SET gid = ? WHERE gid = ''", gid)
if err != nil {
return fmt.Errorf("failed to update gid for existing spam: %w", err)
}
spamAffected, err := res2.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get spam affected rows: %w", err)
}
if messagesAffected > 0 || spamAffected > 0 {
log.Printf("[DEBUG] locator tables migrated, gid updated to %q, messages: %d, spam: %d", gid, messagesAffected, spamAffected)
}

return nil
}
4 changes: 3 additions & 1 deletion app/storage/locator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ func TestLocator_Migration(t *testing.T) {
defer db.Close()

// create schema with new tables including gid
_, err = db.Exec(locatorSchema)
createSchema, err := engine.PickQuery(locatorQueries, db.Type(), CmdCreateLocatorTables)
require.NoError(t, err)
_, err = db.Exec(createSchema)
require.NoError(t, err)

// create first locator which should trigger migration
Expand Down

0 comments on commit fc5505a

Please sign in to comment.