mirror of
https://github.com/twitter/the-algorithm.git
synced 2024-12-22 10:25:29 +00:00
[Medium][UUA] Clean up BCE in UUA
This is to clean up the BCE adapters and services in UUA since BCE no longer exists.
This commit is contained in:
parent
4df87a278e
commit
23fa75d406
|
@ -1,13 +0,0 @@
|
|||
scala_library(
|
||||
sources = [
|
||||
"*.scala",
|
||||
],
|
||||
tags = ["bazel-compatible"],
|
||||
dependencies = [
|
||||
"client-events/thrift/src/thrift/storage/twitter/behavioral_event:behavioral_event-scala",
|
||||
"kafka/finagle-kafka/finatra-kafka/src/main/scala",
|
||||
"unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter:base",
|
||||
"unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/common",
|
||||
"unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions:unified_user_actions-scala",
|
||||
],
|
||||
)
|
|
@ -1,96 +0,0 @@
|
|||
package com.twitter.unified_user_actions.adapter.behavioral_client_event
|
||||
|
||||
import com.twitter.client_event_entities.serverside_context_key.latest.thriftscala.FlattenedServersideContextKey
|
||||
import com.twitter.storage.behavioral_event.thriftscala.EventLogContext
|
||||
import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog
|
||||
import com.twitter.unified_user_actions.adapter.common.AdapterUtils
|
||||
import com.twitter.unified_user_actions.thriftscala.ActionType
|
||||
import com.twitter.unified_user_actions.thriftscala.BreadcrumbTweet
|
||||
import com.twitter.unified_user_actions.thriftscala.ClientEventNamespace
|
||||
import com.twitter.unified_user_actions.thriftscala.EventMetadata
|
||||
import com.twitter.unified_user_actions.thriftscala.Item
|
||||
import com.twitter.unified_user_actions.thriftscala.ProductSurface
|
||||
import com.twitter.unified_user_actions.thriftscala.ProductSurfaceInfo
|
||||
import com.twitter.unified_user_actions.thriftscala.SourceLineage
|
||||
import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction
|
||||
import com.twitter.unified_user_actions.thriftscala.UserIdentifier
|
||||
|
||||
case class ProductSurfaceRelated(
|
||||
productSurface: Option[ProductSurface],
|
||||
productSurfaceInfo: Option[ProductSurfaceInfo])
|
||||
|
||||
trait BaseBCEAdapter {
|
||||
def toUUA(e: FlattenedEventLog): Seq[UnifiedUserAction]
|
||||
|
||||
protected def getUserIdentifier(c: EventLogContext): UserIdentifier =
|
||||
UserIdentifier(
|
||||
userId = c.userId,
|
||||
guestIdMarketing = c.guestIdMarketing
|
||||
)
|
||||
|
||||
protected def getEventMetadata(e: FlattenedEventLog): EventMetadata =
|
||||
EventMetadata(
|
||||
sourceLineage = SourceLineage.BehavioralClientEvents,
|
||||
sourceTimestampMs =
|
||||
e.context.driftAdjustedEventCreatedAtMs.getOrElse(e.context.eventCreatedAtMs),
|
||||
receivedTimestampMs = AdapterUtils.currentTimestampMs,
|
||||
// Client UI language or from Gizmoduck which is what user set in Twitter App.
|
||||
// Please see more at https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/finatra-internal/international/src/main/scala/com/twitter/finatra/international/LanguageIdentifier.scala
|
||||
// The format should be ISO 639-1.
|
||||
language = e.context.languageCode.map(AdapterUtils.normalizeLanguageCode),
|
||||
// Country code could be IP address (geoduck) or User registration country (gizmoduck) and the former takes precedence.
|
||||
// We don’t know exactly which one is applied, unfortunately,
|
||||
// see https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/finatra-internal/international/src/main/scala/com/twitter/finatra/international/CountryIdentifier.scala
|
||||
// The format should be ISO_3166-1_alpha-2.
|
||||
countryCode = e.context.countryCode.map(AdapterUtils.normalizeCountryCode),
|
||||
clientAppId = e.context.clientApplicationId,
|
||||
clientVersion = e.context.clientVersion,
|
||||
clientPlatform = e.context.clientPlatform,
|
||||
viewHierarchy = e.v1ViewTypeHierarchy,
|
||||
clientEventNamespace = Some(
|
||||
ClientEventNamespace(
|
||||
page = e.page,
|
||||
section = e.section,
|
||||
element = e.element,
|
||||
action = e.actionName,
|
||||
subsection = e.subsection
|
||||
)),
|
||||
breadcrumbViews = e.v1BreadcrumbViewTypeHierarchy,
|
||||
breadcrumbTweets = e.v1BreadcrumbTweetIds.map { breadcrumbs =>
|
||||
breadcrumbs.map { breadcrumb =>
|
||||
BreadcrumbTweet(
|
||||
tweetId = breadcrumb.serversideContextId.toLong,
|
||||
sourceComponent = breadcrumb.sourceComponent)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
protected def getBreadcrumbTweetIds(
|
||||
breadcrumbTweetIds: Option[Seq[FlattenedServersideContextKey]]
|
||||
): Seq[BreadcrumbTweet] =
|
||||
breadcrumbTweetIds
|
||||
.getOrElse(Nil).map(breadcrumb => {
|
||||
BreadcrumbTweet(
|
||||
tweetId = breadcrumb.serversideContextId.toLong,
|
||||
sourceComponent = breadcrumb.sourceComponent)
|
||||
})
|
||||
|
||||
protected def getBreadcrumbViews(breadcrumbView: Option[Seq[String]]): Seq[String] =
|
||||
breadcrumbView.getOrElse(Nil)
|
||||
|
||||
protected def getUnifiedUserAction(
|
||||
event: FlattenedEventLog,
|
||||
actionType: ActionType,
|
||||
item: Item,
|
||||
productSurface: Option[ProductSurface] = None,
|
||||
productSurfaceInfo: Option[ProductSurfaceInfo] = None
|
||||
): UnifiedUserAction =
|
||||
UnifiedUserAction(
|
||||
userIdentifier = getUserIdentifier(event.context),
|
||||
actionType = actionType,
|
||||
item = item,
|
||||
eventMetadata = getEventMetadata(event),
|
||||
productSurface = productSurface,
|
||||
productSurfaceInfo = productSurfaceInfo
|
||||
)
|
||||
}
|
|
@ -1,39 +0,0 @@
|
|||
package com.twitter.unified_user_actions.adapter.behavioral_client_event
|
||||
|
||||
import com.twitter.finagle.stats.NullStatsReceiver
|
||||
import com.twitter.finagle.stats.StatsReceiver
|
||||
import com.twitter.finatra.kafka.serde.UnKeyed
|
||||
import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog
|
||||
import com.twitter.unified_user_actions.adapter.AbstractAdapter
|
||||
import com.twitter.unified_user_actions.thriftscala._
|
||||
|
||||
class BehavioralClientEventAdapter
|
||||
extends AbstractAdapter[FlattenedEventLog, UnKeyed, UnifiedUserAction] {
|
||||
|
||||
import BehavioralClientEventAdapter._
|
||||
|
||||
override def adaptOneToKeyedMany(
|
||||
input: FlattenedEventLog,
|
||||
statsReceiver: StatsReceiver = NullStatsReceiver
|
||||
): Seq[(UnKeyed, UnifiedUserAction)] =
|
||||
adaptEvent(input).map { e => (UnKeyed, e) }
|
||||
}
|
||||
|
||||
object BehavioralClientEventAdapter {
|
||||
def adaptEvent(e: FlattenedEventLog): Seq[UnifiedUserAction] =
|
||||
// See go/bcecoverage for event namespaces, usage and coverage details
|
||||
Option(e)
|
||||
.map { e =>
|
||||
(e.page, e.actionName) match {
|
||||
case (Some("tweet_details"), Some("impress")) =>
|
||||
TweetImpressionBCEAdapter.TweetDetails.toUUA(e)
|
||||
case (Some("fullscreen_video"), Some("impress")) =>
|
||||
TweetImpressionBCEAdapter.FullscreenVideo.toUUA(e)
|
||||
case (Some("fullscreen_image"), Some("impress")) =>
|
||||
TweetImpressionBCEAdapter.FullscreenImage.toUUA(e)
|
||||
case (Some("profile"), Some("impress")) =>
|
||||
ProfileImpressionBCEAdapter.Profile.toUUA(e)
|
||||
case _ => Nil
|
||||
}
|
||||
}.getOrElse(Nil)
|
||||
}
|
|
@ -1,34 +0,0 @@
|
|||
package com.twitter.unified_user_actions.adapter.behavioral_client_event
|
||||
|
||||
import com.twitter.client.behavioral_event.action.impress.latest.thriftscala.Impress
|
||||
import com.twitter.client_event_entities.serverside_context_key.latest.thriftscala.FlattenedServersideContextKey
|
||||
import com.twitter.unified_user_actions.thriftscala.Item
|
||||
|
||||
trait ImpressionBCEAdapter extends BaseBCEAdapter {
|
||||
type ImpressedItem <: Item
|
||||
|
||||
def getImpressedItem(
|
||||
context: FlattenedServersideContextKey,
|
||||
impression: Impress
|
||||
): ImpressedItem
|
||||
|
||||
/**
|
||||
* The start time of an impression in milliseconds since epoch. In BCE, the impression
|
||||
* tracking clock will start immediately after the page is visible with no initial delay.
|
||||
*/
|
||||
def getImpressedStartTimestamp(impression: Impress): Long =
|
||||
impression.visibilityPctDwellStartMs
|
||||
|
||||
/**
|
||||
* The end time of an impression in milliseconds since epoch. In BCE, the impression
|
||||
* tracking clock will end before the user exit the page.
|
||||
*/
|
||||
def getImpressedEndTimestamp(impression: Impress): Long =
|
||||
impression.visibilityPctDwellEndMs
|
||||
|
||||
/**
|
||||
* The UI component that hosted the impressed item.
|
||||
*/
|
||||
def getImpressedUISourceComponent(context: FlattenedServersideContextKey): String =
|
||||
context.sourceComponent
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
package com.twitter.unified_user_actions.adapter.behavioral_client_event
|
||||
|
||||
import com.twitter.client.behavioral_event.action.impress.latest.thriftscala.Impress
|
||||
import com.twitter.client_event_entities.serverside_context_key.latest.thriftscala.FlattenedServersideContextKey
|
||||
import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog
|
||||
import com.twitter.unified_user_actions.thriftscala.ActionType
|
||||
import com.twitter.unified_user_actions.thriftscala.ClientProfileV2Impression
|
||||
import com.twitter.unified_user_actions.thriftscala.Item
|
||||
import com.twitter.unified_user_actions.thriftscala.ProductSurface
|
||||
import com.twitter.unified_user_actions.thriftscala.ProfileActionInfo
|
||||
import com.twitter.unified_user_actions.thriftscala.ProfileInfo
|
||||
import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction
|
||||
|
||||
object ProfileImpressionBCEAdapter {
|
||||
val Profile = new ProfileImpressionBCEAdapter()
|
||||
}
|
||||
|
||||
class ProfileImpressionBCEAdapter extends ImpressionBCEAdapter {
|
||||
override type ImpressedItem = Item.ProfileInfo
|
||||
|
||||
override def toUUA(e: FlattenedEventLog): Seq[UnifiedUserAction] =
|
||||
(e.v2Impress, e.v1UserIds) match {
|
||||
case (Some(v2Impress), Some(v1UserIds)) =>
|
||||
v1UserIds.map { user =>
|
||||
getUnifiedUserAction(
|
||||
event = e,
|
||||
actionType = ActionType.ClientProfileV2Impression,
|
||||
item = getImpressedItem(user, v2Impress),
|
||||
productSurface = Some(ProductSurface.ProfilePage)
|
||||
)
|
||||
}
|
||||
case _ => Nil
|
||||
}
|
||||
|
||||
override def getImpressedItem(
|
||||
context: FlattenedServersideContextKey,
|
||||
impression: Impress
|
||||
): ImpressedItem =
|
||||
Item.ProfileInfo(
|
||||
ProfileInfo(
|
||||
actionProfileId = context.serversideContextId.toLong,
|
||||
profileActionInfo = Some(
|
||||
ProfileActionInfo.ClientProfileV2Impression(
|
||||
ClientProfileV2Impression(
|
||||
impressStartTimestampMs = getImpressedStartTimestamp(impression),
|
||||
impressEndTimestampMs = getImpressedEndTimestamp(impression),
|
||||
sourceComponent = getImpressedUISourceComponent(context)
|
||||
)
|
||||
)
|
||||
)
|
||||
))
|
||||
}
|
|
@ -1,84 +0,0 @@
|
|||
package com.twitter.unified_user_actions.adapter.behavioral_client_event
|
||||
|
||||
import com.twitter.client.behavioral_event.action.impress.latest.thriftscala.Impress
|
||||
import com.twitter.client_event_entities.serverside_context_key.latest.thriftscala.FlattenedServersideContextKey
|
||||
import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog
|
||||
import com.twitter.unified_user_actions.thriftscala.ActionType
|
||||
import com.twitter.unified_user_actions.thriftscala.ClientTweetV2Impression
|
||||
import com.twitter.unified_user_actions.thriftscala.Item
|
||||
import com.twitter.unified_user_actions.thriftscala.ProductSurface
|
||||
import com.twitter.unified_user_actions.thriftscala.TweetActionInfo
|
||||
import com.twitter.unified_user_actions.thriftscala.TweetInfo
|
||||
import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction
|
||||
|
||||
object TweetImpressionBCEAdapter {
|
||||
val TweetDetails = new TweetImpressionBCEAdapter(ActionType.ClientTweetV2Impression)
|
||||
val FullscreenVideo = new TweetImpressionBCEAdapter(
|
||||
ActionType.ClientTweetVideoFullscreenV2Impression)
|
||||
val FullscreenImage = new TweetImpressionBCEAdapter(
|
||||
ActionType.ClientTweetImageFullscreenV2Impression)
|
||||
}
|
||||
|
||||
class TweetImpressionBCEAdapter(actionType: ActionType) extends ImpressionBCEAdapter {
|
||||
override type ImpressedItem = Item.TweetInfo
|
||||
|
||||
override def toUUA(e: FlattenedEventLog): Seq[UnifiedUserAction] =
|
||||
(actionType, e.v2Impress, e.v1TweetIds, e.v1BreadcrumbTweetIds) match {
|
||||
case (ActionType.ClientTweetV2Impression, Some(v2Impress), Some(v1TweetIds), _) =>
|
||||
toUUAEvents(e, v2Impress, v1TweetIds)
|
||||
case (
|
||||
ActionType.ClientTweetVideoFullscreenV2Impression,
|
||||
Some(v2Impress),
|
||||
_,
|
||||
Some(v1BreadcrumbTweetIds)) =>
|
||||
toUUAEvents(e, v2Impress, v1BreadcrumbTweetIds)
|
||||
case (
|
||||
ActionType.ClientTweetImageFullscreenV2Impression,
|
||||
Some(v2Impress),
|
||||
_,
|
||||
Some(v1BreadcrumbTweetIds)) =>
|
||||
toUUAEvents(e, v2Impress, v1BreadcrumbTweetIds)
|
||||
case _ => Nil
|
||||
}
|
||||
|
||||
private def toUUAEvents(
|
||||
e: FlattenedEventLog,
|
||||
v2Impress: Impress,
|
||||
v1TweetIds: Seq[FlattenedServersideContextKey]
|
||||
): Seq[UnifiedUserAction] =
|
||||
v1TweetIds.map { tweet =>
|
||||
getUnifiedUserAction(
|
||||
event = e,
|
||||
actionType = actionType,
|
||||
item = getImpressedItem(tweet, v2Impress),
|
||||
productSurface = getProductSurfaceRelated.productSurface,
|
||||
productSurfaceInfo = getProductSurfaceRelated.productSurfaceInfo
|
||||
)
|
||||
}
|
||||
|
||||
override def getImpressedItem(
|
||||
context: FlattenedServersideContextKey,
|
||||
impression: Impress
|
||||
): ImpressedItem =
|
||||
Item.TweetInfo(
|
||||
TweetInfo(
|
||||
actionTweetId = context.serversideContextId.toLong,
|
||||
tweetActionInfo = Some(
|
||||
TweetActionInfo.ClientTweetV2Impression(
|
||||
ClientTweetV2Impression(
|
||||
impressStartTimestampMs = getImpressedStartTimestamp(impression),
|
||||
impressEndTimestampMs = getImpressedEndTimestamp(impression),
|
||||
sourceComponent = getImpressedUISourceComponent(context)
|
||||
)
|
||||
))
|
||||
))
|
||||
|
||||
private def getProductSurfaceRelated: ProductSurfaceRelated =
|
||||
actionType match {
|
||||
case ActionType.ClientTweetV2Impression =>
|
||||
ProductSurfaceRelated(
|
||||
productSurface = Some(ProductSurface.TweetDetailsPage),
|
||||
productSurfaceInfo = None)
|
||||
case _ => ProductSurfaceRelated(productSurface = None, productSurfaceInfo = None)
|
||||
}
|
||||
}
|
|
@ -1,139 +0,0 @@
|
|||
package com.twitter.unified_user_actions.adapter
|
||||
|
||||
import com.twitter.inject.Test
|
||||
import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog
|
||||
import com.twitter.unified_user_actions.adapter.TestFixtures.BCEFixture
|
||||
import com.twitter.unified_user_actions.adapter.behavioral_client_event.BehavioralClientEventAdapter
|
||||
import com.twitter.unified_user_actions.thriftscala._
|
||||
import com.twitter.util.Time
|
||||
import org.scalatest.prop.TableDrivenPropertyChecks
|
||||
|
||||
class BehavioralClientEventAdapterSpec extends Test with TableDrivenPropertyChecks {
|
||||
|
||||
test("basic event conversion should be correct") {
|
||||
new BCEFixture {
|
||||
Time.withTimeAt(frozenTime) { _ =>
|
||||
val tests = Table(
|
||||
("event", "expected", "description"),
|
||||
(
|
||||
makeBCEEvent(),
|
||||
makeUUAImpressEvent(productSurface = Some(ProductSurface.TweetDetailsPage)),
|
||||
"tweet_details conversion"),
|
||||
(makeBCEProfileImpressEvent(), makeUUAProfileImpressEvent(), "profile conversion"),
|
||||
(
|
||||
makeBCEVideoFullscreenImpressEvent(),
|
||||
makeUUAVideoFullscreenImpressEvent(),
|
||||
"fullscreen_video conversion"),
|
||||
(
|
||||
makeBCEImageFullscreenImpressEvent(),
|
||||
makeUUAImageFullscreenImpressEvent(),
|
||||
"fullscreen_image conversion"),
|
||||
)
|
||||
forEvery(tests) { (input: FlattenedEventLog, expected: UnifiedUserAction, desc: String) =>
|
||||
assert(Seq(expected) === BehavioralClientEventAdapter.adaptEvent(input), desc)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test(
|
||||
"tweet_details is NOT missing productSurface[Info] when empty breadcrumb components and breadcrumbs tweets id") {
|
||||
new BCEFixture {
|
||||
Time.withTimeAt(frozenTime) { _ =>
|
||||
val input = makeBCEEvent(v1BreadcrumbViewTypeHierarchy = None, v1BreadcrumbTweetIds = None)
|
||||
val expected =
|
||||
makeUUAImpressEvent(
|
||||
productSurface = Some(ProductSurface.TweetDetailsPage),
|
||||
breadcrumbViews = None,
|
||||
breadcrumbTweets = None)
|
||||
val actual = BehavioralClientEventAdapter.adaptEvent(input)
|
||||
|
||||
assert(Seq(expected) === actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("tweet_details is not missing productSurface[Info] when only breadcrumb tweets is empty") {
|
||||
new BCEFixture {
|
||||
Time.withTimeAt(frozenTime) { _ =>
|
||||
val input = makeBCEEvent(v1BreadcrumbTweetIds = None)
|
||||
val expected = makeUUAImpressEvent(
|
||||
productSurface = Some(ProductSurface.TweetDetailsPage),
|
||||
breadcrumbViews = Some(viewBreadcrumbs),
|
||||
breadcrumbTweets = None
|
||||
)
|
||||
val actual = BehavioralClientEventAdapter.adaptEvent(input)
|
||||
|
||||
assert(Seq(expected) === actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("unsupported events should be skipped") {
|
||||
new BCEFixture {
|
||||
val unsupportedPage = "unsupported_page"
|
||||
val unsupportedAction = "unsupported_action"
|
||||
val supportedNamespaces = Table(
|
||||
("page", "actions"),
|
||||
("tweet_details", Seq("impress")),
|
||||
("profile", Seq("impress")),
|
||||
)
|
||||
|
||||
forAll(supportedNamespaces) { (page: String, actions: Seq[String]) =>
|
||||
actions.foreach { supportedAction =>
|
||||
assert(
|
||||
BehavioralClientEventAdapter
|
||||
.adaptEvent(
|
||||
makeBCEEvent(
|
||||
currentPage = Some(unsupportedPage),
|
||||
actionName = Some(supportedAction))).isEmpty)
|
||||
|
||||
assert(BehavioralClientEventAdapter
|
||||
.adaptEvent(
|
||||
makeBCEEvent(currentPage = Some(page), actionName = Some(unsupportedAction))).isEmpty)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("event w/ missing info should be skipped") {
|
||||
new BCEFixture {
|
||||
val eventsWithMissingInfo = Table(
|
||||
("event", "description"),
|
||||
(null.asInstanceOf[FlattenedEventLog], "null event"),
|
||||
(makeBCEEvent(v2Impress = None), "impression event missing v2Impress"),
|
||||
(makeBCEEvent(v1TweetIds = None), "tweet event missing v1TweetIds"),
|
||||
(makeBCEProfileImpressEvent(v1UserIds = None), "profile event missing v1UserIds"),
|
||||
(
|
||||
makeBCEVideoFullscreenImpressEvent(v1BreadcrumbTweetIds = None),
|
||||
"fullscreen_video event missing v1BreadcrumbTweetIds"),
|
||||
(
|
||||
makeBCEImageFullscreenImpressEvent(v1BreadcrumbTweetIds = None),
|
||||
"fullscreen_image event missing v1BreadcrumbTweetIds"),
|
||||
)
|
||||
|
||||
forEvery(eventsWithMissingInfo) { (event: FlattenedEventLog, desc: String) =>
|
||||
assert(
|
||||
BehavioralClientEventAdapter
|
||||
.adaptEvent(event).isEmpty,
|
||||
desc)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("use eventCreateAtMs when driftAdjustedTimetampMs is empty") {
|
||||
new BCEFixture {
|
||||
Time.withTimeAt(frozenTime) { _ =>
|
||||
val input = makeBCEEvent(
|
||||
context = makeBCEContext(driftAdjustedEventCreatedAtMs = None)
|
||||
)
|
||||
val expected = makeUUAImpressEvent(
|
||||
createTs = eventCreatedTime,
|
||||
productSurface = Some(ProductSurface.TweetDetailsPage))
|
||||
val actual = BehavioralClientEventAdapter.adaptEvent(input)
|
||||
|
||||
assert(Seq(expected) === actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,25 +0,0 @@
|
|||
package com.twitter.unified_user_actions.service;
|
||||
|
||||
import com.twitter.finatra.decider.modules.DeciderModule
|
||||
import com.twitter.finatra.kafka.serde.UnKeyed
|
||||
import com.twitter.inject.server.TwitterServer
|
||||
import com.twitter.kafka.client.processor.AtLeastOnceProcessor
|
||||
import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog
|
||||
import com.twitter.unified_user_actions.service.module.KafkaProcessorBehavioralClientEventModule
|
||||
|
||||
object BehavioralClientEventServiceMain extends BehavioralClientEventService
|
||||
|
||||
class BehavioralClientEventService extends TwitterServer {
|
||||
override val modules = Seq(
|
||||
KafkaProcessorBehavioralClientEventModule,
|
||||
DeciderModule
|
||||
)
|
||||
|
||||
override protected def setup(): Unit = {}
|
||||
|
||||
override protected def start(): Unit = {
|
||||
val processor = injector.instance[AtLeastOnceProcessor[UnKeyed, FlattenedEventLog]]
|
||||
closeOnExit(processor)
|
||||
processor.start()
|
||||
}
|
||||
}
|
|
@ -1,87 +0,0 @@
|
|||
package com.twitter.unified_user_actions.service.module
|
||||
|
||||
import com.google.inject.Provides
|
||||
import com.twitter.decider.Decider
|
||||
import com.twitter.finagle.stats.StatsReceiver
|
||||
import com.twitter.finatra.kafka.serde.UnKeyed
|
||||
import com.twitter.finatra.kafka.serde.UnKeyedSerde
|
||||
import com.twitter.inject.annotations.Flag
|
||||
import com.twitter.inject.TwitterModule
|
||||
import com.twitter.kafka.client.processor.AtLeastOnceProcessor
|
||||
import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog
|
||||
import com.twitter.unified_user_actions.adapter.behavioral_client_event.BehavioralClientEventAdapter
|
||||
import com.twitter.unified_user_actions.kafka.CompressionTypeFlag
|
||||
import com.twitter.unified_user_actions.kafka.serde.NullableScalaSerdes
|
||||
import com.twitter.util.Duration
|
||||
import com.twitter.util.StorageUnit
|
||||
import com.twitter.util.logging.Logging
|
||||
import javax.inject.Singleton
|
||||
|
||||
object KafkaProcessorBehavioralClientEventModule extends TwitterModule with Logging {
|
||||
override def modules = Seq(FlagsModule)
|
||||
|
||||
private val adapter: BehavioralClientEventAdapter = new BehavioralClientEventAdapter
|
||||
private final val processorName: String = "uuaProcessor"
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
def providesKafkaProcessor(
|
||||
decider: Decider,
|
||||
@Flag(FlagsModule.cluster) cluster: String,
|
||||
@Flag(FlagsModule.kafkaSourceCluster) kafkaSourceCluster: String,
|
||||
@Flag(FlagsModule.kafkaDestCluster) kafkaDestCluster: String,
|
||||
@Flag(FlagsModule.kafkaSourceTopic) kafkaSourceTopic: String,
|
||||
@Flag(FlagsModule.kafkaSinkTopics) kafkaSinkTopics: Seq[String],
|
||||
@Flag(FlagsModule.kafkaGroupId) kafkaGroupId: String,
|
||||
@Flag(FlagsModule.kafkaProducerClientId) kafkaProducerClientId: String,
|
||||
@Flag(FlagsModule.kafkaMaxPendingRequests) kafkaMaxPendingRequests: Int,
|
||||
@Flag(FlagsModule.kafkaWorkerThreads) kafkaWorkerThreads: Int,
|
||||
@Flag(FlagsModule.commitInterval) commitInterval: Duration,
|
||||
@Flag(FlagsModule.maxPollRecords) maxPollRecords: Int,
|
||||
@Flag(FlagsModule.maxPollInterval) maxPollInterval: Duration,
|
||||
@Flag(FlagsModule.sessionTimeout) sessionTimeout: Duration,
|
||||
@Flag(FlagsModule.fetchMax) fetchMax: StorageUnit,
|
||||
@Flag(FlagsModule.batchSize) batchSize: StorageUnit,
|
||||
@Flag(FlagsModule.linger) linger: Duration,
|
||||
@Flag(FlagsModule.bufferMem) bufferMem: StorageUnit,
|
||||
@Flag(FlagsModule.compressionType) compressionTypeFlag: CompressionTypeFlag,
|
||||
@Flag(FlagsModule.retries) retries: Int,
|
||||
@Flag(FlagsModule.retryBackoff) retryBackoff: Duration,
|
||||
@Flag(FlagsModule.requestTimeout) requestTimeout: Duration,
|
||||
@Flag(FlagsModule.enableTrustStore) enableTrustStore: Boolean,
|
||||
@Flag(FlagsModule.trustStoreLocation) trustStoreLocation: String,
|
||||
statsReceiver: StatsReceiver,
|
||||
): AtLeastOnceProcessor[UnKeyed, FlattenedEventLog] = {
|
||||
KafkaProcessorProvider.provideDefaultAtLeastOnceProcessor(
|
||||
name = processorName,
|
||||
kafkaSourceCluster = kafkaSourceCluster,
|
||||
kafkaGroupId = kafkaGroupId,
|
||||
kafkaSourceTopic = kafkaSourceTopic,
|
||||
sourceKeyDeserializer = UnKeyedSerde.deserializer,
|
||||
sourceValueDeserializer = NullableScalaSerdes
|
||||
.Thrift[FlattenedEventLog](statsReceiver.counter("deserializerErrors")).deserializer,
|
||||
commitInterval = commitInterval,
|
||||
maxPollRecords = maxPollRecords,
|
||||
maxPollInterval = maxPollInterval,
|
||||
sessionTimeout = sessionTimeout,
|
||||
fetchMax = fetchMax,
|
||||
processorMaxPendingRequests = kafkaMaxPendingRequests,
|
||||
processorWorkerThreads = kafkaWorkerThreads,
|
||||
adapter = adapter,
|
||||
kafkaSinkTopics = kafkaSinkTopics,
|
||||
kafkaDestCluster = kafkaDestCluster,
|
||||
kafkaProducerClientId = kafkaProducerClientId,
|
||||
batchSize = batchSize,
|
||||
linger = linger,
|
||||
bufferMem = bufferMem,
|
||||
compressionType = compressionTypeFlag.compressionType,
|
||||
retries = retries,
|
||||
retryBackoff = retryBackoff,
|
||||
requestTimeout = requestTimeout,
|
||||
statsReceiver = statsReceiver,
|
||||
trustStoreLocationOpt = if (enableTrustStore) Some(trustStoreLocation) else None,
|
||||
decider = decider,
|
||||
zone = ZoneFiltering.zoneMapping(cluster),
|
||||
)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue