-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathaggregate_tables.R
68 lines (60 loc) · 2.27 KB
/
aggregate_tables.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
library(dplyr)
library(DBI)
library(rjson)
source('s_dplyr.R')
source('aggregate_tables/indicator_functions.R')
source('data_sources.R')
# TODO this shouldn't be both here and in config_file_funcs, but don't want to load jsonlite here
# Build a dplyr Postgres source from the system JSON config.
#
# Reads `data_platform` -> `database` from the config file. If that section
# uses the key 'pass', it is renamed to 'password' before every entry is
# forwarded as an argument to src_postgres().
#
# system_config_path: path to the JSON config file read with rjson::fromJSON.
# Returns: whatever src_postgres() returns for those settings.
get_db_connection <- function(system_config_path='config_system.json') {
  platform <- fromJSON(file = system_config_path)[['data_platform']]
  settings <- platform[['database']]
  if (is.element('pass', names(settings))) {
    key_names <- names(settings)
    key_names[key_names == 'pass'] <- 'password'
    names(settings) <- key_names
  }
  do.call(src_postgres, settings)
}
# Drop every indicator table listed in a JSON table-config file.
#
# file: path to a JSON file whose top-level entries each carry a 'table'
#   element naming the table to drop.
# Returns NULL invisibly; called for its side effect on the database.
drop_tables <- function(file) {
  config <- fromJSON(file=file)
  if (length(config) == 0L) {
    return(invisible(NULL))
  }
  # One connection for all drops; the previous version opened (and never
  # released) a fresh connection for every table.
  db <- get_db_connection()
  for (table.info in config) {
    table_name <- table.info[['table']]
    print(paste('Dropping', table_name, 'indicator table.'))
    # Spec-compliant DBI backends raise an error when asked to drop a table
    # that does not exist, so only drop tables that are present.
    if (dbExistsTable(db$con, table_name)) {
      dbRemoveTable(db$con, name=table_name)
    }
  }
  invisible(NULL)
}
# Compute and (re)write every indicator table listed in a JSON table-config.
#
# file: path to a JSON file whose top-level entries are table specs, each
#   carrying a 'table' name plus whatever compute_indicators() reads.
# debug: forwarded to compute_indicators(); TRUE/"TRUE" pulls only a sample
#   of source rows. Defaults to FALSE (full run).
# Each table is computed first and only then replaced, so a failed
# computation leaves the existing table in place.
write_tables <- function(file, debug = FALSE) {
  config <- fromJSON(file=file)
  db <- get_db_connection()
  for (table.info in config) {
    table_name <- table.info[['table']]
    print(paste('Computing indicators for', table_name, 'indicator table.'))
    df <- compute_indicators(table.info, db, debug)
    print(paste('Writing', table_name, 'indicator table.'))
    # On the first run the table does not exist yet; spec-compliant DBI
    # backends error when dropping a missing table, so check first.
    if (dbExistsTable(db$con, table_name)) {
      dbRemoveTable(db$con, name=table_name)
    }
    copy_to(db, df=df, name=table_name, temporary=FALSE)
  }
}
# Build one indicator table by aggregating each source component and
# full-outer-merging the results on the grouping columns.
#
# info: table spec list with `by` (grouping/merge column names) and
#   `components`, each component having `table` (data source name) and
#   `columns` (the column -> indicator mapping passed to aggregate()).
# db: database source handed to get_data_source().
# debug: anything as.logical() maps to a single TRUE/FALSE (e.g. TRUE,
#   "TRUE", "false"). When TRUE, a row limit of 5000 is passed to
#   get_data_source(); otherwise -1 is passed.
# Returns the merged data frame (NULL when there are no components).
compute_indicators <- function(info, db, debug = FALSE) {
  debug_flag <- as.logical(debug)
  # The flag often arrives as a command-line string; reject values that do
  # not parse instead of failing later with an opaque "missing value" error.
  if (length(debug_flag) != 1L || is.na(debug_flag)) {
    stop('`debug` must be a single TRUE/FALSE value.', call. = FALSE)
  }
  # NOTE(review): -1 is presumably read by get_data_source() as "no limit";
  # confirm in data_sources.R.
  limit <- if (debug_flag) 5000 else -1
  # The grouping spec is the same for every component, so build it once.
  group.by.str <- paste(info$by, collapse=', ')
  dfs <- lapply(info$components, function(component) {
    print(paste('Getting data source', component$table))
    source.data <- get_data_source(db, component$table, limit)
    print(paste('Grouping and aggregating', component$table))
    # Same as `source.data %.% s_group_by(...) %.% aggregate(...)` without
    # relying on dplyr's deprecated `%.%` chain operator.
    aggregate(s_group_by(source.data, group.by.str), component$columns)
  })
  print('merging...')
  # all.x/all.y keep groups that appear in only some components (full join).
  Reduce(function(...) merge(..., all.x=TRUE, all.y=TRUE, by=info$by), dfs)
}
# Apply named indicator functions to each group of `data`, producing one row
# per group.
#
# data: (grouped) table passed to dplyr's do().
# columns: named list mapping output column name -> name of an indicator
#   function, looked up with get(); each indicator is called with the
#   group's rows and must return a single value.
# Returns the result of do(), one single-row data frame per group.
aggregate <- function(data, columns) {
  column.names <- names(columns)
  f <- function(block) {
    # Collect results in a list rather than via sapply() + t(): a matrix holds
    # only one type, so mixing e.g. counts and labels used to turn every
    # column into character and stripped classes such as Date.
    values <- lapply(column.names, function(cname) {
      indicator <- get(columns[[cname]], mode = 'function')
      value <- indicator(block)
      if (length(value) != 1L) {
        stop('Indicator \'', columns[[cname]], '\' for column \'', cname,
             '\' must return a single value, got length ', length(value), '.',
             call. = FALSE)
      }
      value
    })
    df <- as.data.frame(values, stringsAsFactors = FALSE)
    names(df) <- column.names
    df
  }
  # Equivalent to `data %.% do(f(.))`; `.` is dplyr's per-group pronoun.
  do(data, f(.))
}