Spark Streaming es una adición a Spark API para transmisión en vivo y procesamiento de datos a gran escala. En lugar de lidiar con cantidades masivas de datos en bruto no estructurados y limpiarlos después, Spark Streaming realiza el procesamiento y la recopilación de datos casi en tiempo real.
Spark Streaming es una biblioteca de Spark para procesar flujos de datos casi continuos. La abstracción central es un flujo discretizado creado por la API Spark DStream para dividir los datos en lotes. La API de DStream está impulsada por Spark RDD (Conjuntos de datos distribuidos resistentes), lo que permite una integración perfecta con otros módulos de Apache Spark como Spark SQL y MLlib.
Las empresas aprovechan el poder de Spark Streaming en muchos casos de uso diferentes:
El enfoque de transmisión permite un análisis más rápido del comportamiento del cliente, sistemas de recomendación más rápidos y detección de fraude en tiempo real. Para los ingenieros, cualquier tipo de anomalía del sensor de los dispositivos IoT es visible a medida que se recopilan los datos.
Spark Streaming admite cargas de trabajo por lotes y de transmisión de forma nativa, lo que proporciona mejoras interesantes a las fuentes de datos. Este aspecto único satisface los siguientes requisitos de los sistemas modernos de transmisión de datos:
Todas las tecnologías, incluido Spark Streaming, tienen ventajas y desventajas:
Ventajas | Desventajas |
Rendimiento de velocidad excepcional para tareas complejas | Gran consumo de memoria |
Tolerancia a fallos | Difícil de usar, depurar y aprender |
Implementación sencilla en clústeres en la nube | No está bien documentado y los recursos de aprendizaje son escasos |
Soporte multilenguaje | Visualización de datos deficiente |
Integración para marcos de big data como Cassandra y MongoDB | Lento con pequeñas cantidades de datos |
Capacidad de unirse a múltiples tipos de bases de datos | Pocos algoritmos de aprendizaje automático |
¿Cómo funciona Spark Streaming?
Spark Streaming se ocupa de análisis complejos y a gran escala casi en tiempo real. La canalización de procesamiento de transmisión distribuida pasa por tres pasos:
1. Recibir transmisión de datos desde fuentes de transmisión en vivo.
2. Proceso los datos en un clúster en paralelo.
3. Salida los datos procesados en los sistemas.
Arquitectura de transmisión de Spark
La arquitectura central de Spark Streaming está en la transmisión discreta de lotes. En lugar de pasar por la canalización de procesamiento de flujo de un registro a la vez, los microlotes se asignan y procesan dinámicamente. Por lo tanto, los datos se transfieren a los trabajadores en función de los recursos disponibles y la localidad.
Cuando llegan los datos, el receptor los divide en particiones de RDD. La conversión a RDD permite procesar lotes con códigos y bibliotecas de Spark, ya que los RDD son una abstracción fundamental de los conjuntos de datos de Spark.
Fuentes de transmisión de Spark
Los flujos de datos requieren datos recibidos de las fuentes. Spark streaming divide estas fuentes en dos categorías:
- Fuentes básicas. Las fuentes directamente disponibles en la API principal de Streaming, como las conexiones de socket y los sistemas de archivos compatibles con HDFS
- Fuentes avanzadas. Las fuentes requieren vinculación de dependencias y no están disponibles en la API principal de Streaming, como Kafka o Kinesis.
Cada entrada DStream se conecta a un receptor. Para flujos de datos paralelos, la creación de múltiples DStreams también genera múltiples receptores.
Operaciones de transmisión de Spark
Spark Streaming incluye la ejecución de diferentes tipos de operaciones:
1. Operaciones de transformación modificar los datos recibidos de los DStreams de entrada, similares a los aplicados a los RDD. Las operaciones de transformación se evalúan con pereza y no se ejecutan hasta que los datos llegan a la salida.
2. Operaciones de salida envíe los DStreams a sistemas externos, como bases de datos o sistemas de archivos. Pasar a sistemas externos activa las operaciones de transformación.
3. Operaciones de DataFrame y SQL ocurren al convertir RDD en DataFrames y registrarlos como tablas temporales para realizar consultas.
4. Operaciones MLlib se utilizan para realizar algoritmos de aprendizaje automático, incluidos:
- Algoritmos de transmisión se aplican a datos en vivo, como transmisión de regresión lineal o transmisión de k-means.
- Algoritmos fuera de línea para aprender un modelo fuera de línea con datos históricos y aplicar el algoritmo para transmitir datos en línea.
Ejemplo de transmisión de Spark
El ejemplo de transmisión tiene la siguiente estructura:
La arquitectura se divide en dos partes y se ejecuta desde dos archivos:
- Ejecutar el primer archivo para establecer una conexión con la API de Twitter y crear un socket entre la API de Twitter y Spark. Mantenga el archivo en ejecución.
- Ejecutar el segundo archivo para solicitar y comenzar a transmitir los datos, imprimiendo Tweets procesados a la consola. Los datos enviados sin procesar se imprimen en el primer archivo.
Crear un objeto de escucha de Twitter
El TweetListener El objeto escucha los Tweets de la transmisión de Twitter con el StreamListener de tweepy . Cuando se realiza una solicitud en el socket al servidor (local), el TweetListener escucha los datos y extrae la información del Tweet (el texto del Tweet). Si el objeto Tweet extendido está disponible, TweetListener obtiene el extended campo, de lo contrario el texto se obtiene el campo. Por último, el oyente agrega __end al final de cada Tweet. Este paso posterior nos ayuda a filtrar el flujo de datos en Spark.
import tweepy
import json
from tweepy.streaming import StreamListener
class TweetListener(StreamListener):
# tweet object listens for the tweets
def __init__(self, csocket):
self.client_socket = csocket
def on_data(self, data):
try:
# Load data
msg = json.loads(data)
# Read extended Tweet if available
if "extended_tweet" in msg:
# Add "__end" at the end of each Tweet
self.client_socket\
.send(str(msg['extended_tweet']['full_text']+" __end")\
.encode('utf-8'))
print(msg['extended_tweet']['full_text'])
# Else read Tweet text
else:
# Add "__end" at the end of each Tweet
self.client_socket\
.send(str(msg['text']+"__end")\
.encode('utf-8'))
print(msg['text'])
return True
except BaseException as e:
print("error on_data: %s" % str(e))
return True
def on_error(self, status):
print(status)
return True
Si ocurre algún error en la conexión, la consola imprime la información.
Recopilar credenciales de desarrollador de Twitter
El portal para desarrolladores de Twitter contiene las credenciales de OAuth para establecer una conexión API con Twitter. La información está en la aplicación Claves y tokens pestaña.
Para recopilar los datos:
1. Genere la clave y secreto API ubicado en las Claves de consumidor del proyecto y guarde la información:
Las Claves de consumo verifica en Twitter tu identidad, como un nombre de usuario.
2. Genere el token de acceso y secreto de los tokens de autenticación y guarde la información:
Los tokens de autenticación permitir extraer datos específicos de Twitter.
Enviar datos desde la API de Twitter al socket
Con las credenciales de desarrollador, complete la API_KEY , API_SECRETO , ACCESO_TOKEN y ACCESO_SECRETO para acceder a la API de Twitter.
La función enviarDatos ejecuta la transmisión de Twitter cuando un cliente realiza una solicitud. Primero se verifica la solicitud de transmisión, luego se crea un objeto de escucha y los filtros de datos de transmisión se basan en la palabra clave y el idioma.
Por ejemplo:
from tweepy import Stream
from tweepy import OAuthHandler
API_KEY = "api_key"
API_SECRET = "api_secret"
ACCESS_TOKEN = "access_token"
ACCESS_SECRET = "access_secret"
def sendData(c_socket, keyword):
print("Start sending data from Twitter to socket")
# Authentication based on the developer credentials from twitter
auth = OAuthHandler(API_KEY, API_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
# Send data from the Stream API
twitter_stream = Stream(auth, TweetListener(c_socket))
# Filter by keyword and language
twitter_stream.filter(track = keyword, languages=["en"])
Crear un socket TCP de escucha en el servidor
La última parte del primer archivo incluye la creación de un socket de escucha en un servidor local. La dirección y el puerto están vinculados y escuchan las conexiones del cliente Spark.
Por ejemplo:
import socket
if __name__ == "__main__":
# Create listening socket on server (local)
s = socket.socket()
# Host address and port
host = "127.0.0.1"
port = 5555
s.bind((host, port))
print("Socket is established")
# Server listens for connections
s.listen(4)
print("Socket is listening")
# Return the socket and the address of the client
c_socket, addr = s.accept()
print("Received request from: " + str(addr))
# Send data to client via socket for selected keyword
sendData(c_socket, keyword = ['covid'])
Una vez que el cliente Spark realiza una solicitud, el socket y la dirección del cliente se imprimen en la consola. Luego, el flujo de datos se envía al cliente en función del filtro de palabras clave seleccionado.
Este paso concluye el código en el primer archivo. Ejecutarlo imprime la siguiente información:
Mantenga el archivo ejecutándose y proceda a crear un cliente Spark.
Crear un receptor Spark DStream
En otro archivo, cree un contexto de Spark y un contexto de transmisión local con intervalos de lote de un segundo. El cliente lee desde el nombre de host y el socket del puerto.
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(appName="tweetStream")
# Create a local StreamingContext with batch interval of 1 second
ssc = StreamingContext(sc, 1)
# Create a DStream that conencts to hostname:port
lines = ssc.socketTextStream("127.0.0.1", 5555)
Preprocesar datos
El preprocesamiento de RDD incluye dividir las líneas de datos recibidas donde __end aparece y pasa el texto a minúsculas. Los primeros diez elementos se imprimen en la consola.
# Split Tweets
words = lines.flatMap(lambda s: s.lower().split("__end"))
# Print the first ten elements of each DStream RDD to the console
words.pprint()
Después de ejecutar el código, no sucede nada ya que la evaluación es perezosa. El cálculo comienza cuando comienza el contexto de transmisión.
Comenzar a transmitir contexto y computación
Iniciar el contexto de transmisión envía una solicitud al host. El host envía los datos recopilados de Twitter al cliente Spark y el cliente procesa previamente los datos. La consola luego imprime el resultado.
# Start computing
ssc.start()
# Wait for termination
ssc.awaitTermination()
Iniciar el contexto de transmisión imprime en el primer archivo una solicitud recibida y transmite el texto de datos sin procesar:
El segundo archivo lee los datos cada segundo desde el socket y se aplica el preprocesamiento a los datos. El primer par de líneas están vacías hasta que se establece la conexión:
El contexto de transmisión está listo para finalizar en cualquier momento.