Database
 sql >> Datenbank >  >> RDS >> Database

Was sind sequentielle und parallele Streams in Java?

Java kann Stream-Operationen parallelisieren, um Multi-Core-Systeme zu nutzen. Dieser Artikel bietet eine Perspektive und zeigt anhand geeigneter Beispiele, wie paralleler Stream die Leistung verbessern kann.

Streams in Java

Ein Stream in Java ist eine Folge von Objekten, die als Datenleitung dargestellt werden. Es hat normalerweise eine Quelle wo sich die Daten befinden und ein Ziel wo es übertragen wird. Beachten Sie, dass ein Stream kein Repository ist; Stattdessen arbeitet es mit einer Datenquelle, z. B. mit einem Array oder einer Sammlung. Die Zwischenbits in der Passage werden eigentlich als Stream bezeichnet. Während des Übertragungsprozesses durchläuft der Stream normalerweise eine oder mehrere mögliche Transformationen, wie z. B. Filtern oder Sortieren, oder es kann jeder andere Prozess sein, der mit den Daten arbeitet. Dadurch werden die Originaldaten typischerweise gemäß den Anforderungen des Programmierers in eine andere Form gebracht. Daher wird gemäß der darauf angewendeten Operation ein neuer Stream erstellt. Wenn beispielsweise ein Stream sortiert wird, führt dies zu einem neuen Stream, der ein Ergebnis erzeugt, das dann sortiert wird. Das bedeutet, dass die neuen Daten eine transformierte Kopie des Originals sind und nicht in der ursprünglichen Form vorliegen.

Sequenzieller Stream

Jede Stream-Operation in Java wird sequenziell verarbeitet, sofern sie nicht ausdrücklich als parallel angegeben ist. Sie sind im Grunde nicht parallele Streams, die einen einzelnen Thread verwenden, um ihre Pipeline zu verarbeiten. Sequentielle Streams nutzen das Multicore-System niemals aus, selbst wenn das zugrunde liegende System möglicherweise eine parallele Ausführung unterstützt. Was passiert zum Beispiel, wenn wir Multithreading anwenden, um den Stream zu verarbeiten? Selbst dann arbeitet es jeweils auf einem einzelnen Kern. Es kann jedoch von einem Kern zum anderen springen, es sei denn, es ist explizit an einen bestimmten Kern gepinnt. Beispielsweise ist die Verarbeitung in vier verschiedenen Threads gegenüber vier verschiedenen Kernen offensichtlich anders, wenn ersterer nicht mit letzterem übereinstimmt. Es ist durchaus möglich, mehrere Threads in einer Single-Core-Umgebung auszuführen, aber Parallelverarbeitung ist ein ganz anderes Genre. Ein Programm muss von Grund auf für die parallele Programmierung entwickelt werden, abgesehen von der Ausführung in einer Umgebung, die dies unterstützt. Aus diesem Grund ist die parallele Programmierung ein komplexes Gebiet.

Lassen Sie uns ein Beispiel ausprobieren, um die Idee weiter zu veranschaulichen.

package org.mano.example;

import java.util.Arrays;
import java.util.List;

public class Main2 {
   public static oid main(String[] args) {
      List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9);
      list.stream().forEach(System.out::println);
      System.out.println();
      list.parallelStream().forEach(System.out::println);
   }
}

Ausgabe

123456789
685973214

Dieses Beispiel ist eine Veranschaulichung von q sequentiellem Strom sowie q parallelem Strom im Betrieb. Die list.stream() arbeitet nacheinander auf einem einzelnen Thread mit println() Betrieb. list.parallelStream() , wird hingegen parallel verarbeitet, wobei die zugrunde liegende Multicore-Umgebung voll ausgenutzt wird. Der interessante Aspekt liegt in der Ausgabe des vorhergehenden Programms. Bei einem sequentiellen Stream wird der Inhalt der Liste in einer geordneten Reihenfolge gedruckt. Die Ausgabe des parallelen Streams ist dagegen ungeordnet und die Reihenfolge ändert sich bei jedem Programmlauf. Dies bedeutet mindestens eines:den Aufruf von list.parallelStream() Methode macht die println -Anweisung in mehreren Threads operieren, etwas, das list.stream() tut in einem einzigen Thread.

Parallel-Stream

Die Hauptmotivation hinter der Verwendung eines parallelen Streams besteht darin, die Stream-Verarbeitung zu einem Teil der parallelen Programmierung zu machen, selbst wenn das gesamte Programm möglicherweise nicht parallelisiert wird. Paralleler Stream nutzt Multicore-Prozessoren, was zu einer erheblichen Leistungssteigerung führt. Anders als jede parallele Programmierung sind sie komplex und fehleranfällig. Die Java-Stream-Bibliothek bietet jedoch die Möglichkeit, dies einfach und zuverlässig zu tun. Das gesamte Programm darf nicht parallelisiert werden. aber zumindest der Teil, der den Stream verarbeitet, kann parallelisiert werden. Sie sind eigentlich ziemlich einfach in dem Sinne, dass wir einige Methoden aufrufen können und der Rest erledigt wird. Es gibt ein paar Möglichkeiten, dies zu tun. Eine Möglichkeit besteht darin, einen parallelen Stream durch Aufrufen von parallelStream() zu erhalten Methode definiert durch Collection . Eine andere Möglichkeit besteht darin, parallel() aufzurufen Methode definiert durch BaseStream in einem sequentiellen Stream. Der sequentielle Strom wird durch den Aufruf parallelisiert. Beachten Sie, dass die zugrunde liegende Plattform parallele Programmierung unterstützen muss, z. B. bei einem Multicore-System. Andernfalls hat der Aufruf keinen Sinn. Der Stream würde in einem solchen Fall der Reihe nach verarbeitet werden, auch wenn wir den Aufruf vorgenommen haben. Wenn der Aufruf auf einem bereits parallelen Stream erfolgt, tut er nichts und gibt einfach den Stream zurück.

Um sicherzustellen, dass das Ergebnis der auf den Stream angewendeten parallelen Verarbeitung dasselbe ist wie das der sequentiellen Verarbeitung, müssen parallele Streams zustandslos, nicht störend und assoziativ sein.

Ein kurzes Beispiel

package org.mano.example;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class Main {

   public static void main(String[] args) {
      List<Employee> employees = Arrays.asList(
         new Employee(1276, "FFF",2000.00),
         new Employee(7865, "AAA",1200.00),
         new Employee(4975, "DDD",3000.00),
         new Employee(4499, "CCC",1500.00),
         new Employee(9937, "GGG",2800.00),
         new Employee(5634, "HHH",1100.00),
         new Employee(9276, "BBB",3200.00),
         new Employee(6852, "EEE",3400.00));

      System.out.println("Original List");
      printList(employees);

      // Using sequential stream
      long start = System.currentTimeMillis();
      List<Employee> sortedItems = employees.stream()
         .sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      long end = System.currentTimeMillis();

      System.out.println("sorted using sequential stream");
      printList(sortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");

      // Using parallel stream
      start = System.currentTimeMillis();
      List<Employee> anotherSortedItems = employees
         .parallelStream().sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      end = System.currentTimeMillis();

      System.out.println("sorted using parallel stream");
      printList(anotherSortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");


      double totsal=employees.parallelStream()
         .map(e->e.getSalary())
         .reduce(0.00,(a1,a2)->a1+a2);
      System.out.println("Total Salary expense: "+totsal);
      Optional<Employee> maxSal=employees.parallelStream()
         .reduce((Employee e1, Employee e2)->
         e1.getSalary()<e2.getSalary()?e2:e1);
      if(maxSal.isPresent())
         System.out.println(maxSal.get().toString());
   }

   public static void printList(List<Employee> list) {
      for (Employee e : list)
         System.out.println(e.toString());
   }
}


package org.mano.example;

public class Employee {
   private int empid;
   private String name;
   private double salary;

   public Employee() {
      super();
   }

   public Employee(int empid, String name,
         double salary) {
      super();
      this.empid = empid;
      this.name = name;
      this.salary = salary;
   }

   public int getEmpid() {
      return empid;
   }

   public void setEmpid(int empid) {
      this.empid = empid;
   }

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

   public double getSalary() {
      return salary;
   }

   public void setSalary(double salary) {
      this.salary = salary;
   }

   @Override
   public String toString() {
      return "Employee [empid=" + empid + ", name="
         + name + ", salary=" + salary + "]";
   }
}

Beachten Sie im vorherigen Code, wie wir die Sortierung auf einen Stream angewendet haben, indem wir die sequentielle Ausführung verwendet haben.

List<Employee> sortedItems = employees.stream()
               .sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

und parallele Ausführung wird durch geringfügige Änderung des Codes erreicht.

List<Employee> anotherSortedItems = employees
               .parallelStream().sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

Wir vergleichen auch die Systemzeit, um eine Vorstellung davon zu bekommen, welcher Teil des Codes mehr Zeit in Anspruch nimmt. Der parallele Betrieb beginnt, sobald der parallele Stream explizit von parallelStream() abgerufen wird Methode. Es gibt noch eine weitere interessante Methode namens reduce() . Wenn wir diese Methode auf einen parallelen Stream anwenden, kann die Operation in verschiedenen Threads stattfinden.

Wir können jedoch je nach Bedarf immer zwischen parallel und sequenziell wechseln. Wenn wir den parallelen Stream in einen sequentiellen ändern möchten, können wir dies tun, indem wir sequential() aufrufen Methode, die von BaseStream angegeben wird . Wie wir in unserem ersten Programm gesehen haben, kann die auf dem Stream ausgeführte Operation gemäß der Reihenfolge der Elemente geordnet oder ungeordnet sein. Das bedeutet, dass die Reihenfolge von der Datenquelle abhängt. Dies ist jedoch bei parallelen Strömen nicht der Fall. Um die Leistung zu steigern, werden sie parallel verarbeitet. Da dies ohne jede Reihenfolge erfolgt, bei der jede Partition des Streams unabhängig von den anderen Partitionen ohne Koordination verarbeitet wird, ist die Folge unvorhersehbar ungeordnet. Wenn wir jedoch speziell eine Operation für jedes zu bestellende Element im parallelen Stream ausführen möchten, können wir forEachOrdered() in Betracht ziehen Methode, die eine Alternative zu forEach() ist Methode.

Schlussfolgerung

Die Stream-APIs waren lange Zeit ein Teil von Java, aber das Hinzufügen der Optimierung der parallelen Verarbeitung ist sehr einladend und gleichzeitig ein ziemlich faszinierendes Feature. Dies gilt insbesondere, weil moderne Maschinen Multicore sind und es ein Stigma gibt, dass paralleles Programmierdesign komplex ist. Die von Java bereitgestellten APIs bieten die Möglichkeit, einen Hauch von Optimierungen der parallelen Programmierung in ein Java-Programm zu integrieren, das das Gesamtdesign der sequentiellen Ausführung hat. Das ist vielleicht der beste Teil dieser Funktion.