
Streaming Platform

Hi,

I need to enable communication between GF (GemFire) and Kafka or RabbitMQ.

How should I proceed? Many thanks in advance.

Regards

Wael HORCHANI

Hello,

I think you intend to ingest streaming data via Kafka or RabbitMQ, optionally filter some of it according to your own logic, and store it in GemFire.

In that case, you can simply call the GemFire client Java API from a data ingestion application built with the Java client API for Kafka or RabbitMQ.

Are there any additional constraints in your actual use case (for example, needing a solution that requires no programming)?
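
A minimal sketch of the filter-and-store step, assuming the records have already been read from Kafka or RabbitMQ as String key/value pairs (the consumer loop itself is omitted). Because GemFire's `Region` interface extends `java.util.Map` (through `ConcurrentMap`), the method accepts any `Map`; in a real client application you would pass a region obtained from your `ClientCache`. The class name `IngestSketch` and the filter predicate are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class IngestSketch {
    // Stores every record whose value passes the filter into the target map
    // and returns how many records were stored. In a real application,
    // "target" would be a GemFire client Region (Region extends Map) and
    // "records" would come from the Kafka/RabbitMQ consumer (hypothetical here).
    static int ingest(List<Map.Entry<String, String>> records,
                      Predicate<String> keepValue,
                      Map<String, String> target) {
        int stored = 0;
        for (Map.Entry<String, String> record : records) {
            if (keepValue.test(record.getValue())) {
                target.put(record.getKey(), record.getValue());
                stored++;
            }
        }
        return stored;
    }
}
```

Keeping the ingestion logic against the `Map` interface lets it be unit-tested without a running cluster.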

Akihiro Kitada

Hi,

Thank you for your reply.

In fact, my data is initially stored in two regions of a GF cluster. I need to perform a complex join between them in real time and store the result in a third region.

Since GF does not support this kind of computation natively, we tried sending the data to Storm via Kafka to perform the join, and then sending the final result back to GF.

  1. Passing data from GF to Kafka does not work for objects; we can only pass plain strings.
  2. When Kafka is installed on a remote machine (as it will be in production), it cannot connect to GF.

Do you have any ideas? Thanks.

Wael HORCHANI

Hello,

Thank you for providing additional information on what you want to do.

I'm not sure when you want to trigger these join operations, but you may be able to meet your requirements with GemFire features alone, without other software components, by writing the join logic in a Function or an AsyncEventListener.
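
To illustrate the event-driven option, here is a minimal sketch of incremental join logic of the kind an AsyncEventListener could run: when an entry is written to one source region, look up the same key in the other source region and, if it is present, write the joined value to the result region. Plain `Map`s stand in for the regions; the class name, the equal-key join condition and the `left|right` result format are assumptions. In an actual `AsyncEventListener`, `processEvents(List<AsyncEvent> events)` would call something like `onWrite` for each event, using `event.getRegion()`, `event.getKey()` and `event.getDeserializedValue()`.

```java
import java.util.Map;

class IncrementalJoin {
    private final Map<String, String> left;   // stands in for source region A
    private final Map<String, String> right;  // stands in for source region B
    private final Map<String, String> result; // stands in for destination region C

    IncrementalJoin(Map<String, String> left, Map<String, String> right,
                    Map<String, String> result) {
        this.left = left;
        this.right = right;
        this.result = result;
    }

    // Handles one write event. Assumes the written value is already stored in
    // its own source map, as it is once GemFire delivers an event to an
    // AsyncEventListener. Joins on equal keys and stores "leftValue|rightValue".
    void onWrite(boolean fromLeft, String key, String value) {
        String other = fromLeft ? right.get(key) : left.get(key);
        if (other == null) {
            return; // no match yet; the join fires when the other side arrives
        }
        String joined = fromLeft ? value + "|" + other : other + "|" + value;
        result.put(key, joined);
    }
}
```

With this approach the result region is updated as data arrives, instead of recomputing the whole join on demand.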

Akihiro Kitada

Hi Akihiro,

Thanks for the proposal. In fact, we turned to this architecture (GF + Kafka + Storm) because we could not find an end-to-end sample of a data-dependent GF Function anywhere, including how to call it from a Java client application. Could you please share a small example that we can test quickly and build our use case on?

Regards,

Wael HORCHANI

Hello Wael,

I'll prepare a simple example that calls a Function from a GemFire client; the Function joins data from two different regions and puts the results into another region. Please wait a moment.

The basic steps could be as follows.

  1. Call a Function from the GemFire client application with arguments (e.g. the names of the source regions to join and of the destination region).
  2. In the Function, get the existing Cache object with the CacheFactory.getAnyInstance() API, execute your own logic to join the two source regions, and put the result into the destination region (e.g. simply join the two String values stored under the same key in the source regions, and put them into the destination region).
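
The join logic of step 2 can be sketched independently of the Function plumbing. The hypothetical helper below iterates over the first source, looks up the same key in the second, and puts the combined String into the destination. Since `Region` extends `java.util.Map`, inside the Function you could call it with regions obtained from `CacheFactory.getAnyInstance().getRegion(name)`, using the region names passed as arguments; plain `HashMap`s are used here so the logic can be tested on its own. Joining on equal keys and concatenating with `|` are assumptions matching the simple String example above.

```java
import java.util.Map;

class RegionJoin {
    // Inner join on equal keys: for every key present in both sources, put
    // "valueA|valueB" into the destination. Returns the number of joined entries.
    static int joinByKey(Map<String, String> sourceA,
                         Map<String, String> sourceB,
                         Map<String, String> destination) {
        int joined = 0;
        for (Map.Entry<String, String> entry : sourceA.entrySet()) {
            String valueB = sourceB.get(entry.getKey());
            if (valueB != null) {
                destination.put(entry.getKey(), entry.getValue() + "|" + valueB);
                joined++;
            }
        }
        return joined;
    }
}
```

Note that when this runs in a data-independent Function against partitioned regions, iterating over `sourceA` touches every entry in the cluster, not just local data.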

Thanks. 

Akihiro Kitada

Hello Akihiro,

Thank you very much for your help. If you are busy, a simple example that only queries region data will be enough, and we will build the rest.

Looking forward to your feedback when you are done.

Regards,

Wael HORCHANI

Hello Wael,

I have uploaded a simple example that triggers a Function to join values from two regions and store the result in a third region.

https://github.com/AkihiroKitada/GemFireFunctionToJoinRegions

Unfortunately, it is based on a data-independent Function: it simply executes the Function on one specific server node in the cluster.

You may need a data-dependent Function to execute the join on a Partitioned region, i.e. to execute the join on the local primary buckets of each node. If so, please let me know.

Akihiro Kitada

Hello Akihiro,

The code ran successfully.

I made one change: when sending arguments to the called Function, I had to replace "setArguments" with "withArgs". I am running GF version 9.0.4.

Many thanks for your help.

Regards,

Wael HORCHANI

Thank you for your update.

>I must change "setArguments" by "withArgs".

According to the Javadoc, "withArgs" is deprecated as of GemFire 9.0, so you may need to switch back to "setArguments" in a future release of GemFire.

In any case, my code is not data-dependent. You may be able to improve it as follows:

  • Execute the Function as data-dependent processing, so that each member hosting the Partitioned region processes its local data in parallel.
  • Colocate the regions to be joined.
  • Depending on your use case, modify the Function code to retrieve only key lists rather than whole data entries, to reduce temporary memory usage.
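
Why colocation matters for the data-dependent version can be shown with a small plain-Java simulation. With default partitioning, the same key maps to the same bucket number in both regions, and colocation places buckets with the same number on the same member; so each member can join only its local data, and the union of the per-member results equals the global join. The bucket count, the hash-based bucket assignment and all names below are assumptions, not GemFire's actual partitioning code. The inner loop iterates over keys and fetches values one at a time, in the spirit of the last bullet. In GemFire itself, the per-member step would run in a Function executed with `FunctionService.onRegion(region)`, where `PartitionRegionHelper.getLocalDataForContext(context)` gives the local data of the target region and `PartitionRegionHelper.getLocalColocatedRegions(context)` the local data of its colocated regions.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionedJoinSimulation {
    // Hypothetical bucket assignment: the same key lands in the same bucket
    // number in both regions.
    static int bucketOf(String key, int buckets) {
        return Math.floorMod(key.hashCode(), buckets);
    }

    // Splits a "region" into per-bucket maps, mimicking how a partitioned
    // region's entries are divided among buckets.
    static Map<Integer, Map<String, String>> partition(Map<String, String> region, int buckets) {
        Map<Integer, Map<String, String>> byBucket = new HashMap<>();
        for (Map.Entry<String, String> e : region.entrySet()) {
            byBucket.computeIfAbsent(bucketOf(e.getKey(), buckets), b -> new HashMap<>())
                    .put(e.getKey(), e.getValue());
        }
        return byBucket;
    }

    // Joins A and B bucket by bucket, never looking outside the current bucket,
    // as each member would with its local colocated data.
    static Map<String, String> joinPerBucket(Map<String, String> a, Map<String, String> b, int buckets) {
        Map<Integer, Map<String, String>> aBuckets = partition(a, buckets);
        Map<Integer, Map<String, String>> bBuckets = partition(b, buckets);
        Map<String, String> result = new HashMap<>();
        for (int bucket = 0; bucket < buckets; bucket++) {
            Map<String, String> localA = aBuckets.getOrDefault(bucket, Map.of());
            Map<String, String> localB = bBuckets.getOrDefault(bucket, Map.of());
            // Iterate over keys only and fetch each value as needed.
            for (String key : localA.keySet()) {
                String valueB = localB.get(key);
                if (valueB != null) {
                    result.put(key, localA.get(key) + "|" + valueB);
                }
            }
        }
        return result;
    }
}
```

If the regions were not colocated, a key's entry in B could sit on a different member from its entry in A, and a purely local join would miss that match.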
Akihiro Kitada