In Spark funktionieren die Funktionen auf RDD
s (wie map
hier) werden serialisiert und zur Bearbeitung an die Ausführenden gesendet. Dies impliziert, dass alle in diesen Operationen enthaltenen Elemente serialisierbar sein sollten.
Die Redis-Verbindung hier ist nicht serialisierbar, da sie TCP-Verbindungen zur Ziel-DB öffnet, die an den Computer gebunden sind, auf dem sie erstellt wird.
Die Lösung besteht darin, diese Verbindungen auf den Executoren im lokalen Ausführungskontext zu erstellen. Es gibt nur wenige Möglichkeiten, das zu tun. Zwei, die mir in den Sinn kommen, sind:
rdd.mapPartitions
:lässt Sie eine ganze Partition auf einmal verarbeiten und amortisiert somit die Kosten für die Erstellung von Verbindungen)- Singleton-Verbindungsmanager:Erstellen Sie die Verbindung einmal pro Executor
mapPartitions
ist einfacher, da lediglich eine kleine Änderung an der Programmstruktur erforderlich ist:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
Ein Singleton-Verbindungsmanager kann mit einem Objekt modelliert werden, das einen verzögerten Verweis auf eine Verbindung enthält (Hinweis:Ein veränderlicher Verweis funktioniert ebenfalls).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Dieses Objekt kann dann verwendet werden, um 1 Verbindung pro Worker-JVM zu instanziieren und wird als Serializable
verwendet Objekt in einem Betriebsabschluss.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
Der Vorteil der Verwendung des Singleton-Objekts ist weniger Overhead, da Verbindungen nur einmal von JVM erstellt werden (im Gegensatz zu 1 pro RDD-Partition)
Es gibt auch einige Nachteile:
- Bereinigung von Verbindungen ist schwierig (Hookdown/Timer)
- Man muss die Thread-Sicherheit gemeinsam genutzter Ressourcen gewährleisten
(*) Code dient zur Veranschaulichung. Nicht kompiliert oder getestet.