Can spark convert String to Integer when reading using schema in structured streaming


Aniruddha P Tekade
Hi,

I am new to Spark and learning Structured Streaming. I am specifying the schema with a case class and Encoders to get a streaming DataFrame.

import java.sql.Timestamp

case class SampleLogEntry(
                             dateTime: Timestamp,
                             clientIp: String,
                             userId: String,
                             operation: String,
                             bucketName: String,
                             contAccUsrId: String,
                             reqHeader: Integer,
                             reqBody: Integer,
                             respHeader: Integer,
                             respBody: Integer,
                             totalReqResSize: Integer,
                             duration: Integer,
                             objectName: String,
                             httpStatus: Integer,
                             s3ReqId: String,
                             etag: String,
                             errCode: Integer,
                             srcBucket: String
                           )

val sampleLogSchema = Encoders.product[SampleLogEntry].schema // using encoders

val rawData = spark
      .readStream
      .format("csv") // pipe-delimited text, read via the csv source
      .option("delimiter", "|")
      .option("header", "true")
      .schema(sampleLogSchema) 
      .load("/Users/home/learning-spark/logs")

However, I am getting only null values with this schema -

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
|dateTime|  IP|userId|s3Api|bucketName|accessUserId|reqHeader|reqBody|respHeader|respBody|totalSize|duration|objectName|httpStatus|reqestId|objectTag|errCode|srcBucket|
+--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
+--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
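
My guess (I have not verified this) is that strict parsing of the Timestamp field is involved: if even one field fails to parse, the reader's default permissive mode can blank out the whole row. A plain-JVM parse of such strings is just as strict; for example:

```scala
import java.sql.Timestamp
import scala.util.Try

// Timestamp.valueOf accepts only "yyyy-[m]m-[d]d hh:mm:ss[.f...]";
// anything else throws IllegalArgumentException
println(Try(Timestamp.valueOf("2019-07-18 00:00:01")).isSuccess)  // succeeds
println(Try(Timestamp.valueOf("18/Jul/2019:00:00:01")).isSuccess) // fails
```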
After trying multiple options, such as inferring the schema from sample data and defining the schema with StructType, I changed every field in the schema to String -

case class SampleLogEntry(
                       dateTime: String,
                       IP: String,
                       userId: String,
                       s3Api: String,
                       bucketName: String,
                       accessUserId: String,
                       reqHeader: String,
                       reqBody: String,
                       respHeader: String,
                       respBody: String,
                       totalSize: String,
                       duration: String,
                       objectName: String,
                       httpStatus: String,
                       reqestId: String,
                       objectTag: String,
                       errCode: String,
                       srcBucket: String
                     )


which gets me the following output (no more nulls) -

+--------------------+---------+------+-------+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------------------+---------+-----------------+---------+
|            dateTime|       IP|userId|  s3Api|bucketName|accessUserId|reqHeader|reqBody|respHeader|respBody|totalSize|duration|objectName|httpStatus|            reqestId|objectTag|          errCode|srcBucket|
+--------------------+---------+------+-------+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------------------+---------+-----------------+---------+
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     247|      null|       400|08084d90-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.6|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     291|      null|       400|08084d92-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     246|      null|       400|08084d94-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.6|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     227|      null|       400|08084d96-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     287|      null|       400|08084d98-299e-14a...|        0|InvalidBucketName|     null|
+--------------------+---------+------+-------+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------------------+---------+-----------------+---------+

But this is not what I want. Apart from the documentation, I am following the book Stream Processing with Apache Spark by Gerard Maas and François Garillot, and these are the ways I found to provide a schema using encoders in Scala. Is there any other way to do this? My logs are pipe (|) separated records, and the source nodes write every field as a string, which I cannot change.

How do I set up the schema so that I can read the log files with the specified data types for each field? If Spark cannot convert a String to an Integer while reading, what is the workaround?
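(Outside of Spark the conversion itself is easy; a quick sketch with a made-up sample record:

```scala
import scala.util.Try

// Hypothetical pipe-delimited fragment: dateTime|IP|reqHeader|respBody
val line = "2019-07-18 00:00:01|10.29.2.5|0|250"

// Split on the Char '|' — the String overload would treat "|" as a regex
val fields = line.split('|')

// A numeric field converts fine once extracted; a bad value just yields None
val respBody: Option[Int] = Try(fields(3).toInt).toOption
println(respBody)
```

so I assume the same conversion must be possible on the streaming DataFrame as well.)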


Best,
Aniruddha
-----------