Oracle
 sql >> Datenbank >  >> RDS >> Oracle

Warum Kafka jdbc connect Daten als BLOB anstelle von varchar einfügen

Sieht aus wie Kafkas String wird dem NCLOB von Oracle zugeordnet :

<table border="1">
<tr>
<th>Schema Type</th><th>MySQL</th><th>Oracle</th><th>PostgreSQL</th><th>SQLite</th>
</tr>
<tr>
<td>INT8</td><td>TINYINT</td><td>NUMBER(3,0)</td><td>SMALLINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT16</td><td>SMALLINT</td><td>NUMBER(5,0)</td><td>SMALLINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT32</td><td>INT</td><td>NUMBER(10,0)</td><td>INT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT64</td><td>BIGINT</td><td>NUMBER(19,0)</td><td>BIGINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>FLOAT32</td><td>FLOAT</td><td>BINARY_FLOAT</td><td>REAL</td><td>REAL</td>
</tr>
<tr>
<td>FLOAT64</td><td>DOUBLE</td><td>BINARY_DOUBLE</td><td>DOUBLE PRECISION</td><td>REAL</td>
</tr>
<tr>
<td>BOOLEAN</td><td>TINYINT</td><td>NUMBER(1,0)</td><td>BOOLEAN</td><td>NUMERIC</td>
</tr>
<tr>
<td>STRING</td><td>VARCHAR(256)</td><td>NCLOB</td><td>TEXT</td><td>TEXT</td>
</tr>
<tr>
<td>BYTES</td><td>VARBINARY(1024)</td><td>BLOB</td><td>BYTEA</td><td>BLOB</td>
</tr>
<tr>
<td>'Decimal'</td><td>DECIMAL(65,s)</td><td>NUMBER(*,s)</td><td>DECIMAL</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Date'</td><td>DATE</td><td>DATE</td><td>DATE</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Time'</td><td>TIME(3)</td><td>DATE</td><td>TIME</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Timestamp'</td><td>TIMESTAMP(3)</td><td>TIMESTAMP</td><td>TIMESTAMP</td><td>NUMERIC</td>
</tr>
</table>

Quelle:https://www.ibm.com/support/knowledgecenter/en/SSPT3X_4.2.5/com.ibm.swg.im.infosphere.biginsights.admin.doc/doc/admin_kafka_jdbc_sink.html

https://docs.confluent.io/current/connect /connect-jdbc/docs/sink_connector.html

AKTUALISIEREN

OracleDialect Klasse (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/dialect/OracleDialect.java ) hat CLOB fest codiert Wert und erweitern Sie ihn einfach mit Ihrer eigenen Klasse und ändern Sie diese Zuordnung nicht, da der Dialekttyp in der statischen Methode in JdbcSinkTask definiert ist (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java )

final DbDialect dbDialect = DbDialect.fromConnectionString(config.connectionUrl);