mirror of
https://github.com/twitter/the-algorithm.git
synced 2024-12-22 19:15:30 +00:00
617c8c787d
Unified User Action (UUA) is a centralized, real-time stream of user actions on Twitter, consumed by various product, ML, and marketing teams. UUA makes sure all internal teams consume the uniformed user actions data in an accurate and fast way.
170 lines
4.9 KiB
Plaintext
170 lines
4.9 KiB
Plaintext
import os
|
|
import itertools
|
|
import subprocess
|
|
import math
|
|
|
|
SERVICE_NAME = 'uua-email-notification-event'
|
|
|
|
CPU_NUM = 3
|
|
HEAP_SIZE = 3 * GB
|
|
RAM_SIZE = HEAP_SIZE + 1 * GB
|
|
# We make disk size larger than HEAP so that if we ever need to do a heap dump, it will fit on disk.
|
|
DISK_SIZE = HEAP_SIZE + 2 * GB
|
|
|
|
class Profile(Struct):
|
|
package = Default(String, SERVICE_NAME)
|
|
cmdline_flags = Default(String, '')
|
|
log_level = Default(String, 'INFO')
|
|
instances = Default(Integer, 20)
|
|
kafka_bootstrap_servers = Default(String, '/s/kafka/main-2:kafka-tls')
|
|
kafka_bootstrap_servers_remote_dest = Default(String, '/s/kafka/bluebird-1:kafka-tls')
|
|
source_topic = Default(String, 'notifications')
|
|
sink_topics = Default(String, 'unified_user_actions,unified_user_actions_engagements')
|
|
decider_overlay = Default(String, '')
|
|
|
|
resources = Resources(
|
|
cpu = CPU_NUM,
|
|
ram = RAM_SIZE,
|
|
disk = RAM_SIZE
|
|
)
|
|
|
|
install = Packer.install(
|
|
name = '{{profile.package}}',
|
|
version = Workflows.package_version()
|
|
)
|
|
|
|
async_profiler_install = Packer.install(
|
|
name = 'async-profiler',
|
|
role = 'csl-perf',
|
|
version = 'latest'
|
|
)
|
|
|
|
setup_jaas_config = Process(
|
|
name = 'setup_jaas_config',
|
|
cmdline = '''
|
|
mkdir -p jaas_config
|
|
echo "KafkaClient {
|
|
com.sun.security.auth.module.Krb5LoginModule required
|
|
principal=\\"discode@TWITTER.BIZ\\"
|
|
useKeyTab=true
|
|
storeKey=true
|
|
keyTab=\\"/var/lib/tss/keys/fluffy/keytabs/client/discode.keytab\\"
|
|
doNotPrompt=true;
|
|
};" >> jaas_config/jaas.conf
|
|
'''
|
|
)
|
|
|
|
main = JVMProcess(
|
|
name = SERVICE_NAME,
|
|
jvm = Java11(
|
|
heap = HEAP_SIZE,
|
|
extra_jvm_flags =
|
|
'-Djava.net.preferIPv4Stack=true'
|
|
|
|
' -XX:+UseNUMA'
|
|
' -XX:+AggressiveOpts'
|
|
' -XX:+PerfDisableSharedMem' # http://www.evanjones.ca/jvm-mmap-pause.html
|
|
|
|
' -Dlog_level={{profile.log_level}}'
|
|
' -Dlog.access.output=access.log'
|
|
' -Dlog.service.output={{name}}.log'
|
|
' -Djava.security.auth.login.config=jaas_config/jaas.conf'
|
|
),
|
|
arguments =
|
|
'-jar {{name}}-bin.jar'
|
|
' -admin.port=:{{thermos.ports[health]}}'
|
|
' -kafka.bootstrap.servers={{profile.kafka_bootstrap_servers}}'
|
|
' -kafka.bootstrap.servers.remote.dest={{profile.kafka_bootstrap_servers_remote_dest}}'
|
|
' -kafka.group.id={{name}}-{{environment}}'
|
|
' -kafka.producer.client.id={{name}}-{{environment}}'
|
|
' -kafka.max.pending.requests=10000'
|
|
' -kafka.consumer.fetch.max=1.megabytes'
|
|
' -kafka.max.poll.records=20000'
|
|
' -kafka.commit.interval=10.seconds'
|
|
' -kafka.producer.batch.size=16.kilobytes'
|
|
' -kafka.producer.buffer.mem=64.megabytes'
|
|
' -kafka.producer.linger=0.milliseconds'
|
|
' -kafka.producer.request.timeout=30.seconds'
|
|
' -kafka.producer.compression.type=lz4'
|
|
' -kafka.worker.threads=5'
|
|
' -kafka.source.topic={{profile.source_topic}}'
|
|
' -kafka.sink.topics={{profile.sink_topics}}'
|
|
' -decider.base=decider.yml'
|
|
' -decider.overlay={{profile.decider_overlay}}'
|
|
' -cluster={{cluster}}'
|
|
' {{profile.cmdline_flags}}',
|
|
resources = resources
|
|
)
|
|
|
|
stats = Stats(
|
|
library = 'metrics',
|
|
port = 'admin'
|
|
)
|
|
|
|
job_template = Service(
|
|
name = SERVICE_NAME,
|
|
role = 'discode',
|
|
instances = '{{profile.instances}}',
|
|
contact = 'disco-data-eng@twitter.com',
|
|
constraints = {'rack': 'limit:1', 'host': 'limit:1'},
|
|
announce = Announcer(
|
|
primary_port = 'health',
|
|
portmap = {'aurora': 'health', 'admin': 'health'}
|
|
),
|
|
task = Task(
|
|
resources = resources,
|
|
name = SERVICE_NAME,
|
|
processes = [async_profiler_install, install, setup_jaas_config, main, stats],
|
|
constraints = order(async_profiler_install, install, setup_jaas_config, main)
|
|
),
|
|
health_check_config = HealthCheckConfig(
|
|
initial_interval_secs = 100,
|
|
interval_secs = 60,
|
|
timeout_secs = 60,
|
|
max_consecutive_failures = 4
|
|
),
|
|
update_config = UpdateConfig(
|
|
batch_size = 50,
|
|
watch_secs = 90,
|
|
max_per_shard_failures = 3,
|
|
max_total_failures = 0,
|
|
rollback_on_failure = False
|
|
)
|
|
)
|
|
|
|
PRODUCTION = Profile(
|
|
# go/uua-decider
|
|
decider_overlay = '/usr/local/config/overlays/discode-default/UnifiedUserActions/prod/{{cluster}}/decider_overlay.yml'
|
|
)
|
|
|
|
STAGING = Profile(
|
|
package = SERVICE_NAME+'-staging',
|
|
cmdline_flags = '',
|
|
kafka_bootstrap_servers_remote_dest = '/s/kafka/custdevel:kafka-tls',
|
|
decider_overlay = '/usr/local/config/overlays/discode-default/UnifiedUserActions/staging/{{cluster}}/decider_overlay.yml' # go/uua-decider
|
|
)
|
|
|
|
DEVEL = STAGING(
|
|
log_level = 'INFO',
|
|
)
|
|
|
|
|
|
prod_job = job_template(
|
|
tier = 'preferred',
|
|
environment = 'prod',
|
|
).bind(profile = PRODUCTION)
|
|
|
|
staging_job = job_template(
|
|
environment = 'staging'
|
|
).bind(profile = STAGING)
|
|
|
|
devel_job = job_template(
|
|
environment = 'devel'
|
|
).bind(profile = DEVEL)
|
|
|
|
jobs = []
|
|
for cluster in ['atla', 'pdxa']:
|
|
jobs.append(prod_job(cluster = cluster))
|
|
jobs.append(staging_job(cluster = cluster))
|
|
jobs.append(devel_job(cluster = cluster))
|