EventHub streaming in PySpark with sparse events - Future timeout [300]

This is different from the Futures timeout you can hit on really big joins; that one comes from a BroadcastHashJoin timeout (spark.sql.broadcastTimeout, which also defaults to 300 seconds), and you can find a solution for it here. My issue was that the stream timed out when my event hub didn't receive an event for over 5 minutes.

TL;DR

Your event hubs conf dictionary should look like the following; the eventhubs.operationTimeout and eventhubs.receiverTimeout entries are the important part.

import datetime

# <hours>, <minutes>, <seconds> are your timeout, e.g. datetime.time(0, 20, 0) for 20 minutes
operation_timeout = datetime.time(<hours>, <minutes>, <seconds>).strftime("PT%HH%MM%SS")

conf = {
  'eventhubs.connectionString': connection_string,
  'eventhubs.consumerGroup': 'consumergroup',
  'eventhubs.operationTimeout': operation_timeout,
  'eventhubs.receiverTimeout': operation_timeout,
}

operation_timeout is an ISO-8601 duration string that gets parsed by java.time.Duration, so you could also just write it out by hand (for example, 'PT20M' for 20 minutes).
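
For context, here is roughly how that conf gets used when you start the stream. This is a minimal sketch assuming the azure-event-hubs-spark connector is attached to the cluster; the console sink and the checkpoint path are placeholders for whatever your real job writes to.

# Read from Event Hubs using the conf dictionary above.
df = (
  spark.readStream
    .format("eventhubs")
    .options(**conf)
    .load()
)

# The payload arrives as binary in the `body` column; cast it to a string to work with it.
events = df.selectExpr("CAST(body AS STRING) AS body", "enqueuedTime")

# Write somewhere; the console sink is just for illustration.
query = (
  events.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/eventhub-checkpoint")  # placeholder path
    .start()
)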

Otherwise, the journey:

I have an event hub stream that receives an event only once every 15 minutes, and the job kept failing with this error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 27.0 failed 4 times, most recent failure: Lost task 5.3 in stage 27.0 (TID 429, 10.139.64.9, executor 0): java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]  
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:96)
    at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:130)
    at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:179)
    at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:122)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:336)

It looks like other people were seeing the same thing, and the folks at Microsoft fixed their library here: a 300-second timeout was hardcoded, and they turned it into something you can set yourself.

How to set that manually? You'll never guess, but the answer was in the docs. You have to add a value to the event hub config (you know, the place with your connection string). That value is eventhubs.operationTimeout. I had initially tried to pass something like '1200s' and '1200 seconds', but those failed with a java.time.format.DateTimeParseException: Text cannot be parsed to a Duration. And for some reason the results got cached, so I had to restart the cluster before the actual answer would take effect. To end the suspense, the solution was to set the value to an ISO-8601 duration string, which you can build with datetime.time(<hours>, <minutes>, <seconds>).strftime("PT%HH%MM%SS"), as in the snippet below.
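
For example, the 20-minute timeout I was after (the same 1200 seconds I had tried to pass as '1200s') comes out like this; the printed string is what goes into the conf and is something java.time.Duration can actually parse:

import datetime

# 0 hours, 20 minutes, 0 seconds
operation_timeout = datetime.time(0, 20, 0).strftime("PT%HH%MM%SS")
print(operation_timeout)  # PT00H20M00S  (valid ISO-8601, unlike '1200s' or '1200 seconds')

conf['eventhubs.operationTimeout'] = operation_timeout
conf['eventhubs.receiverTimeout'] = operation_timeout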