I've using streamline pulsar connector, each dataset receives the data
properly but cannot make join to be working

Dataset<Row> datasetPolicyWithWtm =
datasetPolicy.withWatermark("__publishTime", "5 minutes").as("pol");
Dataset<Row> datasetPhoneWithWtm =
datasetPhone.withWatermark("__publishTime", "5 minutes").as("ph");

        Dataset<Row> join = datasetPolicyWithWtm.join(
                functions.expr("pol.__key=ph.__key and ph.__publishTime >=
pol.__publishTime - interval 2 minutes and ph.__publishTime <=
pol.__publishTime + interval 2 minutes"),
.groupBy(functions.window(datasetPolicyWithWtm.col("__publishTime"), "2
minutes"), functions.col("pol.__key"))
Not sure what could be the reason

