JavaNetworkWordCount Researches

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

JavaNetworkWordCount Researches

Eduardo Costa Alfaia
Hi Guys,

I did some changes in JavaNetworkWordCount for my researches in streaming process and I have added to the code the following lines in red:

ssc1.checkpoint("hdfs://computer22:54310/user/root/INPUT");
 JavaDStream<String> lines1 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
 JavaDStream<String> lines2 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
 JavaDStream<String> union2 = lines1.union(lines2);
     JavaDStream<String> words = union2.flatMap(new FlatMapFunction<String, String>() {
     @Override
       public Iterable<String> call(String x) {
          return Lists.newArrayList(SPACE.split(x));
        }
     });
     JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
           return new Tuple2<String, Integer>(s, 1);
         }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
         public Integer call(Integer i1, Integer i2) {
           return i1 + i2;
          }
        });
 
      JavaPairDStream<String, Integer> counts = wordCounts.reduceByKeyAndWindow(
        new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer i1, Integer i2) { return i1 + i2; }
        },
        new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer i1, Integer i2) { return i1 - i2; }
        },
        new Duration(60 * 5 * 1000),
     new Duration(1 * 1000)
         
         counts.print();
   ssc1.start();
 
   }
 }


- We did a code in C that send words to workers.

- Result From Master terminal:

Time: 1389794084000 ms
-------------------------------------------
(,14294)
(impertinences,2)
(protracted.,3)
(burlesque.,3)
(Dorothea,,85)
(grant,,5)
(temples,,2)
(discord,17)
(conscience,48)
(singed,,2)
...

-------------------------------------------
Time: 1389794085000 ms
-------------------------------------------
(,38580)
(impertinences,5)
(protracted.,7)
(burlesque.,7)
(Dorothea,,259)
(grant,,12)
(temples,,7)
(discord,47)
(conscience,130)
(singed,,5)
...

My question is, where does it happening the union()? between in the nodes or in the master?  I am using three machines( 1 Master + 2 Nodes).
How could I get a total count of the words and show in the terminal?

Thanks all  




--
---
INFORMATIVA SUL TRATTAMENTO DEI DATI PERSONALI

I dati utilizzati per l'invio del presente messaggio sono trattati
dall'Università degli Studi di Brescia esclusivamente per finalità
istituzionali. Informazioni più dettagliate anche in ordine ai diritti
dell'interessato sono riposte nell'informativa generale e nelle notizie
pubblicate sul sito web dell'Ateneo nella sezione "Privacy".

Il contenuto di questo messaggio è rivolto unicamente alle persona cui
è indirizzato e può contenere informazioni la cui riservatezza è
tutelata legalmente. Ne sono vietati la riproduzione, la diffusione e l'uso
in mancanza di autorizzazione del destinatario. Qualora il messaggio
fosse pervenuto per errore, preghiamo di eliminarlo.
Reply | Threaded
Open this post in threaded view
|

Re: JavaNetworkWordCount Researches

Tathagata Das
All the computation with the data (that is, union, flatmap, map,
reduceByKey, reduceByKeyAndWindow) are executed on the workers in a
distributed manner. The data is received by the worker nodes and kept in
memory, then the computation is executed on the workers to the in-memory
data.

After the count is computed for every batch of data, the first 10 elements
of the generated counts are brought to master for being printed on the
screen. This is done by the counts.print() which pulls those 10 word-count
pairs and prints them.

On a related note, if you only want to counts over a window, you dont need
the first reduceByKey. The reduceByKeyAndWindow takes care of doing the
reduceByKey per batch and then doing the reduce across a window.

TD


On Wed, Jan 15, 2014 at 6:01 AM, Eduardo Costa Alfaia <
[hidden email]> wrote:

> Hi Guys,
>
> I did some changes in JavaNetworkWordCount for my researches in streaming
> process and I have added to the code the following lines in red:
>
> ssc1.checkpoint("hdfs://computer22:54310/user/root/INPUT");
>  JavaDStream<String> lines1 = ssc1.socketTextStream("localhost",
> Integer.parseInt("12345"));
>  JavaDStream<String> lines2 = ssc1.socketTextStream("localhost",
> Integer.parseInt("12345"));
>  JavaDStream<String> union2 = lines1.union(lines2);
>      JavaDStream<String> words = union2.flatMap(new
> FlatMapFunction<String, String>() {
>      @Override
>        public Iterable<String> call(String x) {
>           return Lists.newArrayList(SPACE.split(x));
>         }
>      });
>      JavaPairDStream<String, Integer> wordCounts = words.map(
> new PairFunction<String, String, Integer>() {
>           @Override
>           public Tuple2<String, Integer> call(String s) {
>            return new Tuple2<String, Integer>(s, 1);
>          }
>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>           @Override
>          public Integer call(Integer i1, Integer i2) {
>            return i1 + i2;
>           }
>         });
>
>       JavaPairDStream<String, Integer> counts =
> wordCounts.reduceByKeyAndWindow(
>         new Function2<Integer, Integer, Integer>() {
>           public Integer call(Integer i1, Integer i2) { return i1 + i2; }
>         },
>         new Function2<Integer, Integer, Integer>() {
>           public Integer call(Integer i1, Integer i2) { return i1 - i2; }
>         },
>         new Duration(60 * 5 * 1000),
>      new Duration(1 * 1000)
>
>          counts.print();
>    ssc1.start();
>
>    }
>  }
>
>
> - We did a code in C that send words to workers.
>
> - Result From Master terminal:
>
> Time: 1389794084000 ms
> -------------------------------------------
> (,14294)
> (impertinences,2)
> (protracted.,3)
> (burlesque.,3)
> (Dorothea,,85)
> (grant,,5)
> (temples,,2)
> (discord,17)
> (conscience,48)
> (singed,,2)
> ...
>
> -------------------------------------------
> Time: 1389794085000 ms
> -------------------------------------------
> (,38580)
> (impertinences,5)
> (protracted.,7)
> (burlesque.,7)
> (Dorothea,,259)
> (grant,,12)
> (temples,,7)
> (discord,47)
> (conscience,130)
> (singed,,5)
> ...
>
> My question is, where does it happening the union()? between in the nodes
> or in the master?  I am using three machines( 1 Master + 2 Nodes).
> How could I get a total count of the words and show in the terminal?
>
> Thanks all
>
>
>
>
> --
> ---
> INFORMATIVA SUL TRATTAMENTO DEI DATI PERSONALI
>
> I dati utilizzati per l'invio del presente messaggio sono trattati
> dall'Università degli Studi di Brescia esclusivamente per finalità
> istituzionali. Informazioni più dettagliate anche in ordine ai diritti
> dell'interessato sono riposte nell'informativa generale e nelle notizie
> pubblicate sul sito web dell'Ateneo nella sezione "Privacy".
>
> Il contenuto di questo messaggio è rivolto unicamente alle persona cui
> è indirizzato e può contenere informazioni la cui riservatezza è
> tutelata legalmente. Ne sono vietati la riproduzione, la diffusione e l'uso
> in mancanza di autorizzazione del destinatario. Qualora il messaggio
> fosse pervenuto per errore, preghiamo di eliminarlo.
>
Reply | Threaded
Open this post in threaded view
|

Re: JavaNetworkWordCount Researches

Eduardo Costa Alfaia
Hi Tathagata,
Thank you very much by the explain.
Another curiosity is that I did some tests with this code yesterday where I used three machines like worker and I can see that one these machines have had the RAM memory increased, about 90% in use,  in compare the others this  hasn’t changed drastically and in this same machine I can see that the parts of file, in this case I am using the book Don Quixote in txt,  are save in hard disk specifically in /tmp/spark-local<numbers> increasing the used space. Sorry by the severals questions I am a newer in Stream processing and I looking for understand better how to work Spark DStream.

Best Regards

On Jan 16, 2014, at 1:48, Tathagata Das <[hidden email]> wrote:

> All the computation with the data (that is, union, flatmap, map,
> reduceByKey, reduceByKeyAndWindow) are executed on the workers in a
> distributed manner. The data is received by the worker nodes and kept in
> memory, then the computation is executed on the workers to the in-memory
> data.
>
> After the count is computed for every batch of data, the first 10 elements
> of the generated counts are brought to master for being printed on the
> screen. This is done by the counts.print() which pulls those 10 word-count
> pairs and prints them.
>
> On a related note, if you only want to counts over a window, you dont need
> the first reduceByKey. The reduceByKeyAndWindow takes care of doing the
> reduceByKey per batch and then doing the reduce across a window.
>
> TD
>
>
> On Wed, Jan 15, 2014 at 6:01 AM, Eduardo Costa Alfaia <
> [hidden email]> wrote:
>
>> Hi Guys,
>>
>> I did some changes in JavaNetworkWordCount for my researches in streaming
>> process and I have added to the code the following lines in red:
>>
>> ssc1.checkpoint("hdfs://computer22:54310/user/root/INPUT");
>> JavaDStream<String> lines1 = ssc1.socketTextStream("localhost",
>> Integer.parseInt("12345"));
>> JavaDStream<String> lines2 = ssc1.socketTextStream("localhost",
>> Integer.parseInt("12345"));
>> JavaDStream<String> union2 = lines1.union(lines2);
>>     JavaDStream<String> words = union2.flatMap(new
>> FlatMapFunction<String, String>() {
>>     @Override
>>       public Iterable<String> call(String x) {
>>          return Lists.newArrayList(SPACE.split(x));
>>        }
>>     });
>>     JavaPairDStream<String, Integer> wordCounts = words.map(
>> new PairFunction<String, String, Integer>() {
>>          @Override
>>          public Tuple2<String, Integer> call(String s) {
>>           return new Tuple2<String, Integer>(s, 1);
>>         }
>>        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>          @Override
>>         public Integer call(Integer i1, Integer i2) {
>>           return i1 + i2;
>>          }
>>        });
>>
>>      JavaPairDStream<String, Integer> counts =
>> wordCounts.reduceByKeyAndWindow(
>>        new Function2<Integer, Integer, Integer>() {
>>          public Integer call(Integer i1, Integer i2) { return i1 + i2; }
>>        },
>>        new Function2<Integer, Integer, Integer>() {
>>          public Integer call(Integer i1, Integer i2) { return i1 - i2; }
>>        },
>>        new Duration(60 * 5 * 1000),
>>     new Duration(1 * 1000)
>>
>>         counts.print();
>>   ssc1.start();
>>
>>   }
>> }
>>
>>
>> - We did a code in C that send words to workers.
>>
>> - Result From Master terminal:
>>
>> Time: 1389794084000 ms
>> -------------------------------------------
>> (,14294)
>> (impertinences,2)
>> (protracted.,3)
>> (burlesque.,3)
>> (Dorothea,,85)
>> (grant,,5)
>> (temples,,2)
>> (discord,17)
>> (conscience,48)
>> (singed,,2)
>> ...
>>
>> -------------------------------------------
>> Time: 1389794085000 ms
>> -------------------------------------------
>> (,38580)
>> (impertinences,5)
>> (protracted.,7)
>> (burlesque.,7)
>> (Dorothea,,259)
>> (grant,,12)
>> (temples,,7)
>> (discord,47)
>> (conscience,130)
>> (singed,,5)
>> ...
>>
>> My question is, where does it happening the union()? between in the nodes
>> or in the master?  I am using three machines( 1 Master + 2 Nodes).
>> How could I get a total count of the words and show in the terminal?
>>
>> Thanks all
>>
>>
>>
>>
>> --
>> ---
>> INFORMATIVA SUL TRATTAMENTO DEI DATI PERSONALI
>>
>> I dati utilizzati per l'invio del presente messaggio sono trattati
>> dall'Università degli Studi di Brescia esclusivamente per finalità
>> istituzionali. Informazioni più dettagliate anche in ordine ai diritti
>> dell'interessato sono riposte nell'informativa generale e nelle notizie
>> pubblicate sul sito web dell'Ateneo nella sezione "Privacy".
>>
>> Il contenuto di questo messaggio è rivolto unicamente alle persona cui
>> è indirizzato e può contenere informazioni la cui riservatezza è
>> tutelata legalmente. Ne sono vietati la riproduzione, la diffusione e l'uso
>> in mancanza di autorizzazione del destinatario. Qualora il messaggio
>> fosse pervenuto per errore, preghiamo di eliminarlo.
>>


--
---
INFORMATIVA SUL TRATTAMENTO DEI DATI PERSONALI

I dati utilizzati per l'invio del presente messaggio sono trattati
dall'Università degli Studi di Brescia esclusivamente per finalità
istituzionali. Informazioni più dettagliate anche in ordine ai diritti
dell'interessato sono riposte nell'informativa generale e nelle notizie
pubblicate sul sito web dell'Ateneo nella sezione "Privacy".

Il contenuto di questo messaggio è rivolto unicamente alle persona cui
è indirizzato e può contenere informazioni la cui riservatezza è
tutelata legalmente. Ne sono vietati la riproduzione, la diffusione e l'uso
in mancanza di autorizzazione del destinatario. Qualora il messaggio
fosse pervenuto per errore, preghiamo di eliminarlo.
Reply | Threaded
Open this post in threaded view
|

Re: JavaNetworkWordCount Researches

Tathagata Das
Hi Eduardo,

If the streaming data is sent to Worker X, then the data is stored in the
memory of Worker X and another worker Y. if replication is disabled through
the StorageLevel in the input stream, then only worker X. That is why you
could be seeing one the machines have a high memory usage. The data is
essentially stored as RDDs within the memory of the Spark's worker
processes and by default, the used data is thrown out using Spark's default
LRU method. If you want to reduce the memory consumption within Spark's
storage memory, then there are two options.
1. You can set the Java property "spark.cleaner.ttl" appropriately (see Spark
configuration<http://spark.incubator.apache.org/docs/latest/configuration.html>).
This forces all data persisted in memory to be cleaned up. But this needs
to be set conservative so that you dont accidentally clear up data that
havent been process yet.
2. In the latest code of Spark Streaming (master branch of the Spark
repo<https://github.com/apache/incubator-spark/>),
if you set the property spark.streaming.unpersist, then the Spark Streaming
will automatically throw away data that is not needed any more.

Hopefully this clarifies the doubts. Feel free to ask more questions.

TD


On Thu, Jan 16, 2014 at 2:03 AM, Eduardo Costa Alfaia <
[hidden email]> wrote:

> Hi Tathagata,
> Thank you very much by the explain.
> Another curiosity is that I did some tests with this code yesterday where
> I used three machines like worker and I can see that one these machines
> have had the RAM memory increased, about 90% in use,  in compare the others
> this  hasn’t changed drastically and in this same machine I can see that
> the parts of file, in this case I am using the book Don Quixote in txt,
>  are save in hard disk specifically in /tmp/spark-local<numbers> increasing
> the used space. Sorry by the severals questions I am a newer in Stream
> processing and I looking for understand better how to work Spark DStream.
>
> Best Regards
>
> On Jan 16, 2014, at 1:48, Tathagata Das <[hidden email]>
> wrote:
>
> > All the computation with the data (that is, union, flatmap, map,
> > reduceByKey, reduceByKeyAndWindow) are executed on the workers in a
> > distributed manner. The data is received by the worker nodes and kept in
> > memory, then the computation is executed on the workers to the in-memory
> > data.
> >
> > After the count is computed for every batch of data, the first 10
> elements
> > of the generated counts are brought to master for being printed on the
> > screen. This is done by the counts.print() which pulls those 10
> word-count
> > pairs and prints them.
> >
> > On a related note, if you only want to counts over a window, you dont
> need
> > the first reduceByKey. The reduceByKeyAndWindow takes care of doing the
> > reduceByKey per batch and then doing the reduce across a window.
> >
> > TD
> >
> >
> > On Wed, Jan 15, 2014 at 6:01 AM, Eduardo Costa Alfaia <
> > [hidden email]> wrote:
> >
> >> Hi Guys,
> >>
> >> I did some changes in JavaNetworkWordCount for my researches in
> streaming
> >> process and I have added to the code the following lines in red:
> >>
> >> ssc1.checkpoint("hdfs://computer22:54310/user/root/INPUT");
> >> JavaDStream<String> lines1 = ssc1.socketTextStream("localhost",
> >> Integer.parseInt("12345"));
> >> JavaDStream<String> lines2 = ssc1.socketTextStream("localhost",
> >> Integer.parseInt("12345"));
> >> JavaDStream<String> union2 = lines1.union(lines2);
> >>     JavaDStream<String> words = union2.flatMap(new
> >> FlatMapFunction<String, String>() {
> >>     @Override
> >>       public Iterable<String> call(String x) {
> >>          return Lists.newArrayList(SPACE.split(x));
> >>        }
> >>     });
> >>     JavaPairDStream<String, Integer> wordCounts = words.map(
> >> new PairFunction<String, String, Integer>() {
> >>          @Override
> >>          public Tuple2<String, Integer> call(String s) {
> >>           return new Tuple2<String, Integer>(s, 1);
> >>         }
> >>        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> >>          @Override
> >>         public Integer call(Integer i1, Integer i2) {
> >>           return i1 + i2;
> >>          }
> >>        });
> >>
> >>      JavaPairDStream<String, Integer> counts =
> >> wordCounts.reduceByKeyAndWindow(
> >>        new Function2<Integer, Integer, Integer>() {
> >>          public Integer call(Integer i1, Integer i2) { return i1 + i2; }
> >>        },
> >>        new Function2<Integer, Integer, Integer>() {
> >>          public Integer call(Integer i1, Integer i2) { return i1 - i2; }
> >>        },
> >>        new Duration(60 * 5 * 1000),
> >>     new Duration(1 * 1000)
> >>
> >>         counts.print();
> >>   ssc1.start();
> >>
> >>   }
> >> }
> >>
> >>
> >> - We did a code in C that send words to workers.
> >>
> >> - Result From Master terminal:
> >>
> >> Time: 1389794084000 ms
> >> -------------------------------------------
> >> (,14294)
> >> (impertinences,2)
> >> (protracted.,3)
> >> (burlesque.,3)
> >> (Dorothea,,85)
> >> (grant,,5)
> >> (temples,,2)
> >> (discord,17)
> >> (conscience,48)
> >> (singed,,2)
> >> ...
> >>
> >> -------------------------------------------
> >> Time: 1389794085000 ms
> >> -------------------------------------------
> >> (,38580)
> >> (impertinences,5)
> >> (protracted.,7)
> >> (burlesque.,7)
> >> (Dorothea,,259)
> >> (grant,,12)
> >> (temples,,7)
> >> (discord,47)
> >> (conscience,130)
> >> (singed,,5)
> >> ...
> >>
> >> My question is, where does it happening the union()? between in the
> nodes
> >> or in the master?  I am using three machines( 1 Master + 2 Nodes).
> >> How could I get a total count of the words and show in the terminal?
> >>
> >> Thanks all
> >>
> >>
> >>
> >>
> >> --
> >> ---
> >> INFORMATIVA SUL TRATTAMENTO DEI DATI PERSONALI
> >>
> >> I dati utilizzati per l'invio del presente messaggio sono trattati
> >> dall'Università degli Studi di Brescia esclusivamente per finalità
> >> istituzionali. Informazioni più dettagliate anche in ordine ai diritti
> >> dell'interessato sono riposte nell'informativa generale e nelle notizie
> >> pubblicate sul sito web dell'Ateneo nella sezione "Privacy".
> >>
> >> Il contenuto di questo messaggio è rivolto unicamente alle persona cui
> >> è indirizzato e può contenere informazioni la cui riservatezza è
> >> tutelata legalmente. Ne sono vietati la riproduzione, la diffusione e
> l'uso
> >> in mancanza di autorizzazione del destinatario. Qualora il messaggio
> >> fosse pervenuto per errore, preghiamo di eliminarlo.
> >>
>
>
> --
> ---
> INFORMATIVA SUL TRATTAMENTO DEI DATI PERSONALI
>
> I dati utilizzati per l'invio del presente messaggio sono trattati
> dall'Università degli Studi di Brescia esclusivamente per finalità
> istituzionali. Informazioni più dettagliate anche in ordine ai diritti
> dell'interessato sono riposte nell'informativa generale e nelle notizie
> pubblicate sul sito web dell'Ateneo nella sezione "Privacy".
>
> Il contenuto di questo messaggio è rivolto unicamente alle persona cui
> è indirizzato e può contenere informazioni la cui riservatezza è
> tutelata legalmente. Ne sono vietati la riproduzione, la diffusione e l'uso
> in mancanza di autorizzazione del destinatario. Qualora il messaggio
> fosse pervenuto per errore, preghiamo di eliminarlo.
>