Neo Tech Blog

NEO TECH LBLOG - Praxis-Techblog für IT, Web und Mobile
Martin Gerlach

Java SE 8 Neuerungen (Teil 2): Stream-API

1 Kommentar


Im ersten Teil dieser Serie über Neuheiten in Java SE 8 wurden Lambda-Ausdrücke sowie Default-Methoden in Interfaces vorgestellt. Im zweiten Teil wird der ehemalige Neofoniker und jetzige Senior Software-Entwickler bei idealo, Martin Gerlach, eine weitere Neuerung in Java 8 vorstellen: die Stream API. Mit ihr sollen Zugriffe auf Daten vereinfacht werden.

Mit Hilfe der Lambda-Ausdrücken und den Default-Methoden ist es leicht möglich Operationen auf (potentiell unendlich langen) Datenströmen (Streams) im funktionalen Stil auszuführen. Ein Beispiel dafür sind z.B. Map/Reduce-Operationen auf Collections. Auch andere, in der Analyse großer Datenmengen übliche Funktionen, lassen sich damit durchführen. Zudem besteht die Möglichkeit, bestimmte Operationen auf einfache Weise automatisch parallel durch mehrere Threads auszuführen zu lassen.

Streams im Überblick und erste Beispiele

Die Package-Dokumentation von java.util.stream gibt einen ausführlichen Überblick über die Stream-API. Darüber hinaus wurden mittles Default-Methoden diverse Interfaces des Collection-Frameworks sowie vereinzelte weitere Interfaces und Klassen um Streaming-Features erweitert, ein Beispiel ist die Methode java.io.BufferedReader.lines(). Einen Überblick über die Erweiterungen des JDK hinsichtlich Streams und Lambdas findet man hier. Eine vollständige Behandlung des Themas würde hier den Rahmen sprengen. Dieser Artikel gibt daher lediglich einige Beispiele.

Ausgangspunkt sind die neuen Default-Methoden java.util.Collection.stream() sowie die Variante für parallele Verarbeitung, .parallelStream(), mittels der man von jeder Implementierung von Collection<E> einen Stream<E> über die Elemente der Collection erhält. Durch Aufruf dieser Methoden findet zunächst noch kein Verarbeitungsschritt statt, sondern es werden mittels verschiedener “Fluent API Calls” die durchzuführenden Operationen als Sequenz von intermediären (“intermediate”) Operationen und maximal einer abschließenden (“terminal”) Operation definiert. z.B.:

 

List<Point> points;                 // java.awt.Point(int x, int y)

// Map-Reduce

int sumYForPositiveX = points.stream()

.filter(p -> p.x > 0)                    // intermediate

.mapToInt(p -> p.y)                    // intermediate

.reduce(0, Integer::sum); // = .sum()        // terminal

// GroupBy-(Map)-Reduce

Map<Integer, Integer> sumYbySignumX = points.stream()

.collect(Collectors.groupingBy(            // terminal

p -> Integer.signum(p.x),                    

Collectors.summingInt(p -> p.y)));

// Map-Collect

List<Double> distancesFromOrigin = points.stream()  

        .mapToDouble(p -> p.distance(0.0, 0.0))            // i.

        .collect(Collectors.toCollection(ArrayList::new));    // t.   

 

Internal Iteration bei Java 8

Im Unterschied zum Interface Iterator<E>, mittels dem der Programmierer die Iteration über Iterable<E>-Implementierungen (z.B. auch Implementierungen von Collection<E>) von außen (“External Iteration”) steuern kann (bzw. muss!), liegt der Fokus von Stream<E> auf der Durchführung bestimmter Operationen auf der zugrundeliegenden Datenstruktur. Implementierungen von Stream<E> sowie der Varianten für primitive Typen (IntStream, LongStream, DoubleStream und diverse auf primitive Typen zugeschnittene Methoden) stellen für die zahlreichen Methoden dieses Interfaces Implementierungen bereit, die sich intern um die Ausführung dieser Operationen kümmern und u.a. auch das Iterieren über die Elemente der zu analysierenden Datenstruktur übernehmen. Das nennt man “Internal Iteration”. So wird es dem Entwickler ermöglicht, durch Angabe der verschiedenen Operationen mittels der Streaming-API auszudrücken, was mit seiner Datenstruktur gemacht werden soll und es der im JDK enthaltenen Implementierung der API zu überlassen, wie dies geschieht.

Brückenfunktionalitäten zwischen den beiden Welten bieten BaseStream.iterator() sowie Iterator().forEachRemaining() und Iterable.forEach(). Einen Iterator als Stream-Quelle zu benutzen ist etwas komplizierter (siehe Stackoverflow zum ersten, zweiten und dritten).

 Eigenschaften von Streams

Streams sind:

Persistenzlos – Streams speichern keine Daten, sondern dienen dazu, Daten von einer Quelle (Datenstruktur, I/O-Ressource, …) durch eine Abfolge (“Pipeline”) von Operationen zu verarbeiten und abschließend zu aggregieren, einer Datensenke zuzuführen, o.ä.

Funktional – Streams verändern ihre Datenquelle nicht, Stream-Operationen erzeugen vielmehr neue Streams mit veränderten Eigenschaften. filter() löscht z.B. keine Elemente aus einer zugrundeliegenden Collection, sondern erzeugt einen neuen Stream, der bestimmte Elemente der Collection auslässt.

Lazy – Intermediäre Operationen sind immer lazy, d.h. sie werden nicht bei ihrem Aufruf, sondern erst bei Aufruf einer terminalen Operation ausgeführt. Dies hat den Vorteil, dass Abfolgen von Operationen wie z.B. im Falle von “Finde den ersten String mit drei aufeinanderfolgenden Vokalen” (.stream().filter(s -> contains3ConsecutiveVowels(s)).findFirst()), optimiert werden können. In diesem Beispiel muss eben nicht (immer) jeder String untersucht werden.

Möglicherweise unendlich – Streams über Collections sind endlich, aber Streams müssen nicht endlich sein. Durch “abkürzende” (short-circuiting) Operationen (s.u.) wie limit()oder findFirst() sind endliche Berechnungen über unendliche Streams möglich. Es lässt sich auch eine langlaufende terminale Operation vorstellen, die durch zustandslose intermediäre Operationen wie etwa map() und filter() verarbeitete Elemente eines unendlich langen (Input)Streams fortlaufend per Seiteneffekt (s.u.) in eine Datensenke schreibt.

Consumable – Wie im Falle eines Iterators können die Elemente eines Streams nur einmal durchlaufen werden d.h., es kann nur max. eine terminale Operation ausgeführt werden, die dazu führt, dass vorgelagerte intermediäre Operationen (in optimierter Art und Weise) ablaufen. Streams können also vor Aufruf der ersten terminalen Operation nicht zwischengespeichert werden, um weitere terminale Operationen auszuführen:

 Stream<Integer> xStream  = points.stream().map(p -> p.x);    // ok    

List<Integer> x1 = xStream.collect(Collectors.toList());    // ok

List<Integer> x2 = xStream.collect(Collectors.toList());    // Boom!

// IllegalStateException – … stream has already been operated upon …

Erlaubt ist hingegen das Speichern der Streamdefinition als Lambda, welches dann mehrfach ausgeführt wird:

Supplier<Stream<Integer>> s = () -> points.stream().map(p -> p.x);   

List<Integer> x1 = s.get().collect(Collectors.toList());    // ok

List<Integer> x2 = s.get().collect(Collectors.toList());    // ok!

Sequentiell oder Parallel – Streams können “parallel sein”, was dazu führt, dass viele (aber nicht alle) Operationen parallel ablaufen. Zum Einsatz kommt dabei der common ForkJoinPool der JVM. Außer über die in der JavaDoc beschriebenen System-Properties kann man leider keinen Einfluss auf den verwendeten Pool nehmen. Weiterhin muss man damit rechnen, dass bestimmte Operationen die Ausführung durch Synchronisationsmechanismen sogar verlangsamen, so wird z.B. sorted() immer nur single-threaded ausgeführt und vorher wie hinterher mittels Barriers synchronisiert. Ist das Sortieren die einzige oder die mit Abstand umfassendste Aufgabe des Streams vor der terminalen Operation, so macht die Verwendung eines parallelen Streams hier keinen Sinn.

Java-8-Stream-APIs erzeugen mit wenigen Ausnahmen wie Collection.parallelStream() per default sequentielle Streams, welche mittels BaseStream.parallel() in parallele umgewandelt werden können. BaseStream.sequential() erklärt umgekehrt einen Stream für sequentiell.

Geordnet oder Ungeordnet – Streams über geordnete Collections wie etwa Listen oder sortierte Sets sind geordnet, Streams über HashSets sind es z.B. nicht. Operationen wie sorted() wandeln einen nicht geordneten Stream in einen geordneten um, und die Operation unordered() ermöglichst es den Stream für nachfolgende Operationen als nicht geordnet zu betrachten, was die parallele Ausführung von Operationen wie distinct() oder gruppierte Reduktion (grouped reduction, “group by”) i.d.R. etwas effizienter macht.

Für identisch geordnete Streams muss die Ausführung einer Kette von Operationen für jeden Stream dasselbe Ergebnis liefern, für ungeordnete Streams muss dies nicht der Fall sein, z.B. muss IntStream.range(0, 3).unordered().map(x -> x*2).collect(Collectors.toList()) bei wiederholter Ausführung nicht unbedingt immer [0, 2, 4] erzeugen, sondern es wäre für jede Ausführung jede Permutation der resultierenden Listenelemente erlaubt. Diese Ordnung zu garantieren ist für sequentielle Streams fast immer viel einfacher als für parallele!

Eigenschaften von Stream-Operationen

Intermediär oder Terminal – Auf einem Stream können beliebig viele intermediäre Operationen zum Einsatz kommen und maximal eine terminale Operation. Intermediäre Operationen wie map(), filter(), distinct(), sorted() usw. geben daher wieder einen Stream zurück, auf dem dann durch Aufruf der Fluent-API-Methoden weitere Operationen ausgeführt werden können. Terminale Operationen wie reduce(), collect(), findFirst(), findAny() usw. konsumieren ihren Stream indem sie alle intermediären Operationen ausführen und dann ein endgültiges Ergebnis (keinen Stream) liefern.

Short-circuitingStream-Operationen sind abkürzend, wenn sie einen Stream “verkürzen”. Sie können intermediär sein wie skip()und limit() oder terminal wie findFirst() und findAny().

Stateless oder Stateful – Manche intermediären Stream-Operationen müssen einen internen Zustand halten, um ihr Ergebnis zu berechnen. Beispiele dafür sind limit(), skip(), distinct(), sorted(). Manche dieser Operationen, z.B. distinct() und sorted(), die den ganzen Stream sehen müssen, um ein korrektes Ergebnis zu liefern, funktionieren daher generell nicht besonders gut auf parallelen Streams (s.o.). limit() und skip() sind nur dann problematisch, wenn ein geordneter Stream parallel bearbeitet werden soll, da es dann wichtig ist, welche Elemente einbezogen bzw. übersprungen werden.

Seiteneffekte – Prinizipiell kann jede Stream-Operation Seiteneffekte erzeugen, indem sie außerhalb ihrer Lambda-Scopes liegende Objekte verändert, was allerdings generell nicht zu empfehlen ist. z.B.:

List<String> stringList = new ArrayList<>();
points.stream().forEach(p -> results.add(p.toString())); // ARGH!

Dies ist ein vermeidbarer Seiteneffekt. Anstelle von forEach(Consumer) (was im übrigen auch für geordnete Streams die Reihenfolge im Gegensatz zu forEachOrdered(Consumer) nicht einhalten muss!) sollte hier besser collect(Collector) verwendet werden:

 

List<String> stringList = points.stream()

        .map(Object::toString)         // or: p -> p.toString()

        .collect(Collectors.toList());

 

Ein nicht vermeidbarer Seiteneffekte ist z.B. die Ausgabe auf der Console bzw. generell das Schreiben in Datensenken. Außerdem gibt es in der Kategorie noch die intermediäre Operation peek(Consumer):

 

List<Point> positiveX = points.stream()

        .peek(System.out::println) // all

.filter(p -> p.x > 0)

.peek(p -> System.out.println(„** “ + p)) // only filtered

.collect(Collectors.toList());

 

Das peek()-Beispiel zeigt übrigens auch sehr schön, in welcher Reihenfolge die Operationen tatsächlich ausgeführt werden, man achte dazu auf die Position der mit “**” beginnenden Ausgaben der aus der Liste [ Point(1,1), Point(1,-1), Point(-1,-1), Point(-1,1) ] herausgefilterten Punkte mit positiver X-Koordinate:

 

java.awt.Point[x=1,y=1]

** java.awt.Point[x=1,y=1]

java.awt.Point[x=1,y=-1]

** java.awt.Point[x=1,y=-1]

java.awt.Point[x=-1,y=-1]

java.awt.Point[x=-1,y=1]

Wie man sieht, werden in einem sequentiellen Stream zunächst für jedes Listen-Element alle relevanten Stream-Operationen ausgeführt bevor mit dem nächsten Listen-Element fortgefahren wird. Weiterhin finden die Ausgaben nur statt, wenn eine terminale Operation wie hier collect() in der Pipeline vorhanden ist.

Reduce vs. Collect oder Immutability vs. Mutabilty

Die Operation collect() im vorigen Beispiel ist eigentlich wenig “funktional”, da sie eine durch Collectors.toList() intern) initial erzeugte Liste durch die Operation verändert, nämlich mit Werten befüllt.

Im Gegensatz dazu geht reduce() von nicht veränderlichen (immutable) Objekten aus, die schrittweise von der eigentlichen Reduktionsfunktion (z.B. Aufsummierung, Konkatenierung) zu neuen nicht veränderlichen Objekten und schließlich zum Ergebnis zusammengefügt werden.

Beispiele von String-Repräsentationen

Folgende Beispiele zum Zusammenfügen von String-Repräsentationen (Strings sind in Java unveränderliche Objekte) der Elemente der Punkteliste aus den vorangegangen Beispielen verdeutlichen den Unterschied:

1. String-Konkatenation

String pointsStr = points.stream().reduce(„“,     // Initial value

(String s, Point p) -> s + “ “ + p,         // Accumulator

(String s1, String s2) -> s1 + s2));        // Combiner

 

Die Kombinationsfunktion (Combiner) benötigt man nur für parallele Ausführung, da hier die Reduktion zunächst auf Teillisten erfolgt und die Teilergebnisse anschließend kombiniert werden müssen.

Natürlich weiß schon ein Java-Anfänger, dass String-Konkatenation keine besonders performante Operation ist. Der Compiler optimiert bei langen “+”-Ketten zwar durch interne Benutzung von StringBuilder, aber in diesem Fall ist das nur innerhalb der Lambdas für die Einzelteile möglich, so dass hier im Endeffekt anstelle vieler String-Objekte viele StringBuilder-Objekte erzeugt und wieder verworfen werden. Besser wäre vielleicht dieser Ansatz:

2. Reduce mit StringBuilder

String pointsStr = points.stream().reduce(new StringBuilder(),

StringBuilder::append,     // (sb, point) -> sb.append(point)

StringBuilder::append)    // (sb, sb) -> sb.append(sb)

.toString();                // make String from StringBuilder

 

Nun werden dem initialen Wert, einem neuen StringBuilder-Objekt mittels append() die String-Repräsentationen der Listen-Elemente der Reihe nach hinzugefügt. Das funktioniert allerdings nur, weil append() eine “Funktion” ist, sie gibt nämlich als Ergebnis den (veränderten) StringBuilder wieder zurück. Außerdem funktioniert dieser Ansatz nur sequentiell! Denn wenn derselbe(!) initiale Wert in parallel ausgeführten Verarbeitungsschritten (points.parallelStream().reduce(…)) durch append() verändert wird, führt das mindestens dazu, dass höchstwahrscheinlich die Ordnung der Liste nicht eingehalten wird, aber in der Regel endet dieser Versuch mit einer Exception, da StringBuilder-Objekte nicht threadsafe sind.

Für solche Fälle gibt es in Java 8 daher collect() – auch “mutable reduction” genannt:

3. Collect mit StringBuilder

String pointsStr = points.parallelStream()

.collect(StringBuilder::new,

StringBuilder::append, StringBuilder::append)

.toString();

Der Unterschied zu reduce() ist, dass hier als Startwert gar kein Datenwert sondern eine Funktion (Lambda-Ausdruck oder Methodenreferenz) erwartet wird, die für jeden parallelen Verarbeitungsteil das eigentliche (veränderliche) Startobjekt erzeugt (Supplier). Hier startet also jeder parallele Teil mit einem eigenen StringBuilder. Desweiteren sind Akkumulator und Combiner hier keine BiFunctions, sondern BiConsumers, die keine Rückgabewerte erwarten bzw. Rückgabewerte, wie etwa von append(), verwerfen.

Die Klasse Collectors stellt viele nützliche Collector-Implementierungen zur Benutzung mit der Stream-Methode collect(Collector) bereit. Collector-Objekte fassen die o.g. Akkumulations- und Kombinationslogik zusammen.

 

Als einführende und weiterführende Literatur zu Streams seien als zwei von vielen diese Quellen genannt:

  • Kurz und bündig: Die Package-Beschreibung von java.util.stream.
  • Ausführlich: Richard Warburton, “Java 8 Lambdas – Functional Programming for the Masses”, O’Reilly

Desweiteren danke ich Angelika Langer und Klaus Kreft für einen inspirierenden Lambda- und Streams-Hackathon am 6.11.2014 in Berlin, aus dem das eine oder andere Beispiel in veränderter Form für diesen Artikel verwendet wurde.

Im dritten und letzten Teil dieser Serie über Java 8 werden das neue Date/Time-API sowie verschiedene kleinere Neuerungen vorgestellt sowie einige kleine Unzulänglichkeiten beleuchtet. Abschließend folgt noch eine kurze Betrachtung der Vorteile eines raschen Umstiegs auf Java 8. Hier kannst Du den Blog abonnieren.

Zurück zum ersten Teil: Java 8 Neuerungen – Lambda Ausdrücke und Default Methoden

____

Wir suchen Java Developer:

Software Developer Java (m/w)

Sr. Java Developer F&E – Schwerpunkt Computerlinguistik

Sr. Java Developer (m/w) F&E – Schwerpunkt Machine Learning/ Big Data

Senior Java Developer (m/w) – Schwerpunkt Solr / Lucene / Elastic Search

Bewirb Dich jetzt!

 

Martin Gerlach

Autor: Martin Gerlach

Martin Gerlach arbeitet seit 2013 als Senior Softwareentwickler im Bereich Angebotsimport bei idealo, Deutschlands größtem Preisvergleichsportal. Dort befasst er sich mit der Entwicklung perfomanter, skalierbarer Datenimport-, Transformations- und Analysetools. Sein besonderes Interesse gilt dabei verteilten Frameworks für Streaming und Analyse großer Datenmengen sowie damit einhergehenden "funktionalen" Ansätzen in der Programmierung. Vorher war Martin über 8 Jahre lang für IBM und 5 Jahre für Neofonie sowohl in Forschung und Entwicklung als auch in Kundenprojekten tätig. Er ist Master of Science Absolvent der HAW Hamburg in Informatik mit Schwerpunkt "Verteilte Systeme".

Ein Kommentar

  1. Genialer Artikel, vielen Dank!
    Schöne Grüße

Schreibe einen Kommentar

Pflichtfelder sind mit * markiert.