samza-dev mailing list archives

From Job-Selina Wu <swucaree...@gmail.com>
Subject Re: no new topic created on Kafka
Date Tue, 28 Jul 2015 19:58:03 GMT
Hi, Yan:

     Thanks for fixing the bug for me.

Sincerely,
Selina

On Tue, Jul 28, 2015 at 12:03 PM, Yan Fang <yanfang724@gmail.com> wrote:

>  task.class=samza.http.demo.task.HttpDemoParserStreamTask ...
>
> you are not using the HttpDemoStatsStreamTask class...
>
> Fang, Yan
> yanfang724@gmail.com
>
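For anyone reading this thread later, here is a rough sketch of the change Yan is pointing at in demo-stats.properties. It assumes HttpDemoStatsStreamTask lives in the same samza.http.demo.task package as the parser task, which the thread does not state explicitly.

```properties
# Task
# Assumption: the stats task is in the same package as the parser task.
task.class=samza.http.demo.task.HttpDemoStatsStreamTask
task.inputs=kafka.http-demo
```

With task.class still naming HttpDemoParserStreamTask, the job runs the parser task under the demo-stats-tmp job name, which would explain why the log lines are tagged HttpDemoParserStreamTask and why nothing is ever sent to "demo-stats-temp".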
> On Tue, Jul 28, 2015 at 11:48 AM, Job-Selina Wu <swucareer99@gmail.com>
> wrote:
>
> > Hi, Yan
> >
> > I'd like to correct my previous comment. When I comment out
> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> > the logger output does not show up in samza-container-0.log, but that makes sense.
> >
> >
> > Sincerely,
> > Selina
> >
> > On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu <swucareer99@gmail.com>
> > wrote:
> >
> > > Hi, Yan:
> > >
> > >       Thanks a lot for your reply.
> > >      I tried commenting out
> > > systems.kafka.http-demo.samza.offset.default=oldest
> > > and then I tried commenting out
> > > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > > systems.kafka.streams.http-demo.samza.reset.offset=true
> > >
> > > The result is the same as before: 1. the checkpoint topic was created,
> > > 2. the log created by the Logger can be found in samza-container-0.log,
> > > 3. there is no exception in samza-container-0.log.
> > >
> > >    I guess there is some conflict between HttpDemoParserStreamTask and
> > > HttpDemoStatsStreamTask? Is some resource registered by
> > > HttpDemoParserStreamTask, so that HttpDemoStatsStreamTask cannot
> > > recreate a topic?
> > >
> > > Sincerely,
> > > Selina
> > >
> > > On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <yanfang724@gmail.com> wrote:
> > >
> > >> Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
> > >> to see how it works? This does not seem to be a correct property.
> > >>
> > >> Thanks,
> > >>
> > >> Fang, Yan
> > >> yanfang724@gmail.com
> > >>
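To illustrate the point about the property name: as far as I understand the Samza configuration layout, a default offset is set either system-wide as systems.<system>.samza.offset.default or per stream as systems.<system>.streams.<stream>.samza.offset.default. The key systems.kafka.http-demo.samza.offset.default fits neither form. Below is a minimal sketch of a check for that, in plain Java; the class name OffsetKeyCheck and the method suspiciousKeys are made up for illustration and are not part of Samza.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Samza: flags offset.default keys
// that match neither of the two layouts assumed in the text above.
public class OffsetKeyCheck {
    // System-wide form: systems.<system>.samza.offset.default
    private static final Pattern SYSTEM_LEVEL =
            Pattern.compile("systems\\.[^.]+\\.samza\\.offset\\.default");
    // Per-stream form: systems.<system>.streams.<stream>.samza.offset.default
    private static final Pattern STREAM_LEVEL =
            Pattern.compile("systems\\.[^.]+\\.streams\\..+\\.samza\\.offset\\.default");

    /** Returns the offset.default keys that match neither assumed layout. */
    static List<String> suspiciousKeys(List<String> keys) {
        List<String> suspicious = new ArrayList<>();
        for (String key : keys) {
            if (!key.endsWith(".samza.offset.default")) {
                continue; // not an offset-default key, out of scope here
            }
            boolean known = SYSTEM_LEVEL.matcher(key).matches()
                    || STREAM_LEVEL.matcher(key).matches();
            if (!known) {
                suspicious.add(key);
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        // The three offset-related keys from the properties file in this thread.
        List<String> keys = Arrays.asList(
                "systems.kafka.http-demo.samza.offset.default",
                "systems.kafka.streams.http-demo.samza.offset.default",
                "systems.kafka.streams.http-demo.samza.reset.offset");
        System.out.println(suspiciousKeys(keys));
    }
}
```

Run against the three keys from the properties file, only the first one (the one without the "streams" segment) is reported. An unrecognized key like this is most likely just ignored, which fits the observation that commenting it out changes nothing.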
> > >> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucareer99@gmail.com> wrote:
> > >>
> > >> > Hi, Dear All:
> > >> >
> > >> >        I have two tasks in Samza, HttpDemoParserStreamTask and
> > >> > HttpDemoStatsStreamTask. They are almost the same, except that the
> > >> > output topic name and the task name in the properties file are
> > >> > different. I am wondering how I should debug this?
> > >> >
> > >> >    More details are listed below.
> > >> >
> > >> >    All your help is highly appreciated.
> > >> >
> > >> > Sincerely,
> > >> > Selina
> > >> >
> > >> >     Currently HttpDemoParserStreamTask runs well.
> > >> >     However, HttpDemoStatsStreamTask generates the log correctly,
> > >> > without any exception, at
> > >> >
> > >> > deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
> > >> >
> > >> > The last record, shown below, is right; however, no topic
> > >> > "demo-stats-temp" was created.
> > >> > --------------------------------------
> > >> >
> > >> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> > >> >
> > >> >
> > >> > -------------------The demo-stats.properties file-----------------------------
> > >> >
> > >> > # Job
> > >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > >> > job.name=demo-stats-tmp
> > >> >
> > >> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > >> > task.checkpoint.system=kafka
> > >> > # Normally, this would be 3, but we have only one broker.
> > >> > task.checkpoint.replication.factor=1
> > >> >
> > >> > # YARN
> > >> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> > >> >
> > >> > # Task
> > >> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
> > >> > task.inputs=kafka.http-demo
> > >> >
> > >> > # Serializers
> > >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> > >> >
> > >> > # Kafka System
> > >> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > >> > systems.kafka.samza.msg.serde=string
> > >> > systems.kafka.samza.key.serde=string
> > >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> > >> > systems.kafka.producer.bootstrap.servers=localhost:9092
> > >> >
> > >> > # stream from beginning
> > >> > #systems.kafka.consumer.auto.offset.reset=smallest
> > >> > # http-demo from the oldest
> > >> > systems.kafka.http-demo.samza.offset.default=oldest
> > >> > # all stream from the oldest
> > >> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > >> > systems.kafka.streams.http-demo.samza.reset.offset=true
> > >> >
> > >> >
> > >> >
> > >> > --------------------HttpDemoStatsStreamTask class----------------------------
> > >> >
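> > >> > import org.apache.samza.system.IncomingMessageEnvelope;
> > >> > import org.apache.samza.system.OutgoingMessageEnvelope;
> > >> > import org.apache.samza.system.SystemStream;
> > >> > import org.apache.samza.task.MessageCollector;
> > >> > import org.apache.samza.task.StreamTask;
> > >> > import org.apache.samza.task.TaskCoordinator;
> > >> > import org.slf4j.Logger;
> > >> > import org.slf4j.LoggerFactory;
> > >> >
> > >> > public class HttpDemoStatsStreamTask implements StreamTask {
> > >> >
> > >> >     // Output topic
> > >> >     private static final SystemStream OUTPUT_STREAM =
> > >> >             new SystemStream("kafka", "demo-stats-temp");
> > >> >     private final Logger logger =
> > >> >             LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
> > >> >
> > >> >     @SuppressWarnings("unchecked")
> > >> >     @Override
> > >> >     public void process(IncomingMessageEnvelope envelope,
> > >> >             MessageCollector collector, TaskCoordinator coordinator)
> > >> >             throws Exception {
> > >> >         // Log every incoming key and message.
> > >> >         String key = (String) envelope.getKey();
> > >> >         String message = envelope.getMessage().toString();
> > >> >         logger.info("key=" + key + ": message=" + message);
> > >> >
> > >> >         // Forward the message unchanged to the output topic.
> > >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
> > >> >     }
> > >> > }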
> > >> >
> > >> > -----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
> > >> >
> > >> > {"Partition 0":0}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> >
> > >>
> > >
> > >
> >
>
