Jason Pan

Oceanus 消费 TDBank/TubeMQ 多 Tid 遇到的问题与解决

潘忠显 / 2023-06-28


本文记录了在Oceanus上以Jar方式使用一个消费组,同时消费 TDBank 多个接口数据遇到的一些问题,以及解决方法。

1. 背景

TDBank(腾讯数据银行)是公司内的负责数据收集、分发、预处理的平台系统。其中有一部份功能是:业务侧部分日志数据通过 TDM / TGLOG 等方式落文件,TDAgent 会采集上报的,最终可写到消息队列 TubeMQ。

Oceanus 是公司内的实时计算平台。在 Oceanus 平台上,可以消费 TubeMQ 中的数据进行实时计算。

笔者在前段时间的工作中,需要在Oceanus上创建实时任务,从多个 Tdbank 业务下消费数据进行计算,而且每个业务下会同时消费多个接口数据。期间遇到了不少的问题,本文记录下问题内容、排查方式和解决方法。

任务所使用的开发和运行环境如下:

2. 没有多 Tid 消费的示例

在天穹的 iWiki 使用Tube表 中,只有一个使用 flink-connector-tube 1.9 消费单个 Tid 的示例。直接使用 TubeSourceFunction 创建一个SourceFunction,在通过 env.addSource 将其添加成 Data Source。

//初始化TubeSourceFunction
SourceFunction<byte[]> sourceFunction =
  new TubeSourceFunction(
  masterAddress,
  topic,
  tids,
  consumerGroup,
  configuration
);
env.addSource(sourceFunction) //添加source
  .flatMap(deserializer) //添加反序列化器
  .addSink(new ConsoleSink()); //添加sink

而我需要使用 flink-connector-tube 1.15 去消费多个 Tid,即没有 1.15 SDK 的示例,也没有 1.9 SDK 消费多个 tid的示例,所以只能自己去探索一下。

尝试创建多个 TubeSourceFunction

因为一个消费组只会记录一组消费的状态,如果创建的多个 TubeSourceFunction,相当于并发的使用同一个消费组,无论是1.15 还是 1.9 这样都是不允许的。

如果这样使用,会在 Flink 拉起之后会立马抛异常退出。

尝试使用重载的 TubeSourceFunction

tube-connector 1.15 的 TubeSourceFunction 的构造函数有两个。

第一个是对应 1.9 示例中的消费单个 tid 的构造函数。与 1.9 的定义不同,这直接将 Deserializer 的 Schema 作为函数传入,直接 env.addSource 得到的将会是 DataStream[RowData] 而无需再做额外的反序列化:

public TubeSourceFunction(
        String masterAddress,
        String topic,
        String consumerGroup,
        DeserializationSchema<RowData> deserializationSchema,
        String tids,
        Configuration configuration
)

第二个构造函数如下,从输入参数可以猜测,可以传入多个 tid 以及多个 tid 对应的 DeserializationSchema——tidToDeserializationSchema,并且需要给每个 tid 传入一个路径用于后续的数据区分 —— tidToDataCollectionIds

public TubeSourceFunction(
    String masterAddress,
    String topic,
    String consumerGroup,
    Map<String, DeserializationSchema<RowData>> tidToDeserializationSchema,
    Map<String, Set<ObjectPath>> tidToDataCollectionIds,
    String tids,
    boolean isMultiplexing,
    Configuration configuration
)

尝试使用第二个构造函数,去消费多个tid的数据,但是在这个过程中还会遇到新的问题。接下来解释下这些问题和解决。

3. 多 tid Source 数据分离

上边有提到,直接使用消费单个tid的TubeSourceFunction 添加的 Source 会得到一个 DataStream[RowData]。这个 RowData 是已经通过 DeserializerSchema 反序列化之后的,换句话说,可以直接通过 getString(pos) 或 getInt(pos) 等函数,直接根据接口定义的字段位置,从第pos个字段中,获取对应类型的值。

如何区分不同 tid 的数据

而通过第二个消费多个tid的 TubeSourceFunction 添加的Source同样会得到一个DataStream[RowData]那问题来了,从同一个流中,区分出不同的tid的数据呢?问了 天穹helper没有给找到具体的解释,只能自己的打印每一个RowData观察一下:

dump_multiple_tid_source_rowdata

上边打印结果显示:该 Source 的所有非 RowData 只有两个元素,第一个是一个byte[],第二个是个字符串路径,这个路径恰好就是一个消费组ID + tidToDataCollectionIds传入的标签。因此,区分tid思路就是根据这个路径将数据分成不同的流。

自定义一个 ProcessFucntion 并重写其 processElement 函数,通过Flink 旁路输出的方式,得到多个流:

class MultipleTidSourceSplitter(
    likeTag: OutputTag[Array[Byte]],
    playTag: OutputTag[Array[Byte]])
  extends ProcessFunction[RowData, RowData] {
  override def processElement(
      value: RowData,
      ctx: ProcessFunction[RowData, RowData]#Context,
      out: Collector[RowData]): Unit = {

    val bytes = value.getBinary(0)
    val path = value.getString(1).toString
    if (path.endsWith("MapLikeFlow")) {
      ctx.output(likeTag, bytes)
    } else if (path.endsWith("MapEnjoyResultFlow")) {
      ctx.output(playTag, bytes)
    }
  }
}

如何反序列化

上边区分出了不同数据的数据流,如何对 Array[Byte] 反序列化得到不同tid对应的RowData呢?光靠瞎猜和尝试是不行了,

这里要看 tube-connector 的源码看看这个结构到底是如何产生的,实际上上文提到的路径,也是在相同的位置塞进去的:

tube-source-func-emit-rowdata

具体的这里 bytes 是 RowData +DataOutputSerializer 通过 DeserializationSchema 创建出来的 serializer 进行序列化得到的结果:

TypeSerializer<RowData> typeSerializer = deserializationSchema.getProducedType().createSerializer(new ExecutionConfig());

RowData rowData = (RowData)var7.next();
typeSerializer.serialize(rowData, dataOutputSerializer);
byte[] bytes = dataOutputSerializer.getCopyOfBuffer();
dataOutputSerializer.clear();

将上边的过程进行逆向的操作,便可以得到对应 bytes 的 RowData:

val typeSerializer = schema.getProducedType.createSerializer(new ExecutionConfig)
val deser = new DataInputDeserializer(bytes)
val rowData = typeSerializer.deserialize(deser)

4. 多Tid数据放大问题

前面的处理,已经能消费到多个 tid 的数据。但是在将程序部署运行之后,发现产生的数据数量远比数据源的真实数据数量多。

具体地,针对单个 tid 分别使用两种方式去消费,即使用单 tid 和多 tid 的 TubeSourceFunction 进行消费,发现后者产生的数据数量巨大。通过 Flink UI 也可以清楚看到这一点:

单 tid 消费方式输入输出流量放大 6 倍左右:

multiple-tid-in-out-traffic-1

多 tid 消费方式输入输出流量放大 4000 倍左右

multiple-tid-in-out-traffic-2

很明显如果仅仅是因为添加一个路径,是差不出这么大的差距的。这里联系了 TEG 实时湖仓研发组 的同学,确认了一下此处是有 Bug,进行了及时的修改并提供了 SNAPSHOT 版本,更新依赖之后解决此问题。

5. TDBank Header与Oceanus解析不一致问题

源数据有创建两个业务,以区分正式环境和测试环境,但是两个业务中的接口配置完全一样。之前在测试环境的数据源已经接入完成,没有问题,但是新起任务消费正式环境数据时,报了一个奇怪的错误:

Can’t find deserialization schema for tid 195430192

说他奇怪,是因为正式的消费和测试的消费tid一样,这里传入的都没有任何的数字形式的 tid:

fetch-schema-of-strange-tid

也找到了抛出异常的函数位置,也就是说这个是从 tdMsg 里边获得 属性 tid 的时候得到了一个数字串:

tdmsg-get-tid

联系TEG 实时湖仓研发组 的同学帮忙排查,发现是。flink-format-tdmsg-csv 模块中,有个 TDMsgCsvUtils 会对 TDMsg Head 做解析,会一次取头中的 iname/id/tid 作为 tid:

tdmsg-parse-head-src

而查询TDBank上报的消息头中确实是有一个 id=195430192:

tdbank-head-with-id

解决方法

经了解,上述数据源是业务侧同学自己上报的 tglog,也就是只有body的部分;tglog server 上有部署tdbank采集的agent,可能是不同版本或者不同配置的agent上报不同。显然这里的上报字段和 flink-format-tdmsg-csv 中对 tdbank 头的解析理解是不一致的。

实时湖仓的同学提出两种解决方案:

两者都需要拉去源码,然后生成 jar 后合到自己的代码中。但是前者是平台有提供依赖,做的修改可能带来冲突或者不生效。因此,直接修改 flink-connector-tube 源码。

具体地,这里 Scala 项目使用自定义 jar 的方法:

6. 其他问题

空字段解析问题

tdbank 是有混合数据源或者普通数据源。普通数据源如果以 \t 分割且如果前边的字段没有填写内容,会被 tube-connector 给处理掉,导致字段个数不一致的错误,不能正确消费到。入 TDW 是没有问题的。

代码填充前三个字段为空字符串:

empty-prefix-field-in-tdbank

消费数据的开始是从两个 Long 字段开始的,前边的被掐掉了:

mismatch-field-count-in-tdbank

入TDW是没有问题的:

correct-in-tdw

解决:上报者保证第一个字段别填空。

忽略异常数据

对于上边的异常数据,如果默认设置,会一直卡在上边的位置。因为 TubeSourceFunction 会记录消费的 TDBank的位置,消费到哪里没有正确的处理就任务退出,则不会记录到消费那条日志。

解决:可以在构建Desrializer的时候,将IgnoreError设置为true:

new TDMsgCsvFormatDeserializer.Builder(newTdmFormatInfo())
.setDelimiter('|')
.setIgnoreErrors(true)
.build())