flume-issues mailing list archives

From "ODa (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLUME-3340) Error with HDFS SINK kerberized
Date Thu, 11 Jul 2019 08:39:00 GMT

     [ https://issues.apache.org/jira/browse/FLUME-3340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ODa updated FLUME-3340:
-----------------------
    Description: 
Hello,

I'm a big data administrator working with the Cloudera distribution.

We use Flume to collect external data and push it into HDFS (Hadoop).

We have an issue with the HDFS sink, with these messages:

 

2019-07-11 09:22:12,147 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs:/app/2019-07-11/apps_flume01_2019-07-11.1562829732122.json.tmp

2019-07-11 09:22:12,441 ERROR org.apache.flume.sink.hdfs.HDFSEventSink: process failed
java.lang.NullPointerException
 at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
 at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
 at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
 at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
 at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
 at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
 at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
 at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
 at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
 at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
 at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
 at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
 at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
 at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
2019-07-11 09:22:12,443 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException
 at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
 at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
 at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
 at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
 at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
 at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
 at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
 at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
 at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
 at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
 at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
 at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
 at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
 at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
 at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
 at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
 at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 ... 1 more

The Kafka source (Confluent Community) is secured with SASL_PLAINTEXT.
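The top two frames point at the likely root cause: Hadoop's SaslRpcServer$FastSaslServerFactory builds its mechanism cache by probing every SASL server factory registered in the JVM with a null properties map, and because the Kafka source puts kafka-clients on the agent classpath, that probe reaches Kafka's PlainSaslServerFactory, which in this client version dereferences the map without a null check. A minimal sketch of that interaction (class and method names are from the trace; the bodies are illustrative paraphrases, not the shipped sources):

import java.util.Map;
import javax.security.sasl.Sasl;

// Illustrative reconstruction of the two top frames of the trace above.
public class SaslNpeSketch {

    // Paraphrase of org.apache.kafka.common.security.plain.PlainSaslServer
    // $PlainSaslServerFactory.getMechanismNames(...): the props map is read
    // directly, with no null check.
    static String[] kafkaGetMechanismNames(Map<String, ?> props) {
        if ("true".equals(props.get(Sasl.POLICY_NOPLAINTEXT))) // NPE when props == null
            return new String[0];
        return new String[] { "PLAIN" };
    }

    public static void main(String[] args) {
        // Paraphrase of org.apache.hadoop.security.SaslRpcServer
        // $FastSaslServerFactory.<init>: it probes each registered factory
        // with a *null* props map while building its mechanism cache, which
        // the HDFS sink triggers on its first write after Kerberos login.
        kafkaGetMechanismNames(null); // throws java.lang.NullPointerException
    }
}

On this reading the failure is a classpath interaction between the Kafka client and Hadoop's SASL initialization, not a Kerberos misconfiguration in the sink; it should go away once the PLAIN factory tolerates a null props map (newer kafka-clients releases appear to add that guard).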

 

The Flume configuration is:

 

agent.sources = apps
agent.channels = appsChannel
agent.sinks = appsSink

###
### Source definition
###
agent.sources.apps.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.apps.kafka.bootstrap.servers =kafka1:9092,kafka2:9092,kafka3:9092

agent.sources.apps.kafka.topics = mytopic01

agent.sources.apps.kafka.consumer.client.id=ClientIDapps
agent.sources.apps.kafka.consumer.group.id=GroupIDapps

agent.sources.apps.channels = appsChannel
agent.sources.apps.batchSize=500

agent.sources.apps.interceptors = i1 hostint
agent.sources.apps.interceptors.i1.type = timestamp
agent.sources.apps.interceptors.hostint.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.apps.interceptors.hostint.preserveExisting = true
agent.sources.apps.interceptors.hostint.useIP = false

agent.sources.apps.kafka.consumer.security.protocol=SASL_PLAINTEXT
agent.sources.apps.kafka.consumer.sasl.mechanism=PLAIN
agent.sources.apps.kafka.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
###
### Channel definition
###

agent.channels.appsChannel.type = memory
agent.channels.appsChannel.capacity = 500000
agent.channels.appsChannel.transactionCapacity = 1000
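Note that the memory channel's transactionCapacity (1,000) caps how many events a sink may take in a single transaction, so it must be at least the sink's hdfs.batchSize; the two are aligned at 1,000 here.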

 

###
### Sink definition
###
agent.sinks.appsSink.type = hdfs
agent.sinks.appsSink.hdfs.kerberosPrincipal = $KERBEROS_PRINCIPAL
agent.sinks.appsSink.hdfs.kerberosKeytab = $KERBEROS_KEYTAB
agent.sinks.appsSink.maxOpenFiles = 100
agent.sinks.appsSink.hdfs.path = hdfs:/apps/%Y-%m-%d
agent.sinks.appsSink.hdfs.filePrefix=apps_%{host}_%Y-%m-%d
agent.sinks.appsSink.hdfs.fileSuffix=.json
agent.sinks.appsSink.hdfs.rollInterval=60
agent.sinks.appsSink.hdfs.rollSize=0
agent.sinks.appsSink.hdfs.rollCount=100000
agent.sinks.appsSink.hdfs.idleTimeout=60
agent.sinks.appsSink.hdfs.callTimeout=60000
agent.sinks.appsSink.hdfs.batchSize=1000
agent.sinks.appsSink.hdfs.fileType=DataStream
agent.sinks.appsSink.hdfs.writeFormat=Writable
agent.sinks.appsSink.hdfs.useLocalTimeStamp=false
agent.sinks.appsSink.hdfs.serializer=TEXT
agent.sinks.appsSink.channel = appsChannel
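For reference on the escape sequences above: the HDFS sink fills %Y-%m-%d from the event's timestamp header (set here by the timestamp interceptor, since useLocalTimeStamp=false) and %{host} from the host header (set by the host interceptor), so an event stamped 2019-07-11 from host flume01 yields a file of the shape apps_flume01_2019-07-11.1562829732122.json.tmp, exactly as in the BucketWriter INFO line at the top of the report.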

Best Regards,
ODa


> Error with HDFS SINK kerberized
> -------------------------------
>
>                 Key: FLUME-3340
>                 URL: https://issues.apache.org/jira/browse/FLUME-3340
>             Project: Flume
>          Issue Type: Choose from below ...
>          Components: Configuration
>    Affects Versions: 1.6.0
>         Environment: Flume 1.6 (embedded release in CDH 5.16.1)
> Kafka 2.11-2.1.1cp1-1 (Confluent Community)
>    3 secured brokers with SASL_PLAINTEXT (mechanism PLAIN)
> Hadoop 2.6.0 (CDH 5.16.1), kerberized
> RHEL 6.9
>            Reporter: ODa
>            Priority: Blocker
>



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@flume.apache.org
For additional commands, e-mail: issues-help@flume.apache.org

