Java kan parallelisere stream-operationer for at udnytte multi-core-systemer. Denne artikel giver et perspektiv og viser, hvordan parallel stream kan forbedre ydeevnen med passende eksempler.
Streams i Java
En stream i Java er en sekvens af objekter repræsenteret som en kanal af data. Det har normalt en kilde hvor dataene er placeret og en destination hvor det transmitteres. Bemærk, at en strøm ikke er et depot; i stedet opererer den på en datakilde, såsom på en matrix eller en samling. De mellemliggende stykker i passagen kaldes faktisk strømmen. Under transmissionsprocessen gennemgår strømmen normalt en eller flere mulige transformationer, såsom filtrering eller sortering, eller det kan være en hvilken som helst anden proces, der opererer på dataene. Dette tilpasser de originale data til en anden form, typisk i henhold til programmørens behov. Derfor oprettes en ny strøm i overensstemmelse med den operation, der anvendes på den. For eksempel, når en strøm er sorteret, resulterer det i en ny strøm, der producerer et resultat, som derefter er sorteret. Det betyder, at de nye data er en transformeret kopi af originalen i stedet for at være i den originale form.
Sekventiel stream
Enhver stream-operation i Java, medmindre det udtrykkeligt er angivet som parallel, behandles sekventielt. De er dybest set ikke-parallelle strømme, der bruges en enkelt tråd til at behandle deres pipeline. Sekventielle streams drager aldrig fordel af multicore-systemet, selvom det underliggende system muligvis understøtter parallel eksekvering. Hvad sker der for eksempel, når vi anvender multithreading til at behandle streamen? Selv da opererer den på en enkelt kerne ad gangen. Den kan dog hoppe fra en kerne til en anden, medmindre den er eksplicit fastgjort til en specifik kerne. For eksempel er bearbejdning i fire forskellige tråde versus fire forskellige kerner naturligvis anderledes, hvor førstnævnte ikke matcher sidstnævnte. Det er ganske muligt at udføre flere tråde i et enkelt kernemiljø, men parallel behandling er en helt anden genre. Et program skal være designet til at kunne parallelprogrammering bortset fra at udføres i et miljø, der understøtter det. Dette er grunden til, at parallel programmering er en kompleks arena.
Lad os prøve et eksempel for at illustrere ideen yderligere.
package org.mano.example; import java.util.Arrays; import java.util.List; public class Main2 { public static oid main(String[] args) { List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9); list.stream().forEach(System.out::println); System.out.println(); list.parallelStream().forEach(System.out::println); } }
Output
123456789 685973214
Dette eksempel er en illustration af q sekventiel strøm såvel som q parallel strøm i drift. list.stream() arbejder i rækkefølge på en enkelt tråd med println() operation. list.parallelStream() , på den anden side, behandles parallelt, hvilket drager fuld fordel af det underliggende multicore-miljø. Det interessante aspekt ligger i outputtet fra det foregående program. I tilfælde af en sekventiel strøm udskrives listens indhold i en ordnet rækkefølge. Udgangen af den parallelle strøm er på den anden side uordnet, og sekvensen ændres hver gang programmet køres. Dette betyder mindst én ting:denne påkaldelse af list.parallelStream() metoden gør println sætning fungerer i flere tråde, noget som list.stream() gør i en enkelt tråd.
Parallel stream
Den primære motivation bag at bruge en parallel strøm er at gøre strømbehandling til en del af den parallelle programmering, selvom hele programmet måske ikke er paralleliseret. Parallel stream udnytter multicore-processorer, hvilket resulterer i en væsentlig stigning i ydeevnen. I modsætning til enhver parallel programmering er de komplekse og fejltilbøjelige. Java-streambiblioteket giver dog mulighed for at gøre det nemt og på en pålidelig måde. Hele programmet kan ikke paralleliseres. men i det mindste kan den del, der håndterer strømmen, paralleliseres. De er faktisk ret simple i den forstand, at vi kan påberåbe os nogle få metoder, og resten bliver taget hånd om. Der er et par måder at gøre det på. En sådan måde er at opnå en parallel strøm ved at kalde parallelStream() metode defineret af Indsamling . En anden måde er at kalde parallel() metode defineret af BaseStream på en sekventiel strøm. Den sekventielle strøm er paralleliseret af påkaldelsen. Bemærk, at den underliggende platform skal understøtte parallel programmering, f.eks. med et multicore-system. Ellers er der ingen mening i invokationen. Strømmen vil blive behandlet i rækkefølge i et sådant tilfælde, selvom vi har lavet påkaldelsen. Hvis invokationen foretages på en allerede parallel strøm, gør den intet og returnerer simpelthen strømmen.
For at sikre, at resultatet af parallel behandling anvendt på stream er det samme som opnås ved sekventiel behandling, skal parallelle strømme være statsløse, ikke-interfererende og associative.
Et hurtigt eksempel
package org.mano.example; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; public class Main { public static void main(String[] args) { List<Employee> employees = Arrays.asList( new Employee(1276, "FFF",2000.00), new Employee(7865, "AAA",1200.00), new Employee(4975, "DDD",3000.00), new Employee(4499, "CCC",1500.00), new Employee(9937, "GGG",2800.00), new Employee(5634, "HHH",1100.00), new Employee(9276, "BBB",3200.00), new Employee(6852, "EEE",3400.00)); System.out.println("Original List"); printList(employees); // Using sequential stream long start = System.currentTimeMillis(); List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.println("sorted using sequential stream"); printList(sortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); // Using parallel stream start = System.currentTimeMillis(); List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); end = System.currentTimeMillis(); System.out.println("sorted using parallel stream"); printList(anotherSortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); double totsal=employees.parallelStream() .map(e->e.getSalary()) .reduce(0.00,(a1,a2)->a1+a2); System.out.println("Total Salary expense: "+totsal); Optional<Employee> maxSal=employees.parallelStream() .reduce((Employee e1, Employee e2)-> e1.getSalary()<e2.getSalary()?e2:e1); if(maxSal.isPresent()) System.out.println(maxSal.get().toString()); } public static void printList(List<Employee> list) { for (Employee e : list) System.out.println(e.toString()); } } package org.mano.example; public class Employee { private int empid; private String name; private double salary; public Employee() { super(); } public Employee(int empid, String name, double salary) { super(); this.empid = empid; this.name = name; this.salary = salary; } public int getEmpid() { return empid; } public void setEmpid(int empid) { this.empid = empid; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getSalary() { return salary; } public void setSalary(double salary) { this.salary = salary; } @Override public String toString() { return "Employee [empid=" + empid + ", name=" + name + ", salary=" + salary + "]"; } }
Bemærk i den forrige kode, hvordan vi har anvendt sortering på en stream en ved at bruge sekventiel udførelse.
List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
og parallel eksekvering opnås ved at ændre koden lidt.
List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
Vi vil også sammenligne systemtiden for at få en idé om, hvilken del af koden, der tager mere tid. Parallel drift begynder, når den parallelle strøm eksplicit er opnået af parallelStream() metode. Der er en anden interessant metode, kaldet reduce() . Når vi anvender denne metode til en parallel strøm, kan operationen forekomme i forskellige tråde.
Vi kan dog altid skifte mellem parallel og sekventiel efter behov. Hvis vi ønsker at ændre den parallelle strøm til sekventiel, kan vi gøre det ved at kalde sekventiel() metode specificeret af BaseStream . Som vi så i vores første program, kan den operation, der udføres på streamen, bestilles eller uordnes i henhold til rækkefølgen af elementerne. Det betyder, at rækkefølgen afhænger af datakilden. Dette er dog ikke tilfældet i tilfælde af parallelle vandløb. For at øge ydeevnen behandles de parallelt. Fordi dette gøres uden nogen sekvens, hvor hver partition af strømmen behandles uafhængigt af de andre partitioner uden nogen koordinering, er konsekvensen uforudsigeligt uordnet. Men hvis vi specifikt ønsker at udføre en operation på hvert element i den parallelle strøm, der skal bestilles, kan vi overveje forEachOrdered() metode, som er et alternativ til forEach() metode.
Konklusion
Stream-API'erne har været en del af Java i lang tid, men tilføjelsen af tweak af parallel behandling er meget imødekommende og på samme tid en ret spændende funktion. Dette gælder især, fordi moderne maskiner er multicore, og der er et stigma om, at parallelprogrammeringsdesign er komplekst. API'erne leveret af Java giver mulighed for at inkorporere et skær af parallelle programmeringsjusteringer i et Java-program, der har det overordnede design af sekventiel udførelse. Dette er måske den bedste del af denne funktion.