Connectors > Confluent®
Overview
ASAPIO Integration Add-on can connect to Confluent® or Apache Kafka® brokers, using the Confluent REST proxy.
The ASAPIO Connector is certified by Confluent (visit confluent.io for details).
Add-on/component name | Type |
ASAPIO Integration Add-on – Framework | Base component (required) |
ASAPIO Integration Add-on – Connector for Confluent®/Apache Kafka® | Additional package |
REST Proxy:
Confluent REST Proxy for Kafka is mandatory for the connectivity and subject to a separate license.Please see https://github.com/confluentinc/kafka-rest for more details.
Key features:
- Certified for Confluent, visit Confluent Hub for details
- Supports a wide range of SAP NetWeaver based systems, including SAP ERP, S/4HANA, BW, HCM and many more
- Out-of-the-box connectivity to Confluent® Platform, Confluent® Cloud and Kafka® (REST API v2 only)
- REST-based outbound communication via Confluent Kafka REST proxy (push) or direct with V3 API (see Release note please)
- Inbound interface via Confluent Kafka REST proxy (pull)
- Choose between event-driven (single events) or batch mode (multiple events, also multiple events per REST proxy call)
- Supported communication direction: Outbound, Inbound
- Batch mode allows multi-threading with multiple SAP work processes
Block Architectures for Confluent and Kafka:
Set-up connectivity
To establish connectivity with the Confluent® Kafka® platform, please proceed with the following activities and refer to the specific chapters in this documentation.
- Create RFC destinations to Confluent REST proxy in SAP system settings
- Set-up authentication against the Confluent REST proxy
- Set-up connection instance in ASAPIO Integration Add-on customizing
- Configure example outbound message for a simple example to test the connectivity
Create RFC destination
Create a new RFC destination of type “G” (HTTP Connection to External Server).
- Transaction: SM59
- Create new destination of type “G”
- Specify Target Host: Endpoint of Confluent REST Proxy.
- Save and click on “Connection Test”, which should result in HTTP status code 200.
Set-up authentication to REST proxy
Pre-requisites:
Please make sure you have either the user and password available for the REST proxy or you exchanged the certificates with the SAP system beforehand.
For further details on how to set-up the proxy, please visit Confluent documentation
Configure authentification
While creating the RFC connection to Confluent REST Proxy (see section above), please specifiy the authentification method.
- Transaction: SM59
- Choose the correct RFC destination
- Go to tab “Logon & Security”
- Select authentification method – Options:
- “Basic authentifcation”, with username and password
- SSL certificate-based authentication
Set-up basic settings
Activate BC-Set
Business Configuration sets (BC-Set) contain customizing and configuration-related table entries that are not imported with the add-on.
- Transaction: SCPR20
- BC-Set includes:
- Configuration for cloud adapter
- Configuration for cloud codepages
- Definition of IDoc segments
- Activate the BC-Set with default values: /ASADEV/ACI_BCSET_FRAMEWORK_KAFK
Configure cloud adapter
Add an entry for the connector to the list of cloud adapters:
- Transaction: SPRO
- Goto ASAPIO Cloud Integrator – Maintain Cloud Adapter.
- Add New Entry and specify:
- Cloud Type: name with which to reference this type of connector
ACI Handler Class: /ASADEV/CL_ACI_KAFKA_HANDLER
Set-up cloud codepages
Specify codepages used in the integration:
- Transaction: SPRO
- Goto ASAPIO Cloud Integrator – Maintain Cloud Codepages
- Add New Entry and specify the code pages to be used:
Set-up connection instance
Create the connection instance customizing that ties together the RFC destination created earlier and the cloud connector type:
- Transaction: SPRO
- Goto ASAPIO Cloud Integrator – Connection and Replication Object Customizing
- Or go directly to transaction: /ASADEV/68000202
- Add New Entry and specify:
- Field Instance: a name for this connection in
- Field RFC Dest. (Upload): the RFC destination create for the messaging endpoint
- Field ISO Code: the code page to use
- Field Cloud Type: KAFKA (or the name you chose when adding the connector)
Set-up Error Type Mapping
Create an entry in section Error Type Mapping and specify at least the following mapping:
Resp. Code | Message Type |
207 | Success |
Set-Up Connection Values
Maintain Default values for the connection to Confluent. Connections -> Default values
Default Attribute | Default Attribute Value |
KAFKA_ACCEPT | application/vnd.kafka.v2+json |
KAFKA_CALL_METHOD | POST |
KAFKA_CONTENT_TYPE | application/vnd.kafka.json.v2+json for JSON payloads without schema information
or application/vnd.kafka.jsonschema.v2+json for JSON payloads with schema information (configured in header attributes) |
Set-up outbound messaging
Create Message Type
Example: in the example below, we use the Sales Order (BUS2032) event. Please choose any other suitable example if required.
For each object to be sent via ACI you have to create a message type:
- Transaction: WE81
- Add New Entry and specify:
- Message Type: unique name for the integration
- Description: description of the purpose
Activate Message Type
The created message type has to be activated:
- Transaction: BD50
- Add New Entryand specify:
- Message Type: the created message type
- Active: tick the checkbox
Set-up additional settings in ‘Header Attributes’
Configure the topic to send the events to, the fields to be used for the key and the Ids of the key/value schemas:
- Go to section Header Attributes
- Add New Entry and specify:
Header Attribute | Header Attribute Value |
KAFKA_TOPIC | <topic name> e.g. sap_demo.sales_order |
KAFKA_KEY_FIELD | <fields for key> (separated by “;” if multiple), e.g. VBELN;AUART |
KAFKA_SCHEMA_ID | <id of value schema in SchemaRegistry>, e.g. 4711 |
KAFKA_KEY_SCHEMA_ID | <id of key schema in SchemaRegistry>, eg. 4712 |
Note: due to limitations in the REST proxy you always have to specify schemas for both key and value!
Set up ‘Business Object Event Linkage’
Link the configuration of the outbound object to a Business Object event:
- Go to section Header Attributes
- or use transaction SWE2
- Add New Entry and specify:
- Object Category: BO BOR Object Type
- Object Type: the Business Object Type sending the event
- Event: the event to react to
- Receiver Type: the message type of the outbound object (this is the link to the Addon configuration)
- Receiver Call: Function Module
- Receiver Function Module: /ASADEV/ACI_EVENTS_TRIGGER
- Linkage Activated: tick the checkbox
How to use Simple Notifications
Create Outbound Object configuration
- Transaction: SPRO
- Goto ASAPIO Cloud Integrator – Connection and Replication Object Customizing
- Or go directly to transaction: /ASADEV/68000202
- Select the created Connection
- Go to section Outbound Objects
- Add New Entry and specify:
- Object: name of the outbound configuration
- Extraction Func. Module: /ASADEV/ACI_GEN_NOTIFY_KAFKA
- Message Type: the created message type
- Load Type: Incremental Load
- Trace: activate for testing purposes
- Response Function: /ASADEV/ACI_KAFKA_RESP_HANDLER
Test the outbound event creation
In the example above, please pick any test sales order in transaction /nVA02 and force a change event, e.g. by changing the requested delivery date on header level.
For outbound messaging, you can use and even combine the following methods:
- Simple Notifications
- Message Builder (Generic View Generator)
- IDoc capturing
- Custom-built triggers and extractors
Pre-requisite for all methods is to create a message type, which will be used throughout the configuration process.
The following articles explain the individual possibilities.
How to use Message Builder (Generic View Extractor)
Create Outbound Object configuration
- Transaction: SPRO
- Goto ASAPIO Cloud Integrator – Connection and Replication Object Customizing
- Or go directly to transaction: /ASADEV/68000202
- Select the created Connection
- Go to section Outbound Objects
- Add New Entry and specify:
- Object: name of the outbound configuration
- Extraction Func. Module: /ASADEV/ACI_GEN_VIEW_EXTRACTOR
- Message Type: the created message type
- Load Type: Incremental Load
- Trace: activate for testing purposes
- Response Function: /ASADEV/ACI_KAFKA_RESP_HANDLER
- Formatting Function: /ASADEV/ACI_GEN_VIEWFRM_KAFKA
- Extraction View Name: create a Database View in transaction SE11 (Chapter 4.3.2)
Create database view
For the data events also configure the DB view that is used to define the extraction:
- Transaction: SE11 (for SAP ERP or S/4HANA on-prem deployments with SAP GUI access)
- Alternatively, you can use Eclipse with ABAP Development Tools, or the SAP Fiori App “Create Custom CDS Views” to create a database view if you have this app available in SAP S/4HANA.
Example: Sales Order view (e.g. to be used for Sales Order (BUS2032) change events)
Test the outbound event creation
In the example above, please pick any test sales order in transaction /nVA02 and force a change event, e.g. by changing the requested delivery date on header level.
Set-up Batch Job (Job processed messaging)
Prevent synchronous call for message type
Note
With the following settings change pointers will be set but not being sent directly.
- Transaction: SPRO
- Go to ASAPIO Cloud Integrator – Connection and Replication Object Customizing
- Or go directly to transaction: /ASADEV/68000202
- Click on Synchronous call for message type
- Or go directly to transaction: /ASADEV/ACI_SYNC
- Add New Entry and specify:
- Message Type: the created message type
- Sync. On: Clear checkbox (Change Pointers will be set and the event is not being sent)
Define Variant
- Transaction: n/ASADEV/ACI
- Select the Connection and hit enter
- Select Upload Type: I
- Select Replication Object
- Save Variant
Schedule background job
- Transaction: SM36
- Choose your start conditions
- ABAP program: /ASADEV/AMR_REPLICATOR
- Variant: the created and saved Variant
Test background job
- First create an Event
- Then run the job
- Check the ACI_Monitor in transaction /ASADEV/ACI_MONITOR: You will be able to see the Variant and the Trace. For troubleshooting, check the SLG1-Log.
Set-up Packed Load (split large data)
Create Outbound Object configuration
- Transaction: SPRO
- Goto ASAPIO Cloud Integrator – Connection and Replication Object Customizing
- Or go directly to transaction: /ASADEV/68000202
- Select the created Connection
- Go to section Outbound Objects
- Add New Entry and specify:
- Object: name of the outbound configuration
- Extraction Func. Module: /ASADEV/ACI_GEN_VIEW_EXTRACTOR
- Message Type: the created message type(optional)
- Load Type: Packed Load
- Trace: activate for testing purposes
- Response Function: /ASADEV/ACI_KAFKA_RESP_HANDLER
- Formatting Function: /ASADEV/ACI_GEN_VIEWFRM_KAFKA (depending on your use case)
- Extraction View Name: create a Database View in transaction SE11
Create database view
Note
Please also refer to chapter 4.3.2
For the data events also configure the DB view that is used to define the extraction:
- Transaction: SE11 (for SAP ERP or S/4HANA on-prem deployments with SAP GUI access)
- Alternatively, you can use Eclipse with ABAP Development Tools, or the SAP Fiori App “Create Custom CDS Views” to create a database view if you have this app available in SAP S/4HANA.
Example: Material master view
Set-up ‘Header Attributes’
- Go to section Header Attributes of the outbound object created previously
- Add New Entry and specify the header attributes and values
Header attribute | Header attribute value | Example |
---|---|---|
ACI_PACK_BDCP_COMMIT | Flag for changepointer creation. If set, changepointers will be generated for every entry. IF this flag is set, a messagetype has to be maintained in the outbound object. Caution: This may heavily impact performance. |
X |
ACI_PACK_TABLE | Name of the table to take the keyfields from. This is typically different then the db view specified in ‘ACI_VIEW‘ as we only want to build packages based on the header object and the db view typically contains sub-objects as well | MARA |
ACI_PACK_RETRY_TIME | Time in seconds. This is the duration in which the framework will attempt to get a new ressource from the servergroup | 60 |
ACI_PACK_WHERE_COND
(Optional) |
Condition that is applied to the table defined in ‘ACI_PACK_TABLE’ | Example: AEDAT GT ‘20220101’ |
ACI_PACK_SIZE | Number of entries to send | 500 |
ACI_PACK_KEY_LENGTH | Length of the key to use from the ACI_PACK_TABLE (e.g. MANDT + MATNR) | 13 |
KAFKA_KEY_FIELD | name of the Key Field | MATERIAL_NUMBER |
KAFKA_TOPIC | Topic name in your Confluent broker | Example.topic |
Execute the initial load
Warning
depending on the amount of data this can stress the SAP system servers immensely.
Please always consult with your basis team for the correct server group to use!
- Transaction: /ASADEV/ACI
- Select the Connection and hit enter
- Select Upload Type: P
- Select Replication Object
- Select a Servergroup (this is mandatory)
Standard Function Modules
Function Modules: Event Linkage
Function name | Description |
/ASADEV/ACI_EVENTS_TRIGGER | Customizable trigger for event processing |
Function Modules: Outbound
Mandatory Response Handler: /ASADEV/ACI_KAFKA_RESP_HANDLER (Kafka response handler method)
Extraction Func. Module | Formatting Function |
/ASADEV/ACI_GEN_NOTIFY_KAFKA
(Simple notification event) |
|
/ASADEV/ACI_GEN_VIEW_EXTRACTOR
(Dynamic data selection) |
/ASADEV/ACI_GEN_VIEWFRM_KAFKA
(Formatting function for DB-View extraction) |
Function Modules: Inbound
Function name | Description |
/ASADEV/ACI_SAMPLE_IDOC_JSON | Inbound JSON to generic IDoc |
/ASADEV/ACI_SAMPLE_IDOC_JSON2 | Inbound JSON to the new and improved generic IDoc |
/ASADEV/ACI_JSON_TO_IDOC | Converts special formatted JSON message to SAP Standard IDoc |
/ASADEV/ACI_IDOC_JSON_AS_XML | Obsolete – do use /ASADEV/ACI_SAMPLE_IDOC_JSON2 instead |
Set-up inbound messaging
The Asapio Integration Add-On is delivered with a possibility to pull data from the Confluent® Kafka® REST Proxy back into the SAP-System.
Configuration and customization of the inbound function modules is explained in this section.
With version 9.32405 (SP09) the data pull correctly supports the host header to support load balanced REST Proxy instances, e.g. one pull process always goes to the same REST Proxy instance.
Configure Inbound Object
Create Inbound Object configuration
- Transaction: SPRO
- Goto ASAPIO Cloud Integrator – Connection and Replication Object Customizing
- Or go directly to transaction: /ASADEV/68000202
- Select the created Connection
- Go to section Inbound Objects
- Add New Entry and specify:
- Object: name of the inbound configuration
- Func. Name: /ASADEV/ACI_SAMPLE_IDOC_JSON or any FM with a correct Interface (see above for options)
- Message Type: the created message type
- Trace: activate for testing purposes
Pull-Based Inbound offers following Header Attributes which can be maintained to receive new messages.
Connections → Inbound Objects → Header Attributes
Default Attribute | Example Value | Default |
KAFKA_CONTENT_TYPE mandatory |
For the call payload. This should always be application/vnd.kafka.json.v2+json for the calls to the polling APIs |
application/vnd.kafka.json.v2+json |
KAFKA_DOWNLOAD_TOPIC optional |
The topic to pull data from, e.g. example.topic | No default |
KAFKA_DOWNLOAD_ACCEPT optional |
Depending on the consumer format, e.g.: application/vnd.kafka.avro.v2+json | application/vnd.kafka.json.v2+json |
KAFKA_GROUPNAME
mandatory |
Name of the consumer group e.g. exampleconsumer | No default |
KAFKA_INSTANCE_NAME
mandatory |
Name of the consumer instance, e.g. example_matmas_consumer | No default |
KAFKA_MAX_BYTES optional |
Maximum number of bytes read in one fetch operation | 1000000 (i.e. 1 MB) |
KAFKA_TIMEOUT optional |
Maximum amount of milliseconds spend on fetching records | 1000 (i.e. 1 second) |
KAFKA_CONSUMER_FORMAT optional (new in 9.32405) |
The format of the consumed messages that is used to convert the messages into a JSON-compatible form. The REST proxy is doing the conversion and the SAP system always receives the message in JSON.
Valid values: “binary”, “avro”, “json”, “jsonschema” and “protobuf”. |
json |
KAFKA_CONSUMER_OFFSET_RESET optional (new in 9.32405) |
Where the consumer group starts if it is created new (has no impact if consumer group already exists). Valid values: “earliest”, “latest” | earliest |
KAFKA_CONSUMER_AUTO_COMMIT optional (new in 9.32405) |
Whether or not the consumer group commits the offsets automatically on fetch or if the ASAPIO add-on commits the offsets after handing the data off to the configured processing FM. Valid values: “true”, “false” |
false |
Execute Inbound message pull
- Transaction: /ASADEV/ACI
- Select the Connection and hit enter
- Select Replication Object
- Create Variant and Set Background Job
Inbound processing
We recommend to store the inbound data first and without any processing logic. That way the HTTP connection can be released quickly, and processing can take place asynchronously in parallel/afterwards.
The ASAPIO-specific generic IDoc type /ASADEV/ACI_GENERIC_IDOC can be used to store the message with payload.
Note
IDocs have the advantage that they can be processed multi-threaded afterwards.
Example interface for inbound processing function modules
If you don’t want to use the delivered function module that creates an IDoc, you can implement a custom function module, using the following interface. The payload is received in an importing parameter IT_CONTENT: