In {T308356}, we made it possible to get Tables and DataStream[Row]s of WMF event streams. However, to do transformations at the DataStream API level, we need a TypeInformation[Row] that describes the output of each transformation. Strictly speaking, we only need this when converting the result back to the Table API. We can again rely on the event JSONSchemas for this, but we need a way to convert a JSONSchema to a TypeInformation[Row].
Flink has a built-in class for this: [[ https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/formats/json/JsonRowSchemaConverter.html | JsonRowSchemaConverter ]]. However, it does not support our [[ JsonRowSchemaConverter | map types convention ]]. Conversely, the JsonSchemaConverter we wrote in T308356 does not convert to some types that JsonRowSchemaConverter does, e.g. Timestamps.
We need compatible converter implementations for both Table API DataTypes/Schemas and DataStream API TypeInformation[Row], so that the same JSONSchema maps to equivalent types in either API. This should be pretty easy to do.
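One way to keep the two converters compatible is to share a single schema traversal and plug in a per-API type builder. The following is only a minimal sketch of that idea, not the actual implementation: `JsonSchema`, `TypeBuilder`, `SchemaTraversal`, `SqlTypeLabels`, and `TypeInfoLabels` are hypothetical names, the builders return plain string labels instead of real Flink DataTypes or TypeInformation, and the map rule shown (an object with no `properties` and a schema-valued `additionalProperties`) is an assumption about the map types convention. Timestamps and other formats are not modeled.

```lang=scala
// Hypothetical, simplified model of a JSONSchema node (not a real library type).
sealed trait JsonSchema
case object StringSchema extends JsonSchema
case object IntegerSchema extends JsonSchema
case object NumberSchema extends JsonSchema
case object BooleanSchema extends JsonSchema
final case class ArraySchema(items: JsonSchema) extends JsonSchema
final case class ObjectSchema(
  properties: Seq[(String, JsonSchema)] = Seq.empty,
  additionalProperties: Option[JsonSchema] = None
) extends JsonSchema

// Hypothetical builder interface: one implementation per target type system.
trait TypeBuilder[T] {
  def string: T
  def long: T
  def double: T
  def boolean: T
  def array(element: T): T
  def map(value: T): T // JSON object keys are always strings
  def row(fields: Seq[(String, T)]): T
}

object SchemaTraversal {
  // Single traversal shared by every builder, so all outputs agree structurally.
  def convert[T](schema: JsonSchema, b: TypeBuilder[T]): T = schema match {
    case StringSchema       => b.string
    case IntegerSchema      => b.long
    case NumberSchema       => b.double
    case BooleanSchema      => b.boolean
    case ArraySchema(items) => b.array(convert(items, b))
    // Assumed map convention: no declared properties, and
    // additionalProperties carries the schema of the map values.
    case ObjectSchema(props, Some(valueSchema)) if props.isEmpty =>
      b.map(convert(valueSchema, b))
    // Any other object becomes a row with one field per declared property.
    case ObjectSchema(props, _) =>
      b.row(props.map { case (name, s) => name -> convert(s, b) })
  }
}

// Stand-in for a Table API builder: renders SQL-like type labels.
object SqlTypeLabels extends TypeBuilder[String] {
  val string = "STRING"
  val long = "BIGINT"
  val double = "DOUBLE"
  val boolean = "BOOLEAN"
  def array(element: String): String = s"ARRAY<$element>"
  def map(value: String): String = s"MAP<STRING, $value>"
  def row(fields: Seq[(String, String)]): String =
    fields.map { case (n, t) => s"$n $t" }.mkString("ROW<", ", ", ">")
}

// Stand-in for a DataStream API builder: renders JVM-like type labels.
object TypeInfoLabels extends TypeBuilder[String] {
  val string = "String"
  val long = "Long"
  val double = "Double"
  val boolean = "Boolean"
  def array(element: String): String = s"Array[$element]"
  def map(value: String): String = s"Map[String, $value]"
  def row(fields: Seq[(String, String)]): String =
    fields.map { case (n, t) => s"$n: $t" }.mkString("Row(", ", ", ")")
}
```

Calling `SchemaTraversal.convert(schema, SqlTypeLabels)` and `SchemaTraversal.convert(schema, TypeInfoLabels)` on the same schema walks it identically, so a type supported in one output (maps, or later Timestamps) cannot be silently missing from the other. Real builders would return a Flink DataType and a TypeInformation instead of strings.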
Once we have this, we should be able to do DataStream-level transforms and then convert back to the Table API like this:
```lang=scala
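import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row

val outputJsonSchema: ObjectNode = // ...
// Proposed converter method; this is the piece this task would add.
val outputTypeInfo: TypeInformation[Row] = JsonSchemaConverter.toTypeInformationRow(outputJsonSchema)

val outputStream: DataStream[Row] = inputDataStream.map(row => {
    // Copy every existing field into a new name-based Row of the same kind.
    val newRow = Row.withNames(row.getKind)
    row.getFieldNames(true).forEach(fieldName => {
        newRow.setField(fieldName, row.getField(fieldName))
    })
    // Add the new field declared in the output schema.
    newRow.setField("new_text_field", "new value")
    newRow
})(outputTypeInfo) // pass the output TypeInformation explicitly

val outputTable: Table = tableEnv.fromDataStream(outputStream)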
```