-
Notifications
You must be signed in to change notification settings - Fork 332
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SAMZA-2041: add hdfs and kinesis descriptor #857
Conversation
@xinyuiscool @atoomula can you take a look? |
@dxichen Can you take a look as well? |
public Map<String, String> toConfig() { | ||
final Map<String, String> config = new HashMap<>(super.toConfig()); | ||
|
||
final String systemName = getSystemName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: The rest of the code-base does not use final
for local variables since it's a bit boiler-platey.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vjagadish1989 : using final is good habit and should be encouraged in samza code base. I don't think your suggestion makes sense here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style and good practice discussion will never end. We should talk more about these in team meeting and agree upon things together. I will just go for consistency for now
* @param valueSerde serde the values in the messages in the stream | ||
* @param systemDescriptor system descriptor this stream descriptor was obtained from | ||
*/ | ||
KinesisInputDescriptor(String streamId, Serde valueSerde, SystemDescriptor systemDescriptor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should you make the serde typed for KinesisInputDescriptor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
* | ||
*/ | ||
public class HdfsInputDescriptor | ||
extends InputDescriptor<Object, HdfsInputDescriptor> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chance we can expose a "typed" API instead of InputDescriptor<Object>?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, HDFS system is not able to leverage the serde concept in samza. Essentially the data type is determined by file type itself (csv, avro, orc, etc.). Users today need to config different reader to read different types of files. We don't just get bytes and deserialize them.
It may be possible to support a "typed" API after more thoughts and refactor. But it would be beyond the scope of this change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch, a couple minor things to think about before submitting.
* @return this input descriptor | ||
*/ | ||
public KinesisInputDescriptor<StreamMessageType> withRegion(String region) { | ||
this.region = Optional.of(region); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: For defensive programming, I would prefer String.stripToNull(region)
; Please update for similar invocations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
.withRegion("STREAM-REGION") | ||
.withAccessKey("YOUR-ACCESS_KEY") | ||
.withSecretKey("YOUR-SECRET-KEY") | ||
.withKCLConfig(kclConfig); | ||
{% endhighlight %} | ||
|
||
As an example, the below configuration is equivalent to invoking `kclClient#WithTableName(myTable)` on the KCL instance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way that we may truncate these 2 code examples since they only differ by one line? IE addkclConfig.put("TableName", "myTable");
to the first sample.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not completely straightforward to do so. But I have tried to reduce the duplicate codes to make it less verbose.
private Optional<String> writeCompressionType = Optional.empty(); | ||
private Optional<String> writerClass = Optional.empty(); | ||
|
||
private Optional<Long> consumerBufferCapacity = Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer if we add all the consumer/producer configs in the the input and output descriptor respectively instead of the system descriptor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is another unfortunate unresolved issue in HDFS. Where all the configs are per system not per stream. Once we move to per stream config then it definitely makes sense to place them respectively in input and output descriptor.
@dxichen @vjagadish1989 addressed or responded to all your feedback. let me know if you have more comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Author: Hai Lu <halu@linkedin.com> Reviewers: Xinyu Liu <xiliu@linkedin.com> Closes apache#857 from lhaiesp/master
No description provided.