Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2041: add hdfs and kinesis descriptor #857

Closed
wants to merge 2 commits into from

Conversation

lhaiesp
Copy link
Contributor

@lhaiesp lhaiesp commented Dec 12, 2018

No description provided.

@lhaiesp
Copy link
Contributor Author

lhaiesp commented Dec 12, 2018

@xinyuiscool @atoomula can you take a look?

@prateekm
Copy link
Contributor

@dxichen Can you take a look as well?

public Map<String, String> toConfig() {
final Map<String, String> config = new HashMap<>(super.toConfig());

final String systemName = getSystemName();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The rest of the code-base does not use final for local variables since it's a bit boiler-platey.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vjagadish1989 : using final is good habit and should be encouraged in samza code base. I don't think your suggestion makes sense here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style and good practice discussion will never end. We should talk more about these in team meeting and agree upon things together. I will just go for consistency for now

* @param valueSerde serde the values in the messages in the stream
* @param systemDescriptor system descriptor this stream descriptor was obtained from
*/
KinesisInputDescriptor(String streamId, Serde valueSerde, SystemDescriptor systemDescriptor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you make the serde typed for KinesisInputDescriptor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*
*/
public class HdfsInputDescriptor
extends InputDescriptor<Object, HdfsInputDescriptor> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance we can expose a "typed" API instead of InputDescriptor<Object>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, HDFS system is not able to leverage the serde concept in samza. Essentially the data type is determined by file type itself (csv, avro, orc, etc.). Users today need to config different reader to read different types of files. We don't just get bytes and deserialize them.

It may be possible to support a "typed" API after more thoughts and refactor. But it would be beyond the scope of this change

Copy link
Member

@dxichen dxichen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch, a couple minor things to think about before submitting.

* @return this input descriptor
*/
public KinesisInputDescriptor<StreamMessageType> withRegion(String region) {
this.region = Optional.of(region);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: For defensive programming, I would prefer String.stripToNull(region); Please update for similar invocations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

.withRegion("STREAM-REGION")
.withAccessKey("YOUR-ACCESS_KEY")
.withSecretKey("YOUR-SECRET-KEY")
.withKCLConfig(kclConfig);
{% endhighlight %}

As an example, the below configuration is equivalent to invoking `kclClient#WithTableName(myTable)` on the KCL instance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way that we may truncate these 2 code examples since they only differ by one line? IE addkclConfig.put("TableName", "myTable"); to the first sample.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not completely straightforward to do so. But I have tried to reduce the duplicate codes to make it less verbose.

private Optional<String> writeCompressionType = Optional.empty();
private Optional<String> writerClass = Optional.empty();

private Optional<Long> consumerBufferCapacity = Optional.empty();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer if we add all the consumer/producer configs in the the input and output descriptor respectively instead of the system descriptor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another unfortunate unresolved issue in HDFS. Where all the configs are per system not per stream. Once we move to per stream config then it definitely makes sense to place them respectively in input and output descriptor.

@lhaiesp
Copy link
Contributor Author

lhaiesp commented Dec 13, 2018

@dxichen @vjagadish1989 addressed or responded to all your feedback. let me know if you have more comments

Copy link
Member

@dxichen dxichen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@xinyuiscool xinyuiscool left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@asfgit asfgit closed this in c5348bf Dec 14, 2018
shekhars-li pushed a commit to shekhars-li/samza that referenced this pull request May 28, 2021
Author: Hai Lu <halu@linkedin.com>

Reviewers: Xinyu Liu <xiliu@linkedin.com>

Closes apache#857 from lhaiesp/master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
5 participants