Twitter Custom Spark Receiver

Twitter Custom Spark Receiver

Build your own Twitter feed with a custom Spark receiver: stream only the tweets from the handles you follow, and filter out the retweets made by those handles.

Create a Twitter developer account at https://developer.twitter.com/apps

Get the credentials: create an app, then open "Keys and Tokens" under that app to obtain the required tokens.

Create twitter4j.properties file

debug=true
oauth.consumerKey=XXXSetOfqG154C2XXX
oauth.consumerSecret=XXXX
oauth.accessToken=XXXX
oauth.accessTokenSecret=XXXX

Create the Twitter receiver, filtering the stream by the handles you follow

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import twitter4j.{FilterQuery, StallWarning, Status, StatusDeletionNotice, StatusListener, TwitterFactory, TwitterStream, TwitterStreamFactory}

import scala.concurrent.Promise

/**
 * Custom Spark Streaming receiver that emits the tweets posted by the accounts
 * a given user follows.
 *
 * On start it looks up the friend IDs of `screenName` through the Twitter REST
 * API, opens a twitter4j filter stream that follows those IDs, and hands every
 * received [[twitter4j.Status]] to Spark via `store`.
 *
 * Both twitter4j factories are created with the configuration tree path
 * "src/main/resources".
 * NOTE(review): this presumably relies on a twitter4j.properties file holding the
 * OAuth credentials being on the classpath — confirm for your deployment.
 */
class TwitterMyFeedReceiver extends Receiver[Status](StorageLevel.MEMORY_ONLY) {

  // Execution context used by the `foreach` callback registered in onStop().
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Completed with the running stream once onStart() has opened it. */
  val twitterStreamPromise: Promise[TwitterStream] = Promise[TwitterStream]()

  /** Read side of [[twitterStreamPromise]]; used by onStop() to reach the stream. */
  val twitterStreamFuture: scala.concurrent.Future[TwitterStream] = twitterStreamPromise.future

  /** Listener that stores every status and ignores all other stream notifications. */
  private def simpleStatusListener = new StatusListener {

    override def onStatus(status: Status): Unit = store(status)
    override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = ()
    override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = ()
    override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = ()
    override def onStallWarning(warning: StallWarning): Unit = ()
    override def onException(ex: Exception): Unit = ex.printStackTrace()
  }

  /**
   * Resolves the followed accounts and starts streaming their tweets.
   *
   * The friend lookup runs BEFORE any stream is opened: if the REST call throws,
   * no streaming connection has been created yet, so nothing is left open that
   * onStop() could not reach (the promise would still be uncompleted).
   */
  override def onStart(): Unit = {

    // Filter by your Friends
    val screenName = "XXXXXX"
    val twitter = new TwitterFactory("src/main/resources").getInstance()
    // Cursor -1 requests the first page of friend IDs.
    val myFriendsId = twitter.getFriendsIDs(screenName, -1).getIDs
    val myFilterQuery = new FilterQuery().follow(myFriendsId: _*)

    // Only the filter stream is started. A preceding sample(...) call on the same
    // TwitterStream would be replaced by filter(...) anyway, so it would merely
    // open an extra connection that is torn down immediately.
    val myTwitterStream = new TwitterStreamFactory("src/main/resources")
      .getInstance()
      .addListener(simpleStatusListener)
      .filter(myFilterQuery)

    twitterStreamPromise.success(myTwitterStream)
  }

  /**
   * Releases the stream once it is available. If onStart() never completed the
   * promise, the callback never runs and there is nothing to close.
   */
  override def onStop(): Unit = twitterStreamFuture.foreach { twitterStream =>
    twitterStream.cleanUp()
    twitterStream.shutdown()
  }

}

Create the Twitter feed, filtering out retweets

package playground

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import twitter4j.Status

import twitter4j.Friendship

/**
 * Driver application that prints the tweets delivered by
 * [[TwitterMyFeedReceiver]] to the console, optionally dropping retweets.
 */
object TwitterMyFeed {

  /**
   * Local Spark session using all available cores.
   * `spark.testing.memory` is set to 471859200 bytes (450 MiB).
   * NOTE(review): presumably this is there to get past Spark's minimum-memory
   * check on small JVMs — confirm it is still needed.
   */
  val spark: SparkSession = SparkSession.builder()
    .appName("TwitterProject")
    .master("local[*]")
    .config("spark.testing.memory", 471859200)
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .getOrCreate()

  /** Streaming context with a one-second batch interval. */
  val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(1))

  /**
   * Prints every received tweet (retweets included) as a formatted line, then
   * starts the streaming context and blocks until it terminates.
   */
  def readTwitter(): Unit = {
    val twitterStream: DStream[Status] = ssc.receiverStream(new TwitterMyFeedReceiver)
    val tweets = twitterStream.map { status =>
      val username = status.getUser.getName
      val followers = status.getUser.getFollowersCount
      val userScreenName = status.getUser.getScreenName
      val retweet = status.isRetweet
      val text = status.getText

      s"User: $username UserScreenName: $userScreenName ($followers followers) retweet: $retweet tweets: $text"
    }

    tweets.print()
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Registers a pipeline that prints only original tweets (no retweets) as a
   * tuple of (user name, status id, user id, followers count, screen name,
   * isRetweet, isFavorited, favorite count, text).
   *
   * This method only wires up the stream; the caller must start the context.
   */
  def readTwitterFiltered(): Unit = {
    val originalTweets = ssc.receiverStream(new TwitterMyFeedReceiver)
      // Retweet filter: applied on the Status itself so it cannot drift out of
      // sync with tuple positions (the flag sits at position 6 of the tuple,
      // position 7 is the unrelated isFavorited flag).
      .filter(status => !status.isRetweet)
      .map { status => (
        status.getUser.getName,
        status.getId,
        status.getUser.getId,
        status.getUser.getFollowersCount,
        status.getUser.getScreenName,
        status.isRetweet,
        status.isFavorited,
        status.getFavoriteCount,
        status.getText)
      }

    originalTweets.print()
  }

  /** Wires up the retweet-free feed, starts streaming and waits for termination. */
  def main(args: Array[String]): Unit = {
    readTwitterFiltered()
    ssc.start()
    ssc.awaitTermination()
  }

}

Results