
Using Spark Streaming, Apache Kafka, and Object Storage for Stream Processing on Bluemix

Ilya Drabenia


One of the key points in the Industrial Internet is stream data processing. Equipment fault monitoring, predictive maintenance, or real-time diagnostics are only a few of the possible use cases. Some of the services provided by IBM Bluemix enable you to significantly speed up the implementation of such use cases. With Bluemix, you are not required to deploy and configure Hadoop, Apache Kafka, or other big data tools. It allows you to launch service instances in a few clicks.

In this article, we explain how to integrate and use the most popular open-source tools for stream processing. We explore IBM Message Hub (for collecting streams), the Apache Spark service (for processing events), and IBM Object Storage (for storing results).

 

Scenario

Below is the scheme of a stream processing flow that we will implement in this post.

[Figure: stream processing flow with IBM Bluemix Message Hub (Apache Kafka), Spark, and Object Storage]

The event producer generates sample messages, which go to Message Hub. The Spark job then picks them up from Message Hub, processes them, and stores the results in files in Object Storage.

The code of the message producer that sends real-time data to Bluemix Message Hub is given in the "Generating and processing events" section below.

 

IBM Message Hub

IBM Message Hub for Bluemix supports two message queuing systems: Apache Kafka and IBM MQ Light. In our sample flow, we use Apache Kafka as a tool for big data stream processing.

For configuring a connection to IBM Message Hub, check out this sample and documentation.
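For reference, below is a minimal sketch of the producer properties such a connection typically involves. The broker address is the one used later in this post; the security setting and credentials are placeholders, and the actual SASL/login setup should be taken from the Message Hub sample and the Credentials section of your service instance.

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

// Sketch only: the broker host comes from the Credentials section of your
// Message Hub instance; the SASL/JAAS login configuration itself follows
// the Message Hub sample referenced above.
val props = new Properties()
props.put("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093")
props.put("security.protocol", "SASL_SSL") // Message Hub brokers listen on a SASL-secured port
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)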

 

Apache Spark on Bluemix

To enable support for Spark Streaming, you need to include this library, which contains the Spark Streaming implementation, in your Spark job's dependencies.
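If you build the job with sbt, the dependency can be declared roughly as follows. The version numbers here are an assumption and must match the Spark version provided by the Bluemix Spark service.

// build.sbt (sketch): the versions are assumptions and must match the cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided"
)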

Now, you can use StreamingContext with the following code:

val ssc = new StreamingContext(sc, Seconds(2))

[Screenshot: the Apache Spark service on IBM Bluemix]

Developing a Spark job

For debugging our Spark job, we used Jupyter, a tool for interactive job development provided by the Bluemix Spark service. First, we develop a piece of functionality in Jupyter and then copy it to the job that is later submitted to the Apache Spark service.

[Screenshot: a Jupyter notebook in the Bluemix Apache Spark service]

Debugging a Spark job

 

Integrating Spark with Message Hub

To integrate Apache Spark with Message Hub:

  1. Include this additional IBM-specific library in your Spark job configuration.
  2. In your Spark job, provide the following configuration:
val kafkaProps = new MessageHubConfig

kafkaProps.setConfig("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093")
kafkaProps.setConfig("kafka.user.name", "XXXXXXXXXXXXXXXXXX")
kafkaProps.setConfig("kafka.user.password", "**************")
kafkaProps.setConfig("kafka.topic", "mytopic")
kafkaProps.setConfig("api_key", "*******************************")
kafkaProps.setConfig("kafka_rest_url", "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")

kafkaProps.createConfiguration()

val sc = new SparkContext()
val ssc = new StreamingContext(sc, Seconds(2))

You can find all the credentials needed for running Kafka in the Credentials section of the Message Hub service in the Bluemix console.

Now, you should be able to connect to Kafka and receive new events in real time.

val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
 kafkaProps,
 List(kafkaProps.getConfig("kafka.topic"))
)
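As a quick sanity check, you can, for instance, print the incoming values and start the streaming context (a minimal sketch; the full job in the sections below writes the stream to Object Storage instead):

// Print the message values of each micro-batch to the driver log
stream.map { case (_, value) => value }.print()

ssc.start()
ssc.awaitTermination()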

 

Integrating Spark Streaming with Bluemix Object Storage

You might want to check out this post, which details how to integrate the Spark service with Object Storage. To connect to Object Storage, provide the following configuration in your Spark job:

val pfx = "fs.swift.service." + name

val conf = sc.getConf
conf.set(pfx + ".auth.url", "https://identity.open.softlayer.com")
conf.set(pfx + ".tenant", "sf56-d54664602866ee-20565106c03e")
conf.set(pfx + ".username", "Admin_58ad00f71fbcbebe819624b6d70df9ec6a494887")
conf.set(pfx + ".auth.endpoint.prefix", "endpoints")
conf.set(pfx + ".password", "************")
conf.set(pfx + ".apikey", "************")
conf.set(pfx + ".region", "dallas")
conf.set(pfx + ".hostname", "notebooks")

In this configuration, name is an arbitrary Swift service name that you later reference in the swift:// URL when reading or writing data. You can find the corresponding configuration settings in the Credentials section of your Object Storage service. Also, keep in mind that you need to pass the user_id property from the Credentials section to the username option.
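Once the configuration is set, the chosen name appears in the swift:// URL together with the container name. For example, here is a minimal sketch (the container and object names below are illustrative):

// Sketch: "notebook" is the Object Storage container, "test3" is the
// service name used in the configuration above
val data = sc.parallelize(Seq("a", "b", "c"))
data.saveAsTextFile("swift://notebook.test3/sample-output")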

 

Generating and processing events

The event producer code (we copied it with small modifications from IBM Message Hub samples):

public void run() {
    logger.log(Level.INFO, ProducerRunnable.class.toString() + " is starting.");

    while (!closing) {
        String fieldName = "records";
        // Push a message into the list to be sent.
        MessageList list = new MessageList();
        list.push("This is a test message" + producedMessages);

        try {
            // Create a producer record which will be sent
            // to the Message Hub service, providing the topic
            // name, field name and message. The field name and
            // message are converted to UTF-8.
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
                topic,
                fieldName.getBytes("UTF-8"),
                list.toString().getBytes("UTF-8"));

            // Synchronously wait for a response from Message Hub / Kafka.
            RecordMetadata m = kafkaProducer.send(record).get();
            producedMessages++;

            logger.log(Level.INFO, "Message produced, offset: " + m.offset());

            Thread.sleep(1000);
        } catch (final Exception e) {
            e.printStackTrace();
            shutdown();
            // The producer would hang forever, so exit the program.
            System.exit(-1);
        }
    }

    logger.log(Level.INFO, ProducerRunnable.class.toString() + " is shutting down.");
}

The code for the stream processing job:

object StreamProcessor {
  
  def main(args: Array[String]) {
    // Build the Message Hub (Kafka) configuration defined in kafkaProps below
    val kafkaConf: MessageHubConfig = kafkaProps

    val sc = new SparkContext()
    configureObjectStore(sc, "test3")

    val ssc = new StreamingContext(sc, Seconds(2))

    // Subscribe to the configured Kafka topic as a stream of key/value pairs
    val stream = ssc.createKafkaStream[String, String,
      StringDeserializer, StringDeserializer](
      kafkaConf,
      List(kafkaConf.getConfig("kafka.topic"))
    )

    stream.saveAsTextFiles("swift://notebook.test3/result.csv")

    ssc.start()
    ssc.awaitTermination()
  }

  def kafkaProps: MessageHubConfig = {
    val kafkaProps = new MessageHubConfig

    kafkaProps.setConfig("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093")
    kafkaProps.setConfig("kafka.user.name", "*******************")
    kafkaProps.setConfig("kafka.user.password", "********************")
    kafkaProps.setConfig("kafka.topic", "mytopic")
    kafkaProps.setConfig("api_key", "***********************")
    kafkaProps.setConfig("kafka_rest_url", "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")

    kafkaProps.createConfiguration()
    kafkaProps
  }

  def configureObjectStore(sc: SparkContext, name: String): Unit = {
    val pfx = "fs.swift.service." + name

    val conf = sc.getConf
    conf.set(pfx + ".auth.url", "https://identity.open.softlayer.com")
    conf.set(pfx + ".tenant", "sf56-d54664602866ee-20565106c03e")
    conf.set(pfx + ".username", "Admin_58ad00f71fbcbebe819624b6d70df9ec6a494887")
    conf.set(pfx + ".auth.endpoint.prefix", "endpoints")
    conf.set(pfx + ".password", "****************")
    conf.set(pfx + ".apikey", "****************")
    conf.set(pfx + ".region", "dallas")
    conf.set(pfx + ".hostname", "notebooks")
  }

}

 

Conclusion

During the development, we found out that ZooKeeper, the standard tool for managing Kafka, has been replaced by IBM with a special REST API, which makes the Message Hub API incompatible with much of the software created for typical Kafka deployments.

Also, the authentication and authorization process for accessing Message Hub is not trivial: it involves multiple steps and relies on files, which is somewhat unsuitable for a cloud environment.

We also found that integrating the Spark service with other Bluemix services is quite challenging. For example, integrating Spark with Message Hub requires an additional IBM-specific dependency.

In general, using IBM Bluemix saved us about one man-month of work, since we avoided installing, configuring, and integrating multiple big data tools. However, development could be simplified even further with a mechanism for easier integration of different Bluemix services with each other. Let's hope that IBM will implement it some day.

See also my evaluation of the stream processing services available in IBM Bluemix: Streaming Analytics, Apache Spark, and BigInsights.

 

About the author

Ilya Drabenia is a Technical Lead at Altoros. He has broad experience in building software architectures, including design and development of complex solutions. Ilya is passionate about microservices, domain-driven design, as well as scalable and parallel algorithms. He also holds an MS degree in Computer Science. See his profile on GitHub.


For the next parts of this series, subscribe to our blog or follow @altoros.
